本文次要介绍Melon库中的一种跟踪技术,并以一个HTTP服务器的实现和应用为例进行阐明。
对于Melon库,这是一个开源的C语言库,这个库不依赖其余开源第三方库,因而装置不便,开箱即用。并且中英文文档具体,便于作为工具书进行查阅。Github仓库:传送门。
闲话少叙,咱们间接上代码:
#include <stdio.h>#include <stdlib.h>#include <sys/socket.h>#include <netinet/in.h>#include <errno.h>#include <arpa/inet.h>#include <sys/time.h>#include "mln_core.h"#include "mln_log.h"#include "mln_http.h"#include "mln_iothread.h"#include "mln_trace.h"static void *mln_thread_entry(void *args);static void mln_accept(mln_event_t *ev, int fd, void *data);static int mln_http_recv_body_handler(mln_http_t *http, mln_chain_t **in, mln_chain_t **nil);static void mln_recv(mln_event_t *ev, int fd, void *data);static void mln_quit(mln_event_t *ev, int fd, void *data);static void mln_send(mln_event_t *ev, int fd, void *data);static int mln_http_send_body_handler(mln_http_t *http, mln_chain_t **body_head, mln_chain_t **body_tail);static void mln_accept(mln_event_t *ev, int fd, void *data){ mln_tcp_conn_t *connection; mln_http_t *http; int connfd; socklen_t len; struct sockaddr_in addr; while (1) { memset(&addr, 0, sizeof(addr)); len = sizeof(addr); connfd = accept(fd, (struct sockaddr *)&addr, &len); if (connfd < 0) { if (errno == EAGAIN) break; if (errno == EINTR) continue; perror("accept"); exit(1); } connection = (mln_tcp_conn_t *)malloc(sizeof(mln_tcp_conn_t)); if (connection == NULL) { fprintf(stderr, "3No memory.\n"); close(connfd); continue; } if (mln_tcp_conn_init(connection, connfd) < 0) { fprintf(stderr, "4No memory.\n"); close(connfd); free(connection); continue; } http = mln_http_init(connection, NULL, mln_http_recv_body_handler); if (http == NULL) { fprintf(stderr, "5No memory.\n"); mln_tcp_conn_destroy(connection); free(connection); close(connfd); continue; } if (mln_event_fd_set(ev, \ connfd, \ M_EV_RECV|M_EV_NONBLOCK, \ M_EV_UNLIMITED, \ http, \ mln_recv) < 0) { fprintf(stderr, "6No memory.\n"); mln_http_destroy(http); mln_tcp_conn_destroy(connection); free(connection); close(connfd); continue; } }}static void mln_quit(mln_event_t *ev, int fd, void *data){ mln_http_t *http = (mln_http_t *)data; mln_tcp_conn_t *connection = mln_http_get_connection(http); mln_event_fd_set(ev, fd, M_EV_CLR, M_EV_UNLIMITED, NULL, NULL); mln_http_destroy(http); mln_tcp_conn_destroy(connection); free(connection); close(fd);}static void mln_recv(mln_event_t *ev, int fd, void *data){ mln_http_t *http = (mln_http_t *)data; mln_tcp_conn_t *connection = mln_http_get_connection(http); int ret, rc; mln_chain_t *c; while (1) { ret = mln_tcp_conn_recv(connection, M_C_TYPE_MEMORY); if (ret == M_C_FINISH) { continue; } else if (ret == M_C_NOTYET) { c = mln_tcp_conn_remove(connection, M_C_RECV); if (c != NULL) { rc = mln_http_parse(http, &c); if (c != NULL) { mln_tcp_conn_append_chain(connection, c, NULL, M_C_RECV); } if (rc == M_HTTP_RET_OK) { return; } else if (rc == M_HTTP_RET_DONE) { mln_send(ev, fd, data); } else { fprintf(stderr, "Http parse error. error_code:%u\n", mln_http_get_error(http)); mln_quit(ev, fd, data); return; } } break; } else if (ret == M_C_CLOSED) { c = mln_tcp_conn_remove(connection, M_C_RECV); if (c != NULL) { rc = mln_http_parse(http, &c); if (c != NULL) { mln_tcp_conn_append_chain(connection, c, NULL, M_C_RECV); } if (rc == M_HTTP_RET_ERROR) { fprintf(stderr, "Http parse error. error_code:%u\n", mln_http_get_error(http)); } } mln_quit(ev, fd, data); return; } else if (ret == M_C_ERROR) { mln_quit(ev, fd, data); return; } }}static int mln_http_recv_body_handler(mln_http_t *http, mln_chain_t **in, mln_chain_t **nil){ mln_u32_t method = mln_http_get_method(http); mln_trace("s", method == M_HTTP_GET? "GET": "OTHERS"); if (method == M_HTTP_GET) return M_HTTP_RET_DONE; mln_http_set_error(http, M_HTTP_NOT_IMPLEMENTED); return M_HTTP_RET_ERROR;}static void mln_send(mln_event_t *ev, int fd, void *data){ mln_http_t *http = (mln_http_t *)data; mln_tcp_conn_t *connection = mln_http_get_connection(http); mln_chain_t *c = mln_tcp_conn_get_head(connection, M_C_SEND); int ret; if (c == NULL) { mln_http_reset(http); mln_http_set_status(http, M_HTTP_OK); mln_http_set_version(http, M_HTTP_VERSION_1_0); mln_http_set_type(http, M_HTTP_RESPONSE); mln_http_set_handler(http, mln_http_send_body_handler); mln_chain_t *body_head = NULL, *body_tail = NULL; if (mln_http_generate(http, &body_head, &body_tail) == M_HTTP_RET_ERROR) { fprintf(stderr, "mln_http_generate() failed. %u\n", mln_http_get_error(http)); mln_quit(ev, fd, data); return; } mln_tcp_conn_append_chain(connection, body_head, body_tail, M_C_SEND); } while ((c = mln_tcp_conn_get_head(connection, M_C_SEND)) != NULL) { ret = mln_tcp_conn_send(connection); if (ret == M_C_FINISH) { mln_quit(ev, fd, data); break; } else if (ret == M_C_NOTYET) { mln_chain_pool_release_all(mln_tcp_conn_remove(connection, M_C_SENT)); mln_event_fd_set(ev, fd, M_EV_SEND|M_EV_APPEND|M_EV_NONBLOCK, M_EV_UNLIMITED, data, mln_send); return; } else if (ret == M_C_ERROR) { mln_quit(ev, fd, data); return; } else { fprintf(stderr, "Shouldn't be here.\n"); abort(); } }}static int mln_http_send_body_handler(mln_http_t *http, mln_chain_t **body_head, mln_chain_t **body_tail){ mln_u8ptr_t buf; mln_alloc_t *pool = mln_http_get_pool(http); mln_string_t cttype_key = mln_string("Content-Type"); mln_string_t cttype_val = mln_string("text/html"); buf = mln_alloc_m(pool, 5); if (buf == NULL) { mln_http_set_error(http, M_HTTP_INTERNAL_SERVER_ERROR); return M_HTTP_RET_ERROR; } memcpy(buf, "hello", 5); if (mln_http_set_field(http, &cttype_key, &cttype_val) == M_HTTP_RET_ERROR) { mln_http_set_error(http, M_HTTP_INTERNAL_SERVER_ERROR); return M_HTTP_RET_ERROR; } mln_string_t ctlen_key = mln_string("Content-Length"); mln_string_t ctlen_val = mln_string("5"); if (mln_http_set_field(http, &ctlen_key, &ctlen_val) == M_HTTP_RET_ERROR) { mln_http_set_error(http, M_HTTP_INTERNAL_SERVER_ERROR); return M_HTTP_RET_ERROR; } mln_chain_t *c = mln_chain_new(pool); if (c == NULL) { mln_http_set_error(http, M_HTTP_INTERNAL_SERVER_ERROR); return M_HTTP_RET_ERROR; } mln_buf_t *b = mln_buf_new(pool); if (b == NULL) { mln_chain_pool_release(c); mln_http_set_error(http, M_HTTP_INTERNAL_SERVER_ERROR); return M_HTTP_RET_ERROR; } c->buf = b; b->left_pos = b->pos = b->start = buf; b->last = b->end = buf + 5; b->in_memory = 1; b->last_buf = 1; b->last_in_chain = 1; if (*body_head == NULL) { *body_head = *body_tail = c; } else { (*body_tail)->next = c; *body_tail = c; } return M_HTTP_RET_DONE;}static void *mln_thread_entry(void *args){ mln_event_dispatch((mln_event_t *)args); return NULL;}int main(int argc, char *argv[]){ mln_event_t *ev; mln_iothread_t t; struct sockaddr_in addr; int val = 1, listenfd; int nthread = 1; mln_u16_t port = 1234; mln_s8_t ip[] = "0.0.0.0"; struct mln_core_attr cattr; struct mln_iothread_attr tattr; /* init library */ cattr.argc = 0; cattr.argv = NULL; cattr.global_init = NULL; cattr.master_process = NULL; cattr.worker_process = NULL; mln_core_init(&cattr); /* create event instance */ if ((ev = mln_event_new()) == NULL) { mln_log(error, "event new error.\n"); return -1; } /* set listen fd */ listenfd = socket(AF_INET, SOCK_STREAM, 0); if (listenfd < 0) { mln_log(error, "listen socket error\n"); return -1; } if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) { mln_log(error, "setsockopt error\n"); return -1; } addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(ip); if (bind(listenfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { mln_log(error, "bind error\n"); return -1; } if (listen(listenfd, 511) < 0) { mln_log(error, "listen error\n"); return -1; } if (mln_event_fd_set(ev, \ listenfd, \ M_EV_RECV|M_EV_NONBLOCK, \ M_EV_UNLIMITED, \ NULL, \ mln_accept) < 0) { mln_log(error, "listen sock set event error\n"); return -1; } /* init trace */ if (mln_trace_init(ev, mln_trace_path()) < 0) { mln_log(error, "trace init failed.\n"); return -1; } /* create threads */ tattr.nthread = nthread; tattr.entry = (mln_iothread_entry_t)mln_thread_entry; tattr.args = ev; tattr.handler = NULL; if (mln_iothread_init(&t, &tattr) < 0) { mln_log(error, "iothread init failed\n"); return -1; } mln_event_dispatch(ev); return 0;}
在这329行代码中,咱们应用Melon库实现了一个简略的HTTP1.1服务器,这个服务器的行为就是,当收到GET申请后会回复一个200 Hello。
在这个例子中,咱们起了两个线程(主线程+1个工作线程),两个线程都会操作同一个事件对象(ev
)。而咱们的HTTP服务器的socket则是依赖于这个事件对象ev
的。换言之,事件对象是能够跨线程调度的,且事件模型帮咱们屏蔽了惊群的问题。
这里其实能够只用单线程,只是为了给读者演示和证实事件模型是反对多线程的。
接着,咱们能够看到mln_trace_init
这个函数在main
中被调用,用于初始化跟踪(tracing)工作。
题目中之所以称为外挂式,起因是因为,跟踪(tracing)数据是由一个额定的脚本工作进行解决的。尽管脚本工作也依赖于事件模型,但脚本工作不会阻止socket事件的触发和解决。简略来说,所有的句柄、定时器事件与脚本运行是分时解决的,且不会因为脚本中存在死循环而导致其余事件无奈被解决。此外,如果咱们不启用跟踪脚本,或者不配置跟踪配置项,那么程序是不会投递跟踪信息的(见mln_accept
中的mln_trace
调用处)。
trace脚本应用的在Melon库中内置的脚本语言Melang,咱们看一下与本例配套的Melang脚本:
sys = Import('sys');Pipe('subscribe');beg = sys.time();sum = [];while (1) { ret = Pipe('recv'); if (!ret) { now = sys.time(); if (now - beg >= 3) { sys.print(sum); beg = now; } fi sys.msleep(10); } else { n = sys.size(ret); for (i = 0; i < n; ++i) { if (!(sys.has(sum, ret[i][0]))) sum[ret[i][0]] = 0; fi sum[ret[i][0]]++; } }}Pipe('unsubscribe');
简略形容一下:咱们会对所有的申请按办法(method)的不同进行归类统计,统计不同办法的申请总次数,并每3秒输入一次统计后果。
接着,咱们要批改Melon的配置文件,将trace_mode
配置项正文去掉。
留神:如果你是在Melon源码目录中启动http服务器,那么应该批改的是Melon/trace/trace.m
文件。否则,应该批改/usr/local/lib/melang/trace/trace.m
文件,因为解释器会优先查看以后门路下是否有该文件。
上面,咱们来运行程序:
#tty1$ ./a
#tty2$ ab -c 100 -n 100 http://127.1:1234/
此时,咱们能够看到tty1中输入:
...[][][100, ]...
并且随着ab
被屡次调用,数组中的值也会随之减少。
最初,总结一下这样做的益处。
- 采纳这种跟踪模式能够按需开启跟踪和敞开跟踪,甚至能够在程序中调用trace模块接口实现动静开启和敞开跟踪模式。
- Melang脚本自身也反对与MySQL通信,因而能够在脚本层进行跟踪信息汇总和入库,这样就将统计与性能相拆散,齐全不须要在C程序中退出大量统计变量。