Source Connector
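This article focuses on SourceReader, the component responsible for reading data:
SourceReader
Each SourceReader runs in its own thread. As long as the splits that SourceSplitCoordinator assigns to different SourceReaders do not overlap, we do not need to think about any concurrency details during a SourceReader's execution cycle.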
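The non-overlap requirement can be illustrated with a small sketch. The `SplitAssigner` class and its `assign` method are hypothetical and not part of BitSail; they only show one possible strategy (round-robin by split index) that hands every split to exactly one reader, so each reader thread owns its splits exclusively.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a BitSail class: distributes split ids so that
// every split is owned by exactly one reader.
class SplitAssigner {

  // Returns one list of split ids per reader; split i goes to reader i % readerCount.
  static List<List<Integer>> assign(int splitCount, int readerCount) {
    List<List<Integer>> assignment = new ArrayList<>();
    for (int reader = 0; reader < readerCount; reader++) {
      assignment.add(new ArrayList<>());
    }
    for (int split = 0; split < splitCount; split++) {
      assignment.get(split % readerCount).add(split);
    }
    return assignment;
  }

  public static void main(String[] args) {
    // 5 splits over 2 readers prints [[0, 2, 4], [1, 3]].
    System.out.println(assign(5, 2));
  }
}
```

Because no split id appears in two lists, the readers never touch the same data and need no locking between them.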
SourceReader interface
public interface SourceReader<T, SplitT extends SourceSplit> extends Serializable, AutoCloseable {

  void start();

  void pollNext(SourcePipeline<T> pipeline) throws Exception;

  void addSplits(List<SplitT> splits);

  /**
   * Check whether the source reader has more elements or not.
   */
  boolean hasMoreElements();

  /**
   * No more splits will be sent to this source reader.
   * The source reader can exit after processing all assigned splits.
   */
  default void notifyNoMoreSplits() {

  }

  /**
   * Process all events that come from {@link SourceSplitCoordinator}.
   */
  default void handleSourceEvent(SourceEvent sourceEvent) {
  }

  /**
   * Store the splits to the external system so they can be recovered when the task fails.
   */
  List<SplitT> snapshotState(long checkpointId);

  /**
   * When all tasks have finished the snapshot, notifyCheckpointComplete will be invoked.
   */
  default void notifyCheckpointComplete(long checkpointId) throws Exception {

  }

  interface Context {

    TypeInfo<?>[] getTypeInfos();

    String[] getFieldNames();

    int getIndexOfSubtask();

    void sendSplitRequest();
  }
}
Constructor
The constructor is where you extract the configuration needed to access the data source, such as the database and table names, the message queue cluster and topic, authentication settings, and so on.
Example
public RocketMQSourceReader(BitSailConfiguration readerConfiguration,
                            Context context,
                            Boundedness boundedness) {
  this.readerConfiguration = readerConfiguration;
  this.boundedness = boundedness;
  this.context = context;
  this.assignedRocketMQSplits = Sets.newHashSet();
  this.finishedRocketMQSplits = Sets.newHashSet();
  this.deserializationSchema = new RocketMQDeserializationSchema(
      readerConfiguration,
      context.getTypeInfos(),
      context.getFieldNames());
  this.noMoreSplits = false;

  // Connection and consumption settings read from the reader configuration.
  cluster = readerConfiguration.get(RocketMQSourceOptions.CLUSTER);
  topic = readerConfiguration.get(RocketMQSourceOptions.TOPIC);
  consumerGroup = readerConfiguration.get(RocketMQSourceOptions.CONSUMER_GROUP);
  consumerTag = readerConfiguration.get(RocketMQSourceOptions.CONSUMER_TAG);
  pollBatchSize = readerConfiguration.get(RocketMQSourceOptions.POLL_BATCH_SIZE);
  pollTimeout = readerConfiguration.get(RocketMQSourceOptions.POLL_TIMEOUT);
  commitInCheckpoint = readerConfiguration.get(RocketMQSourceOptions.COMMIT_IN_CHECKPOINT);
  accessKey = readerConfiguration.get(RocketMQSourceOptions.ACCESS_KEY);
  secretKey = readerConfiguration.get(RocketMQSourceOptions.SECRET_KEY);
}
start method
Initializes the object used to access the data source, for example a database statement object, a message queue consumer, or a file system connection.
Example
Message queue
public void start() {
  try {
    // Use an ACL hook only when both credentials are configured.
    if (StringUtils.isNotEmpty(accessKey) && StringUtils.isNotEmpty(secretKey)) {
      AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(
          new SessionCredentials(accessKey, secretKey));
      consumer = new DefaultMQPullConsumer(aclClientRPCHook);
    } else {
      consumer = new DefaultMQPullConsumer();
    }

    consumer.setConsumerGroup(consumerGroup);
    consumer.setNamesrvAddr(cluster);
    consumer.setInstanceName(String.format(SOURCE_READER_INSTANCE_NAME_TEMPLATE,
        cluster, topic, consumerGroup, UUID.randomUUID()));
    consumer.setConsumerPullTimeoutMillis(pollTimeout);
    consumer.start();
  } catch (Exception e) {
    throw BitSailException.asBitSailException(RocketMQErrorCode.CONSUMER_CREATE_FAILED, e);
  }
}
Database
public void start() {
  this.connection = connectionHolder.connect();

  // Construct statement.
  String baseSql = ClickhouseJdbcUtils.getQuerySql(dbName, tableName, columnInfos);
  String querySql = ClickhouseJdbcUtils.decorateSql(baseSql, splitField, filterSql, maxFetchCount, true);
  try {
    this.statement = connection.prepareStatement(querySql);
  } catch (SQLException e) {
    throw new RuntimeException("Failed to prepare statement.", e);
  }

  LOG.info("Task {} started.", subTaskId);
}
FTP
public void start() {
  this.ftpHandler.loginFtpServer();
  if (this.ftpHandler.getFtpConfig().getSkipFirstLine()) {
    this.skipFirstLine = true;
  }
}
addSplits method
Adds the list of splits that SourceSplitCoordinator assigned to the current Reader to the Reader's own processing queue (Queue) or set (Set).
Example
public void addSplits(List<RocketMQSplit> splits) {
  LOG.info("Subtask {} received {}(s) new splits, splits = {}.",
      context.getIndexOfSubtask(),
      CollectionUtils.size(splits),
      splits);

  assignedRocketMQSplits.addAll(splits);
}
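hasMoreElements method
In unbounded stream computing scenarios, this method always returns true so that the Reader thread is not destroyed.
In batch scenarios, it returns false once the splits assigned to this Reader have been processed, which marks the end of the Reader's lifecycle.
public boolean hasMoreElements() {
  if (boundedness == Boundedness.UNBOUNDEDNESS) {
    return true;
  }
  // Bounded mode: finish only after the coordinator has signalled that no more
  // splits are coming and every assigned split has been processed.
  if (noMoreSplits) {
    return CollectionUtils.size(assignedRocketMQSplits) != 0;
  }
  return true;
}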
pollNext method
This method is called once addSplits has filled the split processing queue and hasMoreElements returns true. It is where the developer implements the actual interaction with the data.
When implementing pollNext, developers need to pay attention to the following:
Reading split data
- Read data from the splits that have been constructed.
Data type conversion
- Convert the external data into BitSail's Row type.
Example
Taking RocketMQSourceReader as an example:
Pick a split from the split queue, read its messages, convert what was read into BitSail's Row type, and send it downstream for processing.
public void pollNext(SourcePipeline<Row> pipeline) throws Exception {
  for (RocketMQSplit rocketmqSplit : assignedRocketMQSplits) {
    MessageQueue messageQueue = rocketmqSplit.getMessageQueue();
    PullResult pullResult = consumer.pull(rocketmqSplit.getMessageQueue(),
        consumerTag,
        rocketmqSplit.getStartOffset(),
        pollBatchSize,
        pollTimeout);

    if (Objects.isNull(pullResult) || CollectionUtils.isEmpty(pullResult.getMsgFoundList())) {
      continue;
    }

    for (MessageExt message : pullResult.getMsgFoundList()) {
      // Convert the raw message body into a BitSail Row and emit it downstream.
      Row deserialize = deserializationSchema.deserialize(message.getBody());
      pipeline.output(deserialize);
      if (rocketmqSplit.getStartOffset() >= rocketmqSplit.getEndOffset()) {
        LOG.info("Subtask {} rocketmq split {} in end of stream.",
            context.getIndexOfSubtask(),
            rocketmqSplit);
        finishedRocketMQSplits.add(rocketmqSplit);
        break;
      }
    }

    // Next pull for this split starts where this one ended.
    rocketmqSplit.setStartOffset(pullResult.getNextBeginOffset());
    if (!commitInCheckpoint) {
      consumer.updateConsumeOffset(messageQueue, pullResult.getMaxOffset());
    }
  }

  assignedRocketMQSplits.removeAll(finishedRocketMQSplits);
}
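To make the two concerns of pollNext concrete (reading split data and converting it), here is a minimal, self-contained sketch. `RangeSplit` and `BatchPoller` are hypothetical names, the message source is a plain in-memory list rather than RocketMQ, and upper-casing a string stands in for the conversion to a Row. It is an illustration under those assumptions, not BitSail's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical in-memory split covering offsets [startOffset, endOffset).
class RangeSplit {
  long startOffset;
  final long endOffset;

  RangeSplit(long startOffset, long endOffset) {
    this.startOffset = startOffset;
    this.endOffset = endOffset;
  }
}

// Hypothetical poller, not a BitSail class.
class BatchPoller {

  // Reads at most batchSize messages from every split, converts each one,
  // advances the split's offset, and drops splits that reached their end.
  static void pollNext(List<String> messages, List<RangeSplit> splits,
                       int batchSize, List<String> output) {
    Iterator<RangeSplit> iterator = splits.iterator();
    while (iterator.hasNext()) {
      RangeSplit split = iterator.next();
      long end = Math.min(split.startOffset + batchSize, split.endOffset);
      for (long offset = split.startOffset; offset < end; offset++) {
        // Stand-in for "convert external data to a Row".
        output.add(messages.get((int) offset).toUpperCase());
      }
      split.startOffset = end;
      if (split.startOffset >= split.endOffset) {
        iterator.remove();
      }
    }
  }

  public static void main(String[] args) {
    List<String> messages = Arrays.asList("a", "b", "c", "d", "e");
    List<RangeSplit> splits = new ArrayList<>();
    splits.add(new RangeSplit(0, 3));
    List<String> output = new ArrayList<>();
    while (!splits.isEmpty()) {
      pollNext(messages, splits, 2, output);
    }
    System.out.println(output);
  }
}
```

Removing finished splits through the iterator avoids modifying the collection while it is being traversed, which is the same reason the RocketMQ example collects finished splits in a separate set and removes them after the loop.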
Common ways to convert to the BitSail Row type
Custom RowDeserializer class
Apply a different converter to each column according to its format, and set the result on the corresponding field of the Row.
public class ClickhouseRowDeserializer {

  interface FiledConverter {
    Object apply(ResultSet resultSet) throws SQLException;
  }

  private final List<FiledConverter> converters;
  private final int fieldSize;

  public ClickhouseRowDeserializer(TypeInfo<?>[] typeInfos) {
    this.fieldSize = typeInfos.length;
    this.converters = new ArrayList<>();
    for (int i = 0; i < fieldSize; ++i) {
      // JDBC column indexes are 1-based.
      converters.add(initFieldConverter(i + 1, typeInfos[i]));
    }
  }

  public Row convert(ResultSet resultSet) {
    Row row = new Row(fieldSize);
    try {
      for (int i = 0; i < fieldSize; ++i) {
        row.setField(i, converters.get(i).apply(resultSet));
      }
    } catch (SQLException e) {
      throw BitSailException.asBitSailException(ClickhouseErrorCode.CONVERT_ERROR, e.getCause());
    }
    return row;
  }

  private FiledConverter initFieldConverter(int index, TypeInfo<?> typeInfo) {
    if (!(typeInfo instanceof BasicTypeInfo)) {
      throw BitSailException.asBitSailException(CommonErrorCode.UNSUPPORTED_COLUMN_TYPE,
          typeInfo.getTypeClass().getName() + " is not supported yet.");
    }

    Class<?> curClass = typeInfo.getTypeClass();
    if (TypeInfos.BYTE_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getByte(index);
    }
    if (TypeInfos.SHORT_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getShort(index);
    }
    if (TypeInfos.INT_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getInt(index);
    }
    if (TypeInfos.LONG_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getLong(index);
    }
    if (TypeInfos.BIG_INTEGER_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> {
        BigDecimal dec = resultSet.getBigDecimal(index);
        return dec == null ? null : dec.toBigInteger();
      };
    }
    if (TypeInfos.FLOAT_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getFloat(index);
    }
    if (TypeInfos.DOUBLE_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getDouble(index);
    }
    if (TypeInfos.BIG_DECIMAL_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getBigDecimal(index);
    }
    if (TypeInfos.STRING_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getString(index);
    }
    if (TypeInfos.SQL_DATE_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getDate(index);
    }
    if (TypeInfos.SQL_TIMESTAMP_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getTimestamp(index);
    }
    if (TypeInfos.SQL_TIME_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getTime(index);
    }
    if (TypeInfos.BOOLEAN_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> resultSet.getBoolean(index);
    }
    if (TypeInfos.VOID_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> null;
    }
    throw new UnsupportedOperationException("Unsupported data type: " + typeInfo);
  }
}
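The core idea of this class, choosing one converter per column up front and reusing it for every record, can be shown without JDBC or BitSail types. In the sketch below, `SimpleRowDeserializer` is a hypothetical class, columns are described by plain `Class` objects instead of TypeInfo, raw values are strings, and an `Object[]` stands in for a Row.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical simplified deserializer: one converter per column, selected once
// from the declared column type and then reused for every record.
class SimpleRowDeserializer {
  private final List<Function<String, Object>> converters = new ArrayList<>();

  SimpleRowDeserializer(Class<?>[] columnTypes) {
    for (Class<?> type : columnTypes) {
      converters.add(createConverter(type));
    }
  }

  private static Function<String, Object> createConverter(Class<?> type) {
    if (type == Integer.class) {
      return field -> Integer.parseInt(field.trim());
    }
    if (type == Boolean.class) {
      return field -> Boolean.parseBoolean(field.trim());
    }
    if (type == String.class) {
      return field -> field;
    }
    throw new UnsupportedOperationException("Unsupported data type: " + type);
  }

  // Converts one raw record; the Object[] plays the role of a Row here.
  Object[] convert(String[] rawFields) {
    Object[] row = new Object[converters.size()];
    for (int i = 0; i < row.length; i++) {
      row[i] = converters.get(i).apply(rawFields[i]);
    }
    return row;
  }

  public static void main(String[] args) {
    SimpleRowDeserializer deserializer = new SimpleRowDeserializer(
        new Class<?>[] {Integer.class, Boolean.class, String.class});
    Object[] row = deserializer.convert(new String[] {" 42", "true", "abc"});
    System.out.println(java.util.Arrays.toString(row));
  }
}
```

Resolving the converters in the constructor keeps the type dispatch out of the per-record path, so `convert` is just a loop over prepared functions.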
Implementing the DeserializationSchema interface
Rather than writing a RowDeserializer, we would prefer that you write an implementation of the DeserializationSchema interface that converts data in a given format, such as JSON or CSV, into the BitSail Row type.
In practice, a unified interface can be used to create the corresponding implementation class:
public class TextInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {

  private BitSailConfiguration deserializationConfiguration;

  private TypeInfo<?>[] typeInfos;

  private String[] fieldNames;

  private transient DeserializationSchema<byte[], Row> deserializationSchema;

  public TextInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
                                              TypeInfo<?>[] typeInfos,
                                              String[] fieldNames) {
    this.deserializationConfiguration = deserializationConfiguration;
    this.typeInfos = typeInfos;
    this.fieldNames = fieldNames;

    ContentType contentType = ContentType.valueOf(
        deserializationConfiguration.getNecessaryOption(HadoopReaderOptions.CONTENT_TYPE, HadoopErrorCode.REQUIRED_VALUE).toUpperCase());
    // Delegate to the format-specific schema selected by the configured content type.
    switch (contentType) {
      case CSV:
        this.deserializationSchema =
            new CsvDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
        break;
      case JSON:
        this.deserializationSchema =
            new JsonDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
        break;
      default:
        throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING, "unsupported parser type: " + contentType);
    }
  }

  @Override
  public Row deserialize(Writable message) {
    return deserializationSchema.deserialize((message.toString()).getBytes());
  }

  @Override
  public boolean isEndOfStream(Row nextElement) {
    return false;
  }
}
You can also define a DeserializationSchema dedicated to the class you currently need to parse:
public class MapredParquetInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {

  private final BitSailConfiguration deserializationConfiguration;

  private final transient DateTimeFormatter localDateTimeFormatter;
  private final transient DateTimeFormatter localDateFormatter;
  private final transient DateTimeFormatter localTimeFormatter;
  private final int fieldSize;
  private final TypeInfo<?>[] typeInfos;
  private final String[] fieldNames;
  private final List<DeserializationConverter> converters;

  public MapredParquetInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
                                                       TypeInfo<?>[] typeInfos,
                                                       String[] fieldNames) {
    this.deserializationConfiguration = deserializationConfiguration;
    this.typeInfos = typeInfos;
    this.fieldNames = fieldNames;
    this.localDateTimeFormatter = DateTimeFormatter.ofPattern(
        deserializationConfiguration.get(CommonOptions.DateFormatOptions.DATE_TIME_PATTERN));
    this.localDateFormatter = DateTimeFormatter
        .ofPattern(deserializationConfiguration.get(CommonOptions.DateFormatOptions.DATE_PATTERN));
    this.localTimeFormatter = DateTimeFormatter
        .ofPattern(deserializationConfiguration.get(CommonOptions.DateFormatOptions.TIME_PATTERN));
    this.fieldSize = typeInfos.length;
    this.converters = Arrays.stream(typeInfos).map(this::createTypeInfoConverter).collect(Collectors.toList());
  }

  @Override
  public Row deserialize(Writable message) {
    int arity = fieldNames.length;
    Row row = new Row(arity);
    Writable[] writables = ((ArrayWritable) message).get();
    for (int i = 0; i < fieldSize; ++i) {
      row.setField(i, converters.get(i).convert(writables[i].toString()));
    }
    return row;
  }

  @Override
  public boolean isEndOfStream(Row nextElement) {
    return false;
  }

  private interface DeserializationConverter extends Serializable {
    Object convert(String input);
  }

  private DeserializationConverter createTypeInfoConverter(TypeInfo<?> typeInfo) {
    Class<?> typeClass = typeInfo.getTypeClass();

    if (typeClass == TypeInfos.VOID_TYPE_INFO.getTypeClass()) {
      return field -> null;
    }
    if (typeClass == TypeInfos.BOOLEAN_TYPE_INFO.getTypeClass()) {
      return this::convertToBoolean;
    }
    if (typeClass == TypeInfos.INT_TYPE_INFO.getTypeClass()) {
      return this::convertToInt;
    }
    throw BitSailException.asBitSailException(CsvFormatErrorCode.CSV_FORMAT_COVERT_FAILED,
        String.format("Csv format converter not support type info: %s.", typeInfo));
  }

  private boolean convertToBoolean(String field) {
    return Boolean.parseBoolean(field.trim());
  }

  private int convertToInt(String field) {
    return Integer.parseInt(field.trim());
  }
}
snapshotState method
Generates and saves a snapshot of the State for use in checkpoints.
Example
public List<RocketMQSplit> snapshotState(long checkpointId) {
  LOG.info("Subtask {} start snapshotting for checkpoint id = {}.", context.getIndexOfSubtask(), checkpointId);
  if (commitInCheckpoint) {
    // Commit the current offset of every assigned split as part of the checkpoint.
    for (RocketMQSplit rocketMQSplit : assignedRocketMQSplits) {
      try {
        consumer.updateConsumeOffset(rocketMQSplit.getMessageQueue(), rocketMQSplit.getStartOffset());
        LOG.debug("Subtask {} committed message queue = {} in checkpoint id = {}.", context.getIndexOfSubtask(),
            rocketMQSplit.getMessageQueue(),
            checkpointId);
      } catch (MQClientException e) {
        throw new RuntimeException(e);
      }
    }
  }
  return Lists.newArrayList(assignedRocketMQSplits);
}
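How a snapshot of splits supports recovery can be sketched in isolation. `OffsetSplit` and `CheckpointedReader` below are hypothetical classes, and `advance` merely simulates reading. Unlike the RocketMQ example, this sketch copies each split object so that later reads cannot change an already taken snapshot; that copy is an assumption of the sketch, not something the original code does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical split that remembers how far it has been read.
class OffsetSplit {
  final String queue;
  long startOffset;

  OffsetSplit(String queue, long startOffset) {
    this.queue = queue;
    this.startOffset = startOffset;
  }
}

// Hypothetical reader, not the BitSail SourceReader interface.
class CheckpointedReader {
  private final List<OffsetSplit> assignedSplits = new ArrayList<>();

  void addSplits(List<OffsetSplit> splits) {
    assignedSplits.addAll(splits);
  }

  // Simulates reading `count` records from every assigned split.
  void advance(long count) {
    for (OffsetSplit split : assignedSplits) {
      split.startOffset += count;
    }
  }

  // Returns copies of the assigned splits with their current offsets.
  List<OffsetSplit> snapshotState(long checkpointId) {
    List<OffsetSplit> snapshot = new ArrayList<>();
    for (OffsetSplit split : assignedSplits) {
      snapshot.add(new OffsetSplit(split.queue, split.startOffset));
    }
    return snapshot;
  }

  public static void main(String[] args) {
    CheckpointedReader reader = new CheckpointedReader();
    List<OffsetSplit> initial = new ArrayList<>();
    initial.add(new OffsetSplit("queue-0", 0));
    reader.addSplits(initial);
    reader.advance(10);
    List<OffsetSplit> snapshot = reader.snapshotState(1L);

    // After a failure, a fresh reader resumes from the snapshot offsets.
    CheckpointedReader recovered = new CheckpointedReader();
    recovered.addSplits(snapshot);
    System.out.println(snapshot.get(0).queue + " resumes at " + snapshot.get(0).startOffset);
  }
}
```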
hasMoreElements method
Before each call to pollNext, sourceReader.hasMoreElements() is checked; pollNext is called if and only if that check passes.
Example
public boolean hasMoreElements() {
  if (noMoreSplits) {
    return CollectionUtils.size(assignedHadoopSplits) != 0;
  }
  return true;
}
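The check-then-poll contract can be sketched as a small driver loop. `MiniReader` and `ReaderLoop` are hypothetical classes written for this illustration; the real loop lives inside the BitSail engine and is not shown in this article, so treat the driver as an assumption about its general shape.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical bounded reader; not the BitSail SourceReader interface.
class MiniReader {
  private final Deque<String> assignedSplits = new ArrayDeque<>();
  private boolean noMoreSplits = false;

  void addSplits(List<String> splits) {
    assignedSplits.addAll(splits);
  }

  void notifyNoMoreSplits() {
    noMoreSplits = true;
  }

  // Keeps running until "no more splits" was signalled and all splits are done.
  boolean hasMoreElements() {
    if (noMoreSplits) {
      return !assignedSplits.isEmpty();
    }
    return true;
  }

  // Processes one split per call and emits a single record for it.
  void pollNext(List<String> output) {
    String split = assignedSplits.poll();
    if (split != null) {
      output.add("record-from-" + split);
    }
  }
}

class ReaderLoop {
  // Hypothetical driver: hasMoreElements is checked before every pollNext call.
  static List<String> run(MiniReader reader) {
    List<String> output = new ArrayList<>();
    while (reader.hasMoreElements()) {
      reader.pollNext(output);
    }
    return output;
  }

  public static void main(String[] args) {
    MiniReader reader = new MiniReader();
    reader.addSplits(Arrays.asList("s0", "s1"));
    reader.notifyNoMoreSplits();
    System.out.println(run(reader));
  }
}
```

Note that this loop terminates only because `notifyNoMoreSplits` was called before it started; without that signal, `hasMoreElements` keeps returning true and the reader keeps waiting for further splits.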
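notifyNoMoreSplits method
This method is called when no more splits will be sent to this Reader. After it is called, the Reader can exit once it has processed all of its assigned splits.
Example
public void notifyNoMoreSplits() {
  LOG.info("Subtask {} received no more split signal.", context.getIndexOfSubtask());
  noMoreSplits = true;
}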
About BitSail:
⭐️ Star the project: https://github.com/bytedance/bitsail
Submit issues and suggestions: https://github.com/bytedance/bitsail/issues
Contribute code: https://github.com/bytedance/bitsail/pulls
BitSail website: https://bytedance.github.io/bitsail/zh/
Subscribe to the mailing list: mailto:bitsail+subscribe@googlegroups.com