共计 1428 个字符,预计需要花费 4 分钟才能阅读完成。
在编码中,一些波及网络连接的代码片段常常须要重试,本文解说了如何一步一步实现一个优雅的 retry
装璜器以及 tenacity
库的应用。
原始版本v0.0
如果有一个函数模式如下,函数有一些建设网络连接的逻辑
def f():
# do some connections
return 0
为了防止偶然的网络连接失败,须要加上重试机制,那么最简略的模式就是在对应的代码片段加一个循环,循环体里应用异样捕捉,连贯胜利时退出循环,否则就反复执行相干逻辑,此时批改之后的函数 f
如下
def f():
while 1:
try:
# do some connections
break
except ConnectionError:
continue
return 0
装璜器版本v1.0
能够应用装璜器对代码进行形象。例如当初有两个函数 f1
和f2
须要加上重试机制,写一个名为 retry
的装璜器函数,用其装璜 f1
和f2
即可。这样做防止了对老代码的批改,同时也实现了代码复用。示例如下
def retry(f):
def wrap(*args, **kwargs):
while 1:
try:
return f(*args, **kwargs)
except ConnectionError:
continue
return wrap
@retry()
def f1():
# do some connections
return 0
@retry()
def f2():
# do some other connections
return 0
带参数的装璜器版本v1.1
v1.0
的版本 retry
装璜器还有一些问题,如果有的函数想重试 3 次,有的想重试 5 次,重试的距离也依据不同函数不一样,v1.0
是无奈实现的。此时能够借助带参数的三层装璜器,例如以下代码实现的 retry
装璜器,能够传入 times
和interval
两个参数来设定重试次数和重试距离
def retry(times, interval):
def decorator(f)
def wrap(*args, **kwargs):
while times:
try:
return f(*args, **kwargs)
except ConnectionError:
times -= 1
time.sleep(interval)
continue
return wrap
return decorator
# 重试 3 次每次距离 10 秒
@retry(times=3, interval=10)
def f1():
# do some connections
return 0
# 重试 5 次每次距离 15 秒
@retry(times=5, interval=15)
def f2():
# do some other connections
return 0
装璜器反对参数之后,能够依据须要定义更丰盛的参数,比方通过参数来设定须要捕捉哪些异样等。
tenacity
版本
tenacity
是一个第三方开源库,用于函数的重试,实际上它的性能与原理是下面本人写的代码相似的!只是它可定义的参数更丰盛,如果不想反复造轮子,拿来间接用就能够。代码示例如下
from tenacity import retry, stop_after_attempt, wait_fixed
# 不带任何参数的重试
@retry
def f():
# do some connections
return 0
# 重试 5 次每次距离 15 秒
@retry(stop=stop_after_attempt(5), wait=wait_fixed(15))
def f():
# do some connections
return 0
正文完