关于flink:Flink自定义HBasesink的过程

46次阅读

共计 12034 个字符,预计需要花费 31 分钟才能阅读完成。

文末有官方的完整 HBase sink 代码。

Flink 自定义 sink 端

自定义 sink 主要有两种实现方式:一种是继承 RichSinkFunction,另一种是实现 SinkFunction 接口

如果要支持 checkpoint,还需要实现 CheckpointedFunction 接口

推荐使用 RichSinkFunction,它提供了完整的生命周期方法(如 open、close)

HBaseSink 的实现

简单版:继承 RichSinkFunction,但不具备通用性,每对接一个数据源就需要修改代码,且没有提供构造方法。这是我一开始自己写的一个版本


import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
import java.io.IOException;
 
 
/**
 * Minimal Flink sink that writes comma-separated records into the HBase table {@code student}.
 *
 * <p>Each record is expected to look like {@code rowkey,name,age}. The first field becomes the
 * row key; the second and third are stored in column family {@code info} under the qualifiers
 * {@code name} and {@code age}.
 *
 * <p>The HBase connection and the {@link BufferedMutator} are per-instance, transient state:
 * they are created in {@link #open} and released in {@link #close}, so every sink instance
 * owns (and closes) only its own client resources.
 *
 * @author WangKaiYu
 * @date 2022-02-18 15:19
 */
public class HbaseSink extends RichSinkFunction<String> {
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(HbaseSink.class);

    /** Target table name. */
    private static final String TABLE_NAME = "student";
    /** Column family every cell is written to. */
    private static final byte[] FAMILY = Bytes.toBytes("info");
    private static final byte[] NAME_QUALIFIER = Bytes.toBytes("name");
    private static final byte[] AGE_QUALIFIER = Bytes.toBytes("age");
    /** Number of comma-separated fields a record must contain: rowkey, name, age. */
    private static final int REQUIRED_FIELDS = 3;

    // Not static: a static connection would be shared by all sink instances in the JVM,
    // and the first instance to close would close it for the others.
    // Transient: client objects are created in open(), never shipped with the function.
    private transient Connection connection;
    private transient BufferedMutator mutator;

    /**
     * Creates the HBase connection and the buffered mutator for {@code student}.
     *
     * @param parameters Flink configuration passed to the function (not read here)
     * @throws Exception if the connection or the mutator cannot be created; the failure is
     *     propagated instead of being swallowed so the sink never runs with a null client
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        org.apache.hadoop.conf.Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.master", "192.168.10.154:16000");
        configuration.set("hbase.zookeeper.quorum", "192.168.10.154,192.168.10.155,192.168.10.196");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");

        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(TABLE_NAME));
        // Write buffer size: 2 MiB.
        params.writeBufferSize(2 * 1024 * 1024);
        // Periodic flush timeout: 5 seconds.
        params.setWriteBufferPeriodicFlushTimeoutMs(5 * 1000L);

        connection = ConnectionFactory.createConnection(configuration);
        try {
            mutator = connection.getBufferedMutator(params);
        } catch (IOException | RuntimeException e) {
            logger.error("Failed to obtain BufferedMutator for table {}", TABLE_NAME, e);
            // Do not leave a half-initialised sink holding an open connection.
            try {
                connection.close();
            } catch (IOException closeError) {
                e.addSuppressed(closeError);
            }
            connection = null;
            throw e;
        }
    }

    /**
     * Closes the mutator and then the connection. The connection is closed even when closing
     * the mutator fails.
     */
    @Override
    public void close() throws Exception {
        try {
            if (mutator != null) {
                mutator.close();
            }
        } finally {
            mutator = null;
            if (connection != null) {
                connection.close();
                connection = null;
            }
        }
    }

    /**
     * Converts one {@code rowkey,name,age} record into a {@link Put} and writes it.
     *
     * @param value the comma-separated record
     * @param context sink context (unused)
     * @throws IllegalArgumentException if the record has fewer than three fields
     */
    @Override
    public void invoke(String value, Context context) throws Exception {
        String[] values = value.split(",");
        if (values.length < REQUIRED_FIELDS) {
            throw new IllegalArgumentException(
                    "Expected at least " + REQUIRED_FIELDS
                            + " comma-separated fields (rowkey,name,age) but got: " + value);
        }

        Put put = new Put(Bytes.toBytes(values[0]));
        put.addColumn(FAMILY, NAME_QUALIFIER, Bytes.toBytes(values[1]));
        put.addColumn(FAMILY, AGE_QUALIFIER, Bytes.toBytes(values[2]));
        mutator.mutate(put);
        // Flush after every record. This sink has no checkpoint hook, so the explicit flush is
        // what pushes each record to HBase before the next one is processed. It also means the
        // buffer size and periodic timeout configured in open() rarely come into play.
        mutator.flush();
    }
}

源码介绍

1,主要类介绍

BufferedMutatorParams

实例化一个 BufferedMutator 所须要的参数。

主要参数:TableName(表名),writeBufferSize(写缓存大小),maxKeyValueSize(最大 key-value 大小),ExecutorService(执行线程池),ExceptionListener(监听 BufferedMutator 的异常)。

AsyncProcess

AsyncProcess 内部维护了一个线程池,我们的操作会被封装成 Runnable,然后扔到线程池里执行。这个过程是异步的,直到任务数达到最大值。

HConnectionImplementation

一个到集群的连接。通过它可以找到 master,定位 regions 的分布,保持 locations 的缓存,并知道如何校准 locations 信息。

BufferedMutator 构建的过程


1),首先是要构建一个 HBaseConfiguration
 
// Create the HBase client configuration and set the ZooKeeper quorum it connects through.
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "zookeeperHost");
 
2),接着是构建 BufferedMutatorParams
 
// Listener registered on the BufferedMutator: it receives a
// RetriesExhaustedWithDetailsException and logs every row reported in it.
final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
  @Override
  public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
    for (int i = 0; i < e.getNumExceptions(); i++) {
      LOG.info("Failed to sent put " + e.getRow(i) + ".");
    }
  }
};
// Parameters for the mutator: target table, the listener above, and the write buffer size in bytes.
BufferedMutatorParams params = new BufferedMutatorParams(TABLE)
    .listener(listener);
params.writeBufferSize(123);
 
 
 
3),然后构建 HConnection
 
 Connection conn = ConnectionFactory.createConnection(getConf())
 
 
4),最后构建 BufferedMutator
 
BufferedMutator mutator = conn.getBufferedMutator(params)

数据发送的过程


 
1),构建 put 或者 List[put]
 
2),调用 BufferedMutator.mutate 方法
 
3),刷写到 HBase。有三种方式:一是,显式调用 BufferedMutator.flush
 
二是,发送结束的时候调用 BufferedMutator.close
 
三是,当前缓存的数据量超过设置的写缓存大小时,它会自动刷写

然后我又找到了一份 Flink 官方的 HBase sink 源码,然而官网却没有它的介绍。相较上面的版本,它更完整,并且有了 checkpoint 和异步写入异常的处理,默认线程池大小为 1


import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.hbase.sink.HBaseMutationConverter;
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction.Context;
import org.apache.flink.util.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.BufferedMutator.ExceptionListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
/**
 * Sink function that converts incoming elements into HBase mutations and writes them through a
 * {@link BufferedMutator}.
 *
 * <p>Buffered mutations are flushed when any of the following happens:
 * <ul>
 *   <li>the mutator's write buffer size (set from {@code bufferFlushMaxSizeInBytes} when
 *       positive) is reached,</li>
 *   <li>the number of pending mutations reaches {@code bufferFlushMaxMutations} (when positive),</li>
 *   <li>the periodic flusher fires every {@code bufferFlushIntervalMillis} milliseconds (only
 *       started when the interval is positive and {@code bufferFlushMaxMutations != 1}),</li>
 *   <li>a checkpoint snapshot is taken.</li>
 * </ul>
 *
 * <p>The function registers itself as the mutator's {@link ExceptionListener}. The first failure
 * reported there, or raised by the periodic flusher, is remembered and rethrown as a
 * {@link RuntimeException} on the next {@code invoke} or flush.
 *
 * @param <T> type of the elements accepted by this sink
 */
@Internal
public class HBaseSinkFunction<T> extends RichSinkFunction<T> implements CheckpointedFunction, ExceptionListener {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(HBaseSinkFunction.class);

    // Settings captured at construction time.
    private final String hTableName;
    private final byte[] serializedConfig;
    private final long bufferFlushMaxSizeInBytes;
    private final long bufferFlushMaxMutations;
    private final long bufferFlushIntervalMillis;
    private final HBaseMutationConverter<T> mutationConverter;

    // Runtime state, created in open().
    private transient Connection connection;
    private transient BufferedMutator mutator;
    private transient ScheduledExecutorService executor;
    private transient ScheduledFuture<?> scheduledFuture;
    private transient AtomicLong numPendingRequests;
    private transient volatile boolean closed = false;

    /** First failure seen by the listener or the periodic flusher; null while healthy. */
    private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();

    /**
     * @param hTableName name of the HBase table to write to
     * @param conf HBase configuration; stored in serialized form
     * @param mutationConverter converts each element into an HBase mutation
     * @param bufferFlushMaxSizeInBytes write buffer size for the mutator; ignored unless positive
     * @param bufferFlushMaxMutations flush after this many pending mutations; ignored unless positive
     * @param bufferFlushIntervalMillis delay between periodic flushes in milliseconds; the
     *     periodic flusher is not started unless this is positive
     */
    public HBaseSinkFunction(String hTableName, Configuration conf, HBaseMutationConverter<T> mutationConverter, long bufferFlushMaxSizeInBytes, long bufferFlushMaxMutations, long bufferFlushIntervalMillis) {
        this.hTableName = hTableName;
        this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(conf);
        this.mutationConverter = mutationConverter;
        this.bufferFlushMaxSizeInBytes = bufferFlushMaxSizeInBytes;
        this.bufferFlushMaxMutations = bufferFlushMaxMutations;
        this.bufferFlushIntervalMillis = bufferFlushIntervalMillis;
    }

    /**
     * Opens the converter, connects to HBase, creates the buffered mutator and, when configured,
     * starts the periodic flusher.
     *
     * @throws RuntimeException if the table does not exist or the connection cannot be created
     */
    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        LOG.info("start open ...");
        final Configuration runtimeConfig = prepareRuntimeConfiguration();

        try {
            mutationConverter.open();
            numPendingRequests = new AtomicLong(0L);
            if (connection == null) {
                connection = ConnectionFactory.createConnection(runtimeConfig);
            }

            final BufferedMutatorParams mutatorParams = new BufferedMutatorParams(TableName.valueOf(hTableName));
            mutatorParams.listener(this);
            if (bufferFlushMaxSizeInBytes > 0L) {
                mutatorParams.writeBufferSize(bufferFlushMaxSizeInBytes);
            }
            mutator = connection.getBufferedMutator(mutatorParams);

            final boolean periodicFlushEnabled = bufferFlushIntervalMillis > 0L && bufferFlushMaxMutations != 1L;
            if (periodicFlushEnabled) {
                startPeriodicFlusher();
            }
        } catch (TableNotFoundException notFound) {
            LOG.error("The table" + hTableName + "not found", notFound);
            throw new RuntimeException("HBase table'" + hTableName + "'not found.", notFound);
        } catch (IOException ioError) {
            LOG.error("Exception while creating connection to HBase.", ioError);
            throw new RuntimeException("Cannot create connection to HBase.", ioError);
        }

        LOG.info("end open.");
    }

    /** Starts a single-threaded scheduler that flushes the mutator at a fixed delay. */
    private void startPeriodicFlusher() {
        executor = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
        final Runnable flushTask = () -> {
            if (closed) {
                return;
            }
            try {
                flush();
            } catch (Exception flushError) {
                // Keep only the first failure; it is rethrown from the task thread later.
                failureThrowable.compareAndSet(null, flushError);
            }
        };
        scheduledFuture = executor.scheduleWithFixedDelay(
                flushTask, bufferFlushIntervalMillis, bufferFlushIntervalMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Rebuilds the HBase configuration from its serialized form and checks that the ZooKeeper
     * quorum is set.
     *
     * @throws IOException if {@code hbase.zookeeper.quorum} is missing or blank
     */
    private Configuration prepareRuntimeConfiguration() throws IOException {
        final Configuration restored = HBaseConfigurationUtil.deserializeConfiguration(
                serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration());
        final String quorum = restored.get("hbase.zookeeper.quorum");
        if (!StringUtils.isNullOrWhitespaceOnly(quorum)) {
            return restored;
        }
        LOG.error("Can not connect to HBase without {} configuration", "hbase.zookeeper.quorum");
        throw new IOException("Check HBase configuration failed, lost:'hbase.zookeeper.quorum'!");
    }

    /** Rethrows a previously recorded failure, if any, wrapped in a {@link RuntimeException}. */
    private void checkErrorAndRethrow() {
        final Throwable recorded = failureThrowable.get();
        if (recorded == null) {
            return;
        }
        throw new RuntimeException("An error occurred in HBaseSink.", recorded);
    }

    /**
     * Converts the element to a mutation, buffers it, and flushes once the pending count reaches
     * {@code bufferFlushMaxMutations} (when that limit is positive).
     */
    @Override
    public void invoke(T value, Context context) throws Exception {
        checkErrorAndRethrow();
        mutator.mutate(mutationConverter.convertToMutation(value));

        // The counter is only advanced when a mutation-count limit is configured.
        if (bufferFlushMaxMutations > 0L && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
            flush();
        }
    }

    /** Flushes the mutator, resets the pending counter and surfaces any recorded failure. */
    private void flush() throws IOException {
        mutator.flush();
        numPendingRequests.set(0L);
        checkErrorAndRethrow();
    }

    /**
     * Marks the sink closed, then closes the mutator and the connection (logging, not rethrowing,
     * any {@link IOException}) and finally stops the periodic flusher.
     */
    @Override
    public void close() throws Exception {
        closed = true;

        if (mutator != null) {
            try {
                mutator.close();
            } catch (IOException closeError) {
                LOG.warn("Exception occurs while closing HBase BufferedMutator.", closeError);
            }
            mutator = null;
        }

        if (connection != null) {
            try {
                connection.close();
            } catch (IOException closeError) {
                LOG.warn("Exception occurs while closing HBase Connection.", closeError);
            }
            connection = null;
        }

        if (scheduledFuture != null) {
            scheduledFuture.cancel(false);
            if (executor != null) {
                executor.shutdownNow();
            }
        }
    }

    /** Flushes until no mutation is pending, so the checkpoint covers everything received so far. */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        while (numPendingRequests.get() != 0L) {
            flush();
        }
    }

    /** No state to restore. */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
    }

    /** Records the first asynchronous write failure reported by the mutator. */
    @Override
    public void onException(RetriesExhaustedWithDetailsException exception, BufferedMutator mutator) throws RetriesExhaustedWithDetailsException {
        failureThrowable.compareAndSet(null, exception);
    }
}

### 主代码

主要参数:表名、HBase 配置信息、HBaseMutationConverter 数据转换器、缓冲区最大字节数、缓冲区最大写入请求数、缓冲区刷新间隔(毫秒)


package com.bool.ods;
 
import com.bool.util.HBaseSinkFunction;
import com.bool.util.HbaseSink;
import com.bool.util.MyKafkaUtil;
 
 
import org.apache.flink.connector.hbase.sink.HBaseMutationConverter;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
 
import java.util.Date;
 
/**
 * Demo job: reads comma-separated strings from Kafka and writes them to the HBase table
 * {@code student} through {@link HBaseSinkFunction}.
 *
 * <p>For each record the first field, suffixed with the current time in milliseconds, is used
 * as the row key; every following field {@code i} is stored in column family {@code info}
 * under the qualifier {@code FN<i>}.
 *
 * @author WangKaiYu
 * @date 2022-02-15 17:19
 */
public class HbaseSinkTest {
    private static final String TABLE_NAME = "student";
    private static final String FAMILY_NAME = "info";

    /** Write buffer size handed to the sink: 5 MiB. */
    private static final long BUFFER_FLUSH_MAX_SIZE_BYTES = 1024 * 1024 * 5;
    /** Flush after this many buffered mutations. */
    private static final long BUFFER_FLUSH_MAX_MUTATIONS = 1000;
    /**
     * Periodic flush interval in milliseconds.
     * NOTE(review): 3 ms is very aggressive; confirm whether 3 seconds (3000) was intended.
     */
    private static final long BUFFER_FLUSH_INTERVAL_MILLIS = 3;

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.master", "192.168.10.154:16000");
        configuration.set("hbase.zookeeper.quorum", "192.168.10.154,192.168.10.155,192.168.10.196");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(5);
        DataStreamSource<String> streamSource =
                env.addSource(MyKafkaUtil.getFlinkKafkaConsumer("test", "flink-hbase-tset"));

        // Arguments: table name, HBase configuration, mutation converter, max buffer size in
        // bytes, max buffered mutations, flush interval in milliseconds.
        streamSource.addSink(new HBaseSinkFunction<String>(TABLE_NAME, configuration, new HBaseMutationConverter<String>() {
            @Override
            public void open() {
                // Nothing to initialise for this converter.
            }

            @Override
            public Mutation convertToMutation(String value) {
                String[] values = value.split(",");
                // Read the clock once so the printed timestamp is the one used in the row key.
                long timestamp = System.currentTimeMillis();
                Put put = new Put(Bytes.toBytes(values[0] + timestamp));
                System.out.println(timestamp);
                for (int i = 1; i < values.length; i++) {
                    put.addColumn(Bytes.toBytes(FAMILY_NAME), Bytes.toBytes("FN" + i), Bytes.toBytes(values[i]));
                }
                return put;
            }
        }, BUFFER_FLUSH_MAX_SIZE_BYTES, BUFFER_FLUSH_MAX_MUTATIONS, BUFFER_FLUSH_INTERVAL_MILLIS));

        env.execute();
    }
}

正文完
 0