更多技术交换、求职机会,欢送关注字节跳动数据平台微信公众号,回复【1】进入官网交换群

Source Connector


本文将次要介绍负责数据读取的组件SourceReader:

SourceReader

每个SourceReader都在独立的线程中执行,只有咱们保障SourceSplitCoordinator调配给不同SourceReader的切片没有交加,在SourceReader的执行周期中,咱们就能够不思考任何无关并发的细节。

SourceReader接口

public interface SourceReader<T, SplitT extends SourceSplit> extends Serializable, AutoCloseable {  void start();  void pollNext(SourcePipeline<T> pipeline) throws Exception;  void addSplits(List<SplitT> splits);  /*** Check source reader has more elements or not.*/boolean hasMoreElements();  /*** There will no more split will send to this source reader.* Source reader could be exited after process all assigned split.*/default void notifyNoMoreSplits() {  }  /*** Process all events which from {  @link  SourceSplitCoordinator}.*/default void handleSourceEvent(SourceEvent sourceEvent) {  }  /*** Store the split to the external system to recover when task failed.*/List<SplitT> snapshotState(long checkpointId);  /*** When all tasks finished snapshot, notify checkpoint complete will be invoked.*/default void notifyCheckpointComplete(long checkpointId) throws Exception {  }  interface Context {    TypeInfo<?>[] getTypeInfos();    String[] getFieldNames();    int getIndexOfSubtask();    void sendSplitRequest();  }}

构造方法

这里须要实现和数据源拜访各种配置的提取,比方数据库库名表名、音讯队列cluster和topic、身份认证的配置等等。

示例
public RocketMQSourceReader(BitSailConfiguration readerConfiguration,                            Context context,                            Boundedness boundedness) {  this.readerConfiguration = readerConfiguration;  this.boundedness = boundedness;  this.context = context;  this.assignedRocketMQSplits = Sets.newHashSet();  this.finishedRocketMQSplits = Sets.newHashSet();  this.deserializationSchema = new RocketMQDeserializationSchema(      readerConfiguration,      context.getTypeInfos(),      context.getFieldNames());  this.noMoreSplits = false;  cluster = readerConfiguration.get(RocketMQSourceOptions.CLUSTER);  topic = readerConfiguration.get(RocketMQSourceOptions.TOPIC);  consumerGroup = readerConfiguration.get(RocketMQSourceOptions.CONSUMER_GROUP);  consumerTag = readerConfiguration.get(RocketMQSourceOptions.CONSUMER_TAG);  pollBatchSize = readerConfiguration.get(RocketMQSourceOptions.POLL_BATCH_SIZE);  pollTimeout = readerConfiguration.get(RocketMQSourceOptions.POLL_TIMEOUT);  commitInCheckpoint = readerConfiguration.get(RocketMQSourceOptions.COMMIT_IN_CHECKPOINT);  accessKey = readerConfiguration.get(RocketMQSourceOptions.ACCESS_KEY);  secretKey = readerConfiguration.get(RocketMQSourceOptions.SECRET_KEY);}

start办法

初始化数据源的拜访对象,例如数据库的执行对象、音讯队列的consumer对象或者文件系统的连贯。

示例

音讯队列

public void start() {  try {    if (StringUtils.isNotEmpty(accessKey) && StringUtils.isNotEmpty(secretKey)) {      AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(          new SessionCredentials(accessKey, secretKey));      consumer = new DefaultMQPullConsumer(aclClientRPCHook);    } else {      consumer = new DefaultMQPullConsumer();    }    consumer.setConsumerGroup(consumerGroup);    consumer.setNamesrvAddr(cluster);    consumer.setInstanceName(String.format(SOURCE_READER_INSTANCE_NAME_TEMPLATE,        cluster, topic, consumerGroup, UUID.randomUUID()));    consumer.setConsumerPullTimeoutMillis(pollTimeout);    consumer.start();  } catch (Exception e) {    throw BitSailException.asBitSailException(RocketMQErrorCode.CONSUMER_CREATE_FAILED, e);  }}

数据库

public void start() {  this.connection = connectionHolder.connect();  // Construct statement.  String baseSql = ClickhouseJdbcUtils.getQuerySql(dbName, tableName, columnInfos);  String querySql = ClickhouseJdbcUtils.decorateSql(baseSql, splitField, filterSql, maxFetchCount, true);  try {    this.statement = connection.prepareStatement(querySql);  } catch (SQLException e) {    throw new RuntimeException("Failed to prepare statement.", e);  }  LOG.info("Task {} started.", subTaskId);}

FTP

public void start() {  this.ftpHandler.loginFtpServer();  if (this.ftpHandler.getFtpConfig().getSkipFirstLine()) {    this.skipFirstLine = true;  }}

addSplits办法

将SourceSplitCoordinator给以后Reader调配的Splits列表增加到本人的解决队列(Queue)或者汇合(Set)中。

示例
public void addSplits(List<RocketMQSplit> splits) {  LOG.info("Subtask {} received {}(s) new splits, splits = {}.",      context.getIndexOfSubtask(),      CollectionUtils.size(splits),      splits);  assignedRocketMQSplits.addAll(splits);}

hasMoreElements办法

在无界的流计算场景中,会始终返回true保障Reader线程不被销毁。

在批式场景中,调配给该Reader的切片解决完之后会返回false,示意该Reader生命周期的完结。

public boolean hasMoreElements() {  if (boundedness == Boundedness.UNBOUNDEDNESS) {    return true;  }  if (noMoreSplits) {    return CollectionUtils.size(assignedRocketMQSplits) != 0;  }  return true;}

pollNext办法

在addSplits办法增加实现切片解决队列且hasMoreElements返回true时,该办法调用,开发者实现此办法真正和数据交互。

开发者在实现pollNext办法时候须要关注下列问题:

  • 切片数据的读取

    • 从结构好的切片中去读取数据。
  • 数据类型的转换

    • 将内部数据转换成BitSail的Row类型
示例

以RocketMQSourceReader为例:

从split队列中选取split进行解决,读取其信息,之后须要将读取到的信息转换成BitSail的Row类型,发送给上游解决。

public void pollNext(SourcePipeline<Row> pipeline) throws Exception {  for (RocketMQSplit rocketmqSplit : assignedRocketMQSplits) {    MessageQueue messageQueue = rocketmqSplit.getMessageQueue();    PullResult pullResult = consumer.pull(rocketmqSplit.getMessageQueue(),        consumerTag,        rocketmqSplit.getStartOffset(),        pollBatchSize,        pollTimeout);    if (Objects.isNull(pullResult) || CollectionUtils.isEmpty(pullResult.getMsgFoundList())) {      continue;    }    for (MessageExt message : pullResult.getMsgFoundList()) {      Row deserialize = deserializationSchema.deserialize(message.getBody());      pipeline.output(deserialize);      if (rocketmqSplit.getStartOffset() >= rocketmqSplit.getEndOffset()) {        LOG.info("Subtask {} rocketmq split {} in end of stream.",            context.getIndexOfSubtask(),            rocketmqSplit);        finishedRocketMQSplits.add(rocketmqSplit);        break;      }    }    rocketmqSplit.setStartOffset(pullResult.getNextBeginOffset());    if (!commitInCheckpoint) {      consumer.updateConsumeOffset(messageQueue, pullResult.getMaxOffset());    }  }  assignedRocketMQSplits.removeAll(finishedRocketMQSplits);}
转换为BitSail Row类型的罕用形式
自定义RowDeserializer类

对于不同格局的列利用不同converter,设置到相应Row的Field。

public class ClickhouseRowDeserializer {  interface FiledConverter {    Object apply(ResultSet resultSet) throws SQLException;  }  private final List<FiledConverter> converters;  private final int fieldSize;  public ClickhouseRowDeserializer(TypeInfo<?>[] typeInfos) {    this.fieldSize = typeInfos.length;    this.converters = new ArrayList<>();    for (int i = 0; i < fieldSize; ++i) {      converters.add(initFieldConverter(i + 1, typeInfos[i]));    }  }  public Row convert(ResultSet resultSet) {    Row row = new Row(fieldSize);    try {      for (int i = 0; i < fieldSize; ++i) {        row.setField(i, converters.get(i).apply(resultSet));      }    } catch (SQLException e) {      throw BitSailException.asBitSailException(ClickhouseErrorCode.CONVERT_ERROR, e.getCause());    }    return row;  }  private FiledConverter initFieldConverter(int index, TypeInfo<?> typeInfo) {    if (!(typeInfo instanceof BasicTypeInfo)) {      throw BitSailException.asBitSailException(CommonErrorCode.UNSUPPORTED_COLUMN_TYPE, typeInfo.getTypeClass().getName() + " is not supported yet.");    }    Class<?> curClass = typeInfo.getTypeClass();    if (TypeInfos.BYTE_TYPE_INFO.getTypeClass() == curClass) {      return resultSet -> resultSet.getByte(index);    }    if (TypeInfos.SHORT_TYPE_INFO.getTypeClass() == curClass) {      return resultSet -> resultSet.getShort(index);    }    if (TypeInfos.INT_TYPE_INFO.getTypeClass() == curClass) {      return resultSet -> resultSet.getInt(index);    }    if (TypeInfos.LONG_TYPE_INFO.getTypeClass() == curClass) {      return resultSet -> resultSet.getLong(index);    }    if (TypeInfos.BIG_INTEGER_TYPE_INFO.getTypeClass() == curClass) {      return resultSet -> {        BigDecimal dec = resultSet.getBigDecimal(index);        return dec == null ? null : dec.toBigInteger();      };    }    if (TypeInfos.FLOAT_TYPE_INFO.getTypeClass() == curClass) {      return resultSet -> resultSet.getFloat(index);    }    if (TypeInfos.DOUBLE_TYPE_INFO.getTypeClass() == curClass) {      return resultSet -> resultSet.getDouble(index);    }    if (TypeInfos.BIG_DECIMAL_TYPE_INFO.getTypeClass() == curClass) {      return resultSet -> resultSet.getBigDecimal(index);    }    if (TypeInfos.STRING_TYPE_INFO.getTypeClass() == curClass) {      return resultSet -> resultSet.getString(index);    }    if (TypeInfos.SQL_DATE_TYPE_INFO.getTypeClass() == curClass) {      return resultSet -> resultSet.getDate(index);    }    if (TypeInfos.SQL_TIMESTAMP_TYPE_INFO.getTypeClass() == curClass) {      return resultSet -> resultSet.getTimestamp(index);    }    if (TypeInfos.SQL_TIME_TYPE_INFO.getTypeClass() == curClass) {      return resultSet -> resultSet.getTime(index);    }    if (TypeInfos.BOOLEAN_TYPE_INFO.getTypeClass() == curClass) {      return resultSet -> resultSet.getBoolean(index);    }    if (TypeInfos.VOID_TYPE_INFO.getTypeClass() == curClass) {      return resultSet -> null;    }    throw new UnsupportedOperationException("Unsupported data type: " + typeInfo);  }}
实现DeserializationSchema接口

绝对于实现RowDeserializer,咱们更心愿大家去实现一个继承DeserializationSchema接口的实现类,将肯定类型格局的数据对数据比方JSON、CSV转换为BitSail Row类型。

在具体的利用时,咱们能够应用对立的接口创立相应的实现类

public class TextInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {  private BitSailConfiguration deserializationConfiguration;  private TypeInfo<?>[] typeInfos;  private String[] fieldNames;  private transient DeserializationSchema<byte[], Row> deserializationSchema;  public TextInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,                                               TypeInfo<?>[] typeInfos,                                               String[] fieldNames) {    this.deserializationConfiguration = deserializationConfiguration;    this.typeInfos = typeInfos;    this.fieldNames = fieldNames;    ContentType contentType = ContentType.valueOf(        deserializationConfiguration.getNecessaryOption(HadoopReaderOptions.CONTENT_TYPE, HadoopErrorCode.REQUIRED_VALUE).toUpperCase());    switch (contentType) {      case CSV:        this.deserializationSchema =            new CsvDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);        break;      case JSON:        this.deserializationSchema =            new JsonDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);        break;      default:        throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING, "unsupported parser type: " + contentType);    }  }  @Override  public Row deserialize(Writable message) {    return deserializationSchema.deserialize((message.toString()).getBytes());  }  @Override  public boolean isEndOfStream(Row nextElement) {    return false;  }}

也能够自定义以后须要解析类专用的DeserializationSchema:

public class MapredParquetInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {  private final BitSailConfiguration deserializationConfiguration;  private final transient DateTimeFormatter localDateTimeFormatter;  private final transient DateTimeFormatter localDateFormatter;  private final transient DateTimeFormatter localTimeFormatter;  private final int fieldSize;  private final TypeInfo<?>[] typeInfos;  private final String[] fieldNames;  private final List<DeserializationConverter> converters;  public MapredParquetInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,                                  TypeInfo<?>[] typeInfos,                                  String[] fieldNames) {    this.deserializationConfiguration = deserializationConfiguration;    this.typeInfos = typeInfos;    this.fieldNames = fieldNames;    this.localDateTimeFormatter = DateTimeFormatter.ofPattern(        deserializationConfiguration.get(CommonOptions.DateFormatOptions.DATE_TIME_PATTERN));    this.localDateFormatter = DateTimeFormatter        .ofPattern(deserializationConfiguration.get(CommonOptions.DateFormatOptions.DATE_PATTERN));    this.localTimeFormatter = DateTimeFormatter        .ofPattern(deserializationConfiguration.get(CommonOptions.DateFormatOptions.TIME_PATTERN));    this.fieldSize = typeInfos.length;    this.converters = Arrays.stream(typeInfos).map(this::createTypeInfoConverter).collect(Collectors.toList());  }  @Override  public Row deserialize(Writable message) {    int arity = fieldNames.length;    Row row = new Row(arity);    Writable[] writables = ((ArrayWritable) message).get();    for (int i = 0; i < fieldSize; ++i) {      row.setField(i, converters.get(i).convert(writables[i].toString()));    }    return row;  }  @Override  public boolean isEndOfStream(Row nextElement) {    return false;  }  private interface DeserializationConverter extends Serializable {    Object convert(String input);  }  private DeserializationConverter createTypeInfoConverter(TypeInfo<?> typeInfo) {    Class<?> typeClass = typeInfo.getTypeClass();    if (typeClass == TypeInfos.VOID_TYPE_INFO.getTypeClass()) {      return field -> null;    }    if (typeClass == TypeInfos.BOOLEAN_TYPE_INFO.getTypeClass()) {      return this::convertToBoolean;    }    if (typeClass == TypeInfos.INT_TYPE_INFO.getTypeClass()) {      return this::convertToInt;    }    throw BitSailException.asBitSailException(CsvFormatErrorCode.CSV_FORMAT_COVERT_FAILED,        String.format("Csv format converter not support type info: %s.", typeInfo));  }  private boolean convertToBoolean(String field) {    return Boolean.parseBoolean(field.trim());  }  private int convertToInt(String field) {    return Integer.parseInt(field.trim());  }}

snapshotState办法

生成并保留State的快照信息,用于ckeckpoint。

示例
public List<RocketMQSplit> snapshotState(long checkpointId) {  LOG.info("Subtask {} start snapshotting for checkpoint id = {}.", context.getIndexOfSubtask(), checkpointId);  if (commitInCheckpoint) {    for (RocketMQSplit rocketMQSplit : assignedRocketMQSplits) {      try {        consumer.updateConsumeOffset(rocketMQSplit.getMessageQueue(), rocketMQSplit.getStartOffset());        LOG.debug("Subtask {} committed message queue = {} in checkpoint id = {}.", context.getIndexOfSubtask(),            rocketMQSplit.getMessageQueue(),            checkpointId);      } catch (MQClientException e) {        throw new RuntimeException(e);      }    }  }  return Lists.newArrayList(assignedRocketMQSplits);}

hasMoreElements办法

每次调用pollNext办法之前会做sourceReader.hasMoreElements()的判断,当且仅当判断通过,pollNext办法才会被调用。

示例
public boolean hasMoreElements() {  if (noMoreSplits) {    return CollectionUtils.size(assignedHadoopSplits) != 0;  }  return true;}

notifyNoMoreSplits办法

当Reader解决完所有切片之后,会调用此办法。

示例
public void notifyNoMoreSplits() {  LOG.info("Subtask {} received no more split signal.", context.getIndexOfSubtask());  noMoreSplits = true;}

【对于BitSail】:

⭐️ Star 不迷路 https://github.com/bytedance/bitsail

提交问题和倡议:https://github.com/bytedance/bitsail/issues

奉献代码:https://github.com/bytedance/bitsail/pulls

BitSail官网:https://bytedance.github.io/bitsail/zh/

订阅邮件列表:mailto:bitsail+subscribe@googlegroups.com

退出BitSail技术社群