# 作者介绍
蒋鹏程,苏州万店掌软件技术有限公司

前言

CloudCanal 近期提供了自定义代码构建宽表能力,咱们第一工夫参加了该个性内测,并已落地生产稳固运行。开发流程详见官网文档 《CloudCanal自定义代码实时加工》。

能力特点包含:

  • 灵便,反对反查打宽表,特定逻辑数据荡涤,对账,告警等场景
  • 调试不便,通过工作参数配置主动关上 debug 端口,对接 IDE 调试
  • SDK 接口清晰,提供丰盛的上下文信息,不便数据逻辑开发

本文基于咱们业务中的理论需要(MySQL -> ElasticSearch 宽表构建),梳理一下具体的开发调试流程,心愿对大家有所帮忙。

应用案例

案例一:商品表和SKU宽表行构建

业务背景

在对接用户的小程序进行商品搜寻时,须要如下几个能力

  1. 基于分词的全文索引
  2. 同时搜寻不同表中的字段

须要全文索引的初衷是心愿用户搜寻商品的关键词就能够搜寻到想要的商品。这在传统数据库中个别反对的都比拟弱甚至不反对,因而须要借助 ES 分词器搜寻。

而第二个能力次要是因为业务数据通常散布在多个表中,然而 ES 并不能像须要关系型数据库那样联表查问,CloudCanal 自定义代码的能力则整号解决了咱们多表关联的痛点。

业务流程

在应用 CloudCanal 总体的流程变得非常清晰,在 CloudCanal 层面通过订阅表联合自定义代码中的反查数据库以及数据处理,能够间接生成能够写到对端 ES 的宽表行。

表构造

筹备的 mysql 表构造如下,一个商品会对应多个 SKU,咱们在对端创立好索引,其中的 sku_detail 保留一个商品关联的 SKU 信息,是一个典型的一对多场景。

ES mapping 中的字段对应主表 tb_enterprise_goods 中字段,额定新增的 sku_detail 字段就是咱们须要从子表 tb_enterprise_sku 中同步的数据。

## 商品表CREATE TABLE `tb_enterprise_goods` (  `id` int(11) NOT NULL AUTO_INCREMENT,  `name` varchar(64) NOT NULL DEFAULT '' COMMENT '商品名称',  `enterprise_id` int(11) NOT NULL DEFAULT '0' COMMENT '企业id',  `goods_no` varchar(50) NOT NULL DEFAULT '' COMMENT '商家商品编号',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=9410 DEFAULT CHARSET=utf8mb4;
## SKU表CREATE TABLE `tb_enterprise_sku` (  `id` int(11) NOT NULL AUTO_INCREMENT,  `enterprise_goods_id` int(11) NOT NULL COMMENT '企业商品id',  `name` varchar(255) NOT NULL DEFAULT '' COMMENT 'sku{1:2,2:1}',  `sku_no` varchar(255) DEFAULT '' COMMENT '商品sku编码',  `scan_goods` varchar(255) CHARACTER SET utf8 NOT NULL DEFAULT '' COMMENT 'sku条形码',  PRIMARY KEY (`id`),) ENGINE=InnoDB AUTO_INCREMENT=14397 DEFAULT CHARSET=utf8mb4 COMMENT='企业 sku';

ES 索引如下:

      "enterprise_id": {        "type": "integer"      },      "goods_no": {        "type": "text",        "analyzer": "custom_e",        "fields": {          "keyword": {            "type": "keyword"          }        }      },      "id": {        "type": "integer"      },      "name": {        "type": "text",        "analyzer": "ik_max_word",        "search_analyzer": "ik_smart",        "fields": {          "standard": {            "type": "text",            "analyzer": "standard"          },          "keyword":{            "type": "keyword"          }        },        "fielddata": true      },      "sku_detail": {        "type": "nested",        "properties": {          "id": {            "type": "integer"          },          "sku_no": {            "type": "text",            "analyzer": "custom_e",            "fields": {              "keyword": {                "type": "keyword"              }            }          },          "scan_goods": {            "type": "text",            "analyzer": "custom_e",            "fields": {              "keyword": {                "type": "keyword"              }            }          }
注:为了不便大家了解,此处表字段进行了缩减

自定义代码工作流程

自定义代码源码

public List<CustomRecord> addData(CustomRecord customRecord, DataSource dataSource) {        List<CustomRecord> customRecordList=new ArrayList<>();        String idStr = (customRecord.getFieldMapAfter().get("id")).toString();        List<EnterpriseSku> enterpriseSkuList = tryQuerySourceDs(dataSource, Integer.valueOf(Integer.parseInt(idStr.substring(idStr.indexOf("=") + 1, idStr.indexOf(")")))));        if (enterpriseSkuList.size() > 0) {            Map<String, Object> addFieldValueMap = new LinkedHashMap<>();            addFieldValueMap.put("sku_detail", JSONArray.parseArray(JSON.toJSONString(enterpriseSkuList)));            RecordBuilder.modifyRecordBuilder(customRecord).addField(addFieldValueMap);        }        customRecordList.add(customRecord);        return customRecordList;    }public List<CustomRecord> updateData(CustomRecord customRecord, DataSource dataSource) {        List<CustomRecord> customRecordList=new ArrayList<>();        String idStr = (customRecord.getFieldMapAfter().get("id")).toString();        List<EnterpriseSku> enterpriseSkuList = tryQuerySourceDs(dataSource, Integer.valueOf(Integer.parseInt(idStr.substring(idStr.indexOf("=") + 1, idStr.indexOf(")")))));        if (enterpriseSkuList.size() > 0) {            Map<String, Object> addFieldValueMap = new LinkedHashMap<>();            addFieldValueMap.put("sku_detail", JSONArray.parseArray(JSON.toJSONString(enterpriseSkuList)));            RecordBuilder.modifyRecordBuilder(customRecord).addField(addFieldValueMap);        }        customRecordList.add(customRecord);        return customRecordList;    }private List<EnterpriseSku> tryQuerySourceDs(DataSource dataSource, Integer id) {        try(Connection connection = dataSource.getConnection();            PreparedStatement ps = connection.prepareStatement("select * from `live-mini`.tb_enterprise_sku where is_del=0 and enterprise_goods_id=" + id)) {            ResultSet resultSet = ps.executeQuery();            BeanListHandler<EnterpriseSku> bh = new BeanListHandler(EnterpriseSku.class);            List<EnterpriseSku> enterpriseSkuList = bh.handle(resultSet);            return enterpriseSkuList;        } catch (Exception e) {            esLogger.error(e.getMessage());            return new ArrayList<>();        }    }

思路

customRecord 对象即自定义代码传入的参数,传入的 id 为子表 tb_enterprise_sku 的外键 enterprise_goods_id,查问出子表对于这个外键的所有数据,放入 addFieldValueMap 中,再利用源码提供的办法RecordBuilder.modifyRecordBuilder(customRecord).addField(addFieldValueMap),对 customRecord 进行加工。

创立工作步骤

新建源端对端数据源

抉择订阅表及同步到对端的索引

抉择同步字段,抉择自定义包

实现创立工作

实现成果

{      "_index" : "live-mini_pro_enterprise_goods_sku_view",        "_type" : "_doc",        "_id" : "17385",        "_score" : 12.033585,        "_source" : {          "img" : "https://ovopark.oss-cn-hangzhou.aliyuncs.com/wanji/2020-11-30/1606786889982.jpg",          "category_name" : "无类目",          "is_grounding" : 1,          "del_time" : "2021-11-01T17:13:32+08:00",          "goods_no" : "",          "distribute_second" : 0.0,          "uniform_proportion" : 0,          "description" : "赠送私域直播流量转化平台万集&线上商城",          "video" : "",          "self_uniform_proportion" : 0,          "update_time" : "2021-11-01T17:13:32+08:00",          "allocate_video" : null,          "self_commission_properation" : 0.0,          "category_id" : 0,          "is_promote" : 0,          "price" : 0.03,          "is_distributor_self" : 0,          "limit_purchases_max_quantity" : 0,          "limit_purchases_type" : 0,          "is_del" : 0,          "is_distributor" : 0,          "activity_price" : 0.0,          "id" : 17385,          "stock" : 0,          "distribute_first" : 0.0,          "is_distribution_threshold" : 0,          "refund_configure" : 1,          "create_time" : "2021-11-01T17:13:32+08:00",          "scan_goods" : "",          "limit_purchases_cycle" : 0,          "is_sku" : 1,          "allocate_mode" : 0,          "sku_detail" : [            {              "scan_goods" : "",              "sku_no" : "",              "id" : "19943"            }          ],          "enterprise_id" : 24,          "is_delivery" : 0,          "is_limit_purchases" : 0,          "name" : "测试商品测试商品测试商品测试商",          "goods_type" : 0,          "goods_order" : 0,          "ts" : "2021-11-01T17:16:42+08:00",          "delivery_price" : 0.0        }      }

案例二:订单表、商品表宽表构建

业务背景

小程序商城中须要展现猜你喜爱的商品,对猜你喜爱商品是依据用户购买商品的频率来决定,次要波及订单表,订单商品表,用户表,商品表等,应用ES 查问同样面临多表无奈 join 的问题,本案例中仍然采纳 CloudCanal 自定义代码同步为扁平化数据。

业务原应用技术及问题

同步 ES 的计划原先应用 logstash 的形式全量同步数据,因为数据量的问题,同步数据放在每日的凌晨,带来的问题为,数据同步不及时,并且只能是全量危险比拟高。屡次呈现删除索引数据后并没有同步的状况。

表构造

CREATE TABLE `tb_order` (  `id` int(11) NOT NULL AUTO_INCREMENT,  `order_sn` varchar(32) NOT NULL COMMENT '订单编号',  `user_id` int(11) NOT NULL COMMENT '用户 id',  `user_name` varchar(255) DEFAULT NULL COMMENT '用户名称',  `user_phone` varchar(11) DEFAULT NULL COMMENT '用户电话',  `store_id` int(11) NOT NULL COMMENT '门店 id',  `enterprise_id` int(11) DEFAULT '1' COMMENT '企业id',  `order_type` int(11) NOT NULL COMMENT '0:快递配送;1:门店自取; 2:美团配送即时单; 3:美团即时配送预约单;',  `order_status` tinyint(11) DEFAULT '0' COMMENT '原订单状态:1:未付款,3:待发货/待打包,5:(待收货/待取货),6:交易实现,7:订单生效,8:交易敞开, 13:用戶勾销,18:商家强制敞开,19批准退款然而退款失敗(未用到),30:美团即时配送状态异样',  `total_price` decimal(10,2) DEFAULT '0.00' COMMENT '订单总价',  PRIMARY KEY (`id`,`total_goods_weight`) USING BTREE) ENGINE=InnoDB AUTO_INCREMENT=18630 DEFAULT CHARSET=utf8mb4 COMMENT='订单表';CREATE TABLE `tb_order_goods` (  `id` int(11) NOT NULL AUTO_INCREMENT,  `user_id` int(11) NOT NULL COMMENT '用户 id',  `order_id` int(11) NOT NULL COMMENT '订单 id',  `goods_id` int(11) NOT NULL COMMENT '订单商品 id',  `enterprise_goods_id` varchar(11) DEFAULT NULL COMMENT '企业商品id',  `name` varchar(512) DEFAULT '' COMMENT '订单商品名称',  `spec` varchar(100) DEFAULT NULL COMMENT '规格属性',  `img` varchar(100) DEFAULT '' COMMENT '订单商品图片',  PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=19159 DEFAULT CHARSET=utf8mb4 COMMENT='订单商品表';

ES 索引字段

"store_id":{        "type": "integer"      },      "user_id":{        "type": "integer"      },      "sex":{        "type": "integer"      },      "birthday":{        "type": "keyword"      },      "goods_name":{        "type": "text",        "analyzer" : "ik_max_word",        "search_analyzer" : "ik_smart",        "fields": {          "keyword":{            "type": "keyword"          }        },        "fielddata": true      },      "goods_type":{        "type": "integer"      },      "order_goods_id":{        "type": "integer"      },      "enterprise_goods_id":{        "type": "integer"      },      "goods_price":{        "type": "double"      },      "order_id":{        "type": "integer"      },      "order_create_time":{        "type": "date"      }
注:ES表构造中波及多张表,为了不便举例,这边只贴出2张表。es_doc展现纬度为订单商品纬度。

实现流程

订阅订单表

订阅字段

画出横线的即为须要同步的字段,有一个点须要特地留神:ES 中须要展现的字段肯定要勾上同步,不勾上的话在自定义代码中 add 后 也不会被同步 官网给出的解释为字段黑白名单。
这里有几个细节点,订阅的表的维度并非 ES 存储数据的维度,所以这边的 id 并不是 ES 的 _id,对于这种须要在源端同步必须传的字段,设置对端字段能够随便设置一个对端已有的字段,在自定义代码中能够灵便的去重新配置须要同步的字段。(如果设置默认,ES 的 index 会创立出这个字段,这显然不是咱们想要看到的成果)

业务流程

代码实现

查问扁平化数据

SELECT    to2.store_id,    tuc.id AS user_id,    tuc.sex AS sex,    tuc.birthday,    tog.NAME AS goods_name,    tog.goods_type,    tog.goods_id AS order_goods_id,    tog.goods_price,    tog.create_time AS order_create_time,    tog.id AS order_id,    tog.enterprise_goods_id AS enterprise_goods_id FROM    `live-mini`.tb_order to2    INNER JOIN `live-mini`.tb_order_goods tog ON to2.id = tog.order_id     AND tog.is_del = 0     AND to2.user_id = tog.user_id    INNER JOIN `live-mini`.tb_user_c tuc ON to2.user_id = tuc.id     AND tuc.is_del = 0 WHERE    to2.is_del = 0     AND to2.id= #{占位}GROUP BY tog.id

思路:自定义代码获取 order 表的主键后,查问下面的 SQL,先将原 customRecord 中数据删除,再以查问出的后果维度新增数据。批改的逻辑亦如此。

public List<CustomRecord> addData(CustomRecord customRecord, DataSource dataSource) {        List<CustomRecord> customRecordList=new ArrayList<>();        String idStr = (customRecord.getFieldMapAfter().get("id")).toString();        List<OrderGoods> orderGoodsList = tryQuerySourceDs(dataSource, Integer.valueOf(Integer.parseInt(idStr.substring(idStr.indexOf("=") + 1, idStr.indexOf(")")))));        RecordBuilder.modifyRecordBuilder(customRecord).deleteRecord();        if (orderGoodsList.size() > 0) {            for (OrderGoods orderGoods:orderGoodsList){                //增加须要的行和列                Map<String,Object> fieldMap=BeanMapTool.beanToMap(orderGoods);                customRecordList.add(RecordBuilder.createRecordBuilder().createRecord(fieldMap).build());            }        }        return customRecordList;    }    public List<CustomRecord> updateData(CustomRecord customRecord, DataSource dataSource) {        List<CustomRecord> customRecordList=new ArrayList<>();        String idStr = (customRecord.getFieldMapAfter().get("id")).toString();        List<OrderGoods> orderGoodsList = tryQuerySourceDs(dataSource, Integer.valueOf(Integer.parseInt(idStr.substring(idStr.indexOf("=") + 1, idStr.indexOf(")")))));        RecordBuilder.modifyRecordBuilder(customRecord).deleteRecord();        if (orderGoodsList.size() > 0) {            for (OrderGoods orderGoods:orderGoodsList){                //增加须要的行和列                Map<String,Object> fieldMap=BeanMapTool.beanToMap(orderGoods);                customRecordList.add(RecordBuilder.createRecordBuilder().createRecord(fieldMap).build());            }        }        return customRecordList;    }    private List<OrderGoods> tryQuerySourceDs(DataSource dataSource, Integer id) {        String sql="SELECT to2.store_id,tuc.id AS user_id,tuc.sex AS sex,tuc.birthday,tog.NAME AS goods_name,tog.goods_type,tog.goods_id AS order_goods_id,tog.goods_price,tog.create_time AS order_create_time,tog.id AS order_id,tog.enterprise_goods_id AS enterprise_goods_id FROM `live-mini`.tb_order to2 INNER JOIN `live-mini`.tb_order_goods tog ON to2.id = tog.order_id  AND tog.is_del = 0  AND to2.user_id = tog.user_id INNER JOIN `live-mini`.tb_user_c tuc ON to2.user_id = tuc.id AND tuc.is_del = 0  WHERE to2.is_del = 0  and to2.id=";        try(Connection connection = dataSource.getConnection();            PreparedStatement ps = connection.prepareStatement(sql + id+" GROUP BY tog.id")) {            ResultSet resultSet = ps.executeQuery();            BeanListHandler<OrderGoods> bh = new BeanListHandler(OrderGoods.class);            List<OrderGoods> orderGoodsList = bh.handle(resultSet);            return orderGoodsList;        } catch (Exception e) {            esLogger.error(e.getMessage());            return new ArrayList<>();        }    }

实现成果

 {        "_index" : "live-mini-order-pro",        "_type" : "_doc",        "_id" : "359",        "_score" : 1.0,        "_source" : {          "goods_type" : 0,          "order_id" : 359,          "order_goods_id" : 450,          "order_create_time" : "2020-12-22T10:45:20.000Z",          "enterprise_goods_id" : 64,          "goods_name" : "【老客户专享】万店掌2021新年定制台历",          "sex" : 2,          "goods_price" : 1.0,          "user_id" : 386,          "store_id" : 1,          "birthday" : ""        }      }

写在最初

CloudCanal 的自定义代码很好地解决了咱们多表关联同步 ES 的问题,简洁易用的界面和有深度的性能都令人印象粗浅,期待 CloudCanal 更多新能力。对于 CloudCanal 自定义代码的能力,也欢送大家与我交换。

参加内测

CloudCanal 会一直提供一些预览的能力,包含新数据链路, 优化能力,性能插件。本文所形容的自定义代码能力目前也处于内测阶段。如需体验,可增加咱们小助手(微信号:suhuayue001)进行理解和试用。

退出CloudCanal粉丝群把握一手音讯和获取更多福利,请增加咱们小助手微信:suhuayue001
CloudCanal-收费好用的企业级数据同步工具,欢送品鉴。
理解更多产品能够查看官方网站: http://www.clougence.com
CloudCanal社区:https://www.askcug.com/