About Hive: Hive UDF Functions


1. Ways to implement a UDF

Hive provides two ways to implement a UDF:

1.1 Extending the UDF class

  • Advantages

    • Simple to implement: little code and clear logic, so simple UDFs can be written quickly
    • Supports Hive primitive types, arrays, and maps
    • Supports function overloading
  • Disadvantages

    • Only suited to simple logic; not a good fit for complex functions

1.2 Extending the GenericUDF class

  • Advantages

    • Supports any number of arguments, of any type
    • Can implement different logic depending on the number and types of the arguments
    • Provides hooks for initialization and resource cleanup (initialize, close)
    • Altogether more flexible than UDF, so it can implement more complex functions
  • Disadvantages

    • Somewhat more complex to implement than extending UDF

2. Implementing a UDF by extending the UDF class

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.journey.udf</groupId>
  <artifactId>hive-udf</artifactId>
  <packaging>jar</packaging>
  <version>1.0-SNAPSHOT</version>
  <name>hive-udf</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
    <java.version>1.8</java.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-exec</artifactId>
      <version>3.1.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.69</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <!-- assembly 打包插件 -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <archive>
            <manifest>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
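
With this pom in place, the UDF jar is built before any of the loading steps below. A minimal build command (the output name follows the assembly plugin's jar-with-dependencies convention):

mvn clean package
# output: target/hive-udf-1.0-SNAPSHOT-jar-with-dependencies.jar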
MyUDF.java

package com.journey.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

/**
 * The UDF class is deprecated; GenericUDF is the recommended replacement.
 */
public class MyUDF extends UDF {
    // Hive locates evaluate() by reflection and applies it to each input row
    public String evaluate(String value) {
        return "journey_" + value;
    }
}
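
Because plain UDF supports overloading (see 1.1), several evaluate signatures can coexist in one class, and Hive picks the one matching the call's argument types. A minimal sketch (MyOverloadedUDF is an illustrative class, not part of the original project):

package com.journey.udf;

import org.apache.hadoop.hive.ql.exec.UDF;

public class MyOverloadedUDF extends UDF {
    // invoked for single-argument calls, e.g. my_udf('zhangsan')
    public String evaluate(String value) {
        return "journey_" + value;
    }

    // invoked for two-argument calls, e.g. my_udf('zhangsan', 'hello')
    public String evaluate(String value, String prefix) {
        return prefix + "_" + value;
    }
}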

3. Implementing a UDF by extending the GenericUDF class

MyHashUDF.java

package com.journey.udf;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.Text;

@Description(name = "my_hash", value = "returns hash of the given value")
public class MyHashUDF extends GenericUDF {

    private static final String MD2_HASH_TYPE = "md2";
    private static final String MD5_HASH_TYPE = "md5";

    public static final String UDF_NAME = "my_hash";

    // inspector for the column argument (arguments[0])
    private PrimitiveObjectInspector columnInspector;
    // inspector for the JSON options argument (arguments[1])
    private PrimitiveObjectInspector optionsInspector;

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        this.columnInspector = (PrimitiveObjectInspector) arguments[0];
        this.optionsInspector = (PrimitiveObjectInspector) arguments[1];
        // The ObjectInspector returned here declares the type of evaluate()'s result;
        // in this case the output type simply mirrors the input column's type.
        // A fixed return type could also be declared via PrimitiveObjectInspectorFactory.xxx.
        return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(columnInspector.getPrimitiveCategory());
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        // arguments[1] carries a JSON options string such as {"hashType":"md5"}
        String propertiesJsonString = (String) optionsInspector.getPrimitiveJavaObject(arguments[1].get());
        JSONObject propertiesJson = JSON.parseObject(propertiesJsonString);
        String hashType = propertiesJson.getString("hashType");

        // arguments[0] is the column value to hash
        StringObjectInspector stringColumn = (StringObjectInspector) columnInspector;
        String colVal = stringColumn.getPrimitiveJavaObject(arguments[0].get());

        Text result = new Text();
        switch (hashType) {
            case MD2_HASH_TYPE:
                result.set(DigestUtils.md2Hex(colVal));
                break;
            case MD5_HASH_TYPE:
                result.set(DigestUtils.md5Hex(colVal));
                break;
            default:
                throw new UnsupportedOperationException(
                        "hash type must be one of [" + MD2_HASH_TYPE + ", " + MD5_HASH_TYPE + "]");
        }
        return result;
    }

    @Override
    public String getDisplayString(String[] children) {
        return getStandardDisplayString(UDF_NAME, children);
    }
}
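
A quick way to sanity-check a GenericUDF outside Hive is to call initialize and evaluate by hand. A minimal sketch (the demo class and its object-inspector choices are illustrative, not part of the original article):

package com.journey.udf;

import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class MyHashUDFDemo {
    public static void main(String[] args) throws Exception {
        MyHashUDF udf = new MyHashUDF();

        // both arguments are passed as plain Java strings in this test
        ObjectInspector stringOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        udf.initialize(new ObjectInspector[]{stringOI, stringOI});

        Object result = udf.evaluate(new GenericUDF.DeferredObject[]{
                new GenericUDF.DeferredJavaObject("zhangsan"),
                new GenericUDF.DeferredJavaObject("{\"hashType\":\"md5\"}")
        });
        System.out.println(result); // prints the MD5 hex digest as a Text value
    }
}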

4. Three ways to load a UDF

4.1 Loading directly as system functions via the SPI mechanism

Implementation on the Hive side (this patches Hive's own source, so it requires rebuilding Hive):

org.apache.hadoop.hive.ql.exec.FunctionRegistry

private static final Registry system = new Registry(true);

  static {
    system.registerGenericUDF("concat", GenericUDFConcat.class);
    system.registerUDF("substr", UDFSubstr.class, false);
    system.registerUDF("substring", UDFSubstr.class, false);
    .....
    // register the custom UDFs discovered via SPI
    Map<String, Class<? extends GenericUDF>> edapUDFMap = loadEdapUDF();
    for (Map.Entry<String, Class<? extends GenericUDF>> entry : edapUDFMap.entrySet()) {
      system.registerGenericUDF(entry.getKey(), entry.getValue());
    }
  }

  /**
   * Dynamically load custom UDFs via the SPI mechanism
   *
   * @return map from function name to UDF class
   */
  private static Map<String, Class<? extends GenericUDF>> loadEdapUDF() {
    Map<String, Class<? extends GenericUDF>> classMap = new HashMap<>();

    ServiceLoader<GenericUDF> loadedUDF = ServiceLoader.load(GenericUDF.class);
    Iterator<GenericUDF> udfIterator = loadedUDF.iterator();

    while (udfIterator.hasNext()) {
      Class<? extends GenericUDF> clazz = udfIterator.next().getClass();
      Field udfNameField = null;

      // UDF_NAME is a static field holding the function's registered name
      try {
        udfNameField = clazz.getDeclaredField("UDF_NAME");
      } catch (NoSuchFieldException e) {
        LOG.warn("Class " + clazz.getName() + " doesn't have a UDF_NAME field.");
        continue;
      }

      udfNameField.setAccessible(true);

      try {
        classMap.put(String.valueOf(udfNameField.get(null)), clazz);
      } catch (IllegalAccessException e) {
        LOG.warn("Illegal access to " + clazz.getName() + " UDF_NAME field value.");
      }
    }
    return classMap;
  }

On the custom-function side

Taking MyHashUDF above as an example, the class is registered under resources with the following configuration:
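
Following Java's standard ServiceLoader convention, which the loadEdapUDF code above relies on, the registration is a provider file named after the service interface:

src/main/resources/META-INF/services/org.apache.hadoop.hive.ql.udf.generic.GenericUDF

with the file listing one implementation class per line:

com.journey.udf.MyHashUDF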

4.2 Creating a temporary function

-- add the jar to the classpath
add jar /opt/journey/hive/auxlib/hive-udf-1.0-SNAPSHOT-jar-with-dependencies.jar;

-- create the temporary function
create temporary function my_udf as 'com.journey.udf.MyUDF';

-- use the function
select my_udf('zhangsan');
-- returns: journey_zhangsan

-- drop the temporary function
drop temporary function my_udf;

Note: a temporary function cannot be qualified with a database name, and no database prefix is needed to call it; it can be used from any database in the session.
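
For example (db2 is an arbitrary database used for illustration):

use db2;
select my_udf('zhangsan');  -- still resolves; temporary functions are session-wide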

4.3 Creating a permanent function

# put the jar on HDFS
hdfs dfs -put /opt/journey/hive/auxlib/hive-udf-1.0-SNAPSHOT-jar-with-dependencies.jar /journey

-- create the permanent function
create function db1.my_udf as 'com.journey.udf.MyUDF' using jar 'hdfs://master-52600d0:8020/journey/hive-udf-1.0-SNAPSHOT-jar-with-dependencies.jar';

-- use the function, qualified with its database
select db1.my_udf('zhangsan');

-- MyHashUDF can be registered the same way; calling it then looks like:
select my_hash("zhangsan", "{'hashType':'md2'}");
select my_hash("zhangsan", "{'hashType':'md5'}");

-- drop the permanent function
drop function db1.my_udf;

Note: the database name may be omitted when creating a function; the function is then created under the current database (default, unless another has been selected).
