在电商网站中,订单的领取作为间接与钱挂钩的一环,在业务流程中十分重要。对于订单而言,为了正确管制业务流程,也为了减少用户的领取志愿,网站个别会设置一个领取生效工夫,超过一段时间没领取的订单就会被勾销。另外,对于订单的领取,还应该保障最终领取的正确性,能够通过第三方领取平台的交易数据来做一个实时对账
第一个实现的成果,实时获取订单数据,剖析订单的领取状况,别离实时统计领取胜利的和 15 分钟后领取超时的状况
新建一个 maven 我的项目,这是根底依赖,如果之前引入了,就不必加了
<properties>
<maven.compiler.source>`8`</maven.compiler.source>
<maven.compiler.target>`8`</maven.compiler.target>
<flink.version>`1.10.1`</flink.version>
<scala.binary.version>`2.12`</scala.binary.version>
<kafka.version>`2.2.0`</kafka.version>
</properties>
<dependencies>
<dependency>
<groupId>`org.apache.flink`</groupId>
<artifactId>`flink-scala_${scala.binary.version}`</artifactId>
<version>`${flink.version}`</version>
</dependency>
<dependency>
<groupId>`org.apache.flink`</groupId>
<artifactId>`flink-streaming-scala_${scala.binary.version}`</artifactId>
<version>`${flink.version}`</version>
</dependency>
<dependency>
<groupId>`org.apache.kafka`</groupId>
<artifactId>`kafka_${scala.binary.version}`</artifactId>
<version>`${kafka.version}`</version>
</dependency>
<dependency>
<groupId>`org.apache.flink`</groupId>
<artifactId>`flink-connector-kafka_${scala.binary.version}`</artifactId>
<version>`${flink.version}`</version>
</dependency>
<dependency>
<groupId>`cn.hutool`</groupId>
<artifactId>`hutool-all`</artifactId>
<version>`5.5.6`</version>
</dependency>
<dependency>
<groupId>`org.apache.flink`</groupId>
<artifactId>`flink-table-planner-blink_2.12`</artifactId>
<version>`1.10.1`</version>
</dependency>
</dependencies>
这个场景须要用到 cep,所以再退出 cep 依赖
<dependencies>
<dependency>
<groupId>`org.apache.flink`</groupId>
<artifactId>`flink-cep-scala_${scala.binary.version}`</artifactId>
<version>`${flink.version}`</version>
</dependency>
</dependencies>
筹备数据源文件 src/main/resources/OrderLog.csv:
`1234,`**create**`,,`1611047605
1235,
create,,
1611047606
1236,
create,,
1611047606
1234,pay,akdb3833,
1611047616
把 java 目录改为 scala,新建 com.mafei.orderPayMonitor.OrderTimeoutMonitor.scala 的 object
/*
*
* @author mafei
* @date 2021/1/31
*/
package com.mafei.orderPayMonitor
import org.apache.flink.cep.{PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time
import java.util
/**
* _定义输出样例类类型,_
*
* @param orderId _订单 id_
* @param eventType _事件类别:创立订单 create 还是领取订单 pay_
* @param txId _领取流水号_
* @param ts _工夫_
*/
case class OrderEvent(orderId: Long, eventType:String,txId: String, ts: Long)
/**
* _定义输入样例类类型,_
*/
case class OrderResult(orderId: Long, resultMsg: String)
object OrderTimeoutMonitor {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
`env.setParallelism(`1`)`
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
_// 1__、从文件中读取数据_
`val resource = getClass.getResource(`"/OrderLog.csv"`)`
val orderEvnetStream = env.readTextFile(resource.getPath)
.map(d=>{
`val arr = d.split(`","`)`
`OrderEvent(arr(`0`).toLong,arr(`1`),arr(`2`), arr(`3`).toLong)` _//__把数据读出来转换成想要的样例类类型_
`}).assignAscendingTimestamps(_.ts *` 1000`L)` _//__指定 ts 字段_
`.keyBy(_.orderId)` _//__依照订单 id 分组_
_/**_
* 2__、定义事件 - 匹配模式
* _定义 15 分钟内能发现订单创立和领取_
*/
val orderPayPattern = Pattern
`.begin[OrderEvent](`"create"`).where(_.eventType ==` "create"`)` _//__先呈现一个订单创立的事件_
`.followedBy(`"pay"`).where(_.eventType ==` "pay"`)` _//__后边再进去一个领取事件_
`.within(Time.minutes(`15`))` _//__定义在 15 分钟以内,触发这 2 个事件_
_// 3__、将 pattern 利用到流外面,进行模式检测_
val patternStream = CEP.pattern(orderEvnetStream, orderPayPattern)
_//4__、定义一个侧输入流标签,用于解决超时事件_
`val orderTimeoutTag = new OutputTag[OrderResult](`"orderTimeout"`)`
_// 5__、调用 select 办法,提取并解决匹配的胜利字符事件以及超时事件_
val resultStream = patternStream.select(
orderTimeoutTag,
new OrderTimeoutSelect(),
new OrderPaySelect()
)
`resultStream.print(`"pay"`)`
resultStream.getSideOutput(orderTimeoutTag).print()
`env.execute(`"order timeout monitor"`)`
}
}
//__获取超时之后定义的事件还没触发的状况,也就是订单领取超时了。
class OrderTimeoutSelect() extends PatternTimeoutFunction[OrderEvent, OrderResult]{
override def timeout(map: util.Map[String, util.List[OrderEvent]], l: Long): OrderResult = {
`val timeoutOrderId = map.get(`"create"`).iterator().next().orderId`
`OrderResult(timeoutOrderId,` "超时了。。。。超时工夫:"`+l)`
}
}
class OrderPaySelect() extends PatternSelectFunction[OrderEvent, OrderResult]{
override def select(map: util.Map[String, util.List[OrderEvent]]): OrderResult = {
`val orderTs = map.get(`"create"`).iterator().next().ts`
`val paydTs = map.get(`"pay"`).iterator().next().ts`
`val payedOrderId = map.get(`"pay"`).iterator().next().orderId`
`OrderResult(payedOrderId,` "订单领取胜利,下单工夫:"`+orderTs+`"领取工夫:"`+paydTs)`
}
}
用 ProcessFunction 来实现下面的场景
csv 还能够用下面的数据,新建一个 scala 的 object src/main/scala/com/mafei/orderPayMonitor/OrderTimeoutMonitorWithProcessFunction.scala
/*
*
* @author mafei
* @date 2021/1/31
*/
package com.mafei.orderPayMonitor
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector
object OrderTimeoutMonitorWithProcessFunction {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 1、从文件中读取数据
val resource = getClass.getResource(“/OrderLog.csv”)
val orderEventStream = env.readTextFile(resource.getPath)
.map(d=>{
val arr = d.split(“,”)
OrderEvent(arr(0).toLong,arr(1),arr(2), arr(3).toLong) // 把数据读出来转换成想要的样例类类型
}).assignAscendingTimestamps(_.ts * 1000L) // 指定 ts 字段
.keyBy(_.orderId) // 依照订单 id 分组
val resultStream = orderEventStream
.process(new OrderPayMatchProcess())
resultStream.print(“ 领取胜利的:“)
resultStream.getSideOutput(new OutputTag[OrderResult]).print(“ 订单超时事件 ”)
env.execute(“ 订单领取监控 with ProcessFunction”)
}
}
class OrderPayMatchProcess() extends KeyedProcessFunction[Long, OrderEvent, OrderResult]{
//
先定义状态标识,标识 create、payed、是否曾经呈现,以及对应的工夫戳
`lazy val isCreateOrderState: ValueState[`Boolean`] = getRuntimeContext.getState(new ValueStateDescriptor[`Boolean`](`"isCreateOrderState", classOf[Boolean]`))`
`lazy val isPayedOrderState: ValueState[`Boolean`] = getRuntimeContext.getState(new ValueStateDescriptor[`Boolean`](`"isPayedOrderState", classOf[Boolean]`))`
`lazy val timerTsState : ValueState[`Long`] = getRuntimeContext.getState(new ValueStateDescriptor[`Long`](`"timerTsState", classOf[Long]`))`
//
定义一个侧输入流,捕捉 timeout 的订单信息
`val orderTimeoutOutputTag = new OutputTag[`OrderResult`](`"timeout"`)`
override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext, out: Collector[OrderResult]): Unit = {
// 到这里,必定不会呈现订单创立和领取同时存在的状况,因为会在 processElement 解决掉
// 如果只有订单创立
if (isCreateOrderState.value()){
ctx.output(orderTimeoutOutputTag,OrderResult(ctx.getCurrentKey,” 订单没领取或超时 ”))
}else if(isPayedOrderState.value()){
ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey,” 只有领取,没看到订单提交 ”))
}
isCreateOrderState.clear()
isPayedOrderState.clear()
timerTsState.clear()
}
override def processElement(i: OrderEvent, context: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context, collector: Collector[OrderResult]): Unit = {
/**
- 判断以后事件类型,是 create 还是 pay
- 分几种状况:
- 1、判断 create 和 pay 都来了
- 要看有没有超时,没有超时就失常输入
- 超时了输入到侧输入流
- 2、create 或者 pay 有一个没来
- 注册一个定时器等着,而后等定时器触发后再输入
*
*/
val isCreate = isCreateOrderState.value()
val isPayed = isPayedOrderState.value()
val timerTs = timerTsState.value()
// 1、create 来了
if (i.eventType == “create”){
// 1.1 如果曾经领取过了,那是失常领取实现,输入匹配胜利的后果
if (isPayed){
isCreateOrderState.clear()
isPayedOrderState.clear()
timerTsState.clear()
context.timerService().deleteEventTimeTimer(timerTs)
collector.collect(OrderResult(context.getCurrentKey,” 领取胜利 ”))
}else{// 如果没有领取过,那注册一个定时器,期待 15 分钟后触发
context.timerService().registerEventTimeTimer(i.ts)
timerTsState.update(i.ts 1000L + 9001000L)
isCreateOrderState.update(true)
}
}
else if(i.eventType == “pay”){// 如果以后事件是领取事件
if(isCreate){// 判读订单创立事件曾经产生
if(i.ts * 1000L < timerTs){// 创立订单到领取的工夫在超时工夫内,代表失常领取
collector.collect(OrderResult(context.getCurrentKey,” 领取胜利 ”))
}else{
context.output(orderTimeoutOutputTag, OrderResult(context.getCurrentKey,” 曾经领取,然而没有找到订单超时了 ”))
}
isCreateOrderState.clear()
isPayedOrderState.clear()
timerTsState.clear()
context.timerService().deleteEventTimeTimer(timerTs)
}else{// 如果没看到订单创立的事件,那就注册一个定时器等着
context.timerService().registerEventTimeTimer(i.ts)
isPayedOrderState.update(true)
timerTsState.update(i.ts)
}
}
}
`}`
下面实现了监测用户领取的状况,理论中还须要对领取后的账单跟第三方领取平台做一个实时对账性能
会波及到 2 条源码交易数据流(领取和账单)的合流计算
这里模仿账单,所以须要筹备一个数据 ReceiptLog.csv
akdb3833,alipay,1611047619
`akdb3832,wechat,1611049617`
上代码:src/main/scala/com/mafei/orderPayMonitor/TxMatch.scala
/*
*
* @author mafei
* @date 2021/1/31
*/
package com.mafei.orderPayMonitor
import com.mafei.orderPayMonitor.OrderTimeoutMonitor.getClass
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.util.Collector
case class ReceiptEvent(orderId: String, payChannel:String, ts: Long)
object TxMatch {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 1、从订单文件中读取数据
val resource = getClass.getResource(“/OrderLog.csv”)
val orderEventStream = env.readTextFile(resource.getPath)
.map(d=>{
val arr = d.split(“,”)
OrderEvent(arr(0).toLong,arr(1),arr(2), arr(3).toLong) // 把数据读出来转换成想要的样例类类型
}).assignAscendingTimestamps(_.ts * 1000L) // 指定 ts 字段
.filter(_.eventType==”pay”)
.keyBy(_.txId) // 依照交易 id 分组
// 2、从账单中读取数据
val receiptResource = getClass.getResource(“/ReceiptLog.csv”)
val receiptEventStream = env.readTextFile(receiptResource.getPath)
.map(d=>{
val arr = d.split(“,”)
ReceiptEvent(arr(0),arr(1),arr(2).toLong) // 把数据读出来转换成想要的样例类类型
}).assignAscendingTimestamps(_.ts * 1000L) // 指定 ts 字段
.keyBy(_.orderId) // 依照订单 id 分组
// 3、合并两条流,进行解决
val resultStream = orderEventStream.connect(receiptEventStream)
.process(new TxPayMatchResult())
resultStream.print(“match: “)
resultStream.getSideOutput(new OutputTag[OrderEvent]).print(“unmatched-pay”)
resultStream.getSideOutput(new OutputTag[ReceiptEvent]).print(“unmatched-receipt”)
env.execute()
}
}
class TxPayMatchResult() extends CoProcessFunction[OrderEvent,ReceiptEvent,(OrderEvent,)]{
`lazy val orderEventState: ValueState[`OrderEvent`] = getRuntimeContext.getState(new ValueStateDescriptor[`OrderEvent`]
`lazy val receiptEventState: ValueState[`ReceiptEvent`] = getRuntimeContext.getState(new ValueStateDescriptor[`ReceiptEvent`](`"payEvent", classOf[ReceiptEvent]`))`
//
定义自定义侧输入流
`val unmatchedOrderEventTag = new OutputTag[`OrderEvent`](`"unmatched-pay"`)`
`val unmatchedReceiptEventTag = new OutputTag[`ReceiptEvent`](`"receipt"`)`
override def processElement1(in1: OrderEvent, context: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
// 判断领取账单来了
val receiptEvent = receiptEventState.value()
if(receiptEvent != null){
// 如果账单曾经过去了,那间接输入
collector.collect((in1,receiptEvent))
orderEventState.clear()
receiptEventState.clear()
}else{
// 如果没来,那就注册一个定时器,期待 10 秒钟
context.timerService().registerEventTimeTimer(in1.ts*1000L + 10000L)
orderEventState.update(in1)
}
}
override def processElement2(in2: ReceiptEvent, context: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
// 判断领取事件来了
val orderEvent = orderEventState.value()
if(orderEvent != null){
// 如果账单曾经过去了,那间接输入
collector.collect((orderEvent,in2))
orderEventState.clear()
receiptEventState.clear()
}else{
// 如果没来,那就注册一个定时器,期待 2 秒钟
context.timerService().registerEventTimeTimer(in2.ts*1000L + 2000L)
receiptEventState.update(in2)
}
}
override def onTimer(timestamp: Long, ctx: CoProcessFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]#OnTimerContext, out: Collector[(OrderEvent, ReceiptEvent)]): Unit = {
if(orderEventState.value() != null){
ctx.output(unmatchedOrderEventTag, orderEventState.value())
}
else if(receiptEventState.value() != null){
ctx.output(unmatchedReceiptEventTag, receiptEventState.value())
}
orderEventState.clear()
receiptEventState.clear()
}
`}`
第二种,应用 join 来实现这个成果
这种形式长处是跟不便了,做了一层封装,毛病也很显著如果要实现一些简单状况如没匹配中的也输入之类的就不行了,具体看理论场景须要
/*
*
* @author mafei
* @date 2021/1/31
*/
package com.mafei.orderPayMonitor
import com.mafei.orderPayMonitor.TxMatch.getClass
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
object TxMatchWithJoin {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
`env.setParallelism(`1`)`
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
_// 1__、从订单文件中读取数据_
`val resource = getClass.getResource(`"/OrderLog.csv"`)`
val orderEventStream = env.readTextFile(resource.getPath)
.map(d=>{
`val arr = d.split(`","`)`
`OrderEvent(arr(`0`).toLong,arr(`1`),arr(`2`), arr(`3`).toLong)` _//__把数据读出来转换成想要的样例类类型_
`}).assignAscendingTimestamps(_.ts *` 1000`L)` _//__指定 ts 字段_
`.filter(_.eventType==`"pay"`)`
`.keyBy(_.txId)` _//__依照交易 id 分组_
_// 2__、从账单中读取数据_
`val receiptResource = getClass.getResource(`"/ReceiptLog.csv"`)`
val receiptEventStream = env.readTextFile(receiptResource.getPath)
.map(d=>{
`val arr = d.split(`","`)`
`ReceiptEvent(arr(`0`),arr(`1`),arr(`2`).toLong)` _//__把数据读出来转换成想要的样例类类型_
`}).assignAscendingTimestamps(_.ts *` 1000`L)` _//__指定 ts 字段_
`.keyBy(_.orderId)` _//__依照订单 id 分组_
val resultStream = orderEventStream.intervalJoin(receiptEventStream)
`.between(Time.seconds(`-3`), Time.seconds(`5`))`
.process(new TxMatchWithJoinResult())
resultStream.print()
env.execute()
}
}
class TxMatchWithJoinResult() extends ProcessJoinFunction[OrderEvent, ReceiptEvent,(OrderEvent,ReceiptEvent)]{
`override def processElement(in1: OrderEvent, in2: ReceiptEvent, context: ProcessJoinFunction[OrderEvent, ReceiptEvent, (OrderEvent, ReceiptEvent)]`**#Context, collector: Collector[(OrderEvent, ReceiptEvent)]): Unit = {**
collector.collect((in1,in2))
}
`}`