序
本文主要研究一下puma的ClientPositionService
ClientPositionService
puma/puma/src/main/java/com/dianping/puma/biz/service/ClientPositionService.java
public interface ClientPositionService { List<ClientPositionEntity> findAll(); ClientPositionEntity find(String clientName); void update(ClientPositionEntity clientPositionEntity, boolean flush); void flush(); void cleanUpTestClients();}
- ClientPositionService定义了findAll、find、update、flush、cleanUpTestClients方法
ClientPositionServiceImpl
puma/puma/src/main/java/com/dianping/puma/biz/service/impl/ClientPositionServiceImpl.java
@Servicepublic class ClientPositionServiceImpl implements ClientPositionService { private final static Logger logger = LoggerFactory.getLogger(ClientPositionServiceImpl.class); @Autowired private ClientPositionDao clientPositionDao; private Map<String, ClientPositionEntity> positionEntityMap = new ConcurrentHashMap<String, ClientPositionEntity>(); @Override public List<ClientPositionEntity> findAll() { return clientPositionDao.findAll(); } @Override public ClientPositionEntity find(String clientName) { return clientPositionDao.findByClientName(clientName); } @Override public void update(ClientPositionEntity clientPositionEntity, boolean flush) { if (flush) { positionEntityMap.remove(clientPositionEntity.getClientName()); insertOrUpdate(clientPositionEntity); } else { positionEntityMap.put(clientPositionEntity.getClientName(), clientPositionEntity); } } @Scheduled(fixedDelay = 5000) public void flush() { Set<String> keys = positionEntityMap.keySet(); for (String key : keys) { ClientPositionEntity entity = positionEntityMap.remove(key); if (entity == null) { continue; } insertOrUpdate(entity); } } private void insertOrUpdate(ClientPositionEntity entity) { try { entity.setUpdateTime(new Date()); int updateRow = clientPositionDao.update(entity); if (updateRow == 0) { clientPositionDao.insert(entity); } } catch (Exception e) { logger.error(e.getMessage(), e); } } public void cleanUpTestClients() { List<ClientPositionEntity> clients = clientPositionDao.findOldTestClient(); for (ClientPositionEntity entity : clients) { clientPositionDao.delete(entity.getId()); } }}
- ClientPositionServiceImpl实现了ClientPositionService接口,其findAll方法执行clientPositionDao.findAll();其find方法执行clientPositionDao.findByClientName(clientName);其update方法在flush为true时执行positionEntityMap.remove及insertOrUpdate,在flush为false时执行positionEntityMap.put;其flush方法遍历positionEntityMap,挨个移除,然后执行insertOrUpdate(entity)
ClientPositionDao
puma/biz/src/main/java/com/dianping/puma/biz/dao/ClientPositionDao.java
public interface ClientPositionDao { List<ClientPositionEntity> findAll(); ClientPositionEntity findByClientName(String clientName); int update(ClientPositionEntity entity); int insert(ClientPositionEntity entity); int delete(int id); List<ClientPositionEntity> findOldTestClient();}
- ClientPositionDao定义了findAll、findByClientName、update、insert、delete、findOldTestClient方法
ClientPositionEntity
puma/biz/src/main/java/com/dianping/puma/biz/entity/ClientPositionEntity.java
public class ClientPositionEntity extends BaseEntity { private String clientName; private String binlogFile; private long binlogPosition; private long serverId; private int eventIndex; private long timestamp; //......}
- ClientPositionEntity继承了BaseEntity,定义了clientName、binlogFile、binlogPosition、serverId、eventIndex、timestamp属性
ClientPositionMapper
puma/biz/src/main/resources/sqlmap/ClientPositionMapper.xml
<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"><mapper namespace="com.dianping.puma.biz.dao.ClientPositionDao"> <select id="findAll" resultType="ClientPositionEntity"> SELECT * FROM ClientPosition </select> <select id="findByClientName" resultType="ClientPositionEntity"> SELECT * FROM ClientPosition where ClientName = #{clientName} </select> <update id="update" parameterType="ClientPositionEntity"> update ClientPosition set BinlogFile = #{binlogFile}, BinlogPosition = #{binlogPosition}, ServerId = #{serverId}, EventIndex = #{eventIndex}, Timestamp = #{timestamp}, UpdateTime = #{updateTime} where ClientName = #{clientName} </update> <insert id="insert" parameterType="ClientPositionEntity" useGeneratedKeys="true" keyProperty="id"> insert into ClientPosition ( ClientName, BinlogFile, BinlogPosition, ServerId, EventIndex, Timestamp, UpdateTime ) values ( #{clientName}, #{binlogFile}, #{binlogPosition}, #{serverId}, #{eventIndex}, #{timestamp}, #{updateTime} ) </insert> <select id="findOldTestClient" resultType="ClientPositionEntity"> select * from ClientPosition where UpdateTime < NOW() - INTERVAL 10 DAY and ClientName like '%test' </select> <delete id="delete" parameterType="int"> delete from ClientPosition where id = #{id} </delete></mapper>
- ClientPositionMapper实现了ClientPositionDao定义的方法
小结
ClientPositionService定义了findAll、find、update、flush、cleanUpTestClients方法;ClientPositionServiceImpl实现了ClientPositionService接口,其findAll方法执行clientPositionDao.findAll();其find方法执行clientPositionDao.findByClientName(clientName);其update方法在flush为true时执行positionEntityMap.remove及insertOrUpdate,在flush为false时执行positionEntityMap.put;其flush方法遍历positionEntityMap,挨个移除,然后执行insertOrUpdate(entity)
doc
- ClientPositionService