乐趣区

关于flink:Flink海量数据去重方案

前言

数据去重(data deduplication)是咱们大数据攻城狮司空见惯的问题了。除了统计 UV 等传统用法之外,去重的意义更在于打消不牢靠数据源产生的脏数据——即反复上报数据或反复投递数据的影响,使流式计算产生的后果更加精确。本文以 Flink 解决日均亿级别及以上的日志数据为背景,探讨除了奢侈办法(HashSet)之外的三种实时去重计划,即:布隆过滤器、RocksDB 状态后端、内部存储。

计划一、布隆过滤器去重

布隆过滤器在笔者的博客里出镜率是很高的,如果看官尚未理解,请务必先食用这篇文章。

以之前用过的子订单日志模型为例,假如上游数据源产生的音讯为 <Integer, Long, String> 三元组,三个元素别离代表站点 ID、子订单 ID 和数据载荷。因为数据源只能保障 at least once 语义(例如未开启 correlation ID 机制的 RabbitMQ 队列),会反复投递子订单数据,导致上游各统计后果偏高。现引入 Guava 的 BloomFilter 来去重,间接上代码说事。

 // dimensionedStream 是个 DataStream<Tuple3<Integer, Long, String>>
  DataStream<String> dedupStream = dimensionedStream
    .keyBy(0)
    .process(new SubOrderDeduplicateProcessFunc(), TypeInformation.of(String.class))
    .name("process_sub_order_dedup").uid("process_sub_order_dedup");

  // 去重用的 ProcessFunction
  public static final class SubOrderDeduplicateProcessFunc
    extends KeyedProcessFunction<Tuple, Tuple3<Integer, Long, String>, String> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOGGER = LoggerFactory.getLogger(SubOrderDeduplicateProcessFunc.class);
    private static final int BF_CARDINAL_THRESHOLD = 1000000;
    private static final double BF_FALSE_POSITIVE_RATE = 0.01;

    private volatile BloomFilter<Long> subOrderFilter;

    @Override
    public void open(Configuration parameters) throws Exception {long s = System.currentTimeMillis();
      subOrderFilter = BloomFilter.create(Funnels.longFunnel(), BF_CARDINAL_THRESHOLD, BF_FALSE_POSITIVE_RATE);
      long e = System.currentTimeMillis();
      LOGGER.info("Created Guava BloomFilter, time cost:" + (e - s));
    }

    @Override
    public void processElement(Tuple3<Integer, Long, String> value, Context ctx, Collector<String> out) throws Exception {
      long subOrderId = value.f1;
      if (!subOrderFilter.mightContain(subOrderId)) {subOrderFilter.put(subOrderId);
        out.collect(value.f2);
      }
      ctx.timerService().registerProcessingTimeTimer(UnixTimeUtil.tomorrowZeroTimestampMs(System.currentTimeMillis(), 8) + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {long s = System.currentTimeMillis();
      subOrderFilter = BloomFilter.create(Funnels.longFunnel(), BF_CARDINAL_THRESHOLD, BF_FALSE_POSITIVE_RATE);
      long e = System.currentTimeMillis();
      LOGGER.info("Timer triggered & resetted Guava BloomFilter, time cost:" + (e - s));
    }

    @Override
    public void close() throws Exception {subOrderFilter = null;}
  }

  // 依据以后工夫戳获取第二天 0 时 0 分 0 秒的工夫戳
  public static long tomorrowZeroTimestampMs(long now, int timeZone) {return now - (now + timeZone * 3600000) % 86400000 + 86400000;
  } 

这里先依照站点 ID 为 key 分组,而后在每个分组内创立存储子订单 ID 的布隆过滤器。布隆过滤器的冀望最大数据量应该按每天产生子订单最多的那个站点来设置,这里设为 100 万,并且可容忍的误判率为 1%。依据下面科普文中的解说,单个布隆过滤器须要 8 个哈希函数,其位图占用内存约 114MB,压力不大。

每当一条数据进入时,调用 BloomFilter.mightContain() 办法判断对应的子订单 ID 是否已呈现过。当没呈现过期,调用 put() 办法将其插入 BloomFilter,并交给 Collector 输入。

另外,通过注册第二天凌晨 0 时 0 分 0 秒的 processing time 计时器,就能够在 onTimer() 办法内重置布隆过滤器,开始新一天的去重。

(吐槽一句,Guava 的 BloomFilter 居然没有提供清零的办法,有点诡异)

计划二、内嵌 RocksDB 状态后端去重(自己最喜爱的一种)

布隆过滤器尽管香,然而它不能做到 100% 准确。在必须保障十拿九稳的场合,咱们能够抉择 Flink 自带的 RocksDB 状态后端,这样不须要依赖其余的组件。RocksDB 自身是一个相似于 HBase 的嵌入式 K - V 数据库,并且它的本地性比拟好,用它保护一个较大的状态汇合并不是什么难事。

首先咱们要开启 RocksDB 状态后端(平时在生产环境中,也倡议总是应用它),并配置好相应的参数。这些参数同样能够在 flink-conf.yaml 里写入。

RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(Consts.STATE_BACKEND_PATH, true);
rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
rocksDBStateBackend.setNumberOfTransferingThreads(2);
rocksDBStateBackend.enableTtlCompactionFilter();

env.setStateBackend(rocksDBStateBackend);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.enableCheckpointing(5 * 60 * 1000); 

RocksDB 的调优是个很简单的话题,详情参见官网提供的 tuning guide,以及 Flink 配置中与 RocksDB 相干的参数,今后会挑工夫重点剖析一下 RocksDB 存储大状态时的调优办法。好在 Flink 曾经为咱们提供了一些预调优的参数,即 PredefinedOptions,请务必依据服务器的理论状况抉择。咱们的 Flink 集群对立采纳 SSD 做存储,故抉择的是 PredefinedOptions.FLASH_SSD_OPTIMIZED。

另外,因为状态空间不小,关上增量检查点以及设定多线程读写 RocksDB,能够进步 checkpointing 效率,检查点周期也不能太短。还有,为了防止状态有限增长上来,咱们依然得定期清理它(即如同上节中布隆过滤器的复位)。当然,除了本人注册定时器之外,咱们也能够利用 Flink 提供的状态 TTL 机制,并关上 RocksDB 状态后端的 TTL compaction filter,让它们在 RocksDB 后盾执行 compaction 操作时主动删除。特地留神,状态 TTL 仅对工夫特色为解决工夫时失效,对事件工夫是有效的。

接下来写具体的业务代码,以上节的 < 站点 ID, 子订单 ID, 音讯载荷 > 三元组为例,有两种可实现的思路:

  • 依然按站点 ID 分组,用存储子订单 ID 的 MapState(当做 Set 来应用)保留状态;
  • 间接按子订单 ID 分组,用单值的 ValueState 保留状态。

显然,如果咱们要用状态 TTL 管制过期的话,第二种思路更好,因为粒度更细。代码如下。

 // dimensionedStream 是个 DataStream<Tuple3<Integer, Long, String>>
  DataStream<String> dedupStream = dimensionedStream
    .keyBy(1)
    .process(new SubOrderDeduplicateProcessFunc(), TypeInformation.of(String.class))
    .name("process_sub_order_dedup").uid("process_sub_order_dedup");

  // 去重用的 ProcessFunction
  public static final class SubOrderDeduplicateProcessFunc
    extends KeyedProcessFunction<Tuple, Tuple3<Integer, Long, String>, String> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOGGER = LoggerFactory.getLogger(SubOrderDeduplicateProcessFunc.class);

    private ValueState<Boolean> existState;

    @Override
    public void open(Configuration parameters) throws Exception {StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(1))
        .setStateVisibility(StateVisibility.NeverReturnExpired)
        .setUpdateType(UpdateType.OnCreateAndWrite)
        .cleanupInRocksdbCompactFilter(10000)
        .build();

      ValueStateDescriptor<Boolean> existStateDesc = new ValueStateDescriptor<>(
        "suborder-dedup-state",
        Boolean.class
      );
      existStateDesc.enableTimeToLive(stateTtlConfig);

      existState = this.getRuntimeContext().getState(existStateDesc);
    }

    @Override
    public void processElement(Tuple3<Integer, Long, String> value, Context ctx, Collector<String> out) throws Exception {if (existState.value() == null) {existState.update(true);
        out.collect(value.f2);
      }
    }
  } 

上述代码中设定了状态 TTL 的相干参数:

  • 过期工夫设为 1 天;
  • 在状态值被创立和被更新时重设 TTL;
  • 曾经过期的数据不能再被拜访到;
  • 在每解决 10000 条状态记录之后,更新检测过期的工夫戳。这个参数要小心设定,更新太频繁会升高 compaction 的性能,更新过慢会使得 compaction 不及时,状态空间收缩。

在理论解决数据时,如果数据的 key(即子订单 ID)对应的状态不存在,阐明它没有呈现过,能够更新状态并输入。反之,阐明它曾经呈现过了,间接抛弃,so easy。

最初还须要留神一点,若数据的 key 占用的空间比拟大(如长度可能会很长的字符串类型),也会造成状态收缩。咱们能够将它 hash 成整型再存储,这样每个 key 就最多只占用 8 个字节了。不过任何哈希算法都无奈保障不产生抵触,所以还是得依据业务场景自行决定。

计划三、引入内部 K - V 存储去重(Redis)

如果既不想用布隆过滤器,也不想在 Flink 作业内保护微小的状态,就只能用折衷方案了:利用内部 K - V 数据库(Redis、HBase 之类)存储须要去重的键。因为内部存储对内存和磁盘占用同样敏感,所以也得设定相应的 TTL,以及对大的键进行压缩。另外,内部 K - V 存储毕竟是独立于 Flink 框架之外的,一旦作业呈现问题重启,内部存储是不会与作业的 checkpoint 同步复原到统一的状态的,也就是说后果依然会呈现偏差,须要留神。

鉴于这种计划对第三方组件有强依赖,要关怀的货色太多,所以个别状况下是不必的,咱们也没有实操过,所以道歉没有代码了。

计划四、HyperLogLog 去重

未完待续

退出移动版