共计 1650 个字符,预计需要花费 5 分钟才能阅读完成。
bsonrpc 是 python 中⼀个基于 json 或 bson 的近程过程调⽤的库,提供了服务端与客户端实现,其底层采⽤的是基于 TCP 连贯的通信。
程序结构
bsonrpc 次要包含以下⽂件:
- concurrent.py:针对两种并发⽅式(threading 线程对象、gevent 协程对象)波及的相应组件(Queue,Event,Lock 等)提供统⼀的对外的⽣成接⼝:spawn(),new_promise(),new_queue(),new_lock() 等;
- definitions.py:定义 rpc 的音讯构造和谬误编码;
- dispatcher.py:rpc 的解决调度,路由解决(音讯对应的处理函数);
- exceptions.py:异样定义;
- framing.py:定义不同类实现 JSON RPC 2.0 规范中的不同音讯构造;
- interfaces.py:定义提供服务的装璜器;
- misc.py:该⽂件中定义了⼀个 id ⽣成器,从 1 开始累加。
- options.py:定义配置选项。
- rpc.py:次要为 BSONRpc 和 JSONRpc 类的实现;
- socket_queue.py:次要为音讯的拆包组包局部;
- util.py:零碎⼯具。
本⽂次要形容库包中对于不同协定的分包组包的解决,波及到 socket_queue.py 和 framing.py ⽂件,次要采⽤的是对象组合的技术。
解读
socket_queue.py 中的 SocketQueue 类是⽤来解决从 socket 接收数据,次要的⽅法为_receiver() 和 put() ⽅法,别离对应分包和组包,分包的次要内容如下:
def _receiver(self):
bbuffer = b''
while True:
try:
chunk = self.socket.recv(self.BUFSIZE) # 从 socket 上接收数据
bbuffer = self._to_queue(bbuffer + chunk) # 数据分包
except DecodingError as e:
self._queue.put(e)
# 后⾯省略...
def _to_queue(self, bbuffer):
b_msg, bbuffer = self.codec.extract_message(bbuffer) # 解码器提取残缺的信息
while b_msg is not None:
self._queue.put(self.codec.loads(b_msg)) # 解码后的音讯放⼊音讯队列中期待解决
b_msg, bbuffer = self.codec.extract_message(bbuffer)
return bbuffer
组包的次要内容如下:
def put(self, item):
if self._closed:
raise BsonRpcError('Attempt to put items to closed queue.')
msg_bytes = self.codec.into_frame(self.codec.dumps(item)) # 组包
with self._lock:
self.socket.sendall(msg_bytes)
如上图所示,程序采⽤的是对象组合的⽅式实现音讯分包解决的。对象组合是继承之外的另⼀种抉择,对象组合要求被组合的对象具备良好定义的接⼝,通过接⼝的⽅式调⽤其余对象的性能,这个也被“⿊箱复⽤”,因为对象的外部细节是不可⻅的。SocketQueue 中依赖 Codec 的 extract_message() 接⼝⽅法,不⽤关⼼其具体的实现⽅法。具体实现由 JSONCodec 和 BSONCode 进⾏实现。JSONCodec 中依赖 JSONFrame 中的 extract_message() 接⼝⽅法,该接⼝⽅法的实现由 JSONFramingNone、JSONFramingNetstring、JSONFramingRFC7464 进⾏实现。SocketQueue 音讯组包过程依赖于 into_frame() ⽅法,也是通过对象组合实现的。
注:图中的接⼝为了⼤家容易了解才加上了,源码⾥⾯并没有。
正文完