乐趣区

关于大数据:BitSail-Connector开发详解系列三SourceReader

更多技术交换、求职机会,欢送关注字节跳动数据平台微信公众号,回复【1】进入官网交换群

Source Connector

本文将次要介绍负责数据读取的组件 SourceReader:

SourceReader

每个 SourceReader 都在独立的线程中执行,只有咱们保障 SourceSplitCoordinator 调配给不同 SourceReader 的切片没有交加,在 SourceReader 的执行周期中,咱们就能够不思考任何无关并发的细节。

SourceReader 接口

public interface SourceReader<T, SplitT extends SourceSplit> extends Serializable, AutoCloseable {void start();

  void pollNext(SourcePipeline<T> pipeline) throws Exception;

  void addSplits(List<SplitT> splits);

  /**
* Check source reader has more elements or not.
*/
boolean hasMoreElements();

  /**
* There will no more split will send to this source reader.
* Source reader could be exited after process all assigned split.
*/
default void notifyNoMoreSplits() {}

  /**
* Process all events which from {@link  SourceSplitCoordinator}.
*/
default void handleSourceEvent(SourceEvent sourceEvent) { }

  /**
* Store the split to the external system to recover when task failed.
*/
List<SplitT> snapshotState(long checkpointId);

  /**
* When all tasks finished snapshot, notify checkpoint complete will be invoked.
*/
default void notifyCheckpointComplete(long checkpointId) throws Exception { }

  interface Context {TypeInfo<?>[] getTypeInfos();

    String[] getFieldNames();

    int getIndexOfSubtask();

    void sendSplitRequest();}
}

构造方法

这里须要实现和数据源拜访各种配置的提取,比方数据库库名表名、音讯队列 cluster 和 topic、身份认证的配置等等。

示例
public RocketMQSourceReader(BitSailConfiguration readerConfiguration,
                            Context context,
                            Boundedness boundedness) {
  this.readerConfiguration = readerConfiguration;
  this.boundedness = boundedness;
  this.context = context;
  this.assignedRocketMQSplits = Sets.newHashSet();
  this.finishedRocketMQSplits = Sets.newHashSet();
  this.deserializationSchema = new RocketMQDeserializationSchema(
      readerConfiguration,
      context.getTypeInfos(),
      context.getFieldNames());
  this.noMoreSplits = false;

  cluster = readerConfiguration.get(RocketMQSourceOptions.CLUSTER);
  topic = readerConfiguration.get(RocketMQSourceOptions.TOPIC);
  consumerGroup = readerConfiguration.get(RocketMQSourceOptions.CONSUMER_GROUP);
  consumerTag = readerConfiguration.get(RocketMQSourceOptions.CONSUMER_TAG);
  pollBatchSize = readerConfiguration.get(RocketMQSourceOptions.POLL_BATCH_SIZE);
  pollTimeout = readerConfiguration.get(RocketMQSourceOptions.POLL_TIMEOUT);
  commitInCheckpoint = readerConfiguration.get(RocketMQSourceOptions.COMMIT_IN_CHECKPOINT);
  accessKey = readerConfiguration.get(RocketMQSourceOptions.ACCESS_KEY);
  secretKey = readerConfiguration.get(RocketMQSourceOptions.SECRET_KEY);
}

start 办法

初始化数据源的拜访对象,例如数据库的执行对象、音讯队列的 consumer 对象或者文件系统的连贯。

示例

音讯队列

public void start() {
  try {if (StringUtils.isNotEmpty(accessKey) && StringUtils.isNotEmpty(secretKey)) {
      AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
      consumer = new DefaultMQPullConsumer(aclClientRPCHook);
    } else {consumer = new DefaultMQPullConsumer();
    }

    consumer.setConsumerGroup(consumerGroup);
    consumer.setNamesrvAddr(cluster);
    consumer.setInstanceName(String.format(SOURCE_READER_INSTANCE_NAME_TEMPLATE,
        cluster, topic, consumerGroup, UUID.randomUUID()));
    consumer.setConsumerPullTimeoutMillis(pollTimeout);
    consumer.start();} catch (Exception e) {throw BitSailException.asBitSailException(RocketMQErrorCode.CONSUMER_CREATE_FAILED, e);
  }
}

数据库

public void start() {this.connection = connectionHolder.connect();

  // Construct statement.
  String baseSql = ClickhouseJdbcUtils.getQuerySql(dbName, tableName, columnInfos);
  String querySql = ClickhouseJdbcUtils.decorateSql(baseSql, splitField, filterSql, maxFetchCount, true);
  try {this.statement = connection.prepareStatement(querySql);
  } catch (SQLException e) {throw new RuntimeException("Failed to prepare statement.", e);
  }

  LOG.info("Task {} started.", subTaskId);
}

FTP

public void start() {this.ftpHandler.loginFtpServer();
  if (this.ftpHandler.getFtpConfig().getSkipFirstLine()) {this.skipFirstLine = true;}
}

addSplits 办法

将 SourceSplitCoordinator 给以后 Reader 调配的 Splits 列表增加到本人的解决队列(Queue)或者汇合(Set)中。

示例
public void addSplits(List<RocketMQSplit> splits) {LOG.info("Subtask {} received {}(s) new splits, splits = {}.",
      context.getIndexOfSubtask(),
      CollectionUtils.size(splits),
      splits);

  assignedRocketMQSplits.addAll(splits);
}

hasMoreElements 办法

在无界的流计算场景中,会始终返回 true 保障 Reader 线程不被销毁。

在批式场景中,调配给该 Reader 的切片解决完之后会返回 false,示意该 Reader 生命周期的完结。

public boolean hasMoreElements() {if (boundedness == Boundedness.UNBOUNDEDNESS) {return true;}
  if (noMoreSplits) {return CollectionUtils.size(assignedRocketMQSplits) != 0;
  }
  return true;
}

pollNext 办法

在 addSplits 办法增加实现切片解决队列且 hasMoreElements 返回 true 时,该办法调用,开发者实现此办法真正和数据交互。

开发者在实现 pollNext 办法时候须要关注下列问题:

  • 切片数据的读取

    • 从结构好的切片中去读取数据。
  • 数据类型的转换

    • 将内部数据转换成 BitSail 的 Row 类型
示例

以 RocketMQSourceReader 为例:

从 split 队列中选取 split 进行解决,读取其信息,之后须要将读取到的信息转换成 BitSail 的 Row 类型,发送给上游解决。

public void pollNext(SourcePipeline<Row> pipeline) throws Exception {for (RocketMQSplit rocketmqSplit : assignedRocketMQSplits) {MessageQueue messageQueue = rocketmqSplit.getMessageQueue();
    PullResult pullResult = consumer.pull(rocketmqSplit.getMessageQueue(),
        consumerTag,
        rocketmqSplit.getStartOffset(),
        pollBatchSize,
        pollTimeout);

    if (Objects.isNull(pullResult) || CollectionUtils.isEmpty(pullResult.getMsgFoundList())) {continue;}

    for (MessageExt message : pullResult.getMsgFoundList()) {Row deserialize = deserializationSchema.deserialize(message.getBody());
      pipeline.output(deserialize);
      if (rocketmqSplit.getStartOffset() >= rocketmqSplit.getEndOffset()) {LOG.info("Subtask {} rocketmq split {} in end of stream.",
            context.getIndexOfSubtask(),
            rocketmqSplit);
        finishedRocketMQSplits.add(rocketmqSplit);
        break;
      }
    }
    rocketmqSplit.setStartOffset(pullResult.getNextBeginOffset());
    if (!commitInCheckpoint) {consumer.updateConsumeOffset(messageQueue, pullResult.getMaxOffset());
    }
  }
  assignedRocketMQSplits.removeAll(finishedRocketMQSplits);
}
转换为 BitSail Row 类型的罕用形式
自定义 RowDeserializer 类

对于不同格局的列利用不同 converter,设置到相应 Row 的 Field。

public class ClickhouseRowDeserializer {

  interface FiledConverter {Object apply(ResultSet resultSet) throws SQLException;
  }

  private final List<FiledConverter> converters;
  private final int fieldSize;

  public ClickhouseRowDeserializer(TypeInfo<?>[] typeInfos) {
    this.fieldSize = typeInfos.length;
    this.converters = new ArrayList<>();
    for (int i = 0; i < fieldSize; ++i) {converters.add(initFieldConverter(i + 1, typeInfos[i]));
    }
  }

  public Row convert(ResultSet resultSet) {Row row = new Row(fieldSize);
    try {for (int i = 0; i < fieldSize; ++i) {row.setField(i, converters.get(i).apply(resultSet));
      }
    } catch (SQLException e) {throw BitSailException.asBitSailException(ClickhouseErrorCode.CONVERT_ERROR, e.getCause());
    }
    return row;
  }

  private FiledConverter initFieldConverter(int index, TypeInfo<?> typeInfo) {if (!(typeInfo instanceof BasicTypeInfo)) {throw BitSailException.asBitSailException(CommonErrorCode.UNSUPPORTED_COLUMN_TYPE, typeInfo.getTypeClass().getName() + "is not supported yet.");
    }

    Class<?> curClass = typeInfo.getTypeClass();
    if (TypeInfos.BYTE_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getByte(index);
    }
    if (TypeInfos.SHORT_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getShort(index);
    }
    if (TypeInfos.INT_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getInt(index);
    }
    if (TypeInfos.LONG_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getLong(index);
    }
    if (TypeInfos.BIG_INTEGER_TYPE_INFO.getTypeClass() == curClass) {
      return resultSet -> {BigDecimal dec = resultSet.getBigDecimal(index);
        return dec == null ? null : dec.toBigInteger();};
    }
    if (TypeInfos.FLOAT_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getFloat(index);
    }
    if (TypeInfos.DOUBLE_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getDouble(index);
    }
    if (TypeInfos.BIG_DECIMAL_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getBigDecimal(index);
    }
    if (TypeInfos.STRING_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getString(index);
    }
    if (TypeInfos.SQL_DATE_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getDate(index);
    }
    if (TypeInfos.SQL_TIMESTAMP_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getTimestamp(index);
    }
    if (TypeInfos.SQL_TIME_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getTime(index);
    }
    if (TypeInfos.BOOLEAN_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> resultSet.getBoolean(index);
    }
    if (TypeInfos.VOID_TYPE_INFO.getTypeClass() == curClass) {return resultSet -> null;}
    throw new UnsupportedOperationException("Unsupported data type:" + typeInfo);
  }
}
实现 DeserializationSchema 接口

绝对于实现 RowDeserializer,咱们更心愿大家去实现一个继承 DeserializationSchema 接口的实现类,将肯定类型格局的数据对数据比方 JSON、CSV 转换为 BitSail Row 类型。

在具体的利用时,咱们能够应用对立的接口创立相应的实现类

public class TextInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {

  private BitSailConfiguration deserializationConfiguration;

  private TypeInfo<?>[] typeInfos;

  private String[] fieldNames;

  private transient DeserializationSchema<byte[], Row> deserializationSchema;

  public TextInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
                                               TypeInfo<?>[] typeInfos,
                                               String[] fieldNames) {
    this.deserializationConfiguration = deserializationConfiguration;
    this.typeInfos = typeInfos;
    this.fieldNames = fieldNames;
    ContentType contentType = ContentType.valueOf(deserializationConfiguration.getNecessaryOption(HadoopReaderOptions.CONTENT_TYPE, HadoopErrorCode.REQUIRED_VALUE).toUpperCase());
    switch (contentType) {
      case CSV:
        this.deserializationSchema =
            new CsvDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
        break;
      case JSON:
        this.deserializationSchema =
            new JsonDeserializationSchema(deserializationConfiguration, typeInfos, fieldNames);
        break;
      default:
        throw BitSailException.asBitSailException(HadoopErrorCode.UNSUPPORTED_ENCODING, "unsupported parser type:" + contentType);
    }
  }

  @Override
  public Row deserialize(Writable message) {return deserializationSchema.deserialize((message.toString()).getBytes());
  }

  @Override
  public boolean isEndOfStream(Row nextElement) {return false;}
}

也能够自定义以后须要解析类专用的 DeserializationSchema:

public class MapredParquetInputFormatDeserializationSchema implements DeserializationSchema<Writable, Row> {

  private final BitSailConfiguration deserializationConfiguration;

  private final transient DateTimeFormatter localDateTimeFormatter;
  private final transient DateTimeFormatter localDateFormatter;
  private final transient DateTimeFormatter localTimeFormatter;
  private final int fieldSize;
  private final TypeInfo<?>[] typeInfos;
  private final String[] fieldNames;
  private final List<DeserializationConverter> converters;

  public MapredParquetInputFormatDeserializationSchema(BitSailConfiguration deserializationConfiguration,
                                  TypeInfo<?>[] typeInfos,
                                  String[] fieldNames) {

    this.deserializationConfiguration = deserializationConfiguration;
    this.typeInfos = typeInfos;
    this.fieldNames = fieldNames;
    this.localDateTimeFormatter = DateTimeFormatter.ofPattern(deserializationConfiguration.get(CommonOptions.DateFormatOptions.DATE_TIME_PATTERN));
    this.localDateFormatter = DateTimeFormatter
        .ofPattern(deserializationConfiguration.get(CommonOptions.DateFormatOptions.DATE_PATTERN));
    this.localTimeFormatter = DateTimeFormatter
        .ofPattern(deserializationConfiguration.get(CommonOptions.DateFormatOptions.TIME_PATTERN));
    this.fieldSize = typeInfos.length;
    this.converters = Arrays.stream(typeInfos).map(this::createTypeInfoConverter).collect(Collectors.toList());
  }

  @Override
  public Row deserialize(Writable message) {
    int arity = fieldNames.length;
    Row row = new Row(arity);
    Writable[] writables = ((ArrayWritable) message).get();
    for (int i = 0; i < fieldSize; ++i) {row.setField(i, converters.get(i).convert(writables[i].toString()));
    }
    return row;
  }

  @Override
  public boolean isEndOfStream(Row nextElement) {return false;}

  private interface DeserializationConverter extends Serializable {Object convert(String input);
  }

  private DeserializationConverter createTypeInfoConverter(TypeInfo<?> typeInfo) {Class<?> typeClass = typeInfo.getTypeClass();

    if (typeClass == TypeInfos.VOID_TYPE_INFO.getTypeClass()) {return field -> null;}
    if (typeClass == TypeInfos.BOOLEAN_TYPE_INFO.getTypeClass()) {return this::convertToBoolean;}
    if (typeClass == TypeInfos.INT_TYPE_INFO.getTypeClass()) {return this::convertToInt;}
    throw BitSailException.asBitSailException(CsvFormatErrorCode.CSV_FORMAT_COVERT_FAILED,
        String.format("Csv format converter not support type info: %s.", typeInfo));
  }

  private boolean convertToBoolean(String field) {return Boolean.parseBoolean(field.trim());
  }

  private int convertToInt(String field) {return Integer.parseInt(field.trim());
  }
}

snapshotState 办法

生成并保留 State 的快照信息,用于 ckeckpoint。

示例
public List<RocketMQSplit> snapshotState(long checkpointId) {LOG.info("Subtask {} start snapshotting for checkpoint id = {}.", context.getIndexOfSubtask(), checkpointId);
  if (commitInCheckpoint) {for (RocketMQSplit rocketMQSplit : assignedRocketMQSplits) {
      try {consumer.updateConsumeOffset(rocketMQSplit.getMessageQueue(), rocketMQSplit.getStartOffset());
        LOG.debug("Subtask {} committed message queue = {} in checkpoint id = {}.", context.getIndexOfSubtask(),
            rocketMQSplit.getMessageQueue(),
            checkpointId);
      } catch (MQClientException e) {throw new RuntimeException(e);
      }
    }
  }
  return Lists.newArrayList(assignedRocketMQSplits);
}

hasMoreElements 办法

每次调用 pollNext 办法之前会做 sourceReader.hasMoreElements() 的判断,当且仅当判断通过,pollNext 办法才会被调用。

示例
public boolean hasMoreElements() {if (noMoreSplits) {return CollectionUtils.size(assignedHadoopSplits) != 0;
  }
  return true;
}

notifyNoMoreSplits 办法

当 Reader 解决完所有切片之后,会调用此办法。

示例
public void notifyNoMoreSplits() {LOG.info("Subtask {} received no more split signal.", context.getIndexOfSubtask());
  noMoreSplits = true;
}

【对于 BitSail】:

⭐️ Star 不迷路 https://github.com/bytedance/bitsail

提交问题和倡议:https://github.com/bytedance/bitsail/issues

奉献代码:https://github.com/bytedance/bitsail/pulls

BitSail 官网:https://bytedance.github.io/bitsail/zh/

订阅邮件列表:mailto:bitsail+subscribe@googlegroups.com

退出 BitSail 技术社群

退出移动版