乐趣区

Socket Error 104 bug

bug 概述
技术栈

nginx
uwsgi
bottle

错误详情
报警机器人经常有如下警告:
<27>1 2018-xx-xxT06:59:03.038Z 660ece0ebaad admin/admin 14 – – Socket Error: 104
<31>1 2018-xx-xxT06:59:03.038Z 660ece0ebaad admin/admin 14 – – Removing timeout for next heartbeat interval
<28>1 2018-xx-xxT06:59:03.039Z 660ece0ebaad admin/admin 14 – – Socket closed when connection was open
<31>1 2018-xx-xxT06:59:03.039Z 660ece0ebaad admin/admin 14 – – Added: {‘callback’: <bound method SelectConnection._on_connection_start of <pika.adapters.select_connection.SelectConnection object at 0x7f74752525d0>>, ‘only’: None, ‘one_shot’: True, ‘arguments’: None, ‘calls’: 1}
<28>1 2018-xx-xxT06:59:03.039Z 660ece0ebaad admin/admin 14 – – Disconnected from RabbitMQ at xx_host:5672 (0): Not specified
<31>1 2018-xx-xxT06:59:03.039Z 660ece0ebaad admin/admin 14 – – Processing 0:_on_connection_closed
<31>1 2018-xx-xxT06:59:03.040Z 660ece0ebaad admin/admin 14 – – Calling <bound method _CallbackResult.set_value_once of <pika.adapters.blocking_connection._CallbackResult object at 0x7f74752513f8>> for “0:_on_connection_closed”
debug 过程
确定报错位置
有日志就很好办, 首先看日志在哪里打的. 从三个地方入手.
我们自己的代码
没有.
uwsgi 的代码
root@660ece0ebaad:/# uwsgi –version2.0.14 从 github 上 co 下来, 没有.
python library 的代码
在容器中执行
>>> import sys
>>> sys.path
[”, ‘/usr/lib/python2.7’, ‘/usr/lib/python2.7/plat-x86_64-linux-gnu’, ‘/usr/lib/python2.7/lib-tk’, ‘/usr/lib/python2.7/lib-old’, ‘/usr/lib/python2.7/lib-dynload’, ‘/usr/local/lib/python2.7/dist-packages’, ‘/usr/lib/python2.7/dist-packages’, ‘/usr/lib/python2.7/dist-packages/PILcompat’, ‘/usr/lib/python2.7/dist-packages/gtk-2.0’]
在这些目录下 grep, 在 pika 中找到
root@660ece0ebaad:/usr/local/lib/python2.7# grep “Socket Error” -R .
Binary file ./dist-packages/pika/adapters/base_connection.pyc matches
./dist-packages/pika/adapters/base_connection.py: LOGGER.error(“Fatal Socket Error: %r”, error_value)
./dist-packages/pika/adapters/base_connection.py: LOGGER.error(“Socket Error: %s”, error_code)
确定 pika 版本.
>>> import pika
>>> pika.__version__
‘0.10.0’
确定错误逻辑
通过代码可以看到, Socket Error 是 errno 的错误码, 确定错误码含义是对端发送了 RST.
>>> import errno
>>> errno.errorcode[104]
‘ECONNRESET’
怀疑 rabbitmq server 地址错误, 一个未 listen 的端口是会返回 RST 的, 验证后发现不是. 接着怀疑链接超时断开未通知客户端之类. 看 rabbitmq server 日志, 发现大量:
=ERROR REPORT==== 7-Dec-2018::20:43:18 ===
closing AMQP connection <0.9753.18> (172.17.0.19:27542 -> 192.168.44.112:5672):
missed heartbeats from client, timeout: 60s

=ERROR REPORT==== 7-Dec-2018::20:43:18 ===
closing AMQP connection <0.9768.18> (172.17.0.19:27544 -> 192.168.44.112:5672):
missed heartbeats from client, timeout: 60s
发现 rabbitmq server 和 admin docker 的链接已经全部断开
root@xxxxxxx:/home/dingxinglong# netstat -nap | grep 5672 | grep “172.17.0.19”
那么, 为什么 rabbitmq server 会踢掉 pika 建立的链接呢? 看 pika 代码注释:
:param int heartbeat_interval: How often to send heartbeats.
Min between this value and server’s proposal
will be used. Use 0 to deactivate heartbeats
and None to accept server’s proposal.
我们没有传入心跳间隔, 理论上应该使用服务端默认的 60S. 实际上, 客户端从来没有发出过心跳包. 于是继续看代码: 通过打印, 确认了 HeartbeatChecker 对象成功创建, 也成功地创建了 timer, 但是 timer 从来没有回调过. 从代码一路跟下去, 我们用的是 blocking_connections, 在其 add_timeout 注释中看到:
def add_timeout(self, deadline, callback_method):
“””Create a single-shot timer to fire after deadline seconds. Do not
confuse with Tornado’s timeout where you pass in the time you want to
have your callback called. Only pass in the seconds until it’s to be
called.

NOTE: the timer callbacks are dispatched only in the scope of
specially-designated methods: see
`BlockingConnection.process_data_events` and
`BlockingChannel.start_consuming`.

:param float deadline: The number of seconds to wait to call callback
:param callable callback_method: The callback method with the signature
callback_method()
timer 的触发要靠 process_data_events, 而我们没有调用. 所以客户端的 heartbeat 从来没被触发. 简单地将 heartbeat 关掉以解决这个问题.
具体触发点
调用代码如下: 没有跑 main_loop, 故, 没处理 rabbitmq_server 的 FIN 包, 无法跟踪链接状态. 一路跟踪 basic_publish 接口的代码. 在发送时, 收到 RST, 最终跑到 base_connection.py:452, _handle_error 函数中打印 socket_error.
def connect_mq():
mq_conf = xxxxx
connection = pika.BlockingConnection(
pika.ConnectionParameters(mq_conf[‘host’],
int(mq_conf[‘port’]),
mq_conf[‘path’],
pika.PlainCredentials(mq_conf[‘user’],
mq_conf[‘pwd’]),
heartbeat_interval=0))
channel = connection.channel()
channel.exchange_declare(exchange=xxxxx, type=’direct’, durable=True)
return channel

channel = connect_mq()

def notify_xxxxx():
global channel

def _publish(product):
channel.basic_publish(exchange=xxxxx,
routing_key=’xxxxx’,
body=json.dumps({‘msg’: ‘xxxxx’}))

退出移动版