关于后端:开源C库实现HTTP服务器多线程事件模型外挂式跟踪统计

4次阅读

共计 8567 个字符,预计需要花费 22 分钟才能阅读完成。

本文次要介绍 Melon 库中的一种跟踪技术,并以一个 HTTP 服务器的实现和应用为例进行阐明。

对于 Melon 库,这是一个开源的 C 语言库,这个库不依赖其余开源第三方库,因而装置不便,开箱即用。并且中英文文档具体,便于作为工具书进行查阅。Github 仓库:传送门。

闲话少叙,咱们间接上代码:

#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include "mln_core.h"
#include "mln_log.h"
#include "mln_http.h"
#include "mln_iothread.h"
#include "mln_trace.h"


static void *mln_thread_entry(void *args);
static void mln_accept(mln_event_t *ev, int fd, void *data);
static int mln_http_recv_body_handler(mln_http_t *http, mln_chain_t **in, mln_chain_t **nil);
static void mln_recv(mln_event_t *ev, int fd, void *data);
static void mln_quit(mln_event_t *ev, int fd, void *data);
static void mln_send(mln_event_t *ev, int fd, void *data);
static int mln_http_send_body_handler(mln_http_t *http, mln_chain_t **body_head, mln_chain_t **body_tail);

static void mln_accept(mln_event_t *ev, int fd, void *data)
{
    mln_tcp_conn_t *connection;
    mln_http_t *http;
    int connfd;
    socklen_t len;
    struct sockaddr_in addr;

    while (1) {memset(&addr, 0, sizeof(addr));
        len = sizeof(addr);
        connfd = accept(fd, (struct sockaddr *)&addr, &len);
        if (connfd < 0) {if (errno == EAGAIN) break;
            if (errno == EINTR) continue;
            perror("accept");
            exit(1);
        }

        connection = (mln_tcp_conn_t *)malloc(sizeof(mln_tcp_conn_t));
        if (connection == NULL) {fprintf(stderr, "3No memory.\n");
            close(connfd);
            continue;
        }
        if (mln_tcp_conn_init(connection, connfd) < 0) {fprintf(stderr, "4No memory.\n");
            close(connfd);
            free(connection);
            continue;
        }

        http = mln_http_init(connection, NULL, mln_http_recv_body_handler);
        if (http == NULL) {fprintf(stderr, "5No memory.\n");
            mln_tcp_conn_destroy(connection);
            free(connection);
            close(connfd);
            continue;
        }

        if (mln_event_fd_set(ev, \
                             connfd, \
                             M_EV_RECV|M_EV_NONBLOCK, \
                             M_EV_UNLIMITED, \
                             http, \
                             mln_recv) < 0)
        {fprintf(stderr, "6No memory.\n");
            mln_http_destroy(http);
            mln_tcp_conn_destroy(connection);
            free(connection);
            close(connfd);
            continue;
        }
    }
}

static void mln_quit(mln_event_t *ev, int fd, void *data)
{mln_http_t *http = (mln_http_t *)data;
    mln_tcp_conn_t *connection = mln_http_get_connection(http);

    mln_event_fd_set(ev, fd, M_EV_CLR, M_EV_UNLIMITED, NULL, NULL);
    mln_http_destroy(http);
    mln_tcp_conn_destroy(connection);
    free(connection);
    close(fd);
}

static void mln_recv(mln_event_t *ev, int fd, void *data)
{mln_http_t *http = (mln_http_t *)data;
    mln_tcp_conn_t *connection = mln_http_get_connection(http);
    int ret, rc;
    mln_chain_t *c;

    while (1) {ret = mln_tcp_conn_recv(connection, M_C_TYPE_MEMORY);
        if (ret == M_C_FINISH) {continue;} else if (ret == M_C_NOTYET) {c = mln_tcp_conn_remove(connection, M_C_RECV);
            if (c != NULL) {rc = mln_http_parse(http, &c);
                if (c != NULL) {mln_tcp_conn_append_chain(connection, c, NULL, M_C_RECV);
                }
                if (rc == M_HTTP_RET_OK) {return;} else if (rc == M_HTTP_RET_DONE) {mln_send(ev, fd, data);
                } else {fprintf(stderr, "Http parse error. error_code:%u\n", mln_http_get_error(http));
                    mln_quit(ev, fd, data);
                    return;
                }
            }
            break;
        } else if (ret == M_C_CLOSED) {c = mln_tcp_conn_remove(connection, M_C_RECV);
            if (c != NULL) {rc = mln_http_parse(http, &c);
                if (c != NULL) {mln_tcp_conn_append_chain(connection, c, NULL, M_C_RECV);
                }
                if (rc == M_HTTP_RET_ERROR) {fprintf(stderr, "Http parse error. error_code:%u\n", mln_http_get_error(http));
                }
            }
            mln_quit(ev, fd, data);
            return;
        } else if (ret == M_C_ERROR) {mln_quit(ev, fd, data);
            return;
        }
    }
}

static int mln_http_recv_body_handler(mln_http_t *http, mln_chain_t **in, mln_chain_t **nil)
{mln_u32_t method = mln_http_get_method(http);
    mln_trace("s", method == M_HTTP_GET? "GET": "OTHERS");
    if (method == M_HTTP_GET)
        return M_HTTP_RET_DONE;
    mln_http_set_error(http, M_HTTP_NOT_IMPLEMENTED);
    return M_HTTP_RET_ERROR;
}

static void mln_send(mln_event_t *ev, int fd, void *data)
{mln_http_t *http = (mln_http_t *)data;
    mln_tcp_conn_t *connection = mln_http_get_connection(http);
    mln_chain_t *c = mln_tcp_conn_get_head(connection, M_C_SEND);
    int ret;

    if (c == NULL) {mln_http_reset(http);
        mln_http_set_status(http, M_HTTP_OK);
        mln_http_set_version(http, M_HTTP_VERSION_1_0);
        mln_http_set_type(http, M_HTTP_RESPONSE);
        mln_http_set_handler(http, mln_http_send_body_handler);
        mln_chain_t *body_head = NULL, *body_tail = NULL;
        if (mln_http_generate(http, &body_head, &body_tail) == M_HTTP_RET_ERROR) {fprintf(stderr, "mln_http_generate() failed. %u\n", mln_http_get_error(http));
            mln_quit(ev, fd, data);
            return;
        }
        mln_tcp_conn_append_chain(connection, body_head, body_tail, M_C_SEND);
    }

    while ((c = mln_tcp_conn_get_head(connection, M_C_SEND)) != NULL) {ret = mln_tcp_conn_send(connection);
        if (ret == M_C_FINISH) {mln_quit(ev, fd, data);
            break;
        } else if (ret == M_C_NOTYET) {mln_chain_pool_release_all(mln_tcp_conn_remove(connection, M_C_SENT));
            mln_event_fd_set(ev, fd, M_EV_SEND|M_EV_APPEND|M_EV_NONBLOCK, M_EV_UNLIMITED, data, mln_send);
            return;
        } else if (ret == M_C_ERROR) {mln_quit(ev, fd, data);
            return;
        } else {fprintf(stderr, "Shouldn't be here.\n");
            abort();}
    }
}

static int mln_http_send_body_handler(mln_http_t *http, mln_chain_t **body_head, mln_chain_t **body_tail)
{
    mln_u8ptr_t buf;
    mln_alloc_t *pool = mln_http_get_pool(http);
    mln_string_t cttype_key = mln_string("Content-Type");
    mln_string_t cttype_val = mln_string("text/html");

    buf = mln_alloc_m(pool, 5);
    if (buf == NULL) {mln_http_set_error(http, M_HTTP_INTERNAL_SERVER_ERROR);
        return M_HTTP_RET_ERROR;
    }
    memcpy(buf, "hello", 5);

    if (mln_http_set_field(http, &cttype_key, &cttype_val) == M_HTTP_RET_ERROR) {mln_http_set_error(http, M_HTTP_INTERNAL_SERVER_ERROR);
        return M_HTTP_RET_ERROR;
    }

    mln_string_t ctlen_key = mln_string("Content-Length");
    mln_string_t ctlen_val = mln_string("5");
    if (mln_http_set_field(http, &ctlen_key, &ctlen_val) == M_HTTP_RET_ERROR) {mln_http_set_error(http, M_HTTP_INTERNAL_SERVER_ERROR);
        return M_HTTP_RET_ERROR;
    }

    mln_chain_t *c = mln_chain_new(pool);
    if (c == NULL) {mln_http_set_error(http, M_HTTP_INTERNAL_SERVER_ERROR);
        return M_HTTP_RET_ERROR;
    }
    mln_buf_t *b = mln_buf_new(pool);
    if (b == NULL) {mln_chain_pool_release(c);
        mln_http_set_error(http, M_HTTP_INTERNAL_SERVER_ERROR);
        return M_HTTP_RET_ERROR;
    }
    c->buf = b;
    b->left_pos = b->pos = b->start = buf;
    b->last = b->end = buf + 5;
    b->in_memory = 1;
    b->last_buf = 1;
    b->last_in_chain = 1;

    if (*body_head == NULL) {*body_head = *body_tail = c;} else {(*body_tail)->next = c;
        *body_tail = c;
    }

    return M_HTTP_RET_DONE;
}

static void *mln_thread_entry(void *args)
{mln_event_dispatch((mln_event_t *)args);
    return NULL;
}

int main(int argc, char *argv[])
{
    mln_event_t *ev;
    mln_iothread_t t;
    struct sockaddr_in addr;
    int val = 1, listenfd;
    int nthread = 1;
    mln_u16_t port = 1234;
    mln_s8_t ip[] = "0.0.0.0";
    struct mln_core_attr cattr;
    struct mln_iothread_attr tattr;

    /* init library */
    cattr.argc = 0;
    cattr.argv = NULL;
    cattr.global_init = NULL;
    cattr.master_process = NULL;
    cattr.worker_process = NULL;
    mln_core_init(&cattr);

    /* create event instance */
    if ((ev = mln_event_new()) == NULL) {mln_log(error, "event new error.\n");
        return -1;
    }

    /* set listen fd */
    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {mln_log(error, "listen socket error\n");
        return -1;
    }
    if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) {mln_log(error, "setsockopt error\n");
        return -1;
    }
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = inet_addr(ip);
    if (bind(listenfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {mln_log(error, "bind error\n");
        return -1;
    }
    if (listen(listenfd, 511) < 0) {mln_log(error, "listen error\n");
        return -1;
    }
    if (mln_event_fd_set(ev, \
                         listenfd, \
                         M_EV_RECV|M_EV_NONBLOCK, \
                         M_EV_UNLIMITED, \
                         NULL, \
                         mln_accept) < 0)
    {mln_log(error, "listen sock set event error\n");
        return -1;
    }

    /* init trace */
    if (mln_trace_init(ev, mln_trace_path()) < 0) {mln_log(error, "trace init failed.\n");
        return -1;
    }

    /* create threads */
    tattr.nthread = nthread;
    tattr.entry = (mln_iothread_entry_t)mln_thread_entry;
    tattr.args = ev;
    tattr.handler = NULL;
    if (mln_iothread_init(&t, &tattr) < 0) {mln_log(error, "iothread init failed\n");
        return -1;
    }

    mln_event_dispatch(ev);
    return 0;
}

在这 329 行代码中,咱们应用 Melon 库实现了一个简略的 HTTP1.1 服务器,这个服务器的行为就是,当收到 GET 申请后会回复一个 200 Hello。

在这个例子中,咱们起了两个线程(主线程 + 1 个工作线程),两个线程都会操作同一个事件对象(ev)。而咱们的 HTTP 服务器的 socket 则是依赖于这个事件对象 ev 的。换言之,事件对象是能够跨线程调度的,且事件模型帮咱们屏蔽了惊群的问题。

这里其实能够只用单线程,只是为了给读者演示和证实事件模型是反对多线程的。

接着,咱们能够看到 mln_trace_init 这个函数在 main 中被调用,用于初始化跟踪(tracing)工作。

题目中之所以称为 外挂式 ,起因是因为,跟踪(tracing)数据是由一个额定的脚本工作进行解决的。尽管脚本工作也依赖于事件模型,但脚本工作不会阻止 socket 事件的触发和解决。 简略来说,所有的句柄、定时器事件与脚本运行是分时解决的,且不会因为脚本中存在死循环而导致其余事件无奈被解决 。此外,如果咱们不启用跟踪脚本,或者不配置跟踪配置项,那么程序是不会投递跟踪信息的(见mln_accept 中的 mln_trace 调用处)。

trace 脚本应用的在 Melon 库中内置的脚本语言 Melang,咱们看一下与本例配套的 Melang 脚本:

sys = Import('sys');

Pipe('subscribe');
beg = sys.time();
sum = [];
while (1) {ret = Pipe('recv');
    if (!ret) {now = sys.time();
       if (now - beg >= 3) {sys.print(sum);
           beg = now;
       } fi
       sys.msleep(10);
    } else {n = sys.size(ret);
       for (i = 0; i < n; ++i) {if (!(sys.has(sum, ret[i][0])))
               sum[ret[i][0]] = 0;
           fi
           sum[ret[i][0]]++;
       }
    }
}
Pipe('unsubscribe');

简略形容一下:咱们会对所有的申请按办法 (method) 的不同进行归类统计,统计不同办法的申请总次数,并每 3 秒输入一次统计后果。

接着,咱们要批改 Melon 的配置文件,将 trace_mode 配置项正文去掉。

留神:如果你是在 Melon 源码目录中启动 http 服务器,那么应该批改的是 Melon/trace/trace.m 文件。否则,应该批改 /usr/local/lib/melang/trace/trace.m 文件,因为解释器会优先查看以后门路下是否有该文件。

上面,咱们来运行程序:

#tty1
$ ./a
#tty2
$ ab -c 100 -n 100 http://127.1:1234/

此时,咱们能够看到 tty1 中输入:

...
[]
[]
[100,]
...

并且随着 ab 被屡次调用,数组中的值也会随之减少。

最初,总结一下这样做的益处。

  1. 采纳这种跟踪模式能够按需开启跟踪和敞开跟踪,甚至能够在程序中调用 trace 模块接口实现动静开启和敞开跟踪模式。
  2. Melang 脚本自身也反对与 MySQL 通信,因而能够在脚本层进行跟踪信息汇总和入库,这样就将统计与性能相拆散,齐全不须要在 C 程序中退出大量统计变量。
正文完
 0