一 概述
通常 python 连接 redis 数据库用的 redis-py 这个三方库,最近研究了一下它的源码
它连接服务器的函数是这个 _connect 函数
def _connect(self****):
"""创建一个 tcp socket 连接"""
# we want to mimic wha****t socket.create_connection does to support
# ipv4/ipv6, but we want to set options prior to calling
# socket.connect()
err = None
for res in socket.getaddrinfo(self.host, self.port, 0,
socket.SOCK_STREAM):
family, socktype, proto, canonname, socket_address = res
sock = None
try:
sock = socket.socket(family, socktype, proto)
# TCP_NODELAY
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# TCP 长连接
if self.socket_keepalive:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
for k, v in iteritems(self.socket_keepalive_options):
sock.setsockopt(socket.SOL_TCP, k, v)
# 我们连接之前设置 socket_connect_timeout 超时时间
sock.settimeout(self.socket_connect_timeout)
# 连接
sock.connect(socket_address)
# 连接成功后设置 socket_timeout 时间
sock.settimeout(self.socket_timeout)
return sock
except socket.error as _:
err = _
if sock is not None:
sock.close()
if err is not None:
raise err
raise socket.error("socket.getaddrinfo returned an empty list")
on_connect 函数,实列化连接,有密码则发送密码,选择数据表
def on_connect(self):
"Initialize the connection, authenticate and select a database"
self._parser.on_connect(self)
# if a password is specified, authenticate
if self.password:
# 如果 redis 服务做了进一步的安全加固,也是在这里进行鉴权
self.send_command('AUTH', self.password)
if nativestr(self.read_response()) != 'OK':
raise AuthenticationError('Invalid Password')
# if a database is specified, switch to it
if self.db:
self.send_command('SELECT', self.db)
if nativestr(self.read_response()) != 'OK':
raise ConnectionError('Invalid Database')
执行命令函数 execute_command,从连接池获取连接并发送命令
def execute_command(self, *args, **options):
"Execute a command and return a parsed response"
pool = self.connection_pool
command_name = args[0]
connection = pool.get_connection(command_name, **options)
try:
connection.send_command(*args)
return self.parse_response(connection, command_name, **options)
except (ConnectionError, TimeoutError) as e:
connection.disconnect()
if not connection.retry_on_timeout and isinstance(e, TimeoutError):
raise
connection.send_command(*args)
return self.parse_response(connection, command_name, **options)
finally:
pool.release(connection)