把布隆过滤器用起来

5次阅读

共计 6623 个字符,预计需要花费 17 分钟才能阅读完成。

本文偏应用和代码实践,理论请参考本文末尾参考文章

简介

一句话简介:
过滤器,判断这个元素在与不在,不在则 100% 不在;在则去查询,b 确认在不在。

详细简介:
BloomFilter,中文名称叫做布隆过滤器,是 1970 年由 Bloom 提出的,它可以被用来检测一个元素是否在一个集合中,它的空间利用效率很高,使用它可以大大节省存储空间。BloomFilter 使用位数组表示一个待检测集合,并可以快速地通过概率算法判断一个元素是否存在于这个集合中,所以利用这个算法我们可以实现去重效果。

它的优点是空间效率和查询时间都远远超过一般算法,缺点是有一定的误识别率和删除困难。

场景

1、大量爬虫数据去重

2、保护数据安全:
广告精确投放:广告主通过设备 id,计算 hash 算法,在数据包(数据提供方)中去查找,如果在存在,则证明该设备 id 属于目标人群,进行投放广告,同时保证设备 id 不泄露。数据提供方和广告主都没有暴露自己拥有的设备 id。间接用户画像且不违数据安全法。详见:https://zhuanlan.zhihu.com/p/…

3、比特币网络转账确认

SPV 节点:SPV 是“Simplified Payment Verification”(简单支付验证)的缩写。中本聪论文简要地提及了这一概念,指出:不运行完全节点也可验证支付,用户只需要保存所有的 block header 就可以了。用户虽然不能自己验证交易,但如果能够从区块链的某处找到相符的交易,他就可以知道网络已经认可了这笔交易,而且得到了网络的多少个确认。


先去访问布隆过滤器,去判断交易记录是否在某个 block(区块)里存在。从海量数据 (十亿个区块,每个区块 1 -2M 的交易记录,),快速得到结果。
详见:https://www.youtube.com/watch…

4、分布式系统(Map-Reduce)
把大任务切分成块,分配和验证一个子任务是否在一个子系统上。

必要性

省空间,提升效率

我们首先来回顾一下 ScrapyRedis 的去重机制,它将 Request 的指纹存储到了 Redis 集合中,每个指纹的长度为 40,例如 27adcc2e8979cdee0c9cecbbe8bf8ff51edefb61 就是一个指纹,它的每一位都是 16 进制数。

让我们来计算一下用这种方式耗费的存储空间,每个 16 进制数占用 4b,1 个指纹用 40 个 16 进制数表示,占用空间为 20B,所以 1 万个指纹即占用空间 200KB,1 亿个指纹即占用 2G,所以当我们的爬取数量达到上亿级别时,Redis 的占用的内存就会变得很高,而且这仅仅是指纹的存储,另外 Redis 还存储了爬取队列,内存占用会进一步提高,更别说有多个 Scrapy 项目同时爬取的情况了。所以当爬取达到亿级别规模时 ScrapyRedis 提供的集合去重已经不能满足我们的要求,所以在这里我们需要使用一个更加节省内存的去重算法,它叫做 BloomFilter。

(内存版)Python 实现的内存版布隆过滤器 pybloom

https://github.com/jaybaird/p…
安装:

pip install pybloom

该模块包含两个类实现布隆过滤器功能。
BloomFilter 是定容。
ScalableBloomFilter 可以自动扩容

使用:

>>> from pybloom import BloomFilter
>>> f = BloomFilter(capacity=1000, error_rate=0.001) # capacity 是容量, error_rate 是能容忍的误报率
>>> f.add('Traim304') # 当不存在该元素, 返回 False
False
>>> f.add('Traim304') # 若存在, 返回 True
True
>>> 'Traim304' in f # 值得注意的是若返回 True。该元素可能存在, 也可能不存在。过滤器能容许存在一定的错误
True
>>> 'Jacob' in f # 但是 False。则必定不存在
False
>>> len(f) # 当前存在的元素
1

>>> f = BloomFilter(capacity=1000, error_rate=0.001) 
>>> from pybloom import ScalableBloomFilter
>>> sbf = ScalableBloomFilter(mode=ScalableBloomFilter.SMALL_SET_GROWTH)
>>> # sbf.add() 与 BloomFilter 同

超过误报率时抛出异常

>>> f = BloomFilter(capacity=1000, error_rate=0.0000001)
>>> for a in range(1000):
...     _ = f.add(a)
...
>>> len(a)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: object of type 'int' has no len()
>>> len(f)
1000
>>> f.add(1000)
False
>>> f.add(1001) # 当误报率超过 error_rate 会报错
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/site-packages/pybloom/pybloom.py", line 182, in add
    raise IndexError("BloomFilter is at capacity")
IndexError: BloomFilter is at capacity

(持久化)手动实现的 redis 版布隆过滤器

大数据量,多用 Redis 持久化版本的布隆过滤器

# 布隆过滤器 redis 版本实现
import hashlib
import redis
import six

# 1. 多个 hash 函数的实现和求值
# 2. hash 表实现和实现对应的映射和判断


class MultipleHash(object):
    '''根据提供的原始数据,和预定义的多个 salt,生成多个 hash 函数值'''

    def __init__(self, salts, hash_func_name="md5"):
        self.hash_func = getattr(hashlib, hash_func_name)
        if len(salts) < 3:
            raise Exception("请至少提供 3 个 salt")
        self.salts = salts

    def get_hash_values(self, data):
        '''根据提供的原始数据, 返回多个 hash 函数值'''
        hash_values = []
        for i in self.salts:
            hash_obj = self.hash_func()
            hash_obj.update(self._safe_data(data))
            hash_obj.update(self._safe_data(i))
            ret = hash_obj.hexdigest()
            hash_values.append(int(ret, 16))
        return hash_values

    def _safe_data(self, data):
        '''
        python2   str  === python3   bytes
        python2   uniocde === python3  str
        :param data: 给定的原始数据
        :return: 二进制类型的字符串数据
        '''
        if six.PY3:
            if isinstance(data, bytes):
                return data
            elif isinstance(data, str):
                return data.encode()
            else:
                raise Exception("请提供一个字符串")   # 建议使用英文来描述
        else:
            if isinstance(data, str):
                return data
            elif isinstance(data, unicode):
                return data.encode()
            else:
                raise Exception("请提供一个字符串")   # 建议使用英文来描述


class BloomFilter(object):
    ''''''def __init__(self, salts, redis_host="localhost", redis_port=6379, redis_db=0, redis_key="bloomfilter"):
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.redis_db = redis_db
        self.redis_key = redis_key
        self.client = self._get_redis_client()
        self.multiple_hash = MultipleHash(salts)

    def _get_redis_client(self):
        '''返回一个 redis 连接对象'''
        pool = redis.ConnectionPool(host=self.redis_host, port=self.redis_port, db=self.redis_db)
        client = redis.StrictRedis(connection_pool=pool)
        return client

    def save(self, data):
        ''''''
        hash_values = self.multiple_hash.get_hash_values(data)
        for hash_value in hash_values:
            offset = self._get_offset(hash_value)
            self.client.setbit(self.redis_key, offset, 1)
        return True

    def is_exists(self, data):
        hash_values = self.multiple_hash.get_hash_values(data)
        for hash_value in hash_values:
            offset = self._get_offset(hash_value)
            v = self.client.getbit(self.redis_key, offset)
            if v == 0:
                return False
        return True

    def _get_offset(self, hash_value):
        # 512M 长度哈希表 
        # 2**8 = 256
        # 2**20 = 1024 * 1024
        # (2**8 * 2**20 * 2*3) 代表 hash 表的长度  如果同一项目中不能更改
        return hash_value % (2**8 * 2**20 * 2*3)
if __name__ == '__main__':

    data = ["asdfasdf", "123", "123", "456","asf", "asf"]

    bm = BloomFilter(salts=["1","2","3", "4"],redis_host="172.17.0.2")
    for d in data:
        if not bm.is_exists(d):
            bm.save(d)
            print("映射数据成功:", d)
        else:
            print("发现重复数据:", d)

应用在 scrapy-redis 中

代码已经打包成了一个 Python 包并发布到了 PyPi,链接为:https://pypi.python.org/pypi/…,因此我们以后如果想使用 ScrapyRedisBloomFilter 直接使用就好了,不需要再自己实现一遍。

我们可以直接使用 Pip 来安装,命令如下:

pip3 install scrapy-redis-bloomfilter

使用的方法和 ScrapyRedis 基本相似,在这里说明几个关键配置:

# 去重类,要使用 BloomFilter 请替换 DUPEFILTER_CLASS
DUPEFILTER_CLASS = "scrapy_redis_bloomfilter.dupefilter.RFPDupeFilter"
# 哈希函数的个数,默认为 6,可以自行修改
BLOOMFILTER_HASH_NUMBER = 6
# BloomFilter 的 bit 参数,默认 30,占用 128MB 空间,去重量级 1 亿
BLOOMFILTER_BIT = 30

DUPEFILTER_CLASS 是去重类,如果要使用 BloomFilter 需要将 DUPEFILTER_CLASS 修改为该包的去重类。

BLOOMFILTER_HASH_NUMBER 是 BloomFilter 使用的哈希函数的个数,默认为 6,可以根据去重量级自行修改。

BLOOMFILTER_BIT 即前文所介绍的 BloomFilter 类的 bit 参数,它决定了位数组的位数,如果 BLOOMFILTER_BIT 为 30,那么位数组位数为 2 的 30 次方,将占用 Redis 128MB 的存储空间,去重量级在 1 亿左右,即对应爬取量级 1 亿左右。如果爬取量级在 10 亿、20 亿甚至 100 亿,请务必将此参数对应调高。

测试

Spider 文件:
from scrapy import Request, Spider

class TestSpider(Spider):
    name = 'test'
    base_url = 'https://www.baidu.com/s?wd='

    def start_requests(self):
        for i in range(10):
            url = self.base_url + str(i)
            yield Request(url, callback=self.parse)

        # Here contains 10 duplicated Requests    
        for i in range(100): 
            url = self.base_url + str(i)
            yield Request(url, callback=self.parse)

    def parse(self, response):
        self.logger.debug('Response of' + response.url)

在 start_requests() 方法中首先循环 10 次,构造参数为 0-9 的 URL,然后重新循环了 100 次,构造了参数为 0-99 的 URL,那么这里就会包含 10 个重复的 Request,我们运行项目测试一下:

scrapy crawl test

可以看到最后的输出结果如下:

{'bloomfilter/filtered': 10,
 'downloader/request_bytes': 34021,
 'downloader/request_count': 100,
 'downloader/request_method_count/GET': 100,
 'downloader/response_bytes': 72943,
 'downloader/response_count': 100,
 'downloader/response_status_count/200': 100,
 'finish_reason': 'finished',
 'finish_time': datetime.datetime(2017, 8, 11, 9, 34, 30, 419597),
 'log_count/DEBUG': 202,
 'log_count/INFO': 7,
 'memusage/max': 54153216,
 'memusage/startup': 54153216,
 'response_received_count': 100,
 'scheduler/dequeued/redis': 100,
 'scheduler/enqueued/redis': 100,
 'start_time': datetime.datetime(2017, 8, 11, 9, 34, 26, 495018)}

可以看到最后统计的第一行的结果:

'bloomfilter/filtered': 10,

这就是 BloomFilter 过滤后的统计结果,可以看到它的过滤个数为 10 个,也就是它成功将重复的 10 个 Reqeust 识别出来了,测试通过。

原理

本文偏应用,难以描述的原理,最后说。
一个很长的二进制向量和一个映射函数。

参考资料

1、https://zhuanlan.zhihu.com/p/…
2、https://www.youtube.com/watch…
3、《python3 网络爬虫开发实战》崔庆才
4、https://www.jianshu.com/p/f57…

正文完
 0