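For more technical discussion and job opportunities, follow the ByteDance Data Platform WeChat official account and reply 【1】 to join the official discussion group.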
Source Connector
This article focuses on SourceReader, the component responsible for reading data:
SourceReader
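Each SourceReader runs in its own thread. As long as the SourceSplitCoordinator guarantees that the splits it assigns to different SourceReaders do not overlap, we do not need to consider any concurrency details during a SourceReader's lifecycle.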
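The no-overlap guarantee comes from the coordinator side. A minimal sketch of one way to get it, assuming splits are plain integer IDs and a round-robin policy (`RoundRobinAssigner` is hypothetical, not the actual SourceSplitCoordinator logic): each split goes to exactly one reader, so the per-reader lists are disjoint by construction.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: assigns each split ID to exactly one reader (round-robin),
// so the per-reader lists are pairwise disjoint by construction.
final class RoundRobinAssigner {
  static List<List<Integer>> assign(List<Integer> splitIds, int readerCount) {
    if (readerCount <= 0) {
      throw new IllegalArgumentException("readerCount must be positive");
    }
    List<List<Integer>> perReader = new ArrayList<>();
    for (int i = 0; i < readerCount; i++) {
      perReader.add(new ArrayList<>());
    }
    for (int i = 0; i < splitIds.size(); i++) {
      // Split i goes to reader (i mod readerCount) and to no other reader.
      perReader.get(i % readerCount).add(splitIds.get(i));
    }
    return perReader;
  }
}
```

For example, splits 0 to 4 over two readers yield [0, 2, 4] and [1, 3].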
SourceReader interface
public interface SourceReader<T, SplitT extends SourceSplit> extends Serializable, AutoCloseable {

  void start();

  void pollNext(SourcePipeline<T> pipeline) throws Exception;

  void addSplits(List<SplitT> splits);

  /**
   * Check whether the source reader has more elements.
   */
  boolean hasMoreElements();

  /**
   * No more splits will be sent to this source reader.
   * The source reader can exit after processing all assigned splits.
   */
  default void notifyNoMoreSplits() {
  }

  /**
   * Process events sent from the {@link SourceSplitCoordinator}.
   */
  default void handleSourceEvent(SourceEvent sourceEvent) {
  }

  /**
   * Return the splits to be stored as checkpoint state, used to recover when the task fails.
   */
  List<SplitT> snapshotState(long checkpointId);

  /**
   * Invoked once all tasks have finished the snapshot for this checkpoint.
   */
  default void notifyCheckpointComplete(long checkpointId) throws Exception {
  }

  interface Context {

    TypeInfo<?>[] getTypeInfos();

    String[] getFieldNames();

    int getIndexOfSubtask();

    void sendSplitRequest();
  }
}
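The interface implies a call order: `start()`, then `addSplits()`, then repeated `pollNext()` calls, each guarded by `hasMoreElements()`. A minimal sketch of that loop for a bounded source, assuming hypothetical stand-ins `MiniReader`, `ListReader`, and `ReaderDriver` (with `Consumer<T>` playing the role of `SourcePipeline<T>`); it is not BitSail's actual runtime:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, trimmed-down stand-in for SourceReader.
interface MiniReader<T> {
  void start();
  void addSplits(List<List<T>> splits);
  boolean hasMoreElements();
  void pollNext(Consumer<T> pipeline);
  void notifyNoMoreSplits();
}

// Toy bounded reader: each "split" is simply an in-memory list of records.
final class ListReader<T> implements MiniReader<T> {
  private final Deque<List<T>> pending = new ArrayDeque<>();
  private boolean noMoreSplits = false;

  public void start() { /* nothing to open for in-memory data */ }

  public void addSplits(List<List<T>> splits) { pending.addAll(splits); }

  public boolean hasMoreElements() { return !noMoreSplits || !pending.isEmpty(); }

  public void pollNext(Consumer<T> pipeline) {
    List<T> split = pending.poll(); // one split per call; null if none is queued yet
    if (split != null) {
      split.forEach(pipeline);
    }
  }

  public void notifyNoMoreSplits() { noMoreSplits = true; }
}

// Hypothetical driver showing the call order a runtime might use for a bounded source.
final class ReaderDriver {
  static <T> List<T> runBounded(MiniReader<T> reader, List<List<T>> splits) {
    List<T> out = new ArrayList<>();
    reader.start();
    reader.addSplits(splits);
    reader.notifyNoMoreSplits();       // the coordinator has nothing else to hand out
    while (reader.hasMoreElements()) { // checked before every pollNext call
      reader.pollNext(out::add);
    }
    return out;
  }
}
```

Because `notifyNoMoreSplits()` is called before the loop here, `hasMoreElements()` turns false as soon as the queue is drained; without that signal the loop would keep polling while it waits for further splits.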
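Constructor
This is where you extract the various settings needed to access the data source, such as database and table names, the message queue cluster and topic, and authentication settings.
Example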
public RocketMQSourceReader(BitSailConfiguration readerConfiguration,
                            Context context,
                            Boundedness boundedness) {
  this.readerConfiguration = readerConfiguration;
  this.boundedness = boundedness;
  this.context = context;
  this.assignedRocketMQSplits = Sets.newHashSet();
  this.finishedRocketMQSplits = Sets.newHashSet();
  this.deserializationSchema = new RocketMQDeserializationSchema(
      readerConfiguration,
      context.getTypeInfos(),
      context.getFieldNames());
  this.noMoreSplits = false;

  cluster = readerConfiguration.get(RocketMQSourceOptions.CLUSTER);
  topic = readerConfiguration.get(RocketMQSourceOptions.TOPIC);
  consumerGroup = readerConfiguration.get(RocketMQSourceOptions.CONSUMER_GROUP);
  consumerTag = readerConfiguration.get(RocketMQSourceOptions.CONSUMER_TAG);
  pollBatchSize = readerConfiguration.get(RocketMQSourceOptions.POLL_BATCH_SIZE);
  pollTimeout = readerConfiguration.get(RocketMQSourceOptions.POLL_TIMEOUT);
  commitInCheckpoint = readerConfiguration.get(RocketMQSourceOptions.COMMIT_IN_CHECKPOINT);
  accessKey = readerConfiguration.get(RocketMQSourceOptions.ACCESS_KEY);
  secretKey = readerConfiguration.get(RocketMQSourceOptions.SECRET_KEY);
}
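Reading every option in the constructor means a misconfigured job can fail before any data is read. A minimal sketch of that pattern, assuming hypothetical `Option` and `SimpleConfig` classes that only mimic the role of `RocketMQSourceOptions` and `BitSailConfiguration` (they are not their real API); here an option without a default is treated as required:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical typed option: a key, a parser for the raw string, and an optional default.
final class Option<T> {
  final String key;
  final Function<String, T> parser;
  final T defaultValue; // null means "required"

  Option(String key, Function<String, T> parser, T defaultValue) {
    this.key = key;
    this.parser = parser;
    this.defaultValue = defaultValue;
  }
}

// Hypothetical stand-in for a reader configuration object.
final class SimpleConfig {
  private final Map<String, String> raw = new HashMap<>();

  SimpleConfig set(String key, String value) {
    raw.put(key, value);
    return this;
  }

  <T> T get(Option<T> option) {
    String value = raw.get(option.key);
    if (value != null) {
      return option.parser.apply(value);
    }
    if (option.defaultValue == null) {
      // Fail fast while constructing the reader instead of later inside pollNext.
      throw new IllegalArgumentException("Missing required option: " + option.key);
    }
    return option.defaultValue;
  }
}
```

A missing required key throws immediately; a missing optional key falls back to its default.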
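start method
Initializes the objects used to access the data source, such as a database statement, a message queue consumer, or a file system connection.
Example
Message queue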
public void start() {
  try {
    if (StringUtils.isNotEmpty(accessKey) && StringUtils.isNotEmpty(secretKey)) {
      AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
      consumer = new DefaultMQPullConsumer(aclClientRPCHook);
    } else {
      consumer = new DefaultMQPullConsumer();
    }

    consumer.setConsumerGroup(consumerGroup);
    consumer.setNamesrvAddr(cluster);
    consumer.setInstanceName(String.format(SOURCE_READER_INSTANCE_NAME_TEMPLATE,
        cluster, topic, consumerGroup, UUID.randomUUID()));
    consumer.setConsumerPullTimeoutMillis(pollTimeout);
    consumer.start();
  } catch (Exception e) {
    throw BitSailException.asBitSailException(RocketMQErrorCode.CONSUMER_CREATE_FAILED, e);
  }
}
Database
public void start() {
  this.connection = connectionHolder.connect();

  // Construct the statement.
  String baseSql = ClickhouseJdbcUtils.getQuerySql(dbName, tableName, columnInfos);
  String querySql = ClickhouseJdbcUtils.decorateSql(baseSql, splitField, filterSql, maxFetchCount, true);
  try {
    this.statement = connection.prepareStatement(querySql);
  } catch (SQLException e) {
    throw new RuntimeException("Failed to prepare statement.", e);
  }

  LOG.info("Task {} started.", subTaskId);
}
FTP
public void start() {
  this.ftpHandler.loginFtpServer();
  if (this.ftpHandler.getFtpConfig().getSkipFirstLine()) {
    this.skipFirstLine = true;
  }
}
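All three examples create their connection or client in `start()` rather than in the constructor. One plausible reason, offered as an assumption rather than a documented rule: `SourceReader` extends `Serializable`, and handles such as JDBC connections or MQ consumers usually are not serializable. Since the interface also extends `AutoCloseable`, the matching release belongs in `close()`. A sketch with hypothetical `FakeClient` and `LifecycleReader` classes:

```java
import java.util.List;

// Hypothetical external resource (stands in for a DB connection, MQ consumer, or FTP session).
final class FakeClient implements AutoCloseable {
  private final List<String> events;

  FakeClient(List<String> events) {
    this.events = events;
    events.add("open");
  }

  @Override
  public void close() {
    events.add("close");
  }
}

// Sketch: the constructor only stores configuration; start() opens, close() releases.
final class LifecycleReader implements AutoCloseable {
  private final List<String> events;
  private FakeClient client; // null until start() is called

  LifecycleReader(List<String> events) {
    this.events = events;
  }

  void start() {
    if (client == null) { // calling start() twice does not open a second client
      client = new FakeClient(events);
    }
  }

  @Override
  public void close() {
    if (client != null) {
      client.close();
      client = null; // makes close() safe to call more than once
    }
  }
}
```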
addSplits method
Adds the list of splits that the SourceSplitCoordinator assigned to the current Reader to the Reader's own processing queue (Queue) or set (Set).
Example
public void addSplits(List<RocketMQSplit> splits) {
  LOG.info("Subtask {} received {}(s) new splits, splits = {}.",
      context.getIndexOfSubtask(),
      CollectionUtils.size(splits),
      splits);
  assignedRocketMQSplits.addAll(splits);
}
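The example keeps splits in a `HashSet`, so a split added twice is stored only once, provided the split class defines `equals`/`hashCode` by content (an assumption here). A sketch of the same idea with explicit IDs, assuming a hypothetical `DemoSplit` with a stable `id`: an insertion-ordered map keeps the processing order predictable and ignores repeats.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical split: a stable ID plus the offset where it ends.
final class DemoSplit {
  final String id;
  final long endOffset;

  DemoSplit(String id, long endOffset) {
    this.id = id;
    this.endOffset = endOffset;
  }
}

// Sketch of addSplits(): keep assigned splits keyed by ID, in arrival order.
final class SplitRegistry {
  private final Map<String, DemoSplit> assigned = new LinkedHashMap<>();

  void addSplits(List<DemoSplit> splits) {
    for (DemoSplit s : splits) {
      assigned.putIfAbsent(s.id, s); // a known ID is ignored, so it is never read twice
    }
  }

  List<String> assignedIds() {
    return new ArrayList<>(assigned.keySet());
  }
}
```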
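hasMoreElements method
In unbounded streaming scenarios, it always returns true so that the Reader thread is not destroyed.
In batch scenarios, it returns false once the coordinator has signaled that no more splits will arrive and all splits assigned to the Reader have been processed, which marks the end of the Reader's lifecycle.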
public boolean hasMoreElements() {
  if (boundedness == Boundedness.UNBOUNDEDNESS) {
    return true;
  }
  if (noMoreSplits) {
    return CollectionUtils.size(assignedRocketMQSplits) != 0;
  }
  return true;
}
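Written as a pure function of its three inputs, the decision can be checked case by case. A sketch, assuming a hypothetical `Bound` enum in place of BitSail's `Boundedness`:

```java
// Hypothetical enum mirroring the two boundedness cases used in the example.
enum Bound { BOUNDED, UNBOUNDED }

final class MoreElements {
  // Same decision as the RocketMQ example, as a pure function.
  static boolean hasMoreElements(Bound boundedness, boolean noMoreSplits, int assignedSplits) {
    if (boundedness == Bound.UNBOUNDED) {
      return true;                // streaming: keep the reader thread alive
    }
    if (noMoreSplits) {
      return assignedSplits != 0; // batch: done once the last split is finished
    }
    return true;                  // batch: more splits may still arrive
  }
}
```

The last branch is the subtle one: a bounded reader whose split set is momentarily empty must still return true until `notifyNoMoreSplits()` arrives, or it could exit before its splits are assigned.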
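pollNext method
Once addSplits has added splits to the processing queue and hasMoreElements returns true, this method is called. This is where the developer's implementation actually interacts with the data.
When implementing pollNext, developers need to pay attention to the following:
- Reading split data: read the data from the constructed splits.
- Data type conversion: convert the external data into BitSail's Row type.
Example
Take RocketMQSourceReader as an example:
It picks splits from the split queue, reads their data, converts what it reads into BitSail's Row type, and sends the rows downstream for processing.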
public void pollNext(SourcePipeline<Row> pipeline) throws Exception {
  for (RocketMQSplit rocketmqSplit : assignedRocketMQSplits) {
    MessageQueue messageQueue = rocketmqSplit.getMessageQueue();
    // Pull up to pollBatchSize messages starting at the split's current offset.
    PullResult pullResult = consumer.pull(messageQueue,
        consumerTag,
        rocketmqSplit.getStartOffset(),
        pollBatchSize,
        pollTimeout);

    if (Objects.isNull(pullResult) || CollectionUtils.isEmpty(pullResult.getMsgFoundList())) {
      continue;
    }

    for (MessageExt message : pullResult.getMsgFoundList()) {
      // Convert the raw message body into a BitSail Row and send it downstream.
      Row deserialize = deserializationSchema.deserialize(message.getBody());
      pipeline.output(deserialize);
      if (rocketmqSplit.getStartOffset() >= rocketmqSplit.getEndOffset()) {
        LOG.info("Subtask {} rocketmq split {} in end of stream.",
            context.getIndexOfSubtask(),
            rocketmqSplit);
        finishedRocketMQSplits.add(rocketmqSplit);
        break;
      }
    }

    // Resume from the next offset on the following call.
    rocketmqSplit.setStartOffset(pullResult.getNextBeginOffset());
    if (!commitInCheckpoint) {
      consumer.updateConsumeOffset(messageQueue, pullResult.getMaxOffset());
    }
  }
  // Remove finished splits after the loop instead of while iterating the set.
  assignedRocketMQSplits.removeAll(finishedRocketMQSplits);
}
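The loop above has a general shape: pull a bounded batch per split, emit converted rows, advance the split's offset, and retire finished splits after the iteration. A sketch of that shape over an in-memory array instead of a message queue, assuming hypothetical `OffsetSplit` and `BatchPoller` classes; unlike the RocketMQ code, it compares the offset with the end after advancing it, so a split is retired in the same call that emits its last record:

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical split over an in-memory "queue": reads records[startOffset, endOffset).
final class OffsetSplit {
  final String[] records;
  long startOffset;
  final long endOffset;

  OffsetSplit(String[] records, long startOffset, long endOffset) {
    this.records = records;
    this.startOffset = startOffset;
    this.endOffset = endOffset;
  }
}

final class BatchPoller {
  final Set<OffsetSplit> assigned = new LinkedHashSet<>();
  private final int batchSize;

  BatchPoller(int batchSize) {
    this.batchSize = batchSize;
  }

  void pollNext(Consumer<Object[]> pipeline) {
    Set<OffsetSplit> finished = new HashSet<>();
    for (OffsetSplit split : assigned) {
      long stop = Math.min(split.startOffset + batchSize, split.endOffset);
      for (long off = split.startOffset; off < stop; off++) {
        // "Deserialize": wrap the raw record into a one-field row.
        pipeline.accept(new Object[] {split.records[(int) off]});
      }
      split.startOffset = stop; // the next call resumes here
      if (split.startOffset >= split.endOffset) {
        finished.add(split);
      }
    }
    assigned.removeAll(finished); // no removal while iterating the same set
  }
}
```

Collecting finished splits in a separate set and calling `removeAll` afterwards, as the RocketMQ example also does, avoids a `ConcurrentModificationException` from removing elements from a set while iterating over it.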
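Common ways to convert data into the BitSail Row type
Custom RowDeserializer class
Apply a different converter to each column according to its type, and set the result into the corresponding field of the Row.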
public class ClickhouseRowDeserializer {

  interface FiledConverter {
    Object apply(ResultSet resultSet) throws SQLException;
  }

  private final List<FiledConverter> converters;
  private final int fieldSize;

  public ClickhouseRowDeserializer(TypeInfo<?>[] typeInfos) {
    this.fieldSize = typeInfos.length;
    this.converters = new ArrayList<>();
    for (int i = 0; i < fieldSize; ++i) {
      // JDBC ResultSet column indexes start at 1.
      converters.add(initFieldConverter(i + 1, typeInfos[i]));
    }
  }

  public Row convert(ResultSet resultSet) {
    Row row = new Row(fieldSize);
    try {
      for (int i = 0; i < fieldSize; ++i) {
        row.setField(i, converters.get(i).apply(resultSet));
      }
    } catch (SQLException e) {
      // Pass the SQLException itself; its cause is often null.
      throw BitSailException.asBitSailException(ClickhouseErrorCode.CONVERT_ERROR, e);
    }
    return row;
  }

  private FiledConverter initFieldConverter(int index, TypeInfo<?> typeInfo) {
    if (!(typeInfo instanceof BasicTypeInfo)) {
      throw BitSailException.asBitSailException(CommonErrorCode.UNSUPPORTED_COLUMN_TYPE,
          typeInfo.getTypeClass().getName() + " is not supported yet.");
    }

    Class<?> curClass = typeInfo.getTypeClass();
    if (TypeInfos.BYTE_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getByte(index);
    }
    if (TypeInfos.SHORT_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getShort(index);
    }
    if (TypeInfos.INT_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getInt(index);
    }
    if (TypeInfos.LONG_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getLong(index);
    }
    if (TypeInfos.BIG_INTEGER_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> {
        BigDecimal dec = resultSet.getBigDecimal(index);
        return dec == null ? null : dec.toBigInteger();
      };
    }
    if (TypeInfos.FLOAT_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getFloat(index);
    }
    if (TypeInfos.DOUBLE_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getDouble(index);
    }
    if (TypeInfos.BIG_DECIMAL_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getBigDecimal(index);
    }
    if (TypeInfos.STRING_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getString(index);
    }
    if (TypeInfos.SQL_DATE_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getDate(index);
    }
    if (TypeInfos.SQL_TIMESTAMP_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getTimestamp(index);
    }
    if (TypeInfos.SQL_TIME_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getTime(index);
    }
    if (TypeInfos.BOOLEAN_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getBoolean(index);
    }
    if (TypeInfos.VOID_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> null;
    }
    throw new UnsupportedOperationException("Unsupported data type: " + typeInfo);
  }
}
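Both deserializers in this section resolve the conversion for each column once and reuse it for every record. A minimal sketch of that pattern, assuming string input fields, a hypothetical `ColType` enum in place of BitSail's `TypeInfo`, and a `Row` modeled as a plain `Object[]`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical column types standing in for BitSail TypeInfo classes.
enum ColType { INT, LONG, BOOLEAN, STRING, VOID }

// Sketch of the "one converter per column, resolved once" pattern.
final class ColumnRowDeserializer {
  private final List<Function<String, Object>> converters = new ArrayList<>();

  ColumnRowDeserializer(ColType[] types) {
    for (ColType t : types) {
      converters.add(converterFor(t)); // type dispatch happens once, not per record
    }
  }

  Object[] convert(String[] fields) {
    Object[] row = new Object[converters.size()];
    for (int i = 0; i < row.length; i++) {
      String raw = fields[i];
      // A null input field stays null for every column type.
      row[i] = raw == null ? null : converters.get(i).apply(raw);
    }
    return row;
  }

  private static Function<String, Object> converterFor(ColType t) {
    switch (t) {
      case INT: return s -> Integer.parseInt(s.trim());
      case LONG: return s -> Long.parseLong(s.trim());
      case BOOLEAN: return s -> Boolean.parseBoolean(s.trim());
      case STRING: return s -> s;
      case VOID: return s -> null;
      default: throw new UnsupportedOperationException("Unsupported type: " + t);
    }
  }
}
```

Doing the type dispatch in the constructor keeps the per-record path down to one list lookup and one function call per field.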
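Implementing the DeserializationSchema interface
Rather than writing a RowDeserializer, we prefer that you write an implementation of the DeserializationSchema interface that converts data in a given format, such as JSON or CSV, into the BitSail Row type.
In practice, we can then use a unified interface to create the corresponding implementation class: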
public class TextInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {

  private BitSailConfiguration deserializationConfiguration;
  private TypeInfo<?>[] typeInfos;
  private String[] fieldNames;
  private transient DeserializationSchema<byte[], Row> deserializationSchema;

  public TextInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
                                              TypeInfo<?>[] typeInfos,
                                              String[] fieldNames) {
    this.deserializationConfiguration = deserializationConfiguration;
    this.typeInfos = typeInfos;
    this.fieldNames = fieldNames;
    ContentType contentType = ContentType.valueOf(deserializationConfiguration
        .getNecessaryOption(HadoopReaderOptions.CONTENT_TYPE, HadoopErrorCode.REQUIRED_VALUE).toUpperCase());
    switch (contentType) {
      case CSV:
        this.deserializationSchema =
            new CsvDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
        break;
      case JSON:
        this.deserializationSchema =
            new JsonDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
        break;
      default:
        throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING,
            "unsupported parser type:" + contentType);
    }
  }

  @Override
  public Row deserialize(Writable message) {
    return deserializationSchema.deserialize((message.toString()).getBytes());
  }

  @Override
  public boolean isEndOfStream(Row nextElement) {
    return false;
  }
}
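The wrapper chooses a concrete schema once from configuration and then delegates every record to it. A sketch of that pattern with hypothetical `BytesToRow`, `DelimitedDeserializer`, and `ContentTypeDeserializer` classes; a tab-separated format stands in for JSON only to avoid depending on a JSON library, and the delimiter parsing ignores quoting:

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical counterpart of DeserializationSchema<byte[], Row>; a Row is modeled as Object[].
interface BytesToRow {
  Object[] deserialize(byte[] message);
}

// Hypothetical delimiter-based implementation (no quoting or escaping support).
final class DelimitedDeserializer implements BytesToRow {
  private final String delimiterRegex;

  DelimitedDeserializer(String delimiterRegex) {
    this.delimiterRegex = delimiterRegex;
  }

  @Override
  public Object[] deserialize(byte[] message) {
    // limit -1 keeps trailing empty fields, e.g. "a,b," -> ["a", "b", ""]
    return new String(message, StandardCharsets.UTF_8).split(delimiterRegex, -1);
  }
}

// Sketch of the wrapper: select the concrete schema once, then delegate each record.
final class ContentTypeDeserializer implements BytesToRow {
  private final BytesToRow delegate;

  ContentTypeDeserializer(String contentType) {
    this.delegate = select(contentType);
  }

  private static BytesToRow select(String contentType) {
    switch (contentType.toUpperCase(Locale.ROOT)) {
      case "CSV":
        return new DelimitedDeserializer(",");
      case "TSV":
        return new DelimitedDeserializer("\t");
      default:
        throw new IllegalArgumentException("unsupported parser type: " + contentType);
    }
  }

  @Override
  public Object[] deserialize(byte[] message) {
    return delegate.deserialize(message);
  }
}
```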
You can also write a DeserializationSchema dedicated to the specific input type you need to parse:
public class MapredParquetInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {

  private final BitSailConfiguration deserializationConfiguration;
  private final transient DateTimeFormatter localDateTimeFormatter;
  private final transient DateTimeFormatter localDateFormatter;
  private final transient DateTimeFormatter localTimeFormatter;
  private final int fieldSize;
  private final TypeInfo<?>[] typeInfos;
  private final String[] fieldNames;
  private final List<DeserializationConverter> converters;

  public MapredParquetInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
                                                       TypeInfo<?>[] typeInfos,
                                                       String[] fieldNames) {
    this.deserializationConfiguration = deserializationConfiguration;
    this.typeInfos = typeInfos;
    this.fieldNames = fieldNames;
    this.localDateTimeFormatter = DateTimeFormatter
        .ofPattern(deserializationConfiguration.get(CommonOptions.DateFormatOptions.DATE_TIME_PATTERN));
    this.localDateFormatter = DateTimeFormatter
        .ofPattern(deserializationConfiguration.get(CommonOptions.DateFormatOptions.DATE_PATTERN));
    this.localTimeFormatter = DateTimeFormatter
        .ofPattern(deserializationConfiguration.get(CommonOptions.DateFormatOptions.TIME_PATTERN));
    this.fieldSize = typeInfos.length;
    this.converters = Arrays.stream(typeInfos).map(this::createTypeInfoConverter).collect(Collectors.toList());
  }

  @Override
  public Row deserialize(Writable message) {
    int arity = fieldNames.length;
    Row row = new Row(arity);
    Writable[] writables = ((ArrayWritable) message).get();
    for (int i = 0; i < fieldSize; ++i) {
      row.setField(i, converters.get(i).convert(writables[i].toString()));
    }
    return row;
  }

  @Override
  public boolean isEndOfStream(Row nextElement) {
    return false;
  }

  private interface DeserializationConverter extends Serializable {
    Object convert(String input);
  }

  private DeserializationConverter createTypeInfoConverter(TypeInfo<?> typeInfo) {
    Class<?> typeClass = typeInfo.getTypeClass();

    if (typeClass == TypeInfos.VOID_TYPE_INFO.getTypeClass()) {
      return field -> null;
    }
    if (typeClass == TypeInfos.BOOLEAN_TYPE_INFO.getTypeClass()) {
      return this::convertToBoolean;
    }
    if (typeClass == TypeInfos.INT_TYPE_INFO.getTypeClass()) {
      return this::convertToInt;
    }
    throw BitSailException.asBitSailException(CsvFormatErrorCode.CSV_FORMAT_COVERT_FAILED,
        String.format("Csv format converter not support type info: %s.", typeInfo));
  }

  private boolean convertToBoolean(String field) {
    return Boolean.parseBoolean(field.trim());
  }

  private int convertToInt(String field) {
    return Integer.parseInt(field.trim());
  }
}
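snapshotState method
Generates a snapshot of the reader's state for checkpointing. In the RocketMQ example, the state is the list of assigned splits, whose start offsets record the reading progress; when commitInCheckpoint is enabled, the consumer offsets are also committed to RocketMQ at this point.
Example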
public List<RocketMQSplit> snapshotState(long checkpointId) {
  LOG.info("Subtask {} start snapshotting for checkpoint id = {}.", context.getIndexOfSubtask(), checkpointId);
  if (commitInCheckpoint) {
    for (RocketMQSplit rocketMQSplit : assignedRocketMQSplits) {
      try {
        consumer.updateConsumeOffset(rocketMQSplit.getMessageQueue(), rocketMQSplit.getStartOffset());
        LOG.debug("Subtask {} committed message queue = {} in checkpoint id = {}.", context.getIndexOfSubtask(),
            rocketMQSplit.getMessageQueue(),
            checkpointId);
      } catch (MQClientException e) {
        throw new RuntimeException(e);
      }
    }
  }
  return Lists.newArrayList(assignedRocketMQSplits);
}
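The returned list is what a restarted reader resumes from, so its offsets should reflect exactly what had been read when the checkpoint was taken. The example returns a new list that still references the live split objects; whether that is safe depends on when the framework serializes the result. As a defensive alternative, a sketch that copies each split, assuming hypothetical `ResumableSplit` and `SnapshottingReader` classes:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical split whose only mutable state is the next offset to read.
final class ResumableSplit {
  final String id;
  long nextOffset;

  ResumableSplit(String id, long nextOffset) {
    this.id = id;
    this.nextOffset = nextOffset;
  }

  ResumableSplit copy() {
    return new ResumableSplit(id, nextOffset);
  }
}

final class SnapshottingReader {
  private final List<ResumableSplit> assigned = new ArrayList<>();

  void addSplits(List<ResumableSplit> splits) {
    assigned.addAll(splits);
  }

  // Simulates reading n records from every split by advancing its offset.
  void read(long n) {
    for (ResumableSplit s : assigned) {
      s.nextOffset += n;
    }
  }

  // Returns copies, so later reads cannot change an already-taken snapshot.
  List<ResumableSplit> snapshotState(long checkpointId) {
    List<ResumableSplit> snapshot = new ArrayList<>();
    for (ResumableSplit s : assigned) {
      snapshot.add(s.copy());
    }
    return snapshot;
  }
}
```

Restoring is then just `addSplits(snapshot)` on a fresh reader, which resumes each split at its recorded offset.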
hasMoreElements method
Before each call to pollNext, the framework checks sourceReader.hasMoreElements(); pollNext is called if and only if that check returns true.
Example
public boolean hasMoreElements() {
  if (noMoreSplits) {
    return CollectionUtils.size(assignedHadoopSplits) != 0;
  }
  return true;
}
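notifyNoMoreSplits method
This method is called when the SourceSplitCoordinator has no more splits to assign to this Reader. From then on, the Reader can exit once it has processed all of its assigned splits.
Example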
public void notifyNoMoreSplits() {
  LOG.info("Subtask {} received no more split signal.", context.getIndexOfSubtask());
  noMoreSplits = true;
}
【About BitSail】:
⭐️ Star us so you don't lose track: https://github.com/bytedance/bitsail
Submit issues and suggestions: https://github.com/bytedance/bitsail/issues
Contribute code: https://github.com/bytedance/bitsail/pulls
BitSail website: https://bytedance.github.io/bitsail/zh/
Subscribe to the mailing list: mailto:bitsail+subscribe@googlegroups.com
Join the BitSail tech community