A look at puma's ClientPositionService



This article takes a look at puma's ClientPositionService.

ClientPositionService

puma/puma/src/main/java/com/dianping/puma/biz/service/ClientPositionService.java

public interface ClientPositionService {

    List<ClientPositionEntity> findAll();

    ClientPositionEntity find(String clientName);

    void update(ClientPositionEntity clientPositionEntity, boolean flush);

    void flush();

    void cleanUpTestClients();
}
  • ClientPositionService defines the findAll, find, update, flush, and cleanUpTestClients methods

ClientPositionServiceImpl

puma/puma/src/main/java/com/dianping/puma/biz/service/impl/ClientPositionServiceImpl.java

@Service
public class ClientPositionServiceImpl implements ClientPositionService {

    private final static Logger logger = LoggerFactory.getLogger(ClientPositionServiceImpl.class);

    @Autowired
    private ClientPositionDao clientPositionDao;

    // Positions waiting for the next scheduled flush, keyed by client name
    private Map<String, ClientPositionEntity> positionEntityMap = new ConcurrentHashMap<String, ClientPositionEntity>();

    @Override
    public List<ClientPositionEntity> findAll() {
        return clientPositionDao.findAll();
    }

    @Override
    public ClientPositionEntity find(String clientName) {
        return clientPositionDao.findByClientName(clientName);
    }

    @Override
    public void update(ClientPositionEntity clientPositionEntity, boolean flush) {
        if (flush) {
            // Write through: drop any buffered value and persist right away
            positionEntityMap.remove(clientPositionEntity.getClientName());
            insertOrUpdate(clientPositionEntity);
        } else {
            // Buffer only: the latest position per client replaces the previous one
            positionEntityMap.put(clientPositionEntity.getClientName(), clientPositionEntity);
        }
    }

    // Runs 5000 ms after the previous run finishes and persists every buffered position
    @Scheduled(fixedDelay = 5000)
    public void flush() {
        Set<String> keys = positionEntityMap.keySet();
        for (String key : keys) {
            ClientPositionEntity entity = positionEntityMap.remove(key);
            if (entity == null) {
                continue;
            }
            insertOrUpdate(entity);
        }
    }

    private void insertOrUpdate(ClientPositionEntity entity) {
        try {
            entity.setUpdateTime(new Date());
            int updateRow = clientPositionDao.update(entity);
            // No row matched the client name, so insert a new one
            if (updateRow == 0) {
                clientPositionDao.insert(entity);
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    public void cleanUpTestClients() {
        List<ClientPositionEntity> clients = clientPositionDao.findOldTestClient();
        for (ClientPositionEntity entity : clients) {
            clientPositionDao.delete(entity.getId());
        }
    }
}
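  • ClientPositionServiceImpl implements the ClientPositionService interface. findAll delegates to clientPositionDao.findAll(), and find delegates to clientPositionDao.findByClientName(clientName). When flush is true, update removes the client's entry from positionEntityMap and calls insertOrUpdate immediately; when flush is false, it only puts the entity into positionEntityMap. flush, scheduled with fixedDelay = 5000, iterates over the keys of positionEntityMap, removes each entry, and calls insertOrUpdate(entity) for it. insertOrUpdate sets updateTime, calls clientPositionDao.update, and falls back to clientPositionDao.insert when no row was updated; exceptions are logged rather than rethrown. cleanUpTestClients deletes every entity returned by clientPositionDao.findOldTestClient()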
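
The buffering behaviour described above can be illustrated with a minimal, single-threaded sketch. It is not puma's code: BufferedPositionSketch, its persist method, and the writes list are hypothetical stand-ins (a position is reduced to a long and the database to an in-memory list), chosen only to show that buffered updates are coalesced per client until flush runs, while update with flush set to true writes through immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for ClientPositionServiceImpl; not part of puma.
public class BufferedPositionSketch {

    // Pending positions keyed by client name; a newer value overwrites the older one.
    private final Map<String, Long> pending = new ConcurrentHashMap<>();

    // Stand-in for the database: every write that actually reached "storage".
    private final List<String> writes = new ArrayList<>();

    public void update(String clientName, long position, boolean flush) {
        if (flush) {
            // Write through and discard any buffered value for this client.
            pending.remove(clientName);
            persist(clientName, position);
        } else {
            pending.put(clientName, position);
        }
    }

    public void flush() {
        // ConcurrentHashMap iterators are weakly consistent, so removing
        // entries while iterating over keySet() does not throw.
        for (String key : pending.keySet()) {
            Long position = pending.remove(key);
            if (position == null) {
                continue;
            }
            persist(key, position);
        }
    }

    private void persist(String clientName, long position) {
        writes.add(clientName + "@" + position);
    }

    public List<String> writes() {
        return writes;
    }

    public static void main(String[] args) {
        BufferedPositionSketch service = new BufferedPositionSketch();
        service.update("client-a", 100L, false);
        service.update("client-a", 200L, false);
        System.out.println(service.writes()); // prints []
        service.flush();
        System.out.println(service.writes()); // prints [client-a@200]
        service.update("client-b", 7L, true);
        System.out.println(service.writes()); // prints [client-a@200, client-b@7]
    }
}
```

In this sketch the intermediate position 100 never reaches storage, which mirrors the trade-off of the buffered path: fewer writes, at the cost of losing whatever is still in the map if the process stops before the next flush.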

ClientPositionDao

puma/biz/src/main/java/com/dianping/puma/biz/dao/ClientPositionDao.java

public interface ClientPositionDao {

    List<ClientPositionEntity> findAll();

    ClientPositionEntity findByClientName(String clientName);

    int update(ClientPositionEntity entity);

    int insert(ClientPositionEntity entity);

    int delete(int id);

    List<ClientPositionEntity> findOldTestClient();
}
  • ClientPositionDao defines the findAll, findByClientName, update, insert, delete, and findOldTestClient methods
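
The int returned by update is what insertOrUpdate in ClientPositionServiceImpl relies on: zero affected rows means the client has no row yet, so an insert follows. The sketch below shows that update-then-insert pattern against a hypothetical in-memory DAO; UpdateThenInsertSketch and InMemoryDao are illustrative names and not part of puma, and a position is again reduced to a long.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of the update-then-insert pattern; not puma's code.
public class UpdateThenInsertSketch {

    // In-memory stand-in for ClientPositionDao: one "row" per client name.
    static class InMemoryDao {
        final Map<String, Long> rows = new HashMap<>();

        // Returns the number of affected rows, like an SQL UPDATE would.
        int update(String clientName, long position) {
            if (!rows.containsKey(clientName)) {
                return 0;
            }
            rows.put(clientName, position);
            return 1;
        }

        int insert(String clientName, long position) {
            rows.put(clientName, position);
            return 1;
        }
    }

    // Try the update first and insert only when no row matched.
    static String insertOrUpdate(InMemoryDao dao, String clientName, long position) {
        int updatedRows = dao.update(clientName, position);
        if (updatedRows == 0) {
            dao.insert(clientName, position);
            return "inserted";
        }
        return "updated";
    }

    public static void main(String[] args) {
        InMemoryDao dao = new InMemoryDao();
        System.out.println(insertOrUpdate(dao, "client-a", 100L)); // prints inserted
        System.out.println(insertOrUpdate(dao, "client-a", 200L)); // prints updated
        System.out.println(dao.rows.get("client-a"));              // prints 200
    }
}
```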

ClientPositionEntity

puma/biz/src/main/java/com/dianping/puma/biz/entity/ClientPositionEntity.java

public class ClientPositionEntity extends BaseEntity {

    private String clientName;

    private String binlogFile;

    private long binlogPosition;

    private long serverId;

    private int eventIndex;

    private long timestamp;

    //......

}
  • ClientPositionEntity extends BaseEntity and defines the clientName, binlogFile, binlogPosition, serverId, eventIndex, and timestamp properties

ClientPositionMapper

puma/biz/src/main/resources/sqlmap/ClientPositionMapper.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">

<mapper namespace="com.dianping.puma.biz.dao.ClientPositionDao">

    <select id="findAll" resultType="ClientPositionEntity">
        SELECT * FROM ClientPosition
    </select>

    <select id="findByClientName" resultType="ClientPositionEntity">
        SELECT * FROM ClientPosition where ClientName = #{clientName}
    </select>

    <update id="update" parameterType="ClientPositionEntity">
        update ClientPosition
        set
        BinlogFile = #{binlogFile},
        BinlogPosition = #{binlogPosition},
        ServerId = #{serverId},
        EventIndex = #{eventIndex},
        Timestamp = #{timestamp},
        UpdateTime = #{updateTime}
        where
        ClientName = #{clientName}
    </update>

    <insert id="insert" parameterType="ClientPositionEntity" useGeneratedKeys="true" keyProperty="id">
        insert into ClientPosition
        (
        ClientName,
        BinlogFile,
        BinlogPosition,
        ServerId,
        EventIndex,
        Timestamp,
        UpdateTime
        )
        values
        (#{clientName},
        #{binlogFile},
        #{binlogPosition},
        #{serverId},
        #{eventIndex},
        #{timestamp},
        #{updateTime}
        )
    </insert>

    <select id="findOldTestClient" resultType="ClientPositionEntity">
        select * from ClientPosition
        where UpdateTime &lt; NOW() - INTERVAL 10 DAY
        and ClientName like '%test'
    </select>

    <delete id="delete" parameterType="int">
        delete from ClientPosition where id = #{id}
    </delete>

</mapper>
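  • ClientPositionMapper is the MyBatis mapper XML that supplies the SQL for the methods defined by ClientPositionDao. Its findOldTestClient query selects rows whose UpdateTime is more than 10 days old and whose ClientName ends with test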
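
As a rough illustration of which rows findOldTestClient selects, the WHERE clause can be restated as a plain Java predicate. This is only a sketch under assumptions: OldTestClientSketch and isOldTestClient are hypothetical names, the current time is passed in explicitly instead of using NOW(), and String.endsWith is case-sensitive, whereas the SQL LIKE match may be case-insensitive depending on the column's collation.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical restatement of the findOldTestClient WHERE clause; not puma's code.
public class OldTestClientSketch {

    // Mirrors: UpdateTime < NOW() - INTERVAL 10 DAY and ClientName like '%test'
    static boolean isOldTestClient(String clientName, Instant updateTime, Instant now) {
        boolean stale = updateTime.isBefore(now.minus(Duration.ofDays(10)));
        boolean testName = clientName.endsWith("test"); // '%test' means "ends with test"
        return stale && testName;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2020-01-20T00:00:00Z");
        Instant fifteenDaysAgo = Instant.parse("2020-01-05T00:00:00Z");
        Instant fiveDaysAgo = Instant.parse("2020-01-15T00:00:00Z");

        // Old and named like a test client: selected for cleanup.
        System.out.println(isOldTestClient("order-sync-test", fifteenDaysAgo, now)); // prints true
        // Updated within the last 10 days: kept.
        System.out.println(isOldTestClient("order-sync-test", fiveDaysAgo, now));    // prints false
        // Old, but the name does not end with "test": kept.
        System.out.println(isOldTestClient("test-order-sync", fifteenDaysAgo, now)); // prints false
    }
}
```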

Summary

ClientPositionService defines the findAll, find, update, flush, and cleanUpTestClients methods. ClientPositionServiceImpl implements them on top of ClientPositionDao: findAll and find delegate directly to the DAO; update either persists immediately through insertOrUpdate (flush is true) or only buffers the entity in positionEntityMap (flush is false); and the scheduled flush method drains positionEntityMap, calling insertOrUpdate(entity) for each removed entry.

doc

  • ClientPositionService
