更多技术交换、求职机会,欢送关注字节跳动数据平台微信公众号,回复【1】进入官网交换群
Source Connector
本文将次要介绍创立、治理Split的角色SplitCoordinator。SourceSplitCoordinator
大数据处理框架的外围目标就是将大规模的数据拆分成为多个正当的Split,SplitCoordinator承当这个创立、治理Split的角色。
SourceSplitCoordinator接口
public interface SourceSplitCoordinator<SplitT extends SourceSplit, StateT> extends Serializable, AutoCloseable { void start(); void addReader(int subtaskId); void addSplitsBack(List<SplitT> splits, int subtaskId); void handleSplitRequest(int subtaskId, @Nullable String requesterHostname); default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { } StateT snapshotState() throws Exception; default void notifyCheckpointComplete(long checkpointId) throws Exception { } void close(); interface Context<SplitT extends SourceSplit, StateT> { boolean isRestored(); /** * Return the state to the split coordinator, for the exactly-once. */ StateT getRestoreState(); /** * Return total parallelism of the source reader. */ int totalParallelism(); /** * When Source reader started, it will be registered itself to coordinator. */ Set<Integer> registeredReaders(); /** * Assign splits to reader. */ void assignSplit(int subtaskId, List<SplitT> splits); /** * Mainly use in boundedness situation, represents there will no more split will send to source reader. */ void signalNoMoreSplits(int subtask); /** * If split coordinator have any event want to send source reader, use this method. * Like send Pause event to Source Reader in CDC2.0. */ void sendEventToSourceReader(int subtaskId, SourceEvent event); /** * Schedule to run the callable and handler, often used in un-boundedness mode. */ <T> void runAsync(Callable<T> callable, BiConsumer<T, Throwable> handler, int initialDelay, long interval); /** * Just run callable and handler once, often used in boundedness mode. */ <T> void runAsyncOnce(Callable<T> callable, BiConsumer<T, Throwable> handler); }}
构造方法
开发者在构造方法中个别次要进行一些配置的设置和分片信息存储的容器的创立。
以ClickhouseSourceSplitCoordinator的结构为例:
public ClickhouseSourceSplitCoordinator(SourceSplitCoordinator.Context<ClickhouseSourceSplit, EmptyState> context, BitSailConfiguration jobConf) { this.context = context; this.jobConf = jobConf; this.splitAssignmentPlan = Maps.newConcurrentMap();}
在自定义了State的场景中,须要对checkpoint时存储在SourceSplitCoordinator.Context的状态进行保留和复原。
以RocketMQSourceSplitCoordinator为例:
public RocketMQSourceSplitCoordinator( SourceSplitCoordinator.Context<RocketMQSplit, RocketMQState> context, BitSailConfiguration jobConfiguration, Boundedness boundedness) { this.context = context; this.jobConfiguration = jobConfiguration; this.boundedness = boundedness; this.discoveryInternal = jobConfiguration.get(RocketMQSourceOptions.DISCOVERY_INTERNAL); this.pendingRocketMQSplitAssignment = Maps.newConcurrentMap(); this.discoveredPartitions = new HashSet<>(); if (context.isRestored()) { RocketMQState restoreState = context.getRestoreState(); assignedPartitions = restoreState.getAssignedWithSplits(); discoveredPartitions.addAll(assignedPartitions.keySet()); } else { assignedPartitions = Maps.newHashMap(); } prepareConsumerProperties();}
start办法
进行一些数据源所需分片元数据的提取工作,如果有形象进去的Split Assigner类,个别在这里进行初始化。如果应用的是封装的Split Assign函数,这里会进行待调配切片的初始化工作。
流批一体场景
以RocketMQSourceSplitCoordinator为例:
private void prepareRocketMQConsumer() { try { consumer = RocketMQUtils.prepareRocketMQConsumer(jobConfiguration, String.format(COORDINATOR_INSTANCE_NAME_TEMPLATE, cluster, topic, consumerGroup, UUID.randomUUID())); consumer.start(); } catch (Exception e) { throw BitSailException.asBitSailException(RocketMQErrorCode.CONSUMER_CREATE_FAILED, e); }}@Overridepublic void start() { prepareRocketMQConsumer(); splitAssigner = new FairRocketMQSplitAssigner(jobConfiguration, assignedPartitions); if (discoveryInternal > 0) { context.runAsync( this::fetchMessageQueues, this::handleMessageQueueChanged, 0, discoveryInternal ); } else { context.runAsyncOnce( this::fetchMessageQueues, this::handleMessageQueueChanged ); }}
批式场景
以ClickhouseSourceSplitCoordinator为例:
public void start() { List<ClickhouseSourceSplit> splitList; try { SimpleDivideSplitConstructor constructor = new SimpleDivideSplitConstructor(jobConf); splitList = constructor.construct(); } catch (IOException e) { ClickhouseSourceSplit split = new ClickhouseSourceSplit(0); split.setReadTable(true); splitList = Collections.singletonList(split); LOG.error("Failed to construct splits, will directly read the table.", e); } int readerNum = context.totalParallelism(); LOG.info("Found {} readers and {} splits.", readerNum, splitList.size()); if (readerNum > splitList.size()) { LOG.error("Reader number {} is larger than split number {}.", readerNum, splitList.size()); } for (ClickhouseSourceSplit split : splitList) { int readerIndex = ReaderSelector.getReaderIndex(readerNum); splitAssignmentPlan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(split); LOG.info("Will assign split {} to the {}-th reader", split.uniqSplitId(), readerIndex); }}
Assigner
将划分好的切片调配给Reader,开发过程中,咱们通常让SourceSplitCoordinator专一于解决和Reader 的通信工作,理论split的散发逻辑个别封装在Assigner进行,这个Assigner能够是一个封装的Split Assign函数,也能够是一个形象进去的Split Assigner类。
Assign函数示例
以ClickhouseSourceSplitCoordinator为例:
tryAssignSplitsToReader函数将存储在splitAssignmentPlan中的划分好的切片调配给相应的Reader。
private void tryAssignSplitsToReader() { Map<Integer, List<ClickhouseSourceSplit>> splitsToAssign = new HashMap<>(); for (Integer readerIndex : splitAssignmentPlan.keySet()) { if (CollectionUtils.isNotEmpty(splitAssignmentPlan.get(readerIndex)) && context.registeredReaders().contains(readerIndex)) { splitsToAssign.put(readerIndex, Lists.newArrayList(splitAssignmentPlan.get(readerIndex))); } } for (Integer readerIndex : splitsToAssign.keySet()) { LOG.info("Try assigning splits reader {}, splits are: [{}]", readerIndex, splitsToAssign.get(readerIndex).stream().map(ClickhouseSourceSplit::uniqSplitId).collect(Collectors.toList())); splitAssignmentPlan.remove(readerIndex); context.assignSplit(readerIndex, splitsToAssign.get(readerIndex)); context.signalNoMoreSplits(readerIndex); LOG.info("Finish assigning splits reader {}", readerIndex); }}
Assigner办法示例
以RocketMQSourceSplitCoordinator为例:
public class FairRocketMQSplitAssigner implements SplitAssigner<MessageQueue> { private BitSailConfiguration readerConfiguration; private AtomicInteger atomicInteger; public Map<MessageQueue, String> rocketMQSplitIncrementMapping; public FairRocketMQSplitAssigner(BitSailConfiguration readerConfiguration, Map<MessageQueue, String> rocketMQSplitIncrementMapping) { this.readerConfiguration = readerConfiguration; this.rocketMQSplitIncrementMapping = rocketMQSplitIncrementMapping; this.atomicInteger = new AtomicInteger(CollectionUtils .size(rocketMQSplitIncrementMapping.keySet())); } @Override public String assignSplitId(MessageQueue messageQueue) { if (!rocketMQSplitIncrementMapping.containsKey(messageQueue)) { rocketMQSplitIncrementMapping.put(messageQueue, String.valueOf(atomicInteger.getAndIncrement())); } return rocketMQSplitIncrementMapping.get(messageQueue); } @Override public int assignToReader(String splitId, int totalParallelism) { return splitId.hashCode() % totalParallelism; }}
addReader办法
调用Assigner,为Reader增加切片。
批式场景示例
以ClickhouseSourceSplitCoordinator为例:
public void addReader(int subtaskId) { LOG.info("Found reader {}", subtaskId); tryAssignSplitsToReader();}
流批一体场景示例
以RocketMQSourceSplitCoordinator为例:
private void notifyReaderAssignmentResult() { Map<Integer, List<RocketMQSplit>> tmpRocketMQSplitAssignments = new HashMap<>(); for (Integer pendingAssignmentReader : pendingRocketMQSplitAssignment.keySet()) { if (CollectionUtils.isNotEmpty(pendingRocketMQSplitAssignment.get(pendingAssignmentReader)) && context.registeredReaders().contains(pendingAssignmentReader)) { tmpRocketMQSplitAssignments.put(pendingAssignmentReader, Lists.newArrayList(pendingRocketMQSplitAssignment.get(pendingAssignmentReader))); } } for (Integer pendingAssignmentReader : tmpRocketMQSplitAssignments.keySet()) { LOG.info("Assigning splits to reader {}, splits = {}.", pendingAssignmentReader, tmpRocketMQSplitAssignments.get(pendingAssignmentReader)); context.assignSplit(pendingAssignmentReader, tmpRocketMQSplitAssignments.get(pendingAssignmentReader)); Set<RocketMQSplit> removes = pendingRocketMQSplitAssignment.remove(pendingAssignmentReader); removes.forEach(removeSplit -> { assignedPartitions.put(removeSplit.getMessageQueue(), removeSplit.getSplitId()); }); LOG.info("Assigned splits to reader {}", pendingAssignmentReader); if (Boundedness.BOUNDEDNESS == boundedness) { LOG.info("Signal reader {} no more splits assigned in future.", pendingAssignmentReader); context.signalNoMoreSplits(pendingAssignmentReader); } }}@Overridepublic void addReader(int subtaskId) { LOG.info( "Adding reader {} to RocketMQ Split Coordinator for consumer group {}.", subtaskId, consumerGroup); notifyReaderAssignmentResult();}
addSplitsBack办法
对于一些Reader没有解决完的切片,进行重新分配,重新分配的策略能够本人定义,罕用的策略是哈希取模,对于返回的Split列表中的所有Split进行重新分配后再Assign给不同的Reader。
批式场景示例
以ClickhouseSourceSplitCoordinator为例:
ReaderSelector应用哈希取模的策略对Split列表进行重调配。
tryAssignSplitsToReader办法将重调配后的Split汇合通过Assigner调配给Reader。
public void addSplitsBack(List<ClickhouseSourceSplit> splits, int subtaskId) { LOG.info("Source reader {} return splits {}.", subtaskId, splits); int readerNum = context.totalParallelism(); for (ClickhouseSourceSplit split : splits) { int readerIndex = ReaderSelector.getReaderIndex(readerNum); splitAssignmentPlan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(split); LOG.info("Re-assign split {} to the {}-th reader.", split.uniqSplitId(), readerIndex); } tryAssignSplitsToReader();}
流批一体场景示例
以RocketMQSourceSplitCoordinator为例:
addSplitChangeToPendingAssignment应用哈希取模的策略对Split列表进行重调配。
notifyReaderAssignmentResult将重调配后的Split汇合通过Assigner调配给Reader。
private synchronized void addSplitChangeToPendingAssignment(Set<RocketMQSplit> newRocketMQSplits) { int numReader = context.totalParallelism(); for (RocketMQSplit split : newRocketMQSplits) { int readerIndex = splitAssigner.assignToReader(split.getSplitId(), numReader); pendingRocketMQSplitAssignment.computeIfAbsent(readerIndex, r -> new HashSet<>()) .add(split); } LOG.debug("RocketMQ splits {} finished assignment.", newRocketMQSplits);}@Overridepublic void addSplitsBack(List<RocketMQSplit> splits, int subtaskId) { LOG.info("Source reader {} return splits {}.", subtaskId, splits); addSplitChangeToPendingAssignment(new HashSet<>(splits)); notifyReaderAssignmentResult();}
snapshotState办法
存储解决切片的快照信息,用于复原时在构造方法中应用。
public RocketMQState snapshotState() throws Exception { return new RocketMQState(assignedPartitions);}
close办法
敞开在分片过程中与数据源交互读取元数据信息的所有未敞开连接器。
public void close() { if (consumer != null) { consumer.shutdown(); }}
About BitSail:
⭐️ Star 不迷路 https://github.com/bytedance/bitsail
提交问题和倡议:https://github.com/bytedance/bitsail/issues
奉献代码:https://github.com/bytedance/bitsail/pulls
BitSail官网:https://bytedance.github.io/bitsail/zh/
订阅邮件列表:bitsail+subscribe@googlegroups.com
退出BitSail技术社群