生产者要将发送的数据转化为字节数组才能通过网络传输给 Kafka,对于一些简单的数据,Kafka 自带了一些序列化工具。
// Create the producer instance (configuration comes from GlobalConfigUtil)
private static Producer<String , String> createProducer(){Properties properties = new Properties();
// NOTE(review): these are old-style (pre-0.9 "new producer") property keys — verify against the kafka version in the pom
properties.put("metadata.broker.list" , GlobalConfigUtil.kafkaBootstrap);
properties.put("zookeeper.connect" , GlobalConfigUtil.kafkaZookeeper);
// Use Kafka's built-in String encoder so String keys/values need no custom serializer
properties.put("serializer.class" , StringEncoder.class.getName());
return new Producer<String, String>(new ProducerConfig(properties));
}
在通常的微服务中,服务之间需要频繁地传递各种复杂的数据结构,然而 Kafka 仅仅支持简单的类型如 String、Integer。于是我们在服务之间使用 JSONObject,因为 JSON 可以很容易地转化为 String,而 String 的序列化和反序列化已经被支持。
// Collect every parsed binlog field into one JSON object
JSONObject jsonObject = new JSONObject();
jsonObject.put("logFileName", logFileName);
jsonObject.put("logFileOffset", logFileOffset);
jsonObject.put("dbName", dbName);
jsonObject.put("tableName", tableName);
jsonObject.put("eventType", eventType);
jsonObject.put("columnValueList", columnValueList);
jsonObject.put("emptyCount", emptyCount);
jsonObject.put("timestamp", timestamp);
// Serialize all parsed binlog fields into a single JSON string
String data = JSON.toJSONString(jsonObject);
// Send the parsed data to Kafka (the key is JSON-encoded as well)
KafkaSender.sendMessage(GlobalConfigUtil.kafkaInput, JSON.toJSONString(key), data);
ResourceBundle 类是用来读取 properties 资源文件的,可以在初始化时把配置项全部一次读入,并保存在静态成员变量中,避免每次需要的时候才去读取相关配置文件;类文件 I/O 速度慢,容易造成性能上的瓶颈。
// Read the application.properties file once; values are cached in static fields
private static ResourceBundle resourceBundle = ResourceBundle.getBundle("application");
public static String canalHost= resourceBundle.getString("canal.host");
public static String canalPort = resourceBundle.getString("canal.port");
public static String canalInstance = resourceBundle.getString("canal.instance");
public static String mysqlUsername = resourceBundle.getString("mysql.username");
public static String mysqlPassword= resourceBundle.getString("mysql.password");
public static String kafkaBootstrap= resourceBundle.getString("kafka.bootstrap.servers");
public static String kafkaZookeeper= resourceBundle.getString("kafka.zookeeper.connect");
public static String kafkaInput = resourceBundle.getString("kafka.input.topic");
完整代码
#pom 文件
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.24</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 对象和 json 相互转换的 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
import java.util.Locale;
import java.util.ResourceBundle;
/**
 * Common configuration holder that loads application.properties once.
 *
 * <p>All settings are read a single time during class initialization and kept
 * in static constants, so later lookups never touch the file system again.
 */
public class GlobalConfigUtil {
    // Read the application.properties bundle once at class-load time.
    private static final ResourceBundle resourceBundle = ResourceBundle.getBundle("application");

    // Canal server connection settings.
    public static final String canalHost = resourceBundle.getString("canal.host");
    public static final String canalPort = resourceBundle.getString("canal.port");
    public static final String canalInstance = resourceBundle.getString("canal.instance");
    // MySQL credentials used by canal.
    public static final String mysqlUsername = resourceBundle.getString("mysql.username");
    public static final String mysqlPassword = resourceBundle.getString("mysql.password");
    // Kafka connection settings and target topic.
    public static final String kafkaBootstrap = resourceBundle.getString("kafka.bootstrap.servers");
    public static final String kafkaZookeeper = resourceBundle.getString("kafka.zookeeper.connect");
    public static final String kafkaInput = resourceBundle.getString("kafka.input.topic");

    /** Smoke test: prints one loaded setting to verify the bundle resolves. */
    public static void main(String[] args) {
        System.out.println(canalHost);
    }
}
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
import java.util.Properties;
/**
 * Utility class for producing messages to Kafka.
 */
public class KafkaSender {
    private String topic;

    public KafkaSender(String topic) {
        super();
        this.topic = topic;
    }

    /**
     * Sends one message to the given Kafka topic.
     *
     * @param topic topic name
     * @param key   message key
     * @param data  message payload
     */
    public static void sendMessage(String topic, String key, String data) {
        Producer<String, String> producer = createProducer();
        try {
            producer.send(new KeyedMessage<String, String>(topic, key, data));
        } finally {
            // BUGFIX: the original never closed the producer, leaking a network
            // connection on every call. Close it once the send completes.
            producer.close();
        }
    }

    /**
     * Creates a producer instance configured from GlobalConfigUtil.
     *
     * @return a new producer that serializes keys and values as Strings
     */
    private static Producer<String, String> createProducer() {
        Properties properties = new Properties();
        properties.put("metadata.broker.list", GlobalConfigUtil.kafkaBootstrap);
        properties.put("zookeeper.connect", GlobalConfigUtil.kafkaZookeeper);
        // Built-in String encoder: no custom serializer needed for String data.
        properties.put("serializer.class", StringEncoder.class.getName());
        return new Producer<String, String>(new ProducerConfig(properties));
    }
}
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
/**
 * Utility class that parses MySQL binlog messages delivered by Canal and
 * forwards each row change to Kafka as a JSON document.
 */
public class CanalClient {
    /** Value object describing one column of a changed row (name, value, updated flag). */
    static class ColumnValuePair {
        private String columnName;
        private String columnValue;
        private Boolean isValid;

        public ColumnValuePair(String columnName, String columnValue, Boolean isValid) {
            this.columnName = columnName;
            this.columnValue = columnValue;
            this.isValid = isValid;
        }

        public String getColumnName() { return columnName;}
        public void setColumnName(String columnName) {this.columnName = columnName;}
        public String getColumnValue() { return columnValue;}
        public void setColumnValue(String columnValue) {this.columnValue = columnValue;}
        public Boolean getIsValid() { return isValid;}
        public void setIsValid(Boolean isValid) {this.isValid = isValid;}
    }

    /**
     * Opens a Canal connector.
     *
     * @param host     canal server host name
     * @param port     canal server port
     * @param instance canal instance name
     * @param username MySQL user name
     * @param password MySQL password
     * @return a canal connector (not yet connected)
     */
    public static CanalConnector getConn(String host, int port, String instance, String username, String password) {
        return CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password);
    }

    /**
     * Parses a batch of binlog entries and forwards every row change to Kafka.
     *
     * @param entries    binlog entries fetched from canal
     * @param emptyCount sequence number of the current batch
     */
    public static void analysis(List<CanalEntry.Entry> entries, int emptyCount) {
        for (CanalEntry.Entry entry : entries) {
            // Only data changes are interesting; skip transaction begin/end markers.
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||
                    entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            // Decode the stored binlog payload.
            CanalEntry.RowChange rowChange;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                // BUGFIX: the original fell through with rowChange == null and hit
                // a NullPointerException below; skip unparsable entries instead.
                e.printStackTrace();
                continue;
            }
            // Operation type (INSERT / UPDATE / DELETE).
            CanalEntry.EventType eventType = rowChange.getEventType();
            // Binlog file name of this event.
            String logfileName = entry.getHeader().getLogfileName();
            // Offset of this event inside the binlog file.
            long logfileOffset = entry.getHeader().getLogfileOffset();
            // Database the operation belongs to.
            String dbName = entry.getHeader().getSchemaName();
            // Table the operation belongs to.
            String tableName = entry.getHeader().getTableName();
            // Execution time of the operation.
            long timestamp = entry.getHeader().getExecuteTime();

            // Process each changed row of this entry.
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                // For deletes the interesting columns are the pre-image;
                // for inserts and updates they are the post-image.
                List<CanalEntry.Column> columns =
                        (eventType == CanalEntry.EventType.DELETE)
                                ? rowData.getBeforeColumnsList()
                                : rowData.getAfterColumnsList();
                dataDetails(columns, logfileName, logfileOffset, dbName, tableName, eventType, emptyCount, timestamp);
            }
        }
    }

    /**
     * Serializes one row change to JSON and sends it to Kafka.
     *
     * @param columns       all columns of the current row
     * @param logFileName   binlog file name
     * @param logFileOffset position of the operation inside the binlog
     * @param dbName        database the operation belongs to
     * @param tableName     table the operation belongs to
     * @param eventType     operation type (insert / update / delete)
     * @param emptyCount    sequence number of the operation
     * @param timestamp     execution time of the operation
     */
    private static void dataDetails(List<CanalEntry.Column> columns,
                                    String logFileName,
                                    Long logFileOffset,
                                    String dbName,
                                    String tableName,
                                    CanalEntry.EventType eventType,
                                    int emptyCount,
                                    long timestamp) {
        // Collect which columns changed along with their new values.
        List<ColumnValuePair> columnValueList = new ArrayList<ColumnValuePair>();
        for (CanalEntry.Column column : columns) {
            columnValueList.add(new ColumnValuePair(column.getName(), column.getValue(), column.getUpdated()));
        }
        String key = UUID.randomUUID().toString();
        JSONObject jsonObject = new JSONObject();
        // BUGFIX: logFileName/logFileOffset/emptyCount/timestamp were commented
        // out, leaving those parameters unused and the message missing fields
        // that the accompanying documentation describes.
        jsonObject.put("logFileName", logFileName);
        jsonObject.put("logFileOffset", logFileOffset);
        jsonObject.put("dbName", dbName);
        jsonObject.put("tableName", tableName);
        jsonObject.put("eventType", eventType);
        jsonObject.put("columnValueList", columnValueList);
        jsonObject.put("emptyCount", emptyCount);
        jsonObject.put("timestamp", timestamp);
        // Concatenate all parsed binlog fields into one JSON string.
        String data = JSON.toJSONString(jsonObject);
        System.out.println("【JSON】" + data);
        // Publish the parsed data to Kafka.
        KafkaSender.sendMessage(GlobalConfigUtil.kafkaInput, JSON.toJSONString(key), data);
    }

    /**
     * Client entry point: connects to canal and pumps binlog batches to Kafka.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Load configuration.
        String host = GlobalConfigUtil.canalHost;
        int port = Integer.parseInt(GlobalConfigUtil.canalPort);
        String instance = GlobalConfigUtil.canalInstance;
        String username = GlobalConfigUtil.mysqlUsername;
        String password = GlobalConfigUtil.mysqlPassword;
        // Open the canal connection.
        CanalConnector conn = getConn(host, port, instance, username, password);
        // Read data from the binlog.
        int batchSize = 100;
        int emptyCount = 1;
        try {
            conn.connect();
            // BUGFIX: subscribe to every schema.table; the original ".*..*" was
            // missing the escaped dot of the canonical canal filter.
            conn.subscribe(".*\\..*");
            conn.rollback();
            int totalCount = 120; // stop after this many consecutive non-empty batches
            while (emptyCount < totalCount) {
                // Fetch the next batch without auto-acknowledge.
                Message message = conn.getWithoutAck(batchSize);
                long id = message.getId();
                int size = message.getEntries().size();
                if (id == -1 || size == 0) {
                    emptyCount = 0;
                    // Nothing was read from the binlog.
                    System.out.println("目前没有读取到任何数据...");
                    // Back off briefly so an idle loop does not busy-poll the server.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                } else {
                    // Data available: parse the binlog batch and forward it.
                    analysis(message.getEntries(), emptyCount);
                    emptyCount++;
                }
                // Acknowledge the batch so canal can release it.
                conn.ack(message.getId());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            conn.disconnect();
        }
    }
}
# application.properties,以下请更改为自己的数据库信息
canal.host=xxx.xx.xxx.xxx
canal.port=11111
canal.instance=example
mysql.username=root
mysql.password=xxxxxx
kafka.bootstrap.servers = xxx.xx.xxx.xxx:9092
kafka.zookeeper.connect = xxx.xx.xxx.xxx:2182
kafka.input.topic=test
具体代码请移步:SimpleMysqlCanalKafkaSample