关于java:年薪60W分水岭基于Netty手写实现RPC框架进阶篇带注册中心和注解

8次阅读

共计 24194 个字符,预计需要花费 61 分钟才能阅读完成。

浏览这篇文章之前,倡议先浏览和这篇文章关联的内容。

1. 具体分析散布式微服务架构下网络通信的底层实现原理(图解)

2. (年薪 60W 的技巧)工作了 5 年,你真的了解 Netty 以及为什么要用吗?(深度干货)

3. 深度解析 Netty 中的外围组件(图解 + 实例)

4. BAT 面试必问细节:对于 Netty 中的 ByteBuf 详解

5. 通过大量实战案例合成 Netty 中是如何解决拆包黏包问题的?

6. 基于 Netty 实现自定义音讯通信协议(协定设计及解析利用实战)

7. 全网最具体最齐全的序列化技术及深度解析与利用实战

8. 手把手教你基于 Netty 实现一个根底的 RPC 框架(通俗易懂)

在本篇文章中,咱们持续围绕 Netty 手写实现 RPC 根底篇进行优化,次要引入几个点

  • 集成 spring,实现注解驱动配置
  • 集成 zookeeper,实现服务注册
  • 减少负载平衡实现

源代码,加「跟着 Mic 学架构」微信号,回复『rpc』获取。

减少注解驱动

次要波及到的批改模块

  • netty-rpc-protocol
  • netty-rpc-provider

netty-rpc-protocol

以后模块次要批改的类如下。

<center> 图 7 -1</center>

上面针对 netty-rpc-protocol 模块的批改如下

减少注解驱动

这个注解的作用是用来指定某些服务为近程服务

@Target(ElementType.TYPE)// Target 阐明了 Annotation 所润饰的对象范畴, TYPE: 用于形容类、接口(包含注解类型) 或 enum 申明
@Retention(RetentionPolicy.RUNTIME)// Reteniton 的作用是定义被它所注解的注解保留多久,保留至运行时。所以咱们能够通过反射去获取注解信息。@Component
public @interface GpRemoteService {}

SpringRpcProviderBean

这个类次要用来在启动 NettyServer,以及保留 bean 的映射关系

@Slf4j
public class SpringRpcProviderBean implements InitializingBean, BeanPostProcessor {

    private final int serverPort;
    private final String serverAddress; 
    public SpringRpcProviderBean(int serverPort) throws UnknownHostException {
        this.serverPort = serverPort;
        InetAddress address=InetAddress.getLocalHost();
        this.serverAddress=address.getHostAddress();}

    @Override
    public void afterPropertiesSet() throws Exception {log.info("begin deploy Netty Server to host {},on port {}",this.serverAddress,this.serverPort);
        new Thread(()->{
            try {new NettyServer(this.serverAddress,this.serverPort).startNettyServer();} catch (Exception e) {log.error("start Netty Server Occur Exception,",e);
                e.printStackTrace();}
        }).start();}
    
    //bean 实例化后调用
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if(bean.getClass().isAnnotationPresent(GpRemoteService.class)){ // 针对存在该注解的服务进行公布
            Method[] methods=bean.getClass().getDeclaredMethods();
            for(Method method: methods){ // 保留须要公布的 bean 的映射
                String key=bean.getClass().getInterfaces()[0].getName()+"."+method.getName();
                BeanMethod beanMethod=new BeanMethod();
                beanMethod.setBean(bean);
                beanMethod.setMethod(method);
                Mediator.beanMethodMap.put(key,beanMethod);
            }
        }
        return bean;
    }
}

Mediator

次要治理 bean 以及调用

BeanMethod

@Data
public class BeanMethod {
    private Object bean;
    private Method method;
}

Mediator

负责持有公布 bean 的治理,以及 bean 的反射调用

public class Mediator {public static Map<String,BeanMethod> beanMethodMap=new ConcurrentHashMap<>();

    private volatile static Mediator instance=null;

    private Mediator(){}

    public static Mediator getInstance(){if(instance==null){synchronized (Mediator.class){if(instance==null){instance=new Mediator();
                }
            }
        }
        return instance;
    }
    public Object processor(RpcRequest rpcRequest){String key=rpcRequest.getClassName()+"."+rpcRequest.getMethodName();
        BeanMethod beanMethod=beanMethodMap.get(key);
        if(beanMethod==null){return null;}
        Object bean=beanMethod.getBean();
        Method method=beanMethod.getMethod();
        try {return method.invoke(bean,rpcRequest.getParams());
        } catch (IllegalAccessException e) {e.printStackTrace();
        } catch (InvocationTargetException e) {e.printStackTrace();
        }
        return null;
    }
}

RpcServerProperties

定义配置属性

@Data
@ConfigurationProperties(prefix = "gp.rpc")
public class RpcServerProperties {private int servicePort;}

RpcProviderAutoConfiguration

定义主动配置类

@Configuration
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcProviderAutoConfiguration {

    @Bean
    public SpringRpcProviderBean rpcProviderBean(RpcServerProperties rpcServerProperties) throws UnknownHostException {return new SpringRpcProviderBean(rpcServerProperties.getServicePort());
    }
}

批改 RpcServerHandler

批改调用形式,间接应用 Mediator 的调用即可。

public class RpcServerHandler extends SimpleChannelInboundHandler<RpcProtocol<RpcRequest>> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcProtocol<RpcRequest> msg) throws Exception {RpcProtocol resProtocol=new RpcProtocol<>();
        Header header=msg.getHeader();
        header.setReqType(ReqType.RESPONSE.code());
        Object result=Mediator.getInstance().processor(msg.getContent()); // 次要批改这个局部
        resProtocol.setHeader(header);
        RpcResponse response=new RpcResponse();
        response.setData(result);
        response.setMsg("success");
        resProtocol.setContent(response);

        ctx.writeAndFlush(resProtocol);
    }
}

netty-rpc-provider

这个模块中次要批改两个局部

  • application.properties
  • NettyRpcProviderMain

NettyRpcProviderMain

@ComponentScan(basePackages = {"com.example.spring.annotation","com.example.spring.service","com.example.service"})
@SpringBootApplication
public class NettyRpcProviderMain {public static void main(String[] args) throws Exception {SpringApplication.run(NettyRpcProviderMain.class, args);
        // 去掉原来的实例化局部
    }
}

application.properties

减少一个配置属性。

gp.rpc.servicePort=20880

UserServiceImpl

把以后服务公布进来。

@GpRemoteService // 示意将以后服务公布成近程服务
@Slf4j
public class UserServiceImpl implements IUserService {
    @Override
    public String saveUser(String name) {log.info("begin saveUser:"+name);
        return "Save User Success!";
    }
}

批改客户端的注解驱动

客户端同样也须要通过注解的形式来援用服务,这样就可能彻底的屏蔽掉近程通信的细节内容,代码构造如图 7 - 2 所示

<center> 图 7 -2</center>

减少客户端注解

在 netty-rpc-protocol 模块的 annotation 目录下创立上面这个注解。

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Autowired
public @interface GpRemoteReference {}

SpringRpcReferenceBean

定义工厂 Bean,用来构建近程通信的代理

public class SpringRpcReferenceBean implements FactoryBean<Object> {

    private Class<?> interfaceClass;
    private Object object;
    private String serviceAddress;
    private int servicePort;

    @Override
    public Object getObject() throws Exception {return object;}

    public void init(){this.object= Proxy.newProxyInstance(this.interfaceClass.getClassLoader(),
                new Class<?>[]{this.interfaceClass},
                new RpcInvokerProxy(this.serviceAddress,this.servicePort));
    }

    @Override
    public Class<?> getObjectType() {return this.interfaceClass;}

    public void setInterfaceClass(Class<?> interfaceClass) {this.interfaceClass = interfaceClass;}

    public void setServiceAddress(String serviceAddress) {this.serviceAddress = serviceAddress;}

    public void setServicePort(int servicePort) {this.servicePort = servicePort;}
}

SpringRpcReferencePostProcessor

用来实现近程 Bean 的动静代理注入:

  • BeanClassLoaderAware:获取 Bean 的类装载器
  • BeanFactoryPostProcessor:在 spring 容器加载了 bean 的定义文件之后,在 bean 实例化之前执行
  • ApplicationContextAware:获取上下文对象 ApplicationContenxt
@Slf4j
public class SpringRpcReferencePostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {
    private ApplicationContext context;
    private ClassLoader classLoader;
    private RpcClientProperties clientProperties;

    public SpringRpcReferencePostProcessor(RpcClientProperties clientProperties) {this.clientProperties = clientProperties;}

    // 保留公布的援用 bean 信息
    private final Map<String, BeanDefinition> rpcRefBeanDefinitions=new ConcurrentHashMap<>();

    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {this.classLoader=classLoader;}
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.context=applicationContext;}

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {for (String beanDefinitionname:beanFactory.getBeanDefinitionNames()){
            // 遍历 bean 定义,而后获取到加载的 bean,遍历这些 bean 中的字段,是否携带 GpRemoteReference 注解
            // 如果有,则须要构建一个动静代理实现
            BeanDefinition beanDefinition=beanFactory.getBeanDefinition(beanDefinitionname);
            String beanClassName=beanDefinition.getBeanClassName();
            if(beanClassName!=null){
                // 和 forName 办法雷同,外部就是间接调用的 forName 办法
                Class<?> clazz=ClassUtils.resolveClassName(beanClassName,this.classLoader);
                // 针对以后类中的指定字段,动态创建一个 Bean
                ReflectionUtils.doWithFields(clazz,this::parseRpcReference);
            }
        }
        // 将 @GpRemoteReference 注解的 bean,构建一个动静代理对象
        BeanDefinitionRegistry registry=(BeanDefinitionRegistry)beanFactory;
        this.rpcRefBeanDefinitions.forEach((beanName,beanDefinition)->{if(context.containsBean(beanName)){log.warn("SpringContext already register bean {}",beanName);
                return;
            }
            // 把动态创建的 bean 注册到容器中
            registry.registerBeanDefinition(beanName,beanDefinition);
            log.info("registered RpcReferenceBean {} success.",beanName);
        });
    }
    private void parseRpcReference(Field field){GpRemoteReference gpRemoteReference=AnnotationUtils.getAnnotation(field,GpRemoteReference.class);
        if(gpRemoteReference!=null) {BeanDefinitionBuilder builder=BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class);
            builder.setInitMethodName(RpcConstant.INIT_METHOD_NAME);
            builder.addPropertyValue("interfaceClass",field.getType());
            builder.addPropertyValue("serviceAddress",clientProperties.getServiceAddress());
            builder.addPropertyValue("servicePort",clientProperties.getServicePort());
            BeanDefinition beanDefinition=builder.getBeanDefinition();
            rpcRefBeanDefinitions.put(field.getName(),beanDefinition);
        }
    }
}

须要在 RpcConstant 常量中减少一个 INIT_METHOD_NAME 属性

public class RpcConstant {
    //header 局部的总字节数
    public final static int HEAD_TOTAL_LEN=16;
    // 魔数
    public final static short MAGIC=0xca;

    public static final String INIT_METHOD_NAME = "init";
}

RpcClientProperties

@Data
public class RpcClientProperties {

    private String serviceAddress="192.168.1.102";

    private int servicePort=20880;
}

RpcRefernceAutoConfiguration

@Configuration
public class RpcRefernceAutoConfiguration implements EnvironmentAware{

    @Bean
    public SpringRpcReferencePostProcessor postProcessor(){String address=environment.getProperty("gp.serviceAddress");
        int port=Integer.parseInt(environment.getProperty("gp.servicePort"));
        RpcClientProperties rc=new RpcClientProperties();
        rc.setServiceAddress(address);
        rc.setServicePort(port);
        return new SpringRpcReferencePostProcessor(rc);
    }

    private Environment environment;

    @Override
    public void setEnvironment(Environment environment) {this.environment=environment;}
}

netty-rpc-consumer

批改 netty-rpc-consumer 模块

  • 把该模块变成一个 spring boot 我的项目
  • 减少 web 依赖
  • 增加测试类

<center> 图 7 -3 netty-rpc-consumer 模块 </center>

引入 jar 包依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

HelloController

@RestController
public class HelloController {

    @GpRemoteReference
    private IUserService userService;

    @GetMapping("/test")
    public String test(){return userService.saveUser("Mic");
    }
}

NettyConsumerMain

@ComponentScan(basePackages = {"com.example.spring.annotation","com.example.controller","com.example.spring.reference"})
@SpringBootApplication
public class NettyConsumerMain {public static void main(String[] args) {SpringApplication.run(NettyConsumerMain.class, args);
    }
}

application.properties

gp.serviceAddress=192.168.1.102
servicePort.servicePort=20880

拜访测试

  • 启动 Netty-Rpc-Server
  • 启动 Netty-Rpc-Consumer

如果启动过程没有任何问题,则能够拜访 HelloController 来测试近程服务的拜访。

引入注册核心

创立一个 netty-rpc-registry 模块,代码构造如图 7 - 4 所示。

引入相干依赖

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>4.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>4.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-x-discovery</artifactId>
    <version>4.2.0</version>
</dependency>

IRegistryService

public interface IRegistryService {

    /**
     * 注册服务
     * @param serviceInfo
     * @throws Exception
     */
    void register(ServiceInfo serviceInfo) throws Exception;

    /**
     * 勾销注册
     * @param serviceInfo
     * @throws Exception
     */
    void unRegister(ServiceInfo serviceInfo) throws Exception;

    /**
     * 动静发现服务
     * @param serviceName
     * @return
     * @throws Exception
     */
    ServiceInfo discovery(String serviceName) throws Exception;
}

ServiceInfo

@Data
public class ServiceInfo {
    private String serviceName;
    private String serviceAddress;
    private int servicePort;
}

ZookeeperRegistryService

@Slf4j
public class ZookeeperRegistryService implements IRegistryService {

    private static final String REGISTRY_PATH="/registry";
    //Curator 中提供的服务注册与发现的组件封装,它对此形象出了 ServiceInstance、// ServiceProvider、ServiceDiscovery 三个接口,通过它咱们能够很轻易的实现 Service Discovery
    private final ServiceDiscovery<ServiceInfo> serviceDiscovery;

    private ILoadBalance<ServiceInstance<ServiceInfo>> loadBalance;

    public ZookeeperRegistryService(String registryAddress) throws Exception {
        CuratorFramework client= CuratorFrameworkFactory
                .newClient(registryAddress,new ExponentialBackoffRetry(1000,3));
        JsonInstanceSerializer<ServiceInfo> serializer=new JsonInstanceSerializer<>(ServiceInfo.class);
        this.serviceDiscovery= ServiceDiscoveryBuilder.builder(ServiceInfo.class)
                .client(client)
                .serializer(serializer)
                .basePath(REGISTRY_PATH)
                .build();
        this.serviceDiscovery.start();
        loadBalance=new RandomLoadBalance();}

    @Override
    public void register(ServiceInfo serviceInfo) throws Exception {log.info("开始注册服务,{}",serviceInfo);
        ServiceInstance<ServiceInfo> serviceInstance=ServiceInstance
                .<ServiceInfo>builder().name(serviceInfo.getServiceName())
                .address(serviceInfo.getServiceAddress())
                .port(serviceInfo.getServicePort())
                .payload(serviceInfo)
                .build();
        serviceDiscovery.registerService(serviceInstance);
    }

    @Override
    public void unRegister(ServiceInfo serviceInfo) throws Exception {ServiceInstance<ServiceInfo> serviceInstance=ServiceInstance.<ServiceInfo>builder()
                .name(serviceInfo.getServiceName())
                .address(serviceInfo.getServiceAddress())
                .port(serviceInfo.getServicePort())
                .payload(serviceInfo)
                .build();
        serviceDiscovery.unregisterService(serviceInstance);
    }

    @Override
    public ServiceInfo discovery(String serviceName) throws Exception {
        Collection<ServiceInstance<ServiceInfo>> serviceInstances= serviceDiscovery
                .queryForInstances(serviceName);
        // 通过负载平衡返回某个具体实例
        ServiceInstance<ServiceInfo> serviceInstance=loadBalance.select((List<ServiceInstance<ServiceInfo>>)serviceInstances);
        if(serviceInstance!=null){return serviceInstance.getPayload();
        }
        return null;
    }
}

引入负载平衡算法

因为服务端发现服务时可能有多个,所以须要用到负载平衡算法来实现

ILoadBalance

public interface ILoadBalance<T> {T select(List<T> servers);
}

AbstractLoadBalance

public abstract class AbstractLoadBanalce implements ILoadBalance<ServiceInstance<ServiceInfo>> {

    @Override
    public ServiceInstance<ServiceInfo> select(List<ServiceInstance<ServiceInfo>> servers){if(servers==null||servers.size()==0){return null;}
        if(servers.size()==1){return servers.get(0);
        }
        return doSelect(servers);
    }

    protected abstract ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers);
}

RandomLoadBalance

public class RandomLoadBalance extends AbstractLoadBanalce {
    @Override
    protected ServiceInstance<ServiceInfo> doSelect(List<ServiceInstance<ServiceInfo>> servers) {int length=servers.size();
        Random random=new Random();
        return servers.get(random.nextInt(length));
    }
}

RegistryType

public enum RegistryType {ZOOKEEPER((byte)0),
    EUREKA((byte)1);

    private byte code;

    RegistryType(byte code) {this.code=code;}

    public byte code(){return this.code;}

    public static RegistryType findByCode(byte code) {for (RegistryType rt : RegistryType.values()) {if (rt.code() == code) {return rt;}
        }
        return null;
    }
}

RegistryFactory

public class RegistryFactory {public static IRegistryService createRegistryService(String address,RegistryType registryType){
        IRegistryService registryService=null;
        try {switch (registryType) {
                case ZOOKEEPER:
                    registryService = new ZookeeperRegistryService(address);
                    break;
                case EUREKA:
                    //TODO
                    break;
                default:
                    registryService = new ZookeeperRegistryService(address);
                    break;
            }
        }catch (Exception e){e.printStackTrace();
        }
        return registryService;
    }
}

批改服务端减少服务注册

批改 netty-rpc-protocol 模块,退出注册核心的反对

SpringRpcProviderBean

依照上面 case 标注局部,示意要批改的内容

@Slf4j
public class SpringRpcProviderBean implements InitializingBean, BeanPostProcessor {

    private final int serverPort;
    private final String serverAddress;
    private final IRegistryService registryService; // 批改局部, 减少注册核心实现
    public SpringRpcProviderBean(int serverPort,IRegistryService registryService) throws UnknownHostException {
        this.serverPort = serverPort;
        InetAddress address=InetAddress.getLocalHost();
        this.serverAddress=address.getHostAddress();
        this.registryService=registryService; // 批改局部, 减少注册核心实现
    }

    @Override
    public void afterPropertiesSet() throws Exception {log.info("begin deploy Netty Server to host {},on port {}",this.serverAddress,this.serverPort);
        new Thread(()->{
            try {new NettyServer(this.serverAddress,this.serverPort).startNettyServer();} catch (Exception e) {log.error("start Netty Server Occur Exception,",e);
                e.printStackTrace();}
        }).start();}

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if(bean.getClass().isAnnotationPresent(GpRemoteService.class)){ // 针对存在该注解的服务进行公布
            Method[] methods=bean.getClass().getDeclaredMethods();
            for(Method method: methods){String serviceName=bean.getClass().getInterfaces()[0].getName();
                String key=serviceName+"."+method.getName();
                BeanMethod beanMethod=new BeanMethod();
                beanMethod.setBean(bean);
                beanMethod.setMethod(method);
                Mediator.beanMethodMap.put(key,beanMethod);
                try {
                    // 批改局部, 减少注册核心实现
                    ServiceInfo serviceInfo = new ServiceInfo();
                    serviceInfo.setServiceAddress(this.serverAddress);
                    serviceInfo.setServicePort(this.serverPort);
                    serviceInfo.setServiceName(serviceName);
                    registryService.register(serviceInfo);// 批改局部, 减少注册核心实现
                }catch (Exception e){log.error("register service {} faild",serviceName,e);
                }
            }
        }
        return bean;
    }
}

RpcServerProperties

批改 RpcServerProperties,减少注册核心的配置

@Data
@ConfigurationProperties(prefix = "gp.rpc")
public class RpcServerProperties {

    private int servicePort;

    private byte registerType;

    private String registryAddress;
}

RpcProviderAutoConfiguration

减少注册核心的注入。

@Configuration
@EnableConfigurationProperties(RpcServerProperties.class)
public class RpcProviderAutoConfiguration {

    @Bean
    public SpringRpcProviderBean rpcProviderBean(RpcServerProperties rpcServerProperties) throws UnknownHostException {
        // 增加注册核心
        IRegistryService registryService=RegistryFactory.createRegistryService(rpcServerProperties.getRegistryAddress(), RegistryType.findByCode(rpcServerProperties.getRegisterType()));
        return new SpringRpcProviderBean(rpcServerProperties.getServicePort(),registryService);
    }
}

application.properties

批改 netty-rpc-provider 中的 application.properties。

gp.rpc.servicePort=20880
gp.rpc.registerType=0
gp.rpc.registryAddress=192.168.221.128:2181

批改客户端,减少服务发现

客户端须要批改的中央较多,上面这些批改的代码,都是 netty-rpc-protocol 模块中的类。

RpcClientProperties

减少注册核心类型和注册核心地址的选项

@Data
public class RpcClientProperties {

    private String serviceAddress="192.168.1.102";

    private int servicePort=20880;

    private byte registryType;

    private String registryAddress;

}

批改 NettyClient

本来是动态地址,当初批改成了从注册核心获取地址

@Slf4j
public class NettyClient {
    private final Bootstrap bootstrap;
    private final EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
   /* private String serviceAddress;
    private int servicePort;*/
    public NettyClient(){log.info("begin init NettyClient");
        bootstrap=new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new RpcClientInitializer());
       /* this.serviceAddress=serviceAddress;
        this.servicePort=servicePort;*/
    }

    public void sendRequest(RpcProtocol<RpcRequest> protocol, IRegistryService registryService) throws Exception {ServiceInfo serviceInfo=registryService.discovery(protocol.getContent().getClassName());
        ChannelFuture future=bootstrap.connect(serviceInfo.getServiceAddress(),serviceInfo.getServicePort()).sync();
        future.addListener(listener->{if(future.isSuccess()){log.info("connect rpc server {} success.",serviceInfo.getServiceAddress());
            }else{log.error("connect rpc server {} failed .",serviceInfo.getServiceAddress());
                future.cause().printStackTrace();
                eventLoopGroup.shutdownGracefully();}
        });
        log.info("begin transfer data");
        future.channel().writeAndFlush(protocol);
    }
}

批改 RpcInvokerProxy

将动态 ip 和地址,批改成 IRegistryService

@Slf4j
public class RpcInvokerProxy implements InvocationHandler {

   /* private String serviceAddress;
    private int servicePort;*/

    IRegistryService registryService;

    public RpcInvokerProxy(IRegistryService registryService) {
       /* this.serviceAddress = serviceAddress;
        this.servicePort = servicePort;*/
        this.registryService=registryService;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {log.info("begin invoke target server");
        // 组装参数
        RpcProtocol<RpcRequest> protocol=new RpcProtocol<>();
        long requestId= RequestHolder.REQUEST_ID.incrementAndGet();
        Header header=new Header(RpcConstant.MAGIC, SerialType.JSON_SERIAL.code(), ReqType.REQUEST.code(),requestId,0);
        protocol.setHeader(header);
        RpcRequest request=new RpcRequest();
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setParameterTypes(method.getParameterTypes());
        request.setParams(args);
        protocol.setContent(request);
        // 发送申请
        NettyClient nettyClient=new NettyClient();
        // 构建异步数据处理
        RpcFuture<RpcResponse> future=new RpcFuture<>(new DefaultPromise<>(new DefaultEventLoop()));
        RequestHolder.REQUEST_MAP.put(requestId,future);
        nettyClient.sendRequest(protocol,this.registryService);
        return future.getPromise().get().getData();}
}

SpringRpcReferenceBean

批改援用 bean,减少注册核心配置

public class SpringRpcReferenceBean implements FactoryBean<Object> {

    private Class<?> interfaceClass;
    private Object object;
   /* private String serviceAddress;
    private int servicePort;*/
    // 批改减少注册核心
    private byte registryType;
    private String registryAddress;

    @Override
    public Object getObject() throws Exception {return object;}

    public void init(){
        // 批改减少注册核心
        IRegistryService registryService= RegistryFactory.createRegistryService(this.registryAddress, RegistryType.findByCode(this.registryType));
        this.object= Proxy.newProxyInstance(this.interfaceClass.getClassLoader(),
                new Class<?>[]{this.interfaceClass},
                new RpcInvokerProxy(registryService));
    }

    @Override
    public Class<?> getObjectType() {return this.interfaceClass;}

    public void setInterfaceClass(Class<?> interfaceClass) {this.interfaceClass = interfaceClass;}

   /* public void setServiceAddress(String serviceAddress) {this.serviceAddress = serviceAddress;}

    public void setServicePort(int servicePort) {this.servicePort = servicePort;}*/

    public void setRegistryType(byte registryType) {this.registryType = registryType;}

    public void setRegistryAddress(String registryAddress) {this.registryAddress = registryAddress;}
}

SpringRpcReferencePostProcessor

@Slf4j
public class SpringRpcReferencePostProcessor implements ApplicationContextAware, BeanClassLoaderAware, BeanFactoryPostProcessor {
    private ApplicationContext context;
    private ClassLoader classLoader;
    private RpcClientProperties clientProperties;

    public SpringRpcReferencePostProcessor(RpcClientProperties clientProperties) {this.clientProperties = clientProperties;}

    // 保留公布的援用 bean 信息
    private final Map<String, BeanDefinition> rpcRefBeanDefinitions=new ConcurrentHashMap<>();

    @Override
    public void setBeanClassLoader(ClassLoader classLoader) {this.classLoader=classLoader;}
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.context=applicationContext;}

    @Override
    public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {for (String beanDefinitionname:beanFactory.getBeanDefinitionNames()){
            // 遍历 bean 定义,而后获取到加载的 bean,遍历这些 bean 中的字段,是否携带 GpRemoteReference 注解
            // 如果有,则须要构建一个动静代理实现
            BeanDefinition beanDefinition=beanFactory.getBeanDefinition(beanDefinitionname);
            String beanClassName=beanDefinition.getBeanClassName();
            if(beanClassName!=null){Class<?> clazz=ClassUtils.resolveClassName(beanClassName,this.classLoader);
                ReflectionUtils.doWithFields(clazz,this::parseRpcReference);
            }
        }
        // 将 @GpRemoteReference 注解的 bean,构建一个动静代理对象
        BeanDefinitionRegistry registry=(BeanDefinitionRegistry)beanFactory;
        this.rpcRefBeanDefinitions.forEach((beanName,beanDefinition)->{if(context.containsBean(beanName)){log.warn("SpringContext already register bean {}",beanName);
                return;
            }
            registry.registerBeanDefinition(beanName,beanDefinition);
            log.info("registered RpcReferenceBean {} success.",beanName);
        });
    }
    private void parseRpcReference(Field field){GpRemoteReference gpRemoteReference=AnnotationUtils.getAnnotation(field,GpRemoteReference.class);
        if(gpRemoteReference!=null) {BeanDefinitionBuilder builder=BeanDefinitionBuilder.genericBeanDefinition(SpringRpcReferenceBean.class);
            builder.setInitMethodName(RpcConstant.INIT_METHOD_NAME);
            builder.addPropertyValue("interfaceClass",field.getType());
            /*builder.addPropertyValue("serviceAddress",clientProperties.getServiceAddress());
            builder.addPropertyValue("servicePort",clientProperties.getServicePort());*/
            builder.addPropertyValue("registryType",clientProperties.getRegistryType());
            builder.addPropertyValue("registryAddress",clientProperties.getRegistryAddress());
            BeanDefinition beanDefinition=builder.getBeanDefinition();
            rpcRefBeanDefinitions.put(field.getName(),beanDefinition);
        }
    }
}

RpcRefernceAutoConfiguration

@Configuration
public class RpcRefernceAutoConfiguration implements EnvironmentAware{

    @Bean
    public SpringRpcReferencePostProcessor postProcessor(){String address=environment.getProperty("gp.serviceAddress");
        int port=Integer.parseInt(environment.getProperty("gp.servicePort"));
        RpcClientProperties rc=new RpcClientProperties();
        rc.setServiceAddress(address);
        rc.setServicePort(port);
        rc.setRegistryType(Byte.parseByte(environment.getProperty("gp.registryType")));
        rc.setRegistryAddress(environment.getProperty("gp.registryAddress"));
        return new SpringRpcReferencePostProcessor(rc);
    }

    private Environment environment;

    @Override
    public void setEnvironment(Environment environment) {this.environment=environment;}
}

application.properties

批改 netty-rpc-consumer 模块中的配置

gp.serviceAddress=192.168.1.102
gp.servicePort=20880

gp.registryType=0
gp.registryAddress=192.168.221.128:2181

负载平衡的测试

减少一个服务端的启动类,并且批改端口。而后客户端不须要重启的状况下刷新浏览器,即可看到负载平衡的成果。

<center> 图 7 -5</center>

须要源码的同学,请关注公众号[跟着 Mic 学架构],回复关键字[rpc],即可取得

版权申明:本博客所有文章除特地申明外,均采纳 CC BY-NC-SA 4.0 许可协定。转载请注明来自 Mic 带你学架构
如果本篇文章对您有帮忙,还请帮忙点个关注和赞,您的保持是我一直创作的能源。欢送关注同名微信公众号获取更多技术干货!

正文完
 0