问题:如何通过 socket 文件描述符实时解析协议数据?

深度思考

从文件描述符中是否总能获取足够的数据?(是否必须等到数据量足够(如:消息头 12 字节)时,才能开始解析?)
  • 数据量足够

    • 读取 12 字节解析消息头
    • 读取数据填充 payload (length)
  • 数据量不足

    • 无法获取消息头所需数据(如何处理?解析状态如何切换?)
    • 无法获取 payload 完整数据(如何处理?是否可追加?)

解决方案

策略:尽力获取数据,实时解析
  • 即使当前只获取到 1 字节,也可根据状态进行解析
  • 支持不同数据源多次接力解析(从内存或文件描述符交替获取数据)
充分利用解析器状态信息是实现解决方案的关键

解析器状态切换

状态切换函数

/*
 * Return the parser to its initial state ("waiting for a header").
 * Any partially assembled message is released, and the parser will next
 * ask for a full header's worth of bytes.
 */
static void InitState(MsgParser *p)
{
    Message *pending = p->msg;

    p->header = 0;
    p->need = sizeof(p->cache);

    p->msg = NULL;
    free(pending);
}
/*
 * Header complete (already converted to host byte order): switch to the
 * payload state and allocate the message that will receive cache.length
 * payload bytes.
 * Returns 1 on success. Returns 0 if the declared length is unusable or the
 * allocation fails; in that case p->msg stays NULL and the caller must reset
 * the parser with InitState().
 */
static int ToMidState(MsgParser *p)
{
    p->header = 1;

    /*
     * length comes straight from the peer. It must fit in the int `need`
     * counter and must not make the allocation size wrap around. Otherwise a
     * crafted length could produce a tiny allocation that the header copy
     * below would overrun.
     */
    if (p->cache.length > (unsigned int)(INT_MAX - (int)sizeof(p->cache))) {
        p->need = 0;
        return 0;
    }

    p->need = (int)p->cache.length;
    p->msg = malloc(sizeof(p->cache) + (size_t)p->need);
    if (p->msg) {
        *p->msg = p->cache;
    }
    return p->msg != NULL;
}
/*
 * Try to enter the final state. A message is complete once the header has
 * been parsed and no payload bytes are outstanding. When complete, ownership
 * of the message moves to the caller and the parser forgets it. Otherwise
 * NULL is returned and the parser is left untouched.
 */
static Message *ToLastState(MsgParser *p)
{
    Message *done;

    if (!p->header || p->need) {
        return NULL;
    }

    done = p->msg;
    p->msg = NULL;
    return done;
}

从文件描述符中获取数据

/*
 * Try to read exactly `size` bytes from `fd` into `buf`.
 * Returns the number of bytes actually stored (0..size). Behaviour by read() result:
 *   > 0 : bytes are appended and reading continues;
 *   == 0: nothing delivered; sleep 200 ms and retry, giving up after a few
 *         attempts so a short stream cannot stall the caller forever;
 *   < 0 : stop (e.g. EAGAIN on a non-blocking fd, EBADF), except EINTR,
 *         which only means a signal interrupted the call and is retried.
 */
static int ToRecv(int fd, char *buf, int size)
{
    int retry = 0;
    int i = 0;

    while (i < size) {
        ssize_t len = read(fd, buf + i, (size_t)(size - i));

        if (len > 0) {
            i += (int)len;
        } else if (len < 0) {
            if (errno == EINTR) {
                continue;
            }
            break;
        } else {
            if (retry++ > 5) {
                break;
            }
            usleep(200 * 1000);   /* 200 ms between attempts */
        }
    }
    return i;
}

从文件描述符中实时解析消息头

/* Header state: read whatever is still missing of the fixed-size header. */
if (!p->header) {
    /* header bytes already received = header size - bytes still needed */
    int offset = sizeof(p->cache) - p->need;
    int len = ToRecv(fd, (char *)&p->cache + offset, p->need);

    if (len == p->need) {
        ntoh(&p->cache);
        if (ToMidState(p)) {
            ret = MParser_ReadFd(p, fd);   /* go on with the payload */
        } else {
            /* unusable length or out of memory: drop it and start over,
             * otherwise the parser would stay in the payload state with no message */
            InitState(p);
        }
    } else {
        p->need -= len;   /* partial header: remember how much is still missing */
    }
}

从文件描述符中获取 payload 数据

/* Payload state: append after the part of the payload already stored. */
if (p->msg) {
    int offset = p->msg->length - p->need;
    int len = ToRecv(fd, (char *)p->msg->payload + offset, p->need);

    p->need -= len;
}
/* Try to switch to the final state. On success the complete message is
 * handed out, and the parser then goes back to the initial state. */
if ((ret = ToLastState(p)) != NULL) {
    InitState(p);
}

编程实验:从文件描述符解析协议消息

message.h
#ifndef MESSAGE_H
#define MESSAGE_H

/*
 * Protocol message: a fixed header followed by `length` payload bytes.
 * The header is 12 bytes on common ABIs (four shorts + one unsigned int,
 * no padding). On the wire the header fields are in network byte order;
 * the sender converts with htons/htonl and the parser with ntohs/ntohl.
 */
typedef struct message {
    unsigned short type;
    unsigned short cmd;
    unsigned short index;    /* presumably the fragment index (the demo client sends 0..n-1) */
    unsigned short total;    /* presumably the fragment count (the demo client sends n) */
    unsigned int   length;   /* number of payload bytes following the header */
    unsigned char  payload[];
} Message;

/*
 * Allocate a message with room for `length` payload bytes and fill in the
 * header. When `payload` is non-NULL, `length` bytes are copied from it.
 * Returns NULL if allocation fails; release the result with free().
 */
Message *Message_New(unsigned short type,
                     unsigned short cmd,
                     unsigned short index,
                     unsigned short total,
                     unsigned char *payload,
                     unsigned int length);

#endif /* MESSAGE_H */
message.c
#include "message.h"

#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/*
 * Allocate a message with room for `length` payload bytes and fill in the
 * header fields (host byte order). When `payload` is non-NULL, `length`
 * bytes are copied into the message's payload.
 * Returns NULL if the size would overflow or the allocation fails.
 * The caller owns the result and releases it with free().
 */
Message *Message_New(unsigned short type, unsigned short cmd,
                     unsigned short index, unsigned short total,
                     unsigned char *payload, unsigned int length)
{
    Message *ret;

    /* Only matters where size_t is as narrow as unsigned int. */
    if (length > SIZE_MAX - sizeof(*ret)) {
        return NULL;
    }

    ret = malloc(sizeof(*ret) + length);
    if (ret) {
        ret->type   = type;
        ret->cmd    = cmd;
        ret->index  = index;
        ret->total  = total;
        ret->length = length;

        /* Address the flexible array member directly: `ret + 1` equals
         * ret->payload only when there is no trailing padding. */
        if (payload && length) {
            memcpy(ret->payload, payload, length);
        }
    }
    return ret;
}
msg_parser.h
#ifndef MSG_PARSER_H#define MSG_PARSER_H#include "message.h"typedef void MParser;MParser *MParser_New();Message *MParser_ReadMem(MParser *parser, unsigned char *mem, unsigned int length);Message *MParser_ReadFd(MParser *parser, int fd);void MParser_Reset(MParser *parse);void MParser_Del(MParser *parse);#endif
msg_parser.c
#include <malloc.h>#include <string.h>#include <arpa/inet.h>#include <unistd.h>#include "msg_parser.h"typedef struct msg_parser {    Message cache;    int header;    int need;    Message *msg;}MsgParser;static void InitState(MsgParser *p){    p->header = 0;    p->need = sizeof(p->cache);        free(p->msg);    p->msg = NULL;}static int ToMidState(MsgParser *p){    p->header = 1;    p->need = p->cache.length;    p->msg = malloc(sizeof(p->cache) + p->need);    if (p->msg) {        *p->msg = p->cache;    }    return !!p->msg;}static Message *ToLastState(MsgParser *p){    Message *ret = NULL;    if (p->header && !p->need) {        ret = p->msg;        p->msg = NULL;    }    return ret;}static void ntoh(Message *m){    m->type = ntohs(m->type);    m->cmd = ntohs(m->cmd);    m->index = ntohs(m->index);    m->total = ntohs(m->total);    m->length = ntohl(m->length);    }static int ToRecv(int fd, char *buf, int size){    int retry = 0;    int i = 0;    while (i < size) {        int len = read(fd, buf + i, size - i);        if (len > 0) {            i += len;        } else if (len < 0) {            break;        } else {            if (retry++ > 5) {                break;            }            usleep(200 * 1000);        }    }    return i;}MParser *MParser_New(){    MsgParser *ret = calloc(1,  sizeof(MsgParser));    InitState(ret);    return ret;}Message *MParser_ReadMem(MParser *parser, unsigned char *mem, unsigned int length){    Message *ret = NULL;    MsgParser *p = (MsgParser*)parser;    if (!p || !mem || !length) {        return ret;    }    if (!p->header) {        int len = (p->need < length) ? 
p->need : length;        int offset = sizeof(p->cache) - p->need;        memcpy((char*)&p->cache + offset, mem, len);        if (p->need == len) {            ntoh(&p->cache);            mem += p->need;            length -= p->need;            if (ToMidState(p)) {                ret = MParser_ReadMem(p, mem, length);            } else {                InitState(p);            }        } else {            p->need -= len;        }    } else {        if (p->msg) {            int len = (p->need < length) ? p->need : length;            int offset = p->msg->length - p->need;            memcpy(p->msg->payload + offset, mem, len);            p->need -= len;            if (ret = ToLastState(p)) {                InitState(p);            }        }      }    return ret;}Message *MParser_ReadFd(MParser *parser, int fd){    Message *ret = NULL;    MsgParser *p = (MsgParser*)parser;    if (fd == -1 || !p) {        return ret;    }    if (!p->header) {        int offset = sizeof(p->cache) - p->need;        int len = ToRecv(fd, (char*)&p->cache + offset, p->need);        if (len == p->need) {            ntoh(&p->cache);            if (ToMidState(p)) {                ret = MParser_ReadFd(p, fd);            }            else {                InitState(p);            }        }        else {            p->need -= len;        }    } else {        if (p->msg) {            int offset = p->msg->length - p->need;            int len = ToRecv(fd, p->msg->payload + offset, p->need);            p->need -= len;        }        if (ret = ToLastState(p)) {            InitState(p);        }    }     return ret;}void MParser_Reset(MParser *parse){    MsgParser *p = (MsgParser*)parse;    if (p) {        InitState(p);    }}void MParser_Del(MParser *parse){    MsgParser *p = (MsgParser*)parse;    if (p) {        free(p->msg);        free(p);    }}
测试一:test.c
#include <stdio.h>#include <unistd.h>#include <stdlib.h>#include "msg_parser.h"int main(){       MParser *p = MParser_New();    char buf[] = {0x00, 0x01, 0x00, 0x02, 0x00};    char another[] = {0x03, 0x00, 0x04, 0x00, 0x00, 0x00, 0x04};    char data[] = {0x11, 0x12, 0x13, 0x14};    Message *m = MParser_ReadMem(p, buf, sizeof(buf));    int i = 0;    if (!m) {        printf("parse again...\n");        m = MParser_ReadMem(p, another, sizeof(another));    }        if (!m) {        printf("parse again again...\n");        m = MParser_ReadMem(p, data, sizeof(data));    }    printf("m = %p\n", m);    if (m) {        printf("type = %d\n", m->type);        printf("cmd = %d\n", m->cmd);        printf("index = %d\n", m->index);        printf("total = %d\n", m->total);        printf("length = %d\n", m->length);        for (i=0; i<m->length; ++i) {            printf("0x%02x ", m->payload[i]);        }        printf("\n");        free(m);    }    MParser_Del(p);    return 0;    }
输出:
parse again...
parse again again...
m = 0x555e21dd56a0
type = 1
cmd = 2
index = 3
total = 4
length = 4
0x11 0x12 0x13 0x14
测试二:
client.c
#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <stdio.h>#include <unistd.h>#include <string.h>#include <malloc.h>#include "message.h"static void hton(Message *m){    m->type = htons(m->type);    m->cmd = htons(m->cmd);    m->index = htons(m->index);    m->total = htons(m->total);    m->length = htonl(m->length);    }int main(){    int sock = 0;    struct sockaddr_in addr = {0};    int i = 0;    char *test = "D.T.Software";    Message *pm = NULL;    sock = socket(PF_INET, SOCK_STREAM, 0);    if (sock == -1) {        printf("socket error\n");        return -1;    }    addr.sin_family = AF_INET;    addr.sin_addr.s_addr = inet_addr("127.0.0.1");    addr.sin_port = htons(8888);    if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) == -1) {        printf("connect error\n");        return -1;    }    printf("connect success\n");    for (i=0; i<strlen(test); ++i) {        char buf[2] = {0};        buf[0] = test[i];        pm = Message_New(128, 129, i, strlen(test), buf, 2);        hton(pm);        send(sock, pm, sizeof(Message) + 2, 0);        free(pm);    }        close(sock);    return 0;}
server.c
#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <netinet/tcp.h>#include <stdio.h>#include <unistd.h>#include <string.h>#include <malloc.h>#include "msg_parser.h"int main(){    int server = 0;    struct sockaddr_in saddr = {0};    int client = 0;    struct sockaddr_in caddr = {0};    socklen_t asize = 0;    int len = 0;    char buf[32] = {0};    int r = 0;    MParser *parser = MParser_New();    server = socket(PF_INET, SOCK_STREAM, 0);    if (server == -1) {        printf("server socket error\n");        return -1;    }    saddr.sin_family = AF_INET;    saddr.sin_addr.s_addr = htonl(INADDR_ANY);    saddr.sin_port = htons(8888);    if (bind(server, (struct sockaddr*)&saddr, sizeof(saddr)) == -1) {        printf("server bind error\n");        return -1;    }    if (listen(server, 1) == -1) {        printf("server listen error\n");        return -1;    }    printf("server start success\n");    while (1) {        struct tcp_info info = {0};        int l = sizeof(info);        asize = sizeof(caddr);        client = accept(server, (struct sockaddr*)&caddr, &asize);        if (client == -1) {            printf("client accept error\n");            return -1;        }        printf("client: %d\n", client);        do {            getsockopt(client, IPPROTO_TCP, TCP_INFO, &info, (socklen_t*)&l);            Message *m = MParser_ReadFd(parser, client);            if (m) {                printf("type = %d\n", m->type);                printf("cmd = %d\n", m->cmd);                printf("index = %d\n", m->index);                printf("total = %d\n", m->total);                printf("length = %d\n", m->length);                printf("payload = %s\n", m->payload);                printf("\n");                free(m);            }        } while (info.tcpi_state == TCP_ESTABLISHED);        printf("client socket is closed\n");        close(client);    }    close(server);    MParser_Del(parser);    return 0;}
输出:
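server start success
client: 4
type = 128
cmd = 129
index = 0
total = 12
length = 2
payload = D

type = 128
cmd = 129
index = 1
total = 12
length = 2
payload = .

type = 128
cmd = 129
index = 2
total = 12
length = 2
payload = T

client socket is closed

(注:客户端共发送了 12 条消息,这里只输出了前 3 条。服务端以 TCP 连接状态(tcpi_state == TCP_ESTABLISHED)作为继续读取的条件;客户端发送完毕并关闭连接后状态随即改变,接收缓冲区中尚未解析的消息因此没有被读取。)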

有了协议和协议解析器之后,可以做什么?