1. Introduction
Big data development: monitoring and alerting on table data stability and code-value distribution stability. The metric computation part (the theory and design) has already been written up; the management platform part will be completed later, so this article focuses on the module design. Overall, the implementation consists of asynchronous modules over offline data sources, split into a metric batch module, a monitoring/alerting module, and a platform management module. The metric batch module and the monitoring/alerting module communicate through offline tables rather than being directly coupled: they run as two separate scheduled jobs, as shown in the figure below.
2. Design notes
Five tables are involved: a metric batch record table, an alert batch record table, and three MySQL configuration tables: the table monitoring configuration table, the alert configuration table, and the distribution-field configuration table. The metric batch job and the alert batch job have no direct task dependency on each other; instead, alert batch records are produced indirectly, driven by the metric batch record table.
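To make the monitoring configuration table concrete, here is a minimal Scala sketch of the table_monitor_conf record as it is consumed by the Monitor code later in this article. The field names are taken from that code; the types (all nullable strings, with "1" meaning enabled) and the per-field comments are assumptions about the real MySQL schema.

// Minimal sketch of the per-table monitoring configuration. Field names match
// the Monitor code below; everything else about the real schema is assumed.
case class TableMonitorConf(
  db_table_key: String,                        // logical key / grouping key for the table
  db_table_name: String,                       // Hive table name, e.g. "db.table"
  table_charge_people: String,                 // owner(s) to notify
  done_path: String,                           // done-flag path, if the table publishes one
  where_condition: String,                     // extra filter applied when scanning the table
  if_done: String,                             // whether to check the done flag
  if_check_partition: String,                  // rule 1: partition exists
  if_check_partition_count: String,            // rule 2: partition count above a given value
  if_check_partition_count_fluctuates: String, // rule 3: partition count fluctuation
  if_check_distribute: String,                 // rule 4: code-value distribution
  if_check_distribute_fluctuates: String       // rule 5: distribution fluctuation
)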
3. Code implementation
The overall project directory structure is shown below:
resource: configuration files
common: shared modules; Builder is the builder for outgoing alert messages, and Father holds the common base code for the Spark project
rules: the five metric rules: check that the partition exists, check whether the partition count exceeds a given value, check partition count fluctuation, check the code-value distribution, and check fluctuation of the distribution vector (a sketch of one rule follows this list)
utils: utility classes, such as the date helper, the table format helper, the SQL helper, and so on
Monitor: the main class of the metric batch job
SunRobot: the main entry class of the alert batch job
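As mentioned in the rules entry above, each rule exposes a runRule(dbTableName, dbTableKey, whereCondition, runDay) method that returns a Dataset[TableMonitorRecord]; Monitor simply unions these datasets. The sketch below shows what a rule such as Rule1 (the partition existence check) could look like. The TableMonitorRecord field names are taken from the select(...) in Monitor further down; the check itself (show partitions plus a filter), the string types, and passing spark explicitly instead of inheriting it from Father are assumptions made to keep the example self-contained.

import org.apache.spark.sql.{Dataset, SparkSession}

// Sketch of the record every rule produces; field names match the select(...)
// in Monitor below, types are assumed to be strings.
case class TableMonitorRecord(
  db_table_key: String,
  db_table_name: String,
  check_data_time: String,
  rule_name: String,
  rule_result: String,      // e.g. "ok" / "failed" (assumed values)
  rule_error: String,       // human-readable message when the check fails
  checked_partition: String
)

// Hypothetical shape of Rule1 ("does the partition for runDay exist?").
object Rule1Sketch {
  def runRule(spark: SparkSession,
              dbTableName: String,
              dbTableKey: String,
              whereCondition: String,
              runDay: String): Dataset[TableMonitorRecord] = {
    import spark.implicits._

    // Assumes dt-partitioned Hive tables; adjust to the real partition column.
    val partitions = spark.sql(s"show partitions $dbTableName").collect().map(_.getString(0))
    val exists = partitions.exists(_.contains(s"dt=$runDay"))

    Seq(TableMonitorRecord(
      db_table_key = dbTableKey,
      db_table_name = dbTableName,
      check_data_time = java.time.LocalDateTime.now().toString,
      rule_name = "rule1_check_partition",
      rule_result = if (exists) "ok" else "failed",
      rule_error = if (exists) "" else s"partition dt=$runDay not found",
      checked_partition = s"dt=$runDay"
    )).toDS()
  }
}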
4. The Monitor class
The individual rule implementations are not described in detail here; they can be read directly from the source code. What matters is how Monitor applies these rules to generate the corresponding check records. The main implementation is as follows:
package com.hoult

import com.beust.jcommander.JCommander
import com.hoult.common.Father
import com.hoult.rules.{Rule1, Rule2, Rule3, Rule4, Rule5, TableMonitorConf, TableMonitorRecord}
import com.hoult.utils.{DateTool, PropertiesUtils}
import org.apache.spark.sql.Dataset

import scala.collection.mutable.ListBuffer

object Monitor extends Father {

  val mysqlProps = PropertiesUtils.getMysqlProps()
  var broadTableConfs: Dataset[TableMonitorConf] = null

  def main(args: Array[String]): Unit = {
    val info: ObserverArgs = new ObserverArgs
    println("入参:: " + args.mkString(","))
    JCommander.newBuilder().addObject(info).build().parse(args.toArray: _*)

    // broadcast the configuration tables
    prepare()

    // build one dataframe per (table, rule) pair
    import spark.implicits._
    val tableConfArray: Array[TableMonitorConf] = spark.sql("select * from table_monitor_conf where db_table_name !='default.default'").as[TableMonitorConf].collect()
    val defaultTableConf = spark.sql("select * from table_monitor_conf where db_table_name ='default.default'").as[TableMonitorConf].collect().take(1)(0)
    var ll: ListBuffer[Dataset[TableMonitorRecord]] = ListBuffer[Dataset[TableMonitorRecord]]() // run all rules together

    // fill in default values
    val tConfs = tableConfArray.map( conf => {
      TableMonitorConf(
        if (conf.db_table_key == null) defaultTableConf.db_table_key else conf.db_table_key,
        conf.db_table_name,
        if (conf.table_charge_people == null) defaultTableConf.table_charge_people else conf.table_charge_people,
        if (conf.done_path == null) defaultTableConf.done_path else conf.done_path,
        if (conf.where_condition == null) defaultTableConf.where_condition else conf.where_condition,
        if (conf.if_done == null) defaultTableConf.if_done else conf.if_done,
        if (conf.if_check_partition == null) defaultTableConf.if_check_partition else conf.if_check_partition,
        if (conf.if_check_partition_count == null) defaultTableConf.if_check_partition_count else conf.if_check_partition_count,
        if (conf.if_check_partition_count_fluctuates == null) defaultTableConf.if_check_partition_count_fluctuates else conf.if_check_partition_count_fluctuates,
        if (conf.if_check_distribute == null) defaultTableConf.if_check_distribute else conf.if_check_distribute,
        if (conf.if_check_distribute_fluctuates == null) defaultTableConf.if_check_distribute_fluctuates else conf.if_check_distribute_fluctuates
      )
    })

    // iterate over all rules
    for (elem <- tConfs) {
      // rule 1
      if ("1".equals(elem.if_check_partition)) {
        ll +:= Rule1.runRule(elem.db_table_name, elem.db_table_key, elem.where_condition, info.runDay)
      }
      // rule 2
      if ("1".equals(elem.if_check_partition_count)) {
        ll +:= Rule2.runRule(elem.db_table_name, elem.db_table_key, elem.where_condition, info.runDay)
      }
      // rule 3
      if ("1".equals(elem.if_check_partition_count_fluctuates)) {
        ll +:= Rule3.runRule(elem.db_table_name, elem.db_table_key, elem.where_condition, info.runDay)
      }
      // rule 4
      if ("1".equals(elem.if_check_distribute)) {
        ll +:= Rule4.runRule(elem.db_table_name, elem.db_table_key, elem.where_condition, info.runDay)
      }
      // rule 5
      if ("1".equals(elem.if_check_distribute_fluctuates)) {
        ll +:= Rule5.runRule(elem.db_table_name, elem.db_table_key, elem.where_condition, info.runDay)
      }
    }

    if (ll.size == 0) return

    ll.reduce(_.union(_)).select(
      "db_table_key",
      "db_table_name",
      "check_data_time",
      "rule_name",
      "rule_result",
      "rule_error",
      "checked_partition"
    ).createOrReplaceTempView("temp_table_rule_records")

    val partition = DateTool.getLatest30minutePatition
    spark.sql("set hive.reduce.tasks=1")
    spark.sql(s"insert overwrite table table_monitor.table_rule_records partition(dt=${info.runDay},hm=$partition) select * from temp_table_rule_records")
  }

  def prepare(): Unit = {
    import spark.implicits._

    // 1. cache the base configuration table on the cluster: table_monitor_conf
    val tableConfs: Dataset[TableMonitorConf] = spark.read.jdbc(mysqlProps.getProperty("url"), "table_monitor_conf", mysqlProps).as[TableMonitorConf].cache()
    tableConfs.createOrReplaceTempView("table_monitor_conf")

    // 2. cache the distribution configuration table on the cluster: table_monitor_distribute_conf
    spark.read.jdbc(mysqlProps.getProperty("url"), "table_monitor_distribute_conf", mysqlProps).cache().createOrReplaceTempView("table_monitor_distribute_conf")
  }
}

To sum up, the flow is: read the configuration tables, fill in default values for missing fields, and finally run the rules against the retrieved configuration.
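Monitor's command-line arguments are parsed by JCommander into an ObserverArgs object, of which only info.runDay is used above. That class is not shown in this article; the sketch below is only a guess at its shape, and the flag name --runDay is an assumption.

import com.beust.jcommander.Parameter

// Hypothetical ObserverArgs: JCommander fills the annotated field from the
// command line. Only runDay is referenced in Monitor; the flag name and any
// other options are assumptions.
class ObserverArgs {
  @Parameter(names = Array("--runDay"), description = "batch day to check, e.g. 20210301", required = true)
  var runDay: String = _
}

With such a class in place, the job would be launched roughly as: spark-submit --class com.hoult.Monitor monitor.jar --runDay 20210301.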
...