乐趣区

关于jdk:一种面向业务配置基于JSF广播定时生效的工具

作者:京东物流 王北永 姚再毅 李振

1 背景

目前,ducc 实现了实时近乎所有配置动静失效的场景,然而配置是否实时失效,不能直观展现每个机器上 jvm 内对象对应的参数是否已变更为精确的值,大部分时候须要查看日志确认是否失效。

2 技术依赖

1)Jsf: 京东 RPC 框架,用作机器之间的通信工具

2)redis/redisson:redis,用作配置信息的存储

3)ZK/Curator: Zookeeper,用作配置信息的存储 和 redis 二选一

3)clover:定时工作集群,用作工作提早或周期性执行

3 实现原理

1)接入方:

各个接入零碎通过接入治理模块获取 token,并指定所在零碎公布的的服务器 ip, 用作后续的 ip 鉴权。当系统启动时,主动在各个系统生成接口提供方,并注册到 JSF 注册核心。别名需各个系统惟一不反复。鉴权为对立服务端做 IP 鉴权。

2)对立配置服务端:

提供按不同接入方、不同零碎、不同环境的配置界面。业务人员可设定主动失效工夫或者立刻失效工夫。如果是立即失效,则通过 JSF 播送或者指定机器失效配置。如果是定时失效,则新增定时器并指定失效规定,达到工夫后触发播送告诉。

整个接入方和对立配置服务端的架构如下图

4 实现步骤

1)重写 JSF 类 ConsumerConfig 类办法 refer,将其中的轮训调用客户端改为播送调用客户端 BroadCastClient。

this.client= new BroadCastClient(this);
this.proxyInvoker = new ClientProxyInvoker(this);
ProtocolFactory.check(Constants.ProtocolType.valueOf(this.getProtocol()), Constants.CodecType.valueOf(this.getSerialization()));
this.proxyIns = (T) ProxyFactory.buildProxy(this.getProxy(), this.getProxyClass(), this.proxyInvoker);

2)播送调用客户端办法别离获取以后注册核心有心跳的服务提供者和已失去连贯的机器列表。对对立配置来讲,要么同时失败,要么同时胜利,判断如果存在不失常的服务提供方,则不同步。只有全副提供方存在才能够开始播送配置信息

 ConcurrentHashMap<Provider, ClientTransport>  concurrentHashMap = this.connectionHolder.getAliveConnections();
        ConcurrentHashMap<Provider, ClientTransportConfig>  deadConcurrentHashMap = this.connectionHolder.getDeadConnections();
        if(deadConcurrentHashMap!=null && deadConcurrentHashMap.size()>0){log.warn("以后别名 {} 存在不失常服务提供方数量{},请关注!",msg.getAlias(),deadConcurrentHashMap.size());
            throw new RpcException(String.format("以后别名 %s 存在不失常服务提供方数量 %s,请关注!",msg.getAlias(),deadConcurrentHashMap.size()));
        }
        if(concurrentHashMap.isEmpty()){log.info("以后别名 {} 不存在失常服务提供方",msg.getAlias());
            throw new RpcException(String.format("以后别名 %s 不存在失常服务提供方",msg.getAlias()));
        }
        Iterator aliveConnections = concurrentHashMap.entrySet().iterator();
        log.info("以后别名 {} 存在失常服务提供方数量{}",msg.getAlias(),concurrentHashMap.size());
        while (aliveConnections.hasNext()) {Entry<Provider, ClientTransport> entry = (Entry) aliveConnections.next();
            Provider provider = (Provider) entry.getKey();
            log.info("以后连贯 ip={}、port={}、datacenterCode={}",provider.getIp(),provider.getPort(),provider.getDatacenterCode());
            ClientTransport connection = (ClientTransport) entry.getValue();
            if (connection != null && connection.isOpen()) {
                try {result = super.sendMsg0(new Connection(provider, connection), msg);
                } catch (RpcException rpc) {
                    exception = rpc;
                    log.warn(rpc.getMessage(), rpc);
                } catch (Throwable e) {exception = new RpcException(e.getMessage(), e);
                    log.warn(e.getMessage(), e);
                }
            }
        }

3)服务配置端,当业务人员配置及时失效或者工作达到时,则依据配置,生成服务调用方,通过对立刷新接口将配置同步刷新到对应的接入零碎中, 如下图为操作界面,当增删改查时,会将属性增量同步。

服务端在下面操作增删改时,通过以下形式获取服务调用方

 public static ExcuteAction createJsfConsumer(String alias, String token) {RegistryConfig jsfRegistry = new RegistryConfig();
        jsfRegistry.setIndex("i.jsf.jd.com");
        BroadCastConsumerConfig consumerConfig = new BroadCastConsumerConfig<>();
        Map<String, String> parameters = new HashMap<>();
        parameters.put(".token",token);
        consumerConfig.setParameters(parameters);
        consumerConfig.setInterfaceId(RefreshRemoteService.class.getName());
        consumerConfig.setRegistry(jsfRegistry);
        consumerConfig.setProtocol("jsf");
        consumerConfig.setAlias(alias);
        consumerConfig.setRetries(2);
        return  new ExcuteAction(consumerConfig);
    }

通过以上的配置的客户端,调用服务提供方办法 refreshRemoteService#refresh,将配置信息进行同步到各个接入零碎

public void call(Map<String,Object> propertiesValue){
        try{RefreshRemoteService refreshRemoteService = (RefreshRemoteService)consumerConfig.refer();
            if(refreshRemoteService!=null){refreshRemoteService.refresh(propertiesValue);
            }
        }catch (Exception e){log.error(e.getMessage());
            throw new EasyConfigException(e);
        }finally {consumerConfig.unrefer(); ;
        }
    }

4)接入方启动时,须要依据本人配置,将存在 redis 或者 zk 的配置一次加载到实例变量中。并注册刷新接口到 JSF 注册核心。

其中注册刷新接口到 JSF 注册核心代码如下

 @Bean(name = "refreshPorpertiesService")
    public ProviderConfig createJsfProvider() throws Exception {ProviderConfig providerConfig = new ProviderConfig();
        providerConfig.setId("refreshPorpertiesService");
        providerConfig.setInterfaceId(RefreshRemoteService.class.getName());
        providerConfig.setRef(new RefreshRemoteServiceDelage(applicationContext));
        providerConfig.setTimeout(30000);
        providerConfig.setAlias(EasyConfigure.getAppCode()+EasyConfigure.getEnv());
        providerConfig.setServer(serverConfig);
        providerConfig.setRegistry(jsfRegistry);
        providerConfig.setParameter("token", MD5Util.md5(EasyConfigure.getAppCode()));
        providerConfig.export();
        return providerConfig;
    }

其中
RefreshRemoteServiceDelage 类提供刷新接口的理论逻辑如下,需判断以后实例是 jdk 动静代理还是 cglib 代理

判断逻辑如下

 if(AopUtils.isJdkDynamicProxy(object)) {object= AopUtil.getJdkDynamicProxyTargetObject(object);
 } else if(AopUtils.isCglibProxy(object)){ //cglib
         object= AopUtil.getCglibProxyTargetObject(object);
   }

实例对象变量值依据自定义的参数转换形式转换后赋值实例变量

if(autoValue.convert()!=null && !autoValue.getClass().isInterface()){if(!autoValue.convert().newInstance().getInClassType().isAssignableFrom(newVal.getClass()) ){continue;}
           newVal = autoValue.convert().newInstance().convert(newVal);
            if(newVal!=null){if(!autoValue.convert().newInstance().getOutClassType().isAssignableFrom(newVal.getClass()) ){continue;}
              field.setAccessible(true);
               Object value = ReflectionUtils.getField(field,object);
              log.info("change  properties{} for object {} before value {}",field.getName(),object.getClass().getName(),value);
               ReflectionUtils.setField(field,object,newVal);
                log.info("change  properties{} for object {} after value {}",field.getName(),object.getClass().getName(),newVal);
         }              
}

5 实际

1)pom 引入

<dependency>
   <groupId>com.jdl</groupId>
   <artifactId>easyconfig</artifactId>
   <version>1.0-SNAPSHOT</version>
</dependency>

2)配置存储配置(比方 redis 形式)

refresh:
  config:
    appCode: zdzq-worker-appcode
    redisUrl: redis://:@127.0.0.1

3)类全局变量须要实时刷新配置,需在类对立指定注解 PropertiryChangeListener,实例变量须要减少注解 AutoValue 并指定数据格式转换器

@PropertiryChangeListener
public class ChanceServiceImpl implement ChanceService{@AutoValue(convert = DateConvert.class,alias = "config-id")
private Date  signDate;
@AutoValue(convert = SpmKaApply Convert.class,alias = "config-id")
private SpmKaApply  spmKaApply;
}

以上 convert 办法自定义,反对各种简单配置对象,举例数据转换为 List 如下

public  class Convert2 implements Convert<Map<String, String>, Set<String>> {public Convert2(){}
    @Override
    public Set<String> convert(Map<String, String> siteInfoMap) {.... 你的对象值转换}
    @Override
    public Class<?> getInClassType() {return Map.class;}
    @Override
    public Class<?> getOutClassType() {return Set.class;}

接入应用服务启动后,可拜访 /refreshUI 可查看利用在集群中为主动配置的实例,并显示以后实例中变量值参数。key 为实例变量名。

6 总结

1、反对 jdk 动静代理的实例对象和 cglib 代理对象的参数动静配置

2、反对定时刷新配置

3、间接查看和验证利用集群中实例变量是否统一

退出移动版