远程通讯——Mina
目标:介绍基于 Mina 的来实现的远程通信、介绍 dubbo-remoting-mina 内的源码解析。
前言
Apache MINA 是一个网络应用程序框架,可帮助用户轻松开发高性能和高可扩展性的网络应用程序。它通过 Java NIO 在各种传输(如 TCP / IP 和 UDP / IP)上提供抽象的事件驱动异步 API。它通常被称为 NIO 框架库、客户端服务器框架库或者网络套接字库。那么本问就要讲解在 dubbo 项目中,基于 mina 的 API 实现服务端和客户端来完成远程通讯这件事情。
下面是 mina 实现的包结构:
源码分析
(一)MinaChannel
该类继承了 AbstractChannel,是基于 mina 实现的通道。
1. 属性
private static final Logger logger = LoggerFactory.getLogger(MinaChannel.class);
/**
* 通道的 key
*/
private static final String CHANNEL_KEY = MinaChannel.class.getName() + “.CHANNEL”;
/**
* mina 中的一个句柄,表示两个端点之间的连接,与传输类型无关
*/
private final IoSession session;
该类的属性除了封装了一个 CHANNEL_KEY 以外,还用到了 mina 中的 IoSession,它封装着一个连接所需要的方法,比如获得远程地址等。
2.getOrAddChannel
static MinaChannel getOrAddChannel(IoSession session, URL url, ChannelHandler handler) {
// 如果连接 session 为空,则返回空
if (session == null) {
return null;
}
// 获得 MinaChannel 实例
MinaChannel ret = (MinaChannel) session.getAttribute(CHANNEL_KEY);
// 如果不存在,则创建
if (ret == null) {
// 创建一个 MinaChannel 实例
ret = new MinaChannel(session, url, handler);
// 如果两个端点连接
if (session.isConnected()) {
// 把新创建的 MinaChannel 添加到 session 中
MinaChannel old = (MinaChannel) session.setAttribute(CHANNEL_KEY, ret);
// 如果属性的旧值不为空,则重新设置旧值
if (old != null) {
session.setAttribute(CHANNEL_KEY, old);
ret = old;
}
}
}
return ret;
}
该方法是一个获得 MinaChannel 对象的方法,其中每一个 MinaChannel 都会被放在 session 的属性值中。
3.removeChannelIfDisconnected
static void removeChannelIfDisconnected(IoSession session) {
if (session != null && !session.isConnected()) {
session.removeAttribute(CHANNEL_KEY);
}
}
该方法是当没有连接时移除该通道,比较简单。
4.send
@Override
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
boolean success = true;
int timeout = 0;
try {
// 发送消息,返回 future
WriteFuture future = session.write(message);
// 如果已经发送过了
if (sent) {
// 获得延迟时间
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 等待 timeout 的连接时间后查看是否发送成功
success = future.join(timeout);
}
} catch (Throwable e) {
throw new RemotingException(this, “Failed to send message ” + message + ” to ” + getRemoteAddress() + “, cause: ” + e.getMessage(), e);
}
if (!success) {
throw new RemotingException(this, “Failed to send message ” + message + ” to ” + getRemoteAddress()
+ “in timeout(” + timeout + “ms) limit”);
}
}
该方法是最关键的发送消息,其中调用到了 session 的 write 方法,就是 mina 封装的发送消息。并且根据返回的 WriteFuture 对象来判断是否发送成功。
(二)MinaHandler
该类继承了 IoHandlerAdapter,是通道处理器实现类,其中就是 mina 项目中 IoHandler 接口的几个 方法。
/**
* url 对象
*/
private final URL url;
/**
* 通道处理器对象
*/
private final ChannelHandler handler;
该类有两个属性,上述提到的实现 IoHandler 接口方法都是调用了 handler 来实现的,我就举例讲一个,其他的都一样的写法:
@Override
public void sessionOpened(IoSession session) throws Exception {
// 获得 MinaChannel 对象
MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
try {
// 调用接连该通道
handler.connected(channel);
} finally {
// 如果没有连接则移除通道
MinaChannel.removeChannelIfDisconnected(session);
}
}
该方法在 IoHandler 中叫做 sessionOpened,其实就是连接方法,所以调用的是 handler.connected。其他方法也一样,请自行查看。
(三)MinaClient
该类继承了 AbstractClient 类,是基于 mina 实现的客户端类。
1. 属性
/**
* 套接字连接集合
*/
private static final Map<String, SocketConnector> connectors = new ConcurrentHashMap<String, SocketConnector>();
/**
* 连接的 key
*/
private String connectorKey;
/**
* 套接字连接者
*/
private SocketConnector connector;
/**
* 一个句柄
*/
private volatile IoSession session; // volatile, please copy reference to use
该类中的属性都跟 mina 项目中封装类有关系。
2.doOpen
@Override
protected void doOpen() throws Throwable {
// 用 url 来作为 key
connectorKey = getUrl().toFullString();
// 先从集合中取套接字连接
SocketConnector c = connectors.get(connectorKey);
if (c != null) {
connector = c;
// 如果为空
} else {
// set thread pool. 设置线程池
connector = new SocketConnector(Constants.DEFAULT_IO_THREADS,
Executors.newCachedThreadPool(new NamedThreadFactory(“MinaClientWorker”, true)));
// config 获得套接字连接配置
SocketConnectorConfig cfg = (SocketConnectorConfig) connector.getDefaultConfig();
cfg.setThreadModel(ThreadModel.MANUAL);
// 启用 TCP_NODELAY
cfg.getSessionConfig().setTcpNoDelay(true);
// 启用 SO_KEEPALIVE
cfg.getSessionConfig().setKeepAlive(true);
int timeout = getConnectTimeout();
// 设置连接超时时间
cfg.setConnectTimeout(timeout < 1000 ? 1 : timeout / 1000);
// set codec.
// 设置编解码器
connector.getFilterChain().addLast(“codec”, new ProtocolCodecFilter(new MinaCodecAdapter(getCodec(), getUrl(), this)));
// 加入集合
connectors.put(connectorKey, connector);
}
}
该方法是打开客户端,在 mina 中用套接字连接者 connector 来表示。其中的操作就是新建一个 connector,并且设置相应的属性,然后加入集合。
3.doConnect
@Override
protected void doConnect() throws Throwable {
// 连接服务器
ConnectFuture future = connector.connect(getConnectAddress(), new MinaHandler(getUrl(), this));
long start = System.currentTimeMillis();
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
// 用于对线程的阻塞和唤醒
final CountDownLatch finish = new CountDownLatch(1); // resolve future.awaitUninterruptibly() dead lock
// 加入监听器
future.addListener(new IoFutureListener() {
@Override
public void operationComplete(IoFuture future) {
try {
// 如果已经读完了
if (future.isReady()) {
// 创建获得该连接的 IoSession 实例
IoSession newSession = future.getSession();
try {
// Close old channel 关闭旧的 session
IoSession oldSession = MinaClient.this.session; // copy reference
if (oldSession != null) {
try {
if (logger.isInfoEnabled()) {
logger.info(“Close old mina channel ” + oldSession + ” on create new mina channel ” + newSession);
}
// 关闭连接
oldSession.close();
} finally {
// 移除通道
MinaChannel.removeChannelIfDisconnected(oldSession);
}
}
} finally {
// 如果 MinaClient 关闭了
if (MinaClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info(“Close new mina channel ” + newSession + “, because the client closed.”);
}
// 关闭 session
newSession.close();
} finally {
MinaClient.this.session = null;
MinaChannel.removeChannelIfDisconnected(newSession);
}
} else {
// 设置新的 session
MinaClient.this.session = newSession;
}
}
}
} catch (Exception e) {
exception.set(e);
} finally {
// 减少数量,释放所有等待的线程
finish.countDown();
}
}
});
try {
// 当前线程等待,直到锁存器倒计数到零,除非线程被中断,或者指定的等待时间过去
finish.await(getConnectTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
throw new RemotingException(this, “client(url: ” + getUrl() + “) failed to connect to server ” + getRemoteAddress() + ” client-side timeout ”
+ getConnectTimeout() + “ms (elapsed: ” + (System.currentTimeMillis() – start)
+ “ms) from netty client ” + NetUtils.getLocalHost() + ” using dubbo version ”
+ Version.getVersion() + “, cause: ” + e.getMessage(), e);
}
Throwable e = exception.get();
if (e != null) {
throw e;
}
}
该方法是客户端连接服务器的实现方法。其中用到了 CountDownLatch 来代表完成完成事件,它来做一个线程等待,直到 1 个线程完成上述的动作,也就是连接完成结束,才释放等待的线程。保证每次只有一条线程去连接,解决 future.awaitUninterruptibly()死锁问题。
其他方法请自行查看我写的注释。
(四)MinaServer
该类继承了 AbstractServer,是基于 mina 实现的服务端实现类。
1. 属性
private static final Logger logger = LoggerFactory.getLogger(MinaServer.class);
/**
* 套接字接收者对象
*/
private SocketAcceptor acceptor;
2.doOpen
@Override
protected void doOpen() throws Throwable {
// set thread pool.
// 创建套接字接收者对象
acceptor = new SocketAcceptor(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
Executors.newCachedThreadPool(new NamedThreadFactory(“MinaServerWorker”,
true)));
// config
// 设置配置
SocketAcceptorConfig cfg = (SocketAcceptorConfig) acceptor.getDefaultConfig();
cfg.setThreadModel(ThreadModel.MANUAL);
// set codec. 设置编解码器
acceptor.getFilterChain().addLast(“codec”, new ProtocolCodecFilter(new MinaCodecAdapter(getCodec(), getUrl(), this)));
// 开启服务器
acceptor.bind(getBindAddress(), new MinaHandler(getUrl(), this));
}
该方法是创建服务器,并且打开服务器。关键就是调用了 acceptor 的方法。
3.doClose
@Override
protected void doClose() throws Throwable {
try {
if (acceptor != null) {
// 取消绑定,也就是关闭服务器
acceptor.unbind(getBindAddress());
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
该方法是关闭服务器,就是调用了 acceptor.unbind 方法。
4.getChannels
@Override
public Collection<Channel> getChannels() {
// 获得连接到该服务器到所有连接句柄
Set<IoSession> sessions = acceptor.getManagedSessions(getBindAddress());
Collection<Channel> channels = new HashSet<Channel>();
for (IoSession session : sessions) {
if (session.isConnected()) {
// 每次都用一个连接句柄创建一个通道
channels.add(MinaChannel.getOrAddChannel(session, getUrl(), this));
}
}
return channels;
}
该方法是获得所有连接该服务器的通道。
5.getChannel
@Override
public Channel getChannel(InetSocketAddress remoteAddress) {
// 获得连接到该服务器到所有连接句柄
Set<IoSession> sessions = acceptor.getManagedSessions(getBindAddress());
// 遍历所有句柄,找到要找的通道
for (IoSession session : sessions) {
if (session.getRemoteAddress().equals(remoteAddress)) {
return MinaChannel.getOrAddChannel(session, getUrl(), this);
}
}
return null;
}
该方法是获得地址对应的单个通道。
(五)MinaTransporter
public class MinaTransporter implements Transporter {
public static final String NAME = “mina”;
@Override
public Server bind(URL url, ChannelHandler handler) throws RemotingException {
// 创建 MinaServer 实例
return new MinaServer(url, handler);
}
@Override
public Client connect(URL url, ChannelHandler handler) throws RemotingException {
// 创建 MinaClient 实例
return new MinaClient(url, handler);
}
}
该类实现了 Transporter 接口,是基于 mina 的传输层实现。可以看到,bind 和 connect 方法分别就是创建了 MinaServer 和 MinaClient 实例。这里我建议查看一下《dubbo 源码解析(九)远程通信——Transport 层》。
(六)MinaCodecAdapter
该类是基于 mina 实现的编解码类,实现了 ProtocolCodecFactory。
1. 属性
/**
* 编码对象
*/
private final ProtocolEncoder encoder = new InternalEncoder();
/**
* 解码对象
*/
private final ProtocolDecoder decoder = new InternalDecoder();
/**
* 编解码器
*/
private final Codec2 codec;
/**
* url 对象
*/
private final URL url;
/**
* 通道处理器对象
*/
private final ChannelHandler handler;
/**
* 缓冲区大小
*/
private final int bufferSize;
属性比较好理解,该编解码器用到了 ProtocolEncoder 和 ProtocolDecoder,而 InternalEncoder 和 InternalDecoder 两个类是该类的内部类,它们实现了 ProtocolEncoder 和 ProtocolDecoder,关键的编解码逻辑在这两个类中实现。
2. 构造方法
public MinaCodecAdapter(Codec2 codec, URL url, ChannelHandler handler) {
this.codec = codec;
this.url = url;
this.handler = handler;
int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE);
// 如果缓存区大小在 16 字节以内,则设置配置大小,如果不是,则设置 8 字节的缓冲区大小
this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;
}
3.InternalEncoder
private class InternalEncoder implements ProtocolEncoder {
@Override
public void dispose(IoSession session) throws Exception {
}
@Override
public void encode(IoSession session, Object msg, ProtocolEncoderOutput out) throws Exception {
// 动态分配一个 1k 的缓冲区
ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(1024);
// 获得通道
MinaChannel channel = MinaChannel.getOrAddChannel(session, url, handler);
try {
// 编码
codec.encode(channel, buffer, msg);
} finally {
// 检测是否断开连接,如果断开,则移除
MinaChannel.removeChannelIfDisconnected(session);
}
// 写数据到 out 中
out.write(ByteBuffer.wrap(buffer.toByteBuffer()));
out.flush();
}
}
该内部类是编码类,其中的 encode 方法中写到了编码核心调用的是 codec.encode。
4.InternalDecoder
private class InternalDecoder implements ProtocolDecoder {
private ChannelBuffer buffer = ChannelBuffers.EMPTY_BUFFER;
@Override
public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception {
int readable = in.limit();
if (readable <= 0) return;
ChannelBuffer frame;
// 如果缓冲区还有可读字节数
if (buffer.readable()) {
// 如果缓冲区是 DynamicChannelBuffer 类型的
if (buffer instanceof DynamicChannelBuffer) {
// 往 buffer 中写入数据
buffer.writeBytes(in.buf());
frame = buffer;
} else {
// 缓冲区大小
int size = buffer.readableBytes() + in.remaining();
// 动态分配一个缓冲区
frame = ChannelBuffers.dynamicBuffer(size > bufferSize ? size : bufferSize);
// buffer 的数据把写到 frame
frame.writeBytes(buffer, buffer.readableBytes());
// 把流中的数据写到 frame
frame.writeBytes(in.buf());
}
} else {
// 否则是基于 Java NIO 的 ByteBuffer 生成的缓冲区
frame = ChannelBuffers.wrappedBuffer(in.buf());
}
// 获得通道
Channel channel = MinaChannel.getOrAddChannel(session, url, handler);
Object msg;
int savedReadIndex;
try {
do {
// 获得读索引
savedReadIndex = frame.readerIndex();
try {
// 解码
msg = codec.decode(channel, frame);
} catch (Exception e) {
buffer = ChannelBuffers.EMPTY_BUFFER;
throw e;
}
// 拆包
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
frame.readerIndex(savedReadIndex);
break;
} else {
if (savedReadIndex == frame.readerIndex()) {
buffer = ChannelBuffers.EMPTY_BUFFER;
throw new Exception(“Decode without read data.”);
}
if (msg != null) {
// 把数据写到输出流里面
out.write(msg);
}
}
} while (frame.readable());
} finally {
// 如果 frame 还有可读数据
if (frame.readable()) {
// 丢弃可读数据
frame.discardReadBytes();
buffer = frame;
} else {
buffer = ChannelBuffers.EMPTY_BUFFER;
}
MinaChannel.removeChannelIfDisconnected(session);
}
}
@Override
public void dispose(IoSession session) throws Exception {
}
@Override
public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception {
}
}
该内部类是解码类,其中 decode 方法中关键的是调用了 codec.decode,其余的操作是利用缓冲区对数据的冲刷流转。
后记
该部分相关的源码解析地址:https://github.com/CrazyHZM/i…
该文章讲解了基于 mina 的来实现的远程通信、介绍 dubbo-remoting-mina 内的源码解析,关键需要对 mina 有所了解。下一篇我会讲解基于 netty3 实现远程通信部分。