[case48] Talking about Flink's SocketClientSink


This article takes a look at Flink's SocketClientSink.
DataStream.writeToSocket
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
```java
/**
 * Writes the DataStream to a socket as a byte array. The format of the
 * output is specified by a {@link SerializationSchema}.
 *
 * @param hostName
 *            host of the socket
 * @param port
 *            port of the socket
 * @param schema
 *            schema for serialization
 * @return the closed DataStream
 */
@PublicEvolving
public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T> schema) {
    DataStreamSink<T> returnStream = addSink(new SocketClientSink<>(hostName, port, schema, 0));
    returnStream.setParallelism(1); // It would not work if multiple instances would connect to the same port
    return returnStream;
}
```
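DataStream's writeToSocket method creates a SocketClientSink internally. It passes four constructor arguments: hostName, port, schema, and maxNumRetries (0 here). It then sets the sink's parallelism to 1, because multiple instances connecting to the same port would not work.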
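The following is a minimal, stdlib-only sketch of the data path. It is not Flink code. For every element, the sink serializes the element to bytes and writes those bytes over a single TCP connection. `WriteToSocketSketch`, `writeAll`, `receiveOnce` and `demo` are hypothetical names. `Function<T, byte[]>` stands in for `SerializationSchema<T>`. A local `ServerSocket` plays the external server that writeToSocket expects to be listening already, for example a netcat listener.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

public class WriteToSocketSketch {

    // Hypothetical stand-in for SerializationSchema<T>#serialize (not Flink's interface).
    static <T> void writeAll(String host, int port, List<T> elements,
                             Function<T, byte[]> serializer) throws IOException {
        // One connection for the whole stream, like writeToSocket's parallelism of 1.
        try (Socket client = new Socket(host, port)) {
            OutputStream out = client.getOutputStream();
            for (T element : elements) {
                out.write(serializer.apply(element)); // raw bytes; no framing is added
            }
            out.flush();
        }
    }

    // Plays the external server: accepts one client and reads until it closes.
    static byte[] receiveOnce(ServerSocket server) throws IOException {
        try (Socket conn = server.accept(); InputStream in = conn.getInputStream()) {
            return in.readAllBytes();
        }
    }

    public static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            Future<byte[]> received = pool.submit(() -> receiveOnce(server));
            writeAll(server.getInetAddress().getHostAddress(), server.getLocalPort(),
                     List.of("a", "b", "c"),
                     s -> (s + "\n").getBytes(StandardCharsets.UTF_8)); // newline is this demo's choice
            return new String(received.get(), StandardCharsets.UTF_8);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.print(demo()); // prints a, b and c on separate lines
    }
}
```

The newline appended by the lambda is only a choice made for this demo. SocketClientSink itself writes exactly the bytes the schema returns, with no delimiter.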
SocketClientSink
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
```java
/**
 * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
 *
 * <p>The sink can be set to retry message sends after the sending failed.
 *
 * <p>The sink can be set to 'autoflush', in which case the socket stream is flushed after every
 * message. This significantly reduced throughput, but also decreases message latency.
 *
 * @param <IN> data to be written into the Socket.
 */
@PublicEvolving
public class SocketClientSink<IN> extends RichSinkFunction<IN> {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);

    private static final int CONNECTION_RETRY_DELAY = 500;

    private final SerializableObject lock = new SerializableObject();
    private final SerializationSchema<IN> schema;
    private final String hostName;
    private final int port;
    private final int maxNumRetries;
    private final boolean autoFlush;

    private transient Socket client;
    private transient OutputStream outputStream;

    private int retries;

    private volatile boolean isRunning = true;

    /**
     * Creates a new SocketClientSink. The sink will not attempt to retry connections upon failure
     * and will not auto-flush the stream.
     *
     * @param hostName Hostname of the server to connect to.
     * @param port Port of the server.
     * @param schema Schema used to serialize the data into bytes.
     */
    public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema) {
        this(hostName, port, schema, 0);
    }

    /**
     * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
     * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
     * The sink will not auto-flush the stream.
     *
     * @param hostName Hostname of the server to connect to.
     * @param port Port of the server.
     * @param schema Schema used to serialize the data into bytes.
     * @param maxNumRetries The maximum number of retries after a message send failed.
     */
    public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema, int maxNumRetries) {
        this(hostName, port, schema, maxNumRetries, false);
    }

    /**
     * Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
     * A value of -1 for the number of retries will cause the system to retry an infinite number of times.
     *
     * @param hostName Hostname of the server to connect to.
     * @param port Port of the server.
     * @param schema Schema used to serialize the data into bytes.
     * @param maxNumRetries The maximum number of retries after a message send failed.
     * @param autoflush Flag to indicate whether the socket stream should be flushed after each message.
     */
    public SocketClientSink(String hostName, int port, SerializationSchema<IN> schema,
            int maxNumRetries, boolean autoflush) {
        checkArgument(port > 0 && port < 65536, "port is out of range");
        checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");

        this.hostName = checkNotNull(hostName, "hostname must not be null");
        this.port = port;
        this.schema = checkNotNull(schema);
        this.maxNumRetries = maxNumRetries;
        this.autoFlush = autoflush;
    }

    // ------------------------------------------------------------------------
    //  Life cycle
    // ------------------------------------------------------------------------

    /**
     * Initialize the connection with the Socket in the server.
     * @param parameters Configuration.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        try {
            synchronized (lock) {
                createConnection();
            }
        }
        catch (IOException e) {
            throw new IOException("Cannot connect to socket server at " + hostName + ":" + port, e);
        }
    }

    /**
     * Called when new data arrives to the sink, and forwards it to Socket.
     *
     * @param value The value to write to the socket.
     */
    @Override
    public void invoke(IN value) throws Exception {
        byte[] msg = schema.serialize(value);

        try {
            outputStream.write(msg);
            if (autoFlush) {
                outputStream.flush();
            }
        }
        catch (IOException e) {
            // if no re-tries are enable, fail immediately
            if (maxNumRetries == 0) {
                throw new IOException("Failed to send message '" + value + "' to socket server at "
                        + hostName + ":" + port + ". Connection re-tries are not enabled.", e);
            }

            LOG.error("Failed to send message '" + value + "' to socket server at " + hostName + ":" + port +
                    ". Trying to reconnect..." , e);

            // do the retries in locked scope, to guard against concurrent close() calls
            // note that the first re-try comes immediately, without a wait!

            synchronized (lock) {
                IOException lastException = null;
                retries = 0;

                while (isRunning && (maxNumRetries < 0 || retries < maxNumRetries)) {

                    // first, clean up the old resources
                    try {
                        if (outputStream != null) {
                            outputStream.close();
                        }
                    }
                    catch (IOException ee) {
                        LOG.error("Could not close output stream from failed write attempt", ee);
                    }
                    try {
                        if (client != null) {
                            client.close();
                        }
                    }
                    catch (IOException ee) {
                        LOG.error("Could not close socket from failed write attempt", ee);
                    }

                    // try again
                    retries++;

                    try {
                        // initialize a new connection
                        createConnection();

                        // re-try the write
                        outputStream.write(msg);

                        // success!
                        return;
                    }
                    catch (IOException ee) {
                        lastException = ee;
                        LOG.error("Re-connect to socket server and send message failed. Retry time(s): " + retries, ee);
                    }

                    // wait before re-attempting to connect
                    lock.wait(CONNECTION_RETRY_DELAY);
                }

                // throw an exception if the task is still running, otherwise simply leave the method
                if (isRunning) {
                    throw new IOException("Failed to send message '" + value + "' to socket server at "
                            + hostName + ":" + port + ". Failed after " + retries + " retries.", lastException);
                }
            }
        }
    }

    /**
     * Closes the connection with the Socket server.
     */
    @Override
    public void close() throws Exception {
        // flag this as not running any more
        isRunning = false;

        // clean up in locked scope, so there is no concurrent change to the stream and client
        synchronized (lock) {
            // we notify first (this statement cannot fail). The notified thread will not continue
            // anyways before it can re-acquire the lock
            lock.notifyAll();

            try {
                if (outputStream != null) {
                    outputStream.close();
                }
            }
            finally {
                if (client != null) {
                    client.close();
                }
            }
        }
    }

    // ------------------------------------------------------------------------
    //  Utilities
    // ------------------------------------------------------------------------

    private void createConnection() throws IOException {
        client = new Socket(hostName, port);
        client.setKeepAlive(true);
        client.setTcpNoDelay(true);

        outputStream = client.getOutputStream();
    }

    // ------------------------------------------------------------------------
    //  For testing
    // ------------------------------------------------------------------------

    int getCurrentNumberOfRetries() {
        synchronized (lock) {
            return retries;
        }
    }
}
```

SocketClientSink extends RichSinkFunction. Its autoFlush property defaults to false, and only the five-argument constructor lets the caller turn it on.
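The defaults come from constructor chaining. The three-argument constructor delegates with maxNumRetries = 0, and the four-argument constructor delegates with autoflush = false. Only the five-argument constructor sets both values explicitly. The hypothetical `SinkSettings` class below mirrors that chain and its argument checks without depending on Flink. As assumptions of this sketch, the schema parameter is left out and `checkArgument` is a local stand-in for Flink's precondition helper.

```java
import java.util.Objects;

// Hypothetical mirror of SocketClientSink's constructor chain (not Flink code).
public class SinkSettings {
    public final String hostName;
    public final int port;
    public final int maxNumRetries;
    public final boolean autoFlush;

    public SinkSettings(String hostName, int port) {
        this(hostName, port, 0);                     // default: no retries
    }

    public SinkSettings(String hostName, int port, int maxNumRetries) {
        this(hostName, port, maxNumRetries, false);  // default: no auto-flush
    }

    public SinkSettings(String hostName, int port, int maxNumRetries, boolean autoFlush) {
        checkArgument(port > 0 && port < 65536, "port is out of range");
        checkArgument(maxNumRetries >= -1,
                "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");
        this.hostName = Objects.requireNonNull(hostName, "hostname must not be null");
        this.port = port;
        this.maxNumRetries = maxNumRetries;
        this.autoFlush = autoFlush;
    }

    // Local stand-in for a checkArgument-style precondition: reject with IllegalArgumentException.
    private static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    public static void main(String[] args) {
        // What writeToSocket effectively does: the variant that passes maxNumRetries = 0.
        SinkSettings viaWriteToSocket = new SinkSettings("localhost", 9999, 0);
        System.out.println(viaWriteToSocket.maxNumRetries + " " + viaWriteToSocket.autoFlush); // 0 false
    }
}
```

writeToSocket calls the four-argument constructor with 0. As a result, a sink created through it never retries and never flushes after a write.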
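The open method calls createConnection inside the lock to establish the connection to the socket server. If an IOException occurs at this point, it fails fast by rethrowing the error as an IOException that names the host and port. createConnection sets both keepAlive and tcpNoDelay to true on the new socket.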
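Both behaviors can be reproduced with plain `java.net` classes. In this sketch, `ConnectionSketch`, `optionsEnabled` and `messageWhenNothingListens` are hypothetical helpers. `createConnection` and `open` copy the socket options and the exception wrapping from the sink. A local `ServerSocket` on an ephemeral port stands in for the real server. The failure case reserves a free port and then releases it, assuming nothing else grabs that port in the meantime.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class ConnectionSketch {

    // Same socket options as SocketClientSink#createConnection.
    static Socket createConnection(String hostName, int port) throws IOException {
        Socket client = new Socket(hostName, port);
        client.setKeepAlive(true);  // SO_KEEPALIVE: probe idle connections
        client.setTcpNoDelay(true); // TCP_NODELAY: disable Nagle's algorithm, no coalescing of small writes
        return client;
    }

    // Mirrors open(): wrap any IOException with host:port context and rethrow (fail fast).
    static Socket open(String hostName, int port) throws IOException {
        try {
            return createConnection(hostName, port);
        } catch (IOException e) {
            throw new IOException("Cannot connect to socket server at " + hostName + ":" + port, e);
        }
    }

    // Connects to a local listener and reports whether both options took effect.
    static boolean optionsEnabled() {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket socket = open(server.getInetAddress().getHostAddress(), server.getLocalPort())) {
            return socket.getKeepAlive() && socket.getTcpNoDelay();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Connects to a port nobody listens on and returns the wrapped error message.
    static String messageWhenNothingListens() {
        int freePort;
        try (ServerSocket probe = new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
            freePort = probe.getLocalPort(); // reserve a free port, released when probe closes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        try (Socket socket = open(InetAddress.getLoopbackAddress().getHostAddress(), freePort)) {
            return "connected unexpectedly";
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(optionsEnabled());
        System.out.println(messageWhenNothingListens());
    }
}
```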
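The invoke method first serializes value with schema.serialize, then writes the bytes with the socket's outputStream.write. If autoFlush is true, it flushes outputStream immediately after the write. If an IOException occurs, the retry logic runs directly in the catch block and is driven by maxNumRetries:

- With 0, invoke rethrows at once.
- Otherwise it retries up to maxNumRetries times, and -1 means forever.

Each retry closes the old outputStream and socket, calls createConnection, and writes the message again. The first retry happens immediately. After each failed retry, the sink waits up to CONNECTION_RETRY_DELAY (500 ms) in lock.wait before the next attempt.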
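The retry path can be modeled without Flink. The model also covers how close() cuts a retry loop short: it sets isRunning to false and calls lock.notifyAll(). `RetrySketch` and its `Attempt` interface are hypothetical. One `Attempt` stands in for both the plain write and the "createConnection + write" of a retry. The delay is a constructor parameter here so the sketch runs quickly, whereas the sink hard-codes 500 ms. The sketch rejects a delay of 0 because `Object.wait(0)` blocks until notified.

```java
import java.io.IOException;

// Hypothetical, stdlib-only model of SocketClientSink's retry path (not Flink code).
public class RetrySketch {

    // Stands in for "write the message" and, on retries, "createConnection() + write".
    public interface Attempt {
        void run() throws IOException;
    }

    private final Object lock = new Object();
    private final int maxNumRetries;  // 0 = fail immediately, -1 = retry forever
    private final long retryDelayMs;  // Flink hard-codes CONNECTION_RETRY_DELAY = 500
    private volatile boolean isRunning = true;
    private int retries;

    public RetrySketch(int maxNumRetries, long retryDelayMs) {
        if (retryDelayMs <= 0) {
            throw new IllegalArgumentException("wait(0) would block until notified");
        }
        this.maxNumRetries = maxNumRetries;
        this.retryDelayMs = retryDelayMs;
    }

    public void send(Attempt attempt) throws IOException, InterruptedException {
        try {
            attempt.run();                   // the normal write path
            return;
        } catch (IOException e) {
            if (maxNumRetries == 0) {
                throw new IOException("Connection re-tries are not enabled.", e);
            }
        }
        synchronized (lock) {
            IOException lastException = null;
            retries = 0;
            while (isRunning && (maxNumRetries < 0 || retries < maxNumRetries)) {
                retries++;
                try {
                    attempt.run();           // the first retry runs immediately, without waiting
                    return;
                } catch (IOException ee) {
                    lastException = ee;
                }
                lock.wait(retryDelayMs);     // releases the lock, so close() can end the pause
            }
            if (isRunning) {
                throw new IOException("Failed after " + retries + " retries.", lastException);
            }
            // close() was called while retrying: return quietly, as the sink does
        }
    }

    public void close() {
        isRunning = false;
        synchronized (lock) {
            lock.notifyAll();                // wakes a sender sleeping in lock.wait()
        }
    }

    public int retries() {
        synchronized (lock) {
            return retries;
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        RetrySketch sink = new RetrySketch(3, 10);
        sink.send(() -> {
            if (++calls[0] < 3) throw new IOException("connection reset"); // fail twice, then succeed
        });
        System.out.println(calls[0] + " attempts, " + sink.retries() + " retries"); // 3 attempts, 2 retries
    }
}
```

The retry loop runs on the thread that called send, so that caller is blocked until a retry succeeds, the limit is reached, or close() flips isRunning from another thread.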

Summary

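DataStream's writeToSocket method creates a SocketClientSink internally with maxNumRetries set to 0. It does not use the constructor that takes an autoflush argument, so autoFlush keeps its default of false.
The socket created in open has both keepAlive and tcpNoDelay set to true. If an IOException occurs during open, it is rethrown immediately and the task fails.
The invoke method is straightforward: it serializes the value with the SerializationSchema and writes the bytes to the outputStream. On failure it performs a simple retry with a fixed delay of CONNECTION_RETRY_DELAY (500 ms), which is a constant rather than a setting. The retry in this version is basic and synchronous, so invoke blocks while it retries.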

