从零开始实现mvc框架http-server实现

5次阅读

共计 6829 个字符,预计需要花费 18 分钟才能阅读完成。

1. 前言

Boomvc 完成已经有一段时间了,但拖延到现在才开始记录。写这篇文章主要是回忆和复盘一下思路。如题所讲,Boomvc 是一个 mvc 框架,但是它自带 http server 功能,也就是说不需要 tomcat 之类的 server,可以在一个 jar 包里启动而不需要其他的依赖,这就需要自己去写 http server 的实现,这一篇我就梳理一下实现。

2. server 接口

首先定义一个 server 接口

public interface Server {void init(Boom boom);

    void start();

    void stop();}

这个接口可以有多种实现,可以从 nio socket 开始写,也可以用 netty 这样的非常好用的 network 层的框架实现。在这里我实现了一个简易版的 TinyServer。

public class TinyServer implements Server {private static final Logger logger = LoggerFactory.getLogger(TinyServer.class);

    private Boom boom;

    private Ioc ioc;

    private MvcDispatcher dispatcher;

    private Environment environment;

    private EventExecutorGroup boss;

    private EventExecutorGroup workers;

    private Thread cleanSession;


    @Override
    public void init(Boom boom) {
        ...
        ...
    }

    @Override
    public void start() {this.boss.start();
        this.workers.start();
        this.cleanSession.start();}

    @Override
    public void stop() {this.boss.stop();
        this.workers.stop();}

}

在这里要关注这个地方

private EventExecutorGroup boss;

private EventExecutorGroup workers;

这是我抽象出来的表示线程组,一个 EventExecuteGroup 持有多个 EventExecute,boss 接受连接请求,workers 执行业务逻辑。看一下 EventExecuteGroup 的实现。

public class EventExecutorGroup implements Task {

    private int threadNum;

    private List<EventExecutor> executorList;

    private int index;

    private ThreadFactory threadName;

    private EventExecutorGroup childGroup;

    private MvcDispatcher dispatcher;


    public EventExecutorGroup(int threadNum, ThreadFactory threadName, EventExecutorGroup childGroup, MvcDispatcher dispatcher, SessionManager sessionManager) {
        this.threadNum = threadNum;
        this.threadName = threadName;
        this.childGroup = childGroup;
        this.dispatcher = dispatcher;
        this.executorList = new ArrayList<>(this.threadNum);
        IntStream.of(this.threadNum)
                .forEach(i-> {
                    try {this.executorList.add(new EventExecutor(this.threadName, this.childGroup, this.dispatcher, sessionManager));
                    } catch (IOException e) {throw new RuntimeException(e);
                    }
                });
        this.index = 0;
    }

    public void register(SelectableChannel channel, int ops) throws ClosedChannelException {
        int index1 = 0;
        synchronized (this){
            index1 = this.index%this.threadNum;
            this.index++;
        }
        this.executorList.get(index1).register(channel, ops);
    }

    public void register(SelectableChannel channel, int ops, Object att) throws ClosedChannelException {
        int index1 = 0;
        synchronized (this){
            index1 = this.index%this.threadNum;
            this.index++;
        }
        this.executorList.get(index1).register(channel, ops, att);
    }

    @Override
    public void start() {this.executorList.forEach(e->e.start());
    }

    @Override
    public void stop() {this.executorList.forEach(e->e.stop());
    }
}

3. EventExecutor

EventExecutor 就是一个 io 线程,它持有一个 selector,selector 是 Java NIO 核心组件中的一个,用于检查一个或多个 Channel(通道)的状态是否处于可读、可写。如此可以实现单线程管理多个 channels, 也就是可以管理多个网络链接。io 线程就不断轮询这个 selector,获取多个 selector key,根据这个 key 的状态,比如 accept,read,write 执行不同的逻辑。在这里 EventExecutor 是有多个的,也就是说 selector 有多个,boss EventExecutorGroup 只有一个 EventExecutor,它负责 accept 连接请求,并把接受的连接注册到 workers EventExecutorGroup 里,由 worker 线程处理 read 和 write。

public class EventExecutor {private static final Logger logger = LoggerFactory.getLogger(EventExecutor.class);

    private ThreadFactory threadName;

    private EventExecutorGroup childGroup;

    private Selector selector;

    private Thread ioThread;

    private MvcDispatcher dispatcher;

    private Runnable task;

    private Semaphore semaphore = new Semaphore(1);


    public EventExecutor(ThreadFactory threadName, EventExecutorGroup childGroup, MvcDispatcher dispatcher, SessionManager sessionManager) throws IOException {
        this.threadName = threadName;
        this.childGroup = childGroup;
        this.dispatcher = dispatcher;
        this.selector = Selector.open();
        this.task = new EventLoop(selector, this.childGroup, this.dispatcher, sessionManager, semaphore);
        this.ioThread = threadName.newThread(this.task);
    }

    public void register(SelectableChannel channel, int ops) throws ClosedChannelException {channel.register(this.selector, ops);
    }

    public void register(SelectableChannel channel, int ops, Object att) throws ClosedChannelException {
        /* 将接收的连接注册到 selector 上
        // 发现无法直接注册,一直获取不到锁
        // 这是由于 io 线程正阻塞在 select() 方法上,直接注册会造成死锁
        // 如果这时直接调用 wakeup,有可能还没有注册成功又阻塞了,可以使用信号量从 select 返回后先阻塞,等注册完后在执行
        */
        try {this.semaphore.acquire();
            this.selector.wakeup();
            channel.register(this.selector, ops, att);
        }catch (InterruptedException e){logger.error("", e);
        }finally {this.semaphore.release();
        }
    }

    public void start(){((Task)this.task).start();
        this.ioThread.start();}

    public void stop(){((Task)this.task).stop();}

}

selector 轮询是在 EventLoop 这里实现的。

3. EventLoop

public class EventLoop implements Runnable, Task {private static final Logger logger = LoggerFactory.getLogger(EventLoop.class);

    private Selector selector;

    private EventExecutorGroup childGroup;

    private MvcDispatcher dispatcher;

    private FilterMapping filterMapping;

    private volatile boolean isStart = false;

    private Semaphore semaphore;

    private SessionManager sessionManager;

    public EventLoop(Selector selector, EventExecutorGroup childGroup, MvcDispatcher dispatcher, SessionManager sessionManager, Semaphore semaphore) {...}

    @Override
    public void run() {while(this.isStart){
            try {
                int n = -1;
                try {n = selector.select(1000);
                    semaphore.acquire();} catch (InterruptedException e) {logger.error("", e);
                } finally {semaphore.release();
                }
                if(n<=0)
                    continue;
            } catch (IOException e) {logger.error("", e);
                continue;
            }
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while(iterator.hasNext()){SelectionKey key = iterator.next();
                iterator.remove();
                if(!key.isValid())
                    continue;
                try {if (key.isAcceptable()) {accept(key);
                    }
                    if (key.isReadable()) {read(key);
                    }
                    if (key.isWritable()) {write(key);
                    }
                }catch (Exception e){if(key!=null&&key.isValid()){
                        try {key.channel().close();} catch (IOException e1) {e1.printStackTrace();
                        }
                    }
                    logger.error("", e);
                }
            }
        }
    }

    private void accept(SelectionKey key) throws IOException {ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        this.childGroup.register(socketChannel, SelectionKey.OP_READ, new HttpProtocolParser(socketChannel));
    }

    private void read(SelectionKey key) throws Exception{...}

    private void write(SelectionKey key) throws IOException {...}

    @Override
    public void start() {this.isStart = true;}

    @Override
    public void stop() {this.filterMapping.distory();
        this.isStart = false;
    }

    public Semaphore semaphore(){return this.semaphore;}
}

这就是一个经典的 nio 程序模式,要注意这里

this.childGroup.register(socketChannel, SelectionKey.OP_READ, new HttpProtocolParser(socketChannel));

这就把接受的连接注册到其他 selector 了。
这里我用了一个 nio 程序的多 reactor 模式,主线程中 EventLoop 对象通过 select 监控连接建立事件,收到事件后通过 Acceptor 接收,将新的连接分配给某个子 EventLoop。
子线程中的 EventLoop 完成 read -> 业务处理 -> send 的完整流程。这种模式主线程和子线程的职责非常明确,主线程只负责接收新连接,子线程负责完成后续的业务处理,并且使用多个 selector,read,业务处理,write 不会影响 accept,这对于大量并发连接可以提高 accept 的速度,不会因业务处理使大量连接堆积,这里其实参考了 netty 的思想。如下图

3. 遇到的坑

在写 EventExecutor 的 register 方法是,发现如果直接在 selector 上调用 register 的话,可能会造成死锁。因为 selector 被多个线程访问,当其中一个线程调用 selector.select() 方法时发生阻塞,这个线程会一直持有 selector 的锁,这时另一个线程的 register 方法会被阻塞。如果这时直接调用 wakeup,有可能还没有注册成功又阻塞了,可以使用信号量从 select 返回后先阻塞,等注册完后在执行。具体实现如下

        try {this.semaphore.acquire();
            this.selector.wakeup();
            channel.register(this.selector, ops, att);
        }catch (InterruptedException e){logger.error("", e);
        }finally {this.semaphore.release();
        }
                try {n = selector.select(1000);
                    semaphore.acquire();} catch (InterruptedException e) {logger.error("", e);
                } finally {semaphore.release();
                }

这里 semaphore 就起到一个阻塞 EventLoop 在被唤醒时继续执行的作用,当注册完成时才继续执行。
好了,关于 server 的线程部分就写到这,下一篇写 http 协议解析部分。

正文完
 0