关于微服务:java-从零开始手写-RPC-07timeout-超时处理

27次阅读

共计 10233 个字符,预计需要花费 26 分钟才能阅读完成。

《过时不候》

最漫长的莫过于期待

咱们不可能永远等一个人

就像申请

永远期待响应

超时解决

java 从零开始手写 RPC (01) 基于 socket 实现

java 从零开始手写 RPC (02)-netty4 实现客户端和服务端

java 从零开始手写 RPC (03) 如何实现客户端调用服务端?

java 从零开始手写 RPC (04) 序列化

java 从零开始手写 RPC (05) 基于反射的通用化实现

必要性

后面咱们实现了通用的 rpc,然而存在一个问题,同步获取响应的时候没有超时解决。

如果 server 挂掉了,或者解决太慢,客户端也不可能始终傻傻的等。

当内部的调用超过指定的工夫后,就间接报错,防止无意义的资源耗费。

思路

调用的时候,将开始工夫保留。

获取的时候检测是否超时。

同时创立一个线程,用来检测是否有超时的申请。

实现

思路

调用的时候,将开始工夫保留。

获取的时候检测是否超时。

同时创立一个线程,用来检测是否有超时的申请。

超时检测线程

为了不影响失常业务的性能,咱们另起一个线程检测调用是否曾经超时。

package com.github.houbb.rpc.client.invoke.impl;

import com.github.houbb.heaven.util.common.ArgUtil;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;
import com.github.houbb.rpc.common.rpc.domain.impl.RpcResponseFactory;
import com.github.houbb.rpc.common.support.time.impl.Times;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 超时检测线程
 * @author binbin.hou
 * @since 0.0.7
 */
public class TimeoutCheckThread implements Runnable{

    /**
     * 申请信息
     * @since 0.0.7
     */
    private final ConcurrentHashMap<String, Long> requestMap;

    /**
     * 申请信息
     * @since 0.0.7
     */
    private final ConcurrentHashMap<String, RpcResponse> responseMap;

    /**
     * 新建
     * @param requestMap  申请 Map
     * @param responseMap 后果 map
     * @since 0.0.7
     */
    public TimeoutCheckThread(ConcurrentHashMap<String, Long> requestMap,
                              ConcurrentHashMap<String, RpcResponse> responseMap) {ArgUtil.notNull(requestMap, "requestMap");
        this.requestMap = requestMap;
        this.responseMap = responseMap;
    }

    @Override
    public void run() {for(Map.Entry<String, Long> entry : requestMap.entrySet()) {long expireTime = entry.getValue();
            long currentTime = Times.time();

            if(currentTime > expireTime) {final String key = entry.getKey();
                // 后果设置为超时,从申请 map 中移除
                responseMap.putIfAbsent(key, RpcResponseFactory.timeout());
                requestMap.remove(key);
            }
        }
    }

}

这里次要存储申请,响应的工夫,如果超时,则移除对应的申请。

线程启动

在 DefaultInvokeService 初始化时启动:

final Runnable timeoutThread = new TimeoutCheckThread(requestMap, responseMap);
Executors.newScheduledThreadPool(1)
                .scheduleAtFixedRate(timeoutThread,60, 60, TimeUnit.SECONDS);

DefaultInvokeService

原来的设置后果,获取后果是没有思考工夫的,这里加一下对应的判断。

设置申请工夫

  • 增加申请 addRequest

会将过期的工夫间接放入 map 中。

因为放入是一次操作,查问可能是屡次。

所以工夫在放入的时候计算实现。

@Override
public InvokeService addRequest(String seqId, long timeoutMills) {LOG.info("[Client] start add request for seqId: {}, timeoutMills: {}", seqId,
            timeoutMills);
    final long expireTime = Times.time()+timeoutMills;
    requestMap.putIfAbsent(seqId, expireTime);
    return this;
}

设置申请后果

  • 增加响应 addResponse
  1. 如果 requestMap 中曾经不存在这个申请信息,则阐明可能超时,间接疏忽存入后果。
  2. 此时检测是否呈现超时,超时间接返回超时信息。
  3. 放入信息后,告诉其余期待的所有过程。
@Override
public InvokeService addResponse(String seqId, RpcResponse rpcResponse) {
    // 1. 判断是否无效
    Long expireTime = this.requestMap.get(seqId);
    // 如果为空,可能是这个后果曾经超时了,被定时 job 移除之后,响应后果才过去。间接疏忽
    if(ObjectUtil.isNull(expireTime)) {return this;}

    //2. 判断是否超时
    if(Times.time() > expireTime) {LOG.info("[Client] seqId:{} 信息已超时,间接返回超时后果。", seqId);
        rpcResponse = RpcResponseFactory.timeout();}

    // 这里放入之前,能够增加判断。// 如果 seqId 必须解决申请汇合中,才容许放入。或者间接疏忽抛弃。// 告诉所有期待方
    responseMap.putIfAbsent(seqId, rpcResponse);
    LOG.info("[Client] 获取后果信息,seqId: {}, rpcResponse: {}", seqId, rpcResponse);
    LOG.info("[Client] seqId:{} 信息曾经放入,告诉所有期待方", seqId);
    // 移除对应的 requestMap
    requestMap.remove(seqId);
    LOG.info("[Client] seqId:{} remove from request map", seqId);
    synchronized (this) {this.notifyAll();
    }
    return this;
}

获取申请后果

  • 获取相应 getResponse
  1. 如果后果存在,间接返回响应后果
  2. 否则进入期待。
  3. 期待完结后获取后果。
@Override
public RpcResponse getResponse(String seqId) {
    try {RpcResponse rpcResponse = this.responseMap.get(seqId);
        if(ObjectUtil.isNotNull(rpcResponse)) {LOG.info("[Client] seq {} 对应后果曾经获取: {}", seqId, rpcResponse);
            return rpcResponse;
        }
        // 进入期待
        while (rpcResponse == null) {LOG.info("[Client] seq {} 对应后果为空,进入期待", seqId);
            // 同步期待锁
            synchronized (this) {this.wait();
            }
            rpcResponse = this.responseMap.get(seqId);
            LOG.info("[Client] seq {} 对应后果曾经获取: {}", seqId, rpcResponse);
        }
        return rpcResponse;
    } catch (InterruptedException e) {throw new RpcRuntimeException(e);
    }
}

能够发现获取局部的逻辑没变,因为超时会返回一个超时对象:RpcResponseFactory.timeout();

这是一个非常简单的实现,如下:

package com.github.houbb.rpc.common.rpc.domain.impl;

import com.github.houbb.rpc.common.exception.RpcTimeoutException;
import com.github.houbb.rpc.common.rpc.domain.RpcResponse;

/**
 * 响应工厂类
 * @author binbin.hou
 * @since 0.0.7
 */
public final class RpcResponseFactory {private RpcResponseFactory(){}

    /**
     * 超时异样信息
     * @since 0.0.7
     */
    private static final DefaultRpcResponse TIMEOUT;

    static {TIMEOUT = new DefaultRpcResponse();
        TIMEOUT.error(new RpcTimeoutException());
    }

    /**
     * 获取超时响应后果
     * @return 响应后果
     * @since 0.0.7
     */
    public static RpcResponse timeout() {return TIMEOUT;}

}

响应后果指定一个超时异样,这个异样会在代理处理结果时抛出:

RpcResponse rpcResponse = proxyContext.invokeService().getResponse(seqId);
Throwable error = rpcResponse.error();
if(ObjectUtil.isNotNull(error)) {throw error;}
return rpcResponse.result();

测试代码

服务端

咱们成心把服务端的实现增加沉睡,其余放弃不变。

public class CalculatorServiceImpl implements CalculatorService {public CalculateResponse sum(CalculateRequest request) {int sum = request.getOne()+request.getTwo();

        // 成心沉睡 3s
        try {TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {e.printStackTrace();
        }

        return new CalculateResponse(true, sum);
    }

}

客户端

设置对应的超时工夫为 1S,其余不变:

public static void main(String[] args) {
    // 服务配置信息
    ReferenceConfig<CalculatorService> config = new DefaultReferenceConfig<CalculatorService>();
    config.serviceId(ServiceIdConst.CALC);
    config.serviceInterface(CalculatorService.class);
    config.addresses("localhost:9527");
    // 设置超时工夫为 1S
    config.timeout(1000);

    CalculatorService calculatorService = config.reference();
    CalculateRequest request = new CalculateRequest();
    request.setOne(10);
    request.setTwo(20);

    CalculateResponse response = calculatorService.sum(request);
    System.out.println(response);
}

日志如下:

.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2021-10-05 14:59:40.974] [main] [c.g.h.r.c.c.RpcClient.connect] - RPC 服务开始启动客户端
...
[INFO] [2021-10-05 14:59:42.504] [main] [c.g.h.r.c.c.RpcClient.connect] - RPC 服务启动客户端实现,监听地址 localhost:9527
[INFO] [2021-10-05 14:59:42.533] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call remote with request: DefaultRpcRequest{seqId='62e126d9a0334399904509acf8dfe0bb', createTime=1633417182525, serviceId='calc', methodName='sum', paramTypeNames=[com.github.houbb.rpc.server.facade.model.CalculateRequest], paramValues=[CalculateRequest{one=10, two=20}]}
[INFO] [2021-10-05 14:59:42.534] [main] [c.g.h.r.c.i.i.DefaultInvokeService.addRequest] - [Client] start add request for seqId: 62e126d9a0334399904509acf8dfe0bb, timeoutMills: 1000
[INFO] [2021-10-05 14:59:42.535] [main] [c.g.h.r.c.p.ReferenceProxy.invoke] - [Client] start call channel id: 00e04cfffe360988-000004bc-00000000-1178e1265e903c4c-7975626f
...
Exception in thread "main" com.github.houbb.rpc.common.exception.RpcTimeoutException
    at com.github.houbb.rpc.common.rpc.domain.impl.RpcResponseFactory.<clinit>(RpcResponseFactory.java:23)
    at com.github.houbb.rpc.client.invoke.impl.DefaultInvokeService.addResponse(DefaultInvokeService.java:72)
    at com.github.houbb.rpc.client.handler.RpcClientHandler.channelRead0(RpcClientHandler.java:43)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)
...
[INFO] [2021-10-05 14:59:45.615] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seqId:62e126d9a0334399904509acf8dfe0bb 信息已超时,间接返回超时后果。[INFO] [2021-10-05 14:59:45.617] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] 获取后果信息,seqId: 62e126d9a0334399904509acf8dfe0bb, rpcResponse: DefaultRpcResponse{seqId='null', error=com.github.houbb.rpc.common.exception.RpcTimeoutException, result=null}
[INFO] [2021-10-05 14:59:45.617] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seqId:62e126d9a0334399904509acf8dfe0bb 信息曾经放入,告诉所有期待方
[INFO] [2021-10-05 14:59:45.618] [nioEventLoopGroup-2-1] [c.g.h.r.c.i.i.DefaultInvokeService.addResponse] - [Client] seqId:62e126d9a0334399904509acf8dfe0bb remove from request map
[INFO] [2021-10-05 14:59:45.618] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelRead0] - [Client] response is :DefaultRpcResponse{seqId='62e126d9a0334399904509acf8dfe0bb', error=null, result=CalculateResponse{success=true, sum=30}}
[INFO] [2021-10-05 14:59:45.619] [main] [c.g.h.r.c.i.i.DefaultInvokeService.getResponse] - [Client] seq 62e126d9a0334399904509acf8dfe0bb 对应后果曾经获取: DefaultRpcResponse{seqId='null', error=com.github.houbb.rpc.common.exception.RpcTimeoutException, result=null}
...

能够发现,超时异样。

不足之处

对于超时的解决能够拓展为双向的,比方服务端也能够指定超时限度,防止资源的节约。

小结

为了便于大家学习,以上源码曾经开源:

https://github.com/houbb/rpc

心愿本文对你有所帮忙,如果喜爱,欢送点赞珍藏转发一波。

我是老马,期待与你的下次重逢。

正文完
 0