本文次要钻研一下rocketmq-streams的ILeaseService

ILeaseService

/** * 通过db实现租约和锁,能够更轻量级,缩小其余中间件的依赖 应用主备场景,只有一个实例运行,当以后实例挂掉,在肯定工夫内,会被其余实例接手 也能够用于全局锁 */public interface ILeaseService {    /**     * 默认锁定工夫     */    static final int DEFALUT_LOCK_TIME = 60 * 5;    /**     * 查看某用户以后工夫是否具备租约。这个办法是纯内存操作,无性能开销     *     * @return true,租约无效;false,租约有效     */    boolean hasLease(String name);    /**     * 申请租约,会启动一个线程,不停申请租约,直到申请胜利。 申请胜利后,每 租期/2 续约。 如果目前被其余租户获取租约,只有在对方租约生效,后才容许新的租户获取租约     *     * @param name 租约名称,无特殊要求,雷同名称会竞争租约     */    void startLeaseTask(String name);    /**     * 申请租约,会启动一个线程,不停申请租约,直到申请胜利。 申请胜利后,每 租期/2 续约。 如果目前被其余租户获取租约,只有在对方租约生效,后才容许新的租户获取租约     *     * @param name     租约名称,无特殊要求,雷同名称会竞争租约     * @param callback 当第一获取租约时,回调此函数     */    void startLeaseTask(final String name, ILeaseGetCallback callback);    /**     * 申请租约,会启动一个线程,不停申请租约,直到申请胜利。 申请胜利后,每 租期/2 续约。 如果目前被其余租户获取租约,只有在对方租约生效,后才容许新的租户获取租约     *     * @param name            租约名称,无特殊要求,雷同名称会竞争租约     * @param leaseTermSecond 租期,在租期内能够做业务解决,单位是秒     * @param callback        当第一获取租约时,回调此函数     */    void startLeaseTask(final String name, int leaseTermSecond, ILeaseGetCallback callback);    /**     * 申请锁,无论胜利与否,立即返回。如果不开释,最大锁定工夫是5分钟     *     * @param name       业务名称     * @param lockerName 锁名称     * @return 是否桎梏胜利     */    boolean lock(String name, String lockerName);    /**     * 申请锁,无论胜利与否,立即返回。默认锁定工夫是5分钟     *     * @param name           业务名称     * @param lockerName     锁名称     * @param lockTimeSecond 如果不开释,锁定的最大工夫,单位是秒     * @return 是否桎梏胜利     * @return     */    boolean lock(String name, String lockerName, int lockTimeSecond);    /**     * 申请锁,如果没有则期待,等待时间能够指定,如果是-1 则有限期待。如果不开释,最大锁定工夫是5分钟     *     * @param name       业务名称     * @param lockerName 锁名称     * @param waitTime   没获取锁时,最大期待多长时间,如果是-1 则有限期待     * @return 是否桎梏胜利     */    boolean tryLocker(String name, String lockerName, long waitTime);    /**     * 申请锁,如果没有则期待,等待时间能够指定,如果是-1 则有限期待。如果不开释,最大锁定工夫是lockTimeSecond     *     * @param name           业务名称     * @param lockerName     锁名称     * @param waitTime       没获取锁时,最大期待多长时间,如果是-1 则有限期待     * @param lockTimeSecond 如果不开释,锁定的最大工夫,单位是秒     * @return 是否桎梏胜利     */    boolean tryLocker(String name, String lockerName, long waitTime, int lockTimeSecond);    /**     * 开释锁     *     * @param name     * @param lockerName     * @return     */    boolean unlock(String name, String lockerName);    /**     * 对于曾经获取锁的,能够通过这个办法,始终持有锁。 和租约的区别是,当开释锁后,无其余实例抢占。无奈实现主备模式     *     * @param name           业务名称     * @param lockerName     锁名称     * @param lockTimeSecond 租期,这个办法会主动续约,如果不被动开释,会始终持有锁     * @return 是否胜利获取锁     */    boolean holdLock(String name, String lockerName, int lockTimeSecond);    /**     * 是否持有锁,不会申请锁。如果以前申请过,且未过期,返回true,否则返回false     *     * @param name       业务名称     * @param lockerName 锁名称     * @return     */    boolean hasHoldLock(String name, String lockerName);    List<LeaseInfo> queryLockedInstanceByNamePrefix(String name, String lockerNamePrefix);}
  • ILeaseService接口定义了hasLease、startLeaseTask、lock、tryLocker、unlock、holdLock、hasHoldLock、queryLockedInstanceByNamePrefix办法

BasedLesaseImpl

public abstract class BasedLesaseImpl implements ILeaseService {    private static final Log LOG = LogFactory.getLog(BasedLesaseImpl.class);    private static final String CONSISTENT_HASH_PREFIX = "consistent_hash_";    private static final AtomicBoolean syncStart = new AtomicBoolean(false);    private static final int synTime = 120;  // 5分钟的一致性hash同步工夫太久了,改为2分钟    protected ScheduledExecutorService taskExecutor = null;    protected int leaseTerm = 300 * 2;                                  // 租约工夫    // protected transient JDBCDriver jdbcDataSource = null;    protected ILeaseStorage leaseStorage;    protected volatile Map<String, Date> leaseName2Date = new ConcurrentHashMap<>();    // 每个lease name对应的租约到期工夫    public BasedLesaseImpl() {        taskExecutor = new ScheduledThreadPoolExecutor(10);    }    /**     * lease_name: consistent_hash_ip, lease_user_ip: ip,定时刷新lease_info表,查看一致性hash环的节点状况     *     * @param name     * @return     */    @Override    public boolean hasLease(String name) {        // 内存中没有租约信息则示意 没有租约        Date leaseEndTime = leaseName2Date.get(name);        if (leaseEndTime == null) {            // LOG.info("内存中依据 " + name + "没有查问到租约信息,示意没有租约");            return false;        }        // LOG.info("查问是否有租约 name:" + name + " ,以后工夫:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())        // + " 租约到期工夫 " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(leaseEndTime));        // 有租约工夫,并且租约工夫大于以后工夫,示意有租约信息        if (new Date().before(leaseEndTime)) {            return true;        }        return false;    }    private final Map<String, AtomicBoolean> startLeaseMap = new HashMap<>();    @Override    public void startLeaseTask(final String name) {        startLeaseTask(name, this.leaseTerm, null);    }    @Override    public void startLeaseTask(final String name, ILeaseGetCallback callback) {        startLeaseTask(name, this.leaseTerm, callback);    }    @Override    public void startLeaseTask(final String name, int leaseTerm, ILeaseGetCallback callback) {        ApplyTask applyTask = new ApplyTask(leaseTerm, name, callback);        startLeaseTask(name, applyTask, leaseTerm / 2, true);    }    /**     * 启动定时器,定时执行工作,确保工作可重入     *     * @param name     * @param runnable     具体任务     * @param scheduleTime 调度工夫     * @param startNow     是否立即启动一次     */    protected void startLeaseTask(final String name, Runnable runnable, int scheduleTime, boolean startNow) {        AtomicBoolean isStartLease = startLeaseMap.get(name);//屡次调用,只启动一次定时工作        if (isStartLease == null) {            synchronized (this) {                isStartLease = startLeaseMap.get(name);                if (isStartLease == null) {                    isStartLease = new AtomicBoolean(false);                    startLeaseMap.put(name, isStartLease);                }            }        }        if (isStartLease.compareAndSet(false, true)) {            if (startNow) {                runnable.run();            }            taskExecutor.scheduleWithFixedDelay(runnable, 0, scheduleTime, TimeUnit.SECONDS);        }    }    //......}
  • BasedLesaseImpl申明实现了ILeaseService,它依赖ILeaseStorage,startLeaseTask办法会创立ApplyTask,而后以固定距离调度执行

ApplyTask

    /**     * 续约工作     */    protected class ApplyTask implements Runnable {        protected String name;        protected int leaseTerm;        protected ILeaseGetCallback callback;        public ApplyTask(int leaseTerm, String name) {            this(leaseTerm, name, null);        }        public ApplyTask(int leaseTerm, String name, ILeaseGetCallback callback) {            this.name = name;            this.leaseTerm = leaseTerm;            this.callback = callback;        }        @Override        public void run() {            try {                // LOG.info("LeaseServiceImpl name: " + name + "开始获取租约...");                AtomicBoolean newApplyLease = new AtomicBoolean(false);                Date leaseDate = applyLeaseTask(leaseTerm, name, newApplyLease);                if (leaseDate != null) {                    leaseName2Date.put(name, leaseDate);                    LOG.info("LeaseServiceImpl, name: " + name + " " + getSelfUser() + " 获取租约胜利, 租约到期工夫为 "                        + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(leaseDate));                } else {                    // fix.2020.08.13 这时name对应的租约可能还在有效期内,或者本机还持有租约,须要remove                    //  leaseName2Date.remove(name);                    LOG.info("LeaseServiceImpl name: " + name + " " + getSelfUser() + " 获取租约失败 ");                }                if (newApplyLease.get() && callback != null) {                    callback.callback(leaseDate);                }            } catch (Exception e) {                LOG.error(" LeaseServiceImpl name: " + name + "  " + getSelfUser() + " 获取租约出现异常 ", e);            }        }    }    /**     * 申请租约,如果当期租约无效,间接更新一个租约周期,如果以后租约有效,先查问是否有无效的租约,如果有申请失败,否则间接申请租约     */    protected Date applyLeaseTask(int leaseTerm, String name, AtomicBoolean newApplyLease) {        // 计算下一次租约工夫 = 以后工夫 + 租约时长        Date nextLeaseDate = DateUtil.addSecond(new Date(), leaseTerm);        // 1 如果曾经有租约,则更新租约工夫(内存和数据库)即可        if (hasLease(name)) {            // LOG.info("用户已有租约,更新数据库和内存中的租约信息");            // 更新数据库            LeaseInfo leaseInfo = queryValidateLease(name);            if (leaseInfo == null) {                LOG.error("LeaseServiceImpl applyLeaseTask leaseInfo is null");                return null;            }            // fix.2020.08.13,与本机ip相等且满足一致性hash调配策略,才续约,其余状况为null            String leaseUserIp = leaseInfo.getLeaseUserIp();            if (!leaseUserIp.equals(getSelfUser())) {                return null;            }            leaseInfo.setLeaseEndDate(nextLeaseDate);            updateLeaseInfo(leaseInfo);            return nextLeaseDate;        }        // 2 没有租约状况 判断是否能够获取租约,只有租约没有被其他人获取,则阐明有无效租约        boolean success = canGetLease(name);        if (!success) { // 示意被其余机器获取到了无效的租约            // LOG.info("其余机器获取到了无效的租约");            return null;        }        // 3 没有租约而且能够获取租约的状况,则尝试应用数据库原子更新的形式获取租约,保障只有一台机器胜利获取租约,而且能够运行        boolean flag = tryGetLease(name, nextLeaseDate);        if (flag) { // 获取租约胜利            newApplyLease.set(true);            return nextLeaseDate;        }        return null;    }    
  • ApplyTask外部调用applyLeaseTask,如果已有租约则更新租约工夫,没有租约则判断是否能够获取租约,能够则执行tryGetLease

tryGetLease

    /**     * 更新数据库,占用租期并更新租期工夫     *     * @param time     */    protected boolean tryGetLease(String name, Date time) {        // LOG.info("尝试获取租约 lease name is : " + name + " 下次到期工夫: "        // + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(time));        LeaseInfo validateLeaseInfo = queryValidateLease(name);        if (validateLeaseInfo == null) {// 这里有两种状况 1 数据库外面没有租约信息 2 数据库外面有租约信息然而曾经过期            Integer count = countLeaseInfo(name);            if (count == null || count == 0) {// 示意当初数据库外面没有任何租约信息,插入租约胜利则示意获取胜利,失败示意在这一时刻其余机器获取了租约                // LOG.info("数据库中临时没有租约信息,尝试原子插入租约:" + name);                // fix.2020.08.13,通过一致性hash计算,该名字的工作不应该在本机执行,间接返回,无需插入。只有调配到hash执行权限的机器才能够插入并获取租约                if (!getSelfUser().equals(getConsistentHashHost(name))) {                    return false;                }                validateLeaseInfo = new LeaseInfo();                validateLeaseInfo.setLeaseName(name);                validateLeaseInfo.setLeaseUserIp(getSelfUser());                validateLeaseInfo.setLeaseEndDate(time);                validateLeaseInfo.setStatus(1);                validateLeaseInfo.setVersion(1);                if (insert(validateLeaseInfo)) {                    LOG.info("数据库中临时没有租约信息,原子插入胜利,获取租约胜利:" + name);                    return true;                } else {                    LOG.info("数据库中临时没有租约信息,原子插入失败,曾经被其余机器获取租约:" + name);                    return false;                }            } else { // 示意数据库外面有一条然而有效,这里须要两台机器依照version进行原子更新,更新胜利的获取租约                // LOG.info("数据库中有一条有效的租约信息,尝试依据版本号去原子更新租约信息:" + name);                LeaseInfo inValidateLeaseInfo = queryInValidateLease(name);                if (inValidateLeaseInfo == null) {// 阐明这个时候另外一台机器获取胜利了                    LOG.info("另外一台机器获取胜利了租约:" + name);                    return false;                }                // fix.2020.08.13,机器重启之后,该名字的工作曾经不调配在此机器上执行,间接返回,无需更新数据库                if (!getSelfUser().equals(getConsistentHashHost(name))) {                    return false;                }                inValidateLeaseInfo.setLeaseName(name);                inValidateLeaseInfo.setLeaseUserIp(getSelfUser());                inValidateLeaseInfo.setLeaseEndDate(time);                inValidateLeaseInfo.setStatus(1);                boolean success = updateDBLeaseInfo(inValidateLeaseInfo);                if (success) {                    LOG.info("LeaseServiceImpl 原子更新租约胜利,以后机器获取到了租约信息:" + name);                } else {                    LOG.info("LeaseServiceImpl 原子更新租约失败,租约被其余机器获取:" + name);                }                return success;            }        } else { // 判断是否是本人获取了租约,如果是本人获取了租约则更新工夫(内存和数据库),            // 这里是为了解决机器重启的状况,机器重启,内存中没有租约信息,然而实际上该用户是有租约权限的            // fix.2020.08.13,租约的ip与本机ip相等,且满足一致性hash策略,才会被本机执行            String leaseUserIp = validateLeaseInfo.getLeaseUserIp();            if (leaseUserIp.equals(getSelfUser())) {                // 如果当期用户有租约信息,则更新数据库                validateLeaseInfo.setLeaseEndDate(time);                boolean hasUpdate = updateLeaseInfo(validateLeaseInfo);                if (hasUpdate) {                    LOG.info(                        "LeaseServiceImpl机器重启状况,以后用户有租约信息,并且更新数据库胜利,租约信息为 name :" + validateLeaseInfo.getLeaseName()                            + " ip : " + validateLeaseInfo.getLeaseUserIp() + " 到期工夫 : " + new SimpleDateFormat(                            "yyyy-MM-dd HH:mm:ss").format(validateLeaseInfo.getLeaseEndDate()));                    return true;                } else {                    LOG.info("LeaseServiceImpl 机器重启状况,以后用户有租约信息,并且更新数据库失败,示意失去租约:" + name);                    return false;                }            }            // LOG.info("LeaseServiceImpl 租约被其余机器获取,租约信息为 name :" + validateLeaseInfo.getLeaseName() + " ip : "            // + validateLeaseInfo.getLeaseUserIp() + " 到期工夫 : "            // + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(validateLeaseInfo.getLeaseEndDate()));            return false;        }    }    protected LeaseInfo queryValidateLease(String name) {        //String sql = "SELECT * FROM lease_info WHERE lease_name ='" + name + "' and status=1 and lease_end_time>now()";        //// LOG.info("LeaseServiceImpl query validate lease sql:" + sql);        //return queryLease(name, sql);        return leaseStorage.queryValidateLease(name);    }    
  • tryGetLease先通过queryValidateLease查问租约信息,若没有租约则插入,若过期则依据版本号更新,若已有租约则判断是否是本人获取了租约,是则更新租约信息

LeaseServiceImpl

public class LeaseServiceImpl extends BasedLesaseImpl {    private static final Log LOG = LogFactory.getLog(LeaseServiceImpl.class);    private transient ConcurrentHashMap<String, HoldLockTask> holdLockTasks = new ConcurrentHashMap();    protected ConcurrentHashMap<String, HoldLockFunture> seizeLockingFuntures = new ConcurrentHashMap<>();    //如果是抢占锁状态中,则不容许申请锁    public LeaseServiceImpl() {        super();    }    /**     * 尝试获取锁,能够期待waitTime,如果到点未返回,则间接返回。如果是-1,则始终期待     *     * @param name       业务名称     * @param lockerName 锁名称     * @param waitTime   等待时间,是微秒单位     * @return     */    @Override    public boolean tryLocker(String name, String lockerName, long waitTime) {        return tryLocker(name, lockerName, waitTime, ILeaseService.DEFALUT_LOCK_TIME);    }    @Override    public boolean tryLocker(String name, String lockerName, long waitTime, int lockTimeSecond) {        long now = System.currentTimeMillis();        boolean success = lock(name, lockerName, lockTimeSecond);        while (!success) {            if (waitTime > -1 && (System.currentTimeMillis() - now > waitTime)) {                break;            }            success = lock(name, lockerName, lockTimeSecond);            if (success) {                return success;            }            try {                Thread.sleep(100);            } catch (InterruptedException e) {                LOG.error("LeaseServiceImpl try locker error", e);            }        }        return success;    }    @Override    public boolean lock(String name, String lockerName) {        return lock(name, lockerName, ILeaseService.DEFALUT_LOCK_TIME);    }    @Override    public boolean lock(String name, String lockerName, int leaseSecond) {        lockerName = createLockName(name, lockerName);        Future future = seizeLockingFuntures.get(lockerName);        if (future != null && ((HoldLockFunture)future).isDone == false) {            return false;        }        Date nextLeaseDate =            DateUtil.addSecond(new Date(), leaseSecond);// 默认锁定5分钟,用完须要立即开释.如果工夫不同步,可能导致锁失败        return tryGetLease(lockerName, nextLeaseDate);    }    @Override    public boolean unlock(String name, String lockerName) {        // LOG.info("LeaseServiceImpl unlock,name:" + name);        lockerName = createLockName(name, lockerName);        LeaseInfo validateLeaseInfo = queryValidateLease(lockerName);        if (validateLeaseInfo == null) {            LOG.warn("LeaseServiceImpl unlock,validateLeaseInfo is null,lockerName:" + lockerName);        }        if (validateLeaseInfo != null && validateLeaseInfo.getLeaseUserIp().equals(getSelfUser())) {            validateLeaseInfo.setStatus(0);            updateDBLeaseInfo(validateLeaseInfo);        }        HoldLockTask holdLockTask = holdLockTasks.remove(lockerName);        if (holdLockTask != null) {            holdLockTask.close();        }        leaseName2Date.remove(lockerName);        return false;    }    /**     * 如果有锁,则始终持有,如果不能获取,则完结。和租约不同,租约是没有也会尝试重试,一备对方挂机,本人能够接手工作     *     * @param name     * @param secondeName     * @param lockTimeSecond 获取锁的工夫     * @return     */    @Override    public boolean holdLock(String name, String secondeName, int lockTimeSecond) {        if (hasHoldLock(name, secondeName)) {            return true;        }        synchronized (this) {            if (hasHoldLock(name, secondeName)) {                return true;            }            String lockerName = createLockName(name, secondeName);            Date nextLeaseDate =                DateUtil.addSecond(new Date(), lockTimeSecond);            boolean success = tryGetLease(lockerName, nextLeaseDate);// 申请锁,锁的工夫是leaseTerm            if (!success) {                return false;            }            leaseName2Date.put(lockerName, nextLeaseDate);            if (!holdLockTasks.containsKey(lockerName)) {                HoldLockTask holdLockTask = new HoldLockTask(lockTimeSecond, lockerName, this);                holdLockTask.start();                holdLockTasks.putIfAbsent(lockerName, holdLockTask);            }        }        return true;    }    /**     * 是否持有锁,不拜访数据库,间接看本地     *     * @param name     * @param secondeName     * @return     */    @Override    public boolean hasHoldLock(String name, String secondeName) {        String lockerName = createLockName(name, secondeName);        return hasLease(lockerName);    }    @Override    public List<LeaseInfo> queryLockedInstanceByNamePrefix(String name, String lockerNamePrefix) {        String leaseNamePrefix = MapKeyUtil.createKey(name, lockerNamePrefix);        return queryValidateLeaseByNamePrefix(leaseNamePrefix);    }    //......}
  • LeaseServiceImpl继承了BasedLesaseImpl,tryLocker办法会依据等待时间循环执行lock,lock办法则执行tryGetLease,unlock办法则更新租约信息,同时移除内存记录;holdLock则通过hasHoldLock判断是否持有锁,若有则返回,没有则执行tryGetLease

ILeaseStorage

public interface ILeaseStorage {    /**     * 更新lease info,须要是原子操作,存储保障多线程操作的原子性     *     * @param leaseInfo 租约表数据     * @return     */    boolean updateLeaseInfo(LeaseInfo leaseInfo);    /**     * 统计这个租约名称下,LeaseInfo对象个数     *     * @param leaseName 租约名称,无特殊要求,雷同名称会竞争租约     * @return     */    Integer countLeaseInfo(String leaseName);    /**     * 查问有效的的租约     *     * @param leaseName 租约名称,无特殊要求,雷同名称会竞争租约     * @return     */    LeaseInfo queryInValidateLease(String leaseName);    /**     * 查问无效的的租约     *     * @param leaseName 租约名称,无特殊要求,雷同名称会竞争租约     * @return     */    LeaseInfo queryValidateLease(String leaseName);    /**     * 按前缀查问无效的租约信息     *     * @param namePrefix     * @return     */    List<LeaseInfo> queryValidateLeaseByNamePrefix(String namePrefix);    /**     * 减少租约     *     * @param leaseInfo 租约名称,无特殊要求,雷同名称会竞争租约     */    void addLeaseInfo(LeaseInfo leaseInfo);}
  • ILeaseStorage接口定义了updateLeaseInfo、countLeaseInfo、queryInValidateLease、queryValidateLease、queryValidateLeaseByNamePrefix、addLeaseInfo办法

DBLeaseStorage

public class DBLeaseStorage implements ILeaseStorage {    private static final Log LOG = LogFactory.getLog(DBLeaseStorage.class);    protected JDBCDriver jdbcDataSource;    private String url;    protected String userName;    protected String password;    protected String jdbc;    public DBLeaseStorage(String jdbc, String url, String userName, String password) {        this.jdbc = jdbc;        this.url = url;        this.userName = userName;        this.password = password;        jdbcDataSource = DriverBuilder.createDriver(jdbc, url, userName, password);    }    @Override    public boolean updateLeaseInfo(LeaseInfo leaseInfo) {        String sql = "UPDATE lease_info SET version=version+1,status=#{status},gmt_modified=now()";        String whereSQL = " WHERE id=#{id} and version=#{version}";        if (StringUtil.isNotEmpty(leaseInfo.getLeaseName())) {            sql += ",lease_name=#{leaseName}";        }        if (StringUtil.isNotEmpty(leaseInfo.getLeaseUserIp())) {            sql += ",lease_user_ip=#{leaseUserIp}";        }        if (leaseInfo.getLeaseEndDate() != null) {            sql += ",lease_end_time=#{leaseEndDate}";        }        sql += whereSQL;        sql = SQLUtil.parseIbatisSQL(leaseInfo, sql);        try {            int count = getOrCreateJDBCDataSource().update(sql);            boolean success = count > 0;            if (success) {                synchronized (this) {                    leaseInfo.setVersion(leaseInfo.getVersion() + 1);                }            } else {                System.out.println(count);            }            return success;        } catch (Exception e) {            LOG.error("LeaseServiceImpl updateLeaseInfo excuteUpdate error", e);            throw new RuntimeException("execute sql error " + sql, e);        }    }    @Override    public Integer countLeaseInfo(String leaseName) {        String sql = "SELECT count(*) as c FROM lease_info  WHERE lease_name = '" + leaseName + "' and status = 1";        try {            List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);            if (rows == null || rows.size() == 0) {                return null;            }            Long value = (Long) rows.get(0).get("c");            return value.intValue();        } catch (Exception e) {            throw new RuntimeException("execute sql error " + sql, e);        }    }    @Override    public LeaseInfo queryInValidateLease(String leaseName) {        String sql = "SELECT * FROM lease_info WHERE lease_name ='" + leaseName + "' and status=1 and lease_end_time<'" + DateUtil.getCurrentTimeString() + "'";        LOG.info("LeaseServiceImpl queryInValidateLease builder:" + sql);        return queryLease(leaseName, sql);    }    @Override    public LeaseInfo queryValidateLease(String leaseName) {        String sql = "SELECT * FROM lease_info WHERE lease_name ='" + leaseName + "' and status=1 and lease_end_time>now()";        return queryLease(leaseName, sql);    }    @Override    public List<LeaseInfo> queryValidateLeaseByNamePrefix(String namePrefix) {        String sql = "SELECT * FROM lease_info WHERE lease_name like '" + namePrefix + "%' and status=1 and lease_end_time>now()";        try {            List<LeaseInfo> leaseInfos = new ArrayList<>();            List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);            if (rows == null || rows.size() == 0) {                return null;            }            for (Map<String, Object> row : rows) {                LeaseInfo leaseInfo = convert(row);                leaseInfos.add(leaseInfo);            }            return leaseInfos;        } catch (Exception e) {            throw new RuntimeException("execute sql error " + sql, e);        }    }    @Override    public void addLeaseInfo(LeaseInfo leaseInfo) {        String sql =            " REPLACE INTO lease_info(lease_name,lease_user_ip,lease_end_time,status,version,gmt_create,gmt_modified)"                + " VALUES (#{leaseName},#{leaseUserIp},#{leaseEndDate},#{status},#{version},now(),now())";        sql = SQLUtil.parseIbatisSQL(leaseInfo, sql);        try {            getOrCreateJDBCDataSource().execute(sql);        } catch (Exception e) {            LOG.error("LeaseServiceImpl execute sql error,sql:" + sql, e);            throw new RuntimeException("execute sql error " + sql, e);        }    }    protected JDBCDriver getOrCreateJDBCDataSource() {        if (this.jdbcDataSource == null || !this.jdbcDataSource.isValidate()) {            synchronized (this) {                if (this.jdbcDataSource == null || !this.jdbcDataSource.isValidate()) {                    this.jdbcDataSource =                        DriverBuilder.createDriver(this.jdbc, this.url, this.userName, this.password);                }            }        }        return jdbcDataSource;    }    protected LeaseInfo queryLease(String name, String sql) {        try {            List<Map<String, Object>> rows = getOrCreateJDBCDataSource().queryForList(sql);            if (rows == null || rows.size() == 0) {                return null;            }            return convert(rows.get(0));        } catch (Exception e) {            throw new RuntimeException("execute sql error " + sql, e);        }    }    protected LeaseInfo convert(Map<String, Object> map) {        LeaseInfo leaseInfo = new LeaseInfo();        leaseInfo.setId(getMapLongValue("id", map));        leaseInfo.setCreateTime(getMapDateValue("gmt_create", map));        leaseInfo.setLeaseEndDate(getMapDateValue("lease_end_time", map));        leaseInfo.setLeaseName(getMapValue("lease_name", map, String.class));        leaseInfo.setLeaseUserIp(getMapValue("lease_user_ip", map, String.class));        Integer status = getMapValue("status", map, Integer.class);        if (status != null) {            leaseInfo.setStatus(status);        }        leaseInfo.setUpdateTime(getMapDateValue("gmt_modified", map));        Long version = getMapLongValue("version", map);        if (version != null) {            leaseInfo.setVersion(version);        }        return leaseInfo;    }    @SuppressWarnings("unchecked")    private <T> T getMapValue(String fieldName, Map<String, Object> map, Class<T> integerClass) {        Object value = map.get(fieldName);        if (value == null) {            return null;        }        return (T) value;    }    private Long getMapLongValue(String fieldName, Map<String, Object> map) {        Object value = map.get(fieldName);        if (value == null) {            return null;        }        if (value instanceof Long) {            return (Long) value;        }        if (value instanceof BigInteger) {            return ((BigInteger) value).longValue();        }        return null;    }    private Date getMapDateValue(String fieldName, Map<String, Object> map) {        Object value = map.get(fieldName);        if (value == null) {            return null;        }        if (value instanceof Date) {            return (Date) value;        }        if (value instanceof String) {            return DateUtil.parseTime(((String) value));        }        return null;    }}
  • DBLeaseStorage实现了ILeaseStorage接口,应用jdbc实现了其办法

小结

rocketmq-streams的LeaseService基于db实现了租约和锁,可用于主备场景切换。