import redis
import contextlib
import pickle
import os, socket, threading
class RedisLock:
def __init__(self, lock_name, host='', port=6379, db=0):
self.lock_name = lock_name
self.redis = redis.Redis(connection_pool=redis.ConnectionPool(host=host, port=port, db=db))
def acquire_lock(self, lock_id, expire=None):
lock_id = lock_id if lock_id else self.get_lock_id()
return True if self.redis.set(self.lock_name, pickle.dumps(lock_id), nx=True, ex=expire) else False
# Above 1 line code can replace with follow codes to debug
# if self.redis.set(self.lock_name, pickle.dumps(lock_id), nx=True, ex=expire):
# print('Lock Succeed')
# return True
# else:
# print('Lock Failed')
# return False
def release_lock(self, lock_id=None):
lock_id = lock_id if lock_id else self.get_lock_id()
if lock_id == pickle.loads(self.redis.get(self.lock_name)):
self.redis.delete(self.lock_name)
# print('Unlock Succeed')
return True
else:
# print('Unlock Failed')
return False
@contextlib.contextmanager
def lock(self, lock_id=None, expire=None):
if not self.acquire_lock(lock_id, expire):
exit(0)
yield self
self.release_lock(lock_id)
def get_lock_id(self):
"""hostname+processID+threadName"""
return f'{socket.gethostname()}{os.getpid()}{threading.current_thread().name}'
redis_lock = RedisLock('lockname', host='Your IP') # 第一个匿名参数必传,作为 redis 的 key
with redis_lock.lock() as lock:
print('You Can Do Something Here')
1. 注释部分是我写的时候,调试用的代码,最后写完的时候都替换为简洁的语法.
2. 因为锁具有互斥特性,所以选择 set() 的 nx 参数来实现,nx 参数:我个人一直这样记(读作 not exist)====> 不存在
理解:不存在则添加,存在就不添加了。举一反三,没有锁就加个锁。有锁就不加锁了。3. set 方法 的 ex 参数,可代替 expire 方法的来设置过期时间
4. redis 有 很多指令变形。比如 set(nx=, ex=) 可拆分为 setnx setex
但 "set() 这种指令更优",能用就尽量用,理由如下:"set 指令 好处是 set 具有原子性", 避免了解决资源竞争的同时引发自身可能出现的资源竞争
5. 我使用装饰器版 的 上下文管理器,对代码做了封装,所以调用时,用 "with 语句" 即可
6. with redis_lock.lock() as lock,"lock() 这里可以自己指定 2 个参数":lock_id=None # 这是区分不同线程的唯一标识符,默认为(主机名 + 进程 ID+ 线程名),可自传
expire=None # 过期时间,秒为单位
7. 需要注意一个点,与 redis 通信是以二进制形式。所以我在代码内部对 lock_id 做了 "pickle 序列化"
当然如果是字符串用 encode() 与 decode() 来实现也是可以的。