关于hive:Hive基于UDF进行文本分词

58次阅读

共计 10469 个字符,预计需要花费 27 分钟才能阅读完成。

本文纲要

UDF 简介

Hive作为一个 sql 查问引擎,自带了一些根本的函数,比方 count(计数)sum(求和),有时候这些根本函数满足不了咱们的需要,这时候就要写hive hdf(user defined funation),又叫用户自定义函数。编写Hive UDF 的步骤:

  • 增加相干依赖,创立我的项目,这里我用的管理工具是 maven,所以我创立的也是一个 maven 我的项目 (这个时候你须要抉择适合的依赖版本,次要是 Hadoop 和 Hive, 能够应用hadoop versionhive --version 来别离查看版本)
  • 继承 org.apache.hadoop.hive.ql.exec.UDF 类,实现 evaluate 办法,而后打包;
  • 应用 add办法增加 jar 包到分布式缓存,如果 jar 包是上传到 $HIVE_HOME/lib/ 目录以下,就不须要执行 add 命令了
  • 通过 create temporary function 创立长期函数,不加 temporary 就创立了一个永恒函数;
  • 在 SQL 中应用你创立的 UDF;

UDF 分词

这个是一个比拟常见的场景,例如公司的产品有每天都会产生大量的弹幕或者评论,这个时候咱们可能会想去剖析一下大家最关怀的热点话题是什么,或者是咱们会剖析最近一段时间的网络趋势是什么,然而这里有一个问题就是你的词库建设的问题,因为你应用通用的词库可能不能达到很好的分词成果,尤其有很多网络风行用语它是不在词库里的,还有一个就是停用词的问题了,因为很多时候停用词是没有意义的,所以这里咱们须要将其过滤,而过滤的形式就是通过停用词词表进行过滤。

这个时候咱们的解决方案次要有两种,一种是应用第三方提供的一些词库,还有一种是自建词库,而后有专人去保护,这个也是比拟常见的一种状况。

最初一个就是咱们应用的分词工具,因为目前支流的分词器很多,抉择不同的分词工具可能对咱们的分词后果有很多影响。

分词工具

1:Elasticsearch 的开源中文分词器 IK Analysis(Star:2471)

IK 中文分词器在 Elasticsearch 上的应用。原生 IK 中文分词是从文件系统中读取词典,es-ik 自身可扩大成从不同的源读取词典。目前提供从 sqlite3 数据库中读取。es-ik-plugin-sqlite3 应用办法:1. 在 elasticsearch.yml 中设置你的 sqlite3 词典的地位:ik_analysis_db_path: /opt/ik/dictionary.db

2:开源的 java 中文分词库 IKAnalyzer(Star:343)

IK Analyzer 是一个开源的,基于 java 语言开发的轻量级的中文分词工具包。从 2006 年 12 月推出 1.0 版开始,IKAnalyzer 曾经推出了 4 个大版本。最后,它是以开源我的项目 Luence 为利用主体的,联合词典分词和文法剖析算法的中文分词组件。从 3.0 版本开始,IK 倒退为面向 Java 的专用分词组件,独立于 Lucene 我的项目

3:java 开源中文分词 Ansj(Star:3019)

Ansj 中文分词 这是一个 ictclas 的 java 实现. 基本上重写了所有的数据结构和算法. 词典是用的开源版的 ictclas 所提供的. 并且进行了局部的人工优化 分词速度达到每秒钟大概 200 万字左右,准确率能达到 96% 以上。

目前实现了. 中文分词. 中文姓名辨认 . 词性标注、用户自定义词典,关键字提取,主动摘要,关键字标记等性能。

能够利用到自然语言解决等方面, 实用于对分词成果要求高的各种我的项目.

4:结巴分词 ElasticSearch 插件(Star:188)

elasticsearch 官网只提供 smartcn 这个中文分词插件,成果不是很好,好在国内有 medcl 大神(国内最早钻研 es 的人之一)写的两个中文分词插件,一个是 ik 的,一个是 mmseg 的

5:Java 分布式中文分词组件 – word 分词(Star:672)

word 分词是一个 Java 实现的分布式的中文分词组件,提供了多种基于词典的分词算法,并利用 ngram 模型来打消歧义。能精确辨认英文、数字,以及日期、工夫等数量词,能辨认人名、地名、组织机构名等未登录词

6:Java 开源中文分词器 jcseg(Star:400)

Jcseg 是什么?Jcseg 是基于 mmseg 算法的一个轻量级开源中文分词器,同时集成了关键字提取,要害短语提取,要害句子提取和文章主动摘要等性能,并且提供了最新版本的 lucene, solr, elasticsearch 的分词接口,Jcseg 自带了一个 jcseg.properties 文件 …

7:中文分词库 Paoding

庖丁中文分词库是一个应用 Java 开发的,可联合到 Lucene 利用中的,为互联网、企业内部网应用的中文搜索引擎分词组件。Paoding 填补了国内中文分词方面开源组件的空白,致力于此并希翼成为互联网网站首选的中文分词开源组件。Paoding 中文分词谋求分词的高效率和用户良好体验。

8:中文分词器 mmseg4j

mmseg4j 用 Chih-Hao Tsai 的 MMSeg 算法 (http://technology.chtsai.org/…) 实现的中文分词器,并实现 lucene 的 analyzer 和 solr 的 TokenizerFactory 以不便在 Lucene 和 Solr 中使 …

9:中文分词 Ansj(Star:3015)

Ansj 中文分词 这是一个 ictclas 的 java 实现. 基本上重写了所有的数据结构和算法. 词典是用的开源版的 ictclas 所提供的. 并且进行了局部的人工优化 内存中中文分词每秒钟大概 100 万字(速度上曾经超过 ictclas) 文件读取分词每秒钟大概 30 万字 准确率能达到 96% 以上 目前实现了 ….

10:Lucene 中文分词库 ICTCLAS4J

ictclas4j 中文分词零碎是 sinboy 在中科院张华温和刘群老师的研制的 FreeICTCLAS 的根底上实现的一个 java 开源分词我的项目,简化了原分词程序的复杂度,旨在为宽广的中文分词爱好者一个更好的学习机会。

代码实现

第一步:引入依赖

这里咱们引入了两个依赖,其实是两个不同分词工具

<dependency>
  <groupId>org.ansj</groupId>
  <artifactId>ansj_seg</artifactId>
  <version>5.1.6</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>com.janeluo</groupId>
  <artifactId>ikanalyzer</artifactId>
  <version>2012_u6</version>
</dependency>

在开始之前咱们先写一个 demo 玩玩,让大家有个根本的意识

@Test
public  void testAnsjSeg() {
    String str = "我叫李太白,我是一个诗人,我生存在唐朝" ;
      // 抉择应用哪种分词器 BaseAnalysis ToAnalysis NlpAnalysis  IndexAnalysis
    Result result = ToAnalysis.parse(str);
    System.out.println(result);
    KeyWordComputer kwc = new KeyWordComputer(5);
    Collection<Keyword> keywords = kwc.computeArticleTfidf(str);
    System.out.println(keywords);
}

输入后果

我 /r, 叫 /v, 李太白 /nr,,/w, 我 /r, 是 /v, 一个 /m, 诗人 /n,,/w, 我 /r, 生存 /vn, 在 /p, 唐朝 /t
[李太白 /24.72276098504223, 诗人 /3.0502185968368885, 唐朝 /0.8965677022546215, 生存 /0.6892230219652541]

第二步:引入停用词词库

因为是停用词词库,自身也不是很大,所以我间接放在我的项目里了,当然你也能够放在其余中央,例如 HDFS 上

第三步:编写 UDF

代码很简略我就不不做具体解释了,须要留神的是GenericUDF 外面的一些办法的应用规定,至于代码设计的好坏以及还有什么改良的计划咱们前面再说,上面两套实现的思路简直是统一的,不一样的是在应用的分词工具上的不一样

ansj 的实现

/**
 * Chinese words segmentation with user-dict in com.kingcall.dic
 * use Ansj(a java open source analyzer)
 */

// 这个信息就是你每次应用 desc 进行获取函数信息的时候返回的
@Description(name = "ansj_seg", value = "_FUNC_(str) - chinese words segment using ansj. Return list of words.",
        extended = "Example: select _FUNC_(' 我是测试字符串 ') from src limit 1;\n"
                + "[\" 我 \", \" 是 \", \" 测试 \", \" 字符串 \"]")

public class AnsjSeg extends GenericUDF {private transient ObjectInspectorConverters.Converter[] converters;
    private static final String userDic = "/app/stopwords/com.kingcall.dic";

    //load userDic in hdfs
    static {
        try {FileSystem fs = FileSystem.get(new Configuration());
            FSDataInputStream in = fs.open(new Path(userDic));
            BufferedReader br = new BufferedReader(new InputStreamReader(in));

            String line = null;
            String[] strs = null;
            while ((line = br.readLine()) != null) {line = line.trim();
                if (line.length() > 0) {strs = line.split("\t");
                    strs[0] = strs[0].toLowerCase();
                    DicLibrary.insert(DicLibrary.DEFAULT, strs[0]); //ignore nature and freq
                }
            }
            MyStaticValue.isNameRecognition = Boolean.FALSE;
            MyStaticValue.isQuantifierRecognition = Boolean.TRUE;
        } catch (Exception e) {System.out.println("Error when load userDic" + e.getMessage());
        }
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {if (arguments.length < 1 || arguments.length > 2) {
            throw new UDFArgumentLengthException("The function AnsjSeg(str) takes 1 or 2 arguments.");
        }

        converters = new ObjectInspectorConverters.Converter[arguments.length];
        converters[0] = ObjectInspectorConverters.getConverter(arguments[0], PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        if (2 == arguments.length) {converters[1] = ObjectInspectorConverters.getConverter(arguments[1], PrimitiveObjectInspectorFactory.writableIntObjectInspector);
        }
        return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    }


    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        boolean filterStop = false;
        if (arguments[0].get() == null) {return null;}
        if (2 == arguments.length) {IntWritable filterParam = (IntWritable) converters[1].convert(arguments[1].get());
            if (1 == filterParam.get()) filterStop = true;
        }

        Text s = (Text) converters[0].convert(arguments[0].get());
        ArrayList<Text> result = new ArrayList<>();

        if (filterStop) {for (Term words : DicAnalysis.parse(s.toString()).recognition(StopLibrary.get())) {if (words.getName().trim().length() > 0) {result.add(new Text(words.getName().trim()));
                }
            }
        } else {for (Term words : DicAnalysis.parse(s.toString())) {if (words.getName().trim().length() > 0) {result.add(new Text(words.getName().trim()));
                }
            }
        }
        return result;
    }


    @Override
    public String getDisplayString(String[] children) {return getStandardDisplayString("ansj_seg", children);
    }
}

ikanalyzer 的实现

@Description(name = "ansj_seg", value = "_FUNC_(str) - chinese words segment using Iknalyzer. Return list of words.",
        extended = "Example: select _FUNC_(' 我是测试字符串 ') from src limit 1;\n"
                + "[\" 我 \", \" 是 \", \" 测试 \", \" 字符串 \"]")
public class IknalyzerSeg extends GenericUDF {private transient ObjectInspectorConverters.Converter[] converters;
    // 用来寄存停用词的汇合
    Set<String> stopWordSet = new HashSet<String>();

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {if (arguments.length < 1 || arguments.length > 2) {
            throw new UDFArgumentLengthException("The function AnsjSeg(str) takes 1 or 2 arguments.");
        }
        // 读入停用词文件
        BufferedReader StopWordFileBr = null;
        try {StopWordFileBr = new BufferedReader(new InputStreamReader(new FileInputStream(new File("stopwords/baidu_stopwords.txt"))));
            // 初如化停用词集
            String stopWord = null;
            for(; (stopWord = StopWordFileBr.readLine()) != null;){stopWordSet.add(stopWord);
            }
        } catch (FileNotFoundException e) {e.printStackTrace();
        } catch (IOException e) {e.printStackTrace();
        }

        converters = new ObjectInspectorConverters.Converter[arguments.length];
        converters[0] = ObjectInspectorConverters.getConverter(arguments[0], PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        if (2 == arguments.length) {converters[1] = ObjectInspectorConverters.getConverter(arguments[1], PrimitiveObjectInspectorFactory.writableIntObjectInspector);
        }
        return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector);

    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        boolean filterStop = false;
        if (arguments[0].get() == null) {return null;}
        if (2 == arguments.length) {IntWritable filterParam = (IntWritable) converters[1].convert(arguments[1].get());
            if (1 == filterParam.get()) filterStop = true;
        }
        Text s = (Text) converters[0].convert(arguments[0].get());
        StringReader reader = new StringReader(s.toString());
        IKSegmenter iks = new IKSegmenter(reader, true);
        List<Text> list = new ArrayList<>();
        if (filterStop) {
            try {
                Lexeme lexeme;
                while ((lexeme = iks.next()) != null) {if (!stopWordSet.contains(lexeme.getLexemeText())) {list.add(new Text(lexeme.getLexemeText()));
                    }
                }
            } catch (IOException e) {}} else {
            try {
                Lexeme lexeme;
                while ((lexeme = iks.next()) != null) {list.add(new Text(lexeme.getLexemeText()));
                }
            } catch (IOException e) {}}
        return list;
    }

    @Override
    public String getDisplayString(String[] children) {return "Usage: evaluate(String str)";
    }
}

第四步:编写测试用例

GenericUDF 给咱们提供了一些办法,这些办法能够用来构建测试须要的环境和参数,这样咱们就能够测试这些代码了

@Test
public void testAnsjSegFunc() throws HiveException {AnsjSeg udf = new AnsjSeg();
    ObjectInspector valueOI0 = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    ObjectInspector valueOI1 = PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    ObjectInspector[] init_args = {valueOI0, valueOI1};
    udf.initialize(init_args);

    Text str = new Text("我是测试字符串");

    GenericUDF.DeferredObject valueObj0 = new GenericUDF.DeferredJavaObject(str);
    GenericUDF.DeferredObject valueObj1 = new GenericUDF.DeferredJavaObject(0);
    GenericUDF.DeferredObject[] args = {valueObj0, valueObj1};
    ArrayList<Object> res = (ArrayList<Object>) udf.evaluate(args);
    System.out.println(res);
}


@Test
public void testIkSegFunc() throws HiveException {IknalyzerSeg udf = new IknalyzerSeg();
    ObjectInspector valueOI0 = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    ObjectInspector valueOI1 = PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    ObjectInspector[] init_args = {valueOI0, valueOI1};
    udf.initialize(init_args);

    Text str = new Text("我是测试字符串");

    GenericUDF.DeferredObject valueObj0 = new GenericUDF.DeferredJavaObject(str);
    GenericUDF.DeferredObject valueObj1 = new GenericUDF.DeferredJavaObject(0);
    GenericUDF.DeferredObject[] args = {valueObj0, valueObj1};
    ArrayList<Object> res = (ArrayList<Object>) udf.evaluate(args);
    System.out.println(res);
}

咱们看到加载停用词没有找到,然而整体还是跑起来了,因为读取不到 HDFS 上的文件

然而咱们第二个样例是不须要从 HDFS 上加载停用词信息,所以能够完满的测试运行

起初为了能在内部更新文件,我将其放在了 HDFS 上,和 AnsjSeg 中的代码一样

第五步:创立 UDF 并应用

add jar /Users/liuwenqiang/workspace/code/idea/HiveUDF/target/HiveUDF-0.0.4.jar;
create temporary function ansjSeg as 'com.kingcall.bigdata.HiveUDF.AnsjSeg';
select ansjSeg("我是字符串,你是啥");
-- 开启停用词过滤
select ansjSeg("我是字符串,你是啥",1);
create temporary function ikSeg as 'com.kingcall.bigdata.HiveUDF.IknalyzerSeg';
select ikSeg("我是字符串,你是啥");
select ikSeg("我是字符串,你是啥",1);

下面办法的第二个参数,就是是否开启停用词过滤, 咱们应用 ikSeg 函数演示一下

上面咱们尝试获取一下函数的形容信息

如果没有写的话,就是上面的这样的

其它利用场景

通过编写 Hive UDF 能够轻松帮咱们实现大量常见需要,其它应该场景还有:

  • ip地址转 地区 :将上报的用户日志中的ip 字段转化为 国家 - 省 - 市 格局,便于做地区散布统计分析;
  • 应用 Hive SQL 计算的标签数据,不想编写 Spark 程序,能够通过 UDF 在动态代码块中初始化连接池,利用 Hive 启动的并行 MR 工作,并行疾速导入大量数据到 codis 中,利用于一些举荐业务;
  • 还有其它 sql 实现绝对简单的工作,都能够编写永恒 Hive UDF 进行转化;

总结

  1. 这一节咱们学习了一个比拟常见的 UDF,通过实现 GenericUDF 抽象类来实现,这一节的重点在于代码的实现以及对GenericUDF 类中办法的了解
  2. 下面的代码实现上有一个问题,那就是对于停用词的加载,就是咱们能不能动静加载停用词呢?

正文完
 0