本文主要研究一下puma的ClientPositionService

ClientPositionService

puma/puma/src/main/java/com/dianping/puma/biz/service/ClientPositionService.java

public interface ClientPositionService {    List<ClientPositionEntity> findAll();    ClientPositionEntity find(String clientName);    void update(ClientPositionEntity clientPositionEntity, boolean flush);    void flush();    void cleanUpTestClients();}
  • ClientPositionService定义了findAll、find、update、flush、cleanUpTestClients方法

ClientPositionServiceImpl

puma/puma/src/main/java/com/dianping/puma/biz/service/impl/ClientPositionServiceImpl.java

@Servicepublic class ClientPositionServiceImpl implements ClientPositionService {    private final static Logger logger = LoggerFactory.getLogger(ClientPositionServiceImpl.class);    @Autowired    private ClientPositionDao clientPositionDao;    private Map<String, ClientPositionEntity> positionEntityMap = new ConcurrentHashMap<String, ClientPositionEntity>();    @Override    public List<ClientPositionEntity> findAll() {        return clientPositionDao.findAll();    }    @Override    public ClientPositionEntity find(String clientName) {        return clientPositionDao.findByClientName(clientName);    }    @Override    public void update(ClientPositionEntity clientPositionEntity, boolean flush) {        if (flush) {            positionEntityMap.remove(clientPositionEntity.getClientName());            insertOrUpdate(clientPositionEntity);        } else {            positionEntityMap.put(clientPositionEntity.getClientName(), clientPositionEntity);        }    }    @Scheduled(fixedDelay = 5000)    public void flush() {        Set<String> keys = positionEntityMap.keySet();        for (String key : keys) {            ClientPositionEntity entity = positionEntityMap.remove(key);            if (entity == null) {                continue;            }            insertOrUpdate(entity);        }    }    private void insertOrUpdate(ClientPositionEntity entity) {        try {            entity.setUpdateTime(new Date());            int updateRow = clientPositionDao.update(entity);            if (updateRow == 0) {                clientPositionDao.insert(entity);            }        } catch (Exception e) {            logger.error(e.getMessage(), e);        }    }    public void cleanUpTestClients() {        List<ClientPositionEntity> clients = clientPositionDao.findOldTestClient();        for (ClientPositionEntity entity : clients) {            clientPositionDao.delete(entity.getId());        }    }}
  • ClientPositionServiceImpl实现了ClientPositionService接口,其findAll方法执行clientPositionDao.findAll();其find方法执行clientPositionDao.findByClientName(clientName);其update方法在flush为true时执行positionEntityMap.remove及insertOrUpdate,在flush为false时执行positionEntityMap.put;其flush方法遍历positionEntityMap,挨个移除,然后执行insertOrUpdate(entity)

ClientPositionDao

puma/biz/src/main/java/com/dianping/puma/biz/dao/ClientPositionDao.java

public interface ClientPositionDao {    List<ClientPositionEntity> findAll();    ClientPositionEntity findByClientName(String clientName);    int update(ClientPositionEntity entity);    int insert(ClientPositionEntity entity);    int delete(int id);    List<ClientPositionEntity> findOldTestClient();}
  • ClientPositionDao定义了findAll、findByClientName、update、insert、delete、findOldTestClient方法

ClientPositionEntity

puma/biz/src/main/java/com/dianping/puma/biz/entity/ClientPositionEntity.java

public class ClientPositionEntity extends BaseEntity {    private String clientName;    private String binlogFile;    private long binlogPosition;    private long serverId;    private int eventIndex;    private long timestamp;    //......}
  • ClientPositionEntity继承了BaseEntity,定义了clientName、binlogFile、binlogPosition、serverId、eventIndex、timestamp属性

ClientPositionMapper

puma/biz/src/main/resources/sqlmap/ClientPositionMapper.xml

<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"        "http://mybatis.org/dtd/mybatis-3-mapper.dtd"><mapper namespace="com.dianping.puma.biz.dao.ClientPositionDao">    <select id="findAll" resultType="ClientPositionEntity">        SELECT * FROM ClientPosition    </select>    <select id="findByClientName" resultType="ClientPositionEntity">        SELECT * FROM ClientPosition where ClientName = #{clientName}    </select>    <update id="update" parameterType="ClientPositionEntity">        update ClientPosition        set        BinlogFile = #{binlogFile},        BinlogPosition = #{binlogPosition},        ServerId = #{serverId},        EventIndex = #{eventIndex},        Timestamp = #{timestamp},        UpdateTime = #{updateTime}        where        ClientName = #{clientName}    </update>    <insert id="insert" parameterType="ClientPositionEntity" useGeneratedKeys="true" keyProperty="id">        insert into ClientPosition        (        ClientName,        BinlogFile,        BinlogPosition,        ServerId,        EventIndex,        Timestamp,        UpdateTime        )        values        (        #{clientName},        #{binlogFile},        #{binlogPosition},        #{serverId},        #{eventIndex},        #{timestamp},        #{updateTime}        )    </insert>    <select id="findOldTestClient" resultType="ClientPositionEntity">        select * from ClientPosition        where UpdateTime &lt; NOW() - INTERVAL 10 DAY        and ClientName like '%test'    </select>    <delete id="delete" parameterType="int">        delete from ClientPosition where id = #{id}    </delete></mapper>
  • ClientPositionMapper实现了ClientPositionDao定义的方法

小结

ClientPositionService定义了findAll、find、update、flush、cleanUpTestClients方法;ClientPositionServiceImpl实现了ClientPositionService接口,其findAll方法执行clientPositionDao.findAll();其find方法执行clientPositionDao.findByClientName(clientName);其update方法在flush为true时执行positionEntityMap.remove及insertOrUpdate,在flush为false时执行positionEntityMap.put;其flush方法遍历positionEntityMap,挨个移除,然后执行insertOrUpdate(entity)

doc

  • ClientPositionService