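Metadata management is the core of a data warehouse. It not only defines what the warehouse contains, but also specifies the content and location of the data in it, describes the rules for extracting and transforming data, and stores the business information related to the warehouse's subject areas. This article mainly introduces Hive Hooks and Metastore Listeners, which can be used to automate metadata management. After reading it you will understand:
- Metadata management
- Hive Hooks and Metastore Listeners
- Basic usage of Hive Hooks
- Basic usage of Metastore Listeners
Metadata management
Definition of metadata
By the traditional definition, metadata is data about data. Metadata connects source data, the data warehouse, and data applications, and records the whole life cycle of data from production to consumption. It mainly records the model definitions in the data warehouse, the mappings between layers, the state of the data in the warehouse, and the running state of ETL tasks. In a data warehouse system, metadata helps administrators and developers find the data they care about easily, guides their data management and development work, and improves efficiency. By purpose, metadata falls into two categories: technical metadata and business metadata. Technical metadata stores the technical details of the data warehouse system and is used to develop and manage the warehouse.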
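Metadata categories
Technical metadata
- Storage metadata of the distributed computing system
For example, information about Hive tables, columns, and partitions. It records the table name, partition information, owner, file size, and table type, as well as each column's name, type, comment, and whether it is a partition column.
- Runtime metadata of the distributed computing system
Similar to Hive job logs, including job type, instance name, inputs and outputs, SQL, run parameters, and execution time.
- Task scheduling metadata
Task dependency types and dependency relationships, plus the run logs of the different kinds of scheduled tasks.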
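The storage metadata described in the first item above maps naturally onto a small data model. The following is a minimal sketch in plain Java. The class and field names (`ColumnMeta`, `TableMeta`, `TableMetaDemo`) are hypothetical, chosen only for illustration, and are not Hive types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of one column: name, type, comment, partition flag.
final class ColumnMeta {
    final String name;
    final String type;
    final String comment;
    final boolean partitionColumn;

    ColumnMeta(String name, String type, String comment, boolean partitionColumn) {
        this.name = name;
        this.type = type;
        this.comment = comment;
        this.partitionColumn = partitionColumn;
    }
}

// Hypothetical model of one table's storage metadata.
final class TableMeta {
    final String tableName;
    final String owner;
    final String tableType;
    final long totalSizeBytes;
    final List<ColumnMeta> columns = new ArrayList<>();

    TableMeta(String tableName, String owner, String tableType, long totalSizeBytes) {
        this.tableName = tableName;
        this.owner = owner;
        this.tableType = tableType;
        this.totalSizeBytes = totalSizeBytes;
    }

    // Fluent helper so a table can be described in one expression.
    TableMeta addColumn(ColumnMeta column) {
        columns.add(column);
        return this;
    }

    // Names of the partition columns, in declaration order.
    List<String> partitionColumnNames() {
        List<String> names = new ArrayList<>();
        for (ColumnMeta c : columns) {
            if (c.partitionColumn) {
                names.add(c.name);
            }
        }
        return names;
    }
}

class TableMetaDemo {
    // Sample values are made up for the demonstration.
    static TableMeta sample() {
        return new TableMeta("orders", "etl_user", "MANAGED_TABLE", 0L)
                .addColumn(new ColumnMeta("id", "int", "order id", false))
                .addColumn(new ColumnMeta("dt", "string", "partition date", true));
    }

    public static void main(String[] args) {
        System.out.println(sample().partitionColumnNames());
    }
}
```

A model like this is what a metadata collector would eventually populate from the events captured later in this article.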
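Business metadata
Business metadata describes the data in the warehouse from a business point of view. It provides a semantic layer between users and the actual systems, so that business people without a technical background can also "read" the data in the warehouse. Common business metadata includes standardized definitions of dimensions and attributes, business processes, and metrics, which help manage and use data better, and data application metadata, such as the configuration and runtime metadata of data reports and data products.
Applications of metadata
The real value of data lies in data-driven decision making, that is, using data to guide operations. With a data-driven approach we can identify trends, take effective action, discover problems, and drive innovation or new solutions. This is data-driven operations. Likewise, metadata can guide data practitioners in their daily work and enable data-driven "operations". For example, data consumers can use metadata to quickly find the data they need; ETL engineers can use it to guide model design, task optimization, task decommissioning, and other daily ETL work; operations engineers can use it to guide storage, compute, and system optimization across the whole cluster.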
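Hive Hooks and Metastore Listeners
Hive Hooks
There are many open-source systems for data governance and metadata management, such as Apache Atlas, which can meet metadata management needs in complex scenarios. In fact, Apache Atlas manages Hive metadata through Hive's Hooks, which requires the following configuration:
<property>
<name>hive.exec.post.hooks</name>
<value>org.apache.atlas.hive.hook.HiveHook</value>
</property>
The Hook listens for Hive events such as creating or altering a table, then pushes the collected data to Kafka in a specific format; finally the metadata is consumed and stored.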
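The flow just described (hook publishes an event, a consumer reads it and updates a store) can be sketched without any external dependency. The example below is only an illustration under stated assumptions: an in-memory queue stands in for the Kafka topic, a map stands in for the metadata store, and the pipe-separated message format and the `MetadataPipeline` class are hypothetical. It is not how Apache Atlas is implemented.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for the hook -> Kafka -> consumer -> store flow.
class MetadataPipeline {
    // Stand-in for a Kafka topic.
    private final BlockingQueue<String> topic = new LinkedBlockingQueue<>();
    // Stand-in for the metadata store: "db.table" -> last operation seen.
    private final Map<String, String> store = new TreeMap<>();

    // Hook side: serialize the event in a fixed (hypothetical) format and publish it.
    void publish(String operation, String db, String table) {
        topic.add(operation + "|" + db + "|" + table);
    }

    // Consumer side: drain the topic and apply each event to the store.
    int consume() {
        int handled = 0;
        String message;
        while ((message = topic.poll()) != null) {
            String[] parts = message.split("\\|", 3);
            String key = parts[1] + "." + parts[2];
            if ("DROPTABLE".equals(parts[0])) {
                store.remove(key);
            } else {
                store.put(key, parts[0]);
            }
            handled++;
        }
        return handled;
    }

    // Defensive copy of the current store contents.
    Map<String, String> snapshot() {
        return new TreeMap<>(store);
    }

    public static void main(String[] args) {
        MetadataPipeline pipeline = new MetadataPipeline();
        pipeline.publish("CREATETABLE", "default", "t1");
        pipeline.publish("ALTERTABLE_ADDCOLS", "default", "t1");
        pipeline.publish("CREATETABLE", "default", "t2");
        pipeline.publish("DROPTABLE", "default", "t2");
        System.out.println(pipeline.consume() + " events, store = " + pipeline.snapshot());
    }
}
```

Decoupling the hook from the store through a queue keeps the hook itself cheap, which matters because hooks run inside the query path.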
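Types of Hive Hooks
So what exactly are Hooks?
Hooks are an event and message mechanism that lets you bind events into Hive's internal execution flow without recompiling Hive. Hooks provide a way to extend Hive and integrate external components. Depending on its type, a hook runs at a different stage. The main types of Hooks are: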
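To make the mechanism concrete, here is a minimal, library-free sketch of how a hook point of this kind can work: class names come from a configuration value, are instantiated by reflection, and are invoked at a fixed stage of execution. The names `QueryHook`, `AuditHook`, and `HookRunner` are hypothetical and are not part of Hive; Hive's real hook interfaces are the ones named in this article.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical hook contract, standing in for a Hive hook interface.
interface QueryHook {
    void run(String query, List<String> trace);
}

// Example hook that records every query it sees.
class AuditHook implements QueryHook {
    @Override
    public void run(String query, List<String> trace) {
        trace.add("audit:" + query);
    }
}

class HookRunner {
    // Instantiate hooks from a comma-separated list of class names,
    // the same shape as a property value in hive-site.xml.
    static List<QueryHook> load(String classNames) {
        List<QueryHook> hooks = new ArrayList<>();
        for (String name : classNames.split(",")) {
            String trimmed = name.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            try {
                Object instance = Class.forName(trimmed).getDeclaredConstructor().newInstance();
                hooks.add((QueryHook) instance);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot load hook " + trimmed, e);
            }
        }
        return hooks;
    }

    // "Execute" the query, then call every post-execution hook in order.
    static List<String> execute(String query, List<QueryHook> postHooks) {
        List<String> trace = new ArrayList<>();
        trace.add("executed:" + query);
        for (QueryHook hook : postHooks) {
            hook.run(query, trace);
        }
        return trace;
    }

    public static void main(String[] args) {
        List<QueryHook> hooks = load(AuditHook.class.getName());
        System.out.println(execute("show tables", hooks));
    }
}
```

Because the hook class is resolved by name at run time, new behavior can be added by changing configuration and the classpath only, which is why no recompilation of the host system is needed.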
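- hive.exec.pre.hooks
As the name suggests, these hooks are invoked before the execution engine runs the query. They can be used only after Hive has optimized the query plan. To use them, implement the interface org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext. The configuration in hive-site.xml is:
<property>
<name>hive.exec.pre.hooks</name>
<value>fully qualified name of the implementing class</value>
</property>
- hive.exec.post.hooks
Invoked after the execution plan has finished and before the result is returned to the user. To use them, implement the interface org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext. The configuration in hive-site.xml is:
<property>
<name>hive.exec.post.hooks</name>
<value>fully qualified name of the implementing class</value>
</property>
- hive.exec.failure.hooks
Invoked after the execution plan fails. To use them, implement the interface org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext. The configuration in hive-site.xml is:
<property>
<name>hive.exec.failure.hooks</name>
<value>fully qualified name of the implementing class</value>
</property>
- hive.metastore.init.hooks
Invoked when HMSHandler is initialized. To use them, extend the class org.apache.hadoop.hive.metastore.MetaStoreInitListener. The configuration in hive-site.xml is:
<property>
<name>hive.metastore.init.hooks</name>
<value>fully qualified name of the implementing class</value>
</property>
- hive.exec.driver.run.hooks
Run at the start or end of Driver.run. To use them, implement the interface org.apache.hadoop.hive.ql.HiveDriverRunHook. The configuration in hive-site.xml is:
<property>
<name>hive.exec.driver.run.hooks</name>
<value>fully qualified name of the implementing class</value>
</property>
- hive.semantic.analyzer.hook
Invoked when Hive performs semantic analysis on the query statement. To use it, extend the abstract class org.apache.hadoop.hive.ql.parse.AbstractSemanticAnalyzerHook. The configuration in hive-site.xml is:
<property>
<name>hive.semantic.analyzer.hook</name>
<value>fully qualified name of the implementing class</value>
</property>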
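Pros and cons of Hive Hooks
- Pros
  - Custom code can easily be embedded or run at various stages of a query
  - They can be used to update metadata
- Cons
  - The metadata obtained through Hooks usually needs further parsing; otherwise it is hard to understand
  - They affect query processing
For Hive Hooks, this article gives an example that uses hive.exec.post.hooks. This hook runs after the query has executed and before the result is returned.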
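Metastore Listeners
Metastore Listeners are listeners on the Hive metastore. Users can write custom code that reacts to metadata events.
If we look at the source code of the HiveMetaStore class, we find that the init() method, called while HiveMetaStore is being created, sets up three kinds of listeners: MetaStorePreEventListener, MetaStoreEventListener, and MetaStoreEndFunctionListener. These listeners observe the events of each step.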
public class HiveMetaStore extends ThriftHiveMetastore {
    // ... code omitted
    public static class HMSHandler extends FacebookBase implements
            IHMSHandler {
        // ... code omitted
        public void init() throws MetaException {
            // ... code omitted
            // Load the MetaStorePreEventListener implementations
            preListeners = MetaStoreUtils.getMetaStoreListeners(MetaStorePreEventListener.class,
                    hiveConf,
                    hiveConf.getVar(HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS));
            // Load the MetaStoreEventListener implementations
            listeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class,
                    hiveConf,
                    hiveConf.getVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS));
            listeners.add(new SessionPropertiesListener(hiveConf));
            // Load the MetaStoreEndFunctionListener implementations
            endFunctionListeners = MetaStoreUtils.getMetaStoreListeners(
                    MetaStoreEndFunctionListener.class,
                    hiveConf,
                    hiveConf.getVar(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS));
            // ... code omitted
        }
    }
}
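Types of Metastore Listeners
- hive.metastore.pre.event.listeners
Extend this abstract class to implement the actions that must run before a specific event occurs on the metastore. Its methods are called before the event takes place on the metastore.
To use it, extend the abstract class org.apache.hadoop.hive.metastore.MetaStorePreEventListener. The configuration in hive-site.xml is:
<property>
<name>hive.metastore.pre.event.listeners</name>
<value>fully qualified name of the implementing class</value>
</property>
- hive.metastore.event.listeners
Extend this abstract class to implement the actions that must run when a specific event occurs on the metastore. Its methods are called whenever an event occurs on the metastore.
To use it, extend the abstract class org.apache.hadoop.hive.metastore.MetaStoreEventListener. The configuration in hive-site.xml is:
<property>
<name>hive.metastore.event.listeners</name>
<value>fully qualified name of the implementing class</value>
</property>
- hive.metastore.end.function.listeners
Its methods are called whenever a metastore function ends.
To use it, extend the abstract class org.apache.hadoop.hive.metastore.MetaStoreEndFunctionListener. The configuration in hive-site.xml is:
<property>
<name>hive.metastore.end.function.listeners</name>
<value>fully qualified name of the implementing class</value>
</property>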
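The three listener roles above differ mainly in when they fire relative to the operation. The following plain-Java sketch illustrates that ordering with a toy metastore: a pre-event callback runs first and may veto the operation by throwing, an event callback runs after the change is applied, and an end-function callback always runs last with a success flag. All names here (`PreEventCallback`, `EventCallback`, `EndFunctionCallback`, `MiniMetastore`) are hypothetical, and the sketch is a simplification: the real Hive listeners receive typed event objects rather than strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the three listener roles.
interface PreEventCallback { void beforeEvent(String event); }   // may throw to veto
interface EventCallback { void onEvent(String event); }
interface EndFunctionCallback { void onEndFunction(String function, boolean success); }

class MiniMetastore {
    final List<PreEventCallback> preListeners = new ArrayList<>();
    final List<EventCallback> listeners = new ArrayList<>();
    final List<EndFunctionCallback> endFunctionListeners = new ArrayList<>();
    final List<String> tables = new ArrayList<>();

    void createTable(String name) {
        boolean success = false;
        try {
            // 1. Pre-event listeners run before anything changes.
            for (PreEventCallback l : preListeners) {
                l.beforeEvent("CREATE_TABLE:" + name);
            }
            // 2. Apply the change, then notify the event listeners.
            tables.add(name);
            for (EventCallback l : listeners) {
                l.onEvent("CREATE_TABLE:" + name);
            }
            success = true;
        } finally {
            // 3. End-function listeners always run, whether or not the call succeeded.
            for (EndFunctionCallback l : endFunctionListeners) {
                l.onEndFunction("create_table", success);
            }
        }
    }

    // Build a metastore whose listeners append to the given trace.
    static MiniMetastore traced(List<String> trace) {
        MiniMetastore ms = new MiniMetastore();
        ms.preListeners.add(e -> {
            if (e.endsWith(":forbidden")) {
                throw new IllegalArgumentException("vetoed " + e);
            }
            trace.add("pre " + e);
        });
        ms.listeners.add(e -> trace.add("event " + e));
        ms.endFunctionListeners.add((fn, ok) -> trace.add("end " + fn + " success=" + ok));
        return ms;
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        MiniMetastore ms = traced(trace);
        ms.createTable("orders");
        try {
            ms.createTable("forbidden");
        } catch (IllegalArgumentException e) {
            trace.add("rejected");
        }
        trace.forEach(System.out::println);
    }
}
```

In this sketch the vetoed call never reaches the event listeners, but the end-function listener still sees it with the success flag set to false.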
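Pros and cons of Metastore Listeners
- Pros
  - The metadata has already been parsed and is easy to understand
  - They do not affect query processing; they are read-only
- Cons
  - They are inflexible: only the objects belonging to the current event are accessible
For Metastore Listeners, this article gives an example that uses MetaStoreEventListener and implements two methods: onCreateTable and onAlterTable.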
Basic usage of Hive Hooks
Code
The implementation is as follows:
package com.jmx.hooks;

import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Assumption: Jackson 2.x is on the classpath; with Jackson 1.x use org.codehaus.jackson.map.ObjectMapper.
import com.fasterxml.jackson.databind.ObjectMapper;

public class CustomPostHook implements ExecuteWithHookContext {
    private static final Logger LOGGER = LoggerFactory.getLogger(CustomPostHook.class);
    // Names of the Hive SQL operations to monitor.
    // HiveOperation is an enum that wraps Hive's SQL operation types.
    private static final HashSet<String> OPERATION_NAMES = new HashSet<>();

    static {
        // Create table
        OPERATION_NAMES.add(HiveOperation.CREATETABLE.getOperationName());
        // Alter database properties
        OPERATION_NAMES.add(HiveOperation.ALTERDATABASE.getOperationName());
        // Alter database owner
        OPERATION_NAMES.add(HiveOperation.ALTERDATABASE_OWNER.getOperationName());
        // Alter table: add columns
        OPERATION_NAMES.add(HiveOperation.ALTERTABLE_ADDCOLS.getOperationName());
        // Alter table: storage location
        OPERATION_NAMES.add(HiveOperation.ALTERTABLE_LOCATION.getOperationName());
        // Alter table properties
        OPERATION_NAMES.add(HiveOperation.ALTERTABLE_PROPERTIES.getOperationName());
        // Rename table
        OPERATION_NAMES.add(HiveOperation.ALTERTABLE_RENAME.getOperationName());
        // Rename column
        OPERATION_NAMES.add(HiveOperation.ALTERTABLE_RENAMECOL.getOperationName());
        // Replace columns: drop the current columns, then add the new ones
        OPERATION_NAMES.add(HiveOperation.ALTERTABLE_REPLACECOLS.getOperationName());
        // Create database
        OPERATION_NAMES.add(HiveOperation.CREATEDATABASE.getOperationName());
        // Drop database
        OPERATION_NAMES.add(HiveOperation.DROPDATABASE.getOperationName());
        // Drop table
        OPERATION_NAMES.add(HiveOperation.DROPTABLE.getOperationName());
    }

    @Override
    public void run(HookContext hookContext) throws Exception {
        assert (hookContext.getHookType() == HookType.POST_EXEC_HOOK);
        // Query plan
        QueryPlan plan = hookContext.getQueryPlan();
        // Operation name
        String operationName = plan.getOperationName();
        logWithHeader("Executed SQL: " + plan.getQueryString());
        logWithHeader("Operation name: " + operationName);
        if (OPERATION_NAMES.contains(operationName) && !plan.isExplain()) {
            logWithHeader("Monitored SQL operation");
            Set<ReadEntity> inputs = hookContext.getInputs();
            Set<WriteEntity> outputs = hookContext.getOutputs();
            for (Entity entity : inputs) {
                logWithHeader("Hook metadata input: " + toJson(entity));
            }
            for (Entity entity : outputs) {
                logWithHeader("Hook metadata output: " + toJson(entity));
            }
        } else {
            logWithHeader("Not in the monitored set, ignoring this hook!");
        }
    }

    private static String toJson(Entity entity) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Entity types mainly include:
        // DATABASE, TABLE, PARTITION, DUMMYPARTITION, DFS_DIR, LOCAL_DIR, FUNCTION
        switch (entity.getType()) {
            case DATABASE:
                Database db = entity.getDatabase();
                return mapper.writeValueAsString(db);
            case TABLE:
                return mapper.writeValueAsString(entity.getTable().getTTable());
        }
        // Other entity types are not serialized in this example.
        return null;
    }

    /**
     * Logs a message with a fixed header.
     *
     * @param obj the message or object to log
     */
    private void logWithHeader(Object obj) {
        LOGGER.info("[CustomPostHook][Thread: " + Thread.currentThread().getName() + "] | " + obj);
    }
}
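How it is used
First compile the code above into a jar and put it in the $HIVE_HOME/lib directory, or run the add jar command in the Hive client:
0: jdbc:hive2://localhost:10000> add jar /opt/softwares/com.jmx.hive-1.0-SNAPSHOT.jar;
Next, configure hive-site.xml. For convenience, we set the property directly with a client command:
0: jdbc:hive2://localhost:10000> set hive.exec.post.hooks=com.jmx.hooks.CustomPostHook;
Show tables operation
The code above monitors a set of operations and runs custom code (for example, writing a log entry) whenever one of them is detected. When we run the following command in the Hive beeline client:
0: jdbc:hive2://localhost:10000> show tables;
we can see the following in $HIVE_HOME/logs/hive.log:
[CustomPostHook][Thread: cab9a763-c63e-4f25-9f9a-affacb3cecdb main] | Executed SQL: show tables
[CustomPostHook][Thread: cab9a763-c63e-4f25-9f9a-affacb3cecdb main] | Operation name: SHOWTABLES
[CustomPostHook][Thread: cab9a763-c63e-4f25-9f9a-affacb3cecdb main] | Not in the monitored set, ignoring this hook!
The show tables operation above is not in the monitored set, so no metadata is logged for it.
Create table operation
When we create a table in the Hive beeline client, as follows: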
CREATE TABLE testposthook(
id int COMMENT "id",
name string COMMENT "姓名"
)COMMENT "建表_测试 Hive Hooks"
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/user/hive/warehouse/';
Check hive.log. The Hook metadata output contains two values: the first is the database metadata and the second is the table metadata.
- Database metadata
{
"name":"default",
"description":"Default Hive database",
"locationUri":"hdfs://kms-1.apache.com:8020/user/hive/warehouse",
"parameters":{ },
"privileges":null,
"ownerName":"public",
"ownerType":"ROLE",
"setParameters":true,
"parametersSize":0,
"setOwnerName":true,
"setOwnerType":true,
"setPrivileges":false,
"setName":true,
"setDescription":true,
"setLocationUri":true
}
- Table metadata
{
"tableName":"testposthook",
"dbName":"default",
"owner":"anonymous",
"createTime":1597985444,
"lastAccessTime":0,
"retention":0,
"sd":{"cols":[],
"location":null,
"inputFormat":"org.apache.hadoop.mapred.SequenceFileInputFormat",
"outputFormat":"org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat",
"compressed":false,
"numBuckets":-1,
"serdeInfo":{
"name":null,
"serializationLib":"org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe",
"parameters":{"serialization.format":"1"},
"setSerializationLib":true,
"setParameters":true,
"parametersSize":1,
"setName":false
},
"bucketCols":[ ],
"sortCols":[ ],
"parameters":{ },
"skewedInfo":{"skewedColNames":[],
"skewedColValues":[ ],
"skewedColValueLocationMaps":{ },
"skewedColNamesIterator":[ ],
"skewedColValuesSize":0,
"skewedColValuesIterator":[ ],
"skewedColValueLocationMapsSize":0,
"setSkewedColNames":true,
"setSkewedColValues":true,
"setSkewedColValueLocationMaps":true,
"skewedColNamesSize":0
},
"storedAsSubDirectories":false,
"colsSize":0,
"setParameters":true,
"parametersSize":0,
"setOutputFormat":true,
"setSerdeInfo":true,
"setBucketCols":true,
"setSortCols":true,
"setSkewedInfo":true,
"colsIterator":[ ],
"setCompressed":false,
"setNumBuckets":true,
"bucketColsSize":0,
"bucketColsIterator":[ ],
"sortColsSize":0,
"sortColsIterator":[ ],
"setStoredAsSubDirectories":false,
"setCols":true,
"setLocation":false,
"setInputFormat":true
},
"partitionKeys":[ ],
"parameters":{ },
"viewOriginalText":null,
"viewExpandedText":null,
"tableType":"MANAGED_TABLE",
"privileges":null,
"temporary":false,
"rewriteEnabled":false,
"partitionKeysSize":0,
"setDbName":true,
"setSd":true,
"setParameters":true,
"setCreateTime":true,
"setLastAccessTime":false,
"parametersSize":0,
"setTableName":true,
"setPrivileges":false,
"setOwner":true,
"setPartitionKeys":true,
"setViewOriginalText":false,
"setViewExpandedText":false,
"setTableType":true,
"setRetention":false,
"partitionKeysIterator":[ ],
"setTemporary":false,
"setRewriteEnabled":false
}
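We notice that in the table metadata above the cols[] array is empty, that is, the id and name columns defined when the table was created are missing. To obtain this information, run the following command: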
ALTER TABLE testposthook
ADD COLUMNS (age int COMMENT '年龄');
Check the log again. This time the hook logs one input and one output, and both are the table's metadata.
- Input
{
"tableName":"testposthook",
"dbName":"default",
"owner":"anonymous",
"createTime":1597985445,
"lastAccessTime":0,
"retention":0,
"sd":{
"cols":[
{
"name":"id",
"type":"int",
"comment":"id",
"setName":true,
"setType":true,
"setComment":true
},
{
"name":"name",
"type":"string",
"comment":"姓名",
"setName":true,
"setType":true,
"setComment":true
}
],
"location":"hdfs://kms-1.apache.com:8020/user/hive/warehouse",
"inputFormat":"org.apache.hadoop.mapred.TextInputFormat",
"outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
"compressed":false,
"numBuckets":-1,
"serdeInfo":{
"name":null,
"serializationLib":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"parameters":{"serialization.format":"","field.delim":" "},"setSerializationLib":true,"setParameters":true,"parametersSize":2,"setName":false
},
"bucketCols":[ ],
"sortCols":[ ],
"parameters":{ },
"skewedInfo":{"skewedColNames":[],
"skewedColValues":[ ],
"skewedColValueLocationMaps":{ },
"skewedColNamesIterator":[ ],
"skewedColValuesSize":0,
"skewedColValuesIterator":[ ],
"skewedColValueLocationMapsSize":0,
"setSkewedColNames":true,
"setSkewedColValues":true,
"setSkewedColValueLocationMaps":true,
"skewedColNamesSize":0
},
"storedAsSubDirectories":false,
"colsSize":2,
"setParameters":true,
"parametersSize":0,
"setOutputFormat":true,
"setSerdeInfo":true,
"setBucketCols":true,
"setSortCols":true,
"setSkewedInfo":true,
"colsIterator":[
{
"name":"id",
"type":"int",
"comment":"id",
"setName":true,
"setType":true,
"setComment":true
},
{
"name":"name",
"type":"string",
"comment":"姓名",
"setName":true,
"setType":true,
"setComment":true
}
],
"setCompressed":true,
"setNumBuckets":true,
"bucketColsSize":0,
"bucketColsIterator":[ ],
"sortColsSize":0,
"sortColsIterator":[ ],
"setStoredAsSubDirectories":true,
"setCols":true,
"setLocation":true,
"setInputFormat":true
},
"partitionKeys":[ ],
"parameters":{
"transient_lastDdlTime":"1597985445",
"comment":"建表_测试 Hive Hooks",
"totalSize":"0",
"numFiles":"0"
},
"viewOriginalText":null,
"viewExpandedText":null,
"tableType":"MANAGED_TABLE",
"privileges":null,
"temporary":false,
"rewriteEnabled":false,
"partitionKeysSize":0,
"setDbName":true,
"setSd":true,
"setParameters":true,
"setCreateTime":true,
"setLastAccessTime":true,
"parametersSize":4,
"setTableName":true,
"setPrivileges":false,
"setOwner":true,
"setPartitionKeys":true,
"setViewOriginalText":false,
"setViewExpandedText":false,
"setTableType":true,
"setRetention":true,
"partitionKeysIterator":[ ],
"setTemporary":false,
"setRewriteEnabled":true
}
The JSON above now shows the column metadata in "cols". Next, let's look at the output JSON:
- Output
{
"tableName":"testposthook",
"dbName":"default",
"owner":"anonymous",
"createTime":1597985445,
"lastAccessTime":0,
"retention":0,
"sd":{
"cols":[
{
"name":"id",
"type":"int",
"comment":"id",
"setName":true,
"setType":true,
"setComment":true
},
{
"name":"name",
"type":"string",
"comment":"姓名",
"setName":true,
"setType":true,
"setComment":true
}
],
"location":"hdfs://kms-1.apache.com:8020/user/hive/warehouse",
"inputFormat":"org.apache.hadoop.mapred.TextInputFormat",
"outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
"compressed":false,
"numBuckets":-1,
"serdeInfo":{
"name":null,
"serializationLib":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"parameters":{"serialization.format":"","field.delim":" "},"setSerializationLib":true,"setParameters":true,"parametersSize":2,"setName":false
},
"bucketCols":[ ],
"sortCols":[ ],
"parameters":{ },
"skewedInfo":{"skewedColNames":[],
"skewedColValues":[ ],
"skewedColValueLocationMaps":{ },
"skewedColNamesIterator":[ ],
"skewedColValuesSize":0,
"skewedColValuesIterator":[ ],
"skewedColValueLocationMapsSize":0,
"setSkewedColNames":true,
"setSkewedColValues":true,
"setSkewedColValueLocationMaps":true,
"skewedColNamesSize":0
},
"storedAsSubDirectories":false,
"colsSize":2,
"setParameters":true,
"parametersSize":0,
"setOutputFormat":true,
"setSerdeInfo":true,
"setBucketCols":true,
"setSortCols":true,
"setSkewedInfo":true,
"colsIterator":[
{
"name":"id",
"type":"int",
"comment":"id",
"setName":true,
"setType":true,
"setComment":true
},
{
"name":"name",
"type":"string",
"comment":"姓名",
"setName":true,
"setType":true,
"setComment":true
}
],
"setCompressed":true,
"setNumBuckets":true,
"bucketColsSize":0,
"bucketColsIterator":[ ],
"sortColsSize":0,
"sortColsIterator":[ ],
"setStoredAsSubDirectories":true,
"setCols":true,
"setLocation":true,
"setInputFormat":true
},
"partitionKeys":[ ],
"parameters":{
"transient_lastDdlTime":"1597985445",
"comment":"建表_测试 Hive Hooks",
"totalSize":"0",
"numFiles":"0"
},
"viewOriginalText":null,
"viewExpandedText":null,
"tableType":"MANAGED_TABLE",
"privileges":null,
"temporary":false,
"rewriteEnabled":false,
"partitionKeysSize":0,
"setDbName":true,
"setSd":true,
"setParameters":true,
"setCreateTime":true,
"setLastAccessTime":true,
"parametersSize":4,
"setTableName":true,
"setPrivileges":false,
"setOwner":true,
"setPartitionKeys":true,
"setViewOriginalText":false,
"setViewExpandedText":false,
"setTableType":true,
"setRetention":true,
"partitionKeysIterator":[ ],
"setTemporary":false,
"setRewriteEnabled":true
}
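The output object does not contain the new column age; it represents the table metadata before the change.
Basic usage of Metastore Listeners
Code
The implementation is as follows: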
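package com.jmx.hooks;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Assumption: Jackson 2.x; with Jackson 1.x use org.codehaus.jackson.map.ObjectMapper.
import com.fasterxml.jackson.databind.ObjectMapper;

public class CustomListener extends MetaStoreEventListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(CustomListener.class);
    private static final ObjectMapper objMapper = new ObjectMapper();

    public CustomListener(Configuration config) {
        super(config);
        logWithHeader("created");
    }
    // Listen for create table operations
    @Override
    public void onCreateTable(CreateTableEvent event) {
        logWithHeader(event.getTable());
    }
    // Listen for alter table operations: log the old table, then the new one
    @Override
    public void onAlterTable(AlterTableEvent event) {
        logWithHeader(event.getOldTable());
        logWithHeader(event.getNewTable());
    }
    private void logWithHeader(Object obj) {
        LOGGER.info("[CustomListener][Thread: " + Thread.currentThread().getName() + "] | " + objToStr(obj));
    }
    // Serialize the object to JSON; returns null if serialization fails
    private String objToStr(Object obj) {
        try {
            return objMapper.writeValueAsString(obj);
        } catch (IOException e) {
            LOGGER.error("Error on conversion", e);
        }
        return null;
    }
}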
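How it is used
The usage differs slightly from Hooks: a Hive Hook interacts with HiveServer, whereas a Listener interacts with the Metastore, that is, the Listener runs inside the Metastore process. The steps are as follows:
First put the jar in the $HIVE_HOME/lib directory, then configure hive-site.xml with the following content:
<property>
<name>hive.metastore.event.listeners</name>
<value>com.jmx.hooks.CustomListener</value>
<description/>
</property>
After the configuration is done, restart the metastore service:
bin/hive --service metastore &
Create table operation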
CREATE TABLE testlistener(
id int COMMENT "id",
name string COMMENT "姓名"
)COMMENT "建表_测试 Hive Listener"
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/user/hive/warehouse/';
Check hive.log:
{
"tableName":"testlistener",
"dbName":"default",
"owner":"anonymous",
"createTime":1597989316,
"lastAccessTime":0,
"retention":0,
"sd":{
"cols":[
{
"name":"id",
"type":"int",
"comment":"id",
"setComment":true,
"setType":true,
"setName":true
},
{
"name":"name",
"type":"string",
"comment":"姓名",
"setComment":true,
"setType":true,
"setName":true
}
],
"location":"hdfs://kms-1.apache.com:8020/user/hive/warehouse",
"inputFormat":"org.apache.hadoop.mapred.TextInputFormat",
"outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
"compressed":false,
"numBuckets":-1,
"serdeInfo":{
"name":null,
"serializationLib":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"parameters":{"serialization.format":"","field.delim":" "},"setSerializationLib":true,"setParameters":true,"parametersSize":2,"setName":false
},
"bucketCols":[ ],
"sortCols":[ ],
"parameters":{ },
"skewedInfo":{"skewedColNames":[],
"skewedColValues":[ ],
"skewedColValueLocationMaps":{ },
"setSkewedColNames":true,
"setSkewedColValues":true,
"setSkewedColValueLocationMaps":true,
"skewedColNamesSize":0,
"skewedColNamesIterator":[ ],
"skewedColValuesSize":0,
"skewedColValuesIterator":[ ],
"skewedColValueLocationMapsSize":0
},
"storedAsSubDirectories":false,
"setCols":true,
"setOutputFormat":true,
"setSerdeInfo":true,
"setBucketCols":true,
"setSortCols":true,
"colsSize":2,
"colsIterator":[
{
"name":"id",
"type":"int",
"comment":"id",
"setComment":true,
"setType":true,
"setName":true
},
{
"name":"name",
"type":"string",
"comment":"姓名",
"setComment":true,
"setType":true,
"setName":true
}
],
"setCompressed":true,
"setNumBuckets":true,
"bucketColsSize":0,
"bucketColsIterator":[ ],
"sortColsSize":0,
"sortColsIterator":[ ],
"setStoredAsSubDirectories":true,
"setParameters":true,
"setLocation":true,
"setInputFormat":true,
"parametersSize":0,
"setSkewedInfo":true
},
"partitionKeys":[ ],
"parameters":{
"transient_lastDdlTime":"1597989316",
"comment":"建表_测试 Hive Listener",
"totalSize":"0",
"numFiles":"0"
},
"viewOriginalText":null,
"viewExpandedText":null,
"tableType":"MANAGED_TABLE",
"privileges":{
"userPrivileges":{
"anonymous":[
{
"privilege":"INSERT",
"createTime":-1,
"grantor":"anonymous",
"grantorType":"USER",
"grantOption":true,
"setGrantOption":true,
"setCreateTime":true,
"setGrantor":true,
"setGrantorType":true,
"setPrivilege":true
},
{
"privilege":"SELECT",
"createTime":-1,
"grantor":"anonymous",
"grantorType":"USER",
"grantOption":true,
"setGrantOption":true,
"setCreateTime":true,
"setGrantor":true,
"setGrantorType":true,
"setPrivilege":true
},
{
"privilege":"UPDATE",
"createTime":-1,
"grantor":"anonymous",
"grantorType":"USER",
"grantOption":true,
"setGrantOption":true,
"setCreateTime":true,
"setGrantor":true,
"setGrantorType":true,
"setPrivilege":true
},
{
"privilege":"DELETE",
"createTime":-1,
"grantor":"anonymous",
"grantorType":"USER",
"grantOption":true,
"setGrantOption":true,
"setCreateTime":true,
"setGrantor":true,
"setGrantorType":true,
"setPrivilege":true
}
]
},
"groupPrivileges":null,
"rolePrivileges":null,
"setUserPrivileges":true,
"setGroupPrivileges":false,
"setRolePrivileges":false,
"userPrivilegesSize":1,
"groupPrivilegesSize":0,
"rolePrivilegesSize":0
},
"temporary":false,
"rewriteEnabled":false,
"setParameters":true,
"setPartitionKeys":true,
"partitionKeysSize":0,
"setSd":true,
"setLastAccessTime":true,
"setRetention":true,
"partitionKeysIterator":[ ],
"parametersSize":4,
"setTemporary":true,
"setRewriteEnabled":false,
"setTableName":true,
"setDbName":true,
"setOwner":true,
"setViewOriginalText":false,
"setViewExpandedText":false,
"setTableType":true,
"setPrivileges":true,
"setCreateTime":true
}
Next, we run an alter table operation:
ALTER TABLE testlistener
ADD COLUMNS (age int COMMENT '年龄');
Check the log again. It contains two records: the first is the old table's information and the second is the table's information after the change.
- old table
{
"tableName":"testlistener",
"dbName":"default",
"owner":"anonymous",
"createTime":1597989316,
"lastAccessTime":0,
"retention":0,
"sd":{
"cols":[
{
"name":"id",
"type":"int",
"comment":"id",
"setComment":true,
"setType":true,
"setName":true
},
{
"name":"name",
"type":"string",
"comment":"姓名",
"setComment":true,
"setType":true,
"setName":true
}
],
"location":"hdfs://kms-1.apache.com:8020/user/hive/warehouse",
"inputFormat":"org.apache.hadoop.mapred.TextInputFormat",
"outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
"compressed":false,
"numBuckets":-1,
"serdeInfo":{
"name":null,
"serializationLib":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"parameters":{"serialization.format":"","field.delim":" "},"setSerializationLib":true,"setParameters":true,"parametersSize":2,"setName":false
},
"bucketCols":[ ],
"sortCols":[ ],
"parameters":{ },
"skewedInfo":{"skewedColNames":[],
"skewedColValues":[ ],
"skewedColValueLocationMaps":{ },
"setSkewedColNames":true,
"setSkewedColValues":true,
"setSkewedColValueLocationMaps":true,
"skewedColNamesSize":0,
"skewedColNamesIterator":[ ],
"skewedColValuesSize":0,
"skewedColValuesIterator":[ ],
"skewedColValueLocationMapsSize":0
},
"storedAsSubDirectories":false,
"setCols":true,
"setOutputFormat":true,
"setSerdeInfo":true,
"setBucketCols":true,
"setSortCols":true,
"colsSize":2,
"colsIterator":[
{
"name":"id",
"type":"int",
"comment":"id",
"setComment":true,
"setType":true,
"setName":true
},
{
"name":"name",
"type":"string",
"comment":"姓名",
"setComment":true,
"setType":true,
"setName":true
}
],
"setCompressed":true,
"setNumBuckets":true,
"bucketColsSize":0,
"bucketColsIterator":[ ],
"sortColsSize":0,
"sortColsIterator":[ ],
"setStoredAsSubDirectories":true,
"setParameters":true,
"setLocation":true,
"setInputFormat":true,
"parametersSize":0,
"setSkewedInfo":true
},
"partitionKeys":[ ],
"parameters":{
"totalSize":"0",
"numFiles":"0",
"transient_lastDdlTime":"1597989316",
"comment":"建表_测试 Hive Listener"
},
"viewOriginalText":null,
"viewExpandedText":null,
"tableType":"MANAGED_TABLE",
"privileges":null,
"temporary":false,
"rewriteEnabled":false,
"setParameters":true,
"setPartitionKeys":true,
"partitionKeysSize":0,
"setSd":true,
"setLastAccessTime":true,
"setRetention":true,
"partitionKeysIterator":[ ],
"parametersSize":4,
"setTemporary":false,
"setRewriteEnabled":true,
"setTableName":true,
"setDbName":true,
"setOwner":true,
"setViewOriginalText":false,
"setViewExpandedText":false,
"setTableType":true,
"setPrivileges":false,
"setCreateTime":true
}
- new table
{
"tableName":"testlistener",
"dbName":"default",
"owner":"anonymous",
"createTime":1597989316,
"lastAccessTime":0,
"retention":0,
"sd":{
"cols":[
{
"name":"id",
"type":"int",
"comment":"id",
"setComment":true,
"setType":true,
"setName":true
},
{
"name":"name",
"type":"string",
"comment":"姓名",
"setComment":true,
"setType":true,
"setName":true
},
{
"name":"age",
"type":"int",
"comment":"年龄",
"setComment":true,
"setType":true,
"setName":true
}
],
"location":"hdfs://kms-1.apache.com:8020/user/hive/warehouse",
"inputFormat":"org.apache.hadoop.mapred.TextInputFormat",
"outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
"compressed":false,
"numBuckets":-1,
"serdeInfo":{
"name":null,
"serializationLib":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
"parameters":{"serialization.format":"","field.delim":" "},"setSerializationLib":true,"setParameters":true,"parametersSize":2,"setName":false
},
"bucketCols":[ ],
"sortCols":[ ],
"parameters":{ },
"skewedInfo":{"skewedColNames":[],
"skewedColValues":[ ],
"skewedColValueLocationMaps":{ },
"setSkewedColNames":true,
"setSkewedColValues":true,
"setSkewedColValueLocationMaps":true,
"skewedColNamesSize":0,
"skewedColNamesIterator":[ ],
"skewedColValuesSize":0,
"skewedColValuesIterator":[ ],
"skewedColValueLocationMapsSize":0
},
"storedAsSubDirectories":false,
"setCols":true,
"setOutputFormat":true,
"setSerdeInfo":true,
"setBucketCols":true,
"setSortCols":true,
"colsSize":3,
"colsIterator":[
{
"name":"id",
"type":"int",
"comment":"id",
"setComment":true,
"setType":true,
"setName":true
},
{
"name":"name",
"type":"string",
"comment":"姓名",
"setComment":true,
"setType":true,
"setName":true
},
{
"name":"age",
"type":"int",
"comment":"年龄",
"setComment":true,
"setType":true,
"setName":true
}
],
"setCompressed":true,
"setNumBuckets":true,
"bucketColsSize":0,
"bucketColsIterator":[ ],
"sortColsSize":0,
"sortColsIterator":[ ],
"setStoredAsSubDirectories":true,
"setParameters":true,
"setLocation":true,
"setInputFormat":true,
"parametersSize":0,
"setSkewedInfo":true
},
"partitionKeys":[ ],
"parameters":{
"totalSize":"0",
"last_modified_time":"1597989660",
"numFiles":"0",
"transient_lastDdlTime":"1597989660",
"comment":"建表_测试 Hive Listener",
"last_modified_by":"anonymous"
},
"viewOriginalText":null,
"viewExpandedText":null,
"tableType":"MANAGED_TABLE",
"privileges":null,
"temporary":false,
"rewriteEnabled":false,
"setParameters":true,
"setPartitionKeys":true,
"partitionKeysSize":0,
"setSd":true,
"setLastAccessTime":true,
"setRetention":true,
"partitionKeysIterator":[ ],
"parametersSize":6,
"setTemporary":false,
"setRewriteEnabled":true,
"setTableName":true,
"setDbName":true,
"setOwner":true,
"setViewOriginalText":false,
"setViewExpandedText":false,
"setTableType":true,
"setPrivileges":false,
"setCreateTime":true
}
As we can see, the metadata of the altered table contains the newly added column age.
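Since onAlterTable receives both the old and the new table, a listener can compare the two to find out what changed. The sketch below does this for columns using plain ordered maps of column name to type. `ColumnDiff` and `addedColumns` are hypothetical names, and the maps are a simplified stand-in for the "cols" arrays in the JSON above, not Hive's own types.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ColumnDiff {
    // Return "name type" for every column present in newCols but not in oldCols,
    // in the order the new table declares them.
    static List<String> addedColumns(Map<String, String> oldCols, Map<String, String> newCols) {
        List<String> added = new ArrayList<>();
        for (Map.Entry<String, String> entry : newCols.entrySet()) {
            if (!oldCols.containsKey(entry.getKey())) {
                added.add(entry.getKey() + " " + entry.getValue());
            }
        }
        return added;
    }

    public static void main(String[] args) {
        // Columns of the table before the ALTER TABLE statement.
        Map<String, String> oldCols = new LinkedHashMap<>();
        oldCols.put("id", "int");
        oldCols.put("name", "string");

        // Columns after adding the age column.
        Map<String, String> newCols = new LinkedHashMap<>(oldCols);
        newCols.put("age", "int");

        System.out.println("Added columns: " + addedColumns(oldCols, newCols));
    }
}
```

The same comparison could be extended to detect dropped or retyped columns, which is often what a downstream metadata system needs to record.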
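Summary
In this article we showed how to work with metadata in Hive so that metadata management can be automated. We covered the basic usage of Hive Hooks and Metastore Listeners, both of which help capture metadata operations. The captured metadata can also be pushed to Kafka and used to build your own metadata management system.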