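Metadata management is the core of a data warehouse. It defines what the warehouse contains, identifies the content and location of the data in it, describes the rules for extracting and transforming data, and stores the business information related to the warehouse's subject areas. This article introduces Hive Hooks and Metastore Listeners, two features that can be used to automate metadata management. It covers: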

  • Metadata management
  • Hive Hooks and Metastore Listeners
  • Basic usage of Hive Hooks
  • Basic usage of Metastore Listeners

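Metadata management

Metadata definition

By the traditional definition, metadata is data about data. Metadata connects source data, the data warehouse, and data applications, and records the whole life cycle of data from production to consumption. It mainly records the definitions of the models in the data warehouse and the mappings between layers, and monitors the state of the data in the warehouse and the running state of ETL tasks. In a data warehouse system, metadata helps administrators and developers find the data they care about easily, guides their data management and development work, and improves their efficiency. By purpose, metadata falls into two categories: technical metadata and business metadata. Technical metadata stores the technical details of the data warehouse system and is used to develop and manage the warehouse.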

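Metadata classification

Technical metadata

  • Storage metadata of the distributed computing system

    For example, information about Hive tables, columns, and partitions. It records the table name, partition information, owner, file size, and table type, as well as each column's name, type, comment, and whether it is a partition column.

  • Runtime metadata of the distributed computing system

    For example, Hive job logs, including the job type, instance name, inputs and outputs, SQL, run parameters, and execution time.

  • Task scheduling metadata

    Task dependency types and dependency relationships, plus the run logs of the different kinds of scheduled tasks.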

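Business metadata

Business metadata describes the data in the warehouse from a business perspective. It provides a semantic layer between users and the actual system, so that business people without a technical background can also "read" the data in the warehouse. Common business metadata includes standardized definitions of dimensions and attributes, business processes, and metrics, which help manage and use data better, and data application metadata, such as the configuration and runtime metadata of reports and data products.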

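Metadata applications

The real value of data lies in data-driven decision making, that is, using data to guide operations. With a data-driven approach, we can identify trends, take effective action, discover problems, and drive innovation or new solutions. This is data-driven operations. Metadata can be used the same way: it guides the daily work of the people who work with data. For example, data consumers can use metadata to find the data they need quickly; ETL engineers can use it to guide model design, task optimization, task retirement, and other routine ETL work; operations engineers can use it to guide storage, compute, and system tuning for the whole cluster.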

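Hive Hooks and Metastore Listeners

Hive Hooks

There are many open-source data governance and metadata management frameworks, such as Apache Atlas, that can meet metadata management needs in complex scenarios. Apache Atlas itself manages Hive metadata through Hive Hooks, which requires the following configuration:

<property>
    <name>hive.exec.post.hooks</name>
    <value>org.apache.atlas.hive.hook.HiveHook</value>
</property>

The hook listens for Hive events such as creating or altering a table and pushes the collected data to Kafka in a specific format. The metadata is then consumed from Kafka and stored.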
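The listen, publish, consume, and store flow described above can be sketched in plain Java. This is only an illustration under stated assumptions: `MetadataPipeline` is a hypothetical class, an in-memory queue stands in for the Kafka topic, and the message layout is invented here rather than taken from Apache Atlas.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical pipeline for illustration; the queue stands in for a Kafka topic.
final class MetadataPipeline {
    private final BlockingQueue<String> topic = new LinkedBlockingQueue<>();
    private final List<String> store = new ArrayList<>();

    // Producer side: what a hook might do when it sees a DDL event.
    // The message is hand-built JSON with no escaping, which is enough for a sketch.
    void publish(String operation, String table) {
        topic.add("{\"operation\":\"" + operation + "\",\"table\":\"" + table + "\"}");
    }

    // Consumer side: drain pending messages and persist them.
    int consume() {
        List<String> batch = new ArrayList<>();
        topic.drainTo(batch);
        store.addAll(batch);
        return batch.size();
    }

    // Read-only snapshot of everything stored so far.
    List<String> stored() {
        return List.copyOf(store);
    }

    public static void main(String[] args) {
        MetadataPipeline pipeline = new MetadataPipeline();
        pipeline.publish("CREATETABLE", "default.orders");
        pipeline.publish("ALTERTABLE_ADDCOLS", "default.orders");
        System.out.println("consumed " + pipeline.consume() + " message(s)");
        pipeline.stored().forEach(System.out::println);
    }
}
```

Decoupling the producer from the consumer with a queue keeps the hook itself cheap, which matters because a hook runs inside the query path.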

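Types of Hive Hooks

So what exactly are hooks?

Hooks are an event and message mechanism that lets you bind custom logic to events in Hive's internal execution flow without recompiling Hive. They provide a way to extend Hive and integrate external components. Depending on its type, a hook runs at a different stage. The main hook types are: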

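  • hive.exec.pre.hooks

As the name suggests, these hooks are called before the execution engine runs the query. They are only available after Hive has optimized the query plan. The hook must implement the interface org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext and is configured in hive-site.xml as follows:

<property>
    <name>hive.exec.pre.hooks</name>
    <value>fully qualified name of the implementation class</value>
</property>

  • hive.exec.post.hooks

Called after the execution plan finishes and before the result is returned to the user. The hook must implement the interface org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext and is configured in hive-site.xml as follows:

<property>
    <name>hive.exec.post.hooks</name>
    <value>fully qualified name of the implementation class</value>
</property>

  • hive.exec.failure.hooks

Called after the execution plan fails. The hook must implement the interface org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext and is configured in hive-site.xml as follows:

<property>
    <name>hive.exec.failure.hooks</name>
    <value>fully qualified name of the implementation class</value>
</property>

  • hive.metastore.init.hooks

Called when HMSHandler is initialized. The hook must extend the abstract class org.apache.hadoop.hive.metastore.MetaStoreInitListener and is configured in hive-site.xml as follows:

<property>
    <name>hive.metastore.init.hooks</name>
    <value>fully qualified name of the implementation class</value>
</property>

  • hive.exec.driver.run.hooks

Runs at the start and at the end of Driver.run. The hook must implement the interface org.apache.hadoop.hive.ql.HiveDriverRunHook and is configured in hive-site.xml as follows:

<property>
    <name>hive.exec.driver.run.hooks</name>
    <value>fully qualified name of the implementation class</value>
</property>

  • hive.semantic.analyzer.hook

Called when Hive performs semantic analysis on the query. The hook must extend the abstract class org.apache.hadoop.hive.ql.parse.AbstractSemanticAnalyzerHook and is configured in hive-site.xml as follows:

<property>
    <name>hive.semantic.analyzer.hook</name>
    <value>fully qualified name of the implementation class</value>
</property>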
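To make the pre, post, and failure stages concrete, here is a minimal plain-Java sketch of a driver that fires hooks around a query. It does not use Hive's classes: `HookStage`, `QueryHook`, `MiniDriver`, and `HookStagesDemo` are hypothetical names invented for illustration, and real Hive loads hook classes by name from the properties listed above rather than through a `register` call.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-ins for illustration only; these are not Hive classes.
enum HookStage { PRE_EXEC, POST_EXEC, ON_FAILURE }

interface QueryHook {
    void run(HookStage stage, String query);
}

final class MiniDriver {
    private final Map<HookStage, List<QueryHook>> hooks = new EnumMap<>(HookStage.class);

    // Register a hook for one stage, similar in spirit to listing a class
    // under a pre, post, or failure hook property.
    void register(HookStage stage, QueryHook hook) {
        hooks.computeIfAbsent(stage, s -> new ArrayList<>()).add(hook);
    }

    private void fire(HookStage stage, String query) {
        for (QueryHook hook : hooks.getOrDefault(stage, List.of())) {
            hook.run(stage, query);
        }
    }

    // Run the "query": pre hooks first, then post hooks on success,
    // or failure hooks if the body throws.
    <T> T execute(String query, Supplier<T> body) {
        fire(HookStage.PRE_EXEC, query);
        T result;
        try {
            result = body.get();
        } catch (RuntimeException e) {
            fire(HookStage.ON_FAILURE, query);
            throw e;
        }
        fire(HookStage.POST_EXEC, query);
        return result;
    }
}

final class HookStagesDemo {
    // Returns the order in which stages fired for one successful and one failing query.
    static List<String> trace() {
        List<String> events = new ArrayList<>();
        MiniDriver driver = new MiniDriver();
        for (HookStage stage : HookStage.values()) {
            driver.register(stage, (s, q) -> events.add(s + ":" + q));
        }
        driver.execute("show tables", () -> "ok");
        try {
            driver.execute("bad sql", () -> { throw new IllegalStateException("boom"); });
        } catch (IllegalStateException expected) {
            // The failure hook has already been recorded at this point.
        }
        return events;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

In this sketch a failing query never reaches the post stage, which mirrors the split between the post and failure hook properties described above.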

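Pros and cons of Hive Hooks

  • Pros

    • Custom code can easily be embedded and run at various stages of a query
    • Can be used to update metadata
  • Cons

    • Metadata obtained through hooks usually needs further parsing before it is easy to understand
    • Hooks can affect query execution

For Hive Hooks, this article gives an example using hive.exec.post.hooks, which runs after the query executes and before the result is returned.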

Metastore Listeners

Metastore Listeners are listeners on the Hive metastore. Users can write custom code that listens for metadata events.

The source of the HiveMetaStore class shows that the init() method of its HMSHandler creates three kinds of listeners: MetaStorePreEventListener, MetaStoreEventListener, and MetaStoreEndFunctionListener. These listeners observe the events at each step.

public class HiveMetaStore extends ThriftHiveMetastore {
    // ... code omitted
    public static class HMSHandler extends FacebookBase implements IHMSHandler {
        // ... code omitted
        public void init() throws MetaException {
            // ... code omitted
            // Load the MetaStorePreEventListener implementations
            preListeners = MetaStoreUtils.getMetaStoreListeners(MetaStorePreEventListener.class,
                    hiveConf,
                    hiveConf.getVar(HiveConf.ConfVars.METASTORE_PRE_EVENT_LISTENERS));
            // Load the MetaStoreEventListener implementations
            listeners = MetaStoreUtils.getMetaStoreListeners(MetaStoreEventListener.class,
                    hiveConf,
                    hiveConf.getVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS));
            listeners.add(new SessionPropertiesListener(hiveConf));
            // Load the MetaStoreEndFunctionListener implementations
            endFunctionListeners = MetaStoreUtils.getMetaStoreListeners(
                    MetaStoreEndFunctionListener.class,
                    hiveConf,
                    hiveConf.getVar(HiveConf.ConfVars.METASTORE_END_FUNCTION_LISTENERS));
            // ... code omitted
        }
    }
}
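The init() excerpt reads listener class names from configuration and turns them into listener objects. The general pattern, splitting a comma-separated list and instantiating each class by reflection, can be sketched as follows. This is not Hive's code: `Listener`, `AuditListener`, `ListenerLoader`, the `Map`-based configuration, and the key `demo.event.listeners` are hypothetical, and the real `MetaStoreUtils.getMetaStoreListeners` may differ in detail.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical base type standing in for a metastore listener.
abstract class Listener {
    protected final Map<String, String> conf;

    protected Listener(Map<String, String> conf) {
        this.conf = conf;
    }

    abstract String name();
}

// A sample implementation that would be named in the configuration.
class AuditListener extends Listener {
    public AuditListener(Map<String, String> conf) {
        super(conf);
    }

    @Override
    String name() {
        return "audit";
    }
}

final class ListenerLoader {
    // Split a comma-separated list of class names and instantiate each one
    // through its (Map) constructor, checking that it has the expected type.
    static <T> List<T> load(Class<T> type, Map<String, String> conf, String key) {
        List<T> result = new ArrayList<>();
        String value = conf.getOrDefault(key, "").trim();
        if (value.isEmpty()) {
            return result; // nothing configured
        }
        for (String className : value.split(",")) {
            try {
                Class<?> cls = Class.forName(className.trim());
                Object instance = cls.getConstructor(Map.class).newInstance(conf);
                result.add(type.cast(instance));
            } catch (ReflectiveOperationException | ClassCastException e) {
                throw new IllegalStateException("Failed to load listener " + className, e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> conf = Map.of("demo.event.listeners", AuditListener.class.getName());
        List<Listener> listeners = load(Listener.class, conf, "demo.event.listeners");
        System.out.println(listeners.size() + " listener(s) loaded: " + listeners.get(0).name());
    }
}
```

Loading by class name is what lets a listener be added through configuration alone, without rebuilding the service that hosts it.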

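Types of Metastore Listeners

  • hive.metastore.pre.event.listeners

Extend this abstract class to provide the actions that must run before a particular event occurs on the metastore. Its methods are called before the event takes place on the metastore.

To use it, extend the abstract class org.apache.hadoop.hive.metastore.MetaStorePreEventListener and configure it in hive-site.xml as follows:

<property>
    <name>hive.metastore.pre.event.listeners</name>
    <value>fully qualified name of the implementation class</value>
</property>

  • hive.metastore.event.listeners

Extend this abstract class to provide the actions that must run when a particular event occurs on the metastore. Its methods are called whenever such an event occurs.

To use it, extend the abstract class org.apache.hadoop.hive.metastore.MetaStoreEventListener and configure it in hive-site.xml as follows:

<property>
    <name>hive.metastore.event.listeners</name>
    <value>fully qualified name of the implementation class</value>
</property>

  • hive.metastore.end.function.listeners

Its methods are called whenever a metastore function finishes.

To use it, extend the abstract class org.apache.hadoop.hive.metastore.MetaStoreEndFunctionListener and configure it in hive-site.xml as follows:

<property>
    <name>hive.metastore.end.function.listeners</name>
    <value>fully qualified name of the implementation class</value>
</property>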

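Pros and cons of Metastore Listeners

  • Pros

    • The metadata is already parsed and easy to understand
    • Read-only, so query execution is not affected
  • Cons

    • Inflexible: only the objects that belong to the current event are accessible

For Metastore Listeners, this article gives an example using MetaStoreEventListener that overrides two methods: onCreateTable and onAlterTable.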

Basic usage of Hive Hooks

Code

The implementation is as follows:

import java.util.HashSet;
import java.util.Set;

// Assumes Jackson 2 is on the classpath; older Hive builds bundle
// org.codehaus.jackson.map.ObjectMapper instead.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext.HookType;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.plan.HiveOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomPostHook implements ExecuteWithHookContext {
    private static final Logger LOGGER = LoggerFactory.getLogger(CustomPostHook.class);

    // Names of the Hive SQL operations to monitor.
    // HiveOperation is an enum that wraps Hive's SQL operation types.
    private static final HashSet<String> OPERATION_NAMES = new HashSet<>();

    static {
        // Create table
        OPERATION_NAMES.add(HiveOperation.CREATETABLE.getOperationName());
        // Alter database properties
        OPERATION_NAMES.add(HiveOperation.ALTERDATABASE.getOperationName());
        // Alter database owner
        OPERATION_NAMES.add(HiveOperation.ALTERDATABASE_OWNER.getOperationName());
        // Alter table: add columns
        OPERATION_NAMES.add(HiveOperation.ALTERTABLE_ADDCOLS.getOperationName());
        // Alter table: storage location
        OPERATION_NAMES.add(HiveOperation.ALTERTABLE_LOCATION.getOperationName());
        // Alter table properties
        OPERATION_NAMES.add(HiveOperation.ALTERTABLE_PROPERTIES.getOperationName());
        // Rename table
        OPERATION_NAMES.add(HiveOperation.ALTERTABLE_RENAME.getOperationName());
        // Rename column
        OPERATION_NAMES.add(HiveOperation.ALTERTABLE_RENAMECOL.getOperationName());
        // Replace columns: drop the current columns, then add the new ones
        OPERATION_NAMES.add(HiveOperation.ALTERTABLE_REPLACECOLS.getOperationName());
        // Create database
        OPERATION_NAMES.add(HiveOperation.CREATEDATABASE.getOperationName());
        // Drop database
        OPERATION_NAMES.add(HiveOperation.DROPDATABASE.getOperationName());
        // Drop table
        OPERATION_NAMES.add(HiveOperation.DROPTABLE.getOperationName());
    }

    @Override
    public void run(HookContext hookContext) throws Exception {
        assert (hookContext.getHookType() == HookType.POST_EXEC_HOOK);
        // Query plan
        QueryPlan plan = hookContext.getQueryPlan();
        // Operation name
        String operationName = plan.getOperationName();
        logWithHeader("SQL statement executed: " + plan.getQueryString());
        logWithHeader("Operation name: " + operationName);
        if (OPERATION_NAMES.contains(operationName) && !plan.isExplain()) {
            logWithHeader("Monitored SQL operation");
            Set<ReadEntity> inputs = hookContext.getInputs();
            Set<WriteEntity> outputs = hookContext.getOutputs();
            for (Entity entity : inputs) {
                logWithHeader("Hook metadata input: " + toJson(entity));
            }
            for (Entity entity : outputs) {
                logWithHeader("Hook metadata output: " + toJson(entity));
            }
        } else {
            logWithHeader("Not a monitored operation, ignoring this hook!");
        }
    }

    private static String toJson(Entity entity) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Entity types include:
        // DATABASE, TABLE, PARTITION, DUMMYPARTITION, DFS_DIR, LOCAL_DIR, FUNCTION
        switch (entity.getType()) {
            case DATABASE:
                Database db = entity.getDatabase();
                return mapper.writeValueAsString(db);
            case TABLE:
                return mapper.writeValueAsString(entity.getTable().getTTable());
        }
        // Other entity types are not serialized.
        return null;
    }

    /**
     * Log format.
     *
     * @param obj the message to log
     */
    private void logWithHeader(Object obj) {
        LOGGER.info("[CustomPostHook][Thread: " + Thread.currentThread().getName() + "] | " + obj);
    }
}

Usage

First build the code above into a jar and place it in the $HIVE_HOME/lib directory, or add the jar from the Hive client:

0: jdbc:hive2://localhost:10000> add jar /opt/softwares/com.jmx.hive-1.0-SNAPSHOT.jar;

Then configure the hook. The setting normally goes into hive-site.xml; for convenience, we set it directly from the client:

0: jdbc:hive2://localhost:10000> set hive.exec.post.hooks=com.jmx.hooks.CustomPostHook;

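Listing tables

The code above monitors a set of operations and runs custom code (here, writing a log entry) when one of them is detected. When we run the following command in the Hive beeline client:

0: jdbc:hive2://localhost:10000> show tables;

the $HIVE_HOME/logs/hive.log file shows:

[CustomPostHook][Thread: cab9a763-c63e-4f25-9f9a-affacb3cecdb main] | SQL statement executed: show tables
[CustomPostHook][Thread: cab9a763-c63e-4f25-9f9a-affacb3cecdb main] | Operation name: SHOWTABLES
[CustomPostHook][Thread: cab9a763-c63e-4f25-9f9a-affacb3cecdb main] | Not a monitored operation, ignoring this hook!

Listing tables is not a monitored operation, so no metadata is logged for it.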

Creating a table

When we create a table in the Hive beeline client, as follows:

CREATE TABLE testposthook(
  id int COMMENT "id",
  name string COMMENT "姓名"
)
COMMENT "建表_测试Hive Hooks"
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/user/hive/warehouse/';

Check hive.log:

The hook logs two output values: the first is the database metadata and the second is the table metadata.

  • Database metadata
{    "name":"default",    "description":"Default Hive database",    "locationUri":"hdfs://kms-1.apache.com:8020/user/hive/warehouse",    "parameters":{    },    "privileges":null,    "ownerName":"public",    "ownerType":"ROLE",    "setParameters":true,    "parametersSize":0,    "setOwnerName":true,    "setOwnerType":true,    "setPrivileges":false,    "setName":true,    "setDescription":true,    "setLocationUri":true}
  • Table metadata
{    "tableName":"testposthook",    "dbName":"default",    "owner":"anonymous",    "createTime":1597985444,    "lastAccessTime":0,    "retention":0,    "sd":{        "cols":[        ],        "location":null,        "inputFormat":"org.apache.hadoop.mapred.SequenceFileInputFormat",        "outputFormat":"org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat",        "compressed":false,        "numBuckets":-1,        "serdeInfo":{            "name":null,            "serializationLib":"org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe",            "parameters":{                "serialization.format":"1"            },            "setSerializationLib":true,            "setParameters":true,            "parametersSize":1,            "setName":false        },        "bucketCols":[        ],        "sortCols":[        ],        "parameters":{        },        "skewedInfo":{            "skewedColNames":[            ],            "skewedColValues":[            ],            "skewedColValueLocationMaps":{            },            "skewedColNamesIterator":[            ],            "skewedColValuesSize":0,            "skewedColValuesIterator":[            ],            "skewedColValueLocationMapsSize":0,            "setSkewedColNames":true,            "setSkewedColValues":true,            "setSkewedColValueLocationMaps":true,            "skewedColNamesSize":0        },        "storedAsSubDirectories":false,        "colsSize":0,        "setParameters":true,        "parametersSize":0,        "setOutputFormat":true,        "setSerdeInfo":true,        "setBucketCols":true,        "setSortCols":true,        "setSkewedInfo":true,        "colsIterator":[        ],        "setCompressed":false,        "setNumBuckets":true,        "bucketColsSize":0,        "bucketColsIterator":[        ],        "sortColsSize":0,        "sortColsIterator":[        ],        "setStoredAsSubDirectories":false,        "setCols":true,        "setLocation":false,        "setInputFormat":true    
},    "partitionKeys":[    ],    "parameters":{    },    "viewOriginalText":null,    "viewExpandedText":null,    "tableType":"MANAGED_TABLE",    "privileges":null,    "temporary":false,    "rewriteEnabled":false,    "partitionKeysSize":0,    "setDbName":true,    "setSd":true,    "setParameters":true,    "setCreateTime":true,    "setLastAccessTime":false,    "parametersSize":0,    "setTableName":true,    "setPrivileges":false,    "setOwner":true,    "setPartitionKeys":true,    "setViewOriginalText":false,    "setViewExpandedText":false,    "setTableType":true,    "setRetention":false,    "partitionKeysIterator":[    ],    "setTemporary":false,    "setRewriteEnabled":false}

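In the table metadata above, cols[] is empty: the id and name columns defined when the table was created are missing. To obtain this information, run the following command:

ALTER TABLE testposthook ADD COLUMNS (age int COMMENT '年龄');

Check the log again:

This time the hook logs exactly one input and one output, both of which are table metadata.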

  • Input
{    "tableName":"testposthook",    "dbName":"default",    "owner":"anonymous",    "createTime":1597985445,    "lastAccessTime":0,    "retention":0,    "sd":{        "cols":[            {                "name":"id",                "type":"int",                "comment":"id",                "setName":true,                "setType":true,                "setComment":true            },            {                "name":"name",                "type":"string",                "comment":"姓名",                "setName":true,                "setType":true,                "setComment":true            }        ],        "location":"hdfs://kms-1.apache.com:8020/user/hive/warehouse",        "inputFormat":"org.apache.hadoop.mapred.TextInputFormat",        "outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",        "compressed":false,        "numBuckets":-1,        "serdeInfo":{            "name":null,            "serializationLib":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",            "parameters":{                "serialization.format":"    ",                "field.delim":"    "            },            "setSerializationLib":true,            "setParameters":true,            "parametersSize":2,            "setName":false        },        "bucketCols":[        ],        "sortCols":[        ],        "parameters":{        },        "skewedInfo":{            "skewedColNames":[            ],            "skewedColValues":[            ],            "skewedColValueLocationMaps":{            },            "skewedColNamesIterator":[            ],            "skewedColValuesSize":0,            "skewedColValuesIterator":[            ],            "skewedColValueLocationMapsSize":0,            "setSkewedColNames":true,            "setSkewedColValues":true,            "setSkewedColValueLocationMaps":true,            "skewedColNamesSize":0        },        "storedAsSubDirectories":false,        "colsSize":2,        "setParameters":true,        
"parametersSize":0,        "setOutputFormat":true,        "setSerdeInfo":true,        "setBucketCols":true,        "setSortCols":true,        "setSkewedInfo":true,        "colsIterator":[            {                "name":"id",                "type":"int",                "comment":"id",                "setName":true,                "setType":true,                "setComment":true            },            {                "name":"name",                "type":"string",                "comment":"姓名",                "setName":true,                "setType":true,                "setComment":true            }        ],        "setCompressed":true,        "setNumBuckets":true,        "bucketColsSize":0,        "bucketColsIterator":[        ],        "sortColsSize":0,        "sortColsIterator":[        ],        "setStoredAsSubDirectories":true,        "setCols":true,        "setLocation":true,        "setInputFormat":true    },    "partitionKeys":[    ],    "parameters":{        "transient_lastDdlTime":"1597985445",        "comment":"建表_测试Hive Hooks",        "totalSize":"0",        "numFiles":"0"    },    "viewOriginalText":null,    "viewExpandedText":null,    "tableType":"MANAGED_TABLE",    "privileges":null,    "temporary":false,    "rewriteEnabled":false,    "partitionKeysSize":0,    "setDbName":true,    "setSd":true,    "setParameters":true,    "setCreateTime":true,    "setLastAccessTime":true,    "parametersSize":4,    "setTableName":true,    "setPrivileges":false,    "setOwner":true,    "setPartitionKeys":true,    "setViewOriginalText":false,    "setViewExpandedText":false,    "setTableType":true,    "setRetention":true,    "partitionKeysIterator":[    ],    "setTemporary":false,    "setRewriteEnabled":true}

The JSON above now shows the column metadata under "cols". Next, look at the output JSON:

  • Output
{    "tableName":"testposthook",    "dbName":"default",    "owner":"anonymous",    "createTime":1597985445,    "lastAccessTime":0,    "retention":0,    "sd":{        "cols":[            {                "name":"id",                "type":"int",                "comment":"id",                "setName":true,                "setType":true,                "setComment":true            },            {                "name":"name",                "type":"string",                "comment":"姓名",                "setName":true,                "setType":true,                "setComment":true            }        ],        "location":"hdfs://kms-1.apache.com:8020/user/hive/warehouse",        "inputFormat":"org.apache.hadoop.mapred.TextInputFormat",        "outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",        "compressed":false,        "numBuckets":-1,        "serdeInfo":{            "name":null,            "serializationLib":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",            "parameters":{                "serialization.format":"    ",                "field.delim":"    "            },            "setSerializationLib":true,            "setParameters":true,            "parametersSize":2,            "setName":false        },        "bucketCols":[        ],        "sortCols":[        ],        "parameters":{        },        "skewedInfo":{            "skewedColNames":[            ],            "skewedColValues":[            ],            "skewedColValueLocationMaps":{            },            "skewedColNamesIterator":[            ],            "skewedColValuesSize":0,            "skewedColValuesIterator":[            ],            "skewedColValueLocationMapsSize":0,            "setSkewedColNames":true,            "setSkewedColValues":true,            "setSkewedColValueLocationMaps":true,            "skewedColNamesSize":0        },        "storedAsSubDirectories":false,        "colsSize":2,        "setParameters":true,        
"parametersSize":0,        "setOutputFormat":true,        "setSerdeInfo":true,        "setBucketCols":true,        "setSortCols":true,        "setSkewedInfo":true,        "colsIterator":[            {                "name":"id",                "type":"int",                "comment":"id",                "setName":true,                "setType":true,                "setComment":true            },            {                "name":"name",                "type":"string",                "comment":"姓名",                "setName":true,                "setType":true,                "setComment":true            }        ],        "setCompressed":true,        "setNumBuckets":true,        "bucketColsSize":0,        "bucketColsIterator":[        ],        "sortColsSize":0,        "sortColsIterator":[        ],        "setStoredAsSubDirectories":true,        "setCols":true,        "setLocation":true,        "setInputFormat":true    },    "partitionKeys":[    ],    "parameters":{        "transient_lastDdlTime":"1597985445",        "comment":"建表_测试Hive Hooks",        "totalSize":"0",        "numFiles":"0"    },    "viewOriginalText":null,    "viewExpandedText":null,    "tableType":"MANAGED_TABLE",    "privileges":null,    "temporary":false,    "rewriteEnabled":false,    "partitionKeysSize":0,    "setDbName":true,    "setSd":true,    "setParameters":true,    "setCreateTime":true,    "setLastAccessTime":true,    "parametersSize":4,    "setTableName":true,    "setPrivileges":false,    "setOwner":true,    "setPartitionKeys":true,    "setViewOriginalText":false,    "setViewExpandedText":false,    "setTableType":true,    "setRetention":true,    "partitionKeysIterator":[    ],    "setTemporary":false,    "setRewriteEnabled":true}
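The output object does not contain the new column age; it represents the table metadata before the change.

Basic usage of Metastore Listeners

Code

The implementation is as follows: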

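import java.io.IOException;

// Assumes Jackson 2 is on the classpath; older Hive builds bundle
// org.codehaus.jackson.map.ObjectMapper instead.
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
import org.apache.hadoop.hive.metastore.events.AlterTableEvent;
import org.apache.hadoop.hive.metastore.events.CreateTableEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomListener extends MetaStoreEventListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(CustomListener.class);
    private static final ObjectMapper objMapper = new ObjectMapper();

    public CustomListener(Configuration config) {
        super(config);
        logWithHeader(" created ");
    }

    // Listen for table creation
    @Override
    public void onCreateTable(CreateTableEvent event) {
        logWithHeader(event.getTable());
    }

    // Listen for table alteration: log the table before and after the change
    @Override
    public void onAlterTable(AlterTableEvent event) {
        logWithHeader(event.getOldTable());
        logWithHeader(event.getNewTable());
    }

    private void logWithHeader(Object obj) {
        LOGGER.info("[CustomListener][Thread: " + Thread.currentThread().getName() + "] | " + objToStr(obj));
    }

    // Serialize the object to JSON; returns null if serialization fails
    private String objToStr(Object obj) {
        try {
            return objMapper.writeValueAsString(obj);
        } catch (IOException e) {
            LOGGER.error("Error on conversion", e);
        }
        return null;
    }
}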

Usage

Listeners are used slightly differently from hooks: a Hive hook interacts with HiveServer, whereas a listener interacts with the Metastore, that is, the listener runs inside the Metastore process. The steps are as follows:

First place the jar in the $HIVE_HOME/lib directory, then add the following to hive-site.xml:

<property>
    <name>hive.metastore.event.listeners</name>
    <value>com.jmx.hooks.CustomListener</value>
    <description/>
</property>

After the configuration is done, restart the metastore service:

bin/hive --service metastore &

Creating a table

CREATE TABLE testlistener(
  id int COMMENT "id",
  name string COMMENT "姓名"
)
COMMENT "建表_测试Hive Listener"
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/user/hive/warehouse/';

Check hive.log:

{    "tableName":"testlistener",    "dbName":"default",    "owner":"anonymous",    "createTime":1597989316,    "lastAccessTime":0,    "retention":0,    "sd":{        "cols":[            {                "name":"id",                "type":"int",                "comment":"id",                "setComment":true,                "setType":true,                "setName":true            },            {                "name":"name",                "type":"string",                "comment":"姓名",                "setComment":true,                "setType":true,                "setName":true            }        ],        "location":"hdfs://kms-1.apache.com:8020/user/hive/warehouse",        "inputFormat":"org.apache.hadoop.mapred.TextInputFormat",        "outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",        "compressed":false,        "numBuckets":-1,        "serdeInfo":{            "name":null,            "serializationLib":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",            "parameters":{                "serialization.format":"    ",                "field.delim":"    "            },            "setSerializationLib":true,            "setParameters":true,            "parametersSize":2,            "setName":false        },        "bucketCols":[        ],        "sortCols":[        ],        "parameters":{        },        "skewedInfo":{            "skewedColNames":[            ],            "skewedColValues":[            ],            "skewedColValueLocationMaps":{            },            "setSkewedColNames":true,            "setSkewedColValues":true,            "setSkewedColValueLocationMaps":true,            "skewedColNamesSize":0,            "skewedColNamesIterator":[            ],            "skewedColValuesSize":0,            "skewedColValuesIterator":[            ],            "skewedColValueLocationMapsSize":0        },        "storedAsSubDirectories":false,        "setCols":true,        "setOutputFormat":true,        
"setSerdeInfo":true,        "setBucketCols":true,        "setSortCols":true,        "colsSize":2,        "colsIterator":[            {                "name":"id",                "type":"int",                "comment":"id",                "setComment":true,                "setType":true,                "setName":true            },            {                "name":"name",                "type":"string",                "comment":"姓名",                "setComment":true,                "setType":true,                "setName":true            }        ],        "setCompressed":true,        "setNumBuckets":true,        "bucketColsSize":0,        "bucketColsIterator":[        ],        "sortColsSize":0,        "sortColsIterator":[        ],        "setStoredAsSubDirectories":true,        "setParameters":true,        "setLocation":true,        "setInputFormat":true,        "parametersSize":0,        "setSkewedInfo":true    },    "partitionKeys":[    ],    "parameters":{        "transient_lastDdlTime":"1597989316",        "comment":"建表_测试Hive Listener",        "totalSize":"0",        "numFiles":"0"    },    "viewOriginalText":null,    "viewExpandedText":null,    "tableType":"MANAGED_TABLE",    "privileges":{        "userPrivileges":{            "anonymous":[                {                    "privilege":"INSERT",                    "createTime":-1,                    "grantor":"anonymous",                    "grantorType":"USER",                    "grantOption":true,                    "setGrantOption":true,                    "setCreateTime":true,                    "setGrantor":true,                    "setGrantorType":true,                    "setPrivilege":true                },                {                    "privilege":"SELECT",                    "createTime":-1,                    "grantor":"anonymous",                    "grantorType":"USER",                    "grantOption":true,                    "setGrantOption":true,                    
"setCreateTime":true,                    "setGrantor":true,                    "setGrantorType":true,                    "setPrivilege":true                },                {                    "privilege":"UPDATE",                    "createTime":-1,                    "grantor":"anonymous",                    "grantorType":"USER",                    "grantOption":true,                    "setGrantOption":true,                    "setCreateTime":true,                    "setGrantor":true,                    "setGrantorType":true,                    "setPrivilege":true                },                {                    "privilege":"DELETE",                    "createTime":-1,                    "grantor":"anonymous",                    "grantorType":"USER",                    "grantOption":true,                    "setGrantOption":true,                    "setCreateTime":true,                    "setGrantor":true,                    "setGrantorType":true,                    "setPrivilege":true                }            ]        },        "groupPrivileges":null,        "rolePrivileges":null,        "setUserPrivileges":true,        "setGroupPrivileges":false,        "setRolePrivileges":false,        "userPrivilegesSize":1,        "groupPrivilegesSize":0,        "rolePrivilegesSize":0    },    "temporary":false,    "rewriteEnabled":false,    "setParameters":true,    "setPartitionKeys":true,    "partitionKeysSize":0,    "setSd":true,    "setLastAccessTime":true,    "setRetention":true,    "partitionKeysIterator":[    ],    "parametersSize":4,    "setTemporary":true,    "setRewriteEnabled":false,    "setTableName":true,    "setDbName":true,    "setOwner":true,    "setViewOriginalText":false,    "setViewExpandedText":false,    "setTableType":true,    "setPrivileges":true,    "setCreateTime":true}

Next, alter the table:

ALTER TABLE testlistener ADD COLUMNS (age int COMMENT '年龄');

Check the log again:

The log contains two records: the first is the old table and the second is the table after the change.
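Because an alter event carries both the old and the new table, a consumer can compare the two column lists to find out what changed. A minimal sketch, assuming the columns have already been flattened into name-to-type maps; `ColumnDiff` is a hypothetical helper, and in a real listener the columns would probably be read from `event.getOldTable().getSd().getCols()` and the corresponding call on the new table.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Hive.
final class ColumnDiff {
    // Compare two name -> type maps and describe what changed.
    static List<String> diff(Map<String, String> oldCols, Map<String, String> newCols) {
        List<String> changes = new ArrayList<>();
        for (Map.Entry<String, String> e : newCols.entrySet()) {
            String oldType = oldCols.get(e.getKey());
            if (oldType == null) {
                changes.add("ADDED " + e.getKey() + " " + e.getValue());
            } else if (!oldType.equals(e.getValue())) {
                changes.add("RETYPED " + e.getKey() + " " + oldType + " -> " + e.getValue());
            }
        }
        for (String name : oldCols.keySet()) {
            if (!newCols.containsKey(name)) {
                changes.add("REMOVED " + name);
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        // Columns of testlistener before and after the ALTER TABLE statement.
        Map<String, String> oldCols = new LinkedHashMap<>();
        oldCols.put("id", "int");
        oldCols.put("name", "string");
        Map<String, String> newCols = new LinkedHashMap<>(oldCols);
        newCols.put("age", "int");
        System.out.println(diff(oldCols, newCols)); // prints [ADDED age int]
    }
}
```

Storing only the diff instead of two full table dumps keeps a change history compact and easier to read.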

  • old table
{    "tableName":"testlistener",    "dbName":"default",    "owner":"anonymous",    "createTime":1597989316,    "lastAccessTime":0,    "retention":0,    "sd":{        "cols":[            {                "name":"id",                "type":"int",                "comment":"id",                "setComment":true,                "setType":true,                "setName":true            },            {                "name":"name",                "type":"string",                "comment":"姓名",                "setComment":true,                "setType":true,                "setName":true            }        ],        "location":"hdfs://kms-1.apache.com:8020/user/hive/warehouse",        "inputFormat":"org.apache.hadoop.mapred.TextInputFormat",        "outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",        "compressed":false,        "numBuckets":-1,        "serdeInfo":{            "name":null,            "serializationLib":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",            "parameters":{                "serialization.format":"    ",                "field.delim":"    "            },            "setSerializationLib":true,            "setParameters":true,            "parametersSize":2,            "setName":false        },        "bucketCols":[        ],        "sortCols":[        ],        "parameters":{        },        "skewedInfo":{            "skewedColNames":[            ],            "skewedColValues":[            ],            "skewedColValueLocationMaps":{            },            "setSkewedColNames":true,            "setSkewedColValues":true,            "setSkewedColValueLocationMaps":true,            "skewedColNamesSize":0,            "skewedColNamesIterator":[            ],            "skewedColValuesSize":0,            "skewedColValuesIterator":[            ],            "skewedColValueLocationMapsSize":0        },        "storedAsSubDirectories":false,        "setCols":true,        "setOutputFormat":true,        
"setSerdeInfo":true,        "setBucketCols":true,        "setSortCols":true,        "colsSize":2,        "colsIterator":[            {                "name":"id",                "type":"int",                "comment":"id",                "setComment":true,                "setType":true,                "setName":true            },            {                "name":"name",                "type":"string",                "comment":"姓名",                "setComment":true,                "setType":true,                "setName":true            }        ],        "setCompressed":true,        "setNumBuckets":true,        "bucketColsSize":0,        "bucketColsIterator":[        ],        "sortColsSize":0,        "sortColsIterator":[        ],        "setStoredAsSubDirectories":true,        "setParameters":true,        "setLocation":true,        "setInputFormat":true,        "parametersSize":0,        "setSkewedInfo":true    },    "partitionKeys":[    ],    "parameters":{        "totalSize":"0",        "numFiles":"0",        "transient_lastDdlTime":"1597989316",        "comment":"建表_测试Hive Listener"    },    "viewOriginalText":null,    "viewExpandedText":null,    "tableType":"MANAGED_TABLE",    "privileges":null,    "temporary":false,    "rewriteEnabled":false,    "setParameters":true,    "setPartitionKeys":true,    "partitionKeysSize":0,    "setSd":true,    "setLastAccessTime":true,    "setRetention":true,    "partitionKeysIterator":[    ],    "parametersSize":4,    "setTemporary":false,    "setRewriteEnabled":true,    "setTableName":true,    "setDbName":true,    "setOwner":true,    "setViewOriginalText":false,    "setViewExpandedText":false,    "setTableType":true,    "setPrivileges":false,    "setCreateTime":true}
  • new table
{    "tableName":"testlistener",    "dbName":"default",    "owner":"anonymous",    "createTime":1597989316,    "lastAccessTime":0,    "retention":0,    "sd":{        "cols":[            {                "name":"id",                "type":"int",                "comment":"id",                "setComment":true,                "setType":true,                "setName":true            },            {                "name":"name",                "type":"string",                "comment":"姓名",                "setComment":true,                "setType":true,                "setName":true            },            {                "name":"age",                "type":"int",                "comment":"年龄",                "setComment":true,                "setType":true,                "setName":true            }        ],        "location":"hdfs://kms-1.apache.com:8020/user/hive/warehouse",        "inputFormat":"org.apache.hadoop.mapred.TextInputFormat",        "outputFormat":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",        "compressed":false,        "numBuckets":-1,        "serdeInfo":{            "name":null,            "serializationLib":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",            "parameters":{                "serialization.format":"    ",                "field.delim":"    "            },            "setSerializationLib":true,            "setParameters":true,            "parametersSize":2,            "setName":false        },        "bucketCols":[        ],        "sortCols":[        ],        "parameters":{        },        "skewedInfo":{            "skewedColNames":[            ],            "skewedColValues":[            ],            "skewedColValueLocationMaps":{            },            "setSkewedColNames":true,            "setSkewedColValues":true,            "setSkewedColValueLocationMaps":true,            "skewedColNamesSize":0,            "skewedColNamesIterator":[            ],            "skewedColValuesSize":0,            
"skewedColValuesIterator":[            ],            "skewedColValueLocationMapsSize":0        },        "storedAsSubDirectories":false,        "setCols":true,        "setOutputFormat":true,        "setSerdeInfo":true,        "setBucketCols":true,        "setSortCols":true,        "colsSize":3,        "colsIterator":[            {                "name":"id",                "type":"int",                "comment":"id",                "setComment":true,                "setType":true,                "setName":true            },            {                "name":"name",                "type":"string",                "comment":"姓名",                "setComment":true,                "setType":true,                "setName":true            },            {                "name":"age",                "type":"int",                "comment":"年龄",                "setComment":true,                "setType":true,                "setName":true            }        ],        "setCompressed":true,        "setNumBuckets":true,        "bucketColsSize":0,        "bucketColsIterator":[        ],        "sortColsSize":0,        "sortColsIterator":[        ],        "setStoredAsSubDirectories":true,        "setParameters":true,        "setLocation":true,        "setInputFormat":true,        "parametersSize":0,        "setSkewedInfo":true    },    "partitionKeys":[    ],    "parameters":{        "totalSize":"0",        "last_modified_time":"1597989660",        "numFiles":"0",        "transient_lastDdlTime":"1597989660",        "comment":"建表_测试Hive Listener",        "last_modified_by":"anonymous"    },    "viewOriginalText":null,    "viewExpandedText":null,    "tableType":"MANAGED_TABLE",    "privileges":null,    "temporary":false,    "rewriteEnabled":false,    "setParameters":true,    "setPartitionKeys":true,    "partitionKeysSize":0,    "setSd":true,    "setLastAccessTime":true,    "setRetention":true,    "partitionKeysIterator":[    ],    "parametersSize":6,    "setTemporary":false,    
"setRewriteEnabled":true,    "setTableName":true,    "setDbName":true,    "setOwner":true,    "setViewOriginalText":false,    "setViewExpandedText":false,    "setTableType":true,    "setPrivileges":false,    "setCreateTime":true}

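Comparing the two outputs, the metadata of the altered table now includes the newly added column age: colsSize goes from 2 to 3, transient_lastDdlTime is updated, and the table parameters gain last_modified_time and last_modified_by.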
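The comparison between the old and new table can also be done programmatically instead of by reading the two JSON dumps. The following is a minimal sketch, not the article's listener code: the class name ColumnDiff and the method addedColumns are hypothetical, and the column names are passed in as plain strings so the example runs without Hive on the classpath. Inside a real listener, the names would presumably be taken from the FieldSchema entries returned by getSd().getCols() on the old and new Table objects of the AlterTableEvent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; not part of Hive.
public class ColumnDiff {

    /** Returns the names present in newCols but absent from oldCols, in newCols order. */
    public static List<String> addedColumns(List<String> oldCols, List<String> newCols) {
        Set<String> existing = new HashSet<>(oldCols);
        List<String> added = new ArrayList<>();
        for (String name : newCols) {
            if (!existing.contains(name)) {
                added.add(name);
            }
        }
        return added;
    }

    public static void main(String[] args) {
        // Column names copied from the "old table" and "new table" outputs above.
        List<String> oldCols = Arrays.asList("id", "name");
        List<String> newCols = Arrays.asList("id", "name", "age");
        System.out.println(addedColumns(oldCols, newCols)); // prints [age]
    }
}
```

The same approach, with the arguments swapped, reports dropped columns.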

Summary

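This article showed how to capture metadata operations in Hive so that metadata can be managed automatically. We covered the basic usage of Hive Hooks and Metastore Listeners, both of which let us observe metadata changes as they happen. The collected metadata can also be pushed to Kafka and used as the basis for building your own metadata management system.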
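One way to structure the hand-off to a message queue is sketched below, under stated assumptions: MetadataForwarder, MetadataSink, and InMemorySink are hypothetical names invented for this illustration, and the topic name is made up. The sink is kept in memory so the example is self-contained; in a real deployment the MetadataSink implementation would presumably wrap a Kafka producer. The event type is assumed to be a simple token that needs no JSON escaping.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: wraps a table's JSON metadata in an event envelope
// and hands it to a pluggable sink.
public class MetadataForwarder {

    /** Hypothetical abstraction over the transport (for example, a Kafka producer). */
    public interface MetadataSink {
        void send(String topic, String key, String payload);
    }

    /** In-memory stand-in for a real message queue, for local experiments. */
    public static class InMemorySink implements MetadataSink {
        public final List<String> messages = new ArrayList<>();

        @Override
        public void send(String topic, String key, String payload) {
            messages.add(topic + "|" + key + "|" + payload);
        }
    }

    private final MetadataSink sink;
    private final String topic;

    public MetadataForwarder(MetadataSink sink, String topic) {
        this.sink = sink;
        this.topic = topic;
    }

    /**
     * Keys the message by "db.table" so that events for one table stay ordered
     * when the transport partitions by key. tableJson must already be valid JSON.
     */
    public void forward(String eventType, String dbName, String tableName, String tableJson) {
        String key = dbName + "." + tableName;
        String payload = "{\"eventType\":\"" + eventType + "\",\"table\":" + tableJson + "}";
        sink.send(topic, key, payload);
    }

    public static void main(String[] args) {
        InMemorySink sink = new InMemorySink();
        MetadataForwarder forwarder = new MetadataForwarder(sink, "hive-metadata"); // assumed topic name
        forwarder.forward("ALTER_TABLE", "default", "testlistener", "{\"tableName\":\"testlistener\"}");
        System.out.println(sink.messages.get(0));
    }
}
```

Keeping the transport behind an interface lets the listener logic be tested without a running broker.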
