共计 24194 个字符,预计需要花费 61 分钟才能阅读完成。
浏览这篇文章之前,倡议先浏览和这篇文章关联的内容。
1. 具体分析散布式微服务架构下网络通信的底层实现原理(图解)
2. (年薪 60W 的技巧)工作了 5 年,你真的了解 Netty 以及为什么要用吗?(深度干货)
3. 深度解析 Netty 中的外围组件(图解 + 实例)
4. BAT 面试必问细节:对于 Netty 中的 ByteBuf 详解
5. 通过大量实战案例合成 Netty 中是如何解决拆包黏包问题的?
6. 基于 Netty 实现自定义音讯通信协议(协定设计及解析利用实战)
7. 全网最具体最齐全的序列化技术及深度解析与利用实战
8. 手把手教你基于 Netty 实现一个根底的 RPC 框架(通俗易懂)
在本篇文章中,咱们持续围绕 Netty 手写实现 RPC 根底篇进行优化,次要引入几个点
- 集成 spring,实现注解驱动配置
- 集成 zookeeper,实现服务注册
- 减少负载平衡实现
源代码,加「跟着 Mic 学架构」微信号,回复『rpc』获取。
减少注解驱动
次要波及到的批改模块
- netty-rpc-protocol
- netty-rpc-provider
netty-rpc-protocol
以后模块次要批改的类如下。
<center> 图 7 -1</center>
上面针对 netty-rpc-protocol 模块的批改如下
减少注解驱动
这个注解的作用是用来指定某些服务为近程服务
@Target(ElementType.TYPE)// Target 阐明了 Annotation 所润饰的对象范畴, TYPE: 用于形容类、接口(包含注解类型) 或 enum 申明
@Retention(RetentionPolicy.RUNTIME)// Reteniton 的作用是定义被它所注解的注解保留多久,保留至运行时。所以咱们能够通过反射去获取注解信息。@Component
public @interface GpRemoteService {}
SpringRpcProviderBean
这个类次要用来在启动 NettyServer,以及保留 bean 的映射关系
@Slf4j
public class SpringRpcProviderBean implements InitializingBean, BeanPostProcessor {
private final int serverPort;
private final String serverAddress;
public SpringRpcProviderBean(int serverPort) throws UnknownHostException {
this.serverPort = serverPort;
InetAddress address=InetAddress.getLocalHost();
this.serverAddress=address.getHostAddress();}
@Override
public void afterPropertiesSet() throws Exception {log.info("begin deploy Netty Server to host {},on port {}",this.serverAddress,this.serverPort);
new Thread(()->{
try {new NettyServer(this.serverAddress,this.serverPort).startNettyServer();} catch (Exception e) {log.error("start Netty Server Occur Exception,",e);
e.printStackTrace();}
}).start();}
//bean 实例化后调用
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if(bean.getClass().isAnnotationPresent(GpRemoteService.class)){ // 针对存在该注解的服务进行公布
Method[] methods=bean.getClass().getDeclaredMethods();
for(Method method: methods){ // 保留须要公布的 bean 的映射
String key=bean.getClass().getInterfaces()[0].getName()+"."+method.getName();
BeanMethod beanMethod=new BeanMethod();
beanMethod.setBean(bean);
beanMethod.setMethod(method);
Mediator.beanMethodMap.put(key,beanMethod);
}
}
return bean;
}
}
Mediator
次要治理 bean 以及调用
BeanMethod
@Data
public class BeanMethod {
private Object bean;
private Method method;
}
Mediator
负责持有公布 bean 的治理,以及 bean 的反射调用
public class Mediator {public static Map<String,BeanMethod> beanMethodMap=new ConcurrentHashMap<>();
private volatile static Mediator instance=null;
private Mediator(){}
public static Mediator getInstance(){if(instance==null){synchronized (Mediator.class){if(instance==null){instance=new Mediator();
}
}
}
return instance;
}
public Object processor(RpcRequest rpcRequest){String key=rpcRequest.getClassName()+"."+rpcRequest.getMethodName();
BeanMethod beanMethod=beanMethodMap.get(key);
if(beanMethod==null){return null;}
Object bean=beanMethod.getBean();
Method method=beanMethod.getMethod();
try {return method.invoke(bean,rpcRequest.getParams());
} catch (IllegalAccessException e) {e.printStackTrace();
} catch (InvocationTargetException e) {e.printStackTrace();
}
return null;
}
}
RpcServerProperties
定义配置属性
@Data
@ConfigurationProperties(prefix = "gp.rpc")
public class RpcServerProperties {private int servicePort;}
RpcProviderAutoConfiguration
定义主动配置类
@Configuration
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcProviderAutoConfiguration {
@Bean
public SpringRpcProviderBean rpcProviderBean(RpcServerProperties rpcServerProperties) throws UnknownHostException {return new SpringRpcProviderBean(rpcServerProperties.getServicePort());
}
}
批改 RpcServerHandler
批改调用形式,间接应用 Mediator 的调用即可。
public class RpcServerHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> msg) throws Exception {RpcProtocol resProtocol=new RpcProtocol<>();
Header header=msg.getHeader();
header.setReqType(ReqType.RESPONSE.code());
Object result=Mediator.getInstance().processor(msg.getContent()); // 次要批改这个局部
resProtocol.setHeader(header);
RpcResponse response=new RpcResponse();
response.setData(result);
response.setMsg("success");
resProtocol.setContent(response);
ctx.writeAndFlush(resProtocol);
}
}
netty-rpc-provider
这个模块中次要批改两个局部
- application.properties
- NettyRpcProviderMain
NettyRpcProviderMain
@ComponentScan(basePackages = {"com.example.spring.annotation","com.example.spring.service","com.example.service"})
@SpringBootApplication
public class NettyRpcProviderMain {public static void main(String[] args) throws Exception {SpringApplication.run(NettyRpcProviderMain.class, args);
// 去掉原来的实例化局部
}
}
application.properties
减少一个配置属性。
gp.rpc.servicePort=20880
UserServiceImpl
把以后服务公布进来。
@GpRemoteService // 示意将以后服务公布成近程服务
@Slf4j
public class UserServiceImpl implements IUserService {
@Override
public String saveUser(String name) {log.info("begin saveUser:"+name);
return "Save User Success!";
}
}
批改客户端的注解驱动
客户端同样也须要通过注解的形式来援用服务,这样就可能彻底的屏蔽掉近程通信的细节内容,代码构造如图 7 - 2 所示
<center> 图 7 -2</center>
减少客户端注解
在 netty-rpc-protocol 模块的 annotation 目录下创立上面这个注解。
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Autowired
public @interface GpRemoteReference {}
SpringRpcReferenceBean
定义工厂 Bean,用来构建近程通信的代理
public class SpringRpcReferenceBean implements FactoryBean<Object> {
private Class<?> interfaceClass;
private Object object;
private String serviceAddress;
private int servicePort;
@Override
public Object getObject() throws Exception {return object;}
public void init(){this.object= Proxy.newProxyInstance(this.interfaceClass.getClassLoader(),
new Class<?>[]{this.interfaceClass},
new RpcInvokerProxy(this.serviceAddress,this.servicePort));
}
@Override
public Class<?> getObjectType() {return this.interfaceClass;}
public void setInterfaceClass(Class<?> interfaceClass) {this.interfaceClass = interfaceClass;}
public void setServiceAddress(String serviceAddress) {this.serviceAddress = serviceAddress;}
public void setServicePort(int servicePort) {this.servicePort = servicePort;}
}
SpringRpcReferencePostProcessor
用来实现近程 Bean 的动静代理注入:
- BeanClassLoaderAware:获取 Bean 的类装载器
- BeanFactoryPostProcessor:在 spring 容器加载了 bean 的定义文件之后,在 bean 实例化之前执行
- ApplicationContextAware:获取上下文对象 ApplicationContenxt
@Slf4j
public class SpringRpcReferencePostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {
private ApplicationContext context;
private ClassLoader classLoader;
private RpcClientProperties clientProperties;
public SpringRpcReferencePostProcessor(RpcClientProperties clientProperties) {this.clientProperties = clientProperties;}
// 保留公布的援用 bean 信息
private final Map<String, BeanDefinition> rpcRefBeanDefinitions=new ConcurrentHashMap<>();
@Override
public void setBeanClassLoader(ClassLoader classLoader) {this.classLoader=classLoader;}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.context=applicationContext;}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {for (String beanDefinitionname:beanFactory.getBeanDefinitionNames()){
// 遍历 bean 定义,而后获取到加载的 bean,遍历这些 bean 中的字段,是否携带 GpRemoteReference 注解
// 如果有,则须要构建一个动静代理实现
BeanDefinition beanDefinition=beanFactory.getBeanDefinition(beanDefinitionname);
String beanClassName=beanDefinition.getBeanClassName();
if(beanClassName!=null){
// 和 forName 办法雷同,外部就是间接调用的 forName 办法
Class<?> clazz=ClassUtils.resolveClassName(beanClassName,this.classLoader);
// 针对以后类中的指定字段,动态创建一个 Bean
ReflectionUtils.doWithFields(clazz,this::parseRpcReference);
}
}
// 将 @GpRemoteReference 注解的 bean,构建一个动静代理对象
BeanDefinitionRegistry registry=(BeanDefinitionRegistry)beanFactory;
this.rpcRefBeanDefinitions.forEach((beanName,beanDefinition)->{if(context.containsBean(beanName)){log.warn("SpringContext already register bean {}",beanName);
return;
}
// 把动态创建的 bean 注册到容器中
registry.registerBeanDefinition(beanName,beanDefinition);
log.info("registered RpcReferenceBean {} success.",beanName);
});
}
private void parseRpcReference(Field field){GpRemoteReference gpRemoteReference=AnnotationUtils.getAnnotation(field,GpRemoteReference.class);
if(gpRemoteReference!=null) {BeanDefinitionBuilder builder=BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class);
builder.setInitMethodName(RpcConstant.INIT_METHOD_NAME);
builder.addPropertyValue("interfaceClass",field.getType());
builder.addPropertyValue("serviceAddress",clientProperties.getServiceAddress());
builder.addPropertyValue("servicePort",clientProperties.getServicePort());
BeanDefinition beanDefinition=builder.getBeanDefinition();
rpcRefBeanDefinitions.put(field.getName(),beanDefinition);
}
}
}
须要在 RpcConstant 常量中减少一个 INIT_METHOD_NAME 属性
public class RpcConstant {
//header 局部的总字节数
public final static int HEAD_TOTAL_LEN=16;
// 魔数
public final static short MAGIC=0xca;
public static final String INIT_METHOD_NAME = "init";
}
RpcClientProperties
@Data
public class RpcClientProperties {
private String serviceAddress="192.168.1.102";
private int servicePort=20880;
}
RpcRefernceAutoConfiguration
@Configuration
public class RpcRefernceAutoConfiguration implements EnvironmentAware{
@Bean
public SpringRpcReferencePostProcessor postProcessor(){String address=environment.getProperty("gp.serviceAddress");
int port=Integer.parseInt(environment.getProperty("gp.servicePort"));
RpcClientProperties rc=new RpcClientProperties();
rc.setServiceAddress(address);
rc.setServicePort(port);
return new SpringRpcReferencePostProcessor(rc);
}
private Environment environment;
@Override
public void setEnvironment(Environment environment) {this.environment=environment;}
}
netty-rpc-consumer
批改 netty-rpc-consumer 模块
- 把该模块变成一个 spring boot 我的项目
- 减少 web 依赖
- 增加测试类
<center> 图 7 -3 netty-rpc-consumer 模块 </center>
引入 jar 包依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
HelloController
@RestController
public class HelloController {
@GpRemoteReference
private IUserService userService;
@GetMapping("/test")
public String test(){return userService.saveUser("Mic");
}
}
NettyConsumerMain
@ComponentScan(basePackages = {"com.example.spring.annotation","com.example.controller","com.example.spring.reference"})
@SpringBootApplication
public class NettyConsumerMain {public static void main(String[] args) {SpringApplication.run(NettyConsumerMain.class, args);
}
}
application.properties
gp.serviceAddress=192.168.1.102
servicePort.servicePort=20880
拜访测试
- 启动 Netty-Rpc-Server
- 启动 Netty-Rpc-Consumer
如果启动过程没有任何问题,则能够拜访 HelloController 来测试近程服务的拜访。
引入注册核心
创立一个 netty-rpc-registry 模块,代码构造如图 7 - 4 所示。
引入相干依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>4.2.0</version>
</dependency>
IRegistryService
public interface IRegistryService {
/**
* 注册服务
* @param serviceInfo
* @throws Exception
*/
void register(ServiceInfo serviceInfo) throws Exception;
/**
* 勾销注册
* @param serviceInfo
* @throws Exception
*/
void unRegister(ServiceInfo serviceInfo) throws Exception;
/**
* 动静发现服务
* @param serviceName
* @return
* @throws Exception
*/
ServiceInfo discovery(String serviceName) throws Exception;
}
ServiceInfo
@Data
public class ServiceInfo {
private String serviceName;
private String serviceAddress;
private int servicePort;
}
ZookeeperRegistryService
@Slf4j
public class ZookeeperRegistryService implements IRegistryService {
private static final String REGISTRY_PATH="/registry";
//Curator 中提供的服务注册与发现的组件封装,它对此形象出了 ServiceInstance、// ServiceProvider、ServiceDiscovery 三个接口,通过它咱们能够很轻易的实现 Service Discovery
private final ServiceDiscovery<ServiceInfo> serviceDiscovery;
private ILoadBalance<ServiceInstance<ServiceInfo>> loadBalance;
public ZookeeperRegistryService(String registryAddress) throws Exception {
CuratorFramework client= CuratorFrameworkFactory
.newClient(registryAddress,new ExponentialBackoffRetry(1000,3));
JsonInstanceSerializer<ServiceInfo> serializer=new JsonInstanceSerializer<>(ServiceInfo.class);
this.serviceDiscovery= ServiceDiscoveryBuilder.builder(ServiceInfo.class)
.client(client)
.serializer(serializer)
.basePath(REGISTRY_PATH)
.build();
this.serviceDiscovery.start();
loadBalance=new RandomLoadBalance();}
@Override
public void register(ServiceInfo serviceInfo) throws Exception {log.info("开始注册服务,{}",serviceInfo);
ServiceInstance<ServiceInfo> serviceInstance=ServiceInstance
.<ServiceInfo>builder().name(serviceInfo.getServiceName())
.address(serviceInfo.getServiceAddress())
.port(serviceInfo.getServicePort())
.payload(serviceInfo)
.build();
serviceDiscovery.registerService(serviceInstance);
}
@Override
public void unRegister(ServiceInfo serviceInfo) throws Exception {ServiceInstance<ServiceInfo> serviceInstance=ServiceInstance.<ServiceInfo>builder()
.name(serviceInfo.getServiceName())
.address(serviceInfo.getServiceAddress())
.port(serviceInfo.getServicePort())
.payload(serviceInfo)
.build();
serviceDiscovery.unregisterService(serviceInstance);
}
@Override
public ServiceInfo discovery(String serviceName) throws Exception {
Collection<ServiceInstance<ServiceInfo>> serviceInstances= serviceDiscovery
.queryForInstances(serviceName);
// 通过负载平衡返回某个具体实例
ServiceInstance<ServiceInfo> serviceInstance=loadBalance.select((List<ServiceInstance<ServiceInfo>>)serviceInstances);
if(serviceInstance!=null){return serviceInstance.getPayload();
}
return null;
}
}
引入负载平衡算法
因为服务端发现服务时可能有多个,所以须要用到负载平衡算法来实现
ILoadBalance
public interface ILoadBalance<T> {T select(List<T> servers);
}
AbstractLoadBalance
public abstract class AbstractLoadBanalce implements ILoadBalance<ServiceInstance<ServiceInfo>> {
@Override
public ServiceInstance<ServiceInfo> select(List<ServiceInstance<ServiceInfo>> servers){if(servers==null||servers.size()==0){return null;}
if(servers.size()==1){return servers.get(0);
}
return doSelect(servers);
}
protected abstract ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers);
}
RandomLoadBalance
public class RandomLoadBalance extends AbstractLoadBanalce {
@Override
protected ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers) {int length=servers.size();
Random random=new Random();
return servers.get(random.nextInt(length));
}
}
RegistryType
public enum RegistryType {ZOOKEEPER((byte)0),
EUREKA((byte)1);
private byte code;
RegistryType(byte code) {this.code=code;}
public byte code(){return this.code;}
public static RegistryType findByCode(byte code) {for (RegistryType rt : RegistryType.values()) {if (rt.code() == code) {return rt;}
}
return null;
}
}
RegistryFactory
public class RegistryFactory {public static IRegistryService createRegistryService(String address,RegistryType registryType){
IRegistryService registryService=null;
try {switch (registryType) {
case ZOOKEEPER:
registryService = new ZookeeperRegistryService(address);
break;
case EUREKA:
//TODO
break;
default:
registryService = new ZookeeperRegistryService(address);
break;
}
}catch (Exception e){e.printStackTrace();
}
return registryService;
}
}
批改服务端减少服务注册
批改 netty-rpc-protocol 模块,退出注册核心的反对
SpringRpcProviderBean
依照上面 case 标注局部,示意要批改的内容
@Slf4j
public class SpringRpcProviderBean implements InitializingBean, BeanPostProcessor {
private final int serverPort;
private final String serverAddress;
private final IRegistryService registryService; // 批改局部, 减少注册核心实现
public SpringRpcProviderBean(int serverPort,IRegistryService registryService) throws UnknownHostException {
this.serverPort = serverPort;
InetAddress address=InetAddress.getLocalHost();
this.serverAddress=address.getHostAddress();
this.registryService=registryService; // 批改局部, 减少注册核心实现
}
@Override
public void afterPropertiesSet() throws Exception {log.info("begin deploy Netty Server to host {},on port {}",this.serverAddress,this.serverPort);
new Thread(()->{
try {new NettyServer(this.serverAddress,this.serverPort).startNettyServer();} catch (Exception e) {log.error("start Netty Server Occur Exception,",e);
e.printStackTrace();}
}).start();}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if(bean.getClass().isAnnotationPresent(GpRemoteService.class)){ // 针对存在该注解的服务进行公布
Method[] methods=bean.getClass().getDeclaredMethods();
for(Method method: methods){String serviceName=bean.getClass().getInterfaces()[0].getName();
String key=serviceName+"."+method.getName();
BeanMethod beanMethod=new BeanMethod();
beanMethod.setBean(bean);
beanMethod.setMethod(method);
Mediator.beanMethodMap.put(key,beanMethod);
try {
// 批改局部, 减少注册核心实现
ServiceInfo serviceInfo = new ServiceInfo();
serviceInfo.setServiceAddress(this.serverAddress);
serviceInfo.setServicePort(this.serverPort);
serviceInfo.setServiceName(serviceName);
registryService.register(serviceInfo);// 批改局部, 减少注册核心实现
}catch (Exception e){log.error("register service {} faild",serviceName,e);
}
}
}
return bean;
}
}
RpcServerProperties
批改 RpcServerProperties,减少注册核心的配置
@Data
@ConfigurationProperties(prefix = "gp.rpc")
public class RpcServerProperties {
private int servicePort;
private byte registerType;
private String registryAddress;
}
RpcProviderAutoConfiguration
减少注册核心的注入。
@Configuration
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcProviderAutoConfiguration {
@Bean
public SpringRpcProviderBean rpcProviderBean(RpcServerProperties rpcServerProperties) throws UnknownHostException {
// 增加注册核心
IRegistryService registryService=RegistryFactory.createRegistryService(rpcServerProperties.getRegistryAddress(), RegistryType.findByCode(rpcServerProperties.getRegisterType()));
return new SpringRpcProviderBean(rpcServerProperties.getServicePort(),registryService);
}
}
application.properties
批改 netty-rpc-provider 中的 application.properties。
gp.rpc.servicePort=20880
gp.rpc.registerType=0
gp.rpc.registryAddress=192.168.221.128:2181
批改客户端,减少服务发现
客户端须要批改的中央较多,上面这些批改的代码,都是 netty-rpc-protocol 模块中的类。
RpcClientProperties
减少注册核心类型和注册核心地址的选项
@Data
public class RpcClientProperties {
private String serviceAddress="192.168.1.102";
private int servicePort=20880;
private byte registryType;
private String registryAddress;
}
批改 NettyClient
本来是动态地址,当初批改成了从注册核心获取地址
@Slf4j
public class NettyClient {
private final Bootstrap bootstrap;
private final EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
/* private String serviceAddress;
private int servicePort;*/
public NettyClient(){log.info("begin init NettyClient");
bootstrap=new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new RpcClientInitializer());
/* this.serviceAddress=serviceAddress;
this.servicePort=servicePort;*/
}
public void sendRequest(RpcProtocol<RpcRequest> protocol, IRegistryService registryService) throws Exception {ServiceInfo serviceInfo=registryService.discovery(protocol.getContent().getClassName());
ChannelFuture future=bootstrap.connect(serviceInfo.getServiceAddress(),serviceInfo.getServicePort()).sync();
future.addListener(listener->{if(future.isSuccess()){log.info("connect rpc server {} success.",serviceInfo.getServiceAddress());
}else{log.error("connect rpc server {} failed .",serviceInfo.getServiceAddress());
future.cause().printStackTrace();
eventLoopGroup.shutdownGracefully();}
});
log.info("begin transfer data");
future.channel().writeAndFlush(protocol);
}
}
批改 RpcInvokerProxy
将动态 ip 和地址,批改成 IRegistryService
@Slf4j
public class RpcInvokerProxy implements InvocationHandler {
/* private String serviceAddress;
private int servicePort;*/
IRegistryService registryService;
public RpcInvokerProxy(IRegistryService registryService) {
/* this.serviceAddress = serviceAddress;
this.servicePort = servicePort;*/
this.registryService=registryService;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {log.info("begin invoke target server");
// 组装参数
RpcProtocol<RpcRequest> protocol=new RpcProtocol<>();
long requestId= RequestHolder.REQUEST_ID.incrementAndGet();
Header header=new Header(RpcConstant.MAGIC, SerialType.JSON_SERIAL.code(), ReqType.REQUEST.code(),requestId,0);
protocol.setHeader(header);
RpcRequest request=new RpcRequest();
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParams(args);
protocol.setContent(request);
// 发送申请
NettyClient nettyClient=new NettyClient();
// 构建异步数据处理
RpcFuture<RpcResponse> future=new RpcFuture<>(new DefaultPromise<>(new DefaultEventLoop()));
RequestHolder.REQUEST_MAP.put(requestId,future);
nettyClient.sendRequest(protocol,this.registryService);
return future.getPromise().get().getData();}
}
SpringRpcReferenceBean
批改援用 bean,减少注册核心配置
public class SpringRpcReferenceBean implements FactoryBean<Object> {
private Class<?> interfaceClass;
private Object object;
/* private String serviceAddress;
private int servicePort;*/
// 批改减少注册核心
private byte registryType;
private String registryAddress;
@Override
public Object getObject() throws Exception {return object;}
public void init(){
// 批改减少注册核心
IRegistryService registryService= RegistryFactory.createRegistryService(this.registryAddress, RegistryType.findByCode(this.registryType));
this.object= Proxy.newProxyInstance(this.interfaceClass.getClassLoader(),
new Class<?>[]{this.interfaceClass},
new RpcInvokerProxy(registryService));
}
@Override
public Class<?> getObjectType() {return this.interfaceClass;}
public void setInterfaceClass(Class<?> interfaceClass) {this.interfaceClass = interfaceClass;}
/* public void setServiceAddress(String serviceAddress) {this.serviceAddress = serviceAddress;}
public void setServicePort(int servicePort) {this.servicePort = servicePort;}*/
public void setRegistryType(byte registryType) {this.registryType = registryType;}
public void setRegistryAddress(String registryAddress) {this.registryAddress = registryAddress;}
}
SpringRpcReferencePostProcessor
@Slf4j
public class SpringRpcReferencePostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {
private ApplicationContext context;
private ClassLoader classLoader;
private RpcClientProperties clientProperties;
public SpringRpcReferencePostProcessor(RpcClientProperties clientProperties) {this.clientProperties = clientProperties;}
// 保留公布的援用 bean 信息
private final Map<String, BeanDefinition> rpcRefBeanDefinitions=new ConcurrentHashMap<>();
@Override
public void setBeanClassLoader(ClassLoader classLoader) {this.classLoader=classLoader;}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.context=applicationContext;}
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {for (String beanDefinitionname:beanFactory.getBeanDefinitionNames()){
// 遍历 bean 定义,而后获取到加载的 bean,遍历这些 bean 中的字段,是否携带 GpRemoteReference 注解
// 如果有,则须要构建一个动静代理实现
BeanDefinition beanDefinition=beanFactory.getBeanDefinition(beanDefinitionname);
String beanClassName=beanDefinition.getBeanClassName();
if(beanClassName!=null){Class<?> clazz=ClassUtils.resolveClassName(beanClassName,this.classLoader);
ReflectionUtils.doWithFields(clazz,this::parseRpcReference);
}
}
// 将 @GpRemoteReference 注解的 bean,构建一个动静代理对象
BeanDefinitionRegistry registry=(BeanDefinitionRegistry)beanFactory;
this.rpcRefBeanDefinitions.forEach((beanName,beanDefinition)->{if(context.containsBean(beanName)){log.warn("SpringContext already register bean {}",beanName);
return;
}
registry.registerBeanDefinition(beanName,beanDefinition);
log.info("registered RpcReferenceBean {} success.",beanName);
});
}
private void parseRpcReference(Field field){GpRemoteReference gpRemoteReference=AnnotationUtils.getAnnotation(field,GpRemoteReference.class);
if(gpRemoteReference!=null) {BeanDefinitionBuilder builder=BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class);
builder.setInitMethodName(RpcConstant.INIT_METHOD_NAME);
builder.addPropertyValue("interfaceClass",field.getType());
/*builder.addPropertyValue("serviceAddress",clientProperties.getServiceAddress());
builder.addPropertyValue("servicePort",clientProperties.getServicePort());*/
builder.addPropertyValue("registryType",clientProperties.getRegistryType());
builder.addPropertyValue("registryAddress",clientProperties.getRegistryAddress());
BeanDefinition beanDefinition=builder.getBeanDefinition();
rpcRefBeanDefinitions.put(field.getName(),beanDefinition);
}
}
}
RpcRefernceAutoConfiguration
@Configuration
public class RpcRefernceAutoConfiguration implements EnvironmentAware{
@Bean
public SpringRpcReferencePostProcessor postProcessor(){String address=environment.getProperty("gp.serviceAddress");
int port=Integer.parseInt(environment.getProperty("gp.servicePort"));
RpcClientProperties rc=new RpcClientProperties();
rc.setServiceAddress(address);
rc.setServicePort(port);
rc.setRegistryType(Byte.parseByte(environment.getProperty("gp.registryType")));
rc.setRegistryAddress(environment.getProperty("gp.registryAddress"));
return new SpringRpcReferencePostProcessor(rc);
}
private Environment environment;
@Override
public void setEnvironment(Environment environment) {this.environment=environment;}
}
application.properties
批改 netty-rpc-consumer 模块中的配置
gp.serviceAddress=192.168.1.102
gp.servicePort=20880
gp.registryType=0
gp.registryAddress=192.168.221.128:2181
负载平衡的测试
减少一个服务端的启动类,并且批改端口。而后客户端不须要重启的状况下刷新浏览器,即可看到负载平衡的成果。
<center> 图 7 -5</center>
须要源码的同学,请关注公众号[跟着 Mic 学架构],回复关键字[rpc],即可取得
版权申明:本博客所有文章除特地申明外,均采纳 CC BY-NC-SA 4.0 许可协定。转载请注明来自
Mic 带你学架构
!
如果本篇文章对您有帮忙,还请帮忙点个关注和赞,您的保持是我一直创作的能源。欢送关注同名微信公众号获取更多技术干货!