生产者要将发送的数据转化为字节数组能力通过网络动员给Kafka,对于一些简略的数据,Kafka自带了一些序列化工具。

//创立生产者实例private static Producer<String , String> createProducer(){    Properties properties = new Properties();    properties.put("metadata.broker.list" , GlobalConfigUtil.kafkaBootstrap);    properties.put("zookeeper.connect" , GlobalConfigUtil.kafkaZookeeper);    properties.put("serializer.class" , StringEncoder.class.getName());        return new Producer<String, String>(new ProducerConfig(properties));}

在通常的微服务中,服务之间须要频繁的传递各种负责的数据结构,然而kafka仅仅反对简略的类型如String,Integer。于是咱们在服务之间应用JSONObject,因为JSON能够很容易的转化为String,而String的序列化和反序列化曾经被反对。

JSONObject jsonObject = new JSONObject();jsonObject.put("logFileName", logFileName);jsonObject.put("logFileOffset", logFileOffset);jsonObject.put("dbName", dbName);jsonObject.put("tableName", tableName);jsonObject.put("eventType", eventType);jsonObject.put("columnValueList", columnValueList);jsonObject.put("emptyCount", emptyCount);jsonObject.put("timestamp", timestamp);//拼接所有binlog解析的字段String data = JSON.toJSONString(jsonObject);// 解析后的数据发送到kafkaKafkaSender.sendMessage(GlobalConfigUtil.kafkaInput, JSON.toJSONString(key), data);

ResourceBundle类是用来读取propertise资源文件的,能够在初始化时把配置项全部一次读入,并保留在动态成员变量中。防止每次须要的时候才去读取相干配置文件的class,I/O速度慢,容易造成性能上的瓶颈。

//读取application.properties文件private static ResourceBundle resourceBundle = ResourceBundle.getBundle("application");public static String canalHost= resourceBundle.getString("canal.host");public static String canalPort = resourceBundle.getString("canal.port");public static String canalInstance = resourceBundle.getString("canal.instance");public static String mysqlUsername = resourceBundle.getString("mysql.username");public static String mysqlPassword=  resourceBundle.getString("mysql.password");public static String kafkaBootstrap= resourceBundle.getString("kafka.bootstrap.servers");public static String kafkaZookeeper= resourceBundle.getString("kafka.zookeeper.connect");public static String kafkaInput = resourceBundle.getString("kafka.input.topic");

残缺代码

#pom文件<dependency>      <groupId>com.alibaba.otter</groupId>      <artifactId>canal.client</artifactId>      <version>1.0.24</version>  </dependency>  <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->  <dependency>      <groupId>org.apache.kafka</groupId>      <artifactId>kafka_2.11</artifactId>      <version>0.9.0.1</version>      <exclusions>          <exclusion>              <groupId>org.slf4j</groupId>              <artifactId>slf4j-log4j12</artifactId>          </exclusion>      </exclusions>  </dependency>    <!--对象和json 相互转换的-->  <dependency>      <groupId>com.alibaba</groupId>      <artifactId>fastjson</artifactId>      <version>1.2.44</version>  </dependency>
import java.util.Locale;  import java.util.ResourceBundle;    /**   * 配置文件的公共类   */  public class GlobalConfigUtil {        //读取application.properties文件   private static ResourceBundle resourceBundle = ResourceBundle.getBundle("application");        public static String canalHost= resourceBundle.getString("canal.host");      public static String canalPort = resourceBundle.getString("canal.port");      public static String canalInstance = resourceBundle.getString("canal.instance");      public static String mysqlUsername = resourceBundle.getString("mysql.username");      public static String mysqlPassword=  resourceBundle.getString("mysql.password");      public static String kafkaBootstrap= resourceBundle.getString("kafka.bootstrap.servers");      public static String kafkaZookeeper= resourceBundle.getString("kafka.zookeeper.connect");      public static String kafkaInput = resourceBundle.getString("kafka.input.topic");        public static void main(String[] args) {          System.out.println(canalHost);      }  }
import kafka.javaapi.producer.Producer;  import kafka.producer.KeyedMessage;  import kafka.producer.ProducerConfig;  import kafka.serializer.StringEncoder;    import java.util.Properties;    /**   * Kafka生产音讯工具类   */  public class KafkaSender {      private String topic;        public KafkaSender(String topic){          super();          this.topic = topic;      }        /**   * 发送音讯到Kafka指定topic   * * @param topic topic名字   * @param key 键值   * @param data 数据   */   public static void sendMessage(String topic , String key , String data){          Producer<String, String> producer = createProducer();          producer.send(new KeyedMessage<String , String>(topic , key , data));      }        /**   * 创立生产者实例   * @return   */   private static Producer<String , String> createProducer(){          Properties properties = new Properties();            properties.put("metadata.broker.list" , GlobalConfigUtil.kafkaBootstrap);          properties.put("zookeeper.connect" , GlobalConfigUtil.kafkaZookeeper);          properties.put("serializer.class" , StringEncoder.class.getName());            return new Producer<String, String>(new ProducerConfig(properties));      }  }
import com.alibaba.fastjson.JSON;  import com.alibaba.fastjson.JSONObject;  import com.alibaba.otter.canal.client.CanalConnector;  import com.alibaba.otter.canal.client.CanalConnectors;  import com.alibaba.otter.canal.protocol.CanalEntry;  import com.alibaba.otter.canal.protocol.Message;    import java.net.InetSocketAddress;  import java.util.ArrayList;  import java.util.List;  import java.util.UUID;    /**   * Canal解析binlog日志工具类   */  public class CanalClient {        static class ColumnValuePair {          private String columnName;          private String columnValue;          private Boolean isValid;            public ColumnValuePair(String columnName, String columnValue, Boolean isValid) {              this.columnName = columnName;              this.columnValue = columnValue;              this.isValid = isValid;          }            public String getColumnName() { return columnName; }          public void setColumnName(String columnName) { this.columnName = columnName; }          public String getColumnValue() { return columnValue; }          public void setColumnValue(String columnValue) { this.columnValue = columnValue; }          public Boolean getIsValid() { return isValid; }          public void setIsValid(Boolean isValid) { this.isValid = isValid; }      }        /**   * 获取Canal连贯   *   * @param host 主机名   * @param port 端口号   * @param instance Canal实例名   * @param username 用户名   * @param password 明码   * @return Canal连接器   */   public static CanalConnector getConn(String host, int port, String instance, String username, String password) {          CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(host, port), instance, username, password);          return canalConnector;      }        /**   * 解析Binlog日志   *   * @param entries Binlog音讯实体   * @param emptyCount 操作的序号   */   public static void analysis(List<CanalEntry.Entry> entries, int emptyCount) {          for (CanalEntry.Entry entry : entries) {              // 只解析mysql事务的操作,其余的不解析   if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN ||                      entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {                  continue;              }                // 那么解析binlog   CanalEntry.RowChange rowChange = null;                try {                  rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());              } catch (Exception e) {                  e.printStackTrace();              }                // 获取操作类型字段(减少  删除  批改)   CanalEntry.EventType eventType = rowChange.getEventType();              // 获取binlog文件名称   String logfileName = entry.getHeader().getLogfileName();              // 读取以后操作在binlog文件的地位   long logfileOffset = entry.getHeader().getLogfileOffset();              // 获取以后操作所属的数据库   String dbName = entry.getHeader().getSchemaName();              // 获取以后操作所属的表   String tableName = entry.getHeader().getTableName();//以后操作的是哪一张表   long timestamp = entry.getHeader().getExecuteTime();//执行工夫     // 解析操作的行数据   for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {                  // 删除操作   if (eventType == CanalEntry.EventType.DELETE) {                      // 获取删除之前的所有列数据   dataDetails(rowData.getBeforeColumnsList(), logfileName, logfileOffset, dbName, tableName, eventType, emptyCount,timestamp);                  }                  // 新增操作   else if (eventType == CanalEntry.EventType.INSERT) {                      // 获取新增之后的所有列数据   dataDetails(rowData.getAfterColumnsList(), logfileName, logfileOffset, dbName, tableName, eventType, emptyCount,timestamp);                  }                  // 更新操作   else {                      // 获取更新之后的所有列数据   dataDetails(rowData.getAfterColumnsList(), logfileName, logfileOffset, dbName, tableName, eventType, emptyCount,timestamp);                  }              }          }      }        /**   * 解析具体一条Binlog音讯的数据   *   * @param columns 以后行所有的列数据   * @param logFileName binlog文件名   * @param logFileOffset 以后操作在binlog中的地位   * @param dbName 以后操作所属数据库名称   * @param tableName 以后操作所属表名称   * @param eventType 以后操作类型(新增、批改、删除)   * @param emptyCount 操作的序号   */   private static void dataDetails(List<CanalEntry.Column> columns,                                      String logFileName,                                      Long logFileOffset,                                      String dbName,                                      String tableName,                                      CanalEntry.EventType eventType,                                      int emptyCount,                                      long timestamp) {            // 找到以后那些列产生了扭转  以及扭转的值   List<ColumnValuePair> columnValueList = new ArrayList<ColumnValuePair>();            for (CanalEntry.Column column : columns) {              ColumnValuePair columnValuePair = new ColumnValuePair(column.getName(), column.getValue(), column.getUpdated());              columnValueList.add(columnValuePair);          }            String key = UUID.randomUUID().toString();          JSONObject jsonObject = new JSONObject();  //        jsonObject.put("logFileName", logFileName);  //        jsonObject.put("logFileOffset", logFileOffset);   jsonObject.put("dbName", dbName);          jsonObject.put("tableName", tableName);          jsonObject.put("eventType", eventType);          jsonObject.put("columnValueList", columnValueList);  //        jsonObject.put("emptyCount", emptyCount);  //        jsonObject.put("timestamp", timestamp);       // 拼接所有binlog解析的字段   String data = JSON.toJSONString(jsonObject);            System.out.println("【JSON】" + data);            // 解析后的数据发送到kafka   KafkaSender.sendMessage(GlobalConfigUtil.kafkaInput, JSON.toJSONString(key), data);      }        /**   * 客户端入口办法   * @param args   */   public static void main(String[] args) {          // 加载配置文件   String host = GlobalConfigUtil.canalHost;          int port = Integer.parseInt(GlobalConfigUtil.canalPort);          String instance = GlobalConfigUtil.canalInstance;          String username = GlobalConfigUtil.mysqlUsername;          String password = GlobalConfigUtil.mysqlPassword;            // 获取Canal连贯   CanalConnector conn = getConn(host, port, instance, username, password);            // 从binlog中读取数据   int batchSize = 100;          int emptyCount = 1;            try {              conn.connect();              conn.subscribe(".*..*");              conn.rollback();                int totalCount = 120; //循环次数     while (emptyCount < totalCount) {                  // 获取数据   Message message = conn.getWithoutAck(batchSize);                    long id = message.getId();                  int size = message.getEntries().size();                  if (id == -1 || size == 0) {                      emptyCount=0;                      //没有读取到任何数据   System.out.println("目前没有读取到任何数据...");                  } else {                      //有数据,那么解析binlog日志   analysis(message.getEntries(), emptyCount);                      emptyCount++;                  }                  // 确认音讯   conn.ack(message.getId());              }          } catch (Exception e) {              e.printStackTrace();          } finally {              conn.disconnect();          }      }  }
#application.properties, 以下请更改为自已的数据库信息  canal.host=xxx.xx.xxx.xxx  canal.port=11111  canal.instance=example  mysql.username=root  mysql.password=xxxxxx  kafka.bootstrap.servers = xxx.xx.xxx.xxx:9092  kafka.zookeeper.connect = xxx.xx.xxx.xxx:2182  kafka.input.topic=test

具体代码请移步:SimpleMysqlCanalKafkaSample