Preface
This article takes a look at puma's ClientPositionService.
ClientPositionService
puma/puma/src/main/java/com/dianping/puma/biz/service/ClientPositionService.java
public interface ClientPositionService {
    List<ClientPositionEntity> findAll();
    ClientPositionEntity find(String clientName);
    void update(ClientPositionEntity clientPositionEntity, boolean flush);
    void flush();
    void cleanUpTestClients();
}
- ClientPositionService defines the findAll, find, update, flush, and cleanUpTestClients methods
ClientPositionServiceImpl
puma/puma/src/main/java/com/dianping/puma/biz/service/impl/ClientPositionServiceImpl.java
@Service
public class ClientPositionServiceImpl implements ClientPositionService {
    private final static Logger logger = LoggerFactory.getLogger(ClientPositionServiceImpl.class);
    @Autowired
    private ClientPositionDao clientPositionDao;
    private Map<String, ClientPositionEntity> positionEntityMap = new ConcurrentHashMap<String, ClientPositionEntity>();
    @Override
    public List<ClientPositionEntity> findAll() {
        return clientPositionDao.findAll();
    }
    @Override
    public ClientPositionEntity find(String clientName) {
        return clientPositionDao.findByClientName(clientName);
    }
    @Override
    public void update(ClientPositionEntity clientPositionEntity, boolean flush) {
        if (flush) {
            positionEntityMap.remove(clientPositionEntity.getClientName());
            insertOrUpdate(clientPositionEntity);
        } else {
            positionEntityMap.put(clientPositionEntity.getClientName(), clientPositionEntity);
        }
    }
    @Scheduled(fixedDelay = 5000)
    public void flush() {
        Set<String> keys = positionEntityMap.keySet();
        for (String key : keys) {
            ClientPositionEntity entity = positionEntityMap.remove(key);
            if (entity == null) {
                continue;
            }
            insertOrUpdate(entity);
        }
    }
    private void insertOrUpdate(ClientPositionEntity entity) {
        try {
            entity.setUpdateTime(new Date());
            int updateRow = clientPositionDao.update(entity);
            if (updateRow == 0) {
                clientPositionDao.insert(entity);
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
    public void cleanUpTestClients() {
        List<ClientPositionEntity> clients = clientPositionDao.findOldTestClient();
        for (ClientPositionEntity entity : clients) {
            clientPositionDao.delete(entity.getId());
        }
    }
}
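- ClientPositionServiceImpl implements the ClientPositionService interface. Its findAll method calls clientPositionDao.findAll(); its find method calls clientPositionDao.findByClientName(clientName); its update method calls positionEntityMap.remove followed by insertOrUpdate when flush is true, and only calls positionEntityMap.put when flush is false; its flush method, scheduled with a fixedDelay of 5000, iterates over positionEntityMap, removes the entries one by one, and calls insertOrUpdate(entity) on each; insertOrUpdate first calls clientPositionDao.update and falls back to clientPositionDao.insert when no row was updated; cleanUpTestClients deletes every entity returned by clientPositionDao.findOldTestClient()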
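The buffering behaviour described above can be illustrated with a small, self-contained sketch. It is not puma's code: the class PositionBufferSketch, the in-memory store map, and the writes counter are hypothetical stand-ins for the service, the ClientPosition table, and the DAO calls, and flush() is invoked by hand instead of by a scheduler.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the service in the article; all names are illustrative.
class PositionBufferSketch {
    // Stand-in for the database table: clientName -> last persisted binlog position.
    final Map<String, Long> store = new ConcurrentHashMap<>();
    // Positions that were reported but not yet written to the store.
    final Map<String, Long> pending = new ConcurrentHashMap<>();
    // Number of writes that reached the store (single-threaded demo, so a plain int is enough).
    int writes = 0;

    void update(String clientName, long position, boolean flush) {
        if (flush) {
            // Drop any older buffered value, then persist right away.
            pending.remove(clientName);
            persist(clientName, position);
        } else {
            // Keep only the latest position; earlier buffered values are overwritten.
            pending.put(clientName, position);
        }
    }

    // The article's version is triggered by @Scheduled(fixedDelay = 5000); here it is called manually.
    void flush() {
        for (String key : pending.keySet()) {
            Long position = pending.remove(key);
            if (position == null) {
                continue; // the entry was already taken by someone else
            }
            persist(key, position);
        }
    }

    private void persist(String clientName, long position) {
        store.put(clientName, position);
        writes++;
    }

    public static void main(String[] args) {
        PositionBufferSketch service = new PositionBufferSketch();
        service.update("client-a", 100L, false);
        service.update("client-a", 200L, false);
        service.update("client-a", 300L, false);
        System.out.println(service.store.get("client-a"));
        service.flush();
        System.out.println(service.store.get("client-a") + " after " + service.writes + " write(s)");
    }
}
```

In this sketch three buffered updates for the same client collapse into a single write once flush() runs, which appears to be the point of the map in the original class. The trade-off, under the same assumptions, is that a position that has only been buffered is lost if the process stops before the next flush.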
ClientPositionDao
puma/biz/src/main/java/com/dianping/puma/biz/dao/ClientPositionDao.java
public interface ClientPositionDao {
    List<ClientPositionEntity> findAll();
    ClientPositionEntity findByClientName(String clientName);
    int update(ClientPositionEntity entity);
    int insert(ClientPositionEntity entity);
    int delete(int id);
    List<ClientPositionEntity> findOldTestClient();
}
- ClientPositionDao defines the findAll, findByClientName, update, insert, delete, and findOldTestClient methods
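Because update and insert both return an int, a caller can use the affected-row count to decide between the two, which is what insertOrUpdate in ClientPositionServiceImpl does. The following is a minimal sketch of that update-then-insert pattern against a hypothetical in-memory table; UpsertSketch and its rows map are assumptions made for illustration and are not part of puma.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the ClientPosition table and its DAO.
class UpsertSketch {
    // clientName -> binlogPosition
    final Map<String, Long> rows = new HashMap<>();

    // Mimics an UPDATE ... WHERE ClientName = ?: returns the number of matched rows.
    int update(String clientName, long binlogPosition) {
        if (!rows.containsKey(clientName)) {
            return 0;
        }
        rows.put(clientName, binlogPosition);
        return 1;
    }

    // Mimics an INSERT: always adds the row and reports one affected row.
    int insert(String clientName, long binlogPosition) {
        rows.put(clientName, binlogPosition);
        return 1;
    }

    // Try the update first and insert only when no existing row matched.
    String insertOrUpdate(String clientName, long binlogPosition) {
        int updated = update(clientName, binlogPosition);
        if (updated == 0) {
            insert(clientName, binlogPosition);
            return "inserted";
        }
        return "updated";
    }

    public static void main(String[] args) {
        UpsertSketch dao = new UpsertSketch();
        System.out.println(dao.insertOrUpdate("client-a", 4L));
        System.out.println(dao.insertOrUpdate("client-a", 8L));
    }
}
```

Note that this check-then-act sequence is not atomic: two callers that both see zero updated rows would both insert. Whether that matters for puma depends on table constraints that the article does not show.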
ClientPositionEntity
puma/biz/src/main/java/com/dianping/puma/biz/entity/ClientPositionEntity.java
public class ClientPositionEntity extends BaseEntity {
    private String clientName;
    private String binlogFile;
    private long binlogPosition;
    private long serverId;
    private int eventIndex;
    private long timestamp;
    //......
}
- ClientPositionEntity extends BaseEntity and defines the clientName, binlogFile, binlogPosition, serverId, eventIndex, and timestamp properties
ClientPositionMapper
puma/biz/src/main/resources/sqlmap/ClientPositionMapper.xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.dianping.puma.biz.dao.ClientPositionDao">
<select id="findAll" resultType="ClientPositionEntity">
SELECT * FROM ClientPosition
</select>
<select id="findByClientName" resultType="ClientPositionEntity">
SELECT * FROM ClientPosition where ClientName = #{clientName}
</select>
<update id="update" parameterType="ClientPositionEntity">
update ClientPosition
set
BinlogFile = #{binlogFile},
BinlogPosition = #{binlogPosition},
ServerId = #{serverId},
EventIndex = #{eventIndex},
Timestamp = #{timestamp},
UpdateTime = #{updateTime}
where
ClientName = #{clientName}
</update>
<insert id="insert" parameterType="ClientPositionEntity" useGeneratedKeys="true" keyProperty="id">
insert into ClientPosition
(
ClientName,
BinlogFile,
BinlogPosition,
ServerId,
EventIndex,
Timestamp,
UpdateTime
)
values
(#{clientName},
#{binlogFile},
#{binlogPosition},
#{serverId},
#{eventIndex},
#{timestamp},
#{updateTime}
)
</insert>
<select id="findOldTestClient" resultType="ClientPositionEntity">
select * from ClientPosition
where UpdateTime &lt; NOW() - INTERVAL 10 DAY
and ClientName like '%test'
</select>
<delete id="delete" parameterType="int">
delete from ClientPosition where id = #{id}
</delete>
</mapper>
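- ClientPositionMapper supplies the SQL for the methods defined by ClientPositionDao; findOldTestClient selects rows whose UpdateTime is more than 10 days old and whose ClientName ends with test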
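To make the findOldTestClient condition concrete, here is a rough Java equivalent of its WHERE clause. It is only an approximation under stated assumptions: OldTestClientFilterSketch and isOldTestClient are hypothetical names, the current time is passed in explicitly, 10 days is treated as a fixed 240-hour duration, and endsWith is case-sensitive whereas a SQL LIKE comparison may not be, depending on the column collation.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper that mirrors the WHERE clause of findOldTestClient.
class OldTestClientFilterSketch {

    static boolean isOldTestClient(String clientName, Instant updateTime, Instant now) {
        // UpdateTime < NOW() - INTERVAL 10 DAY
        boolean stale = updateTime.isBefore(now.minus(Duration.ofDays(10)));
        // ClientName like '%test': the name must end with "test"
        boolean testName = clientName.endsWith("test");
        return stale && testName;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2020-01-20T00:00:00Z");
        Instant longAgo = Instant.parse("2020-01-01T00:00:00Z");
        Instant recent = Instant.parse("2020-01-15T00:00:00Z");
        System.out.println(isOldTestClient("puma-test", longAgo, now));
        System.out.println(isOldTestClient("puma-test", recent, now));
        System.out.println(isOldTestClient("test-client", longAgo, now));
    }
}
```

Only the first call matches both conditions; the second is too recent and the third has test at the start of the name rather than the end.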
Summary
ClientPositionService defines the findAll, find, update, flush, and cleanUpTestClients methods. ClientPositionServiceImpl implements the ClientPositionService interface: its findAll method calls clientPositionDao.findAll(); its find method calls clientPositionDao.findByClientName(clientName); its update method calls positionEntityMap.remove followed by insertOrUpdate when flush is true, and only calls positionEntityMap.put when flush is false; its flush method iterates over positionEntityMap, removes the entries one by one, and calls insertOrUpdate(entity) on each.
doc
- ClientPositionService