序
本文次要钻研一下 rocketmq-streams 的 ILeaseService
ILeaseService
/**
* 通过 db 实现租约和锁,能够更轻量级,缩小其余中间件的依赖 应用主备场景,只有一个实例运行,当以后实例挂掉,在肯定工夫内,会被其余实例接手 也能够用于全局锁
*/
public interface ILeaseService {
/**
* 默认锁定工夫
*/
static final int DEFALUT_LOCK_TIME = 60 * 5;
/**
* 查看某用户以后工夫是否具备租约。这个办法是纯内存操作,无性能开销
*
* @return true,租约无效;false,租约有效
*/
boolean hasLease(String name);
/**
* 申请租约,会启动一个线程,不停申请租约,直到申请胜利。申请胜利后,每 租期 /2 续约。如果目前被其余租户获取租约,只有在对方租约生效,后才容许新的租户获取租约
*
* @param name 租约名称,无特殊要求,雷同名称会竞争租约
*/
void startLeaseTask(String name);
/**
* 申请租约,会启动一个线程,不停申请租约,直到申请胜利。申请胜利后,每 租期 /2 续约。如果目前被其余租户获取租约,只有在对方租约生效,后才容许新的租户获取租约
*
* @param name 租约名称,无特殊要求,雷同名称会竞争租约
* @param callback 当第一获取租约时,回调此函数
*/
void startLeaseTask(final String name, ILeaseGetCallback callback);
/**
* 申请租约,会启动一个线程,不停申请租约,直到申请胜利。申请胜利后,每 租期 /2 续约。如果目前被其余租户获取租约,只有在对方租约生效,后才容许新的租户获取租约
*
* @param name 租约名称,无特殊要求,雷同名称会竞争租约
* @param leaseTermSecond 租期,在租期内能够做业务解决,单位是秒
* @param callback 当第一获取租约时,回调此函数
*/
void startLeaseTask(final String name, int leaseTermSecond, ILeaseGetCallback callback);
/**
* 申请锁, 无论胜利与否,立即返回。如果不开释,最大锁定工夫是 5 分钟
*
* @param name 业务名称
* @param lockerName 锁名称
* @return 是否桎梏胜利
*/
boolean lock(String name, String lockerName);
/**
* 申请锁, 无论胜利与否,立即返回。默认锁定工夫是 5 分钟
*
* @param name 业务名称
* @param lockerName 锁名称
* @param lockTimeSecond 如果不开释,锁定的最大工夫,单位是秒
* @return 是否桎梏胜利
* @return
*/
boolean lock(String name, String lockerName, int lockTimeSecond);
/**
* 申请锁,如果没有则期待,等待时间能够指定,如果是-1 则有限期待。如果不开释,最大锁定工夫是 5 分钟
*
* @param name 业务名称
* @param lockerName 锁名称
* @param waitTime 没获取锁时,最大期待多长时间,如果是-1 则有限期待
* @return 是否桎梏胜利
*/
boolean tryLocker(String name, String lockerName, long waitTime);
/**
* 申请锁,如果没有则期待,等待时间能够指定,如果是-1 则有限期待。如果不开释,最大锁定工夫是 lockTimeSecond
*
* @param name 业务名称
* @param lockerName 锁名称
* @param waitTime 没获取锁时,最大期待多长时间,如果是-1 则有限期待
* @param lockTimeSecond 如果不开释,锁定的最大工夫,单位是秒
* @return 是否桎梏胜利
*/
boolean tryLocker(String name, String lockerName, long waitTime, int lockTimeSecond);
/**
* 开释锁
*
* @param name
* @param lockerName
* @return
*/
boolean unlock(String name, String lockerName);
/**
* 对于曾经获取锁的,能够通过这个办法,始终持有锁。和租约的区别是,当开释锁后,无其余实例抢占。无奈实现主备模式
*
* @param name 业务名称
* @param lockerName 锁名称
* @param lockTimeSecond 租期,这个办法会主动续约,如果不被动开释,会始终持有锁
* @return 是否胜利获取锁
*/
boolean holdLock(String name, String lockerName, int lockTimeSecond);
/**
* 是否持有锁,不会申请锁。如果以前申请过,且未过期,返回 true,否则返回 false
*
* @param name 业务名称
* @param lockerName 锁名称
* @return
*/
boolean hasHoldLock(String name, String lockerName);
List<LeaseInfo> queryLockedInstanceByNamePrefix(String name, String lockerNamePrefix);
}
- ILeaseService 接口定义了 hasLease、startLeaseTask、lock、tryLocker、unlock、holdLock、hasHoldLock、queryLockedInstanceByNamePrefix 办法
BasedLesaseImpl
public abstract class BasedLesaseImpl implements ILeaseService {private static final Log LOG = LogFactory.getLog(BasedLesaseImpl.class);
private static final String CONSISTENT_HASH_PREFIX = "consistent_hash_";
private static final AtomicBoolean syncStart = new AtomicBoolean(false);
private static final int synTime = 120; // 5 分钟的一致性 hash 同步工夫太久了,改为 2 分钟
protected ScheduledExecutorService taskExecutor = null;
protected int leaseTerm = 300 * 2; // 租约工夫
// protected transient JDBCDriver jdbcDataSource = null;
protected ILeaseStorage leaseStorage;
protected volatile Map<String, Date> leaseName2Date = new ConcurrentHashMap<>(); // 每个 lease name 对应的租约到期工夫
public BasedLesaseImpl() {taskExecutor = new ScheduledThreadPoolExecutor(10);
}
/**
* lease_name: consistent_hash_ip, lease_user_ip: ip, 定时刷新 lease_info 表,查看一致性 hash 环的节点状况
*
* @param name
* @return
*/
@Override
public boolean hasLease(String name) {
// 内存中没有租约信息则示意 没有租约
Date leaseEndTime = leaseName2Date.get(name);
if (leaseEndTime == null) {// LOG.info("内存中依据" + name + "没有查问到租约信息,示意没有租约");
return false;
}
// LOG.info("查问是否有租约 name:" + name + ", 以后工夫:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())
// + "租约到期工夫" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(leaseEndTime));
// 有租约工夫,并且租约工夫大于以后工夫,示意有租约信息
if (new Date().before(leaseEndTime)) {return true;}
return false;
}
private final Map<String, AtomicBoolean> startLeaseMap = new HashMap<>();
@Override
public void startLeaseTask(final String name) {startLeaseTask(name, this.leaseTerm, null);
}
@Override
public void startLeaseTask(final String name, ILeaseGetCallback callback) {startLeaseTask(name, this.leaseTerm, callback);
}
@Override
public void startLeaseTask(final String name, int leaseTerm, ILeaseGetCallback callback) {ApplyTask applyTask = new ApplyTask(leaseTerm, name, callback);
startLeaseTask(name, applyTask, leaseTerm / 2, true);
}
/**
* 启动定时器,定时执行工作,确保工作可重入
*
* @param name
* @param runnable 具体任务
* @param scheduleTime 调度工夫
* @param startNow 是否立即启动一次
*/
protected void startLeaseTask(final String name, Runnable runnable, int scheduleTime, boolean startNow) {AtomicBoolean isStartLease = startLeaseMap.get(name);// 屡次调用,只启动一次定时工作
if (isStartLease == null) {synchronized (this) {isStartLease = startLeaseMap.get(name);
if (isStartLease == null) {isStartLease = new AtomicBoolean(false);
startLeaseMap.put(name, isStartLease);
}
}
}
if (isStartLease.compareAndSet(false, true)) {if (startNow) {runnable.run();
}
taskExecutor.scheduleWithFixedDelay(runnable, 0, scheduleTime, TimeUnit.SECONDS);
}
}
//......
}
- BasedLesaseImpl 申明实现了 ILeaseService,它依赖 ILeaseStorage,startLeaseTask 办法会创立 ApplyTask,而后以固定距离调度执行
ApplyTask
/**
* 续约工作
*/
protected class ApplyTask implements Runnable {
protected String name;
protected int leaseTerm;
protected ILeaseGetCallback callback;
public ApplyTask(int leaseTerm, String name) {this(leaseTerm, name, null);
}
public ApplyTask(int leaseTerm, String name, ILeaseGetCallback callback) {
this.name = name;
this.leaseTerm = leaseTerm;
this.callback = callback;
}
@Override
public void run() {
try {// LOG.info("LeaseServiceImpl name:" + name + "开始获取租约...");
AtomicBoolean newApplyLease = new AtomicBoolean(false);
Date leaseDate = applyLeaseTask(leaseTerm, name, newApplyLease);
if (leaseDate != null) {leaseName2Date.put(name, leaseDate);
LOG.info("LeaseServiceImpl, name:" + name + "" + getSelfUser() +" 获取租约胜利, 租约到期工夫为 "+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(leaseDate));
} else {
// fix.2020.08.13 这时 name 对应的租约可能还在有效期内, 或者本机还持有租约,须要 remove
// leaseName2Date.remove(name);
LOG.info("LeaseServiceImpl name:" + name + "" + getSelfUser() +" 获取租约失败 ");
}
if (newApplyLease.get() && callback != null) {callback.callback(leaseDate);
}
} catch (Exception e) {LOG.error("LeaseServiceImpl name:" + name + "" + getSelfUser() +" 获取租约出现异常 ", e);
}
}
}
/**
* 申请租约,如果当期租约无效,间接更新一个租约周期,如果以后租约有效,先查问是否有无效的租约,如果有申请失败,否则间接申请租约
*/
protected Date applyLeaseTask(int leaseTerm, String name, AtomicBoolean newApplyLease) {
// 计算下一次租约工夫 = 以后工夫 + 租约时长
Date nextLeaseDate = DateUtil.addSecond(new Date(), leaseTerm);
// 1 如果曾经有租约,则更新租约工夫 (内存和数据库) 即可
if (hasLease(name)) {// LOG.info("用户已有租约,更新数据库和内存中的租约信息");
// 更新数据库
LeaseInfo leaseInfo = queryValidateLease(name);
if (leaseInfo == null) {LOG.error("LeaseServiceImpl applyLeaseTask leaseInfo is null");
return null;
}
// fix.2020.08.13, 与本机 ip 相等且满足一致性 hash 调配策略,才续约,其余状况为 null
String leaseUserIp = leaseInfo.getLeaseUserIp();
if (!leaseUserIp.equals(getSelfUser())) {return null;}
leaseInfo.setLeaseEndDate(nextLeaseDate);
updateLeaseInfo(leaseInfo);
return nextLeaseDate;
}
// 2 没有租约状况 判断是否能够获取租约,只有租约没有被其他人获取,则阐明有无效租约
boolean success = canGetLease(name);
if (!success) { // 示意被其余机器获取到了无效的租约
// LOG.info("其余机器获取到了无效的租约");
return null;
}
// 3 没有租约而且能够获取租约的状况,则尝试应用数据库原子更新的形式获取租约,保障只有一台机器胜利获取租约,而且能够运行
boolean flag = tryGetLease(name, nextLeaseDate);
if (flag) { // 获取租约胜利
newApplyLease.set(true);
return nextLeaseDate;
}
return null;
}
- ApplyTask 外部调用 applyLeaseTask,如果已有租约则更新租约工夫,没有租约则判断是否能够获取租约,能够则执行 tryGetLease
tryGetLease
/**
* 更新数据库,占用租期并更新租期工夫
*
* @param time
*/
protected boolean tryGetLease(String name, Date time) {
// LOG.info("尝试获取租约 lease name is :" + name + "下次到期工夫:"
// + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time));
LeaseInfo validateLeaseInfo = queryValidateLease(name);
if (validateLeaseInfo == null) {// 这里有两种状况 1 数据库外面没有租约信息 2 数据库外面有租约信息然而曾经过期
Integer count = countLeaseInfo(name);
if (count == null || count == 0) {// 示意当初数据库外面没有任何租约信息,插入租约胜利则示意获取胜利,失败示意在这一时刻其余机器获取了租约
// LOG.info("数据库中临时没有租约信息,尝试原子插入租约:" + name);
// fix.2020.08.13, 通过一致性 hash 计算,该名字的工作不应该在本机执行,间接返回,无需插入。只有调配到 hash 执行权限的机器才能够插入并获取租约
if (!getSelfUser().equals(getConsistentHashHost(name))) {return false;}
validateLeaseInfo = new LeaseInfo();
validateLeaseInfo.setLeaseName(name);
validateLeaseInfo.setLeaseUserIp(getSelfUser());
validateLeaseInfo.setLeaseEndDate(time);
validateLeaseInfo.setStatus(1);
validateLeaseInfo.setVersion(1);
if (insert(validateLeaseInfo)) {LOG.info("数据库中临时没有租约信息,原子插入胜利,获取租约胜利:" + name);
return true;
} else {LOG.info("数据库中临时没有租约信息,原子插入失败,曾经被其余机器获取租约:" + name);
return false;
}
} else { // 示意数据库外面有一条然而有效,这里须要两台机器依照 version 进行原子更新,更新胜利的获取租约
// LOG.info("数据库中有一条有效的租约信息,尝试依据版本号去原子更新租约信息:" + name);
LeaseInfo inValidateLeaseInfo = queryInValidateLease(name);
if (inValidateLeaseInfo == null) {// 阐明这个时候另外一台机器获取胜利了
LOG.info("另外一台机器获取胜利了租约:" + name);
return false;
}
// fix.2020.08.13, 机器重启之后,该名字的工作曾经不调配在此机器上执行,间接返回,无需更新数据库
if (!getSelfUser().equals(getConsistentHashHost(name))) {return false;}
inValidateLeaseInfo.setLeaseName(name);
inValidateLeaseInfo.setLeaseUserIp(getSelfUser());
inValidateLeaseInfo.setLeaseEndDate(time);
inValidateLeaseInfo.setStatus(1);
boolean success = updateDBLeaseInfo(inValidateLeaseInfo);
if (success) {LOG.info("LeaseServiceImpl 原子更新租约胜利,以后机器获取到了租约信息:" + name);
} else {LOG.info("LeaseServiceImpl 原子更新租约失败,租约被其余机器获取:" + name);
}
return success;
}
} else { // 判断是否是本人获取了租约,如果是本人获取了租约则更新工夫(内存和数据库),// 这里是为了解决机器重启的状况,机器重启,内存中没有租约信息,然而实际上该用户是有租约权限的
// fix.2020.08.13, 租约的 ip 与本机 ip 相等,且满足一致性 hash 策略,才会被本机执行
String leaseUserIp = validateLeaseInfo.getLeaseUserIp();
if (leaseUserIp.equals(getSelfUser())) {
// 如果当期用户有租约信息,则更新数据库
validateLeaseInfo.setLeaseEndDate(time);
boolean hasUpdate = updateLeaseInfo(validateLeaseInfo);
if (hasUpdate) {
LOG.info("LeaseServiceImpl 机器重启状况,以后用户有租约信息,并且更新数据库胜利,租约信息为 name :" + validateLeaseInfo.getLeaseName()
+ "ip :" + validateLeaseInfo.getLeaseUserIp() + "到期工夫 :" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(validateLeaseInfo.getLeaseEndDate()));
return true;
} else {LOG.info("LeaseServiceImpl 机器重启状况,以后用户有租约信息,并且更新数据库失败,示意失去租约:" + name);
return false;
}
}
// LOG.info("LeaseServiceImpl 租约被其余机器获取,租约信息为 name :" + validateLeaseInfo.getLeaseName() + "ip :"
// + validateLeaseInfo.getLeaseUserIp() + "到期工夫 :"
// + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(validateLeaseInfo.getLeaseEndDate()));
return false;
}
}
protected LeaseInfo queryValidateLease(String name) {//String sql = "SELECT * FROM lease_info WHERE lease_name ='" + name + "'and status=1 and lease_end_time>now()";
//// LOG.info("LeaseServiceImpl query validate lease sql:" + sql);
//return queryLease(name, sql);
return leaseStorage.queryValidateLease(name);
}
- tryGetLease 先通过 queryValidateLease 查问租约信息,若没有租约则插入,若过期则依据版本号更新,若已有租约则判断是否是本人获取了租约,是则更新租约信息
LeaseServiceImpl
public class LeaseServiceImpl extends BasedLesaseImpl {private static final Log LOG = LogFactory.getLog(LeaseServiceImpl.class);
private transient ConcurrentHashMap<String, HoldLockTask> holdLockTasks = new ConcurrentHashMap();
protected ConcurrentHashMap<String, HoldLockFunture> seizeLockingFuntures = new ConcurrentHashMap<>();
// 如果是抢占锁状态中,则不容许申请锁
public LeaseServiceImpl() {super();
}
/**
* 尝试获取锁,能够期待 waitTime,如果到点未返回,则间接返回。如果是 -1,则始终期待
*
* @param name 业务名称
* @param lockerName 锁名称
* @param waitTime 等待时间,是微秒单位
* @return
*/
@Override
public boolean tryLocker(String name, String lockerName, long waitTime) {return tryLocker(name, lockerName, waitTime, ILeaseService.DEFALUT_LOCK_TIME);
}
@Override
public boolean tryLocker(String name, String lockerName, long waitTime, int lockTimeSecond) {long now = System.currentTimeMillis();
boolean success = lock(name, lockerName, lockTimeSecond);
while (!success) {if (waitTime > -1 && (System.currentTimeMillis() - now > waitTime)) {break;}
success = lock(name, lockerName, lockTimeSecond);
if (success) {return success;}
try {Thread.sleep(100);
} catch (InterruptedException e) {LOG.error("LeaseServiceImpl try locker error", e);
}
}
return success;
}
@Override
public boolean lock(String name, String lockerName) {return lock(name, lockerName, ILeaseService.DEFALUT_LOCK_TIME);
}
@Override
public boolean lock(String name, String lockerName, int leaseSecond) {lockerName = createLockName(name, lockerName);
Future future = seizeLockingFuntures.get(lockerName);
if (future != null && ((HoldLockFunture)future).isDone == false) {return false;}
Date nextLeaseDate =
DateUtil.addSecond(new Date(), leaseSecond);// 默认锁定 5 分钟,用完须要立即开释. 如果工夫不同步,可能导致锁失败
return tryGetLease(lockerName, nextLeaseDate);
}
@Override
public boolean unlock(String name, String lockerName) {// LOG.info("LeaseServiceImpl unlock,name:" + name);
lockerName = createLockName(name, lockerName);
LeaseInfo validateLeaseInfo = queryValidateLease(lockerName);
if (validateLeaseInfo == null) {LOG.warn("LeaseServiceImpl unlock,validateLeaseInfo is null,lockerName:" + lockerName);
}
if (validateLeaseInfo != null && validateLeaseInfo.getLeaseUserIp().equals(getSelfUser())) {validateLeaseInfo.setStatus(0);
updateDBLeaseInfo(validateLeaseInfo);
}
HoldLockTask holdLockTask = holdLockTasks.remove(lockerName);
if (holdLockTask != null) {holdLockTask.close();
}
leaseName2Date.remove(lockerName);
return false;
}
/**
* 如果有锁,则始终持有,如果不能获取,则完结。和租约不同,租约是没有也会尝试重试,一备对方挂机,本人能够接手工作
*
* @param name
* @param secondeName
* @param lockTimeSecond 获取锁的工夫
* @return
*/
@Override
public boolean holdLock(String name, String secondeName, int lockTimeSecond) {if (hasHoldLock(name, secondeName)) {return true;}
synchronized (this) {if (hasHoldLock(name, secondeName)) {return true;}
String lockerName = createLockName(name, secondeName);
Date nextLeaseDate =
DateUtil.addSecond(new Date(), lockTimeSecond);
boolean success = tryGetLease(lockerName, nextLeaseDate);// 申请锁,锁的工夫是 leaseTerm
if (!success) {return false;}
leaseName2Date.put(lockerName, nextLeaseDate);
if (!holdLockTasks.containsKey(lockerName)) {HoldLockTask holdLockTask = new HoldLockTask(lockTimeSecond, lockerName, this);
holdLockTask.start();
holdLockTasks.putIfAbsent(lockerName, holdLockTask);
}
}
return true;
}
/**
* 是否持有锁,不拜访数据库,间接看本地
*
* @param name
* @param secondeName
* @return
*/
@Override
public boolean hasHoldLock(String name, String secondeName) {String lockerName = createLockName(name, secondeName);
return hasLease(lockerName);
}
@Override
public List<LeaseInfo> queryLockedInstanceByNamePrefix(String name, String lockerNamePrefix) {String leaseNamePrefix = MapKeyUtil.createKey(name, lockerNamePrefix);
return queryValidateLeaseByNamePrefix(leaseNamePrefix);
}
//......
}
- LeaseServiceImpl 继承了 BasedLesaseImpl,tryLocker 办法会依据等待时间循环执行 lock,lock 办法则执行 tryGetLease,unlock 办法则更新租约信息,同时移除内存记录;holdLock 则通过 hasHoldLock 判断是否持有锁,若有则返回,没有则执行 tryGetLease
ILeaseStorage
public interface ILeaseStorage {
/**
* 更新 lease info,须要是原子操作,存储保障多线程操作的原子性
*
* @param leaseInfo 租约表数据
* @return
*/
boolean updateLeaseInfo(LeaseInfo leaseInfo);
/**
* 统计这个租约名称下,LeaseInfo 对象个数
*
* @param leaseName 租约名称,无特殊要求,雷同名称会竞争租约
* @return
*/
Integer countLeaseInfo(String leaseName);
/**
* 查问有效的的租约
*
* @param leaseName 租约名称,无特殊要求,雷同名称会竞争租约
* @return
*/
LeaseInfo queryInValidateLease(String leaseName);
/**
* 查问无效的的租约
*
* @param leaseName 租约名称,无特殊要求,雷同名称会竞争租约
* @return
*/
LeaseInfo queryValidateLease(String leaseName);
/**
* 按前缀查问无效的租约信息
*
* @param namePrefix
* @return
*/
List<LeaseInfo> queryValidateLeaseByNamePrefix(String namePrefix);
/**
* 减少租约
*
* @param leaseInfo 租约名称,无特殊要求,雷同名称会竞争租约
*/
void addLeaseInfo(LeaseInfo leaseInfo);
}
- ILeaseStorage 接口定义了 updateLeaseInfo、countLeaseInfo、queryInValidateLease、queryValidateLease、queryValidateLeaseByNamePrefix、addLeaseInfo 办法
DBLeaseStorage
public class DBLeaseStorage implements ILeaseStorage {private static final Log LOG = LogFactory.getLog(DBLeaseStorage.class);
protected JDBCDriver jdbcDataSource;
private String url;
protected String userName;
protected String password;
protected String jdbc;
public DBLeaseStorage(String jdbc, String url, String userName, String password) {
this.jdbc = jdbc;
this.url = url;
this.userName = userName;
this.password = password;
jdbcDataSource = DriverBuilder.createDriver(jdbc, url, userName, password);
}
@Override
public boolean updateLeaseInfo(LeaseInfo leaseInfo) {String sql = "UPDATE lease_info SET version=version+1,status=#{status},gmt_modified=now()";
String whereSQL = "WHERE id=#{id} and version=#{version}";
if (StringUtil.isNotEmpty(leaseInfo.getLeaseName())) {sql += ",lease_name=#{leaseName}";
}
if (StringUtil.isNotEmpty(leaseInfo.getLeaseUserIp())) {sql += ",lease_user_ip=#{leaseUserIp}";
}
if (leaseInfo.getLeaseEndDate() != null) {sql += ",lease_end_time=#{leaseEndDate}";
}
sql += whereSQL;
sql = SQLUtil.parseIbatisSQL(leaseInfo, sql);
try {int count = getOrCreateJDBCDataSource().update(sql);
boolean success = count > 0;
if (success) {synchronized (this) {leaseInfo.setVersion(leaseInfo.getVersion() + 1);
}
} else {System.out.println(count);
}
return success;
} catch (Exception e) {LOG.error("LeaseServiceImpl updateLeaseInfo excuteUpdate error", e);
throw new RuntimeException("execute sql error" + sql, e);
}
}
@Override
public Integer countLeaseInfo(String leaseName) {String sql = "SELECT count(*) as c FROM lease_info WHERE lease_name ='" + leaseName + "'and status = 1";
try {List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
if (rows == null || rows.size() == 0) {return null;}
Long value = (Long) rows.get(0).get("c");
return value.intValue();} catch (Exception e) {throw new RuntimeException("execute sql error" + sql, e);
}
}
@Override
public LeaseInfo queryInValidateLease(String leaseName) {String sql = "SELECT * FROM lease_info WHERE lease_name ='" + leaseName + "'and status=1 and lease_end_time<'" + DateUtil.getCurrentTimeString() + "'";
LOG.info("LeaseServiceImpl queryInValidateLease builder:" + sql);
return queryLease(leaseName, sql);
}
@Override
public LeaseInfo queryValidateLease(String leaseName) {String sql = "SELECT * FROM lease_info WHERE lease_name ='" + leaseName + "'and status=1 and lease_end_time>now()";
return queryLease(leaseName, sql);
}
@Override
public List<LeaseInfo> queryValidateLeaseByNamePrefix(String namePrefix) {String sql = "SELECT * FROM lease_info WHERE lease_name like'" + namePrefix + "%' and status=1 and lease_end_time>now()";
try {List<LeaseInfo> leaseInfos = new ArrayList<>();
List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
if (rows == null || rows.size() == 0) {return null;}
for (Map<String, Object> row : rows) {LeaseInfo leaseInfo = convert(row);
leaseInfos.add(leaseInfo);
}
return leaseInfos;
} catch (Exception e) {throw new RuntimeException("execute sql error" + sql, e);
}
}
@Override
public void addLeaseInfo(LeaseInfo leaseInfo) {
String sql =
"REPLACE INTO lease_info(lease_name,lease_user_ip,lease_end_time,status,version,gmt_create,gmt_modified)"
+ "VALUES (#{leaseName},#{leaseUserIp},#{leaseEndDate},#{status},#{version},now(),now())";
sql = SQLUtil.parseIbatisSQL(leaseInfo, sql);
try {getOrCreateJDBCDataSource().execute(sql);
} catch (Exception e) {LOG.error("LeaseServiceImpl execute sql error,sql:" + sql, e);
throw new RuntimeException("execute sql error" + sql, e);
}
}
protected JDBCDriver getOrCreateJDBCDataSource() {if (this.jdbcDataSource == null || !this.jdbcDataSource.isValidate()) {synchronized (this) {if (this.jdbcDataSource == null || !this.jdbcDataSource.isValidate()) {
this.jdbcDataSource =
DriverBuilder.createDriver(this.jdbc, this.url, this.userName, this.password);
}
}
}
return jdbcDataSource;
}
protected LeaseInfo queryLease(String name, String sql) {
try {List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);
if (rows == null || rows.size() == 0) {return null;}
return convert(rows.get(0));
} catch (Exception e) {throw new RuntimeException("execute sql error" + sql, e);
}
}
protected LeaseInfo convert(Map<String, Object> map) {LeaseInfo leaseInfo = new LeaseInfo();
leaseInfo.setId(getMapLongValue("id", map));
leaseInfo.setCreateTime(getMapDateValue("gmt_create", map));
leaseInfo.setLeaseEndDate(getMapDateValue("lease_end_time", map));
leaseInfo.setLeaseName(getMapValue("lease_name", map, String.class));
leaseInfo.setLeaseUserIp(getMapValue("lease_user_ip", map, String.class));
Integer status = getMapValue("status", map, Integer.class);
if (status != null) {leaseInfo.setStatus(status);
}
leaseInfo.setUpdateTime(getMapDateValue("gmt_modified", map));
Long version = getMapLongValue("version", map);
if (version != null) {leaseInfo.setVersion(version);
}
return leaseInfo;
}
@SuppressWarnings("unchecked")
private <T> T getMapValue(String fieldName, Map<String, Object> map, Class<T> integerClass) {Object value = map.get(fieldName);
if (value == null) {return null;}
return (T) value;
}
private Long getMapLongValue(String fieldName, Map<String, Object> map) {Object value = map.get(fieldName);
if (value == null) {return null;}
if (value instanceof Long) {return (Long) value;
}
if (value instanceof BigInteger) {return ((BigInteger) value).longValue();}
return null;
}
private Date getMapDateValue(String fieldName, Map<String, Object> map) {Object value = map.get(fieldName);
if (value == null) {return null;}
if (value instanceof Date) {return (Date) value;
}
if (value instanceof String) {return DateUtil.parseTime(((String) value));
}
return null;
}
}
- DBLeaseStorage 实现了 ILeaseStorage 接口,应用 jdbc 实现了其办法
小结
rocketmq-streams 的 LeaseService 基于 db 实现了租约和锁,可用于主备场景切换。