大家好,我是大圣

最近大圣在工作中遇到这样一个数据开发的需要,就是监控用户在某个产品的哪几个界面产生的用户行为,其实简略来说就是计算用户在咱们这个产品的某些界面的PV/UV,以及用户前一个操作界面和前面一个操作界面之间的行为数据。

我的思路是和前端埋点人员约定好这几个界面的事件点击类型,而后利用Flink框架从Kafka外面实时生产数据,针对每一个事件点击类型去编写一个Flink Job,接着在具体的Flink Job外面去实现要监控用户的行为指标。思路如下图:

这个计划是的确能够的,我就是这样实现的,最初我编写了8个Flink Job。然而在代码编写过程中发现了大量的反复代码,因为这个8个Flink Job 可能就是分组的字段不一样或者每个Flink Job 的解决细节不一样。然而我这样编写了这么多个Job 的话会导致保护起来很麻烦,代码的可扩展性也不好,而且我每一个Flink Job都会去生产Kafka的同一条数据,所以这样做是不可取的,所以这里能够采纳工厂设计模式把多个Flink Job合并成一个Flink Job。

本文次要是讲利用工厂设计模式把多个Flink Job合并成一个Flink Job,所以工厂设计模式的这一块次要简略介绍一下最简略的工厂设计模式,如下图:

这个模式是最简略的工厂设计模式,因为它其实就是对不同类对象的创立进行了一层封装,通过向工厂模式传递不同的点击事件类型来指定要创立的对象。本文所用的也正是这种形式。

上面次要讲怎么在Flink Job中应用工厂模式去合并多个Flink Job,废话不多说间接上图:

这是咱们在从Kafka生产到数据之后,咱们依据不同的点击事件类型把数据封装成不同的Bean对象。我在这里是创立了一个BaseBean,对于每一种点击事件类型再创立本人的Bean对象,让每一种点击事件类型去继承这个BaseBean这个父类。再利用工厂模式去依据不同的点击事件类型去创立不同的Bean对象,如下图:

这是利用工厂设计模式去依据不同的点击事件类型去创立不同的用户行为的类外面的initBean()办法外面初始化本人的Bean对象,如下图:

这样咱们在流中就能够返回BaseBean,用这个BaseBean去持续向上游去传递,这个中央用到了Java多态中的向上转型,多个子类转化为一个父类。

上面咱们来看一下Flink Job中流的框架代码,如下图:

当BaseBean达到process办法的时候,这个时候咱们就去实现计算PV/UV的逻辑,这个时候咱们还是能够利用点击事件类型把不同的点击类型去交给不同的行为类去进行解决,如下图:

咱们在行为类外面去把计算的后果封装成一个BaesModel,让每个行为类也创立一个计算结果的实体类去继承这个BaseModel,这里和下面是一样的,而后在这外面咱们就把BaseModel向上游去传递到addSink算子外面,如下图:

最初在存入数据库的时候,还是利用点击事件类型不同去执行不同的行为类代码,然而因为是BaseModel是一个父类,所以咱们要把他转为为每一个具体的子类,这个中央须要用到Java多态中的向下转型,如下图:

这样就能够实现利用工厂设计模式把多个FlinkJob合并成一个Flink Job,而且程序比拟好扩大当前再有什么新的业务指标咱们能够在每一个行为类外面去进行扩大就行了。

文章中有说的不分明的,或者想要文章的示例代码的能够扫码上面的图片,关注微信公众号而后点击分割我,加我微信与我交换就行。

本文由博客一文多发平台 OpenWrite 公布!