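For more technical discussions and job opportunities, follow the ByteDance Data Platform WeChat official account and reply "1" to join the official discussion group.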
Sink Connector
Overview of the BitSail Sink Connector interaction flow
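- Sink: manages the lifecycle of the data-writing component. It mainly interacts with the framework and builds the job; it does not take part in actually executing the job.
- Writer: writes the received data to external storage.
- WriterCommitter (optional): commits the written data to implement a two-phase commit, which provides exactly-once semantics.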
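Developers first create a Sink class that implements the Sink interface. It is mainly responsible for managing the lifecycle of the data-writing component and for building the job. The configure method sets up the writerConfiguration, and the createTypeInfoConverter method handles type conversion, converting BitSail's internal types into the types written to the external system, just as in the Source part. Next, define a Writer class that implements the concrete write logic: each call to write puts a BitSail Row into a buffer queue, and each call to flush writes the buffered data out to the target data source.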
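To make the division of labor concrete, here is a minimal, self-contained sketch of the order in which a framework drives a sink: configure, createWriter, repeated write calls, a final flush with endOfInput = true, then close. It is not BitSail's API: SimpleSink, SimpleWriter, ListSink, SinkLifecycleDemo and the "batch_size" option are hypothetical stand-ins, configuration is modeled as a plain Map, and rows as String arrays.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for BitSail's Sink and Writer interfaces.
interface SimpleWriter extends Closeable {
    void write(String[] row) throws IOException;       // buffer one row
    void flush(boolean endOfInput) throws IOException;  // push buffered rows out
}

interface SimpleSink {
    void configure(Map<String, String> common, Map<String, String> writerConf);
    SimpleWriter createWriter();
}

// A toy sink whose "external system" is an in-memory list.
class ListSink implements SimpleSink {
    final List<String> target = new ArrayList<>();
    private int batchSize;

    @Override
    public void configure(Map<String, String> common, Map<String, String> writerConf) {
        // "batch_size" is a hypothetical option name used only in this sketch.
        batchSize = Integer.parseInt(writerConf.getOrDefault("batch_size", "2"));
    }

    @Override
    public SimpleWriter createWriter() {
        return new SimpleWriter() {
            private final List<String> buffer = new ArrayList<>();

            @Override
            public void write(String[] row) throws IOException {
                buffer.add(String.join(",", row));
                if (buffer.size() >= batchSize) {
                    flush(false); // buffer is full: flush early
                }
            }

            @Override
            public void flush(boolean endOfInput) {
                target.addAll(buffer);
                buffer.clear();
            }

            @Override
            public void close() {
                buffer.clear();
            }
        };
    }
}

class SinkLifecycleDemo {
    // Drives a sink the way a framework would: configure, create, write, flush, close.
    static void runJob(SimpleSink sink, List<String[]> rows) throws IOException {
        sink.configure(Map.of(), Map.of("batch_size", "2"));
        try (SimpleWriter writer = sink.createWriter()) {
            for (String[] row : rows) {
                writer.write(row);
            }
            writer.flush(true); // final flush once all input has been delivered
        }
    }
}
```

Keeping the job-building logic (configure, createWriter) separate from the per-row logic (write, flush) mirrors the Sink/Writer split: the sink never touches data, and the writer never parses job-level options.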
Sink
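Manages the lifecycle of the data-writing component. It mainly interacts with the framework and builds the job; it does not take part in actually executing the job.
For each Sink task, we implement a class that implements the Sink interface.
Sink interface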
public interface Sink<InputT, CommitT extends Serializable, WriterStateT extends Serializable> extends Serializable {

  /**
   * @return The name of writer operation.
   */
  String getWriterName();

  /**
   * Configure writer with user defined options.
   *
   * @param commonConfiguration Common options.
   * @param writerConfiguration Options for writer.
   */
  void configure(BitSailConfiguration commonConfiguration, BitSailConfiguration writerConfiguration) throws Exception;

  /**
   * Create a writer for processing elements.
   *
   * @return An initialized writer.
   */
  Writer<InputT, CommitT, WriterStateT> createWriter(Writer.Context<WriterStateT> context) throws IOException;

  /**
   * @return A converter which supports conversion from BitSail {@link TypeInfo}
   * and external engine type.
   */
  default TypeInfoConverter createTypeInfoConverter() {
    return new BitSailTypeInfoConverter();
  }

  /**
   * @return A committer for committing committable objects.
   */
  default Optional<WriterCommitter<CommitT>> createCommitter() {
    return Optional.empty();
  }

  /**
   * @return A serializer which converts committable objects to byte arrays.
   */
  default BinarySerializer<CommitT> getCommittableSerializer() {
    return new SimpleBinarySerializer<CommitT>();
  }

  /**
   * @return A serializer which converts state objects to byte arrays.
   */
  default BinarySerializer<WriterStateT> getWriteStateSerializer() {
    return new SimpleBinarySerializer<WriterStateT>();
  }
}
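Because the Sink interface requires CommitT and WriterStateT to extend Serializable, a default serializer for committables and writer state can fall back on plain Java serialization. The sketch below is an assumption about how such a serializer might work, not the code of BitSail's SimpleBinarySerializer; ByteSerializer and JavaByteSerializer are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical mirror of a binary serializer contract; BitSail's real
// BinarySerializer may differ in method names and details.
interface ByteSerializer<T> {
    byte[] serialize(T obj) throws IOException;
    T deserialize(byte[] bytes) throws IOException;
}

// Uses standard Java serialization, which is possible because the type
// parameter is bounded by Serializable, like CommitT and WriterStateT.
class JavaByteSerializer<T extends Serializable> implements ByteSerializer<T> {
    @Override
    public byte[] serialize(T obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    @Override
    @SuppressWarnings("unchecked")
    public T deserialize(byte[] bytes) throws IOException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) in.readObject();
        } catch (ClassNotFoundException e) {
            throw new IOException("Cannot resolve serialized class", e);
        }
    }
}
```

A connector whose committables are large or must stay compatible across versions would likely override getCommittableSerializer with a hand-written format instead.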
configure method
Initializes the configuration. It uses the settings in commonConfiguration to distinguish streaming jobs from batch jobs, and passes writerConfiguration on to the Writer class.
Example
ElasticsearchSink:
public void configure(BitSailConfiguration commonConfiguration, BitSailConfiguration writerConfiguration) {
  writerConf = writerConfiguration;
}
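The Elasticsearch example only stores writerConfiguration. A sink that also needs to know whether it runs in streaming or batch mode could read that from commonConfiguration in the same method. In this hypothetical sketch a plain Map replaces BitSailConfiguration, and the key "job.type" is an assumption, not a real BitSail option name.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch: BitSail reads options through its own option objects;
// here a Map and the made-up key "job.type" stand in for them.
class ModeAwareSink {
    enum Mode { BATCH, STREAMING }

    private Mode mode;
    private Map<String, String> writerConf;

    public void configure(Map<String, String> commonConf, Map<String, String> writerConf) {
        // Default to batch mode when the (hypothetical) key is absent.
        String type = commonConf.getOrDefault("job.type", "BATCH");
        this.mode = Mode.valueOf(type.toUpperCase(Locale.ROOT));
        // Keep the writer options so createWriter() can hand them to the Writer.
        this.writerConf = writerConf;
    }

    Mode mode() { return mode; }

    Map<String, String> writerConf() { return writerConf; }
}
```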
createWriter method
Creates the connector's Writer class, which implements the Writer interface.
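createTypeInfoConverter method
Type conversion: converts BitSail's internal types into the types written to the external system, the same as in the Source part.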
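Conceptually, a type converter is a lookup from engine-side type names to external type names that fails loudly on unsupported types. BitSail's TypeInfoConverter works on TypeInfo objects; the sketch below uses plain strings, and the type names and mapping are illustrative assumptions rather than any connector's real table.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch of a type converter: translate an internal type name
// into the external system's type name, rejecting anything unmapped.
class SimpleTypeConverter {
    private static final Map<String, String> INTERNAL_TO_EXTERNAL = Map.of(
            "string", "VARCHAR",
            "int", "INTEGER",
            "long", "BIGINT",
            "double", "DOUBLE",
            "boolean", "BOOLEAN");

    String toExternal(String internalType) {
        String external = INTERNAL_TO_EXTERNAL.get(internalType.toLowerCase(Locale.ROOT));
        if (external == null) {
            throw new IllegalArgumentException("Unsupported type: " + internalType);
        }
        return external;
    }
}
```

Failing at conversion time surfaces schema mismatches when the job is built, rather than as write errors halfway through the data.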
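createCommitter method
Optional. Contains the concrete commit logic and is typically used when exactly-once semantics are required: after the writer has written the data, the committer commits it, which completes a two-phase commit. See the Doris Connector implementation for reference.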
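The two phases can be illustrated without any external system. In this hypothetical sketch (StagingWriter, StagedBatch and VisibleStoreCommitter are made-up names, not BitSail's WriterCommitter API), the writer stages rows and, at checkpoint time, prepareCommit seals them into a serializable committable; only the committer makes them visible.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// A sealed group of staged rows; Serializable so it could be checkpointed.
class StagedBatch implements Serializable {
    final List<String> rows;

    StagedBatch(List<String> rows) { this.rows = rows; }
}

class StagingWriter {
    private List<String> pending = new ArrayList<>();

    void write(String row) { pending.add(row); }

    // Phase 1: seal the rows written since the last checkpoint.
    List<StagedBatch> prepareCommit() {
        if (pending.isEmpty()) {
            return List.of();
        }
        StagedBatch batch = new StagedBatch(pending);
        pending = new ArrayList<>();
        return List.of(batch);
    }
}

class VisibleStoreCommitter {
    final List<String> visible = new ArrayList<>();

    // Phase 2: publish the sealed batches (in a real system, e.g. commit a
    // transaction or rename a temporary file).
    void commit(List<StagedBatch> committables) {
        for (StagedBatch batch : committables) {
            visible.addAll(batch.rows);
        }
    }
}
```

If the job fails between the two phases, the committables saved with the checkpoint can be committed again on recovery, which is why real committers need commit to be idempotent.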
Writer
The concrete data-writing logic.
Writer interface
public interface Writer<InputT, CommT, WriterStateT> extends Serializable, Closeable {

  /**
   * Output an element to target source.
   *
   * @param element Input data from upstream.
   */
  void write(InputT element) throws IOException;

  /**
   * Flush buffered input data to target source.
   *
   * @param endOfInput Flag indicates if all input data are delivered.
   */
  void flush(boolean endOfInput) throws IOException;

  /**
   * Prepare commit information before snapshotting when checkpoint is triggered.
   *
   * @return Information to commit in this checkpoint.
   * @throws IOException Exceptions encountered when preparing committable information.
   */
  List<CommT> prepareCommit() throws IOException;

  /**
   * Take a snapshot at each checkpoint.
   *
   * @param checkpointId The id of checkpoint when snapshot triggered.
   * @return The current state of writer.
   * @throws IOException Exceptions encountered when snapshotting.
   */
  default List<WriterStateT> snapshotState(long checkpointId) throws IOException {
    return Collections.emptyList();
  }

  /**
   * Close the writer when the operator is closed.
   *
   * @throws IOException Exception encountered when closing writer.
   */
  default void close() throws IOException {}

  interface Context<WriterStateT> extends Serializable {

    TypeInfo<?>[] getTypeInfos();

    int getIndexOfSubTaskId();

    boolean isRestored();

    List<WriterStateT> getRestoreStates();
  }
}
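snapshotState and Writer.Context work as a pair: the writer reports its progress at each checkpoint, and after a failover the framework hands the saved states back through isRestored and getRestoreStates. The hypothetical CountingWriter below models that round trip with a simple row counter as the state; it takes the two context values as constructor arguments instead of a real Context object.

```java
import java.util.List;

// Hypothetical sketch of checkpointed writer state: the state is a row counter.
class CountingWriter {
    private long rowsWritten;

    // isRestored/restoreStates play the role of Writer.Context#isRestored()
    // and Writer.Context#getRestoreStates().
    CountingWriter(boolean isRestored, List<Long> restoreStates) {
        // Sum the states in case several were handed to this subtask.
        this.rowsWritten = isRestored
                ? restoreStates.stream().mapToLong(Long::longValue).sum()
                : 0L;
    }

    void write(String row) {
        rowsWritten++;
    }

    // Called at each checkpoint; the returned list is what gets persisted.
    List<Long> snapshotState(long checkpointId) {
        return List.of(rowsWritten);
    }

    long rowsWritten() {
        return rowsWritten;
    }
}
```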
Constructor
Initializes the connection objects for the target data source from the writerConfiguration settings.
Example
public RedisWriter(BitSailConfiguration writerConfiguration) {
  // initialize ttl
  int ttl = writerConfiguration.getUnNecessaryOption(RedisWriterOptions.TTL, -1);
  TtlType ttlType;
  try {
    ttlType = TtlType.valueOf(StringUtils.upperCase(writerConfiguration.get(RedisWriterOptions.TTL_TYPE)));
  } catch (IllegalArgumentException e) {
    throw BitSailException.asBitSailException(RedisPluginErrorCode.ILLEGAL_VALUE,
        String.format("unknown ttl type: %s", writerConfiguration.get(RedisWriterOptions.TTL_TYPE)));
  }
  int ttlInSeconds = ttl < 0 ? -1 : ttl * ttlType.getContainSeconds();
  log.info("ttl is {}(s)", ttlInSeconds);

  // initialize commandDescription
  String redisDataType = StringUtils.upperCase(writerConfiguration.get(RedisWriterOptions.REDIS_DATA_TYPE));
  String additionalKey = writerConfiguration.getUnNecessaryOption(RedisWriterOptions.ADDITIONAL_KEY, "default_redis_key");
  this.commandDescription = initJedisCommandDescription(redisDataType, ttlInSeconds, additionalKey);
  this.columnSize = writerConfiguration.get(RedisWriterOptions.COLUMNS).size();

  // initialize jedis pool
  JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
  jedisPoolConfig.setMaxTotal(writerConfiguration.get(RedisWriterOptions.JEDIS_POOL_MAX_TOTAL_CONNECTIONS));
  jedisPoolConfig.setMaxIdle(writerConfiguration.get(RedisWriterOptions.JEDIS_POOL_MAX_IDLE_CONNECTIONS));
  jedisPoolConfig.setMinIdle(writerConfiguration.get(RedisWriterOptions.JEDIS_POOL_MIN_IDLE_CONNECTIONS));
  jedisPoolConfig.setMaxWait(Duration.ofMillis(writerConfiguration.get(RedisWriterOptions.JEDIS_POOL_MAX_WAIT_TIME_IN_MILLIS)));
  String redisHost = writerConfiguration.getNecessaryOption(RedisWriterOptions.HOST, RedisPluginErrorCode.REQUIRED_VALUE);
  int redisPort = writerConfiguration.getNecessaryOption(RedisWriterOptions.PORT, RedisPluginErrorCode.REQUIRED_VALUE);
  String redisPassword = writerConfiguration.get(RedisWriterOptions.PASSWORD);
  int timeout = writerConfiguration.get(RedisWriterOptions.CLIENT_TIMEOUT_MS);
  if (StringUtils.isEmpty(redisPassword)) {
    this.jedisPool = new JedisPool(jedisPoolConfig, redisHost, redisPort, timeout);
  } else {
    this.jedisPool = new JedisPool(jedisPoolConfig, redisHost, redisPort, timeout, redisPassword);
  }

  // initialize record queue
  int batchSize = writerConfiguration.get(RedisWriterOptions.WRITE_BATCH_INTERVAL);
  this.recordQueue = new CircularFifoQueue<>(batchSize);
  this.logSampleInterval = writerConfiguration.get(RedisWriterOptions.LOG_SAMPLE_INTERVAL);
  this.jedisFetcher = RetryerBuilder.<Jedis>newBuilder()
      .retryIfResult(Objects::isNull)
      .retryIfRuntimeException()
      .withStopStrategy(StopStrategies.stopAfterAttempt(3))
      .withWaitStrategy(WaitStrategies.exponentialWait(100, 5, TimeUnit.MINUTES))
      .build()
      .wrap(jedisPool::getResource);
  this.maxAttemptCount = writerConfiguration.get(RedisWriterOptions.MAX_ATTEMPT_COUNT);
  this.retryer = RetryerBuilder.<Boolean>newBuilder()
      .retryIfResult(needRetry -> Objects.equals(needRetry, true))
      .retryIfException(e -> !(e instanceof BitSailException))
      .withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS))
      .withStopStrategy(StopStrategies.stopAfterAttempt(maxAttemptCount))
      .build();
}
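The constructor above builds two retryers with RetryerBuilder (which appears to be the guava-retrying library): one for borrowing a Jedis connection and one for running the command pipeline. As a dependency-free illustration of the second policy (retry while the task reports that it still needs a retry or throws a non-BitSail exception, wait a fixed interval between attempts, stop after maxAttemptCount attempts), here is a hypothetical hand-rolled retryer. FixedWaitRetryer and FatalException are names made up for this sketch; unlike guava-retrying, which wraps failures in RetryException or ExecutionException, it rethrows the fatal or last exception directly, or throws IllegalStateException when attempts run out.

```java
import java.util.concurrent.Callable;

// Stands in for BitSailException: errors that must never be retried.
class FatalException extends RuntimeException {
    FatalException(String msg) { super(msg); }
}

class FixedWaitRetryer {
    private final int maxAttempts;
    private final long waitMillis;

    FixedWaitRetryer(int maxAttempts, long waitMillis) {
        this.maxAttempts = maxAttempts;
        this.waitMillis = waitMillis;
    }

    /** Runs the task until it returns false; returns the number of attempts used. */
    int call(Callable<Boolean> task) throws Exception {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            boolean needRetry;
            try {
                needRetry = Boolean.TRUE.equals(task.call());
            } catch (FatalException e) {
                throw e; // fail fast, like BitSailException in the original
            } catch (Exception e) {
                if (attempt == maxAttempts) {
                    throw e; // out of attempts: surface the last failure
                }
                needRetry = true;
            }
            if (!needRetry) {
                return attempt;
            }
            if (attempt < maxAttempts) {
                Thread.sleep(waitMillis); // fixed wait between attempts
            }
        }
        throw new IllegalStateException("Gave up after " + maxAttempts + " attempts");
    }
}
```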
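write method
When this method is called, each record, in BitSail Row format, is written into the buffer queue. This is also the place to apply any format preprocessing to the Row data: records can be stored in the buffer queue as-is or transformed first. If the buffer queue has a fixed size, flush must be called as soon as the queue is full.
Example
Redis: stores BitSail Row records directly in a fixed-size buffer queue.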
public void write(Row record) throws IOException {
  validate(record);
  this.recordQueue.add(record);
  if (recordQueue.isAtFullCapacity()) {
    flush(false);
  }
}
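The Redis writer relies on Apache Commons Collections' CircularFifoQueue, which silently evicts its oldest element when an element is added to a full queue; flushing as soon as the queue reaches capacity is what keeps records from being dropped. The hypothetical BufferedListWriter below reproduces the same write/flush pattern with a plain ArrayDeque and records each flushed batch in a list instead of sending it to Redis.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the bounded-buffer write/flush pattern.
class BufferedListWriter {
    private final int capacity;
    private final ArrayDeque<String> queue = new ArrayDeque<>();
    final List<List<String>> flushedBatches = new ArrayList<>();

    BufferedListWriter(int capacity) {
        this.capacity = capacity;
    }

    void write(String record) {
        queue.add(record);
        if (queue.size() >= capacity) {
            flush(false); // flush before the next write could overflow the buffer
        }
    }

    void flush(boolean endOfInput) {
        if (queue.isEmpty()) {
            return;
        }
        List<String> batch = new ArrayList<>();
        String r;
        while ((r = queue.poll()) != null) { // drain, like the Redis flush loop
            batch.add(r);
        }
        flushedBatches.add(batch); // stand-in for one pipelined round trip
    }
}
```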
Druid: preprocesses each BitSail Row into a line of delimited text and stores it in a StringBuffer.
@Override
public void write(final Row element) {
  final StringJoiner joiner = new StringJoiner(DEFAULT_FIELD_DELIMITER, "", "");
  for (int i = 0; i < element.getArity(); i++) {
    final Object v = element.getField(i);
    if (v != null) {
      joiner.add(v.toString());
    }
  }
  // timestamp column is a required field to add in Druid.
  // See https://druid.apache.org/docs/24.0.0/ingestion/data-model.html#primary-timestamp
  joiner.add(String.valueOf(processTime));
  data.append(joiner);
  data.append(DEFAULT_LINE_DELIMITER);
}
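The formatting step can be checked in isolation. The sketch below is a hypothetical standalone version of the Druid write logic: the "," and "\n" delimiters are assumptions (the connector defines its own DEFAULT_FIELD_DELIMITER and DEFAULT_LINE_DELIMITER constants), and rows are plain Object arrays instead of BitSail Row objects.

```java
import java.util.StringJoiner;

// Hypothetical sketch of the Druid write() formatting.
class DruidLineFormatter {
    static final String FIELD_DELIMITER = ","; // assumed value
    static final String LINE_DELIMITER = "\n"; // assumed value

    private final StringBuilder data = new StringBuilder();
    private final long processTime;

    DruidLineFormatter(long processTime) {
        this.processTime = processTime;
    }

    void write(Object[] fields) {
        StringJoiner joiner = new StringJoiner(FIELD_DELIMITER);
        for (Object v : fields) {
            if (v != null) { // as in the original, null fields are skipped
                joiner.add(v.toString());
            }
        }
        // Druid needs a primary timestamp, appended here as the last column.
        joiner.add(String.valueOf(processTime));
        data.append(joiner).append(LINE_DELIMITER);
    }

    String buffered() {
        return data.toString();
    }
}
```

Because null fields are skipped rather than written as empty values, a row with a null column produces fewer columns than the others, as the first test row shows; a connector that needs fixed column positions would have to emit a placeholder instead.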
flush method
Flushes the data buffered by write to the target data source.
Example
Redis: flushes the BitSail Row records in the buffer queue to the target data source.
public void flush(boolean endOfInput) throws IOException {
  processorId++;
  try (PipelineProcessor processor = genPipelineProcessor(recordQueue.size(), this.complexTypeWithTtl)) {
    Row record;
    while ((record = recordQueue.poll()) != null) {
      String key = (String) record.getField(0);
      String value = (String) record.getField(1);
      String scoreOrHashKey = value;
      if (columnSize == SORTED_SET_OR_HASH_COLUMN_SIZE) {
        value = (String) record.getField(2);
        // Replace empty key with additionalKey in sorted set and hash.
        if (key.length() == 0) {
          key = commandDescription.getAdditionalKey();
        }
      }
      if (commandDescription.getJedisCommand() == JedisCommand.ZADD) {
        // sorted set
        processor.addInitialCommand(new Command(commandDescription, key.getBytes(), parseScoreFromString(scoreOrHashKey), value.getBytes()));
      } else if (commandDescription.getJedisCommand() == JedisCommand.HSET) {
        // hash
        processor.addInitialCommand(new Command(commandDescription, key.getBytes(), scoreOrHashKey.getBytes(), value.getBytes()));
      } else if (commandDescription.getJedisCommand() == JedisCommand.HMSET) {
        // hmset: the row is laid out as [key, field1, value1, field2, value2, ...]
        if ((record.getArity() - 1) % 2 != 0) {
          throw new BitSailException(CONVERT_NOT_SUPPORT, "Inconsistent data entry.");
        }
        // Skip the key column and convert the remaining fields to bytes.
        List<byte[]> datas = Arrays.stream(record.getFields())
            .skip(1)
            .map(o -> ((String) o).getBytes())
            .collect(Collectors.toList());
        Map<byte[], byte[]> map = new HashMap<>((record.getArity() - 1) / 2);
        for (int index = 0; index < datas.size(); index = index + 2) {
          map.put(datas.get(index), datas.get(index + 1));
        }
        processor.addInitialCommand(new Command(commandDescription, key.getBytes(), map));
      } else {
        // set and string
        processor.addInitialCommand(new Command(commandDescription, key.getBytes(), value.getBytes()));
      }
    }
    retryer.call(processor::run);
  } catch (ExecutionException | RetryException e) {
    if (e.getCause() instanceof BitSailException) {
      throw (BitSailException) e.getCause();
    } else if (e.getCause() instanceof RedisUnexpectedException) {
      throw (RedisUnexpectedException) e.getCause();
    }
    // Wrap the checked retry exception so it fits flush's "throws IOException" signature.
    throw new IOException("Failed to flush records to Redis.", e);
  } catch (IOException e) {
    throw new RuntimeException("Error while init jedis client.", e);
  }
}
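The HMSET branch is the least obvious part of flush: a row laid out as [key, field1, value1, field2, value2, ...] must contain an even number of columns after the key, which are then paired into a field-to-value map. The hypothetical HmsetRowParser below isolates that pairing. It deliberately uses String keys: the original builds a HashMap<byte[], byte[]>, which works for assembling the command, but byte[] keys compare by identity, so such a map cannot be looked up or compared by content.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the HMSET field pairing in flush().
class HmsetRowParser {
    static Map<String, String> toFieldMap(Object[] row) {
        int remaining = row.length - 1; // columns after the key
        if (remaining % 2 != 0) {
            throw new IllegalArgumentException("Inconsistent data entry: expected field/value pairs after the key");
        }
        // LinkedHashMap keeps the original field order.
        Map<String, String> map = new LinkedHashMap<>();
        for (int i = 1; i < row.length; i += 2) {
            map.put((String) row[i], (String) row[i + 1]);
        }
        return map;
    }
}
```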
Druid: submits the buffered data to Druid as an indexing task through an HTTP POST request to the coordinator.
private HttpURLConnection provideHttpURLConnection(final String coordinatorURL) throws IOException {
  final URL url = new URL("http://" + coordinatorURL + DRUID_ENDPOINT);
  final HttpURLConnection con = (HttpURLConnection) url.openConnection();
  con.setRequestMethod("POST");
  con.setRequestProperty("Content-Type", "application/json");
  con.setRequestProperty("Accept", "application/json, text/plain, */*");
  con.setDoOutput(true);
  return con;
}

public void flush(final boolean endOfInput) throws IOException {
  final ParallelIndexIOConfig ioConfig = provideDruidIOConfig(data);
  final ParallelIndexSupervisorTask indexTask = provideIndexTask(ioConfig);
  final String inputJSON = provideInputJSONString(indexTask);
  final byte[] input = inputJSON.getBytes();
  try (final OutputStream os = httpURLConnection.getOutputStream()) {
    os.write(input, 0, input.length);
  }
  try (final BufferedReader br =
      new BufferedReader(new InputStreamReader(httpURLConnection.getInputStream(), StandardCharsets.UTF_8))) {
    final StringBuilder response = new StringBuilder();
    String responseLine;
    while ((responseLine = br.readLine()) != null) {
      response.append(responseLine.trim());
    }
    LOG.info("Druid write task has been sent, and the response is {}", response);
  }
}
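The I/O half of the Druid flush can be exercised without a running coordinator by swapping HttpURLConnection's getOutputStream() and getInputStream() for in-memory streams. DruidTaskIo is a hypothetical name for this sketch. One deliberate difference from the original: the request body is encoded explicitly as UTF-8 instead of relying on the platform default of getBytes().

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the request/response handling in the Druid flush().
class DruidTaskIo {
    // Writes the JSON task spec to the request body, then closes it.
    static void sendTask(String taskJson, OutputStream requestBody) throws IOException {
        byte[] input = taskJson.getBytes(StandardCharsets.UTF_8);
        try (OutputStream os = requestBody) {
            os.write(input, 0, input.length);
        }
    }

    // Reads the response line by line, trimming each line and concatenating them.
    static String readResponse(InputStream responseBody) throws IOException {
        StringBuilder response = new StringBuilder();
        try (BufferedReader br = new BufferedReader(new InputStreamReader(responseBody, StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                response.append(line.trim());
            }
        }
        return response.toString();
    }
}
```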
close method
Closes the target data source connection objects created earlier.
Example
public void close() throws IOException {
  bulkProcessor.close();
  restClient.close();
  checkErrorAndRethrow();
}
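The example above closes its objects one after another, so if an earlier close() threw, the later resources would stay open. When a writer holds several connections, a small helper can close all of them and still report failures. ResourceCloser is a hypothetical helper, not part of BitSail: it rethrows the first IOException and attaches later ones as suppressed exceptions, similar to what try-with-resources does.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.List;

// Hypothetical helper: close every resource even if some close() calls fail.
class ResourceCloser {
    static void closeAll(List<? extends Closeable> resources) throws IOException {
        IOException first = null;
        for (Closeable c : resources) {
            try {
                c.close();
            } catch (IOException e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e); // keep later failures visible
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```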
About BitSail:
⭐️ Star the project: https://github.com/bytedance/bitsail
Submit issues and suggestions: https://github.com/bytedance/bitsail/issues
Contribute code: https://github.com/bytedance/bitsail/pulls
BitSail website: https://bytedance.github.io/bitsail/zh/
Subscribe to the mailing list: bitsail+subscribe@googlegroups.com
Join the BitSail tech community