1. How to implement a UDF
Hive provides two ways to implement a UDF:
1.1 Extending the UDF class
Advantages:
- Simple to implement
- Supports Hive's primitive types, arrays, and maps
- Supports function overloading
- Requires little code and keeps the logic clear, so simple UDFs can be written quickly
Disadvantages:
- Limited to relatively simple logic; only suitable for implementing simple functions
1.2 Extending the GenericUDF class
Advantages:
- Supports any number of arguments of any type
- Can implement different logic depending on the number and types of the arguments
- Provides hooks for initialization and for releasing resources (initialize, close); see the sketch after this list
- More flexible than extending UDF, so more complex functions can be implemented
Disadvantages:
- Somewhat more complex to implement than extending UDF
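As a minimal illustration of those lifecycle hooks, the hypothetical skeleton below (a sketch only, not part of the examples in the following sections) validates its arguments in initialize() and releases resources in close():

package com.journey.udf;

import java.io.IOException;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

// Hypothetical skeleton: shows where per-query setup and cleanup belong
public class LifecycleSketchUDF extends GenericUDF {

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // Called once before any rows are processed: validate arguments, open resources
        if (arguments.length != 1) {
            throw new UDFArgumentLengthException("exactly one argument is expected");
        }
        return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // Called once per row
        Object value = arguments[0].get();
        return value == null ? null : value.toString();
    }

    @Override
    public void close() throws IOException {
        // Called when processing ends: release connections, file handles, caches, etc.
    }

    @Override
    public String getDisplayString(String[] children) {
        return getStandardDisplayString("lifecycle_sketch", children);
    }
}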
2. Implementing a UDF by extending the UDF class
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.journey.udf</groupId>
    <artifactId>hive-udf</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>hive-udf</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.69</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- assembly packaging plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <archive>
                        <manifest>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
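Running mvn package on this module executes the assembly plugin and should produce target/hive-udf-1.0-SNAPSHOT-jar-with-dependencies.jar, which is the jar referenced in the loading steps of section 4.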
package com.journey.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

/**
 * UDF is deprecated; extending GenericUDF is recommended instead
 */
public class MyUDF extends UDF {

    public String evaluate(String value) {
        return "journey_" + value;
    }
}
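The "supports function overloading" advantage listed in section 1.1 simply means the class may declare several evaluate methods and Hive picks the one whose signature matches the call. A minimal sketch (the two-argument overload is hypothetical, not part of the original example):

package com.journey.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

public class MyOverloadedUDF extends UDF {

    // select my_overloaded_udf('zhangsan') -> "journey_zhangsan"
    public String evaluate(String value) {
        return "journey_" + value;
    }

    // select my_overloaded_udf('zhangsan', 'beijing') -> "journey_zhangsan_beijing"
    public String evaluate(String value, String suffix) {
        return "journey_" + value + "_" + suffix;
    }
}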
3. Implementing a UDF by extending the GenericUDF class
package com.journey.udf;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.Text;

@Description(name = "my_hash", value = "returns hash of the given value")
public class MyHashUDF extends GenericUDF {

    private static final String MD2_HASH_TYPE = "md2";
    private static final String MD5_HASH_TYPE = "md5";

    public static final String UDF_NAME = "my_hash";

    private PrimitiveObjectInspector primitiveObjectInspector;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // ObjectInspector of the second argument (the JSON options string)
        this.primitiveObjectInspector = (PrimitiveObjectInspector) arguments[1];

        // The returned ObjectInspector tells Hive the type produced by evaluate().
        // Here the output type simply mirrors the type of the first argument;
        // a fixed return type could also be chosen via PrimitiveObjectInspectorFactory.xxx
        PrimitiveObjectInspector columnType = (PrimitiveObjectInspector) arguments[0];
        return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(columnType.getPrimitiveCategory());
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        Text result = new Text();

        // The second argument is a JSON string such as {"hashType":"md5"}
        String propertiesJsonString = (String) primitiveObjectInspector.getPrimitiveJavaObject(arguments[1].get());
        JSONObject propertiesJson = JSON.parseObject(propertiesJsonString);
        String hashType = propertiesJson.getString("hashType");

        // The first argument is the column value to be hashed
        DeferredObject col = arguments[0];
        StringObjectInspector stringColumn = (StringObjectInspector) primitiveObjectInspector;
        String colVal = stringColumn.getPrimitiveJavaObject(col.get());

        switch (hashType) {
            case MD2_HASH_TYPE:
                result.set(DigestUtils.md2Hex(colVal));
                break;
            case MD5_HASH_TYPE:
                result.set(DigestUtils.md5Hex(colVal));
                break;
            default:
                throw new UnsupportedOperationException(
                        "hash type must be one of [" + MD2_HASH_TYPE + "," + MD5_HASH_TYPE + "]");
        }
        return result;
    }

    @Override
    public String getDisplayString(String[] children) {
        return getStandardDisplayString(UDF_NAME, children);
    }
}
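Because the ObjectInspectors hand back plain Java strings, the function can also be exercised outside of Hive. A minimal sketch of a local driver, assuming MyHashUDF above is on the classpath (the main method and its sample values are illustrative, not part of the original code):

package com.journey.udf;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredJavaObject;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class MyHashUDFLocalTest {

    public static void main(String[] args) throws Exception {
        MyHashUDF udf = new MyHashUDF();

        // Both arguments are plain Java strings
        ObjectInspector stringOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        udf.initialize(new ObjectInspector[]{stringOI, stringOI});

        // Equivalent to: select my_hash('zhangsan', '{"hashType":"md5"}')
        Object result = udf.evaluate(new DeferredObject[]{
                new DeferredJavaObject("zhangsan"),
                new DeferredJavaObject("{\"hashType\":\"md5\"}")
        });
        System.out.println(result);  // prints the md5 hex digest of "zhangsan"

        udf.close();
    }
}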
4. Three ways to load a UDF
4.1 Loading as a system function via the SPI mechanism
Implementation on the Hive side
org.apache.hadoop.hive.ql.exec.FunctionRegistry
private static final Registry system = new Registry(true);

static {
    system.registerGenericUDF("concat", GenericUDFConcat.class);
    system.registerUDF("substr", UDFSubstr.class, false);
    system.registerUDF("substring", UDFSubstr.class, false);
    .....
    Map<String, Class<? extends GenericUDF>> edapUDFMap = loadEdapUDF();
    for (Map.Entry<String, Class<? extends GenericUDF>> entry : edapUDFMap.entrySet()) {
        system.registerGenericUDF(entry.getKey(), entry.getValue());
    }
}
/**
 * Dynamically load custom UDFs via the SPI mechanism
 *
 * @return map of UDF name to implementation class
 */
private static Map<String, Class<? extends GenericUDF>> loadEdapUDF() {
    Map<String, Class<? extends GenericUDF>> classMap = new HashMap<>();
    ServiceLoader<GenericUDF> loadedUDF = ServiceLoader.load(GenericUDF.class);
    Iterator<GenericUDF> udfIterator = loadedUDF.iterator();
    while (udfIterator.hasNext()) {
        Class<? extends GenericUDF> clazz = udfIterator.next().getClass();
        Field udfNameField = null;
        // UDF_NAME is a static field holding the function name
        try {
            udfNameField = clazz.getDeclaredField("UDF_NAME");
        } catch (NoSuchFieldException e) {
            LOG.warn("Class " + clazz.getName() + " doesn't have a UDF_NAME field.");
            continue;
        }
        udfNameField.setAccessible(true);
        try {
            classMap.put(String.valueOf(udfNameField.get(null)), clazz);
        } catch (IllegalAccessException e) {
            LOG.warn("illegal access " + clazz.getName() + " UDF_NAME field value.");
        }
    }
    return classMap;
}
The custom function
For example, to have the MyHashUDF above picked up by this mechanism, add the following configuration under the module's resources directory:
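The original post does not show the file itself; with the standard java.util.ServiceLoader convention that loadEdapUDF relies on, the registration would be a provider-configuration file named after the GenericUDF class and containing the implementation's fully qualified name:

src/main/resources/META-INF/services/org.apache.hadoop.hive.ql.udf.generic.GenericUDF

com.journey.udf.MyHashUDF

At startup, loadEdapUDF instantiates the listed class, reads its static UDF_NAME field ("my_hash"), and registers it as a system function under that name.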
4.2 Creating a temporary function
-- Add the jar to the classpath
add jar /opt/journey/hive/auxlib/hive-udf-1.0-SNAPSHOT-jar-with-dependencies.jar;

-- Create the temporary function
create temporary function my_udf as 'com.journey.udf.MyUDF';

-- Use the function
select my_udf('zhangsan');
-- Returns: journey_zhangsan

-- Drop the temporary function
drop temporary function my_udf;

Note: a temporary function cannot be assigned to a specific database, so no database prefix is needed when calling it; it can be used from any database.
4.3 Creating a permanent function
-- Put the jar on HDFS
hdfs dfs -put /opt/journey/hive/auxlib/hive-udf-1.0-SNAPSHOT-jar-with-dependencies.jar /journey

-- Create the permanent function
create function db1.my_udf as 'com.journey.udf.MyUDF' using jar 'hdfs://master-52600d0:8020/journey/hive-udf-1.0-SNAPSHOT-jar-with-dependencies.jar';

-- Use the functions (the my_hash calls exercise the MyHashUDF shown earlier)
select my_hash("zhangsan", "{'hashType':'md2'}");
select my_hash("zhangsan", "{'hashType':'md5'}");
select db1.my_udf('zhangsan');

-- Drop the permanent function
drop function db1.my_udf;

Note: the database can be omitted when creating a function, in which case the function is created under the default database.