关于flink:FlinkDeduplicate-去重算子源码解读

56次阅读

共计 6309 个字符,预计需要花费 16 分钟才能阅读完成。

语法

https://nightlies.apache.org/flink/flink-docs-master/docs/dev…

SELECT [column_list]
FROM (SELECT [column_list],
         ROW_NUMBER() OVER ([PARTITIONBY col1[, col2...]]
    ORDER BY time_attr [asc|desc])AS rownum
    FROM table_name
)
WHERE rownum= 1

留神点:

  • ORDER BY 后的字段必须是工夫属性(process time/row time)

Minibatch 开关

开启 MiniBatch 时应用 KeyedMapBundleOperator,否则应用 KeyedProcessOperator。

状态应用

Event Time Process Time
开启 minibatch RowTimeMiniBatchDeduplicateFunction ProcTimeMiniBatchDeduplicateKeepLastRowFunction
ProcTimeMiniBatchDeduplicateKeepFirstRowFunction
没开启 minibatch RowTimeDeduplicateFunction ProcTimeDeduplicateKeepLastRowFunction
ProcTimeDeduplicateKeepFirstRowFunction

在 Event Time 场景下,每条数据到来后必须比照其附带的事件工夫和该算子已存储的事件工夫进行比照,因而只需一个函数对立做逻辑解决。

而解决工夫是由以后解决数据的算子赋予,因而能够间接简化为两种场景:保留第一条和保留最初一条:

  • First Row:存储第一条过去的数据,并抛弃前面来的所有数据即可,只能解决上游是 Append-only 的数据,如果是 Process Time 场景也只会产生 Insert 数据;
  • Last Row:每次到来数据须要依据工夫属性留下最新的一条,如果以后的数据是最新的,则下发回撤老数据。

所有数据的最终解决逻辑最终会落到 DeduplicateFunctionHelper,因而咱们能够通过浏览 DeduplicateFunctionHelper 的源码查看不同场景的解决状况。

DeduplicateFunctionHelper

去重函数的解决都最终调用这个工具类的办法

Process Time&Last Row

Process Time 依据是否解决回撤音讯分为两种:

  • processLastRowOnProcTime:仅反对解决 INSERT 音讯;
  • processLastRowOnChangelog:除 INSERT 外可解决回撤等音讯。

Last Row on Proctime

static void processLastRowOnProcTime(
            RowData currentRow,
            boolean generateUpdateBefore,
            boolean generateInsert,
            ValueState<RowData> state,
            Collector<RowData> out)
            throws Exception {checkInsertOnly(currentRow);
  if (generateUpdateBefore || generateInsert) {
    // use state to keep the previous row content if we need to generate UPDATE_BEFORE
    // or use to distinguish the first row, if we need to generate INSERT
    RowData preRow = state.value();
    state.update(currentRow);
    if (preRow == null) {
      // the first row, send INSERT message
      currentRow.setRowKind(RowKind.INSERT);
      out.collect(currentRow);
    } else {if (generateUpdateBefore) {preRow.setRowKind(RowKind.UPDATE_BEFORE);
        out.collect(preRow);
      }
      currentRow.setRowKind(RowKind.UPDATE_AFTER);
      out.collect(currentRow);
    }
  } else {
    // always send UPDATE_AFTER if INSERT is not needed
    currentRow.setRowKind(RowKind.UPDATE_AFTER);
    out.collect(currentRow);
  }
}
  1. 查看该音讯是否是 INSERT 格局(只承受 INSERT 格局音讯);
  2. 查看该节点发送的音讯类型(generateUpdateBefore 的值由 changelogmode 和上游须要的音讯类型独特决定是否须要下发回撤,generateInsert 由 table.exec.deduplicate.insert-update-after-sensitive-enabled 配置确定)是否反对 UA 和 I;
  3. 如果不反对发回撤且不反对发 INSERT 则间接发送 UA 音讯到上游;
  4. 如果反对,查看以后这条数据是否是第一条数据(依据状态查问,状态里会保留上一条到来的数据),并更新状态为以后数据;
  5. 如果是第一条数据,则附上 +I 标识发送音讯到上游;
  6. 如果不是第一条数据,查看该节点是否反对发送 UB 音讯;
  7. 如果须要,则附上 -UB 标识发送回撤上一条数据;
  8. 发送 +UA 音讯。

Last Row on Changelog

static void processLastRowOnChangelog(
        RowData currentRow,
        boolean generateUpdateBefore,
        ValueState<RowData> state,
        Collector<RowData> out,
        boolean isStateTtlEnabled,
        RecordEqualiser equaliser)
        throws Exception {RowData preRow = state.value();
    RowKind currentKind = currentRow.getRowKind();
    if (currentKind == RowKind.INSERT || currentKind == RowKind.UPDATE_AFTER) {if (preRow == null) {
            // the first row, send INSERT message
            currentRow.setRowKind(RowKind.INSERT);
            out.collect(currentRow);
        } else {if (!isStateTtlEnabled && equaliser.equals(preRow, currentRow)) {
                // currentRow is the same as preRow and state cleaning is not enabled.
                // We do not emit retraction and update message.
                // If state cleaning is enabled, we have to emit messages to prevent too early
                // state eviction of downstream operators.
                return;
            } else {if (generateUpdateBefore) {preRow.setRowKind(RowKind.UPDATE_BEFORE);
                    out.collect(preRow);
                }
                currentRow.setRowKind(RowKind.UPDATE_AFTER);
                out.collect(currentRow);
            }
        }
        // normalize row kind
        currentRow.setRowKind(RowKind.INSERT);
        // save to state
        state.update(currentRow);
    } else {
        // DELETE or UPDATER_BEFORE
        if (preRow != null) {
            // always set to DELETE because this row has been removed
            // even the input is UPDATE_BEFORE, there may no UPDATE_AFTER after it.
            preRow.setRowKind(RowKind.DELETE);
            // output the preRow instead of currentRow,
            // because preRow always contains the full content.
            // currentRow may only contain key parts (e.g. Kafka tombstone records).
            out.collect(preRow);
            // clear state as the row has been removed
            state.clear();}
        // nothing to do if removing a non-existed row
    }
}

changelog 场景下,因为须要解决回撤信息,去重逻辑绝对简单一点:

  1. 查看以后数据是 accumulate(INSERT 或 UPDATE)还是回撤音讯;
  2. 如果是回撤音讯,且状态不为空,则将状态的数据附上 -D 标识下发回撤,留神:这里回撤用的数据必须是状态里的,因为以后的数据可能只蕴含 key 局部;
  3. 如果不是回撤数据,则查看状态数据是否为空(即以后数据是否是第一条数据),如果是,则附上 +I 标识下发;
  4. 如果没有启用 TTL 且以后数据和状态里的数据统一,则不下发(缩小大量反复数据场景下下发的数据量);
  5. 如果反对发送 UB 音讯(generateUpdateBefore 为 true),则附上 -UB 标识下发回撤;
  6. 为以后音讯附上 +UA 标识下发;
  7. 更新状态为以后音讯。

Process Time&First Row

static void processFirstRowOnProcTime(RowData currentRow, ValueState<Boolean> state, Collector<RowData> out)
            throws Exception {checkInsertOnly(currentRow);
  // ignore record if it is not first row
  if (state.value() != null) {return;}
  state.update(true);
  // emit the first row which is INSERT message
  out.collect(currentRow);
}
  1. 查看该音讯是否是 INSERT 格局(只承受 INSERT 格局音讯);
  2. 查看该条数据是否是第一条达到的数据(通过状态查问是否来过数据);
  3. 如果是,则间接疏忽不下发,否则附上 +I 标识下发该条数据。

Event Time

因为数据到来可能存在乱序,最早到的数据不肯定是 Event Time 最老的,最初到的数据也不肯定是 Event Time 最新的,因而 Event Time 的去重场景不须要像 Process Time 那样针对 First Row 和 Last Row 别离实现一套逻辑,只需在查看以后数据是否须要下发时采取不同的策略即可。

Event Time 去重场景实现可参考 RowTimeDeduplicateFunction:

public static void deduplicateOnRowTime(
        ValueState<RowData> state,
        RowData currentRow,
        Collector<RowData> out,
        boolean generateUpdateBefore,
        boolean generateInsert,
        int rowtimeIndex,
        boolean keepLastRow)
        throws Exception {checkInsertOnly(currentRow);
    RowData preRow = state.value();

    if (isDuplicate(preRow, currentRow, rowtimeIndex, keepLastRow)) {updateDeduplicateResult(generateUpdateBefore, generateInsert, preRow, currentRow, out);
        state.update(currentRow);
    }
}
  1. 首先查看音讯是否是 INSERT;
  2. 调用 isDuplicate 判断该音讯是否应该下发,而 Event Time 去重的逻辑精华就在于此;
  3. 下发音讯并更新状态。

判断音讯优先级

static boolean isDuplicate(RowData preRow, RowData currentRow, int rowtimeIndex, boolean keepLastRow) {if (keepLastRow) {
    return preRow == null
      || getRowtime(preRow, rowtimeIndex) <= getRowtime(currentRow, rowtimeIndex);
  } else {
    return preRow == null
      || getRowtime(currentRow, rowtimeIndex) < getRowtime(preRow, rowtimeIndex);
  }
}
  1. 如果是保留最新一条数据(Last Row),则比拟以后数据的事件工夫是否大于等于先前数据的事件工夫;
  2. 如果是保留最早一条数据(First Row),则比拟以后数据的事件工夫是否小于先前数据的事件工夫。

注:这里为什么第一个判断用大于等于,第二个判断用小于?因为第一种状况的语义是最新一条数据,因而两条数据事件工夫一样,取起初的数据,而第二种状况的语义是最早一条数据,两条数据事件工夫一样时取先来的数据。

更新下发后果

static void updateDeduplicateResult(
            boolean generateUpdateBefore,
            boolean generateInsert,
            RowData preRow,
            RowData currentRow,
            Collector<RowData> out) {if (generateUpdateBefore || generateInsert) {if (preRow == null) {
      // the first row, send INSERT message
      currentRow.setRowKind(RowKind.INSERT);
      out.collect(currentRow);
    } else {if (generateUpdateBefore) {final RowKind preRowKind = preRow.getRowKind();
        preRow.setRowKind(RowKind.UPDATE_BEFORE);
        out.collect(preRow);
        preRow.setRowKind(preRowKind);
      }
      currentRow.setRowKind(RowKind.UPDATE_AFTER);
      out.collect(currentRow);
    }
  } else {currentRow.setRowKind(RowKind.UPDATE_AFTER);
    out.collect(currentRow);
  }
}

这段逻辑与 Process Time&Last Row 的逻辑十分类似,可间接参考上述的代码解说。

正文完
 0