本文偏重介绍如何应用 python 和 mitmproxy 实现拦挡数据包、重发数据包,以及解析 protobuf 数据内容,对于相干依赖的装置不做介绍。
一、游戏协定平安测试内容
参考 https://testerhome.com/topics…,这篇文章讲的很分明。
二、实现原理
想间接应用的同学能够跳到第三局部。
mitmproxy 作为代理,能够获取客户端与服务端通信的数据,并且能够拦挡、批改和自主发送数据。当配合其证书应用时,还能够解密 wss 连贯中的 websocket 数据。
- Websotcket 数据处理源码剖析
在 http 代理的过程中若发现 upgrade websocket 申请,则创立 WebSocketLayer 实例,并调用其_call_办法。
# mitmproxy/proxy/protocol/http.py
"""以下为 Httplayer 的_process_flow 办法的局部代码"""
if f.response.status_code == 101:
# Handle a successful HTTP 101 Switching Protocols Response,
# received after e.g. a WebSocket upgrade request.
# Check for WebSocket handshake
is_websocket = (websockets.check_handshake(f.request.headers) and
websockets.check_handshake(f.response.headers)
)
if is_websocket and not self.config.options.websocket:
self.log(
"Client requested WebSocket connection, but the protocol is disabled.",
"info"
)
if is_websocket and self.config.options.websocket:
layer = WebSocketLayer(self, f)
else:
layer = self.ctx.next_layer(self)
layer()
WebSocketLayer 初始化时会创立用于此次 websocket 通信的编解码器。
# mitmproxy/proxy/protocol/websocket.py
"""WebSocketLayer 类的 init 办法,省略局部代码"""
def __init__(self, ctx, handshake_flow):
super().__init__(ctx)
self.handshake_flow = handshake_flow
self.connections: dict[object, WSConnection] = {}
client_extensions = []
server_extensions = []
# 判断交互数据是否应用 deflate 压缩
if 'Sec-WebSocket-Extensions' in handshake_flow.response.headers:
if PerMessageDeflate.name in handshake_flow.response.headers['Sec-WebSocket-Extensions']:
client_extensions = [PerMessageDeflate()]
server_extensions = [PerMessageDeflate()]
# self.client_conn 和 self.server_conn 继承自 ctx, 即原 http 的 client 和 server,原理为父类的__getattr__(self, name)办法返回的是 getattr(self.ctx, name)。WSConnection 是一个 websocket 协定编解码器,理论不会发送任何网络 IO,文档地址:https://python-hyper.org/projects/wsproto/en/latest/basic-usage.html
# 负责和解码 server 收到信息和编码 server 发送的信息
self.connections[self.client_conn] = WSConnection(ConnectionType.SERVER)
# 负责和解码 client 收到信息和编码 client 发送的信息
self.connections[self.server_conn] = WSConnection(ConnectionType.CLIENT)
# 结构发送给 Server 的 websocket 的握手申请
request = Request(extensions=client_extensions,host=handshake_flow.request.host,target=handshake_flow.request.path)
# send()办法只会结构一个实用于对应 conn 的数据,并不会真正发送数据,recv_data()会将信息解码,须要通过 next(conn.events())获取解码后数据
# 按上所说,上面两行代码的操作是将握手申请按 client 编码后发送给 server 编码器,而后让 server 编码器解码
data = self.connections[self.server_conn].send(request)
self.connections[self.client_conn].receive_data(data)
event = next(self.connections[self.client_conn].events())
assert isinstance(event, events.Request)
# 返回给客户端承受连贯响应
data = self.connections[self.client_conn].send(AcceptConnection(extensions=server_extensions))
self.connections[self.server_conn].receive_data(data)
assert isinstance(next(self.connections[self.server_conn].events()), events.AcceptConnection)
WebSocketLayer 实例的_call_办法负责解决后续 websocket 通信
# mitmproxy/proxy/protocol/websocket.py
"""WebSocketLayer 类的 call 办法,省略局部代码"""
def __call__(self):
self.flow = WebSocketFlow(self.client_conn, self.server_conn, self.handshake_flow)
self.flow.metadata['websocket_handshake'] = self.handshake_flow.id
self.handshake_flow.metadata['websocket_flow'] = self.flow.id
# 调用 addons 中的 websocket_start(self, flow)对 flow 进行解决
self.channel.ask("websocket_start", self.flow)
conns = [c.connection for c in self.connections.keys()]
close_received = False
try:
while not self.channel.should_exit.is_set():
# 往 client 或 server 插入信息,self.flow._inject_messages_client/self.flow._inject_messages_server 是队列,后续实现在连贯中被动发消息就是通过往队列中插入数据实现
self._inject_messages(self.client_conn, self.flow._inject_messages_client)
self._inject_messages(self.server_conn, self.flow._inject_messages_server)
# select 监督原 http 的 client 和 server 连贯的可读事件
r = tcp.ssl_read_select(conns, 0.1)
for conn in r:
source_conn = self.client_conn if conn == self.client_conn.connection else self.server_conn
other_conn = self.server_conn if conn == self.client_conn.connection else self.client_conn
is_server = (source_conn == self.server_conn)
frame = websockets.Frame.from_file(source_conn.rfile)
# 将从 conn 中获取的数据放入编解码器,此办法并没有返回值,所以 data 是 None
data = self.connections[source_conn].receive_data(bytes(frame))
# data 是 None,不解此举有何意义
source_conn.send(data)
if close_received:
return
# 解决编解码器中解码后的数据,event 由 pop 取出,后续不会再用到。for event in self.connections[source_conn].events():
if not self._handle_event(event, source_conn, other_conn, is_server):
if not close_received:
close_received = True
except (socket.error, exceptions.TcpException, SSL.Error) as e:
s = 'server' if is_server else 'client'
self.flow.error = flow.Error("WebSocket connection closed unexpectedly by {}: {}".format(s, repr(e)))
# 调用 addons 中的 websocket_start(self, flow)对 flow 进行解决
self.channel.tell("websocket_start", self.flow)
finally:
self.flow.ended = True
# 调用 addons 中的 websocket_end(self, flow)对 flow 进行解决
self.channel.tell("websocket_end", self.flow)
WebSocketLayer 实例中解决 Message Event 的办法
# mitmproxy/proxy/protocol/websocket.py
"""WebSocketLayer 类的_handle_message 办法,_handle_event 中,若 isinstance(event, events.Message),则会调用此函数"""
def _handle_message(self, event, source_conn, other_conn, is_server):
fb = self.server_frame_buffer if is_server else self.client_frame_buffer
fb.append(event.data)
if event.message_finished:
original_chunk_sizes = [len(f) for f in fb]
if isinstance(event, events.TextMessage):
message_type = wsproto.frame_protocol.Opcode.TEXT
payload = ''.join(fb)
else:
message_type = wsproto.frame_protocol.Opcode.BINARY
payload = b''.join(fb)
fb.clear()
websocket_message = WebSocketMessage(message_type, not is_server, payload)
length = len(websocket_message.content)
self.flow.messages.append(websocket_message)
# 调用 addons 中的 websocket_message(self, flow)对 flow 进行解决
self.channel.ask("websocket_message", self.flow)
# WebsocketMessage 的属性 killed 用于判断该信息是否须要被转发,可在 websocket_message 函数中调用 message 的 kill()办法置为 True
if not self.flow.stream and not websocket_message.killed:
def get_chunk(payload):
if len(payload) == length:
# message has the same length, we can reuse the same sizes
pos = 0
for s in original_chunk_sizes:
yield (payload[pos:pos + s], True if pos + s == length else False)
pos += s
else:
# just re-chunk everything into 4kB frames
# header len = 4 bytes without masking key and 8 bytes with masking key
chunk_size = 4092 if is_server else 4088
chunks = range(0, len(payload), chunk_size)
for i in chunks:
yield (payload[i:i + chunk_size], True if i + chunk_size >= len(payload) else False)
# 将收到的信息从新编码后向对端发送
for chunk, final in get_chunk(websocket_message.content):
data = self.connections[other_conn].send(Message(data=chunk, message_finished=final))
other_conn.send(data)
if self.flow.stream:
data = self.connections[other_conn].send(Message(data=event.data, message_finished=event.message_finished))
other_conn.send(data)
return True
-
Tcp 数据处理源码剖析
TCP 数据处理触发条件# mitmproxy/proxy/root_context.py """RootContext 类_next_layer 办法,省略局部代码"""
-
Check for –tcp
判断 Option 中 tcp_hosts, 类型是一个列表,蕴含须要转换成 tcp 流信息的 server address 正则表达式,例如[‘192.168.\d+.\d+’]
“””
if self.config.check_tcp(top_layer.server_conn.address):
return protocol.RawTCPLayer(top_layer)“””
6. Check for raw tcp mode
判断 Option 中 rawtcp,类型是 bool,若为 true,则将不能解决的流转换成 tcp 流解决,倡议开启,默认是 false
“””
is_ascii = (
len(d) == 3 and
# expect A-Za-z
all(65 <= x <= 90 or 97 <= x <= 122 for x in d)
)
if self.config.options.rawtcp and not is_ascii:
return protocol.RawTCPLayer(top_layer)
TCP 信息处理 RawTCPLayer 类源码class RawTCPLayer(base.Layer):
chunk_size = 4096def __init__(self, ctx, ignore=False):
self.ignore = ignore super().__init__(ctx)
def __call__(self):
self.connect() if not self.ignore: f = tcp.TCPFlow(self.client_conn, self.server_conn, self) # 调用 addons 中的 tcp_start(self, flow)对 flow 进行解决 self.channel.ask("tcp_start", f) # 创立一个长度为 4096 的空 bytearray buf = memoryview(bytearray(self.chunk_size)) client = self.client_conn.connection server = self.server_conn.connection conns = [client, server] # https://github.com/openssl/openssl/issues/6234 for conn in conns: if isinstance(conn, SSL.Connection) and hasattr(SSL._lib, "SSL_clear_mode"): SSL._lib.SSL_clear_mode(conn._ssl, SSL._lib.SSL_MODE_AUTO_RETRY) try: while not self.channel.should_exit.is_set(): r = mitmproxy.net.tcp.ssl_read_select(conns, 10) for conn in r: dst = server if conn == client else client try: # 将从 conn 中 recv 的数据存入 buf,返回 size size = conn.recv_into(buf, self.chunk_size) except (SSL.WantReadError, SSL.WantWriteError): continue if not size: conns.remove(conn) # Shutdown connection to the other peer if isinstance(conn, SSL.Connection): # We can't half-close a connection, so we just close everything here. # Sockets will be cleaned up on a higher level. return else: dst.shutdown(socket.SHUT_WR) if len(conns) == 0: return continue # 将 recv 的数据转成 TCPMessage tcp_message = tcp.TCPMessage(dst == server, buf[:size].tobytes()) if not self.ignore: f.messages.append(tcp_message) # 调用 addons 中的 tcp_message(self, flow)对 flow 进行解决 self.channel.ask("tcp_message", f) # 发送 tcp_message 中的 content dst.sendall(tcp_message.content) except (socket.error, exceptions.TcpException, SSL.Error) as e: if not self.ignore: f.error = flow.Error("TCP connection closed unexpectedly: {}".format(repr(e))) # 调用 addons 中的 tcp_error(self, flow)对 flow 进行解决 self.channel.tell("tcp_error", f) finally: if not self.ignore: # 调用 addons 中的 tcp_end(self, flow)对 flow 进行解决 self.channel.tell("tcp_end", f)
三、开启 mitmproxy 并加载 addon
首先须要装置两个库:mitmproxy 和 mitmdump
1、编写 websocket 的 addon
"""
简略版用于 websocket 的 Addon
后续改良能够减少判断 host,防止拦挡到不须要解决的连贯,或者将 Queue 改成 redis
"""
import asyncio
from multiprocessing import Queue
import mitmproxy.websocket
class WebsocketAddon:
def __init__(self, input_q: Queue = Queue(), output_q: Queue = Queue()):
self._input_q = input_q
self._output_q = output_q
async def inject(self, flow: mitmproxy.websocket.WebSocketFlow):
while not flow.ended and not flow.error:
# 减少距离,否则会阻塞 event
await asyncio.sleep(0.5)
while not self._input_q.empty():
# WebSocketFlow 的内置办法,用于被动插入信息,这里我只被动插入 client->server 的信息
flow.inject_message(flow.server_conn, self._input_q.get())
def websocket_start(self, flow: mitmproxy.websocket.WebSocketFlow):
# 退出发送 websocket 音讯的 task,参考了官网的示例脚本,地址:https://docs.mitmproxy.org/stable/addons-examples/#websocket-inject-message
asyncio.get_event_loop().create_task(self.inject(flow))
def websocket_message(self, flow: mitmproxy.websocket.WebSocketFlow):
message = flow.messages[-1]
self._output_q.put({
'from_client': message.from_client,
'data': message.content
})
# message.kill()能够让 Layer 不转发该条信息,我这里的目标是拦挡掉所有客户端发送的数据,由本人编辑后再发送
if message.from_client:
message.kill()
2、编写 socket 的 addon
"""
简略版用于 socket 的 Addon
和 websocket 版差异不大,插入数据和拦挡数据有区别
"""
import asyncio
from multiprocessing import Queue
import mitmproxy.tcp
class SocketAddon:
def __init__(self, input_q: Queue = Queue(), output_q: Queue = Queue()):
self._input_q = input_q
self._output_q = output_q
async def inject(self, flow: mitmproxy.websocket.WebSocketFlow):
while flow.live and not flow.error:
await asyncio.sleep(0.5)
while not self._input_q.empty():
# 间接向对端发送 socket 信息实现插入
flow.server_conn.connection.sendall(payload)
def websocket_start(self, flow: mitmproxy.websocket.WebSocketFlow):
asyncio.get_event_loop().create_task(self.inject(flow))
def websocket_message(self, flow: mitmproxy.websocket.WebSocketFlow):
message = flow.messages[-1]
self._output_q.put({
'from_client': message.from_client,
'data': message.content
})
if message.from_client:
# socket 发送 0 字节,conn.sendall(b'')将不会发送任何数据
message.content = b''
3、开启 mitmproxy 并实现处理函数
import multiprocessing
from mitmdump import Options, DumpMaster
def start_proxy(input_q: multiprocessing.Queue(), output_q: multiprocessing.Queue()):
addons = [
# 自主抉择是应用 Websocket 还是 Socket
WebsocketAddon(input_q, output_q)
# SocketAddon(input_q, output_q)
]
opts = Options(listen_host='0.0.0.0', listen_port=1080, scripts=None, mode='socks5',
rawtcp=True,
# 须要转换 tcp 数据成的 ip 正则
tcp_hosts=['.*'],
flow_detail=0, termlog_verbosity='error', show_clientconnect_log=True, )
m = DumpMaster(opts)
m.addons.add(*addons)
m.run()
def deal_client_message_func(client_message: [bytes, str]):
if type(client_message) is bytes:
return client_message.decode('utf-8').encode('gbk')
elif type(client_message) is str:
return f"test {client_message}"
def simple_handel_message_func(input_q: multiprocessing.Queue(), output_q: multiprocessing.Queue()):
while True:
if not output_q.empty()
message = output_q.get()
print(f"{' 客户端 'if message['from_client'] else' 服务端 '} 包内容:{message['data']}")
if message['from_client']:
input_q.push(deal_client_message_func(message['data']))
def main():
input_queue = multiprocessing.Queue()
output_queue = multiprocessing.Queue()
# 应用子过程启动 proxy
multiprocessing.Process(target=start_proxy, args=(input_queue, output_queue)).start()
simple_handel_message_func(input_queue, output_queue)
四、总结
对于想实现结尾文中所提到的性能还须要实现客户端,以及对于 protobuf 协定的编解码,这里限于篇幅不再探讨,后续有机会再更新。
另外,之所以 mitmproxy 抉择 socks5 模式,是因为 socks 协定反对代理除了 http、https 以外更多品种的协定,windows 开启 socks5 代理的工具:proxifer,android 开启 socks5 代理工具:postern。