关于大数据:Connector开发详解系列四SinkWriter

7次阅读

共计 10811 个字符,预计需要花费 28 分钟才能阅读完成。

更多技术交换、求职机会,欢送关注字节跳动数据平台微信公众号,回复【1】进入官网交换群

Sink Connector

BitSail Sink Connector 交互流程介绍

  • Sink:数据写入组件的生命周期治理,次要负责和框架的交互,构架作业,它不参加作业真正的执行。
  • Writer:负责将接管到的数据写到内部存储。
  • WriterCommitter(可选):对数据进行提交操作,来实现两阶段提交的操作;实现 exactly-once 的语义。

开发者首先须要创立 Sink 类,实现 Sink 接口,次要负责数据写入组件的生命周期治理,构架作业。通过 configure 办法定义 writerConfiguration 的配置,通过 createTypeInfoConverter 办法来进行数据类型转换,将外部类型进行转换写到内部零碎,同 Source 局部。之后咱们再定义 Writer 类实现具体的数据写入逻辑,在 write 办法调用时将 BitSail Row 类型把数据写到缓存队列中,在 flush 办法调用时将缓存队列中的数据刷写到指标数据源中。

Sink

数据写入组件的生命周期治理,次要负责和框架的交互,构架作业,它不参加作业真正的执行。

对于每一个 Sink 工作,咱们要实现一个继承 Sink 接口的类。

Sink 接口

public interface Sink<InputT, CommitT extends Serializable, WriterStateT extends Serializable> extends Serializable {

  /**
*  @return  The name of writer operation.
*/
String getWriterName();

  /**
* Configure writer with user defined options.
*
*  @param  commonConfiguration Common options.
*  @param  writerConfiguration Options for writer.
*/
void configure(BitSailConfiguration commonConfiguration, BitSailConfiguration writerConfiguration) throws Exception;

  /**
* Create a writer for processing elements.
*
*  @return  An initialized writer.
*/
Writer<InputT, CommitT, WriterStateT> createWriter(Writer.Context<WriterStateT> context) throws IOException;

  /**
*  @return  A converter which supports conversion from BitSail {@link  TypeInfo}
* and external engine type.
*/
default TypeInfoConverter createTypeInfoConverter() {return new BitSailTypeInfoConverter();
  }

  /**
*  @return  A committer for commit committable objects.
*/
default Optional<WriterCommitter<CommitT>> createCommitter() {return Optional.empty();
  }

  /**
*  @return  A serializer which convert committable object to byte array.
*/
default BinarySerializer<CommitT> getCommittableSerializer() {return new SimpleBinarySerializer<CommitT>();
  }

  /**
*  @return  A serializer which convert state object to byte array.
*/
default BinarySerializer<WriterStateT> getWriteStateSerializer() {return new SimpleBinarySerializer<WriterStateT>();
  }
}

configure 办法

负责 configuration 的初始化,通过 commonConfiguration 中的配置辨别流式工作或者批式工作,向 Writer 类传递 writerConfiguration。

示例

ElasticsearchSink:

public void configure(BitSailConfiguration commonConfiguration, BitSailConfiguration writerConfiguration) {writerConf = writerConfiguration;}

createWriter 办法

负责生成一个继承自 Writer 接口的 connector Writer 类。

createTypeInfoConverter 办法

类型转换,将外部类型进行转换写到内部零碎,同 Source 局部。

createCommitter 办法

可选办法,书写具体数据提交逻辑,个别用于想要保证数据 exactly-once 语义的场景,writer 在实现数据写入后,committer 来实现提交,进而实现二阶段提交,具体能够参考 Doris Connector 的实现。

Writer

具体的数据写入逻辑

Writer 接口

public interface Writer<InputT, CommT, WriterStateT> extends Serializable, Closeable {

  /**
* Output an element to target source.
*
*  @param  element Input data from upstream.
*/
void write(InputT element) throws IOException;

  /**
* Flush buffered input data to target source.
*
*  @param  endOfInput Flag indicates if all input data are delivered.
*/
void flush(boolean endOfInput) throws IOException;

  /**
* Prepare commit information before snapshotting when checkpoint is triggerred.
*
*  @return  Information to commit in this checkpoint.
*  @throws  IOException Exceptions encountered when preparing committable information.
*/
List<CommT> prepareCommit() throws IOException;

  /**
* Do snapshot for at each checkpoint.
*
*  @param  checkpointId The id of checkpoint when snapshot triggered.
*  @return  The current state of writer.
*  @throws  IOException Exceptions encountered when snapshotting.
*/
default List<WriterStateT> snapshotState(long checkpointId) throws IOException {return Collections.emptyList();
  }

  /**
* Closing writer when operator is closed.
*
*  @throws  IOException Exception encountered when closing writer.
*/
default void close() throws IOException {}

  interface Context<WriterStateT> extends Serializable {TypeInfo<?>[] getTypeInfos();

    int getIndexOfSubTaskId();

    boolean isRestored();

    List<WriterStateT> getRestoreStates();}
}

构造方法

依据 writerConfiguration 配置初始化数据源的连贯对象。

示例
public RedisWriter(BitSailConfiguration writerConfiguration) {
  // initialize ttl
  int ttl = writerConfiguration.getUnNecessaryOption(RedisWriterOptions.TTL, -1);
  TtlType ttlType;
  try {ttlType = TtlType.valueOf(StringUtils.upperCase(writerConfiguration.get(RedisWriterOptions.TTL_TYPE)));
  } catch (IllegalArgumentException e) {
    throw BitSailException.asBitSailException(RedisPluginErrorCode.ILLEGAL_VALUE,
        String.format("unknown ttl type: %s", writerConfiguration.get(RedisWriterOptions.TTL_TYPE)));
  }
  int ttlInSeconds = ttl < 0 ? -1 : ttl * ttlType.getContainSeconds();
  log.info("ttl is {}(s)", ttlInSeconds);

  // initialize commandDescription
  String redisDataType = StringUtils.upperCase(writerConfiguration.get(RedisWriterOptions.REDIS_DATA_TYPE));
  String additionalKey = writerConfiguration.getUnNecessaryOption(RedisWriterOptions.ADDITIONAL_KEY, "default_redis_key");
  this.commandDescription = initJedisCommandDescription(redisDataType, ttlInSeconds, additionalKey);
  this.columnSize = writerConfiguration.get(RedisWriterOptions.COLUMNS).size();

  // initialize jedis pool
  JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
  jedisPoolConfig.setMaxTotal(writerConfiguration.get(RedisWriterOptions.JEDIS_POOL_MAX_TOTAL_CONNECTIONS));
  jedisPoolConfig.setMaxIdle(writerConfiguration.get(RedisWriterOptions.JEDIS_POOL_MAX_IDLE_CONNECTIONS));
  jedisPoolConfig.setMinIdle(writerConfiguration.get(RedisWriterOptions.JEDIS_POOL_MIN_IDLE_CONNECTIONS));
  jedisPoolConfig.setMaxWait(Duration.ofMillis(writerConfiguration.get(RedisWriterOptions.JEDIS_POOL_MAX_WAIT_TIME_IN_MILLIS)));

  String redisHost = writerConfiguration.getNecessaryOption(RedisWriterOptions.HOST, RedisPluginErrorCode.REQUIRED_VALUE);
  int redisPort = writerConfiguration.getNecessaryOption(RedisWriterOptions.PORT, RedisPluginErrorCode.REQUIRED_VALUE);
  String redisPassword = writerConfiguration.get(RedisWriterOptions.PASSWORD);
  int timeout = writerConfiguration.get(RedisWriterOptions.CLIENT_TIMEOUT_MS);

  if (StringUtils.isEmpty(redisPassword)) {this.jedisPool = new JedisPool(jedisPoolConfig, redisHost, redisPort, timeout);
  } else {this.jedisPool = new JedisPool(jedisPoolConfig, redisHost, redisPort, timeout, redisPassword);
  }

  // initialize record queue
  int batchSize = writerConfiguration.get(RedisWriterOptions.WRITE_BATCH_INTERVAL);
  this.recordQueue = new CircularFifoQueue<>(batchSize);

  this.logSampleInterval = writerConfiguration.get(RedisWriterOptions.LOG_SAMPLE_INTERVAL);
  this.jedisFetcher = RetryerBuilder.<Jedis>newBuilder()
      .retryIfResult(Objects::isNull)
      .retryIfRuntimeException()
      .withStopStrategy(StopStrategies.stopAfterAttempt(3))
      .withWaitStrategy(WaitStrategies.exponentialWait(100, 5, TimeUnit.MINUTES))
      .build()
      .wrap(jedisPool::getResource);

  this.maxAttemptCount = writerConfiguration.get(RedisWriterOptions.MAX_ATTEMPT_COUNT);
  this.retryer = RetryerBuilder.<Boolean>newBuilder()
      .retryIfResult(needRetry -> Objects.equals(needRetry, true))
      .retryIfException(e -> !(e instanceof BitSailException))
      .withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS))
      .withStopStrategy(StopStrategies.stopAfterAttempt(maxAttemptCount))
      .build();}

write 办法

该办法调用时会将 BitSail Row 类型把数据写到缓存队列中,也能够在这里对 Row 类型数据进行各种格局预处理。间接存储到缓存队列中,或者进行加工解决。如果这里设定了缓存队列的大小,那么在缓存队列写满后要调用 flush 进行刷写。

示例

redis:将 BitSail Row 格局的数据间接存储到肯定大小的缓存队列中

public void write(Row record) throws IOException {validate(record);
  this.recordQueue.add(record);
  if (recordQueue.isAtFullCapacity()) {flush(false);
  }
}

Druid:将 BitSail Row 格局的数据做格局预处理,转化到 StringBuffer 中储存起来。

@Override
public void write(final Row element) {final StringJoiner joiner = new StringJoiner(DEFAULT_FIELD_DELIMITER, "","");
  for (int i = 0; i < element.getArity(); i++) {final Object v = element.getField(i);
    if (v != null) {joiner.add(v.toString());
    }
  }
  // timestamp column is a required field to add in Druid.
  // See https://druid.apache.org/docs/24.0.0/ingestion/data-model.html#primary-timestamp
  joiner.add(String.valueOf(processTime));
  data.append(joiner);
  data.append(DEFAULT_LINE_DELIMITER);
}

flush 办法

该办法中次要实现将 write 办法的缓存中的数据刷写到指标数据源中。

示例

redis:将缓存队列中的 BitSail Row 格局的数据刷写到指标数据源中。

public void flush(boolean endOfInput) throws IOException {
  processorId++;
  try (PipelineProcessor processor = genPipelineProcessor(recordQueue.size(), this.complexTypeWithTtl)) {
    Row record;
    while ((record = recordQueue.poll()) != null) {String key = (String) record.getField(0);
      String value = (String) record.getField(1);
      String scoreOrHashKey = value;
      if (columnSize == SORTED_SET_OR_HASH_COLUMN_SIZE) {value = (String) record.getField(2);
        // Replace empty key with additionalKey in sorted set and hash.
        if (key.length() == 0) {key = commandDescription.getAdditionalKey();
        }
      }

      if (commandDescription.getJedisCommand() == JedisCommand.ZADD) {
        // sorted set
        processor.addInitialCommand(new Command(commandDescription, key.getBytes(), parseScoreFromString(scoreOrHashKey), value.getBytes()));
      } else if (commandDescription.getJedisCommand() == JedisCommand.HSET) {
        // hash
        processor.addInitialCommand(new Command(commandDescription, key.getBytes(), scoreOrHashKey.getBytes(), value.getBytes()));
      } else if (commandDescription.getJedisCommand() == JedisCommand.HMSET) {
        //mhset
        if ((record.getArity() - 1) % 2 != 0) {throw new BitSailException(CONVERT_NOT_SUPPORT, "Inconsistent data entry.");
        }
        List<byte[]> datas = Arrays.stream(record.getFields())
            .collect(Collectors.toList()).stream().map(o -> ((String) o).getBytes())
            .collect(Collectors.toList()).subList(1, record.getFields().length);
        Map<byte[], byte[]> map = new HashMap<>((record.getArity() - 1) / 2);
        for (int index = 0; index < datas.size(); index = index + 2) {map.put(datas.get(index), datas.get(index + 1));
        }
        processor.addInitialCommand(new Command(commandDescription, key.getBytes(), map));
      } else {
        // set and string
        processor.addInitialCommand(new Command(commandDescription, key.getBytes(), value.getBytes()));
      }
    }
    retryer.call(processor::run);
  } catch (ExecutionException | RetryException e) {if (e.getCause() instanceof BitSailException) {throw (BitSailException) e.getCause();} else if (e.getCause() instanceof RedisUnexpectedException) {throw (RedisUnexpectedException) e.getCause();}
    throw e;
  } catch (IOException e) {throw new RuntimeException("Error while init jedis client.", e);
  }
}

Druid:应用 HTTP post 形式提交 sink 作业给数据源。

private HttpURLConnection provideHttpURLConnection(final String coordinatorURL) throws IOException {final URL url = new URL("http://" + coordinatorURL + DRUID_ENDPOINT);
    final HttpURLConnection con = (HttpURLConnection) url.openConnection();
    con.setRequestMethod("POST");
    con.setRequestProperty("Content-Type", "application/json");
    con.setRequestProperty("Accept", "application/json, text/plain, */*");
    con.setDoOutput(true);
    return con;
  }
  
  public void flush(final boolean endOfInput) throws IOException {final ParallelIndexIOConfig ioConfig = provideDruidIOConfig(data);
    final ParallelIndexSupervisorTask indexTask = provideIndexTask(ioConfig);
    final String inputJSON = provideInputJSONString(indexTask);
    final byte[] input = inputJSON.getBytes();
    try (final OutputStream os = httpURLConnection.getOutputStream()) {os.write(input, 0, input.length);
    }
    try (final BufferedReader br =
                 new BufferedReader(new InputStreamReader(httpURLConnection.getInputStream(), StandardCharsets.UTF_8))) {final StringBuilder response = new StringBuilder();
      String responseLine;
      while ((responseLine = br.readLine()) != null) {response.append(responseLine.trim());
      }
      LOG.info("Druid write task has been sent, and the response is {}", response);
    }
  }

close 办法

敞开之前创立的各种指标数据源连贯对象。

示例
public void close() throws IOException {bulkProcessor.close();
  restClient.close();
  checkErrorAndRethrow();}

【对于 BitSail】:

⭐️ Star 不迷路 https://github.com/bytedance/bitsail

提交问题和倡议:https://github.com/bytedance/bitsail/issues

奉献代码:https://github.com/bytedance/bitsail/pulls

BitSail 官网:https://bytedance.github.io/bitsail/zh/

订阅邮件列表:bitsail+subscribe@googlegroups.com

退出 BitSail 技术社群

正文完
 0