共计 7862 个字符,预计需要花费 20 分钟才能阅读完成。
问题:如何通过 socket 文件描述符实时解析协议数据?
深度思考
从文件描述符是否能够获取足够的数据?(是否一定要等到数据量足够(如:消息头 12 字节),才能开始解析?)
-
数据量足够
- 读取 12 字节解析消息头
- 读取数据填充 payload (length)
-
数据量不足
- 无法获取消息头所需数据(如何处理?解析状态如何切换?)
- 无法获取 payload 完整数据(如何处理?是否可追加?)
解决方案
策略:尽力获取数据,实时解析
- 即使当前只获取到 1 字节,也可根据状态进行解析
- 支持不同数据源多次接力解析(从内存或文件描述符交替获取数据)
充分利用解析器状态信息是实现解决方案的关键
解析器状态切换
状态切换函数
/*
 * Return the parser to its initial state: any partially assembled message
 * is released, the header flag is cleared, and a full header's worth of
 * bytes (sizeof p->cache) is required again.
 */
static void InitState(MsgParser *p)
{
    free(p->msg);
    p->msg = NULL;

    p->need = sizeof p->cache;
    p->header = 0;
}
/*
 * Switch from "reading the header" to "reading the payload".
 *
 * Expects p->cache to hold a complete header already converted to host byte
 * order. Allocates the message (header plus cache.length payload bytes),
 * copies the header into it and records the payload size in p->need.
 *
 * Returns 1 on success. Returns 0 with p->msg == NULL when the announced
 * length cannot be stored in p->need or when the allocation fails; the
 * caller is expected to reset the parser in that case.
 */
static int ToMidState(MsgParser *p)
{
    /*
     * cache.length comes from the input stream. A value that does not fit
     * in the int field p->need would turn negative there: the allocation
     * size below would then be smaller than the header itself and the
     * header copy would write past the buffer. Reject such lengths.
     * (unsigned)-1 >> 1 equals INT_MAX on the usual two's-complement targets.
     */
    const unsigned int max_payload = (unsigned int)-1 >> 1;

    p->header = 1;

    if (p->cache.length > max_payload) {
        p->need = 0;
        p->msg = NULL;
        return 0;
    }

    p->need = (int)p->cache.length;
    p->msg = malloc(sizeof(p->cache) + p->cache.length);

    if (p->msg) {
        *p->msg = p->cache;
    }

    return !!p->msg;
}
/*
 * Try to finish the current message.
 *
 * When the header has been parsed and no payload bytes are outstanding,
 * ownership of the assembled message moves to the caller and the parser
 * forgets it. Otherwise NULL is returned and nothing changes.
 */
static Message *ToLastState(MsgParser *p)
{
    Message *done;

    if (!p->header || p->need) {
        return NULL;
    }

    done = p->msg;
    p->msg = NULL;

    return done;
}
从文件描述符中获取数据
/*
 * Read up to `size` bytes from `fd` into `buf`, on a best-effort basis.
 *
 * Keeps calling read() until the request is satisfied. A read error ends
 * the attempt immediately. A zero-byte read is retried a few times with a
 * 200 ms pause before giving up.
 *
 * Returns the number of bytes actually stored in `buf` (0..size).
 */
static int ToRecv(int fd, char *buf, int size)
{
    int retry = 0;
    int i = 0;

    while (i < size) {
        ssize_t len = read(fd, buf + i, size - i);

        if (len > 0) {
            i += (int)len;
        } else if (len < 0) {
            /* Hard error: sleeping and retrying cannot help. */
            break;
        } else {
            if (retry++ > 5) {
                break;
            }
            /* 200 ms; usleep() is only specified for values below 1,000,000. */
            usleep(200 * 1000);
        }
    }

    return i;
}
从文件描述符中实时解析消息头
/* Header not complete yet: continue filling p->cache where the last call stopped. */
if (!p->header) {
    int offset = sizeof(p->cache) - p->need;
    /* Compute the storage position and read the outstanding header bytes. */
    int len = ToRecv(fd, (char*)&p->cache + offset, p->need);

    if (len == p->need) {
        ntoh(&p->cache);
        if (ToMidState(p)) {
            ret = MParser_ReadFd(p, fd);
        } else {
            /*
             * The message buffer could not be set up. Start over, otherwise
             * the parser stays flagged as "header parsed" with no buffer.
             */
            InitState(p);
        }
    } else {
        p->need -= len;
    }
}
从文件描述符中获取 payload 数据
if (p->msg) {
    /*
     * Resume behind the payload bytes stored by earlier calls; reading to
     * the start of payload every time would overwrite them.
     */
    int offset = p->msg->length - p->need;
    int len = ToRecv(fd, (char*)p->msg->payload + offset, p->need);

    p->need -= len;
}
/*
 * Try to switch to the final state. On success the complete protocol
 * message is obtained and the parser goes back to the initial state.
 */
if ((ret = ToLastState(p)) != NULL) {
    InitState(p);
}
编程实验:从文件描述符解析协议消息
message.h
#ifndef MESSAGE_H
#define MESSAGE_H

/*
 * Protocol message: four 16-bit header fields and a 32-bit payload length,
 * followed directly by `length` payload bytes.
 */
struct message {
    unsigned short type;
    unsigned short cmd;
    unsigned short index;
    unsigned short total;
    unsigned int   length;     /* number of bytes held in payload[] */
    unsigned char  payload[];  /* flexible array member, allocated with the header */
};

typedef struct message Message;

/*
 * Allocate a message with room for `length` payload bytes and fill in the
 * header fields. When `payload` is non-NULL, `length` bytes are copied from it.
 *
 * Returns the new message, or NULL if the allocation fails. The caller
 * releases it with free().
 */
Message *Message_New(unsigned short type,
                     unsigned short cmd,
                     unsigned short index,
                     unsigned short total,
                     unsigned char *payload,
                     unsigned int length);

#endif
message.c
#include "message.h"
#include <malloc.h>
#include <string.h>
Message *Message_New(unsigned short type, unsigned short cmd, unsigned short index, unsigned short total, unsigned char *payload, unsigned int length)
{Message *ret = malloc(sizeof(Message) + length);
if (ret) {
ret->type = type;
ret->cmd = cmd;
ret->index = index;
ret->total = total;
ret->length = length;
if (payload) {memcpy(ret + 1, payload, length);
}
}
return ret;
}
msg_parser.h
#ifndef MSG_PARSER_H
#define MSG_PARSER_H

#include "message.h"

/* Opaque parser handle; the real layout is private to msg_parser.c. */
typedef void MParser;

/*
 * Create a parser in its initial state. Release it with MParser_Del().
 * Declared with (void) so that calls passing arguments are diagnosed.
 */
MParser *MParser_New(void);

/*
 * Feed `length` bytes from `mem` into the parser.
 * Returns a completed message (the caller releases it with free()), or
 * NULL when more data is required or an argument is NULL/zero.
 */
Message *MParser_ReadMem(MParser *parser, unsigned char *mem, unsigned int length);

/*
 * Read from file descriptor `fd` and continue parsing with whatever data
 * could be obtained.
 * Returns a completed message (the caller releases it with free()), or
 * NULL when the message is still incomplete or an argument is invalid.
 */
Message *MParser_ReadFd(MParser *parser, int fd);

/* Discard any partially parsed message and return to the initial state. */
void MParser_Reset(MParser *parse);

/* Release the parser together with any partially parsed message. */
void MParser_Del(MParser *parse);

#endif
msg_parser.c
#include <malloc.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>
#include "msg_parser.h"
/* Private parser state behind the opaque MParser handle. */
struct msg_parser {
    Message cache;   /* header being collected; host byte order once complete */
    int header;      /* 0 while the header is incomplete, 1 once it is parsed */
    int need;        /* bytes still outstanding for the current stage */
    Message *msg;    /* message under construction, NULL when there is none */
};

typedef struct msg_parser MsgParser;
/*
 * Reset the parser to its initial state: release any partially built
 * message, clear the header flag, and require a full header
 * (sizeof p->cache bytes) again.
 */
static void InitState(MsgParser *p)
{
    free(p->msg);
    p->msg = NULL;

    p->need = sizeof p->cache;
    p->header = 0;
}
/*
 * Switch from "reading the header" to "reading the payload".
 *
 * Expects p->cache to hold a complete header already converted to host byte
 * order. Allocates the message (header plus cache.length payload bytes),
 * copies the header into it and records the payload size in p->need.
 *
 * Returns 1 on success. Returns 0 with p->msg == NULL when the announced
 * length cannot be stored in p->need or when the allocation fails; the
 * caller is expected to reset the parser in that case.
 */
static int ToMidState(MsgParser *p)
{
    /*
     * cache.length comes from the input stream. A value that does not fit
     * in the int field p->need would turn negative there: the allocation
     * size below would then be smaller than the header itself and the
     * header copy would write past the buffer. Reject such lengths.
     * (unsigned)-1 >> 1 equals INT_MAX on the usual two's-complement targets.
     */
    const unsigned int max_payload = (unsigned int)-1 >> 1;

    p->header = 1;

    if (p->cache.length > max_payload) {
        p->need = 0;
        p->msg = NULL;
        return 0;
    }

    p->need = (int)p->cache.length;
    p->msg = malloc(sizeof(p->cache) + p->cache.length);

    if (p->msg) {
        *p->msg = p->cache;
    }

    return !!p->msg;
}
/*
 * Attempt the transition to the final state.
 *
 * If the header is parsed and no payload bytes remain outstanding, the
 * assembled message is handed to the caller and detached from the parser.
 * Otherwise NULL is returned and the parser is left untouched.
 */
static Message *ToLastState(MsgParser *p)
{
    Message *done;

    if (!p->header || p->need) {
        return NULL;
    }

    done = p->msg;
    p->msg = NULL;

    return done;
}
/* Convert every header field of `m` from network to host byte order, in place. */
static void ntoh(Message *m)
{
    /* 32-bit field */
    m->length = ntohl(m->length);

    /* 16-bit fields */
    m->total = ntohs(m->total);
    m->index = ntohs(m->index);
    m->cmd = ntohs(m->cmd);
    m->type = ntohs(m->type);
}
/*
 * Best-effort read of up to `size` bytes from `fd` into `buf`.
 *
 * Stops at once on a read error. A zero-byte read is retried a limited
 * number of times with a 200 ms pause in between.
 *
 * Returns the number of bytes stored in `buf`.
 */
static int ToRecv(int fd, char *buf, int size)
{
    int got = 0;
    int idle = 0;

    while (got < size) {
        int n = read(fd, buf + got, size - got);

        if (n < 0) {
            break;
        }

        if (n > 0) {
            got += n;
            continue;
        }

        /* Nothing was read this round: give up after repeated empty reads. */
        if (idle++ > 5) {
            break;
        }

        usleep(200 * 1000);
    }

    return got;
}
/*
 * Create a parser in its initial state.
 *
 * Returns the new parser, or NULL if the allocation fails. Release it with
 * MParser_Del().
 */
MParser *MParser_New(void)
{
    MsgParser *ret = calloc(1, sizeof(MsgParser));

    /* InitState() dereferences its argument, so skip it when calloc failed. */
    if (ret) {
        InitState(ret);
    }

    return ret;
}
/*
 * Feed `length` bytes from `mem` into the parser.
 *
 * Header bytes are collected first. Once the header is complete the
 * remaining bytes of the same buffer are used as payload. A message whose
 * payload length is zero is completed as soon as its header is complete,
 * even when the header ends exactly at the end of `mem`.
 *
 * Returns a completed message (the caller releases it with free()), or
 * NULL when more data is required, an argument is NULL/zero, or the message
 * buffer could not be set up (the parser is reset in that case).
 *
 * NOTE: at most one message is produced per call; bytes in `mem` that
 * follow the completed message are not consumed.
 */
Message *MParser_ReadMem(MParser *parser, unsigned char *mem, unsigned int length)
{
    Message *ret = NULL;
    MsgParser *p = (MsgParser*)parser;

    if (!p || !mem || !length) {
        return ret;
    }

    if (!p->header) {
        unsigned int need = (unsigned int)p->need;
        unsigned int len = (need < length) ? need : length;
        unsigned int offset = (unsigned int)sizeof(p->cache) - need;

        memcpy((char*)&p->cache + offset, mem, len);

        if (len < need) {
            /* Header still incomplete: remember how much is missing. */
            p->need -= (int)len;
            return ret;
        }

        ntoh(&p->cache);
        mem += len;
        length -= len;

        if (!ToMidState(p)) {
            InitState(p);
            return ret;
        }
        /*
         * Fall through to the payload stage even when no input is left, so
         * that a zero-length payload completes the message right here.
         */
    }

    if (p->msg) {
        unsigned int need = (unsigned int)p->need;
        unsigned int len = (need < length) ? need : length;
        unsigned int offset = p->msg->length - need;

        if (len) {
            memcpy(p->msg->payload + offset, mem, len);
        }

        p->need -= (int)len;

        if ((ret = ToLastState(p)) != NULL) {
            InitState(p);
        }
    }

    return ret;
}
/*
 * Read from `fd` and advance the parser with whatever data could be obtained.
 *
 * While the header is incomplete, the outstanding header bytes are read.
 * Once the header is complete, parsing moves on to the payload within the
 * same call.
 *
 * Returns a completed message (the caller releases it with free()), or
 * NULL when the message is still incomplete, `fd` is -1, `parser` is NULL,
 * or the message buffer could not be set up (the parser is reset then).
 */
Message *MParser_ReadFd(MParser *parser, int fd)
{
    MsgParser *p = (MsgParser*)parser;
    Message *result = NULL;

    if (!p || fd == -1) {
        return NULL;
    }

    if (!p->header) {
        int start = sizeof(p->cache) - p->need;
        int got = ToRecv(fd, (char*)&p->cache + start, p->need);

        if (got != p->need) {
            /* Header still incomplete: record the remainder and wait. */
            p->need -= got;
            return NULL;
        }

        ntoh(&p->cache);

        if (!ToMidState(p)) {
            InitState(p);
            return NULL;
        }
        /* Header done: continue with the payload stage below. */
    }

    if (p->msg) {
        int start = p->msg->length - p->need;
        int got = ToRecv(fd, (char*)p->msg->payload + start, p->need);

        p->need -= got;
    }

    result = ToLastState(p);

    if (result) {
        InitState(p);
    }

    return result;
}
/* Put the parser back into its initial state; a NULL handle is ignored. */
void MParser_Reset(MParser *parse)
{
    MsgParser *self = parse;

    if (self == NULL) {
        return;
    }

    InitState(self);
}
/*
 * Destroy a parser: release the message under construction (if any) and
 * then the parser itself. A NULL handle is ignored.
 */
void MParser_Del(MParser *parse)
{
    MsgParser *self = parse;

    if (self == NULL) {
        return;
    }

    free(self->msg);
    free(self);
}
测试一:test.c
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include "msg_parser.h"
/*
 * Feed one message to the parser in three pieces (a split header followed
 * by the payload) and print the message once it is complete.
 */
int main()
{
    MParser *p = MParser_New();
    /* unsigned char matches the buffer type expected by MParser_ReadMem(). */
    unsigned char buf[] = {0x00, 0x01, 0x00, 0x02, 0x00};
    unsigned char another[] = {0x03, 0x00, 0x04, 0x00, 0x00, 0x00, 0x04};
    unsigned char data[] = {0x11, 0x12, 0x13, 0x14};
    Message *m = MParser_ReadMem(p, buf, sizeof(buf));
    unsigned int i = 0;

    if (!m) {
        printf("parse again...\n");
        m = MParser_ReadMem(p, another, sizeof(another));
    }

    if (!m) {
        printf("parse again again...\n");
        m = MParser_ReadMem(p, data, sizeof(data));
    }

    /* %p requires a void pointer. */
    printf("m = %p\n", (void*)m);

    if (m) {
        printf("type = %d\n", m->type);
        printf("cmd = %d\n", m->cmd);
        printf("index = %d\n", m->index);
        printf("total = %d\n", m->total);
        printf("length = %u\n", m->length);

        for (i = 0; i < m->length; ++i) {
            /* Trailing space keeps the bytes apart: "0x11 0x12 ...". */
            printf("0x%02x ", m->payload[i]);
        }

        printf("\n");
        free(m);
    }

    MParser_Del(p);

    return 0;
}
输出:
parse again...
parse again again...
m = 0x555e21dd56a0
type = 1
cmd = 2
index = 3
total = 4
length = 4
0x11 0x12 0x13 0x14
测试 2:
client.c
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <malloc.h>
#include "message.h"
/* Convert every header field of `m` from host to network byte order, in place. */
static void hton(Message *m)
{
    /* 32-bit field */
    m->length = htonl(m->length);

    /* 16-bit fields */
    m->total = htons(m->total);
    m->index = htons(m->index);
    m->cmd = htons(m->cmd);
    m->type = htons(m->type);
}
/*
 * Connect to 127.0.0.1:8888 and send the test string one character per
 * message; each message carries a two-byte payload (the character plus a
 * terminating zero byte).
 */
int main()
{
    int sock = 0;
    struct sockaddr_in addr = {0};
    int i = 0;
    char *test = "D.T.Software";
    int total = (int)strlen(test);   /* computed once instead of per iteration */
    Message *pm = NULL;

    sock = socket(PF_INET, SOCK_STREAM, 0);

    if (sock == -1) {
        printf("socket error\n");
        return -1;
    }

    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    addr.sin_port = htons(8888);

    if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
        printf("connect error\n");
        close(sock);
        return -1;
    }

    printf("connect success\n");

    for (i = 0; i < total; ++i) {
        unsigned char buf[2] = {0};
        size_t size = sizeof(Message) + sizeof(buf);
        ssize_t sent = 0;

        buf[0] = (unsigned char)test[i];

        pm = Message_New(128, 129, i, total, buf, sizeof(buf));

        /* hton() dereferences the message, so stop if it could not be created. */
        if (!pm) {
            printf("message create error\n");
            break;
        }

        hton(pm);

        sent = send(sock, pm, size, 0);

        free(pm);

        if (sent != (ssize_t)size) {
            printf("send error\n");
            break;
        }
    }

    close(sock);

    return 0;
}
server.c
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <malloc.h>
#include "msg_parser.h"
/*
 * Accept clients on port 8888 one at a time and print every protocol
 * message parsed from the connection.
 */
int main()
{
    int server = 0;
    struct sockaddr_in saddr = {0};
    int client = 0;
    struct sockaddr_in caddr = {0};
    socklen_t asize = 0;
    int ret = 0;
    MParser *parser = MParser_New();

    if (!parser) {
        printf("parser create error\n");
        return -1;
    }

    server = socket(PF_INET, SOCK_STREAM, 0);

    if (server == -1) {
        printf("server socket error\n");
        MParser_Del(parser);
        return -1;
    }

    saddr.sin_family = AF_INET;
    saddr.sin_addr.s_addr = htonl(INADDR_ANY);
    saddr.sin_port = htons(8888);

    if (bind(server, (struct sockaddr*)&saddr, sizeof(saddr)) == -1) {
        printf("server bind error\n");
        close(server);
        MParser_Del(parser);
        return -1;
    }

    if (listen(server, 1) == -1) {
        printf("server listen error\n");
        close(server);
        MParser_Del(parser);
        return -1;
    }

    printf("server start success\n");

    while (1) {
        struct tcp_info info = {0};
        socklen_t l = sizeof(info);
        Message *m = NULL;

        asize = sizeof(caddr);

        client = accept(server, (struct sockaddr*)&caddr, &asize);

        if (client == -1) {
            printf("client accept error\n");
            ret = -1;
            break;
        }

        printf("client: %d\n", client);

        do {
            l = sizeof(info);

            if (getsockopt(client, IPPROTO_TCP, TCP_INFO, &info, &l) == -1) {
                break;
            }

            m = MParser_ReadFd(parser, client);

            if (m) {
                printf("type = %d\n", m->type);
                printf("cmd = %d\n", m->cmd);
                printf("index = %d\n", m->index);
                printf("total = %d\n", m->total);
                printf("length = %u\n", m->length);
                /* The payload comes from the peer and need not be NUL-terminated. */
                printf("payload = %.*s\n", (int)m->length, (char*)m->payload);
                printf("\n");

                free(m);
            }
            /*
             * Keep going while messages are still being produced: the peer
             * may already have closed while complete messages are waiting
             * in the receive buffer.
             */
        } while (m || info.tcpi_state == TCP_ESTABLISHED);

        printf("client socket is closed\n");

        close(client);

        /* Do not let a half-received message leak into the next connection. */
        MParser_Reset(parser);
    }

    close(server);

    MParser_Del(parser);

    return ret;
}
输出:
server start success
client: 4
type = 128
cmd = 129
index = 0
total = 12
length = 2
payload = D
type = 128
cmd = 129
index = 1
total = 12
length = 2
payload = .
type = 128
cmd = 129
index = 2
total = 12
length = 2
payload = T
client socket is closed
有了协议和协议解析器之后,可以干嘛?
正文完