Hive Custom Functions

Learn how to develop and use Hive custom functions (UDF, UDAF, UDTF) to extend Hive's capabilities

1. Custom Function Overview

Hive provides a rich set of built-in functions, but in practice you often need to develop custom functions for specific business requirements. Hive supports three types of custom functions:

UDF (User-Defined Function)

One-to-one mapping: takes one or more input parameters and returns a single output value

  • Examples: string processing, numeric computation
  • Extends: org.apache.hadoop.hive.ql.exec.UDF

UDAF (User-Defined Aggregate Function)

Many-to-one mapping: takes many input rows and returns a single aggregated result

  • Examples: custom sum, average calculation
  • Extends: org.apache.hadoop.hive.ql.exec.UDAF

UDTF (User-Defined Table-Generating Function)

One-to-many mapping: takes one input row and returns multiple output rows

  • Examples: row-to-column transformation, JSON parsing
  • Extends: org.apache.hadoop.hive.ql.udf.generic.GenericUDTF

2. Developing a UDF (User-Defined Function)

2.1 UDF Development Steps

  1. Create a Maven project and add the Hive dependency
  2. Write the UDF class, extending org.apache.hadoop.hive.ql.exec.UDF
  3. Implement the evaluate() method, which contains the UDF's core logic
  4. Compile and package the code into a JAR file
  5. Register and use the UDF in Hive

2.2 Creating the Maven Project

First, create a Maven project and add the Hive dependency to pom.xml:


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>hive-udf</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <hive.version>3.1.2</hive.version>
        <hadoop.version>3.2.1</hadoop.version>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Hive dependency -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Hadoop dependency -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2.3 Writing the UDF Class

Write a simple UDF that converts a string to uppercase:

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

/**
 * Custom UDF: converts a string to uppercase
 */
public class UpperCaseUDF extends UDF {
    
    /**
     * The evaluate method is the core of a UDF and must be implemented.
     * Overloading is supported: a UDF may define multiple evaluate methods.
     */
    public Text evaluate(Text input) {
        // Handle null input
        if (input == null) {
            return null;
        }
        
        // Convert the string to uppercase
        String upperCase = input.toString().toUpperCase();
        return new Text(upperCase);
    }
    
    // Overloaded method supporting String input
    public Text evaluate(String input) {
        if (input == null) {
            return null;
        }
        return new Text(input.toUpperCase());
    }
}
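The evaluate logic above can be unit-tested without a Hive cluster by replicating it in plain Java. The class and method names below are illustrative, not part of the Hive API:

```java
// Plain-Java replica of UpperCaseUDF's core logic, with no Hive
// dependencies, so the null handling and case conversion can be
// verified in an ordinary unit test.
public class UpperCaseLogicCheck {

    // Mirrors the String overload of UpperCaseUDF.evaluate
    public static String toUpper(String input) {
        if (input == null) {
            return null;
        }
        return input.toUpperCase();
    }

    public static void main(String[] args) {
        System.out.println(toUpper("hello world")); // HELLO WORLD
        System.out.println(toUpper(null));          // null
    }
}
```

Testing the logic in isolation like this is much faster than redeploying a JAR to a cluster for every change.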

2.4 Compiling and Packaging

Use Maven to compile and package the code into a JAR file:

mvn clean package

2.5 Registering and Using the UDF in Hive

There are two ways to register a UDF in Hive:

Option 1: Temporary registration

A temporarily registered UDF is only valid for the current session:

-- Add the JAR to the Hive session
hive> ADD JAR /path/to/hive-udf-1.0-SNAPSHOT.jar;

-- Create a temporary function
hive> CREATE TEMPORARY FUNCTION to_upper AS 'com.example.hive.udf.UpperCaseUDF';

-- Use the custom function
hive> SELECT to_upper('hello world');
-- Output: HELLO WORLD

-- Use it when querying table data
hive> SELECT id, to_upper(username) AS upper_username FROM users;

Option 2: Permanent registration

A permanently registered UDF is valid in all sessions:

-- Create a permanent function (CREATE FUNCTION ... USING JAR requires Hive 0.13+)
hive> CREATE FUNCTION to_upper AS 'com.example.hive.udf.UpperCaseUDF'
    > USING JAR 'hdfs:///user/hive/udf/hive-udf-1.0-SNAPSHOT.jar';

-- List matching functions
hive> SHOW FUNCTIONS LIKE 'to_upper';

-- Show function details (Hive prints the text from the class's
-- @Description annotation, if one is present)
hive> DESCRIBE FUNCTION to_upper;

3. Developing a UDAF (User-Defined Aggregate Function)

3.1 UDAF Overview

A UDAF implements a custom aggregation such as a sum, average, or maximum. UDAF development is more involved than UDF development and requires the following:

  • An Evaluator class that implements the aggregation logic
  • Four phases: initialization, iteration, merging, and termination

3.2 UDAF Development Steps

  1. Create a Maven project and add the Hive dependency (same as for a UDF)
  2. Write the UDAF class, extending org.apache.hadoop.hive.ql.exec.UDAF
  3. Write an inner Evaluator class that implements the UDAFEvaluator interface
  4. Implement the evaluation phases: init, iterate, terminatePartial, merge, and terminate
  5. Compile and package the code into a JAR file
  6. Register and use the UDAF in Hive

3.3 Writing the UDAF Class

Write a simple UDAF that computes an average:

package com.example.hive.udaf;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.DoubleWritable;

/**
 * Custom UDAF: computes an average
 */
public class AvgUDAF extends UDAF {
    
    /**
     * Inner Evaluator class implementing the aggregation logic
     */
    public static class AvgEvaluator implements UDAFEvaluator {
        
        // Intermediate state: the running sum and count
        public static class PartialResult {
            long count;
            double sum;
        }
        
        private PartialResult partial;
        
        /**
         * Initialization method, called when aggregation starts
         */
        @Override
        public void init() {
            partial = null;
        }
        
        /**
         * Iteration method, processes each input row
         */
        public boolean iterate(DoubleWritable value) {
            if (value == null) {
                return true;
            }
            
            if (partial == null) {
                partial = new PartialResult();
            }
            
            partial.sum += value.get();
            partial.count++;
            return true;
        }
        
        /**
         * Ends partial aggregation and returns the intermediate result
         */
        public PartialResult terminatePartial() {
            return partial;
        }
        
        /**
         * Merge method, combines multiple partial results
         */
        public boolean merge(PartialResult other) {
            if (other == null) {
                return true;
            }
            
            if (partial == null) {
                partial = new PartialResult();
            }
            
            partial.sum += other.sum;
            partial.count += other.count;
            return true;
        }
        
        /**
         * Ends global aggregation and returns the final result
         */
        public DoubleWritable terminate() {
            if (partial == null || partial.count == 0) {
                return null;
            }
            
            return new DoubleWritable(partial.sum / partial.count);
        }
    }
}
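To see how the phases fit together, here is a minimal plain-Java simulation of the UDAF lifecycle: two "mapper" sides each iterate over a slice of the data, their partial results are merged, and terminate produces the final average. Class and method names are illustrative; no Hive runtime is involved.

```java
// Plain-Java sketch of the UDAF lifecycle: iterate on each slice,
// merge the partial results, then terminate to get the final value.
public class AvgLifecycleDemo {

    // Same intermediate state as AvgUDAF: running sum and count
    public static class Partial {
        public long count;
        public double sum;
    }

    // "Mapper" side: iterate over one slice of the data
    public static Partial iterateAll(double[] values) {
        Partial p = new Partial();
        for (double v : values) {
            p.sum += v;
            p.count++;
        }
        return p;
    }

    // "Reducer" side: combine two partial results
    public static Partial merge(Partial a, Partial b) {
        Partial out = new Partial();
        out.sum = a.sum + b.sum;
        out.count = a.count + b.count;
        return out;
    }

    // Final phase: turn the merged state into the result
    public static Double terminate(Partial p) {
        return p.count == 0 ? null : p.sum / p.count;
    }

    public static void main(String[] args) {
        Partial m1 = iterateAll(new double[]{1.0, 2.0}); // slice 1
        Partial m2 = iterateAll(new double[]{3.0, 4.0}); // slice 2
        System.out.println(terminate(merge(m1, m2)));    // 2.5
    }
}
```

This mirrors why the intermediate state must carry both sum and count: averaging the per-slice averages directly would give the wrong answer for unevenly sized slices.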

3.4 Using the UDAF in Hive

-- Add the JAR
hive> ADD JAR /path/to/hive-udf-1.0-SNAPSHOT.jar;

-- Create a temporary aggregate function
hive> CREATE TEMPORARY FUNCTION my_avg AS 'com.example.hive.udaf.AvgUDAF';

-- Use the custom aggregate function
hive> SELECT my_avg(amount) AS avg_amount FROM sales;
-- Output: the average sale amount

-- Use it with GROUP BY
hive> SELECT category, my_avg(price) AS avg_price FROM products GROUP BY category;

4. Developing a UDTF (User-Defined Table-Generating Function)

4.1 UDTF Overview

A UDTF turns one input row into multiple output rows, for example splitting an array or a JSON string into separate rows. UDTF development requires extending the GenericUDTF class and implementing the initialize, process, and close methods.

4.2 UDTF Development Steps

  1. Create a Maven project and add the Hive dependency (same as for a UDF)
  2. Write the UDTF class, extending org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
  3. Implement the initialize method to define the output column types
  4. Implement the process method to process input rows and emit multiple output rows
  5. Implement the close method to release resources
  6. Compile and package the code into a JAR file
  7. Register and use the UDTF in Hive

4.3 Writing the UDTF Class

Write a simple UDTF that splits a string into multiple rows by a given delimiter:

package com.example.hive.udtf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Custom UDTF: splits a string into multiple rows by a delimiter
 */
public class SplitUDTF extends GenericUDTF {
    
    private String delimiter = ",";
    private final transient Object[] forwardObj = new Object[1];
    
    /**
     * Initialization method: defines the names and types of the output columns
     */
    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        // Handle the optional second argument (the delimiter). It must be
        // a constant so its value is known at query-planning time; note
        // that getTypeName() would return the type, not the value.
        if (argOIs.length > 1) {
            if (!(argOIs[1] instanceof ConstantObjectInspector)) {
                throw new UDFArgumentException("The delimiter must be a constant");
            }
            delimiter = ((ConstantObjectInspector) argOIs[1]).getWritableConstantValue().toString();
        }
        
        // Define the output column names
        List<String> fieldNames = new ArrayList<>();
        fieldNames.add("word");
        
        // Define the output column types
        List<ObjectInspector> fieldOIs = new ArrayList<>();
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }
    
    /**
     * Processes one input row and emits multiple output rows
     */
    @Override
    public void process(Object[] args) throws HiveException {
        // Handle null input
        if (args == null || args[0] == null) {
            return;
        }
        
        String input = args[0].toString();
        
        // Split the string by the delimiter. Pattern.quote treats the
        // delimiter literally; String.split would otherwise interpret
        // characters such as "|" as a regular expression.
        String[] parts = input.split(Pattern.quote(delimiter));
        
        // Emit one output row per part
        for (String part : parts) {
            forwardObj[0] = part;
            forward(forwardObj); // emit one row
        }
    }
    
    /**
     * Releases resources
     */
    @Override
    public void close() throws HiveException {
        // Nothing to clean up
    }
}
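One detail worth calling out: String.split interprets its argument as a regular expression, so a delimiter such as "|" must be wrapped in Pattern.quote or it will match between every character. This standalone sketch demonstrates the difference:

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Demonstrates why a split-style UDTF must quote its delimiter:
// String.split takes a regex, and "|" is a regex metacharacter.
public class SplitPitfallDemo {
    public static void main(String[] args) {
        // Unquoted: the regex "|" matches the empty string at every
        // position, so the input is shredded (on Java 8+ this prints
        // [x, |, y, |, z]).
        System.out.println(Arrays.toString("x|y|z".split("|")));

        // Quoted: the delimiter is treated literally.
        System.out.println(Arrays.toString("x|y|z".split(Pattern.quote("|"))));
        // → [x, y, z]
    }
}
```

The same applies to other regex metacharacters such as ".", "*", and "+" when they are used as delimiters.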

4.4 Using the UDTF in Hive

-- Add the JAR
hive> ADD JAR /path/to/hive-udf-1.0-SNAPSHOT.jar;

-- Create a temporary table-generating function
hive> CREATE TEMPORARY FUNCTION my_split AS 'com.example.hive.udtf.SplitUDTF';

-- Use the UDTF with LATERAL VIEW
hive> SELECT id, word
    > FROM users
    > LATERAL VIEW my_split(hobbies, ',') exploded_hobbies AS word;
-- Output: each user's hobbies split into multiple rows

-- Use the UDTF directly
hive> SELECT my_split('a,b,c,d');
-- Output:
-- a
-- b
-- c
-- d

-- Use a custom delimiter
hive> SELECT my_split('x|y|z', '|');
-- Output:
-- x
-- y
-- z

5. Custom Function Best Practices

5.1 Development Best Practices

  • Handle null values: every custom function must handle null input gracefully to avoid NullPointerExceptions
  • Ensure type safety: make sure input and output types are correct; use Hadoop Writable types for data
  • Optimize performance:
    • Reduce object creation, especially in hot code paths
    • Cache frequently used data
    • Avoid heavy computation inside the process method
  • Test thoroughly: write unit tests covering a variety of inputs, including edge cases
  • Document well: give each function clear comments and documentation describing its purpose, parameters, and return value

5.2 Usage Best Practices

  • Choose the right registration mode: temporary or permanent, depending on your needs
  • Follow naming conventions: use clear, meaningful function names that do not conflict with built-in functions
  • Manage versions: version your UDF JARs to simplify management and upgrades
  • Monitor and tune: watch custom function performance and address bottlenecks promptly
  • Consider security: restrict UDF permissions to avoid security risks

5.3 Common Issues and Solutions

Common issues

  • ClassNotFoundException: make sure the JAR path is correct and the JAR contains all required dependencies
  • NullPointerException: null input was not handled; add null checks to the code
  • Performance problems: optimize the code by reducing object creation and heavy computation
  • Out of memory: avoid buffering large amounts of data in a UDTF; release resources promptly
  • Type mismatch: make sure input and output types match the column types of the Hive table

6. Built-in Custom Function Frameworks

6.1 Hive's Built-in UDF Frameworks

Hive provides framework classes that simplify custom function development:

GenericUDF

GenericUDF is the newer UDF interface, providing better type checking and error handling:

  • Supports complex types (arrays, maps, structs)
  • Provides better type safety
  • Supports variable-length argument lists

GenericUDAF

GenericUDAF is the newer UDAF framework, providing more flexible aggregation:

  • Supports aggregation over complex types
  • Provides better performance
  • Supports partial aggregation and merging

6.2 Developing a UDF with GenericUDF

Use GenericUDF to develop a simple UDF that returns the first element of an array:

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

/**
 * GenericUDF example: returns the first element of an array
 */
@Description(
    name = "get_first_element",
    value = "_FUNC_(array) - Returns the first element of an array",
    extended = "Example:\n  > SELECT _FUNC_(array(1, 2, 3)) FROM table;\n  1"
)
public class GetFirstElementUDF extends GenericUDF {
    
    private ListObjectInspector listOI;
    
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // Check the argument count
        if (arguments.length != 1) {
            throw new UDFArgumentException("Expected exactly one argument");
        }
        
        // Check the argument type
        if (!(arguments[0] instanceof ListObjectInspector)) {
            throw new UDFArgumentException("Expected an array as input");
        }
        
        listOI = (ListObjectInspector) arguments[0];
        
        // The output type is the array's element type
        return listOI.getListElementObjectInspector();
    }
    
    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        if (arguments[0].get() == null) {
            return null;
        }
        
        // Get the array
        Object array = arguments[0].get();
        
        // Get the array length
        int size = listOI.getListLength(array);
        if (size == 0) {
            return null;
        }
        
        // Return the first element
        return listOI.getListElement(array, 0);
    }
    
    @Override
    public String getDisplayString(String[] children) {
        return "get_first_element(" + children[0] + ")";
    }
}

7. Summary

Custom functions are an important way to extend Hive's capabilities. By developing UDFs, UDAFs, and UDTFs, you can satisfy a wide range of complex business requirements.

  • UDF: one-to-one transformations, such as string processing and numeric computation
  • UDAF: many-to-one aggregations, such as custom sums and averages
  • UDTF: one-to-many table generation, such as splitting strings or parsing JSON

When developing custom functions, keep the following in mind:

  • Handle null values gracefully to avoid NullPointerExceptions
  • Optimize performance by reducing object creation and heavy computation
  • Write thorough tests to verify correctness
  • Use clear names and complete documentation
  • Choose the appropriate registration mode (temporary or permanent)

Used well, custom functions significantly improve Hive's flexibility and extensibility, meeting a wide range of complex data-processing needs.