1. Flink大数据实时处理设计方案

整套方案通过 Canal + Kafka 连接器 + Protobuf 实现数据的同步接入, 由 Flink 服务负责对各类业务数据进行实时统计处理(下文示例代码中, Kafka 消息为 Canal 输出的 JSON 字符串, 通过 SimpleStringSchema 读取、Gson 解析)。

2. 热销商品的统计处理

  • 功能

    实现对热销商品的统计, 统计周期为一天, 每3秒刷新一次数据。

  • 核心代码

    主逻辑实现:

        /**     * 执行Flink工作解决     * @throws Exception     */    private void executeFlinkTask() throws Exception {        // 1. 创立运行环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        // 2. 设置kafka服务连贯信息        Properties properties = new Properties();        properties.setProperty("bootstrap.servers", "10.10.20.132:9092");        properties.setProperty("group.id", "fink_group");        // 3. 创立Kafka生产端        FlinkKafkaConsumer kafkaProducer = new FlinkKafkaConsumer(                "order_binlog",                  // 指标 topic                new SimpleStringSchema(),   // 序列化 配置                properties);        // 调试,从新从最早记录生产        kafkaProducer.setStartFromEarliest();     // 尽可能从最早的记录开始        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        env.setParallelism(1);        // 4. 读取Kafka数据源        DataStreamSource<String> socketStr = env.addSource(kafkaProducer);        // 5. 数据过滤转换解决        socketStr.filter(new FilterFunction<String>() {            @Override            public boolean filter(String value) throws Exception {                JsonObject jsonObject = GsonConvertUtil.getSingleton().getJsonObject(value);                String isDDL = jsonObject.get("isDdl").getAsString();                String type = jsonObject.get("type").getAsString();                // 过滤条件: 非DDL操作, 并且是新增的数据                return isDDL.equalsIgnoreCase("false") && "INSERT".equalsIgnoreCase(type);          }        }).flatMap(new FlatMapFunction<String, Order>() {          @Override            public void flatMap(String value, Collector<Order> out) throws Exception {                // 获取JSON中的data数据                JsonArray dataArray = GsonConvertUtil.getSingleton().getJsonObject(value).getAsJsonArray("data");                // 将data数据转换为java对象                for(int i =0; i< dataArray.size(); i++) {                    JsonObject jsonObject = dataArray.get(i).getAsJsonObject();                    Order order = 
GsonConvertUtil.getSingleton().cvtJson2Obj(jsonObject, Order.class);                    System.out.println("order => " + order);                    out.collect(order);                }            }        })        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(0)) {            @Override            public long extractTimestamp(Order element) {                return element.getExecTime();            }        })        .keyBy(Order::getGoodsId)        .timeWindow(Time.hours(24), Time.seconds(3))        .aggregate(new TotalAmount(), new AmountWindow())        .keyBy(HotOrder::getTimeWindow)        .process(new TopNHotOrder());        // 6. 执行工作        env.execute("job");    }

热销商品的金额累加处理:

   /**     * 商品金额累加器     */    private static class TotalAmount implements AggregateFunction<Order, Order, Order> {        @Override        public Order createAccumulator() {            Order order = new Order();            order.setTotalAmount(0l);            return order;        }        /**         * 累加统计商品销售总金额         * @param value         * @param accumulator         * @return         */        @Override        public Order add(Order value, Order accumulator) {            accumulator.setGoodsId(value.getGoodsId());            accumulator.setGoodsName((value.getGoodsName()));            accumulator.setTotalAmount(accumulator.getTotalAmount() + (value.getExecPrice() * value.getExecVolume()));            return accumulator;        }        @Override        public Order getResult(Order accumulator) {            return accumulator;        }        @Override        public Order merge(Order a, Order b) {            return null;        }    }

热销商品的数据转换处理, 用于统计:

    /**
     * Turns the pre-aggregated sum of one goods id into a {@code HotOrder} stamped with the end
     * timestamp of the window it belongs to.
     */
    private static class AmountWindow implements WindowFunction<Order, HotOrder, Long, TimeWindow> {

        @Override
        public void apply(Long goodsId, TimeWindow window, Iterable<Order> input, Collector<HotOrder> out) throws Exception {
            // Used after an AggregateFunction, so the iterable only carries the accumulator.
            Order aggregated = input.iterator().next();
            long windowEnd = window.getEnd();
            HotOrder hotOrder = new HotOrder(goodsId, aggregated.getGoodsName(), aggregated.getTotalAmount(), windowEnd);
            out.collect(hotOrder);
        }
    }

热销商品的统计排行处理逻辑:

    /**     * 热销商品的统计排行实现     */    private class TopNHotOrder extends KeyedProcessFunction<Long, HotOrder, String> {        private ListState<HotOrder> orderState;        @Override        public void processElement(HotOrder value, Context ctx, Collector<String> out) throws Exception {            // 将数据退出到状态列表外面            orderState.add(value);            // 注册定时器            ctx.timerService().registerEventTimeTimer(value.getTimeWindow());        }        @Override        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {            List<HotOrder> orderList = new ArrayList<>();            for(HotOrder order : orderState.get()){                orderList.add(order);            }            // 依照成交总金额, 倒序排列            orderList.sort(Comparator.comparing(HotOrder::getTotalAmount).reversed());            orderState.clear();            // 将数据写入至ES            HotOrderRepository hotOrderRepository = (HotOrderRepository) ApplicationContextUtil.getBean("hotOrderRepository");            StringBuffer strBuf = new StringBuffer();            for(HotOrder order: orderList) {                order.setId(order.getGoodsId());                order.setCreateDate(new Date(order.getTimeWindow()));                hotOrderRepository.save(order);                strBuf.append(order).append("\n");                System.out.println("result => " + order);            }            out.collect(strBuf.toString());        }        @Override        public void open(Configuration parameters) throws Exception {            super.open(parameters);            orderState = getRuntimeContext().getListState(new ListStateDescriptor<HotOrder>("hot-order", HotOrder.class));        }    }

3. 区域热销商品统计处理 (多维度条件)

  • 功能

    功能: 根据不同区域(比如省份、城市), 实现对热销商品的统计, 统计周期为一天, 每3秒刷新一次数据(下面的示例代码按省份分组)。

  • 核心代码

    主逻辑代码:

        /**     * 执行Flink工作解决     * @throws Exception     */    private void executeFlinkTask() throws Exception {        // 1. 创立运行环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        // 2. 设置kafka服务连贯信息        Properties properties = new Properties();        properties.setProperty("bootstrap.servers", "10.10.20.132:9092");        properties.setProperty("group.id", "fink_group");        // 3. 创立订单的Kafka生产端        FlinkKafkaConsumer orderKafkaProducer = new FlinkKafkaConsumer(                "order_binlog",                  // 指标 topic                new SimpleStringSchema(),   // 序列化 配置                properties);        // 调试,从新从最早记录生产        orderKafkaProducer.setStartFromEarliest();     // 尽可能从最早的记录开始        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        env.setParallelism(1);        // 4. 创立地址信息的kafka生产端        FlinkKafkaConsumer addressKafkaProducer = new FlinkKafkaConsumer(                "orderAddress_binlog",                  // 指标 topic                new SimpleStringSchema(),   // 序列化 配置                properties);        // 调试,从新从最早记录生产        addressKafkaProducer.setStartFromEarliest();     // 尽可能从最早的记录开始        // 5. 读取Kafka数据源(订单数据源和地址数据源)        DataStreamSource<String> orderStream = env.addSource(orderKafkaProducer);        DataStreamSource<String> addressStream = env.addSource(addressKafkaProducer);        // 6. 
数据过滤转换解决(订单数据)        DataStream<Order> orderDataStream = orderStream.filter(new FilterFunction<String>() {            @Override            public boolean filter(String value) throws Exception {                JsonObject jsonObject = GsonConvertUtil.getSingleton().getJsonObject(value);                String isDDL = jsonObject.get("isDdl").getAsString();                String type = jsonObject.get("type").getAsString();                // 过滤条件: 非DDL操作, 并且是新增的数据                return isDDL.equalsIgnoreCase("false") && "INSERT".equalsIgnoreCase(type);            }        }).flatMap(new FlatMapFunction<String, Order>() {            @Override            public void flatMap(String value, Collector<Order> out) throws Exception {                // 获取JSON中的data数据                JsonArray dataArray = GsonConvertUtil.getSingleton().getJsonObject(value).getAsJsonArray("data");                // 将data数据转换为java对象                for(int i =0; i< dataArray.size(); i++) {                    JsonObject jsonObject = dataArray.get(i).getAsJsonObject();                    Order order = GsonConvertUtil.getSingleton().cvtJson2Obj(jsonObject, Order.class);                    System.out.println("order => " + order);                    out.collect(order);                }            }        })        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(0)) {            @Override            public long extractTimestamp(Order element) {                return element.getExecTime();            }        });        // 7. 
过滤转换地址数据源        DataStream<OrderAddress> orderAddressDataStream = addressStream.filter(new FilterFunction<String>() {            @Override            public boolean filter(String value) throws Exception {                JsonObject jsonObject = GsonConvertUtil.getSingleton().getJsonObject(value);                String isDDL = jsonObject.get("isDdl").getAsString();                String type = jsonObject.get("type").getAsString();                // 过滤条件: 非DDL操作, 并且是新增的数据                return isDDL.equalsIgnoreCase("false") && "INSERT".equalsIgnoreCase(type);            }        }).flatMap(new FlatMapFunction<String, OrderAddress>() {            @Override            public void flatMap(String value, Collector<OrderAddress> out) throws Exception {                // 获取JSON中的data数据                JsonArray dataArray = GsonConvertUtil.getSingleton().getJsonObject(value).getAsJsonArray("data");                // 将data数据转换为java对象                for(int i =0; i< dataArray.size(); i++) {                    JsonObject jsonObject = dataArray.get(i).getAsJsonObject();                    OrderAddress orderAddress = GsonConvertUtil.getSingleton().cvtJson2Obj(jsonObject, OrderAddress.class);                    System.out.println("orderAddress => " + orderAddress);                    out.collect(orderAddress);                }            }        })        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderAddress>(Time.seconds(0)) {            @Override            public long extractTimestamp(OrderAddress element) {                return element.getExecTime();            }        });        // 8. 
订单数据流和地址数据流的join解决        orderDataStream.join(orderAddressDataStream).where(new KeySelector<Order, Object>() {            @Override            public Object getKey(Order value) throws Exception {                return value.getId();            }        }).equalTo(new KeySelector<OrderAddress, Object>() {            @Override            public Object getKey(OrderAddress value) throws Exception {                return value.getOrderId();            }        })        // 这里的工夫, 相比上面的工夫窗滑动值slide快一些        .window(TumblingEventTimeWindows.of(Time.seconds(2)))        .apply(new JoinFunction<Order, OrderAddress, JoinOrderAddress>() {            @Override            public JoinOrderAddress join(Order first, OrderAddress second) throws Exception {                return JoinOrderAddress.build(first, second);            }        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JoinOrderAddress>(Time.seconds(0)) {            @Override            public long extractTimestamp(JoinOrderAddress element) {                return element.getExecTime();            }        })        // 9. 依据省份和商品ID进行数据分组        .keyBy(new KeySelector<JoinOrderAddress, Tuple2<String, Long>>() {            @Override            public Tuple2<String, Long> getKey(JoinOrderAddress value) throws Exception {                return Tuple2.of(value.getProvince(), value.getGoodsId());            }        })        .timeWindow(Time.hours(24), Time.seconds(3))        .aggregate(new TotalAmount(), new AmountWindow())        .keyBy(HotDimensionOrder::getTimeWindow)        .process(new TopNDimensionOrder());        // 10. 执行工作        env.execute("job");    }

商品金额累加器:

  /**   * 商品金额累加器   */  private static class TotalAmount implements AggregateFunction<JoinOrderAddress, JoinOrderAddress, JoinOrderAddress> {      @Override      public JoinOrderAddress createAccumulator() {          JoinOrderAddress order = new JoinOrderAddress();          order.setTotalAmount(0l);          return order;      }        /**       * 商品销售总金额累加解决       * @param value       * @param accumulator       * @return       */      @Override      public JoinOrderAddress add(JoinOrderAddress value, JoinOrderAddress accumulator) {          accumulator.setGoodsId(value.getGoodsId());          accumulator.setGoodsName((value.getGoodsName()));          accumulator.setProvince(value.getProvince());          accumulator.setCity(value.getCity());          accumulator.setTotalAmount(accumulator.getTotalAmount() + (value.getExecPrice() * value.getExecVolume()));          return accumulator;      }        @Override      public JoinOrderAddress getResult(JoinOrderAddress accumulator) {          return accumulator;      }        @Override      public JoinOrderAddress merge(JoinOrderAddress a, JoinOrderAddress b) {          return null;      }  }

热销商品的数据转换处理:

  /**
   * Wraps the pre-aggregated sum of one (province, goods id) key into a
   * {@code HotDimensionOrder} stamped with the end timestamp of its window.
   */
  private static class AmountWindow implements WindowFunction<JoinOrderAddress, HotDimensionOrder, Tuple2<String, Long>, TimeWindow> {

      @Override
      public void apply(Tuple2<String, Long> provinceAndGoodsId, TimeWindow window, Iterable<JoinOrderAddress> input, Collector<HotDimensionOrder> out) throws Exception {
          // Used after an AggregateFunction, so the iterable only carries the accumulator.
          JoinOrderAddress aggregated = input.iterator().next();
          long windowEnd = window.getEnd();
          HotDimensionOrder result = new HotDimensionOrder(aggregated, windowEnd);
          out.collect(result);
      }
  }

根据不同区域的热销商品, 实现统计排行:

  private class TopNDimensionOrder extends KeyedProcessFunction<Long, HotDimensionOrder, String> {        private ListState<HotDimensionOrder> orderState;        @Override      public void processElement(HotDimensionOrder value, Context ctx, Collector<String> out) throws Exception {          // 将数据退出到状态列表外面          orderState.add(value);          // 注册定时器          ctx.timerService().registerEventTimeTimer(value.getTimeWindow());      }        @Override      public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {          List<HotDimensionOrder> orderList = new ArrayList<>();          for(HotDimensionOrder order : orderState.get()){              orderList.add(order);          }          // 依照省份和商品的成交总金额, 倒序排列          orderList.sort(Comparator.comparing(HotDimensionOrder::getProvince).thenComparing(HotDimensionOrder::getTotalAmount, Comparator.reverseOrder()));          orderState.clear();          // 将数据写入至ES          HotDimensionRepository  hotDimensionRepository = (HotDimensionRepository) ApplicationContextUtil.getBean("hotDimensionRepository");          StringBuffer strBuf = new StringBuffer();          for(HotDimensionOrder order: orderList) {              order.setId(order.getProvince() + order.getGoodsId());              order.setCreateDate(new Date(order.getTimeWindow()));              hotDimensionRepository.save(order);              strBuf.append(order).append("\n");              System.out.println("result => " + order);          }          out.collect(strBuf.toString());      }        @Override      public void open(Configuration parameters) throws Exception {          super.open(parameters);          orderState = getRuntimeContext().getListState(new ListStateDescriptor<HotDimensionOrder>("hot-dimension", HotDimensionOrder.class));        }  }

本文由mirson创作分享,如需进一步交流,请加QQ群:19310171或访问www.softart.cn