1. 订单领取状态跟踪统计(CEP 使用)
-
性能
实现对热销商品的统计,统计周期为一天,每 3 秒刷新一次数据。
-
外围代码
主逻辑代码实现:
/** * 执行 Flink 工作解决 * @throws Exception */ private void executeFlinkTask() throws Exception { // 1. 创立运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 2. 设置 kafka 服务连贯信息 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "10.10.20.132:9092"); properties.setProperty("group.id", "fink_group"); // 3. 创立 Kafka 生产端 FlinkKafkaConsumer kafkaProducer = new FlinkKafkaConsumer( "orderPayment_binlog", // 指标 topic new SimpleStringSchema(), // 序列化 配置 properties); // 调试,从新从最早记录生产 kafkaProducer.setStartFromEarliest(); // 尽可能从最早的记录开始 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); // 4. 读取 Kafka 数据源 DataStreamSource<String> socketStr = env.addSource(kafkaProducer); // 5. 数据过滤转换解决 DataStream<OrderPayment> orderPaymentDataStream = socketStr.filter(new FilterFunction<String>() { @Override public boolean filter(String value) throws Exception {JsonObject jsonObject = GsonConvertUtil.getSingleton().getJsonObject(value); String isDDL = jsonObject.get("isDdl").getAsString(); String type = jsonObject.get("type").getAsString(); // 过滤条件:非 DDL 操作,并且是新增的数据 return isDDL.equalsIgnoreCase("false") && "INSERT".equalsIgnoreCase(type); } }).flatMap(new FlatMapFunction<String, OrderPayment>() { @Override public void flatMap(String value, Collector<OrderPayment> out) throws Exception { // 获取 JSON 中的 data 数据 JsonArray dataArray = GsonConvertUtil.getSingleton().getJsonObject(value).getAsJsonArray("data"); // 将 data 数据转换为 java 对象 for(int i =0; i< dataArray.size(); i++) {JsonObject jsonObject = dataArray.get(i).getAsJsonObject(); OrderPayment orderPayment = GsonConvertUtil.getSingleton().cvtJson2Obj(jsonObject, OrderPayment.class); System.out.println("orderPayment =>" + orderPayment); out.collect(orderPayment); } } }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderPayment>(Time.seconds(0)) { @Override public long extractTimestamp(OrderPayment element) {return element.getUpdateTime(); } }) .keyBy(OrderPayment::getOrderId); // 6. 通过 CEP 机制,判断领取胜利的数据 Pattern<OrderPayment, ?> pattern = Pattern.<OrderPayment>begin("begin") .where(new SimpleCondition<OrderPayment>() { @Override public boolean filter(OrderPayment value) throws Exception {return value.getStatus() == 0; } }).next("follow").where(new SimpleCondition<OrderPayment>() { @Override public boolean filter(OrderPayment value) throws Exception {return value.getStatus() == 1; } }).within(Time.seconds(15)).times(1); PatternStream<OrderPayment> patternStream = CEP.pattern(orderPaymentDataStream, pattern); // 7. 定义超时数据的 TAG 标记 OutputTag orderExpired = new OutputTag<OrderPayment>("orderExpired"){}; DataStream<OrderPaymentResult> selectResult = patternStream.select(orderExpired, new OrderExpiredMatcher(), new OrderPayedMatcher()); selectResult.print("payed"); // 8. 创立 Kafka 生产端(订单数据源)FlinkKafkaConsumer orderKafkaProducer = new FlinkKafkaConsumer( "order_binlog", // 指标 topic new SimpleStringSchema(), // 序列化 配置 properties); orderKafkaProducer.setStartFromEarliest(); // 尽可能从最早的记录开始 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); DataStreamSource<String> orderSource = env.addSource(orderKafkaProducer); // 9. 数据过滤转换解决 ( 订单数据源)DataStream<Order> orderDataStream = orderSource.filter(new FilterFunction<String>() { @Override public boolean filter(String value) throws Exception {JsonObject jsonObject = GsonConvertUtil.getSingleton().getJsonObject(value); String isDDL = jsonObject.get("isDdl").getAsString(); String type = jsonObject.get("type").getAsString(); // 过滤条件:非 DDL 操作,并且是新增的数据 return isDDL.equalsIgnoreCase("false") && "INSERT".equalsIgnoreCase(type); } }).flatMap(new FlatMapFunction<String, Order>() { @Override public void flatMap(String value, Collector<Order> out) throws Exception { // 获取 JSON 中的 data 数据 JsonArray dataArray = GsonConvertUtil.getSingleton().getJsonObject(value).getAsJsonArray("data"); // 将 data 数据转换为 java 对象 for(int i =0; i< dataArray.size(); i++) {JsonObject jsonObject = dataArray.get(i).getAsJsonObject(); Order order = GsonConvertUtil.getSingleton().cvtJson2Obj(jsonObject, Order.class); System.out.println("order =>" + order); out.collect(order); } } }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Order>(Time.seconds(0)) { @Override public long extractTimestamp(Order element) {return element.getExecTime(); } }); // 10. 数据源关联解决 orderDataStream.keyBy(Order::getId).intervalJoin(selectResult.keyBy(OrderPaymentResult::getOrderId)) .between(Time.seconds(0), Time.seconds(15)) .process(new ProcessJoinFunction<Order, OrderPaymentResult, JoinOrderPayment>() { @Override public void processElement(Order left, OrderPaymentResult right, Context ctx, Collector<JoinOrderPayment> out) throws Exception {JoinOrderPayment joinResult = JoinOrderPayment.build(left, right); out.collect(joinResult); } }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<JoinOrderPayment>(Time.seconds(0)) { @Override public long extractTimestamp(JoinOrderPayment element) {return element.getUpdateTime(); } }) .keyBy(JoinOrderPayment::getGoodsId) .timeWindow(Time.hours(24), Time.seconds(3)) .aggregate(new TotalAmount(), new AmountWindow()) .keyBy(HotOrder::getTimeWindow) .process(new TopNHotOrder()); // 11. 执行工作 env.execute("job"); }
商品金额累加器:
/**
* 商品金额累加器
*/
private static class TotalAmount implements AggregateFunction<JoinOrderPayment, JoinOrderPayment, JoinOrderPayment> {
@Override
public JoinOrderPayment createAccumulator() {JoinOrderPayment order = new JoinOrderPayment();
order.setTotalAmount(0l);
return order;
}
/**
* 商品销售总金额累加解决
* @param value
* @param accumulator
* @return
*/
@Override
public JoinOrderPayment add(JoinOrderPayment value, JoinOrderPayment accumulator) {accumulator.setGoodsId(value.getGoodsId());
accumulator.setGoodsName((value.getGoodsName()));
accumulator.setStatus(value.getStatus());
accumulator.setUpdateTime(value.getUpdateTime());
accumulator.setTotalAmount(accumulator.getTotalAmount() + (value.getExecPrice() * value.getExecVolume()));
return accumulator;
}
@Override
public JoinOrderPayment getResult(JoinOrderPayment accumulator) {return accumulator;}
@Override
public JoinOrderPayment merge(JoinOrderPayment a, JoinOrderPayment b) {return null;}
}
热销商品转换解决:
/**
* 热销商品,工夫窗口对象转换解决
*/
private static class AmountWindow implements WindowFunction<JoinOrderPayment, HotOrder, Long, TimeWindow> {
@Override
public void apply(Long goodsId, TimeWindow window, Iterable<JoinOrderPayment> input, Collector<HotOrder> out) throws Exception {JoinOrderPayment order = input.iterator().next();
out.collect(new HotOrder(order.getGoodsId(), order.getGoodsName(), order.getTotalAmount(), window.getEnd()));
}
}
热销商品的统计排行实现:
/**
* 热销商品的统计排行实现
*/
private class TopNHotOrder extends KeyedProcessFunction<Long, HotOrder, String> {
private ListState<HotOrder> orderState;
@Override
public void processElement(HotOrder value, Context ctx, Collector<String> out) throws Exception {
// 将数据退出到状态列表外面
orderState.add(value);
// 注册定时器
ctx.timerService().registerEventTimeTimer(value.getTimeWindow());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {List<HotOrder> orderList = new ArrayList<>();
for(HotOrder order : orderState.get()){orderList.add(order);
}
// 依照成交总金额,倒序排列
orderList.sort(Comparator.comparing(HotOrder::getTotalAmount).reversed());
orderState.clear();
// 将数据写入至 ES
HotOrderRepository hotOrderRepository = (HotOrderRepository) ApplicationContextUtil.getBean("hotOrderRepository");
StringBuffer strBuf = new StringBuffer();
for(HotOrder order: orderList) {order.setId(order.getGoodsId());
order.setCreateDate(new Date(order.getTimeWindow()));
hotOrderRepository.save(order);
strBuf.append(order).append("\n");
System.out.println("result =>" + order);
}
out.collect(strBuf.toString());
}
@Override
public void open(Configuration parameters) throws Exception {super.open(parameters);
orderState = getRuntimeContext().getListState(new ListStateDescriptor<HotOrder>("hot-order", HotOrder.class));
}
}
超时数据的匹配解决:
private class OrderExpiredMatcher implements PatternTimeoutFunction<OrderPayment, OrderPaymentResult> {
@Override
public OrderPaymentResult timeout(Map<String, List<OrderPayment>> map, long l) throws Exception {OrderPaymentResult result = new OrderPaymentResult();
OrderPayment payment = map.get("begin").iterator().next();
result.setOrderId(payment.getOrderId());
result.setStatus(payment.getStatus());
result.setUpdateTime(payment.getUpdateTime());
result.setMessage("领取超时");
return result;
}
}
领取胜利的匹配解决:
private class OrderPayedMatcher implements PatternSelectFunction<OrderPayment, OrderPaymentResult> {
@Override
public OrderPaymentResult select(Map<String, List<OrderPayment>> map) throws Exception {OrderPaymentResult result = new OrderPaymentResult();
OrderPayment payment = map.get("follow").iterator().next();
result.setOrderId(payment.getOrderId());
result.setStatus(payment.getStatus());
result.setUpdateTime(payment.getUpdateTime());
result.setMessage("领取胜利");
return result;
}
}
2. 商品 UV 统计(一般统计)
-
性能
统计商品在一段时间内的 UV(Unique Visitor),去重后的点击量,依据 IP 去重。
-
外围代码
主逻辑实现:
public class ScreenUniqueVisitorProcessor { /** * 执行 flink 工作解决 * @throws Exception */ public void executeFlinkTask() throws Exception { // 1. 创立运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); env.setParallelism(1); // 2. 读取 Socket 数据源 // DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n"); DataStreamSource<String> socketStr = env.readTextFile("./data/goods_access.log"); // 3. 数据解析转换解决 socketStr.flatMap(new FlatMapFunction<String, GoodsAccessLog>() { @Override public void flatMap(String value, Collector<GoodsAccessLog> out) throws Exception { // 获取 Json 中的 data 数据 // 依据分隔符解析数据 String[] arrValue = value.split("\t"); System.out.println("receive msg =>" + value); // 将数据组装为对象 GoodsAccessLog log = new GoodsAccessLog(); for(int i=0; i<arrValue.length; i++) {if(i == 0) {log.setIp(arrValue[i]); }else if(i== 1) {log.setAccessTime(Long.valueOf(arrValue[i])); }else if(i== 2) {log.setEventType(arrValue[i]); }else if(i== 3) {log.setGoodsId(arrValue[i]); } } out.collect(log); } }) .filter(new FilterFunction<GoodsAccessLog>() { @Override public boolean filter(GoodsAccessLog value) throws Exception {return value.getEventType().equals("view"); } }) .keyBy(GoodsAccessLog::getGoodsId) .timeWindow(Time.seconds(10)) .process(new ProcessWindowFunction<GoodsAccessLog, Map<String, String> , String, TimeWindow>(){ @Override public void process(String key, Context context, Iterable<GoodsAccessLog> elements, Collector<Map<String, String>> out) throws Exception {Set<String> ipSet = new HashSet<>(); Map<String, String> goodsUV = new LinkedHashMap<>(); elements.forEach( log -> {ipSet.add(log.getIp()); }); goodsUV.put(key , context.window().getEnd() + ":" + ipSet.size()); out.collect(goodsUV); } }) .print("uv result").setParallelism(1); // 5. 执行工作 env.execute("job"); } }
热销商品的金额累加解决:
/**
* 商品金额累加器
*/
private static class TotalAmount implements AggregateFunction<Order, Order, Order> {
@Override
public Order createAccumulator() {Order order = new Order();
order.setTotalAmount(0l);
return order;
}
/**
* 累加统计商品销售总金额
* @param value
* @param accumulator
* @return
*/
@Override
public Order add(Order value, Order accumulator) {accumulator.setGoodsId(value.getGoodsId());
accumulator.setGoodsName((value.getGoodsName()));
accumulator.setTotalAmount(accumulator.getTotalAmount() + (value.getExecPrice() * value.getExecVolume()));
return accumulator;
}
@Override
public Order getResult(Order accumulator) {return accumulator;}
@Override
public Order merge(Order a, Order b) {return null;}
}
热销商品的数据转换解决,用于统计:
/**
* 热销商品,在工夫窗口内,对象数据的转换解决
*/
private static class AmountWindow implements WindowFunction<Order, HotOrder, Long, TimeWindow> {
@Override
public void apply(Long goodsId, TimeWindow window, Iterable<Order> input, Collector<HotOrder> out) throws Exception {Order order = input.iterator().next();
out.collect(new HotOrder(goodsId, order.getGoodsName(), order.getTotalAmount(), window.getEnd()));
}
}
热销商品的统计排行解决逻辑:
/**
* 热销商品的统计排行实现
*/
private class TopNHotOrder extends KeyedProcessFunction<Long, HotOrder, String> {
private ListState<HotOrder> orderState;
@Override
public void processElement(HotOrder value, Context ctx, Collector<String> out) throws Exception {
// 将数据退出到状态列表外面
orderState.add(value);
// 注册定时器
ctx.timerService().registerEventTimeTimer(value.getTimeWindow());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {List<HotOrder> orderList = new ArrayList<>();
for(HotOrder order : orderState.get()){orderList.add(order);
}
// 依照成交总金额,倒序排列
orderList.sort(Comparator.comparing(HotOrder::getTotalAmount).reversed());
orderState.clear();
// 将数据写入至 ES
HotOrderRepository hotOrderRepository = (HotOrderRepository) ApplicationContextUtil.getBean("hotOrderRepository");
StringBuffer strBuf = new StringBuffer();
for(HotOrder order: orderList) {order.setId(order.getGoodsId());
order.setCreateDate(new Date(order.getTimeWindow()));
hotOrderRepository.save(order);
strBuf.append(order).append("\n");
System.out.println("result =>" + order);
}
out.collect(strBuf.toString());
}
@Override
public void open(Configuration parameters) throws Exception {super.open(parameters);
orderState = getRuntimeContext().getListState(new ListStateDescriptor<HotOrder>("hot-order", HotOrder.class));
}
}
3. 商品 UV 统计(布隆过滤器)
-
性能
性能:统计商品在一段时间内的 UV(采纳布隆过滤器),去重后的点击量,依据 IP 去重。
-
外围代码
主逻辑代码:
/** * 执行 flink 工作解决 * @throws Exception */ public void executeFlinkTask() throws Exception { // 1. 创立运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 2. 读取 Socket 数据源 // DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n"); DataStreamSource<String> socketStr = env.readTextFile("./data/goods_access.log"); // 3. 数据解析转换解决 socketStr.flatMap(new FlatMapFunction<String, GoodsAccessLog>() { @Override public void flatMap(String value, Collector<GoodsAccessLog> out) throws Exception { // 获取 Json 中的 data 数据 // 依据分隔符解析数据 String[] arrValue = value.split("\t"); // 将数据组装为对象 GoodsAccessLog log = new GoodsAccessLog(); for(int i=0; i<arrValue.length; i++) {if(i == 0) {log.setIp(arrValue[i]); }else if(i== 1) {log.setAccessTime(Long.valueOf(arrValue[i])); }else if(i== 2) {log.setEventType(arrValue[i]); }else if(i== 3) {log.setGoodsId(arrValue[i]); } } out.collect(log); } }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<GoodsAccessLog>(Time.seconds(0)) { @Override public long extractTimestamp(GoodsAccessLog element) {return element.getAccessTime(); } }) .filter(new FilterFunction<GoodsAccessLog>() { @Override public boolean filter(GoodsAccessLog value) throws Exception {return value.getEventType().equals("view"); } }) .keyBy(GoodsAccessLog::getGoodsId) .timeWindow(Time.minutes(30)) .trigger(new CustomWindowTrigger()) .process(new CustomUVBloom()) .keyBy(0) .timeWindow(Time.seconds(3)) .max(1) .print("uv result =>").setParallelism(1); // 5. 执行工作 env.execute("job"); }
本文由 mirson 创作分享,如需进一步交换,请加 QQ 群:19310171 或拜访 www.softart.cn