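For more technical discussion and job opportunities, follow the ByteDance Data Platform WeChat official account and reply 【1】 to join the official discussion group.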
Source Connector
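This article mainly introduces SplitCoordinator, the role that creates and manages Splits.
SourceSplitCoordinator
The core goal of a big data processing framework is to divide large-scale data into multiple reasonably sized Splits, and the SplitCoordinator is responsible for creating and managing these Splits.
SourceSplitCoordinator interface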
public interface SourceSplitCoordinator<SplitT extends SourceSplit, StateT> extends Serializable, AutoCloseable {
  void start();
  void addReader(int subtaskId);
  void addSplitsBack(List<SplitT> splits, int subtaskId);
  void handleSplitRequest(int subtaskId, @Nullable String requesterHostname);
  default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
  }
  StateT snapshotState() throws Exception;
  default void notifyCheckpointComplete(long checkpointId) throws Exception {
  }
  void close();
  interface Context<SplitT extends SourceSplit, StateT> {
    boolean isRestored();
    /**
     * Return the restored state to the split coordinator, for exactly-once semantics.
     */
    StateT getRestoreState();
    /**
     * Return the total parallelism of the source reader.
     */
    int totalParallelism();
    /**
     * When a source reader starts, it registers itself with the coordinator.
     */
    Set<Integer> registeredReaders();
    /**
     * Assign splits to a reader.
     */
    void assignSplit(int subtaskId, List<SplitT> splits);
    /**
     * Mainly used in bounded mode; signals that no more splits will be sent to the source reader.
     */
    void signalNoMoreSplits(int subtask);
    /**
     * Use this method when the split coordinator has an event to send to a source reader,
     * e.g. sending a Pause event to the source reader in CDC 2.0.
     */
    void sendEventToSourceReader(int subtaskId, SourceEvent event);
    /**
     * Schedule the callable and handler to run periodically; often used in unbounded mode.
     */
    <T> void runAsync(Callable<T> callable,
                      BiConsumer<T, Throwable> handler,
                      int initialDelay,
                      long interval);
    /**
     * Run the callable and handler only once; often used in bounded mode.
     */
    <T> void runAsyncOnce(Callable<T> callable,
                          BiConsumer<T, Throwable> handler);
  }
}
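To unit-test a coordinator without a running engine, one option is a test double for the Context that simply records what the coordinator asked it to do. The sketch below uses a hypothetical, trimmed-down `SimpleContext` interface that mirrors only four of the Context methods above (totalParallelism, registeredReaders, assignSplit, signalNoMoreSplits); it is not BitSail's API, and the split type is a plain generic `S`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, trimmed-down mirror of a few SourceSplitCoordinator.Context methods.
interface SimpleContext<S> {
    int totalParallelism();
    Set<Integer> registeredReaders();
    void assignSplit(int subtaskId, List<S> splits);
    void signalNoMoreSplits(int subtaskId);
}

// Records every call so a test can inspect what the coordinator did.
class RecordingContext<S> implements SimpleContext<S> {
    private final int parallelism;
    private final Set<Integer> registered = new HashSet<>();
    final Map<Integer, List<S>> assigned = new HashMap<>();
    final Set<Integer> noMoreSplits = new HashSet<>();

    RecordingContext(int parallelism) {
        this.parallelism = parallelism;
    }

    // Simulates a reader subtask registering itself with the coordinator.
    void register(int subtaskId) {
        registered.add(subtaskId);
    }

    @Override public int totalParallelism() { return parallelism; }

    @Override public Set<Integer> registeredReaders() { return registered; }

    @Override public void assignSplit(int subtaskId, List<S> splits) {
        // Accumulate, since a reader may receive splits over several calls.
        assigned.computeIfAbsent(subtaskId, k -> new ArrayList<>()).addAll(splits);
    }

    @Override public void signalNoMoreSplits(int subtaskId) {
        noMoreSplits.add(subtaskId);
    }
}
```

A coordinator written against such a narrow interface can be driven directly in a test: register a reader, call the coordinator's addReader, then inspect `assigned` and `noMoreSplits`.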
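Constructor
In the constructor, developers generally set up configuration and create the containers that store split information.
Take the constructor of ClickhouseSourceSplitCoordinator as an example: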
public ClickhouseSourceSplitCoordinator(SourceSplitCoordinator.Context<ClickhouseSourceSplit, EmptyState> context,
                                        BitSailConfiguration jobConf) {
  this.context = context;
  this.jobConf = jobConf;
  this.splitAssignmentPlan = Maps.newConcurrentMap();
}
When a custom State is defined, the state stored in SourceSplitCoordinator.Context at checkpoint time must be saved and restored.
Take RocketMQSourceSplitCoordinator as an example:
public RocketMQSourceSplitCoordinator(
    SourceSplitCoordinator.Context<RocketMQSplit, RocketMQState> context,
    BitSailConfiguration jobConfiguration,
    Boundedness boundedness) {
  this.context = context;
  this.jobConfiguration = jobConfiguration;
  this.boundedness = boundedness;
  this.discoveryInternal = jobConfiguration.get(RocketMQSourceOptions.DISCOVERY_INTERNAL);
  this.pendingRocketMQSplitAssignment = Maps.newConcurrentMap();
  this.discoveredPartitions = new HashSet<>();
  if (context.isRestored()) {
    RocketMQState restoreState = context.getRestoreState();
    assignedPartitions = restoreState.getAssignedWithSplits();
    discoveredPartitions.addAll(assignedPartitions.keySet());
  } else {
    assignedPartitions = Maps.newHashMap();
  }
  prepareConsumerProperties();
}
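The restore branch above can be isolated into a small sketch. It assumes a hypothetical `PartitionState` class holding a map from partition name to split id, standing in for RocketMQState and its MessageQueue keys. The constructor either rebuilds the assigned and discovered partitions from that state or starts empty. Unlike the original, this sketch copies the restored map, a design choice so that later assignments do not mutate the restored state object.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for RocketMQState: partition name -> split id.
class PartitionState {
    private final Map<String, String> assignedWithSplits;

    PartitionState(Map<String, String> assignedWithSplits) {
        this.assignedWithSplits = assignedWithSplits;
    }

    Map<String, String> getAssignedWithSplits() {
        return assignedWithSplits;
    }
}

// Hypothetical coordinator showing only the restore-or-fresh constructor logic.
class RestorableCoordinator {
    final Map<String, String> assignedPartitions;
    final Set<String> discoveredPartitions = new HashSet<>();

    // restoredState is null when the job starts fresh (the isRestored() == false case).
    RestorableCoordinator(PartitionState restoredState) {
        if (restoredState != null) {
            // Copy so later assignments do not mutate the restored state object.
            assignedPartitions = new HashMap<>(restoredState.getAssignedWithSplits());
            // Partitions that were already assigned also count as discovered.
            discoveredPartitions.addAll(assignedPartitions.keySet());
        } else {
            assignedPartitions = new HashMap<>();
        }
    }
}
```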
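start method
This method extracts the split metadata that the data source needs. If you have factored out a Split Assigner class, it is usually initialized here; if you use an encapsulated Split Assign function, the splits waiting to be assigned are initialized here instead.
Unified stream/batch scenario
Take RocketMQSourceSplitCoordinator as an example: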
private void prepareRocketMQConsumer() {
  try {
    consumer = RocketMQUtils.prepareRocketMQConsumer(jobConfiguration,
        String.format(COORDINATOR_INSTANCE_NAME_TEMPLATE,
            cluster, topic, consumerGroup, UUID.randomUUID()));
    consumer.start();
  } catch (Exception e) {
    throw BitSailException.asBitSailException(RocketMQErrorCode.CONSUMER_CREATE_FAILED, e);
  }
}
@Override
public void start() {
  prepareRocketMQConsumer();
  splitAssigner = new FairRocketMQSplitAssigner(jobConfiguration, assignedPartitions);
  if (discoveryInternal > 0) {
    context.runAsync(
        this::fetchMessageQueues,
        this::handleMessageQueueChanged,
        0,
        discoveryInternal
    );
  } else {
    context.runAsyncOnce(
        this::fetchMessageQueues,
        this::handleMessageQueueChanged
    );
  }
}
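How the Context actually executes `runAsync` and `runAsyncOnce` is up to the engine and is not shown in this article. Purely as an illustration of the semantics (a callable whose result or exception is handed to a handler, either periodically or once), both can be approximated with the JDK's `ScheduledExecutorService`. `PeriodicDiscovery` is a hypothetical name, and the time unit is assumed to be milliseconds.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical helper approximating runAsync / runAsyncOnce; not BitSail's implementation.
class PeriodicDiscovery implements AutoCloseable {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    // Runs callable every intervalMs and passes its result (or exception) to handler.
    <T> ScheduledFuture<?> runAsync(Callable<T> callable, BiConsumer<T, Throwable> handler,
                                    long initialDelayMs, long intervalMs) {
        return executor.scheduleAtFixedRate(() -> invoke(callable, handler),
                initialDelayMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    // One-shot variant: run callable once, then call handler.
    <T> void runAsyncOnce(Callable<T> callable, BiConsumer<T, Throwable> handler) {
        executor.execute(() -> invoke(callable, handler));
    }

    private static <T> void invoke(Callable<T> callable, BiConsumer<T, Throwable> handler) {
        T result = null;
        Throwable error = null;
        try {
            result = callable.call();
        } catch (Throwable t) {
            // Catching callable failures keeps one failed discovery
            // from cancelling the periodic task.
            error = t;
        }
        handler.accept(result, error);
    }

    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

In the coordinator above, `fetchMessageQueues` plays the callable and `handleMessageQueueChanged` the handler; with `discoveryInternal > 0` discovery repeats, so newly added message queues can be picked up while the job runs.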
Batch scenario
Take ClickhouseSourceSplitCoordinator as an example:
public void start() {
  List<ClickhouseSourceSplit> splitList;
  try {
    SimpleDivideSplitConstructor constructor = new SimpleDivideSplitConstructor(jobConf);
    splitList = constructor.construct();
  } catch (IOException e) {
    ClickhouseSourceSplit split = new ClickhouseSourceSplit(0);
    split.setReadTable(true);
    splitList = Collections.singletonList(split);
    LOG.error("Failed to construct splits, will directly read the table.", e);
  }
  int readerNum = context.totalParallelism();
  LOG.info("Found {} readers and {} splits.", readerNum, splitList.size());
  if (readerNum > splitList.size()) {
    LOG.error("Reader number {} is larger than split number {}.", readerNum, splitList.size());
  }
  for (ClickhouseSourceSplit split : splitList) {
    int readerIndex = ReaderSelector.getReaderIndex(readerNum);
    splitAssignmentPlan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(split);
    LOG.info("Will assign split {} to the {}-th reader", split.uniqSplitId(), readerIndex);
  }
}
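The implementation of ReaderSelector is not shown in this article. The sketch below assumes a simple round-robin selector (the hypothetical `RoundRobinSelector`) and plain String split ids, to show how the plan spreads splits across readers and why having more readers than splits leaves some readers with nothing to read, which is the situation the error log above warns about.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for ReaderSelector: hands out reader indexes in turn.
class RoundRobinSelector {
    private int next = 0;

    int getReaderIndex(int readerNum) {
        int index = next % readerNum;
        next++;
        return index;
    }
}

class BatchPlanBuilder {
    // Builds readerIndex -> split ids, like splitAssignmentPlan in start().
    static Map<Integer, Set<String>> buildPlan(List<String> splitIds, int readerNum) {
        Map<Integer, Set<String>> plan = new ConcurrentHashMap<>();
        RoundRobinSelector selector = new RoundRobinSelector();
        for (String splitId : splitIds) {
            int readerIndex = selector.getReaderIndex(readerNum);
            plan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(splitId);
        }
        return plan;
    }
}
```

With three splits and two readers, reader 0 gets two splits and reader 1 gets one. With one split and three readers, only reader 0 appears in the plan.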
Assigner
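The Assigner distributes the prepared splits to Readers. In practice we usually let the SourceSplitCoordinator focus on communicating with Readers and encapsulate the actual split distribution logic in an Assigner, which can be either an encapsulated Split Assign function or a factored-out Split Assigner class.
Assign function example
Take ClickhouseSourceSplitCoordinator as an example:
The tryAssignSplitsToReader function assigns the splits stored in splitAssignmentPlan to the corresponding Readers.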
private void tryAssignSplitsToReader() {
  Map<Integer, List<ClickhouseSourceSplit>> splitsToAssign = new HashMap<>();
  for (Integer readerIndex : splitAssignmentPlan.keySet()) {
    if (CollectionUtils.isNotEmpty(splitAssignmentPlan.get(readerIndex))
        && context.registeredReaders().contains(readerIndex)) {
      splitsToAssign.put(readerIndex, Lists.newArrayList(splitAssignmentPlan.get(readerIndex)));
    }
  }
  for (Integer readerIndex : splitsToAssign.keySet()) {
    LOG.info("Try assigning splits reader {}, splits are: [{}]", readerIndex,
        splitsToAssign.get(readerIndex).stream().map(ClickhouseSourceSplit::uniqSplitId).collect(Collectors.toList()));
    splitAssignmentPlan.remove(readerIndex);
    context.assignSplit(readerIndex, splitsToAssign.get(readerIndex));
    context.signalNoMoreSplits(readerIndex);
    LOG.info("Finish assigning splits reader {}", readerIndex);
  }
}
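The key property of tryAssignSplitsToReader is that splits planned for a reader that has not registered yet stay in the plan until a later call, for example from addReader. A self-contained sketch of that gating follows. It uses String splits, and the hypothetical fields `delivered` and `finished` stand in for context.assignSplit and context.signalNoMoreSplits.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the register-then-assign gating in tryAssignSplitsToReader.
class GatedAssigner {
    final Map<Integer, Set<String>> plan = new ConcurrentHashMap<>();
    final Set<Integer> registeredReaders = new HashSet<>();
    // What each reader received; stands in for context.assignSplit.
    final Map<Integer, List<String>> delivered = new HashMap<>();
    // Readers told "no more splits"; stands in for context.signalNoMoreSplits.
    final Set<Integer> finished = new HashSet<>();

    void tryAssignSplitsToReader() {
        // Collect first, then mutate the plan, so the plan is not modified while iterating.
        Map<Integer, List<String>> toAssign = new HashMap<>();
        for (Map.Entry<Integer, Set<String>> e : plan.entrySet()) {
            if (!e.getValue().isEmpty() && registeredReaders.contains(e.getKey())) {
                toAssign.put(e.getKey(), new ArrayList<>(e.getValue()));
            }
        }
        for (Map.Entry<Integer, List<String>> e : toAssign.entrySet()) {
            plan.remove(e.getKey());
            delivered.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).addAll(e.getValue());
            // Batch source: the reader gets all of its splits at once, so signal completion.
            finished.add(e.getKey());
        }
    }
}
```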
Assigner class example
Take RocketMQSourceSplitCoordinator as an example:
public class FairRocketMQSplitAssigner implements SplitAssigner<MessageQueue> {
  private BitSailConfiguration readerConfiguration;
  private AtomicInteger atomicInteger;
  public Map<MessageQueue, String> rocketMQSplitIncrementMapping;
  public FairRocketMQSplitAssigner(BitSailConfiguration readerConfiguration,
                                   Map<MessageQueue, String> rocketMQSplitIncrementMapping) {
    this.readerConfiguration = readerConfiguration;
    this.rocketMQSplitIncrementMapping = rocketMQSplitIncrementMapping;
    // Continue numbering after the splits already recorded (e.g. restored from state).
    this.atomicInteger = new AtomicInteger(CollectionUtils
        .size(rocketMQSplitIncrementMapping.keySet()));
  }
  @Override
  public String assignSplitId(MessageQueue messageQueue) {
    // Each message queue gets a stable, sequential split id.
    if (!rocketMQSplitIncrementMapping.containsKey(messageQueue)) {
      rocketMQSplitIncrementMapping.put(messageQueue, String.valueOf(atomicInteger.getAndIncrement()));
    }
    return rocketMQSplitIncrementMapping.get(messageQueue);
  }
  @Override
  public int assignToReader(String splitId, int totalParallelism) {
    // Math.floorMod instead of %: hashCode() can be negative, which would give a negative reader index.
    return Math.floorMod(splitId.hashCode(), totalParallelism);
  }
}
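The assigner's two jobs, stable split ids and hash-based reader placement, can be exercised in isolation. This sketch replaces MessageQueue with a plain String key (an assumption made for self-containment) under the hypothetical name `FairStringSplitAssigner`. It shows that ids are stable per queue and continue from the size of a restored mapping, and that `Math.floorMod` keeps the reader index in range even when `hashCode()` is negative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified, hypothetical variant of FairRocketMQSplitAssigner keyed by String.
class FairStringSplitAssigner {
    private final Map<String, String> splitIdMapping;
    private final AtomicInteger nextId;

    FairStringSplitAssigner(Map<String, String> restoredMapping) {
        this.splitIdMapping = restoredMapping;
        // New ids continue after the ones already handed out before a restore.
        this.nextId = new AtomicInteger(restoredMapping.size());
    }

    String assignSplitId(String queue) {
        // Same queue -> same id; unseen queue -> next sequential id.
        return splitIdMapping.computeIfAbsent(queue,
                q -> String.valueOf(nextId.getAndIncrement()));
    }

    int assignToReader(String splitId, int totalParallelism) {
        // floorMod, unlike %, never returns a negative index.
        return Math.floorMod(splitId.hashCode(), totalParallelism);
    }
}
```

Because the id depends only on the queue and the placement depends only on the id and the parallelism, a queue lands on the same reader across restarts as long as the parallelism is unchanged.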
addReader method
Calls the Assigner to assign splits to the newly registered Reader.
Batch scenario example
Take ClickhouseSourceSplitCoordinator as an example:
public void addReader(int subtaskId) {
  LOG.info("Found reader {}", subtaskId);
  tryAssignSplitsToReader();
}
Unified stream/batch scenario example
Take RocketMQSourceSplitCoordinator as an example:
private void notifyReaderAssignmentResult() {
  Map<Integer, List<RocketMQSplit>> tmpRocketMQSplitAssignments = new HashMap<>();
  for (Integer pendingAssignmentReader : pendingRocketMQSplitAssignment.keySet()) {
    if (CollectionUtils.isNotEmpty(pendingRocketMQSplitAssignment.get(pendingAssignmentReader))
        && context.registeredReaders().contains(pendingAssignmentReader)) {
      tmpRocketMQSplitAssignments.put(pendingAssignmentReader,
          Lists.newArrayList(pendingRocketMQSplitAssignment.get(pendingAssignmentReader)));
    }
  }
  for (Integer pendingAssignmentReader : tmpRocketMQSplitAssignments.keySet()) {
    LOG.info("Assigning splits to reader {}, splits = {}.", pendingAssignmentReader,
        tmpRocketMQSplitAssignments.get(pendingAssignmentReader));
    context.assignSplit(pendingAssignmentReader,
        tmpRocketMQSplitAssignments.get(pendingAssignmentReader));
    Set<RocketMQSplit> removes = pendingRocketMQSplitAssignment.remove(pendingAssignmentReader);
    removes.forEach(removeSplit -> {
      assignedPartitions.put(removeSplit.getMessageQueue(), removeSplit.getSplitId());
    });
    LOG.info("Assigned splits to reader {}", pendingAssignmentReader);
    if (Boundedness.BOUNDEDNESS == boundedness) {
      LOG.info("Signal reader {} no more splits assigned in future.", pendingAssignmentReader);
      context.signalNoMoreSplits(pendingAssignmentReader);
    }
  }
}
@Override
public void addReader(int subtaskId) {
  LOG.info("Adding reader {} to RocketMQ Split Coordinator for consumer group {}.",
      subtaskId,
      consumerGroup);
  notifyReaderAssignmentResult();
}
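Compared with the batch case, notifyReaderAssignmentResult does two extra things: it records each delivered split in assignedPartitions, which snapshotState later checkpoints, and it calls signalNoMoreSplits only in bounded mode, since an unbounded source may still discover new queues. The sketch below isolates that decision. The class `PendingFlusher` and its fields are hypothetical, with String queue names in place of MessageQueue/RocketMQSplit.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the bounded/unbounded flush logic in notifyReaderAssignmentResult.
class PendingFlusher {
    enum Boundedness { BOUNDED, UNBOUNDED }

    final Boundedness boundedness;
    final Map<Integer, Set<String>> pending = new HashMap<>(); // reader -> queue names
    final Set<Integer> registeredReaders = new HashSet<>();
    final Set<String> assignedPartitions = new HashSet<>();    // what would be checkpointed
    final Set<Integer> signalledNoMoreSplits = new HashSet<>();

    PendingFlusher(Boundedness boundedness) {
        this.boundedness = boundedness;
    }

    void notifyReaderAssignmentResult() {
        // Iterate over a copy of the keys so entries can be removed inside the loop.
        for (Integer reader : new HashSet<>(pending.keySet())) {
            Set<String> splits = pending.get(reader);
            if (splits.isEmpty() || !registeredReaders.contains(reader)) {
                continue; // keep pending until the reader registers
            }
            pending.remove(reader);
            // Remember what was handed out so it can be checkpointed.
            assignedPartitions.addAll(splits);
            // Only a bounded source can promise that no more splits will come.
            if (boundedness == Boundedness.BOUNDED) {
                signalledNoMoreSplits.add(reader);
            }
        }
    }
}
```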
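addSplitsBack method
Splits that a Reader did not finish processing are handed back and reassigned. The reassignment strategy is up to you; a common choice is hash modulo, where every Split in the returned list is reassigned and then assigned to the corresponding Reader.
Batch scenario example
Take ClickhouseSourceSplitCoordinator as an example:
ReaderSelector reassigns the Split list using a hash-modulo strategy.
The tryAssignSplitsToReader method then assigns the reassigned Splits to Readers through the Assigner.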
public void addSplitsBack(List<ClickhouseSourceSplit> splits, int subtaskId) {
  LOG.info("Source reader {} return splits {}.", subtaskId, splits);
  int readerNum = context.totalParallelism();
  for (ClickhouseSourceSplit split : splits) {
    int readerIndex = ReaderSelector.getReaderIndex(readerNum);
    splitAssignmentPlan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(split);
    LOG.info("Re-assign split {} to the {}-th reader.", split.uniqSplitId(), readerIndex);
  }
  tryAssignSplitsToReader();
}
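Unified stream/batch scenario example
Take RocketMQSourceSplitCoordinator as an example:
addSplitChangeToPendingAssignment reassigns the Split list using a hash-modulo strategy.
notifyReaderAssignmentResult then assigns the reassigned Splits to Readers through the Assigner.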
private synchronized void addSplitChangeToPendingAssignment(Set<RocketMQSplit> newRocketMQSplits) {
  int numReader = context.totalParallelism();
  for (RocketMQSplit split : newRocketMQSplits) {
    int readerIndex = splitAssigner.assignToReader(split.getSplitId(), numReader);
    pendingRocketMQSplitAssignment.computeIfAbsent(readerIndex, r -> new HashSet<>())
        .add(split);
  }
  LOG.debug("RocketMQ splits {} finished assignment.", newRocketMQSplits);
}
@Override
public void addSplitsBack(List<RocketMQSplit> splits, int subtaskId) {
  LOG.info("Source reader {} return splits {}.", subtaskId, splits);
  addSplitChangeToPendingAssignment(new HashSet<>(splits));
  notifyReaderAssignmentResult();
}
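Returned splits go through the same hash-modulo placement as newly discovered ones, so for a given parallelism a split id always maps to the same reader index, regardless of which reader handed it back. A minimal sketch with a hypothetical `SplitReturnHandler` and String split ids follows. It uses `Math.floorMod` to avoid negative indexes and keeps the method `synchronized` as in the original, presumably to guard against concurrent callers such as the asynchronous discovery callback.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of hash-modulo reassignment for returned splits.
class SplitReturnHandler {
    final int totalParallelism;
    final Map<Integer, Set<String>> pendingAssignment = new ConcurrentHashMap<>();

    SplitReturnHandler(int totalParallelism) {
        this.totalParallelism = totalParallelism;
    }

    // Place each split by hash modulo; synchronized, as in the original, for concurrent callers.
    synchronized void addSplitChangeToPendingAssignment(Set<String> splitIds) {
        for (String splitId : splitIds) {
            int readerIndex = Math.floorMod(splitId.hashCode(), totalParallelism);
            pendingAssignment.computeIfAbsent(readerIndex, r -> new HashSet<>()).add(splitId);
        }
    }

    void addSplitsBack(List<String> splits, int subtaskId) {
        // subtaskId identifies the reader that gave the splits back; placement ignores it.
        addSplitChangeToPendingAssignment(new HashSet<>(splits));
    }
}
```

For example, the split ids "0", "1" and "2" have hash codes 48, 49 and 50, so with three readers they land on readers 0, 1 and 2 respectively.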
snapshotState method
Stores snapshot information about the processed splits, which the constructor uses during recovery.
public RocketMQState snapshotState() throws Exception {
  return new RocketMQState(assignedPartitions);
}
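If a state object keeps a reference to the live assignedPartitions map, later assignments could change what was captured for a checkpoint, depending on when the engine serializes it. Whether RocketMQState copies the map is not shown in this article. The sketch below shows a defensive copy using a hypothetical `SnapshotState` class with String keys.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical checkpoint state: an immutable copy of queue -> split id.
final class SnapshotState {
    private final Map<String, String> assignedWithSplits;

    SnapshotState(Map<String, String> assigned) {
        // Copy, then wrap read-only, so neither side can change the other.
        this.assignedWithSplits = Collections.unmodifiableMap(new HashMap<>(assigned));
    }

    Map<String, String> getAssignedWithSplits() {
        return assignedWithSplits;
    }
}

// Hypothetical coordinator fragment showing only snapshotState.
class SnapshottingCoordinator {
    final Map<String, String> assignedPartitions = new HashMap<>();

    SnapshotState snapshotState() {
        // The copy is taken now; later puts do not leak into this snapshot.
        return new SnapshotState(assignedPartitions);
    }
}
```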
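close method
Closes any connectors that are still open, i.e. those that interacted with the data source to read metadata while the splits were being created.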
public void close() {
  if (consumer != null) {
    consumer.shutdown();
  }
}
About BitSail:
⭐️ Star the repo so you don't lose track of it: https://github.com/bytedance/bitsail
Submit issues and suggestions: https://github.com/bytedance/bitsail/issues
Contribute code: https://github.com/bytedance/bitsail/pulls
BitSail website: https://bytedance.github.io/bitsail/zh/
Subscribe to the mailing list: bitsail+subscribe@googlegroups.com
Join the BitSail technical community