本文偏重介绍如何应用 python 和 mitmproxy 实现拦挡数据包、重发数据包,以及解析 protobuf 数据内容,对于相干依赖的装置不做介绍。

一、游戏协定平安测试内容

参考https://testerhome.com/topics...,这篇文章讲的很分明。

二、实现原理

想间接应用的同学能够跳到第三局部。

mitmproxy 作为代理,能够获取客户端与服务端通信的数据,并且能够拦挡、批改和自主发送数据。当配合其证书应用时,还能够解密 wss 连贯中的 websocket 数据。

  • Websotcket 数据处理源码剖析
    在 http 代理的过程中若发现 upgrade websocket 申请,则创立 WebSocketLayer 实例,并调用其_call_办法。
# mitmproxy/proxy/protocol/http.py"""以下为Httplayer的_process_flow办法的局部代码"""if f.response.status_code == 101:    # Handle a successful HTTP 101 Switching Protocols Response,    # received after e.g. a WebSocket upgrade request.    # Check for WebSocket handshake    is_websocket = (        websockets.check_handshake(f.request.headers) and        websockets.check_handshake(f.response.headers)    )    if is_websocket and not self.config.options.websocket:        self.log(          "Client requested WebSocket connection, but the protocol is disabled.",          "info"        )    if is_websocket and self.config.options.websocket:        layer = WebSocketLayer(self, f)    else:        layer = self.ctx.next_layer(self)    layer()

WebSocketLayer 初始化时会创立用于此次 websocket 通信的编解码器。

# mitmproxy/proxy/protocol/websocket.py"""WebSocketLayer类的init办法,省略局部代码"""def __init__(self, ctx, handshake_flow):    super().__init__(ctx)    self.handshake_flow = handshake_flow    self.connections: dict[object, WSConnection] = {}    client_extensions = []    server_extensions = []    # 判断交互数据是否应用deflate压缩    if 'Sec-WebSocket-Extensions' in handshake_flow.response.headers:        if PerMessageDeflate.name in handshake_flow.response.headers['Sec-WebSocket-Extensions']:            client_extensions = [PerMessageDeflate()]            server_extensions = [PerMessageDeflate()]    # self.client_conn和self.server_conn继承自ctx,即原http的client和server,原理为父类的__getattr__(self, name)办法返回的是getattr(self.ctx, name)。WSConnection是一个websocket协定编解码器,理论不会发送任何网络IO,文档地址:https://python-hyper.org/projects/wsproto/en/latest/basic-usage.html    # 负责和解码server收到信息和编码server发送的信息    self.connections[self.client_conn] = WSConnection(ConnectionType.SERVER)    # 负责和解码client收到信息和编码client发送的信息    self.connections[self.server_conn] = WSConnection(ConnectionType.CLIENT)    # 结构发送给Server的websocket的握手申请    request = Request(extensions=client_extensions,host=handshake_flow.request.host,target=handshake_flow.request.path)    # send()办法只会结构一个实用于对应conn的数据,并不会真正发送数据,recv_data()会将信息解码,须要通过next(conn.events())获取解码后数据    # 按上所说,上面两行代码的操作是将握手申请按client编码后发送给server编码器,而后让server编码器解码    data = self.connections[self.server_conn].send(request)    self.connections[self.client_conn].receive_data(data)    event = next(self.connections[self.client_conn].events())    assert isinstance(event, events.Request)    # 返回给客户端承受连贯响应    data = self.connections[self.client_conn].send(AcceptConnection(extensions=server_extensions))    self.connections[self.server_conn].receive_data(data)    assert isinstance(next(self.connections[self.server_conn].events()), events.AcceptConnection)

WebSocketLayer 实例的_call_办法负责解决后续 websocket 通信

# mitmproxy/proxy/protocol/websocket.py"""WebSocketLayer类的call办法,省略局部代码"""def __call__(self):    self.flow = WebSocketFlow(self.client_conn, self.server_conn, self.handshake_flow)    self.flow.metadata['websocket_handshake'] = self.handshake_flow.id    self.handshake_flow.metadata['websocket_flow'] = self.flow.id    # 调用addons中的websocket_start(self, flow)对flow进行解决    self.channel.ask("websocket_start", self.flow)    conns = [c.connection for c in self.connections.keys()]    close_received = False    try:        while not self.channel.should_exit.is_set():            # 往client或server插入信息,self.flow._inject_messages_client/self.flow._inject_messages_server是队列,后续实现在连贯中被动发消息就是通过往队列中插入数据实现            self._inject_messages(self.client_conn, self.flow._inject_messages_client)            self._inject_messages(self.server_conn, self.flow._inject_messages_server)        # select监督原http的client和server连贯的可读事件            r = tcp.ssl_read_select(conns, 0.1)            for conn in r:                source_conn = self.client_conn if conn == self.client_conn.connection else self.server_conn                other_conn = self.server_conn if conn == self.client_conn.connection else self.client_conn                is_server = (source_conn == self.server_conn)                frame = websockets.Frame.from_file(source_conn.rfile)                # 将从conn中获取的数据放入编解码器,此办法并没有返回值,所以data是None                data = self.connections[source_conn].receive_data(bytes(frame))                # data是None,不解此举有何意义                source_conn.send(data)                if close_received:                    return        # 解决编解码器中解码后的数据,event由pop取出,后续不会再用到。                for event in self.connections[source_conn].events():                    if not self._handle_event(event, source_conn, other_conn, is_server):                        if not close_received:                            close_received = True    except (socket.error, exceptions.TcpException, SSL.Error) as e:        s = 'server' if is_server else 'client'        self.flow.error = flow.Error("WebSocket connection closed unexpectedly by {}: {}".format(s, repr(e)))        # 调用addons中的websocket_start(self, flow)对flow进行解决        self.channel.tell("websocket_start", self.flow)    finally:        self.flow.ended = True        # 调用addons中的websocket_end(self, flow)对flow进行解决        self.channel.tell("websocket_end", self.flow)

WebSocketLayer 实例中解决 Message Event 的办法

# mitmproxy/proxy/protocol/websocket.py"""WebSocketLayer类的_handle_message办法,_handle_event中,若isinstance(event, events.Message),则会调用此函数"""def _handle_message(self, event, source_conn, other_conn, is_server):    fb = self.server_frame_buffer if is_server else self.client_frame_buffer    fb.append(event.data)    if event.message_finished:        original_chunk_sizes = [len(f) for f in fb]        if isinstance(event, events.TextMessage):            message_type = wsproto.frame_protocol.Opcode.TEXT            payload = ''.join(fb)        else:            message_type = wsproto.frame_protocol.Opcode.BINARY            payload = b''.join(fb)        fb.clear()        websocket_message = WebSocketMessage(message_type, not is_server, payload)        length = len(websocket_message.content)        self.flow.messages.append(websocket_message)        # 调用addons中的websocket_message(self, flow)对flow进行解决        self.channel.ask("websocket_message", self.flow)        # WebsocketMessage的属性killed用于判断该信息是否须要被转发,可在websocket_message函数中调用message的kill()办法置为True        if not self.flow.stream and not websocket_message.killed:            def get_chunk(payload):                if len(payload) == length:                    # message has the same length, we can reuse the same sizes                    pos = 0                    for s in original_chunk_sizes:                        yield (payload[pos:pos + s], True if pos + s == length else False)                        pos += s                else:                    # just re-chunk everything into 4kB frames                    # header len = 4 bytes without masking key and 8 bytes with masking key                    chunk_size = 4092 if is_server else 4088                    chunks = range(0, len(payload), chunk_size)                    for i in chunks:                        yield (payload[i:i + chunk_size], True if i + chunk_size >= len(payload) else False)            # 将收到的信息从新编码后向对端发送            for chunk, final in get_chunk(websocket_message.content):                data = self.connections[other_conn].send(Message(data=chunk, message_finished=final))                other_conn.send(data)    if self.flow.stream:        data = self.connections[other_conn].send(Message(data=event.data, message_finished=event.message_finished))        other_conn.send(data)    return True
  • Tcp 数据处理源码剖析
    TCP 数据处理触发条件

    # mitmproxy/proxy/root_context.py"""RootContext类_next_layer办法,省略局部代码"""
  • Check for --tcp
    判断Option中tcp_hosts, 类型是一个列表,蕴含须要转换成tcp流信息的server address正则表达式,例如['192.168.\d+.\d+']
    """
    if self.config.check_tcp(top_layer.server_conn.address):
    return protocol.RawTCPLayer(top_layer)

    """

    6. Check for raw tcp mode

    判断Option中rawtcp,类型是bool,若为true,则将不能解决的流转换成tcp流解决,倡议开启,默认是false

    """
    is_ascii = (
    len(d) == 3 and
    # expect A-Za-z
    all(65 <= x <= 90 or 97 <= x <= 122 for x in d)
    )
    if self.config.options.rawtcp and not is_ascii:
    return protocol.RawTCPLayer(top_layer)
    TCP 信息处理 RawTCPLayer 类源码

    class RawTCPLayer(base.Layer):
    chunk_size = 4096

    def __init__(self, ctx, ignore=False):

      self.ignore = ignore  super().__init__(ctx)

    def __call__(self):

      self.connect()  if not self.ignore:      f = tcp.TCPFlow(self.client_conn, self.server_conn, self)      # 调用addons中的tcp_start(self, flow)对flow进行解决      self.channel.ask("tcp_start", f)  # 创立一个长度为4096的空bytearray  buf = memoryview(bytearray(self.chunk_size))  client = self.client_conn.connection  server = self.server_conn.connection  conns = [client, server]  # https://github.com/openssl/openssl/issues/6234  for conn in conns:      if isinstance(conn, SSL.Connection) and hasattr(SSL._lib, "SSL_clear_mode"):          SSL._lib.SSL_clear_mode(conn._ssl, SSL._lib.SSL_MODE_AUTO_RETRY)  try:      while not self.channel.should_exit.is_set():          r = mitmproxy.net.tcp.ssl_read_select(conns, 10)          for conn in r:              dst = server if conn == client else client              try:                  # 将从conn中recv的数据存入buf,返回size                  size = conn.recv_into(buf, self.chunk_size)              except (SSL.WantReadError, SSL.WantWriteError):                  continue              if not size:                  conns.remove(conn)                  # Shutdown connection to the other peer                  if isinstance(conn, SSL.Connection):                      # We can't half-close a connection, so we just close everything here.                      # Sockets will be cleaned up on a higher level.                      return                  else:                      dst.shutdown(socket.SHUT_WR)                  if len(conns) == 0:                      return                  continue      # 将recv的数据转成TCPMessage              tcp_message = tcp.TCPMessage(dst == server, buf[:size].tobytes())              if not self.ignore:                  f.messages.append(tcp_message)                  # 调用addons中的tcp_message(self, flow)对flow进行解决                  self.channel.ask("tcp_message", f)              # 发送tcp_message中的content              dst.sendall(tcp_message.content)  except (socket.error, exceptions.TcpException, SSL.Error) as e:      if not self.ignore:          f.error = flow.Error("TCP connection closed unexpectedly: {}".format(repr(e)))          # 调用addons中的tcp_error(self, flow)对flow进行解决          self.channel.tell("tcp_error", f)  finally:      if not self.ignore:          # 调用addons中的tcp_end(self, flow)对flow进行解决          self.channel.tell("tcp_end", f)

三、开启 mitmproxy 并加载 addon

首先须要装置两个库:mitmproxy 和 mitmdump

1、编写 websocket 的 addon

"""简略版用于websocket的Addon后续改良能够减少判断host,防止拦挡到不须要解决的连贯,或者将Queue改成redis"""import asynciofrom multiprocessing import Queueimport mitmproxy.websocketclass WebsocketAddon:    def __init__(self, input_q: Queue = Queue(), output_q: Queue = Queue()):        self._input_q = input_q        self._output_q = output_q    async def inject(self, flow: mitmproxy.websocket.WebSocketFlow):        while not flow.ended and not flow.error:            # 减少距离,否则会阻塞event            await asyncio.sleep(0.5)            while not self._input_q.empty():                # WebSocketFlow的内置办法,用于被动插入信息,这里我只被动插入client->server的信息                flow.inject_message(flow.server_conn, self._input_q.get())    def websocket_start(self, flow: mitmproxy.websocket.WebSocketFlow):        # 退出发送websocket音讯的task,参考了官网的示例脚本,地址:https://docs.mitmproxy.org/stable/addons-examples/#websocket-inject-message        asyncio.get_event_loop().create_task(self.inject(flow))    def websocket_message(self, flow: mitmproxy.websocket.WebSocketFlow):        message = flow.messages[-1]        self._output_q.put({          'from_client': message.from_client,           'data': message.content        })        # message.kill()能够让Layer不转发该条信息,我这里的目标是拦挡掉所有客户端发送的数据,由本人编辑后再发送        if message.from_client:            message.kill()

2、编写 socket 的 addon

"""简略版用于socket的Addon和websocket版差异不大,插入数据和拦挡数据有区别"""import asynciofrom multiprocessing import Queueimport mitmproxy.tcpclass SocketAddon:    def __init__(self, input_q: Queue = Queue(), output_q: Queue = Queue()):        self._input_q = input_q        self._output_q = output_q    async def inject(self, flow: mitmproxy.websocket.WebSocketFlow):        while flow.live and not flow.error:            await asyncio.sleep(0.5)            while not self._input_q.empty():                    # 间接向对端发送socket信息实现插入                    flow.server_conn.connection.sendall(payload)    def websocket_start(self, flow: mitmproxy.websocket.WebSocketFlow):        asyncio.get_event_loop().create_task(self.inject(flow))    def websocket_message(self, flow: mitmproxy.websocket.WebSocketFlow):        message = flow.messages[-1]        self._output_q.put({          'from_client': message.from_client,           'data': message.content        })        if message.from_client:           # socket发送0字节,conn.sendall(b'')将不会发送任何数据           message.content = b''

3、开启 mitmproxy 并实现处理函数

import multiprocessingfrom mitmdump import Options, DumpMasterdef start_proxy(input_q: multiprocessing.Queue(), output_q: multiprocessing.Queue()):        addons = [        # 自主抉择是应用Websocket还是Socket            WebsocketAddon(input_q, output_q)        # SocketAddon(input_q, output_q)        ]        opts = Options(listen_host='0.0.0.0', listen_port=1080, scripts=None, mode='socks5',                     rawtcp=True,                   # 须要转换tcp数据成的ip正则                   tcp_hosts=['.*'],                   flow_detail=0, termlog_verbosity='error', show_clientconnect_log=True, )        m = DumpMaster(opts)        m.addons.add(*addons)        m.run()def deal_client_message_func(client_message: [bytes, str]):        if type(client_message) is bytes:        return client_message.decode('utf-8').encode('gbk')    elif type(client_message) is str:        return f"test {client_message}"def simple_handel_message_func(input_q: multiprocessing.Queue(), output_q: multiprocessing.Queue()):    while True:            if not output_q.empty()            message = output_q.get()                print(f"{'客户端' if message['from_client'] else '服务端'} 包内容:{message['data']}")            if message['from_client']:                input_q.push(deal_client_message_func(message['data']))def main():    input_queue = multiprocessing.Queue()    output_queue = multiprocessing.Queue()    # 应用子过程启动proxy    multiprocessing.Process(target=start_proxy, args=(input_queue, output_queue)).start()    simple_handel_message_func(input_queue, output_queue)

四、总结

对于想实现结尾文中所提到的性能还须要实现客户端,以及对于 protobuf 协定的编解码,这里限于篇幅不再探讨,后续有机会再更新。

另外,之所以 mitmproxy 抉择 socks5 模式,是因为 socks 协定反对代理除了 http、https 以外更多品种的协定,windows 开启 socks5 代理的工具:proxifer,android 开启 socks5 代理工具:postern。