前言

数据去重(data deduplication)是咱们大数据攻城狮司空见惯的问题了。除了统计UV等传统用法之外,去重的意义更在于打消不牢靠数据源产生的脏数据——即反复上报数据或反复投递数据的影响,使流式计算产生的后果更加精确。本文以Flink解决日均亿级别及以上的日志数据为背景,探讨除了奢侈办法(HashSet)之外的三种实时去重计划,即:布隆过滤器、RocksDB状态后端、内部存储。

计划一、布隆过滤器去重

布隆过滤器在笔者的博客里出镜率是很高的,如果看官尚未理解,请务必先食用这篇文章。

以之前用过的子订单日志模型为例,假如上游数据源产生的音讯为<Integer, Long, String>三元组,三个元素别离代表站点ID、子订单ID和数据载荷。因为数据源只能保障at least once语义(例如未开启correlation ID机制的RabbitMQ队列),会反复投递子订单数据,导致上游各统计后果偏高。现引入Guava的BloomFilter来去重,间接上代码说事。

 // dimensionedStream是个DataStream<Tuple3<Integer, Long, String>>  DataStream<String> dedupStream = dimensionedStream    .keyBy(0)    .process(new SubOrderDeduplicateProcessFunc(), TypeInformation.of(String.class))    .name("process_sub_order_dedup").uid("process_sub_order_dedup");  // 去重用的ProcessFunction  public static final class SubOrderDeduplicateProcessFunc    extends KeyedProcessFunction<Tuple, Tuple3<Integer, Long, String>, String> {    private static final long serialVersionUID = 1L;    private static final Logger LOGGER = LoggerFactory.getLogger(SubOrderDeduplicateProcessFunc.class);    private static final int BF_CARDINAL_THRESHOLD = 1000000;    private static final double BF_FALSE_POSITIVE_RATE = 0.01;    private volatile BloomFilter<Long> subOrderFilter;    @Override    public void open(Configuration parameters) throws Exception {      long s = System.currentTimeMillis();      subOrderFilter = BloomFilter.create(Funnels.longFunnel(), BF_CARDINAL_THRESHOLD, BF_FALSE_POSITIVE_RATE);      long e = System.currentTimeMillis();      LOGGER.info("Created Guava BloomFilter, time cost: " + (e - s));    }    @Override    public void processElement(Tuple3<Integer, Long, String> value, Context ctx, Collector<String> out) throws Exception {      long subOrderId = value.f1;      if (!subOrderFilter.mightContain(subOrderId)) {        subOrderFilter.put(subOrderId);        out.collect(value.f2);      }      ctx.timerService().registerProcessingTimeTimer(UnixTimeUtil.tomorrowZeroTimestampMs(System.currentTimeMillis(), 8) + 1);    }    @Override    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {      long s = System.currentTimeMillis();      subOrderFilter = BloomFilter.create(Funnels.longFunnel(), BF_CARDINAL_THRESHOLD, BF_FALSE_POSITIVE_RATE);      long e = System.currentTimeMillis();      LOGGER.info("Timer triggered & resetted Guava BloomFilter, time cost: " + (e - s));    }    @Override    public void close() throws Exception {      subOrderFilter = null;    }  }  // 依据以后工夫戳获取第二天0时0分0秒的工夫戳  public static long tomorrowZeroTimestampMs(long now, int timeZone) {    return now - (now + timeZone * 3600000) % 86400000 + 86400000;  } 

这里先依照站点ID为key分组,而后在每个分组内创立存储子订单ID的布隆过滤器。布隆过滤器的冀望最大数据量应该按每天产生子订单最多的那个站点来设置,这里设为100万,并且可容忍的误判率为1%。依据下面科普文中的解说,单个布隆过滤器须要8个哈希函数,其位图占用内存约114MB,压力不大。

每当一条数据进入时,调用BloomFilter.mightContain()办法判断对应的子订单ID是否已呈现过。当没呈现过期,调用put()办法将其插入BloomFilter,并交给Collector输入。

另外,通过注册第二天凌晨0时0分0秒的processing time计时器,就能够在onTimer()办法内重置布隆过滤器,开始新一天的去重。

(吐槽一句,Guava的BloomFilter居然没有提供清零的办法,有点诡异)

计划二、内嵌RocksDB状态后端去重(自己最喜爱的一种)

布隆过滤器尽管香,然而它不能做到100%准确。在必须保障十拿九稳的场合,咱们能够抉择Flink自带的RocksDB状态后端,这样不须要依赖其余的组件。RocksDB自身是一个相似于HBase的嵌入式K-V数据库,并且它的本地性比拟好,用它保护一个较大的状态汇合并不是什么难事。

首先咱们要开启RocksDB状态后端(平时在生产环境中,也倡议总是应用它),并配置好相应的参数。这些参数同样能够在flink-conf.yaml里写入。

RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(Consts.STATE_BACKEND_PATH, true);rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);rocksDBStateBackend.setNumberOfTransferingThreads(2);rocksDBStateBackend.enableTtlCompactionFilter();env.setStateBackend(rocksDBStateBackend);env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);env.enableCheckpointing(5 * 60 * 1000); 

RocksDB的调优是个很简单的话题,详情参见官网提供的tuning guide,以及Flink配置中与RocksDB相干的参数,今后会挑工夫重点剖析一下RocksDB存储大状态时的调优办法。好在Flink曾经为咱们提供了一些预调优的参数,即PredefinedOptions,请务必依据服务器的理论状况抉择。咱们的Flink集群对立采纳SSD做存储,故抉择的是PredefinedOptions.FLASH_SSD_OPTIMIZED。

另外,因为状态空间不小,关上增量检查点以及设定多线程读写RocksDB,能够进步checkpointing效率,检查点周期也不能太短。还有,为了防止状态有限增长上来,咱们依然得定期清理它(即如同上节中布隆过滤器的复位)。当然,除了本人注册定时器之外,咱们也能够利用Flink提供的状态TTL机制,并关上RocksDB状态后端的TTL compaction filter,让它们在RocksDB后盾执行compaction操作时主动删除。特地留神,状态TTL仅对工夫特色为解决工夫时失效,对事件工夫是有效的。

接下来写具体的业务代码,以上节的<站点ID, 子订单ID, 音讯载荷>三元组为例,有两种可实现的思路:

  • 依然按站点ID分组,用存储子订单ID的MapState(当做Set来应用)保留状态;
  • 间接按子订单ID分组,用单值的ValueState保留状态。

显然,如果咱们要用状态TTL管制过期的话,第二种思路更好,因为粒度更细。代码如下。

 // dimensionedStream是个DataStream<Tuple3<Integer, Long, String>>  DataStream<String> dedupStream = dimensionedStream    .keyBy(1)    .process(new SubOrderDeduplicateProcessFunc(), TypeInformation.of(String.class))    .name("process_sub_order_dedup").uid("process_sub_order_dedup");  // 去重用的ProcessFunction  public static final class SubOrderDeduplicateProcessFunc    extends KeyedProcessFunction<Tuple, Tuple3<Integer, Long, String>, String> {    private static final long serialVersionUID = 1L;    private static final Logger LOGGER = LoggerFactory.getLogger(SubOrderDeduplicateProcessFunc.class);    private ValueState<Boolean> existState;    @Override    public void open(Configuration parameters) throws Exception {      StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(1))        .setStateVisibility(StateVisibility.NeverReturnExpired)        .setUpdateType(UpdateType.OnCreateAndWrite)        .cleanupInRocksdbCompactFilter(10000)        .build();      ValueStateDescriptor<Boolean> existStateDesc = new ValueStateDescriptor<>(        "suborder-dedup-state",        Boolean.class      );      existStateDesc.enableTimeToLive(stateTtlConfig);      existState = this.getRuntimeContext().getState(existStateDesc);    }    @Override    public void processElement(Tuple3<Integer, Long, String> value, Context ctx, Collector<String> out) throws Exception {      if (existState.value() == null) {        existState.update(true);        out.collect(value.f2);      }    }  } 

上述代码中设定了状态TTL的相干参数:

  • 过期工夫设为1天;
  • 在状态值被创立和被更新时重设TTL;
  • 曾经过期的数据不能再被拜访到;
  • 在每解决10000条状态记录之后,更新检测过期的工夫戳。这个参数要小心设定,更新太频繁会升高compaction的性能,更新过慢会使得compaction不及时,状态空间收缩。

在理论解决数据时,如果数据的key(即子订单ID)对应的状态不存在,阐明它没有呈现过,能够更新状态并输入。反之,阐明它曾经呈现过了,间接抛弃,so easy。

最初还须要留神一点,若数据的key占用的空间比拟大(如长度可能会很长的字符串类型),也会造成状态收缩。咱们能够将它hash成整型再存储,这样每个key就最多只占用8个字节了。不过任何哈希算法都无奈保障不产生抵触,所以还是得依据业务场景自行决定。

计划三、引入内部K-V存储去重(Redis)

如果既不想用布隆过滤器,也不想在Flink作业内保护微小的状态,就只能用折衷方案了:利用内部K-V数据库(Redis、HBase之类)存储须要去重的键。因为内部存储对内存和磁盘占用同样敏感,所以也得设定相应的TTL,以及对大的键进行压缩。另外,内部K-V存储毕竟是独立于Flink框架之外的,一旦作业呈现问题重启,内部存储是不会与作业的checkpoint同步复原到统一的状态的,也就是说后果依然会呈现偏差,须要留神。

鉴于这种计划对第三方组件有强依赖,要关怀的货色太多,所以个别状况下是不必的,咱们也没有实操过,所以道歉没有代码了。

计划四、HyperLogLog去重

未完待续