
Sink Connector

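Overview of the BitSail Sink Connector interaction flow

  • Sink: manages the lifecycle of the data-writing component. It is mainly responsible for interacting with the framework and constructing the job; it does not take part in the actual execution of the job.
  • Writer: writes the received data to external storage.
  • WriterCommitter (optional): commits the written data to complete a two-phase commit, which provides exactly-once semantics.

Developers first create a Sink class that implements the Sink interface. This class manages the lifecycle of the data-writing component and constructs the job. The configure method defines the writerConfiguration settings, and the createTypeInfoConverter method handles type conversion: internal types are converted before being written to the external system, the same as in the Source part. Next, we define a Writer class that implements the actual writing logic. When write is called, the BitSail Row data is added to a buffer queue; when flush is called, the data in the buffer queue is flushed to the target data source.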
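The write/flush split described above can be sketched in a few lines. This is a minimal illustration under stated assumptions, not BitSail code: `SimpleWriter`, `BufferedListWriter`, and the in-memory `target` list are hypothetical stand-ins for the real Writer interface and an external data source.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified stand-in for BitSail's Writer interface.
interface SimpleWriter<T> {
  void write(T element);

  void flush(boolean endOfInput);
}

// Buffers records in a queue and drains them to the target once the buffer is full.
class BufferedListWriter implements SimpleWriter<String> {
  private final int batchSize;
  private final Queue<String> buffer = new ArrayDeque<>();

  // Stands in for the external data source.
  final List<String> target = new ArrayList<>();
  // Counts how many non-empty flushes happened.
  int flushCount = 0;

  BufferedListWriter(int batchSize) {
    this.batchSize = batchSize;
  }

  @Override
  public void write(String element) {
    buffer.add(element);
    if (buffer.size() >= batchSize) {
      flush(false);
    }
  }

  @Override
  public void flush(boolean endOfInput) {
    if (buffer.isEmpty()) {
      return;
    }
    String record;
    while ((record = buffer.poll()) != null) {
      target.add(record);
    }
    flushCount++;
  }
}
```

With a batch size of 2, the first two calls to write trigger one flush, and a final flush(true) drains whatever remains in the buffer.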

Sink

The Sink manages the lifecycle of the data-writing component. It is mainly responsible for interacting with the framework and constructing the job; it does not take part in the actual execution of the job.

For each Sink task, we implement a class that implements the Sink interface.

Sink interface

public interface Sink<InputT, CommitT extends Serializable, WriterStateT extends Serializable> extends Serializable {

  /**
   * @return The name of writer operation.
   */
  String getWriterName();

  /**
   * Configure writer with user defined options.
   *
   * @param commonConfiguration Common options.
   * @param writerConfiguration Options for writer.
   */
  void configure(BitSailConfiguration commonConfiguration, BitSailConfiguration writerConfiguration) throws Exception;

  /**
   * Create a writer for processing elements.
   *
   * @return An initialized writer.
   */
  Writer<InputT, CommitT, WriterStateT> createWriter(Writer.Context<WriterStateT> context) throws IOException;

  /**
   * @return A converter which supports conversion from BitSail {@link TypeInfo}
   * and external engine type.
   */
  default TypeInfoConverter createTypeInfoConverter() {
    return new BitSailTypeInfoConverter();
  }

  /**
   * @return A committer for commit committable objects.
   */
  default Optional<WriterCommitter<CommitT>> createCommitter() {
    return Optional.empty();
  }

  /**
   * @return A serializer which convert committable object to byte array.
   */
  default BinarySerializer<CommitT> getCommittableSerializer() {
    return new SimpleBinarySerializer<CommitT>();
  }

  /**
   * @return A serializer which convert state object to byte array.
   */
  default BinarySerializer<WriterStateT> getWriteStateSerializer() {
    return new SimpleBinarySerializer<WriterStateT>();
  }
}

configure method

Initializes the configuration. It uses the settings in commonConfiguration to distinguish streaming jobs from batch jobs, and passes writerConfiguration on to the Writer class.

Example

ElasticsearchSink:

public void configure(BitSailConfiguration commonConfiguration, BitSailConfiguration writerConfiguration) {
  writerConf = writerConfiguration;
}
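As a rough illustration of the two responsibilities of configure (detecting the job mode and keeping the writer options for later), here is a minimal sketch. It is not BitSail code: plain maps stand in for BitSailConfiguration, and `ExampleSink` and the `job.type` key are assumptions made up for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sink that stores what it needs from both configurations.
class ExampleSink {
  // Assumed key for illustration only; not a real BitSail option name.
  static final String JOB_TYPE_KEY = "job.type";

  private boolean streaming;
  private Map<String, String> writerConf = new HashMap<>();

  void configure(Map<String, String> commonConf, Map<String, String> writerConf) {
    // Decide between streaming and batch from the common options (batch by default).
    this.streaming = "streaming".equalsIgnoreCase(commonConf.getOrDefault(JOB_TYPE_KEY, "batch"));
    // Keep a copy of the writer options so the writer can be built from them later.
    this.writerConf = new HashMap<>(writerConf);
  }

  boolean isStreaming() {
    return streaming;
  }

  Map<String, String> getWriterConf() {
    return writerConf;
  }
}
```

Copying the writer options keeps the sink independent of later changes to the caller's map; whether a real connector needs that copy depends on how the configuration object is shared.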

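createWriter method

Creates the connector's Writer, a class that implements the Writer interface.

createTypeInfoConverter method

Type conversion: converts internal types to those of the external system before writing, the same as in the Source part.

createCommitter method

An optional method that contains the actual commit logic. It is generally used when exactly-once semantics must be guaranteed: after the writer finishes writing the data, the committer performs the commit, and together they implement a two-phase commit. See the Doris Connector implementation for details.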
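The division of labor between writer and committer can be sketched as follows. This is a simplified illustration of the two-phase idea, not the Doris Connector's implementation: `StagingWriter`, `SimpleCommitter`, and `VisibleStoreCommitter` are hypothetical names, and in-memory lists stand in for staged and committed data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical committer interface: phase two makes staged data visible.
interface SimpleCommitter<C> {
  void commit(List<C> committables);
}

// Hypothetical writer that only stages records and hands out committables.
class StagingWriter {
  private final List<String> staged = new ArrayList<>();

  void write(String record) {
    staged.add(record);
  }

  // Phase one: package everything staged since the previous call.
  List<List<String>> prepareCommit() {
    List<List<String>> committables = new ArrayList<>();
    if (!staged.isEmpty()) {
      committables.add(new ArrayList<>(staged));
      staged.clear();
    }
    return committables;
  }
}

// Phase two: publish each staged batch to the visible store.
class VisibleStoreCommitter implements SimpleCommitter<List<String>> {
  final List<String> visible = new ArrayList<>();

  @Override
  public void commit(List<List<String>> committables) {
    for (List<String> batch : committables) {
      visible.addAll(batch);
    }
  }
}
```

Nothing becomes visible until commit is called. A real connector would additionally need the commit step to be safe to repeat after a failure, which this sketch does not attempt.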

Writer

The Writer contains the actual data-writing logic.

Writer interface

public interface Writer<InputT, CommT, WriterStateT> extends Serializable, Closeable {

  /**
   * Output an element to target source.
   *
   * @param element Input data from upstream.
   */
  void write(InputT element) throws IOException;

  /**
   * Flush buffered input data to target source.
   *
   * @param endOfInput Flag indicates if all input data are delivered.
   */
  void flush(boolean endOfInput) throws IOException;

  /**
   * Prepare commit information before snapshotting when checkpoint is triggered.
   *
   * @return Information to commit in this checkpoint.
   * @throws IOException Exceptions encountered when preparing committable information.
   */
  List<CommT> prepareCommit() throws IOException;

  /**
   * Do snapshot at each checkpoint.
   *
   * @param checkpointId The id of checkpoint when snapshot triggered.
   * @return The current state of writer.
   * @throws IOException Exceptions encountered when snapshotting.
   */
  default List<WriterStateT> snapshotState(long checkpointId) throws IOException {
    return Collections.emptyList();
  }

  /**
   * Closing writer when operator is closed.
   *
   * @throws IOException Exception encountered when closing writer.
   */
  default void close() throws IOException {
  }

  interface Context<WriterStateT> extends Serializable {

    TypeInfo<?>[] getTypeInfos();

    int getIndexOfSubTaskId();

    boolean isRestored();

    List<WriterStateT> getRestoreStates();
  }
}

Constructor

Initializes the connection objects for the data source according to the writerConfiguration settings.

Example

public RedisWriter(BitSailConfiguration writerConfiguration) {
  // initialize ttl
  int ttl = writerConfiguration.getUnNecessaryOption(RedisWriterOptions.TTL, -1);
  TtlType ttlType;
  try {
    ttlType = TtlType.valueOf(StringUtils.upperCase(writerConfiguration.get(RedisWriterOptions.TTL_TYPE)));
  } catch (IllegalArgumentException e) {
    throw BitSailException.asBitSailException(RedisPluginErrorCode.ILLEGAL_VALUE,
        String.format("unknown ttl type: %s", writerConfiguration.get(RedisWriterOptions.TTL_TYPE)));
  }
  int ttlInSeconds = ttl < 0 ? -1 : ttl * ttlType.getContainSeconds();
  log.info("ttl is {}(s)", ttlInSeconds);

  // initialize commandDescription
  String redisDataType = StringUtils.upperCase(writerConfiguration.get(RedisWriterOptions.REDIS_DATA_TYPE));
  String additionalKey = writerConfiguration.getUnNecessaryOption(RedisWriterOptions.ADDITIONAL_KEY, "default_redis_key");
  this.commandDescription = initJedisCommandDescription(redisDataType, ttlInSeconds, additionalKey);
  this.columnSize = writerConfiguration.get(RedisWriterOptions.COLUMNS).size();

  // initialize jedis pool
  JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
  jedisPoolConfig.setMaxTotal(writerConfiguration.get(RedisWriterOptions.JEDIS_POOL_MAX_TOTAL_CONNECTIONS));
  jedisPoolConfig.setMaxIdle(writerConfiguration.get(RedisWriterOptions.JEDIS_POOL_MAX_IDLE_CONNECTIONS));
  jedisPoolConfig.setMinIdle(writerConfiguration.get(RedisWriterOptions.JEDIS_POOL_MIN_IDLE_CONNECTIONS));
  jedisPoolConfig.setMaxWait(Duration.ofMillis(writerConfiguration.get(RedisWriterOptions.JEDIS_POOL_MAX_WAIT_TIME_IN_MILLIS)));

  String redisHost = writerConfiguration.getNecessaryOption(RedisWriterOptions.HOST, RedisPluginErrorCode.REQUIRED_VALUE);
  int redisPort = writerConfiguration.getNecessaryOption(RedisWriterOptions.PORT, RedisPluginErrorCode.REQUIRED_VALUE);
  String redisPassword = writerConfiguration.get(RedisWriterOptions.PASSWORD);
  int timeout = writerConfiguration.get(RedisWriterOptions.CLIENT_TIMEOUT_MS);

  if (StringUtils.isEmpty(redisPassword)) {
    this.jedisPool = new JedisPool(jedisPoolConfig, redisHost, redisPort, timeout);
  } else {
    this.jedisPool = new JedisPool(jedisPoolConfig, redisHost, redisPort, timeout, redisPassword);
  }

  // initialize record queue
  int batchSize = writerConfiguration.get(RedisWriterOptions.WRITE_BATCH_INTERVAL);
  this.recordQueue = new CircularFifoQueue<>(batchSize);

  this.logSampleInterval = writerConfiguration.get(RedisWriterOptions.LOG_SAMPLE_INTERVAL);
  this.jedisFetcher = RetryerBuilder.<Jedis>newBuilder()
      .retryIfResult(Objects::isNull)
      .retryIfRuntimeException()
      .withStopStrategy(StopStrategies.stopAfterAttempt(3))
      .withWaitStrategy(WaitStrategies.exponentialWait(100, 5, TimeUnit.MINUTES))
      .build()
      .wrap(jedisPool::getResource);

  this.maxAttemptCount = writerConfiguration.get(RedisWriterOptions.MAX_ATTEMPT_COUNT);
  this.retryer = RetryerBuilder.<Boolean>newBuilder()
      .retryIfResult(needRetry -> Objects.equals(needRetry, true))
      .retryIfException(e -> !(e instanceof BitSailException))
      .withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS))
      .withStopStrategy(StopStrategies.stopAfterAttempt(maxAttemptCount))
      .build();
}
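The constructor above mixes required options (host, port) with optional ones that fall back to defaults (ttl). That pattern can be sketched without any BitSail classes. In this sketch `WriterOptions`, `ExampleConnectionSettings`, and the option keys are all hypothetical, and a plain map stands in for BitSailConfiguration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical option holder with "required" and "optional with default" lookups.
class WriterOptions {
  private final Map<String, String> values;

  WriterOptions(Map<String, String> values) {
    this.values = new HashMap<>(values);
  }

  // Fails fast when a required option is absent or empty.
  String getRequired(String key) {
    String value = values.get(key);
    if (value == null || value.isEmpty()) {
      throw new IllegalArgumentException("missing required option: " + key);
    }
    return value;
  }

  // Falls back to the default when the option is absent.
  int getIntOrDefault(String key, int defaultValue) {
    String value = values.get(key);
    return value == null ? defaultValue : Integer.parseInt(value);
  }
}

// Hypothetical settings object built once in a writer's constructor.
class ExampleConnectionSettings {
  final String host;
  final int port;
  final int ttlSeconds;

  ExampleConnectionSettings(WriterOptions options) {
    this.host = options.getRequired("host");
    this.port = Integer.parseInt(options.getRequired("port"));
    // A negative ttl means "no expiry", mirroring the -1 default in the example above.
    this.ttlSeconds = options.getIntOrDefault("ttl", -1);
  }
}
```

Validating required options in the constructor means a misconfigured job fails before any data is written, rather than on the first flush.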

write method

When this method is called, the BitSail Row data is written to a buffer queue. Row data can also be preprocessed into another format here: rows may be stored in the queue as they are, or processed first. If the buffer queue has a fixed size, flush must be called once the queue is full.

Example

Redis: stores BitSail Row data directly in a fixed-size buffer queue.

public void write(Row record) throws IOException {
  validate(record);
  this.recordQueue.add(record);
  if (recordQueue.isAtFullCapacity()) {
    flush(false);
  }
}

Druid: preprocesses BitSail Row data and stores the result in a StringBuffer.

@Override
public void write(final Row element) {
  final StringJoiner joiner = new StringJoiner(DEFAULT_FIELD_DELIMITER, "", "");
  for (int i = 0; i < element.getArity(); i++) {
    final Object v = element.getField(i);
    if (v != null) {
      joiner.add(v.toString());
    }
  }
  // timestamp column is a required field to add in Druid.
  // See https://druid.apache.org/docs/24.0.0/ingestion/data-model.html#primary-timestamp
  joiner.add(String.valueOf(processTime));
  data.append(joiner);
  data.append(DEFAULT_LINE_DELIMITER);
}
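The Druid example skips null fields when it builds a line. As a point of comparison, the sketch below shows a variation that writes an empty string for each null field so that column positions stay aligned. This is a different choice from the example above, made here on the assumption that the target expects a fixed number of columns; `RowFormatter` and its parameters are hypothetical, and an Object array stands in for a BitSail Row.

```java
import java.util.StringJoiner;

// Hypothetical helper that turns one row into a delimited line.
class RowFormatter {
  static String toDelimitedLine(Object[] fields, String delimiter, long timestampMillis) {
    StringJoiner joiner = new StringJoiner(delimiter);
    for (Object field : fields) {
      // Emit an empty string for null so later columns keep their position.
      joiner.add(field == null ? "" : field.toString());
    }
    // Append the processing timestamp as the last column.
    joiner.add(String.valueOf(timestampMillis));
    return joiner.toString();
  }
}
```

For example, `RowFormatter.toDelimitedLine(new Object[] {"a", null, 3}, ",", 100L)` yields `a,,3,100`.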

flush method

This method flushes the data buffered by the write method to the target data source.

Example

Redis: flushes the BitSail Row data in the buffer queue to the target data source.

public void flush(boolean endOfInput) throws IOException {
  processorId++;
  try (PipelineProcessor processor = genPipelineProcessor(recordQueue.size(), this.complexTypeWithTtl)) {
    Row record;
    while ((record = recordQueue.poll()) != null) {
      String key = (String) record.getField(0);
      String value = (String) record.getField(1);
      String scoreOrHashKey = value;
      if (columnSize == SORTED_SET_OR_HASH_COLUMN_SIZE) {
        value = (String) record.getField(2);
        // Replace empty key with additionalKey in sorted set and hash.
        if (key.length() == 0) {
          key = commandDescription.getAdditionalKey();
        }
      }

      if (commandDescription.getJedisCommand() == JedisCommand.ZADD) {
        // sorted set
        processor.addInitialCommand(new Command(commandDescription, key.getBytes(), parseScoreFromString(scoreOrHashKey), value.getBytes()));
      } else if (commandDescription.getJedisCommand() == JedisCommand.HSET) {
        // hash
        processor.addInitialCommand(new Command(commandDescription, key.getBytes(), scoreOrHashKey.getBytes(), value.getBytes()));
      } else if (commandDescription.getJedisCommand() == JedisCommand.HMSET) {
        // hmset
        if ((record.getArity() - 1) % 2 != 0) {
          throw new BitSailException(CONVERT_NOT_SUPPORT, "Inconsistent data entry.");
        }
        List<byte[]> datas = Arrays.stream(record.getFields())
            .collect(Collectors.toList()).stream().map(o -> ((String) o).getBytes())
            .collect(Collectors.toList()).subList(1, record.getFields().length);
        Map<byte[], byte[]> map = new HashMap<>((record.getArity() - 1) / 2);
        for (int index = 0; index < datas.size(); index = index + 2) {
          map.put(datas.get(index), datas.get(index + 1));
        }
        processor.addInitialCommand(new Command(commandDescription, key.getBytes(), map));
      } else {
        // set and string
        processor.addInitialCommand(new Command(commandDescription, key.getBytes(), value.getBytes()));
      }
    }
    retryer.call(processor::run);
  } catch (ExecutionException | RetryException e) {
    if (e.getCause() instanceof BitSailException) {
      throw (BitSailException) e.getCause();
    } else if (e.getCause() instanceof RedisUnexpectedException) {
      throw (RedisUnexpectedException) e.getCause();
    }
    throw e;
  } catch (IOException e) {
    throw new RuntimeException("Error while init jedis client.", e);
  }
}

Druid: submits the sink job to the data source with an HTTP POST request.

private HttpURLConnection provideHttpURLConnection(final String coordinatorURL) throws IOException {
  final URL url = new URL("http://" + coordinatorURL + DRUID_ENDPOINT);
  final HttpURLConnection con = (HttpURLConnection) url.openConnection();
  con.setRequestMethod("POST");
  con.setRequestProperty("Content-Type", "application/json");
  con.setRequestProperty("Accept", "application/json, text/plain, */*");
  con.setDoOutput(true);
  return con;
}

public void flush(final boolean endOfInput) throws IOException {
  final ParallelIndexIOConfig ioConfig = provideDruidIOConfig(data);
  final ParallelIndexSupervisorTask indexTask = provideIndexTask(ioConfig);
  final String inputJSON = provideInputJSONString(indexTask);
  final byte[] input = inputJSON.getBytes();
  try (final OutputStream os = httpURLConnection.getOutputStream()) {
    os.write(input, 0, input.length);
  }
  try (final BufferedReader br =
           new BufferedReader(new InputStreamReader(httpURLConnection.getInputStream(), StandardCharsets.UTF_8))) {
    final StringBuilder response = new StringBuilder();
    String responseLine;
    while ((responseLine = br.readLine()) != null) {
      response.append(responseLine.trim());
    }
    LOG.info("Druid write task has been sent, and the response is {}", response);
  }
}

close method

Closes the connection objects for the target data source that were created earlier.

Example

public void close() throws IOException {
  bulkProcessor.close();
  restClient.close();
  checkErrorAndRethrow();
}
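When a writer holds several connection objects, a failure while closing the first one can leave the others open. One way to guard against that, shown here as a sketch under stated assumptions rather than as BitSail's approach, is to attempt every close and rethrow the first failure afterwards. `ResourceCloser` is a hypothetical helper.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.List;

// Hypothetical helper for closing several resources in order.
class ResourceCloser {
  // Attempts to close every resource, then rethrows the first failure (if any).
  static void closeAll(List<? extends Closeable> resources) throws IOException {
    IOException first = null;
    for (Closeable resource : resources) {
      try {
        resource.close();
      } catch (IOException e) {
        if (first == null) {
          first = e;
        } else {
          // Keep later failures attached to the first one instead of losing them.
          first.addSuppressed(e);
        }
      }
    }
    if (first != null) {
      throw first;
    }
  }
}
```

A writer's close method could pass its connection objects to such a helper in the order they should be released.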

About BitSail:

⭐️ Star the project on GitHub: https://github.com/bytedance/bitsail

Submit issues and suggestions: https://github.com/bytedance/bitsail/issues

Contribute code: https://github.com/bytedance/bitsail/pulls

BitSail website: https://bytedance.github.io/bitsail/zh/

Subscribe to the mailing list: bitsail+subscribe@googlegroups.com
