1. Ways to implement a UDF
Hive provides two ways to implement a UDF:
1.1 Extending the UDF class
- Pros:
  - Simple to implement
  - Supports Hive primitive types, arrays, and maps
  - Supports function overloading
- Cons:
  - Only fairly basic logic fits this model, so it is suitable only for simple functions
This approach needs little code and keeps the logic clear, so a simple UDF can be implemented quickly.
1.2 Extending the GenericUDF class
- Pros:
  - Supports any number of arguments, of any type
  - Can apply different logic depending on the number and types of the arguments
  - Can implement initialization and resource-cleanup logic (initialize, close)
- Cons:
  - Somewhat more complex to implement than extending UDF
Compared with extending UDF, GenericUDF is more flexible and can implement more complex functions.
2. Implementation by extending the UDF class
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.journey.udf</groupId>
    <artifactId>hive-udf</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>
    <name>hive-udf</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.69</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- assembly packaging plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <archive>
                        <manifest>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
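Running mvn clean package then produces target/hive-udf-1.0-SNAPSHOT-jar-with-dependencies.jar: the assembly plugin's jar-with-dependencies descriptor bundles fastjson into the jar, which matters because hive-exec is provided by the cluster while fastjson is not. This is the jar referenced in section 4 below.
MyUDF.java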
package com.journey.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
/**
 * The UDF class is deprecated; GenericUDF is recommended instead.
 */
public class MyUDF extends UDF {

    public String evaluate(String value) {
        return "journey_" + value;
    }
}
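The UDF base class locates evaluate methods by reflection, so the function overloading listed as a strength in section 1.1 simply means providing several evaluate signatures in one class. A minimal sketch (MyOverloadUDF is a hypothetical name, not part of the original project):

package com.journey.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

// Hypothetical example of overloading evaluate in a UDF subclass
public class MyOverloadUDF extends UDF {

    // Matches calls with one string argument: my_overload_udf('a')
    public String evaluate(String value) {
        return "journey_" + value;
    }

    // Matches calls with two string arguments: my_overload_udf('a', 'b')
    public String evaluate(String first, String second) {
        return "journey_" + first + "_" + second;
    }

    // Matches calls with an integer argument: my_overload_udf(1)
    public String evaluate(Integer value) {
        return value == null ? null : "journey_" + value;
    }
}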
3. Implementation by extending the GenericUDF class
MyHashUDF.java
package com.journey.udf;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.Text;
@Description(name = "my_hash", value = "returns hash of the given value")
public class MyHashUDF extends GenericUDF {

    private static final String MD2_HASH_TYPE = "md2";
    private static final String MD5_HASH_TYPE = "md5";

    public static final String UDF_NAME = "my_hash";

    // Inspector for arguments[0], the column to hash
    private PrimitiveObjectInspector columnObjectInspector;
    // Inspector for arguments[1], the JSON options string
    private PrimitiveObjectInspector optionsObjectInspector;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        this.columnObjectInspector = (PrimitiveObjectInspector) arguments[0];
        this.optionsObjectInspector = (PrimitiveObjectInspector) arguments[1];
        // Simply put, this declares the return type of evaluate, wrapped in an
        // ObjectInspector; here the return type mirrors the input column type.
        // A fixed return type could instead be chosen via PrimitiveObjectInspectorFactory.xxx.
        return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
                columnObjectInspector.getPrimitiveCategory());
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        Text result = new Text();
        // arguments[1] carries a JSON string such as {"hashType":"md5"}
        String propertiesJsonString = (String) optionsObjectInspector.getPrimitiveJavaObject(arguments[1].get());
        JSONObject propertiesJson = JSON.parseObject(propertiesJsonString);
        String hashType = propertiesJson.getString("hashType");

        // Read the column value with the inspector of arguments[0]
        DeferredObject col = arguments[0];
        StringObjectInspector stringColumn = (StringObjectInspector) columnObjectInspector;
        String colVal = stringColumn.getPrimitiveJavaObject(col.get());

        switch (hashType) {
            case MD2_HASH_TYPE:
                result.set(DigestUtils.md2Hex(colVal));
                break;
            case MD5_HASH_TYPE:
                result.set(DigestUtils.md5Hex(colVal));
                break;
            default:
                throw new UnsupportedOperationException(
                        "hash type must be one of [" + MD2_HASH_TYPE + ", " + MD5_HASH_TYPE + "]");
        }
        return result;
    }

    @Override
    public String getDisplayString(String[] children) {
        return getStandardDisplayString(UDF_NAME, children);
    }
}
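Because initialize receives the raw ObjectInspector array, it is also the natural place to validate arguments before the casts above. A minimal sketch of checks that could be added at the top of initialize (not part of the original code; UDFArgumentLengthException and UDFArgumentTypeException come from org.apache.hadoop.hive.ql.exec):

// Hypothetical defensive checks at the start of initialize()
if (arguments.length != 2) {
    throw new UDFArgumentLengthException(UDF_NAME + " expects exactly two arguments");
}
for (int i = 0; i < arguments.length; i++) {
    if (arguments[i].getCategory() != ObjectInspector.Category.PRIMITIVE) {
        throw new UDFArgumentTypeException(i, UDF_NAME + " only accepts primitive arguments");
    }
}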
4. Three ways to load a UDF
4.1 Loading directly as system functions via the SPI mechanism
Hive-side implementation
org.apache.hadoop.hive.ql.exec#FunctionRegistry
private static final Registry system = new Registry(true);

static {
    system.registerGenericUDF("concat", GenericUDFConcat.class);
    system.registerUDF("substr", UDFSubstr.class, false);
    system.registerUDF("substring", UDFSubstr.class, false);
    .....
    Map<String, Class<? extends GenericUDF>> edapUDFMap = loadEdapUDF();
    for (Map.Entry<String, Class<? extends GenericUDF>> entry : edapUDFMap.entrySet()) {
        system.registerGenericUDF(entry.getKey(), entry.getValue());
    }
}
/**
 * Dynamically loads custom UDFs via the SPI mechanism
 *
 * @return map from function name to implementation class
 */
private static Map<String, Class<? extends GenericUDF>> loadEdapUDF() {
    Map<String, Class<? extends GenericUDF>> classMap = new HashMap<>();
    ServiceLoader<GenericUDF> loadedUDF = ServiceLoader.load(GenericUDF.class);
    Iterator<GenericUDF> udfIterator = loadedUDF.iterator();
    while (udfIterator.hasNext()) {
        Class<? extends GenericUDF> clazz = udfIterator.next().getClass();
        Field udfNameField = null;
        // UDF_NAME is a public static field on each UDF class (see MyHashUDF above)
        try {
            udfNameField = clazz.getDeclaredField("UDF_NAME");
        } catch (NoSuchFieldException e) {
            LOG.warn("Class " + clazz.getName() + " doesn't have a UDF_NAME field.");
            continue;
        }
        udfNameField.setAccessible(true);
        try {
            // Passing null reads the value of the static field
            classMap.put(String.valueOf(udfNameField.get(null)), clazz);
        } catch (IllegalAccessException e) {
            LOG.warn("Illegal access to " + clazz.getName() + "'s UDF_NAME field value.");
        }
    }
    return classMap;
}
Custom UDF side
For the MyHashUDF above, for example, the class is registered by adding the following configuration under resources:
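The original post shows this configuration as an image; in the standard Java SPI layout it is a provider-configuration file named after the service interface, listing one implementation class per line:

src/main/resources/META-INF/services/org.apache.hadoop.hive.ql.udf.generic.GenericUDF:
com.journey.udf.MyHashUDF

With that file on Hive's classpath, ServiceLoader.load(GenericUDF.class) in loadEdapUDF discovers MyHashUDF and registers it under its UDF_NAME as a system function.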
4.2 Creating a temporary function
-- Add the jar to the session classpath
add jar /opt/journey/hive/auxlib/hive-udf-1.0-SNAPSHOT-jar-with-dependencies.jar;
-- Create the temporary function
create temporary function my_udf AS 'com.journey.udf.MyUDF';
-- Use the function
select my_udf('zhagnsan');
-- Returns: journey_zhagnsan
-- Drop the temporary function
drop temporary function my_udf;
Note: a temporary function cannot be qualified with a database, so no database prefix is needed when calling it; it can be used from any database (and lasts only for the current session).
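To check what is registered, Hive's standard introspection statements can be used (standard HiveQL, not from the original post):

-- List functions matching a pattern
show functions like 'my_udf';
-- Print the usage text taken from the @Description annotation, if present
describe function my_udf;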
4.3 Creating a permanent function
-- Put the jar on HDFS
hdfs dfs -put /opt/journey/hive/auxlib/hive-udf-1.0-SNAPSHOT-jar-with-dependencies.jar /journey
-- Create the permanent function
create function db1.my_udf as 'com.journey.udf.MyUDF' using jar 'hdfs://master-52600d0:8020/journey/hive-udf-1.0-SNAPSHOT-jar-with-dependencies.jar';
-- Use the function; from another database, reference it with its db prefix
select db1.my_udf('zhangsan');
-- A permanent my_hash (MyHashUDF) registered the same way would be called with its JSON options argument:
select my_hash("zhagnsan","{'hashType':'md2'}");
select my_hash("zhagnsan","{'hashType':'md5'}");
-- Drop the permanent function
drop function db1.my_udf;
Note: when creating a function, the database may be omitted; the function is then created under the default database.
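A permanent function is recorded in the Hive metastore together with its jar location, so it survives restarts and is visible to all sessions. If a HiveServer2 instance that was already running does not see a newly created function, issuing reload function; (available since Hive 1.2) refreshes its function registry.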