Process Function API
后面学习的Transformations 是无法访问事件的工夫戳和水位线信息的,如MapFunction的map转换算子是无法访问工夫戳和以后事件的事件工夫。基于此,DataStream API提供了一系列的Low Level转换算子--Process Function API,与高层算子不同,通过这些底层转换算子咱们能够拜访数据的工夫戳,watermark以及注册定时事件。Process Function 用来构建事件驱动的利用和实现自定义的业务逻辑。例如Flink SQL就是用Process Function 实现的。
Flink为咱们提供了8中process function:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- ProcessJoinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
于是我抉择了基于Flink的电商用户行为数据分析的我的项目,进行对Flink的KeyedProcessFunction进行全面学习与意识。
首先咱们重点意识下KeyedProcessFunction,KeyedProcessFunction 用来操作 KeyedStream,KeyedProcessFunction 会解决流的每一个元素,KeyedProcessFunction[KEY, IN, OUT]还额定提供了两个办法:
processElement(v: IN, ctx: Context, out: Collector[OUT])
, 流中的每一个元素都会调用这个办法,调用后果将会放在 Collector 数据类型中输入。Context能够拜访元素的工夫戳,元素的 key,以及 TimerService 工夫服务。Context还能够将后果输入到别的流(side outputs)。onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT])
是一个回调函数。当之前注册的定时器触发时调用。参数 timestamp 为定时器所设定的触发的工夫戳。Collector 为输入后果的汇合。OnTimerContext 和 processElement 的 Context 参数一样,提供了上下文的一些信息,例如定时器触发的工夫信息(事件工夫或者解决工夫)。当定时器 timer 触发时,会执行回调函数 onTimer(),留神定时器 timer 只能在keyed streams 下面应用。
实时热门页面流量统计
根本需要
- 从 web 服务器的日志中,统计实时的热门拜访页面
- 统计每分钟的 ip 访问量,取出访问量最大的 5 个地址,每 5 秒更新一次
解决思路
- 将 apache 服务器日志中的工夫,转换为工夫戳,作为 Event Time
- 构建滑动窗口,窗口长度为 1 分钟,滑动间隔为 5 秒
import java.text.SimpleDateFormatimport model.ApacheLogEventimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractorimport org.apache.flink.streaming.api.scala.StreamExecutionEnvironmentimport org.apache.flink.streaming.api.windowing.time.Timeimport org.apache.flink.streaming.api.scala._/** * 统计近 1 个小时内的热门商品,每5分钟更新一次 */object NetWorkFlowAnalysis { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(5) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val resource = getClass.getResource("/apache.log").getPath val inputStream = env.readTextFile(resource) val dataStream = inputStream .map(data => { val arr = data.split(" ") ApacheLogEvent(arr(0), arr(1), new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss").parse(arr(3)).getTime, arr(5), arr(6)) }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.seconds(1)) { override def extractTimestamp(t: ApacheLogEvent): Long = t.timestamp }) .keyBy(_.url) .timeWindow(Time.minutes(10), Time.seconds(5)) .aggregate(new NetWorkFlowAggregateFunction, new NetWorkFlowWindowFunction) .keyBy(_.windowEnd) .process(new NetWorkFlowKeyedProcessFunction(3)) .print() env.execute("NetWorkFlowAnalysis") }}
实时流量统计—PV 和 UV
根本需要
- 从埋点日志中,统计实时的 PV 和 UV
解决思路
- 统计埋点日志中的 pv 行为,利用 Set 数据结构进行去重
object PageView { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val resource = getClass.getResource("/UserBehavior.csv").getPath val inputStream = env.readTextFile(resource) val dataStream = inputStream .map(data => { val arr = data.split(",") UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong) }) .assignAscendingTimestamps(_.timestamp * 1000L) .filter(_.behavior == "pv") .map(data => ("pv", 1)) .keyBy(_._1) .timeWindow(Time.hours(1)) .aggregate(new PvAggregateFunction, new PvWindowFunction) .keyBy(_.windowEnd) .process(new PvKeyedProcessFunction) .print() env.execute("PageView") }}
object UniqueVisitor { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val resource = getClass.getResource("/UserBehavior.csv").getPath val inputStream = env.readTextFile(resource) val dataStream = inputStream .map(data => { val arr = data.split(",") UserBehavior(arr(0).toLong, arr(1).toLong, arr(2).toInt, arr(3), arr(4).toLong) }) .assignAscendingTimestamps(_.timestamp * 1000L) .filter(_.behavior == "pv") .timeWindowAll(Time.hours(1)) .apply(new UvAllWindowFunction) .print() env.execute() }}
市场营销剖析— APP市场推广打算
根本需要
- 从埋点日志中,统计 APP 市场推广的数据指标
- 依照不同的推广渠道,别离统计数据
解决思路
- 通过过滤日志中的用户行为数据,依照不同的渠道进行统计
- 能够用 Process function 解决,失去自定义的输入数据信息
object AppMarket { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sourceStream = env.addSource(new SimulatedSourceFunction).assignAscendingTimestamps(_.timestamp) val processStream = sourceStream .keyBy(data => (data.channel, data.behavior)) .timeWindow(Time.days(1), Time.seconds(5)) .process(new AppMarketProcessWindowFunction) .print() env.execute() }}
市场营销剖析— 页面广告统计
根本需要
- 从埋点日志中,统计每小时页面广告的点击量, 5 秒刷新一次,并依照不同省份进行划分
- 对于 ”刷单“ 式的频繁点击行为进行过滤,并将该用户退出黑名单
解决思路
- 依据省份进行分组,创立长度为 1 小时、滑动间隔为 5 秒的工夫窗口进行统计
- 能够用 process function 进行黑名单过滤,检测用户对同一广告的点击量,
如果超过下限则将用户信息以测输入流流出到黑名单中
object BrushOrderAlert { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val resource = getClass.getResource("/AdClickLog.csv").getPath val socketStream = env.readTextFile(resource) val dataStream = socketStream .map(data => { val arr = data.split(",") UserAdvertClick(arr(0).toLong, arr(1).toLong, arr(2), arr(3), arr(4).toLong) }) .assignAscendingTimestamps(_.timestamp * 1000L) //将有刷单行为的用户输入到侧输入流(黑名单报警) val filterStream = dataStream .keyBy(data=>(data.userId, data.advertId)) .process(new BrushOrderKeyedProcessFunction(100)) val aggProviceStream = filterStream .keyBy(_.province) .timeWindow(Time.hours(1), Time.seconds(5)) .aggregate(new BrushOrderAggregateFunction, new BrushOrderWindowFunction) aggProviceStream.print("各省份下单详情") filterStream.getSideOutput(new OutputTag[BlackList]("BlackList")).print("歹意刷单详情") env.execute() }}
歹意登录监控
根本需要
- 用户在短时间内频繁登录失败,有程序歹意攻打的可能
- 同一用户(能够是不同 Ip)在 2 秒内间断两次登录失败,须要报警
解决思路
- 将用户的登录失败行为存入 ListState,设定定时器 2 秒后触发,查看 ListState 中有几次失败登录
- 更加准确的检测,能够应用 CEP 库实现事件流的模式匹配
object ContinuousLoginFailure { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val resource = getClass.getResource("/LoginLog.csv").getPath val socketStream = env.readTextFile(resource) val dataStream = socketStream .map(data => { val arr = data.split(",") LoginEvent(arr(0).toLong, arr(1), arr(2), arr(3).toLong) }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(3)) { override def extractTimestamp(t: LoginEvent): Long = t.timestamp * 1000L }) val loginStream = dataStream .keyBy(_.userId) .process(new LoginFailAdvanceKeyedProcessFunction(2)) dataStream.print("LoginEvent"); loginStream.print("LoginFailWarning") env.execute() }}
订单领取实时监控
根本需要
- 用户下单之后,应设置订单生效工夫,以进步用户领取的志愿,并升高零碎危险
- 用户下单后 15 分钟未领取,则输入监控信息
解决思路
- 利用 CEP 库进行事件流的模式匹配,并设定匹配的工夫距离
- 也能够利用状态编程,用 process function 实现解决逻辑
object OrderTimeoutWithCep { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val resource = getClass.getResource("/OrderLog.csv").getPath val inputStream = env.readTextFile(resource) val orderEventStream = inputStream .map(data => { val dataArray = data.split(",") OrderEvent(dataArray(0).trim.toLong, dataArray(1).trim, dataArray(2).trim, dataArray(3).trim.toLong) }) .assignAscendingTimestamps(_.eventTime * 1000L) .keyBy(_.orderId) //带工夫限度的pattern val orderPayPattern = Pattern.begin[OrderEvent]("begin") .where(_.eventType.equals("create")) .followedBy("follow") .where(_.eventType.equals("pay")) .within(Time.minutes(15)) //将pattern作用到inputStream上,失去一个patternStream val patternStream = CEP.pattern(orderEventStream, orderPayPattern) val orderTimeoutTag = OutputTag[OrderResult]("orderTimeoutTag") //调用select,对超时的做测流输入报警 val resultStream = patternStream.select(orderTimeoutTag, new OrderTimeOutPatternTimeoutFunction, new OrderPayPatternSelectFunction) resultStream.print("payed") resultStream.getSideOutput(orderTimeoutTag).print("timeout") env.execute() }}
侧输入流(SideOutput)
大部分的 DataStream API 的算子的输入是繁多输入,也就是某种数据类型的流。processfunction 的 side outputs 性能能够产生多条流,并且这些流的数据类型能够不一样。一个 side output 能够定义为 OutputTag[X]对象,X 是输入流的数据类型。processfunction 能够通过 Context 对象发射一个事件到一个或者多个 sideoutputs。
/** * 1s之内温度间断回升就报警 */object TempIncreaseAlert { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) val socketStream = env.socketTextStream("localhost", 9998) val dataStream = socketStream .map(data => { val arr = data.split(",") TempReading(arr(0).trim, arr(1).trim.toLong, arr(2).trim.toDouble) }) .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TempReading](Time.seconds(1)) { override def extractTimestamp(t: TempReading): Long = {t.timestamp + 1000L} }) val processStream = dataStream .keyBy(_.id) .process(new TempKeyedProcessFunction) dataStream.print("data") processStream.getSideOutput(new OutputTag[String]("alert")).print("output") env.execute() }}
我把代码上传到我的github上,具体代码请移步:flink_behavior_analysis