问题:如何通过 socket 文件描述符实时解析协议数据?深度思考:从文件描述符是否一定能获取足够的数据?(是否必须等到数据量足够(如:消息头 12 字节),才能开始解析?)数据量足够:
读取 12 字节解析消息头;读取数据填充 payload(length)。数据量不足:
无法获取消息头所需数据(如何处理?解析状态如何切换?);无法获取 payload 完整数据(如何处理?是否可追加?)。解决方案策略:尽力获取数据,实时解析。即使当前只获取 1 字节,也可根据状态进行解析;支持不同数据源多次接力解析(从内存或文件描述符交替获取数据);充分利用解析器状态信息是实现解决方案的关键。解析器状态切换
状态切换函数static void InitState(MsgParser *p) { p->header = 0; p->need = sizeof(p->cache); free(p->msg); p->msg = NULL;}static int ToMidState(MsgParser *p){ p->header = 1; p->need = p->cache.length; p->msg = malloc(sizeof(p->cache) + p->need); if (p->msg) { *p->msg = p->cache; } return !!p->msg;}static Message *ToLastState(MsgParser *p){ Message *ret = NULL; if (p->header && !p->need) { ret = p->msg; p->msg = NULL; } return ret;}从文件描述符中获取数据static int ToRecv(int fd, char *buf, int size){ int retry = 0; int i = 0; while (i < size) { int len = read(fd, buf + i, size - i); if (len > 0) { i += len; } else { if (retry++ > 5) { break; } usleep(200 * 10000); } } return i;}从文件描述符中实时解析音讯头if (!p->header) { int offset = sizeof(p->cache) - p->need; int len = ToRecv(fd, (char*)&p->cache + offset/* 计算寄存地位并读取音讯头数据 */, p->need); if (len == p->need) { ntoh(&p->cache); if (ToMidState(p)) { ret = MParser_ReadFd(p, fd); } } else { p->need -= len; }}从文件描述符中获取 payload 数据if (p->msg) { int len = ToRecv(fd, p->msg->payload, p->need); p->need -= len;}/* 尝试切换到最终状态,如果胜利,则可取得协定音讯;之后切换到初始状态 */if (ret = ToLastState(p)) { InitState(p);}编程试验:从文件描述符解析协定音讯message.h#ifndef MESSAGE_H#define MESSAGE_Htypedef struct message { unsigned short type; unsigned short cmd; unsigned short index; unsigned short total; unsigned int length; unsigned char payload[];}Message;Message *Message_New(unsigned short type, unsigned short cmd, unsigned short index, unsigned short total, unsigned char *payload, unsigned int length);#endifmessage.c#include "message.h"#include <malloc.h>#include <string.h>Message *Message_New(unsigned short type, unsigned short cmd, unsigned short index, unsigned short total, unsigned char *payload, unsigned int length){ Message *ret = malloc(sizeof(Message) + length); if (ret) { ret->type = type; ret->cmd = cmd; ret->index = index; ret->total = total; ret->length = length; if (payload) { memcpy(ret + 1, payload, length); } } return ret;}msg_parser.h#ifndef MSG_PARSER_H#define MSG_PARSER_H#include 
"message.h"typedef void MParser;MParser *MParser_New();Message *MParser_ReadMem(MParser *parser, unsigned char *mem, unsigned int length);Message *MParser_ReadFd(MParser *parser, int fd);void MParser_Reset(MParser *parse);void MParser_Del(MParser *parse);#endifmsg_parser.c#include <malloc.h>#include <string.h>#include <arpa/inet.h>#include <unistd.h>#include "msg_parser.h"typedef struct msg_parser { Message cache; int header; int need; Message *msg;}MsgParser;static void InitState(MsgParser *p){ p->header = 0; p->need = sizeof(p->cache); free(p->msg); p->msg = NULL;}static int ToMidState(MsgParser *p){ p->header = 1; p->need = p->cache.length; p->msg = malloc(sizeof(p->cache) + p->need); if (p->msg) { *p->msg = p->cache; } return !!p->msg;}static Message *ToLastState(MsgParser *p){ Message *ret = NULL; if (p->header && !p->need) { ret = p->msg; p->msg = NULL; } return ret;}static void ntoh(Message *m){ m->type = ntohs(m->type); m->cmd = ntohs(m->cmd); m->index = ntohs(m->index); m->total = ntohs(m->total); m->length = ntohl(m->length); }static int ToRecv(int fd, char *buf, int size){ int retry = 0; int i = 0; while (i < size) { int len = read(fd, buf + i, size - i); if (len > 0) { i += len; } else if (len < 0) { break; } else { if (retry++ > 5) { break; } usleep(200 * 1000); } } return i;}MParser *MParser_New(){ MsgParser *ret = calloc(1, sizeof(MsgParser)); InitState(ret); return ret;}Message *MParser_ReadMem(MParser *parser, unsigned char *mem, unsigned int length){ Message *ret = NULL; MsgParser *p = (MsgParser*)parser; if (!p || !mem || !length) { return ret; } if (!p->header) { int len = (p->need < length) ? p->need : length; int offset = sizeof(p->cache) - p->need; memcpy((char*)&p->cache + offset, mem, len); if (p->need == len) { ntoh(&p->cache); mem += p->need; length -= p->need; if (ToMidState(p)) { ret = MParser_ReadMem(p, mem, length); } else { InitState(p); } } else { p->need -= len; } } else { if (p->msg) { int len = (p->need < length) ? 
p->need : length; int offset = p->msg->length - p->need; memcpy(p->msg->payload + offset, mem, len); p->need -= len; if (ret = ToLastState(p)) { InitState(p); } } } return ret;}Message *MParser_ReadFd(MParser *parser, int fd){ Message *ret = NULL; MsgParser *p = (MsgParser*)parser; if (fd == -1 || !p) { return ret; } if (!p->header) { int offset = sizeof(p->cache) - p->need; int len = ToRecv(fd, (char*)&p->cache + offset, p->need); if (len == p->need) { ntoh(&p->cache); if (ToMidState(p)) { ret = MParser_ReadFd(p, fd); } else { InitState(p); } } else { p->need -= len; } } else { if (p->msg) { int offset = p->msg->length - p->need; int len = ToRecv(fd, p->msg->payload + offset, p->need); p->need -= len; } if (ret = ToLastState(p)) { InitState(p); } } return ret;}void MParser_Reset(MParser *parse){ MsgParser *p = (MsgParser*)parse; if (p) { InitState(p); }}void MParser_Del(MParser *parse){ MsgParser *p = (MsgParser*)parse; if (p) { free(p->msg); free(p); }}测试一:test.c#include <stdio.h>#include <unistd.h>#include <stdlib.h>#include "msg_parser.h"int main(){ MParser *p = MParser_New(); char buf[] = {0x00, 0x01, 0x00, 0x02, 0x00}; char another[] = {0x03, 0x00, 0x04, 0x00, 0x00, 0x00, 0x04}; char data[] = {0x11, 0x12, 0x13, 0x14}; Message *m = MParser_ReadMem(p, buf, sizeof(buf)); int i = 0; if (!m) { printf("parse again...\n"); m = MParser_ReadMem(p, another, sizeof(another)); } if (!m) { printf("parse again again...\n"); m = MParser_ReadMem(p, data, sizeof(data)); } printf("m = %p\n", m); if (m) { printf("type = %d\n", m->type); printf("cmd = %d\n", m->cmd); printf("index = %d\n", m->index); printf("total = %d\n", m->total); printf("length = %d\n", m->length); for (i=0; i<m->length; ++i) { printf("0x%02x ", m->payload[i]); } printf("\n"); free(m); } MParser_Del(p); return 0; }输入:parse again...parse again again...m = 0x555e21dd56a0type = 1cmd = 2index = 3total = 4length = 40x11 0x12 0x13 0x14测试2:client.c#include <sys/types.h>#include <sys/socket.h>#include 
<netinet/in.h>#include <arpa/inet.h>#include <stdio.h>#include <unistd.h>#include <string.h>#include <malloc.h>#include "message.h"static void hton(Message *m){ m->type = htons(m->type); m->cmd = htons(m->cmd); m->index = htons(m->index); m->total = htons(m->total); m->length = htonl(m->length); }int main(){ int sock = 0; struct sockaddr_in addr = {0}; int i = 0; char *test = "D.T.Software"; Message *pm = NULL; sock = socket(PF_INET, SOCK_STREAM, 0); if (sock == -1) { printf("socket error\n"); return -1; } addr.sin_family = AF_INET; addr.sin_addr.s_addr = inet_addr("127.0.0.1"); addr.sin_port = htons(8888); if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) == -1) { printf("connect error\n"); return -1; } printf("connect success\n"); for (i=0; i<strlen(test); ++i) { char buf[2] = {0}; buf[0] = test[i]; pm = Message_New(128, 129, i, strlen(test), buf, 2); hton(pm); send(sock, pm, sizeof(Message) + 2, 0); free(pm); } close(sock); return 0;}server.c#include <sys/types.h>#include <sys/socket.h>#include <netinet/in.h>#include <arpa/inet.h>#include <netinet/tcp.h>#include <stdio.h>#include <unistd.h>#include <string.h>#include <malloc.h>#include "msg_parser.h"int main(){ int server = 0; struct sockaddr_in saddr = {0}; int client = 0; struct sockaddr_in caddr = {0}; socklen_t asize = 0; int len = 0; char buf[32] = {0}; int r = 0; MParser *parser = MParser_New(); server = socket(PF_INET, SOCK_STREAM, 0); if (server == -1) { printf("server socket error\n"); return -1; } saddr.sin_family = AF_INET; saddr.sin_addr.s_addr = htonl(INADDR_ANY); saddr.sin_port = htons(8888); if (bind(server, (struct sockaddr*)&saddr, sizeof(saddr)) == -1) { printf("server bind error\n"); return -1; } if (listen(server, 1) == -1) { printf("server listen error\n"); return -1; } printf("server start success\n"); while (1) { struct tcp_info info = {0}; int l = sizeof(info); asize = sizeof(caddr); client = accept(server, (struct sockaddr*)&caddr, &asize); if (client == -1) { printf("client 
accept error\n"); return -1; } printf("client: %d\n", client); do { getsockopt(client, IPPROTO_TCP, TCP_INFO, &info, (socklen_t*)&l); Message *m = MParser_ReadFd(parser, client); if (m) { printf("type = %d\n", m->type); printf("cmd = %d\n", m->cmd); printf("index = %d\n", m->index); printf("total = %d\n", m->total); printf("length = %d\n", m->length); printf("payload = %s\n", m->payload); printf("\n"); free(m); } } while (info.tcpi_state == TCP_ESTABLISHED); printf("client socket is closed\n"); close(client); } close(server); MParser_Del(parser); return 0;}输入:server start successclient: 4type = 128cmd = 129index = 0total = 12length = 2payload = Dtype = 128cmd = 129index = 1total = 12length = 2payload = .type = 128cmd = 129index = 2total = 12length = 2payload = Tclient socket is closed有了协定和协定解析器之后,能够干嘛?