关于c:Linux网络开发必学教程10应用协议解析模块下

30次阅读

共计 7862 个字符,预计需要花费 20 分钟才能阅读完成。

问题:如何通过 socket 文件描述符实时解析协定数据?

深度思考

从文件描述符是否可能获取足够的数据?(是否肯定等到数量足够(如:音讯头 12 字节),能力开始解析)

  • 数据量足够

    • 读取 12 字节解析音讯头
    • 读取数据填充 payload (length)
  • 数据量有余

    • 无奈获取音讯头所需数据(如何解决?解析状态如何切换?)
    • 无奈获取 payload 残缺数据(如何解决?是否可追加?)

解决方案

策略:尽力获取数据,实时解析

  • 即使以后获取 1 字节,也可依据状态进行解析
  • 反对不同数据源屡次接力解析(从内存或文件描述符交替获取数据)
 充分利用解析器状态信息是实现解决方案的要害 

解析器状态切换

状态切换函数

static void InitState(MsgParser *p) 
{
    p->header = 0;
    p->need = sizeof(p->cache);
    free(p->msg);
    p->msg = NULL;
}
static int ToMidState(MsgParser *p)
{
    p->header = 1;
    p->need = p->cache.length;
    
    p->msg = malloc(sizeof(p->cache) + p->need);

    if (p->msg) {*p->msg = p->cache;}

    return !!p->msg;
}
static Message *ToLastState(MsgParser *p)
{
    Message *ret = NULL;
    
    if (p->header && !p->need) {
        ret = p->msg;
        p->msg = NULL;
    }

    return ret;
}

从文件描述符中获取数据

static int ToRecv(int fd, char *buf, int size)
{
    int retry = 0;
    int i = 0;

    while (i < size) {int len = read(fd, buf + i, size - i);

        if (len > 0) {i += len;}
        else {if (retry++ > 5) {break;}

            usleep(200 * 10000);
        }
    }

    return i;
}

从文件描述符中实时解析音讯头

if (!p->header) {int offset = sizeof(p->cache) - p->need;
    int len = ToRecv(fd, (char*)&p->cache + offset/* 计算寄存地位并读取音讯头数据 */, p->need);

    if (len == p->need) {ntoh(&p->cache);

        if (ToMidState(p)) {ret = MParser_ReadFd(p, fd);
        }
    } else {p->need -= len;}
}

从文件描述符中获取 payload 数据

if (p->msg) {int len = ToRecv(fd, p->msg->payload, p->need);
    p->need -= len;
}

/* 尝试切换到最终状态,如果胜利,则可取得协定音讯;之后切换到初始状态 */
if (ret = ToLastState(p)) {InitState(p);
}

编程试验:从文件描述符解析协定音讯

message.h

#ifndef MESSAGE_H
#define MESSAGE_H

typedef struct message {
    unsigned short type;
    unsigned short cmd;
    unsigned short index;
    unsigned short total;
    unsigned int length;
    unsigned char payload[];}Message;

Message *Message_New(unsigned short type,
                    unsigned short cmd,
                    unsigned short index,
                    unsigned short total,
                    unsigned char *payload,
                    unsigned int length);

#endif

message.c

#include "message.h"

#include <malloc.h>
#include <string.h>

Message *Message_New(unsigned short type, unsigned short cmd, unsigned short index, unsigned short total, unsigned char *payload, unsigned int length)
{Message *ret = malloc(sizeof(Message) + length);

    if (ret) {
        ret->type   = type;
        ret->cmd    = cmd;
        ret->index  = index;
        ret->total  = total;
        ret->length = length;

        if (payload) {memcpy(ret + 1, payload, length);
        }
    }

    return ret;
}

msg_parser.h

#ifndef MSG_PARSER_H
#define MSG_PARSER_H

#include "message.h"

typedef void MParser;

MParser *MParser_New();
Message *MParser_ReadMem(MParser *parser, unsigned char *mem, unsigned int length);
Message *MParser_ReadFd(MParser *parser, int fd);
void MParser_Reset(MParser *parse);
void MParser_Del(MParser *parse);

#endif

msg_parser.c

#include <malloc.h>
#include <string.h>
#include <arpa/inet.h>
#include <unistd.h>

#include "msg_parser.h"

typedef struct msg_parser {
    Message cache;
    int header;
    int need;
    Message *msg;
}MsgParser;

static void InitState(MsgParser *p)
{
    p->header = 0;
    p->need = sizeof(p->cache);
    
    free(p->msg);

    p->msg = NULL;
}

static int ToMidState(MsgParser *p)
{
    p->header = 1;
    p->need = p->cache.length;

    p->msg = malloc(sizeof(p->cache) + p->need);

    if (p->msg) {*p->msg = p->cache;}

    return !!p->msg;
}

static Message *ToLastState(MsgParser *p)
{
    Message *ret = NULL;

    if (p->header && !p->need) {
        ret = p->msg;
        p->msg = NULL;
    }

    return ret;
}

static void ntoh(Message *m)
{m->type = ntohs(m->type);
    m->cmd = ntohs(m->cmd);
    m->index = ntohs(m->index);
    m->total = ntohs(m->total);
    m->length = ntohl(m->length);    
}

static int ToRecv(int fd, char *buf, int size)
{
    int retry = 0;
    int i = 0;

    while (i < size) {int len = read(fd, buf + i, size - i);
        if (len > 0) {i += len;} else if (len < 0) {break;} else {if (retry++ > 5) {break;}

            usleep(200 * 1000);
        }
    }

    return i;
}

MParser *MParser_New()
{MsgParser *ret = calloc(1,  sizeof(MsgParser));

    InitState(ret);

    return ret;
}

Message *MParser_ReadMem(MParser *parser, unsigned char *mem, unsigned int length)
{
    Message *ret = NULL;
    MsgParser *p = (MsgParser*)parser;

    if (!p || !mem || !length) {return ret;}

    if (!p->header) {int len = (p->need < length) ? p->need : length;
        int offset = sizeof(p->cache) - p->need;

        memcpy((char*)&p->cache + offset, mem, len);

        if (p->need == len) {ntoh(&p->cache);

            mem += p->need;
            length -= p->need;

            if (ToMidState(p)) {ret = MParser_ReadMem(p, mem, length);
            } else {InitState(p);
            }
        } else {p->need -= len;}
    } else {if (p->msg) {int len = (p->need < length) ? p->need : length;
            int offset = p->msg->length - p->need;

            memcpy(p->msg->payload + offset, mem, len);

            p->need -= len;

            if (ret = ToLastState(p)) {InitState(p);
            }
        }  
    }

    return ret;
}

Message *MParser_ReadFd(MParser *parser, int fd)
{
    Message *ret = NULL;
    MsgParser *p = (MsgParser*)parser;

    if (fd == -1 || !p) {return ret;}

    if (!p->header) {int offset = sizeof(p->cache) - p->need;
        int len = ToRecv(fd, (char*)&p->cache + offset, p->need);

        if (len == p->need) {ntoh(&p->cache);
            if (ToMidState(p)) {ret = MParser_ReadFd(p, fd);
            }
            else {InitState(p);
            }
        }
        else {p->need -= len;}
    } else {if (p->msg) {
            int offset = p->msg->length - p->need;
            int len = ToRecv(fd, p->msg->payload + offset, p->need);

            p->need -= len;
        }

        if (ret = ToLastState(p)) {InitState(p);
        }
    } 

    return ret;
}

void MParser_Reset(MParser *parse)
{MsgParser *p = (MsgParser*)parse;

    if (p) {InitState(p);
    }
}

void MParser_Del(MParser *parse)
{MsgParser *p = (MsgParser*)parse;

    if (p) {free(p->msg);
        free(p);
    }
}

测试一:test.c

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>

#include "msg_parser.h"

int main()
{MParser *p = MParser_New();
    char buf[] = {0x00, 0x01, 0x00, 0x02, 0x00};
    char another[] = {0x03, 0x00, 0x04, 0x00, 0x00, 0x00, 0x04};
    char data[] = {0x11, 0x12, 0x13, 0x14};
    Message *m = MParser_ReadMem(p, buf, sizeof(buf));
    int i = 0;

    if (!m) {printf("parse again...\n");

        m = MParser_ReadMem(p, another, sizeof(another));
    }

        if (!m) {printf("parse again again...\n");

        m = MParser_ReadMem(p, data, sizeof(data));
    }

    printf("m = %p\n", m);

    if (m) {printf("type = %d\n", m->type);
        printf("cmd = %d\n", m->cmd);
        printf("index = %d\n", m->index);
        printf("total = %d\n", m->total);
        printf("length = %d\n", m->length);

        for (i=0; i<m->length; ++i) {printf("0x%02x", m->payload[i]);
        }

        printf("\n");

        free(m);
    }

    MParser_Del(p);

    return 0;    
}

输入:

parse again...
parse again again...
m = 0x555e21dd56a0
type = 1
cmd = 2
index = 3
total = 4
length = 4
0x11 0x12 0x13 0x14

测试 2:
client.c

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <malloc.h>

#include "message.h"

static void hton(Message *m)
{m->type = htons(m->type);
    m->cmd = htons(m->cmd);
    m->index = htons(m->index);
    m->total = htons(m->total);
    m->length = htonl(m->length);    
}

int main()
{
    int sock = 0;
    struct sockaddr_in addr = {0};
    int i = 0;
    char *test = "D.T.Software";
    Message *pm = NULL;

    sock = socket(PF_INET, SOCK_STREAM, 0);

    if (sock == -1) {printf("socket error\n");
        return -1;
    }

    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    addr.sin_port = htons(8888);

    if (connect(sock, (struct sockaddr*)&addr, sizeof(addr)) == -1) {printf("connect error\n");
        return -1;
    }

    printf("connect success\n");

    for (i=0; i<strlen(test); ++i) {char buf[2] = {0};

        buf[0] = test[i];

        pm = Message_New(128, 129, i, strlen(test), buf, 2);

        hton(pm);

        send(sock, pm, sizeof(Message) + 2, 0);

        free(pm);
    }
    
    close(sock);

    return 0;
}

server.c

#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <malloc.h>

#include "msg_parser.h"

int main()
{
    int server = 0;
    struct sockaddr_in saddr = {0};
    int client = 0;
    struct sockaddr_in caddr = {0};
    socklen_t asize = 0;
    int len = 0;
    char buf[32] = {0};
    int r = 0;
    MParser *parser = MParser_New();

    server = socket(PF_INET, SOCK_STREAM, 0);

    if (server == -1) {printf("server socket error\n");
        return -1;
    }

    saddr.sin_family = AF_INET;
    saddr.sin_addr.s_addr = htonl(INADDR_ANY);
    saddr.sin_port = htons(8888);

    if (bind(server, (struct sockaddr*)&saddr, sizeof(saddr)) == -1) {printf("server bind error\n");
        return -1;
    }

    if (listen(server, 1) == -1) {printf("server listen error\n");
        return -1;
    }

    printf("server start success\n");

    while (1) {struct tcp_info info = {0};
        int l = sizeof(info);

        asize = sizeof(caddr);
        client = accept(server, (struct sockaddr*)&caddr, &asize);

        if (client == -1) {printf("client accept error\n");
            return -1;
        }

        printf("client: %d\n", client);

        do {getsockopt(client, IPPROTO_TCP, TCP_INFO, &info, (socklen_t*)&l);

            Message *m = MParser_ReadFd(parser, client);

            if (m) {printf("type = %d\n", m->type);
                printf("cmd = %d\n", m->cmd);
                printf("index = %d\n", m->index);
                printf("total = %d\n", m->total);
                printf("length = %d\n", m->length);
                printf("payload = %s\n", m->payload);
                printf("\n");
                free(m);
            }
        } while (info.tcpi_state == TCP_ESTABLISHED);

        printf("client socket is closed\n");

        close(client);
    }

    close(server);

    MParser_Del(parser);

    return 0;
}

输入:

server start success
client: 4
type = 128
cmd = 129
index = 0
total = 12
length = 2
payload = D

type = 128
cmd = 129
index = 1
total = 12
length = 2
payload = .

type = 128
cmd = 129
index = 2
total = 12
length = 2
payload = T

client socket is closed

有了协定和协定解析器之后,能够干嘛?

正文完
 0