关于网络编程:用-Python-写网络编程四

46次阅读

共计 6650 个字符,预计需要花费 17 分钟才能阅读完成。

复习功课

第一篇地址

第二篇地址

第三篇地址

逐渐讲述回包

间接入正题,第三篇曾经提到了如何收到回包,以后这个主题网络编程都是讲 Tcp,协定申请形式自身会决定个性,咱们这里先讲和发包收包程序无关的。

Tcp 首先是一个双工的,意思就是客户端 C 端发消息给 S 端,S 端也能够发消息给客户端,传输协定数据类型外面传输在网卡那层都是二进制文件流。为啥是流而不是包能够回顾之前的,也能够抉择死记硬背。

那么 C 端是不是发给 S 端音讯,S 端肯定是要回复的呢,这个是未必的。因为在协定申请 / 传输方式还会传输协定数据类型,这个类型和业务是间接无关的,大体能够分为须要应答和不须要应答。不须要应答的最常见的就是心跳包。这里为啥要叫包而不是叫心跳流呢,只能说是一种通用的叫法。

当客户端链接上服务后,每隔约一段时间,往服务器固定传输一段信息,就和人的心电图一样,证实以后客户端是沉闷的。如果不沉闷服务器会把客户端对象断开(Tcp 挥手),这里不沉闷个别是指多少次没有收到,才会判断不沉闷。

断开的益处,这里能够这样记忆,每个 socket 链接是肯定有老本的,放弃链接状态也一样,每个 socket 做一些事会产生内存变动(这里不思考软件缓存,硬件的几级缓存),如果在把一些数据存储到数据库,就会把内存数据交换到数据库或者数据库缓存,交互过程也会产生内存碎片和数据库 IO 开销。只是是沉闷的,就会依据这个 socket 做得事件产生一系列的非用户态的一些开销,所以有必要剔除不沉闷的申请。

比方 30 分钟没有取得教训 / 多少分钟没有挪动(挪动也会产生挪动数据包,这个数据包会逐条同步给他四周的沉闷用户),而后就会进入挂机状态,这种其实也是相似把 fb 标记为了不沉闷 (leave = no alive),不沉闷就只须要定期同步一次。

如果是齐全下线 (close),这里撇开业务,齐全下线也会主动挂机获取教训。

以上次要缓缓开展客户端发了 3 次 100 个字节后期待几百 ms,然而服务器只回了一组包,这个是 Tcp 独有黏包的问题。

然而发现这样跨度有点大,例子比拟根底,会先来通过一个 Tcp 心跳的例子(分四,五章节)来缓缓往后延长,反正最终都会讲清楚的。

心跳学习和设计

写个例子来做一些直观性的推理和设计,例子也会联合之前的一些知识点。

心跳信息:json 字串 # 实战外面是一个 bytes 模式 这个 butes 会比拟小,只是为了让服务器计数客户端是沉闷的。

详解 服务器那边记录一个客户端连贯句柄的治理字典,链接的客户端对象在服务器那边会是一个 ”fd”,是否沉闷会是一个字段 ”status”:”alive”,””leave”,”close” 别离代表 沉闷,非沉闷暂离,敞开。

服务器那边进行开发设计,反对状态为一个双向的程序 (“alive”<–>”leave”<–>”close”)

对于 status:”alive” 状态条件 客户端来说是默认的,没在 socket.error 那层被拦挡掉,所以链接上了,链接上分为地址正确,写法正确,防火墙容许 (这里测试用不上这个)

须要服务器开发:
1.socket 服务器程序
2. 惯例接管包
3. 判断 4 个字节后的数据在反序列化 json.loads。服务器比客户端多一个 bind 后开启监听的性能,服务器反对多个客户端。
4. 匹配性能。服务器收到客户端信息后会进行匹配,有一个游标(第 5 章实现该性能)。
游标比方设置为 3,阈值是 5,多少秒收到一次音讯就 -1,多少秒没有收到音讯就 +1. 阈值达到一次 5 计数一次,达到 3 次阈值,批改以后客户端链接治理字典为 ”leave”。达到阈值小于 3 次那么还是 alive。
5. 会用一个 user 实例对象列表去治理增加模仿链接进来的客户端。每 10 秒加一个,客户端是模仿假的,端口号 +1。

须要客户端开发的:
1.socket client 程序
2.Json 发包 + 数据结构
3. 距离每 10 秒发一次。

对于 status:”leave” 状态:记录放弃 30 分钟(代码会写 30 秒)会到下个状态 ”close”,如果呈现心跳稳固屡次,没有触发到阈值 5,会回退到 ”alive”。

对于达到 status:”close” 服务器会把客户端踢掉,在 user 的列表治理外面,服务器会存储客户端的 ip: 端口。

心跳客户端前置

先写这个例子 只有 ”alive” 状态条件,没有切换条件的。依据过来例子,咱们先须要定义一个网络层的数据结构,这次先不必 protobuff。

协定传输用 Tcp 应用通用性高的 struct,传输数据构造为 Json
Tcp 数据结构为包头 4 个字节,4 个字节为外面塞包体长度,那个整个包长度也就是 4 个字节(固定的)+ 包头 4 个字节的取值(动静的包体长度)。

Json C2S {“ip”: 传递本地 Ip,”status”:下面讲的状态应用默认的,”pid”: 用于客户端治理本人的} pid 这里次要用于本人杀本人过程。

这里须要补一个常识,第三章未讲完的,就是如果 4 个字节包头长度 + 客户端传递对象是如何写的

# ip_add 给服务器的地址
ip_addr = socket.gethostbyname(socket.gethostname())
# 客户端本地本人治理本人用的 pid
pid = os.getpid()
# 状态切换用的列表,列表有序的,通过 idx 作为状态机索引,默认是 alive
status = ["alive", "leave", "close"]
idx = 0
data = {'ip': ip_addr, 'status': status[idx], 'pid': pid}
# data 的长度,这里有 str 只是为了上面 + 应用,int->str 不影响上面后果
pack_len = str(len(data))
print(pack_len) # 输入为 3
# i 是 4 个字节 +pack_len struct.calcsize 是看 fmt 区域的
client_len = struct.calcsize("!i" + pack_len + 's')
print(client_len) # 输入为 4 +3

留神看正文,动静计算长度就这么进去了,然而留神了,struct.pack 压包动态化是面向包头外面有几个对象,如果只有一个对象传入 json 输入应该是这样的
本次代码对于压包实例

buffer = json.dumps(data)
print(len(buffer))
packet_head = struct.pack("!i",len(buffer))
buffer_client = packet_head+buffer.encode("utf-8")
print(buffer_client) # b'\x00\x00\x008{"ip":"192.168.1.105","status":"alive","pid": 26464}'

心跳客户端

只蕴含和服务器进行通信,发送 Tcp-stuct,Json,把 json 数据做为心跳包发送了,留神理论心跳确实会是固定的数据,但不会传入这个。
服务器只是解析字符串那样去判断心跳是哪个客户端发的。状态切换性能下个文章再写。

import datetime
import socket, sys, os
import time, json,struct

class HeartBeatClient:
    interval = 10  #心跳发送距离

    def __init__(self, addr: tuple):
        try:
            self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.client.connect(addr)
        except socket.error as e:
            print("Error creating socket: %s" % e)
            sys.exit()

    def encode_json(self, conn,data:dict):
        """发包"""
        buffer = json.dumps(data)
        print(f"以后客户端收回去包长度{len(buffer)}")
        packet_head = struct.pack("!i", len(buffer))
        conn.send(packet_head + buffer.encode("utf-8"))

    def loop_client(self, conn: socket.socket):
        """
        循环客户端
        :param conn:
        :return:
        """status = ["alive","leave","close"]  #状态列表有序
        idx = 0 #状态索引
        while True:
            ip_addr = socket.gethostbyname(socket.gethostname())
            pid = os.getpid()
            data = {"ip": ip_addr, "status": status[idx], "pid": pid}
            buffer = json.dumps(data)
            try:
                self.encode_json(conn,buffer)
            except socket.error:
                print("client send failed")
                break
            else:
                now =datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                print({"time":now,"fd":os.getpid(),"status":status[idx]})
                time.sleep(self.interval)
        self.client_close(conn)

    def client_close(self,conn):
        if conn:
            conn.close()


if __name__ == '__main__':
    client = HeartBeatClient(("127.0.0.1", 14000))
    client.loop_client(client.client)

加固 strcut 传递应用的例子,倡议手敲代码。

心跳服务器

次要是对应这次客户端编写的心跳服务器性能,留神调式是先启动服务器,在启动客户端文件。
为了模仿 fd_port 自增 1, 等于好多个客户端连贯入,这里如果不相熟 Threading 的,须要略微自学下。

import socket
import json
from threading import Thread
import ast

class HeartBeatServer:

    def __init__(self, addr: tuple, max: int):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(addr)
        # 最大反对 max 个链接数
        self.sock.listen(max)
        self.user = []

    def encode_buffer(self, conn: socket.socket, data: str or bytes or dict):
        """
        发送数据
        :param conn: 客户端链接到服务器的句柄
        :param data: 数据
        :return:
        """
        if isinstance(data, str):
            data = data.encode("utf-8")
        elif isinstance(data, dict):
            data = str(data).encode("utf-8")
        conn.send(str(data).encode("utf-8"))

    def match_data(self,recv)->bool:
        """
        匹配反序列化后的数据
        :param recv:
        :return:
        """
        if isinstance(recv,dict):
            return recv.get("ip") and recv.get("status")

    def loop_thread(self, conn: socket.socket, fd_port: int):
        """
        服务器是只判断客户端的包头是否非法。:return:
        """
        head_len = 4
        self.user.append("192.168.1.105:88888")
        idx = 1
        while True:
            try:
                # 不停收包
                data = conn.recv(1024 * 4)
                if len(data) > head_len:
                    recv = json.loads(data[4:])
                    recv = ast.literal_eval(recv)
                    if not self.match_data(recv):
                        # pop()列表丢进来并且返回丢出的数据,默认是最初一个
                        print(f"client fd addr:{self.user.pop()} Connected over!")
                        break
                    # 为了模仿 fd_port 自增 1, 等于好多个客户端连贯入
                    self.user.append(f"{recv.get('ip')}:{fd_port + idx}")
                    print(f"以后服务器治理用户 -->{self.user}")
                else:
                    print(f"client fd addr:{self.user[-1]}数据不非法")
            except socket.error:
                print(f"client fd addr:{self.user.pop()} Connected over!")
                break
            else:
                idx += 1
        conn.close()

    def start(self):
        """
        服务启动
        :return:
        """
        while True:
            conn, addr = self.sock.accept()
            t = Thread(target=self.loop_thread, args=(conn, addr[1]))
            t.start()


if __name__ == '__main__':
    server = HeartBeatServer(("127.0.0.1", 14000), 100)
    server.start()

这里有一个重要概念

这里为啥不能用 self.sock 是因为 self.sock 是服务器自身的句柄
conn 是服务器收到客户端对象的句柄,所以这个 conn 才是被治理的,当客户端断开时,在服务器通过 conn 进行挥手
如果是 self.sock 服务就敞开了。

结尾

第五章也写了一部分草稿中

本文首发于 TesterHome 社区,作者是资深游戏测试开发工程师陈子昂。用 Python 写网络编程共四篇,明天分享的是第四篇。原文链接:https://testerhome.com/topics/29411

以上是明天的分享,你学废了吗~

想学习更多干货常识和前沿技术?

想结识测试行业大咖和业界精英?

欢送关注2022 MTSC 大会(第十届中国互联网测试开发大会)

业界对 MTSC 大会的评估:落地、求实、有深度、重分享

中国互联网测试开发大会 Testing Summit China,是由 TesterHome 发动的软件测试开发行业技术会议。还有个别名:Make Testing Super Cool!

MTSC 大会以软件品质保障体系和测试研发技术交换为次要目标,始于 2015 年,已胜利举办了九届。共有 1000+ 家企业,10000+ 测试工程师、测试经理、CTO 参会,受到了全行业的宽泛关注,是中国互联网质量保证行业的顶级会议。

为了保障议题品质,每年都会提前半年进行议题征集,再通过历时数月的审核,最终确定在大会分享的议题。MTSC 2022 的议题还在征集中,诚邀各位资深测试技术专家、品质治理经理和测试新秀们自荐 / 举荐议题!

提交议题形式

间接点击 https://www.wjx.top/vj/wZwCju… 进入投递入口,依照格局进行填写并提交即可,欢送自荐 / 举荐议题。

议题截止工夫

为便于评审组有更充沛工夫进行评审及与讲师进行沟通优化,为宽广参会者出现更好的议题,议题投稿(可无 PPT,有纲要等议题信息介绍即可)投递截止工夫提前为:2022 年 3 月 30 日

议题征集评比流程

总体流程:案例提交 > 初审核定 > PPT 提交 > 确认议题 > 会议演讲

总体工夫节点:

案例提交 >> 3 月 30 日

初审核定 >> 4 月 15 日

ppt 提交 >> 5 月 15 日

议题确认 >> 5 月 30 日

会议演讲 >> 7 月 22~23 日

分享讲师福利

  1. 锤炼演讲能力,加强集体品牌
  2. 取得和业内专家面对面交换的机会,博采众长
  3. 晋升公司品牌,给本人的团队减少吸引力
  4. 获取收费的大会门票和材料:
    每位讲师都会获赠 1 张大会门票,大会后续的 PPT 和视频都会第一工夫给到讲师
  5. 当地讲师由 MTSC 大会组委会承当差旅费用

MTSC 2022 早鸟票已轻轻开售,点击理解详情。

正文完
 0