python-redispy-源码分析

38次阅读

共计 2033 个字符,预计需要花费 6 分钟才能阅读完成。

一 概述
通常 python 连接 redis 数据库用的 redis-py 这个三方库,最近研究了一下它的源码

它连接服务器的函数是这个 _connect 函数

    def _connect(self****):
    """创建一个 tcp socket 连接"""
    # we want to mimic wha****t socket.create_connection does to support
    # ipv4/ipv6, but we want to set options prior to calling
    # socket.connect()
    err = None
    for res in socket.getaddrinfo(self.host, self.port, 0,
                                  socket.SOCK_STREAM):
        family, socktype, proto, canonname, socket_address = res
        sock = None
        try:
            sock = socket.socket(family, socktype, proto)
            # TCP_NODELAY
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)

            # TCP 长连接
            if self.socket_keepalive:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                for k, v in iteritems(self.socket_keepalive_options):
                    sock.setsockopt(socket.SOL_TCP, k, v)

            # 我们连接之前设置 socket_connect_timeout 超时时间
            sock.settimeout(self.socket_connect_timeout)

            # 连接
            sock.connect(socket_address)

            # 连接成功后设置 socket_timeout 时间
            sock.settimeout(self.socket_timeout)
            return sock

        except socket.error as _:
            err = _
            if sock is not None:
                sock.close()

    if err is not None:
        raise err
    raise socket.error("socket.getaddrinfo returned an empty list")


on_connect 函数,实列化连接,有密码则发送密码,选择数据表

 def on_connect(self):
    "Initialize the connection, authenticate and select a database"
    self._parser.on_connect(self)

    # if a password is specified, authenticate
    if self.password:
        # 如果 redis 服务做了进一步的安全加固,也是在这里进行鉴权
        self.send_command('AUTH', self.password)
        if nativestr(self.read_response()) != 'OK':
            raise AuthenticationError('Invalid Password')

    # if a database is specified, switch to it
    if self.db:
        self.send_command('SELECT', self.db)
        if nativestr(self.read_response()) != 'OK':
            raise ConnectionError('Invalid Database')
            

执行命令函数 execute_command,从连接池获取连接并发送命令

  def execute_command(self, *args, **options):
    "Execute a command and return a parsed response"
    pool = self.connection_pool
    command_name = args[0]
    connection = pool.get_connection(command_name, **options)
    try:
        connection.send_command(*args)
        return self.parse_response(connection, command_name, **options)
    except (ConnectionError, TimeoutError) as e:
        connection.disconnect()
        if not connection.retry_on_timeout and isinstance(e, TimeoutError):
            raise
        connection.send_command(*args)
        return self.parse_response(connection, command_name, **options)
    finally:
        pool.release(connection)

正文完
 0