1. 订单支付状态跟踪统计(使用CEP)

  • 功能

    通过CEP识别支付成功的订单, 并基于这些订单实现热销商品的成交金额统计排行: 统计周期为一天, 每3秒刷新一次数据。

  • 核心代码

    主逻辑代码实现:

          /**     * 执行Flink工作解决     * @throws Exception     */    private void executeFlinkTask() throws Exception {        // 1. 创立运行环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        // 2. 设置kafka服务连贯信息        Properties properties = new Properties();        properties.setProperty("bootstrap.servers", "10.10.20.132:9092");        properties.setProperty("group.id", "fink_group");        // 3. 创立Kafka生产端        FlinkKafkaConsumer kafkaProducer = new FlinkKafkaConsumer(                "orderPayment_binlog",                  // 指标 topic                new SimpleStringSchema(),   // 序列化 配置                properties);        // 调试,从新从最早记录生产        kafkaProducer.setStartFromEarliest();     // 尽可能从最早的记录开始        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        env.setParallelism(1);        // 4. 读取Kafka数据源        DataStreamSource<String> socketStr = env.addSource(kafkaProducer);        // 5. 数据过滤转换解决        DataStream<OrderPayment> orderPaymentDataStream = socketStr.filter(new FilterFunction<String>() {            @Override            public boolean filter(String value) throws Exception {                JsonObject jsonObject = GsonConvertUtil.getSingleton().getJsonObject(value);                String isDDL = jsonObject.get("isDdl").getAsString();                String type = jsonObject.get("type").getAsString();                // 过滤条件: 非DDL操作, 并且是新增的数据                return isDDL.equalsIgnoreCase("false") && "INSERT".equalsIgnoreCase(type);            }        }).flatMap(new FlatMapFunction<String, OrderPayment>() {            @Override            public void flatMap(String value, Collector<OrderPayment> out) throws Exception {                // 获取JSON中的data数据                JsonArray dataArray = GsonConvertUtil.getSingleton().getJsonObject(value).getAsJsonArray("data");                // 将data数据转换为java对象                for(int i =0; i< dataArray.size(); i++) {                    JsonObject 
jsonObject = dataArray.get(i).getAsJsonObject();                    OrderPayment orderPayment = GsonConvertUtil.getSingleton().cvtJson2Obj(jsonObject, OrderPayment.class);                    System.out.println("orderPayment => " + orderPayment);                    out.collect(orderPayment);                }            }        })        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderPayment>(Time.seconds(0)) {            @Override            public long extractTimestamp(OrderPayment element) {                return element.getUpdateTime();            }        })        .keyBy(OrderPayment::getOrderId);          // 6.通过CEP机制, 判断领取胜利的数据        Pattern<OrderPayment, ?> pattern = Pattern.<OrderPayment>begin("begin")                .where(new SimpleCondition<OrderPayment>() {                    @Override                    public boolean filter(OrderPayment value) throws Exception {                        return value.getStatus() == 0;                    }                }).next("follow").where(new SimpleCondition<OrderPayment>() {                    @Override                    public boolean filter(OrderPayment value) throws Exception {                        return value.getStatus() == 1;                    }                }).within(Time.seconds(15)).times(1);        PatternStream<OrderPayment> patternStream = CEP.pattern(orderPaymentDataStream, pattern);        // 7.定义超时数据的TAG标记        OutputTag orderExpired = new OutputTag<OrderPayment>("orderExpired"){};        DataStream<OrderPaymentResult> selectResult = patternStream.select(orderExpired,                new OrderExpiredMatcher(), new OrderPayedMatcher());        selectResult.print("payed");        // 8. 
创立Kafka生产端(订单数据源)        FlinkKafkaConsumer orderKafkaProducer = new FlinkKafkaConsumer(                "order_binlog",                  // 指标 topic                new SimpleStringSchema(),   // 序列化 配置                properties);        orderKafkaProducer.setStartFromEarliest();     // 尽可能从最早的记录开始        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);        env.setParallelism(1);        DataStreamSource<String> orderSource = env.addSource(orderKafkaProducer);        // 9. 数据过滤转换解决(订单数据源)        DataStream<Order> orderDataStream = orderSource.filter(new FilterFunction<String>() {            @Override            public boolean filter(String value) throws Exception {                JsonObject jsonObject = GsonConvertUtil.getSingleton().getJsonObject(value);                String isDDL = jsonObject.get("isDdl").getAsString();                String type = jsonObject.get("type").getAsString();                // 过滤条件: 非DDL操作, 并且是新增的数据                return isDDL.equalsIgnoreCase("false") && "INSERT".equalsIgnoreCase(type);            }        }).flatMap(new FlatMapFunction<String, Order>() {            @Override            public void flatMap(String value, Collector<Order> out) throws Exception {                // 获取JSON中的data数据                JsonArray dataArray = GsonConvertUtil.getSingleton().getJsonObject(value).getAsJsonArray("data");                // 将data数据转换为java对象                for(int i =0; i< dataArray.size(); i++) {                    JsonObject jsonObject = dataArray.get(i).getAsJsonObject();                    Order order = GsonConvertUtil.getSingleton().cvtJson2Obj(jsonObject, Order.class);                    System.out.println("order => " + order);                    out.collect(order);                }            }        })        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(0)) {            @Override            public long extractTimestamp(Order element) {                return 
element.getExecTime();            }        });        // 10. 数据源关联解决        orderDataStream.keyBy(Order::getId).intervalJoin(selectResult.keyBy(OrderPaymentResult::getOrderId))                .between(Time.seconds(0), Time.seconds(15))                .process(new ProcessJoinFunction<Order, OrderPaymentResult, JoinOrderPayment>() {                    @Override                    public void processElement(Order left, OrderPaymentResult right, Context ctx, Collector<JoinOrderPayment> out) throws Exception {                        JoinOrderPayment joinResult = JoinOrderPayment.build(left, right);                        out.collect(joinResult);                    }                })                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JoinOrderPayment>(Time.seconds(0)) {                    @Override                    public long extractTimestamp(JoinOrderPayment element) {                        return element.getUpdateTime();                    }                })                .keyBy(JoinOrderPayment::getGoodsId)                .timeWindow(Time.hours(24), Time.seconds(3))                .aggregate(new TotalAmount(), new AmountWindow())                .keyBy(HotOrder::getTimeWindow)                .process(new TopNHotOrder());        // 11. 执行工作        env.execute("job");    }

商品金额累加器:

   /**     * 商品金额累加器     */    private static class TotalAmount implements AggregateFunction<JoinOrderPayment, JoinOrderPayment, JoinOrderPayment> {        @Override        public JoinOrderPayment createAccumulator() {            JoinOrderPayment order = new JoinOrderPayment();            order.setTotalAmount(0l);            return order;        }        /**         * 商品销售总金额累加解决         * @param value         * @param accumulator         * @return         */        @Override        public JoinOrderPayment add(JoinOrderPayment value, JoinOrderPayment accumulator) {            accumulator.setGoodsId(value.getGoodsId());            accumulator.setGoodsName((value.getGoodsName()));            accumulator.setStatus(value.getStatus());            accumulator.setUpdateTime(value.getUpdateTime());            accumulator.setTotalAmount(accumulator.getTotalAmount() + (value.getExecPrice() * value.getExecVolume()));            return accumulator;        }        @Override        public JoinOrderPayment getResult(JoinOrderPayment accumulator) {            return accumulator;        }        @Override        public JoinOrderPayment merge(JoinOrderPayment a, JoinOrderPayment b) {            return null;        }    }

热销商品转换处理:

    /**
     * Turns the pre-aggregated {@link JoinOrderPayment} of one goods window into a
     * {@link HotOrder} carrying goods id, goods name, total amount and the window end time.
     */
    private static class AmountWindow implements WindowFunction<JoinOrderPayment, HotOrder, Long, TimeWindow> {
        @Override
        public void apply(Long goodsId, TimeWindow window, Iterable<JoinOrderPayment> input, Collector<HotOrder> out) throws Exception {
            // Used after TotalAmount in aggregate(...), so the first element is the aggregate.
            JoinOrderPayment aggregate = input.iterator().next();
            HotOrder hotOrder = new HotOrder(
                    aggregate.getGoodsId(),
                    aggregate.getGoodsName(),
                    aggregate.getTotalAmount(),
                    window.getEnd());
            out.collect(hotOrder);
        }
    }

热销商品的统计排行实现:

    /**     * 热销商品的统计排行实现     */    private class TopNHotOrder extends KeyedProcessFunction<Long, HotOrder, String> {        private ListState<HotOrder> orderState;        @Override        public void processElement(HotOrder value, Context ctx, Collector<String> out) throws Exception {            // 将数据退出到状态列表外面            orderState.add(value);            // 注册定时器            ctx.timerService().registerEventTimeTimer(value.getTimeWindow());        }        @Override        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {            List<HotOrder> orderList = new ArrayList<>();            for(HotOrder order : orderState.get()){                orderList.add(order);            }            // 依照成交总金额, 倒序排列            orderList.sort(Comparator.comparing(HotOrder::getTotalAmount).reversed());            orderState.clear();            // 将数据写入至ES            HotOrderRepository hotOrderRepository = (HotOrderRepository) ApplicationContextUtil.getBean("hotOrderRepository");            StringBuffer strBuf = new StringBuffer();            for(HotOrder order: orderList) {                order.setId(order.getGoodsId());                order.setCreateDate(new Date(order.getTimeWindow()));                hotOrderRepository.save(order);                strBuf.append(order).append("\n");                System.out.println("result => " + order);            }            out.collect(strBuf.toString());        }        @Override        public void open(Configuration parameters) throws Exception {            super.open(parameters);            orderState = getRuntimeContext().getListState(new ListStateDescriptor<HotOrder>("hot-order", HotOrder.class));        }    }

超时数据的匹配处理:

private class OrderExpiredMatcher implements PatternTimeoutFunction<OrderPayment, OrderPaymentResult> {        @Override        public OrderPaymentResult timeout(Map<String, List<OrderPayment>> map, long l) throws Exception {            OrderPaymentResult result = new OrderPaymentResult();            OrderPayment payment = map.get("begin").iterator().next();            result.setOrderId(payment.getOrderId());            result.setStatus(payment.getStatus());            result.setUpdateTime(payment.getUpdateTime());            result.setMessage("领取超时");            return result;        }    }

支付成功的匹配处理:

private class OrderPayedMatcher implements PatternSelectFunction<OrderPayment, OrderPaymentResult> {        @Override        public OrderPaymentResult select(Map<String, List<OrderPayment>> map) throws Exception {            OrderPaymentResult result = new OrderPaymentResult();            OrderPayment payment = map.get("follow").iterator().next();            result.setOrderId(payment.getOrderId());            result.setStatus(payment.getStatus());            result.setUpdateTime(payment.getUpdateTime());            result.setMessage("领取胜利");            return result;        }    }

2. 商品UV统计(普通统计)

  • 功能

    统计商品在一段时间内的UV(Unique Visitor), 即根据IP去重后的访问量。

  • 核心代码

    主逻辑实现:

    public class ScreenUniqueVisitorProcessor {    /**     * 执行flink工作解决     * @throws Exception     */    public void executeFlinkTask() throws Exception {        // 1. 创立运行环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);        env.setParallelism(1);        // 2. 读取Socket数据源//        DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n");        DataStreamSource<String> socketStr = env.readTextFile("./data/goods_access.log");        // 3. 数据解析转换解决        socketStr.flatMap(new FlatMapFunction<String, GoodsAccessLog>() {            @Override            public void flatMap(String value, Collector<GoodsAccessLog> out) throws Exception {                // 获取Json中的data数据                // 依据分隔符解析数据                String[] arrValue = value.split("\t");                System.out.println("receive msg => " + value);                // 将数据组装为对象                GoodsAccessLog log = new GoodsAccessLog();                for(int i=0; i<arrValue.length; i++) {                    if(i == 0) {                        log.setIp(arrValue[i]);                    }else if( i== 1) {                        log.setAccessTime(Long.valueOf(arrValue[i]));                    }else if( i== 2) {                        log.setEventType(arrValue[i]);                    }else if( i== 3) {                        log.setGoodsId(arrValue[i]);                    }                }                out.collect(log);            }        })        .filter(new FilterFunction<GoodsAccessLog>() {            @Override            public boolean filter(GoodsAccessLog value) throws Exception {                return value.getEventType().equals("view");            }        })        .keyBy(GoodsAccessLog::getGoodsId)        .timeWindow(Time.seconds(10))        .process( new ProcessWindowFunction<GoodsAccessLog, Map<String, String> , String, TimeWindow>(){            
@Override            public void process(String key, Context context, Iterable<GoodsAccessLog> elements, Collector<Map<String, String>> out) throws Exception {                Set<String> ipSet = new HashSet<>();                Map<String, String> goodsUV = new LinkedHashMap<>();                elements.forEach( log -> {                    ipSet.add(log.getIp());                });                goodsUV.put(key , context.window().getEnd() + ":" + ipSet.size());                out.collect(goodsUV);            }        })        .print("uv result").setParallelism(1);        // 5. 执行工作        env.execute("job");    }}

热销商品的金额累加处理:

/** * 商品金额累加器 */private static class TotalAmount implements AggregateFunction<Order, Order, Order> {    @Override    public Order createAccumulator() {        Order order = new Order();        order.setTotalAmount(0l);        return order;    }    /**     * 累加统计商品销售总金额     * @param value     * @param accumulator     * @return     */    @Override    public Order add(Order value, Order accumulator) {        accumulator.setGoodsId(value.getGoodsId());        accumulator.setGoodsName((value.getGoodsName()));        accumulator.setTotalAmount(accumulator.getTotalAmount() + (value.getExecPrice() * value.getExecVolume()));        return accumulator;    }    @Override    public Order getResult(Order accumulator) {        return accumulator;    }    @Override    public Order merge(Order a, Order b) {        return null;    }}

热销商品的数据转换处理, 用于统计:

/**
 * Wraps the pre-aggregated {@link Order} of one goods window into a {@link HotOrder} holding
 * the window key (goods id), goods name, total amount and window end time.
 */
private static class AmountWindow implements WindowFunction<Order, HotOrder, Long, TimeWindow> {
    @Override
    public void apply(Long goodsId, TimeWindow window, Iterable<Order> input, Collector<HotOrder> out) throws Exception {
        // Only the first element is read; with an AggregateFunction upstream it is the aggregate.
        Order aggregate = input.iterator().next();
        HotOrder hotOrder = new HotOrder(
                goodsId,
                aggregate.getGoodsName(),
                aggregate.getTotalAmount(),
                window.getEnd());
        out.collect(hotOrder);
    }
}

热销商品的统计排行处理逻辑:

/** * 热销商品的统计排行实现 */private class TopNHotOrder extends KeyedProcessFunction<Long, HotOrder, String> {    private ListState<HotOrder> orderState;    @Override    public void processElement(HotOrder value, Context ctx, Collector<String> out) throws Exception {        // 将数据退出到状态列表外面        orderState.add(value);        // 注册定时器        ctx.timerService().registerEventTimeTimer(value.getTimeWindow());    }    @Override    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {        List<HotOrder> orderList = new ArrayList<>();        for(HotOrder order : orderState.get()){            orderList.add(order);        }        // 依照成交总金额, 倒序排列        orderList.sort(Comparator.comparing(HotOrder::getTotalAmount).reversed());        orderState.clear();        // 将数据写入至ES        HotOrderRepository hotOrderRepository = (HotOrderRepository) ApplicationContextUtil.getBean("hotOrderRepository");        StringBuffer strBuf = new StringBuffer();        for(HotOrder order: orderList) {            order.setId(order.getGoodsId());            order.setCreateDate(new Date(order.getTimeWindow()));            hotOrderRepository.save(order);            strBuf.append(order).append("\n");            System.out.println("result => " + order);        }        out.collect(strBuf.toString());    }    @Override    public void open(Configuration parameters) throws Exception {        super.open(parameters);        orderState = getRuntimeContext().getListState(new ListStateDescriptor<HotOrder>("hot-order", HotOrder.class));    }}

3. 商品UV统计(布隆过滤器)

  • 功能

    功能: 统计商品在一段时间内的UV(采用布隆过滤器), 即根据IP去重后的访问量。

  • 核心代码

    主逻辑代码:

    /** * 执行flink工作解决 * @throws Exception */public void executeFlinkTask() throws Exception {    // 1. 创立运行环境    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);    // 2. 读取Socket数据源//        DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n");    DataStreamSource<String> socketStr = env.readTextFile("./data/goods_access.log");    // 3. 数据解析转换解决    socketStr.flatMap(new FlatMapFunction<String, GoodsAccessLog>() {        @Override        public void flatMap(String value, Collector<GoodsAccessLog> out) throws Exception {            // 获取Json中的data数据            // 依据分隔符解析数据            String[] arrValue = value.split("\t");            // 将数据组装为对象            GoodsAccessLog log = new GoodsAccessLog();            for(int i=0; i<arrValue.length; i++) {                if(i == 0) {                    log.setIp(arrValue[i]);                }else if( i== 1) {                    log.setAccessTime(Long.valueOf(arrValue[i]));                }else if( i== 2) {                    log.setEventType(arrValue[i]);                }else if( i== 3) {                    log.setGoodsId(arrValue[i]);                }            }            out.collect(log);        }    })    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<GoodsAccessLog>(Time.seconds(0)) {        @Override        public long extractTimestamp(GoodsAccessLog element) {            return element.getAccessTime();        }    })    .filter(new FilterFunction<GoodsAccessLog>() {        @Override        public boolean filter(GoodsAccessLog value) throws Exception {            return value.getEventType().equals("view");        }    })    .keyBy(GoodsAccessLog::getGoodsId)    .timeWindow(Time.minutes(30))    .trigger(new CustomWindowTrigger())    .process(new CustomUVBloom())    .keyBy(0)    .timeWindow(Time.seconds(3))    .max(1)    .print("uv result => 
").setParallelism(1);    // 5. 执行工作    env.execute("job");}

本文由mirson创作分享, 如需进一步交流, 请加QQ群: 19310171 或访问 www.softart.cn