HIVE 基础-->UDF 函数,优化

分享 123456789987654321 ⋅ 于 2021-07-03 20:15:32 ⋅ 191 阅读

idea运行 运行 hive模型

//查看hive目录
which hive 
//通过sh -x /usr/local/hive/bin/hive查看执行了那个类
sh -x /hive目录

1.hiveRunner启动类

1.ctrl+N 搜索jar包中的启动类 org.apache.hadoop.hive.cli.CliDrive
public static void main(String[] args) throws Exception {
        System.setProperty("jline.WindowsTerminal.directConsole", "false");
        int ret = (new CliDriver()).run(args);
        System.exit(ret);
    }

2.建立mysql用户

-- 创建hive的数据库
create database hive_meta default charset utf8 collate utf8_general_ci;
-- 创建用户
CREATE USER 'hive'@'%' IDENTIFIED BY '000000'; 
CREATE USER 'hive'@'localhost' IDENTIFIED BY '000000'; 
-- 用户赋权并制定访问权限
grant all privileges on hive_meta.* to 'hive'@'%' identified by '000000';
grant all privileges on hive_meta.* to 'hive'@'localhost' identified by '000000';
-- 刷新权限
flush privileges; 

运行本地sql文件,创建原始hive表

3.加入mysql驱动

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.35</version>
    </dependency>

4.获取hive的两个配置文件

修改hive-site.xml

<configuration>
    <!-- 数据库 start -->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://127.0.0.1:3306/hive_meta</value>
        <description>mysql连接</description>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
        <description>mysql驱动</description>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>hive</value>
        <description>数据库使用用户名</description>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>000000</value>
        <description>数据库密码</description>
    </property>
    <!-- 数据库 end -->

    <property>
        <name>hive.metastore.warehouse.dir</name>
        <value>/tmp/hive/warehouse</value>
        <description>hive使用的HDFS目录</description>
    </property>
    <!--不校验hive的版本和metastore版本是否匹配 -->
    <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
    </property>
    <property>
        <name>hive.cli.print.current.db</name>
        <value>true</value>
    </property>
    <property>
       <name>hive.cli.print.header</name>
       <value>true</value>
    </property>
</configuration>

5.采坑hive找不到文件

1.hive文件权限问题
https://blog.csdn.net/iamboluke/article/details/103878312

C:\Users\Ms.ma>echo %HADOOP_HOME%
D:\develop\hadoop
C:\Users\Ms.ma>%HADOOP_HOME%\bin\winutils.exe chmod 777 E:\tmp\hive
C:\Users\Ms.ma>%HADOOP_HOME%\bin\winutils.exe ls E:\tmp\hive
drwxrwxrwx 1 BUILTIN\Administrators ALIENWARE\None 0 Mar 24 2021 E:\tmp\hive
2.配置hadoop环境变量
3.idea的hive实在本地新建一个数据库,不是在线上服务器的数据库

一、UDF函数

1.编写udf

package com.bigdata.hive.function;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.lazy.LazyString;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * @Description(描述): UDF函数
 * @Version(版本): 1.0
 * @Author(创建者): ALIENWARE
 * @Date(创建时间): Created in 2021/7/2.
 * @ * * * * * * * * * * * * * @
 */
public class CountryCode2CountryNameUDF  extends GenericUDF {

    private static Map<String,String> countryMap = new HashMap<String,String>();

    static{
        // 将 字典文件数据写入内存
        try(
                InputStream is = CountryCode2CountryNameUDF.class.getResourceAsStream("/country_dict.txt");
                BufferedReader reader = new BufferedReader(new InputStreamReader(is, "utf-8"));
        ){
            String line = null;
            while((line = reader.readLine()) != null){
                String[] arr = line.split("\t");
                String code = arr[0];
                String name = arr[1];
                countryMap.put(code, name);
            }
            System.out.println("countryMap.size: " + countryMap.size());
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    //初始化参数校验
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        System.out.println("initialize");
        // 校验函数输入参数和 设置函数返回值类型
        // 1) 校验入参个数
        if(arguments.length != 1){
            throw new UDFArgumentException("input params must one");
        }

        ObjectInspector inspector = arguments[0];
        // 2) 校验参数的大类
        // 大类有: PRIMITIVE(基本类型), LIST, MAP, STRUCT, UNION
        if(! inspector.getCategory().equals(ObjectInspector.Category.PRIMITIVE)){
            throw new UDFArgumentException("input params Category must PRIMITIVE");
        }

        // 3) 校验参数的小类
//      VOID, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING,
//      DATE, TIMESTAMP, BINARY, DECIMAL, VARCHAR, CHAR, INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME,
//      UNKNOWN
        if(! inspector.getTypeName().equalsIgnoreCase(PrimitiveObjectInspector.PrimitiveCategory.STRING.name())){
            throw new UDFArgumentException("input params PRIMITIVE Category type must STRING");
        }

        // 4) 设置函数返回值类型
        // writableStringObjectInspector 里面有 Text, Text 里面有String类型
        return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
    }

    /**
     * 定义函数输出对象
     */
    Text output = new Text();
    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        System.out.println("evaluate");
        // 核心算法, 一行调用一次

        // 获取入参数据
        Object obj = arguments[0].get();

        String code = null;
        if(obj instanceof LazyString){
            LazyString lz = (LazyString)obj;
            Text t = lz.getWritableObject();
            code = t.toString();
        }else if(obj instanceof Text){
            Text t = (Text)obj;
            code = t.toString();
        }else{
            code = (String)obj;
        }
        // 翻译国家码
        String countryName = countryMap.get(code);
        countryName = countryName == null ? "某个小国" : countryName;

        output.set(countryName);
        return output;
    }

    //说明
    @Override
    public String getDisplayString(String[] strings) {

        return Arrays.toString(strings);
    }

}

------------------------------------------------------------------------------------
//可以实现 configure 获取 job对象
    @Override
    public void configure(MapredContext context) {
        JobConf jobConf = context.getJobConf();
        super.configure(context);
    }

2.hive加载并使用自定义函数

-- 创建自定义函数
CREATE TEMPORARY FUNCTION f1 AS 'com\bigdata\hive\function\CountryCode2CountryNameUDF.java';

--使用自定义函数
select country,f1(country) as name  from user_install_status limit 10;

二、Hive 编写 UDAF 函数实现

用户自定义聚合函数实现
适用于聚合函数 (列转行函数)

1)自定义UDAF类,需要继承AbstractGenericUDAFResolver;
2)自定义Evaluator类,需要继承GenericUDAFEvaluator,真正实现UDAF的逻辑;
3)自定义bean类,需要继承 AbstractAggregationBuffer,用于在mapper或reducer内部传递数据;

Mode

public static enum Mode {  
    /** 
     * PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合 
     * 将会调用iterate()和terminatePartial() 
     */  
    PARTIAL1,  
        /** 
     * PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据::从部分数据聚合到部分数据聚合: 
     * 将会调用merge() 和 terminatePartial()  
     */  
    PARTIAL2,  
        /** 
     * FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合  
     * 将会调用merge()和terminate() 
     */  
    FINAL,  
        /** 
     * COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合 
      * 将会调用 iterate()和terminate() 
     */  
    COMPLETE  
  };  

代码实现

package com.bigdata.hive.function;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.lazy.LazyInteger;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

/**

public class SumUDAF extends AbstractGenericUDAFResolver{

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) throws SemanticException {
        // 校验函数输入参数  和 返回 具体能实现udaf函数的Evaluator对象
        // 校验函数入参的数量
        if(info.length != 1){
            throw new UDFArgumentException("input param must one!");
        }

        // 校验函数入参的大类类型
        ObjectInspector inspector = (ObjectInspector) info[0];
        // 获取大类类型的方法: inspector.getCategory()
        // 获取小类类型的方法:inspector.getTypeName()
        if(! inspector.getCategory().equals(ObjectInspector.Category.PRIMITIVE)){
            throw new UDFArgumentException("input param Category must PRIMITIVE!");
        }

        // 校验函数入参的小类类型
        //  PRIMITIVE大类类型有     VOID, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING,
        //  DATE, TIMESTAMP, BINARY, DECIMAL, VARCHAR, CHAR, INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME,UNKNOWN 小类类型
        if(! inspector.getTypeName().equalsIgnoreCase(PrimitiveObjectInspector.PrimitiveCategory.INT.name())){
            throw new UDFArgumentException("input param PRIMITIVE Category must INT type");
        }

        return new SumEvaluator();

    }

    /**
     * 实现sum函数的核心类
     */
    public static class SumEvaluator extends  GenericUDAFEvaluator{

        /**
         * 用来存mapper内部(局部) 或 reducer内部聚合的结果(全局)
         */
        public static class SumAgg extends AbstractAggregationBuffer{
            private int sum = 0;

            public int getSum() {
                return sum;
            }

            public void setSum(int sum) {
                this.sum = sum;
            }

        }

        /**
         * 用于mapper端和reducer端的输出
         */
        IntWritable output = new IntWritable();

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            // 设置各个阶段的返回值类型

            // 因为 mapper阶段和reducer阶段都是输出int类型,那就不用分阶段了,这个比较特殊
            super.init(m, parameters);
            // writableIntObjectInspector(IntWritable)
            return PrimitiveObjectInspectorFactory.writableIntObjectInspector;

        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            // 获取新的bean对象
            return new SumAgg();

        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            // bean对象可能会重复应用,应用前清0
            SumAgg sumAgg = (SumAgg)agg;
            sumAgg.setSum(0);
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
            // 把mapper中每行的数据汇总到一起,放到一个bean里面,这个bean在mapper和reducer的内部。
            // 通俗理解: 将 每行的 parameters里面的金额,汇总到 bean对象里

            Object object = parameters[0];
            int num = 0;
            if(object instanceof LazyInteger){
                // LazyString ---> Text  --> String
                LazyInteger lz = (LazyInteger)object;
                IntWritable t = lz.getWritableObject();
                num = t.get();
            }else if(object instanceof IntWritable){
                IntWritable t = (IntWritable)object;
                num = t.get();
            }else{
                num = (Integer)object;
            }

            SumAgg sumAgg = (SumAgg)agg;
            // 之前的 + 当前的 = 汇总后的
            sumAgg.setSum(sumAgg.getSum() + num);
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            // 把带有中间结果的bean转换成能实现序列化在mapper 和 reducer 端传输的对象。
            // 通俗理解: bean对象里的 汇总结果 序列化到 IntWritable, 用于输出到reducer
            SumAgg sumAgg = (SumAgg)agg;
            output.set(sumAgg.getSum());

            return output;

        }

        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
            // reduce端把mapper端输出的数据进行全局合并,把合并的结果放到bean里
            // 通俗理解: mapper 输出的局部数据 partial, 汇总到  bean对象里
            int num = 0;
            if(partial instanceof LazyInteger){
                // LazyString ---> Text  --> String
                LazyInteger lz = (LazyInteger)partial;
                IntWritable t = lz.getWritableObject();
                num = t.get();
            }else if(partial instanceof IntWritable){
                IntWritable t = (IntWritable)partial;
                num = t.get();
            }else{
                num = (Integer)partial;
            }

            SumAgg sumAgg = (SumAgg)agg;
            // 之前的 + 当前的 = 汇总后的
            sumAgg.setSum(sumAgg.getSum() + num);
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            // 把bean里的全局聚合结果,转换成能实现序列化的输出对象
            SumAgg sumAgg = (SumAgg)agg;
            output.set(sumAgg.getSum());

            return output;
        }
    }
}

三、Hive 编写 UDTF 函数实现

用户自定义表函数实现
将 整行数据拆分成多行多列的数据 --> 一般成这个函数叫做 行转列函数
UDTF(User-Defined Table-Generating Functions)  :接受零个或者多个输入,然后产生多列或者多行输出。

--原数据
id        name_nickname
1    name1#n1;name2#n2
2    name3#n3;name4#n4;name5#n5 
split_udtf 之后得到的结果

--结果数据
id        name        nickname
1        name1        n1
1        name2        n2
2        name3        n3   
2        name4        n4
2        name5        n5
1.1 开发UDAF步骤
1)自定义UDAF类,需要继承AbstractGenericUDAFResolver;
2)自定义Evaluator类,需要继承GenericUDAFEvaluator,真正实现UDAF的逻辑;
3)自定义bean类,需要继承 AbstractAggregationBuffer,用于在mapper或reducer内部传递数据;
//开发步骤
1)继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF,实现initialize, process, close三个方法。

2)UDTF首先会调用initialize方法,此方法返回UDTF的返回行的信息(返回个数,类型)。

3)初始化完成后,会调用process方法,真正的处理过程在process函数中,在process中,每调用一次forward()产生一行;如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数。

4)最后close()方法调用,对需要清理的方法进行清理。
package com.bigdata.hive.function;

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.lazy.LazyString;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Text;

/**
 * @Description(描述): UDTF函数
 * @Version(版本): 1.0
 * @Author(创建者): ALIENWARE
 * @Date(创建时间): Created in 2021/7/2.
 * @ * * * * * * * * * * * * * @
 */
public class SplitUDTF extends GenericUDTF{

    @Override
    public StructObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // 校验函数输入参数和 设置函数返回值类型
        // 1) 校验入参个数
        if(arguments.length != 1){
            throw new UDFArgumentException("input params must one");
        }

        ObjectInspector inspector = arguments[0];
        // 2) 校验参数的大类
        // 大类有: PRIMITIVE(基本类型), LIST, MAP, STRUCT, UNION
        if(! inspector.getCategory().equals(ObjectInspector.Category.PRIMITIVE)){
            throw new UDFArgumentException("input params Category must PRIMITIVE");
        }

        // 3) 校验参数的小类
//              VOID, BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, STRING,
//              DATE, TIMESTAMP, BINARY, DECIMAL, VARCHAR, CHAR, INTERVAL_YEAR_MONTH, INTERVAL_DAY_TIME,
//              UNKNOWN
        if(! inspector.getTypeName().equalsIgnoreCase(PrimitiveObjectInspector.PrimitiveCategory.STRING.name())){
            throw new UDFArgumentException("input params PRIMITIVE Category type must STRING");
        }

        // 4) 设置函数返回值类型(struct<name:string, nickname:string>)
          // struct内部字段的名称
          List<String> names = new ArrayList<String>();
          names.add("name");
          names.add("nickname");

          // struct内部字段的名称对应的类型
          List<ObjectInspector> inspectors = new ArrayList<ObjectInspector>();
          inspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
          inspectors.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);

    }

    /**
     * 函数输出类型
     * 第一个参数:name
     * 第二个参数:nickname
     */
    Object[] outputs = new Object[]{new Text(), new Text()};

    @Override
    public void process(Object[] args) throws HiveException {
        // 核心方法 一行调用一次
        System.out.println("process()");
        Object obj = args[0];

        String data = null;
        if(obj instanceof LazyString){
            LazyString lz = (LazyString)obj;
            Text t = lz.getWritableObject();
            data = t.toString();
        }else if(obj instanceof Text){
            Text t = (Text)obj;
            data = t.toString();
        }else{
            data = (String)obj;
        }

        // name1#n1;name2#n2
        String[] arr1 = data.split(";");
        // name1#n1
        for(String data2 : arr1){
            String[] arr2 = data2.split("#");
            String name = arr2[0];
            String nickname = arr2[1];

            // 想输出就调用forward()
            ((Text)outputs[0]).set(name);
            ((Text)outputs[1]).set(nickname);
            System.out.println("forward()");
            forward(outputs);
        }
    }

    @Override
    public void close() throws HiveException {

    }
}

2.创建函数

CREATE TEMPORARY FUNCTION udtf_split AS 'com\bigdata\hive\function\SplitUDTF.java'; 

3.执行函数测试

-- 带有表头字段
set hive.cli.print.header=true;
select udtf_split(name_nickname) from udtf_table;
select udtf_split(name_nickname) as (name1,nickname1) from udtf_table;

4.UDTF的两种使用方法

1 )直接select中使用

-- 可以自定义表头字段名称
select udtf_func(properties) as (col1,col2) from tablename;

-- 用UDTF代码里写的字段名称
select udtf_func(properties) from tablename;

2)UDTF函数不可以使用的场景

(1)不可以添加其他字段使用;

select values, udtf_split(valuse) as (col1,col2) from udtftest;

(2)不可以嵌套调用;

select udtf_split(udtf_split(strsplit)) from udtftest;

(3)不可以和group by/cluster by/distribute by/sort by一起使用;

select udtf_split(strsplit) as (col1,col2) from udtftest group by col1, col2;

5.和lateral view一起使用

通过Lateral view可以方便的将UDTF得到的行转列的结果集,合在一起提供服务;

lateral view用于和UDTF一起使用,为了解决UDTF不允许在select字段的问题;
它能够将一行数据拆成多行数据,在此基础上可以对拆分后的数据进行聚合。
lateral view首先为原始表的每行调用UDTF,UTDF会把一行拆分成一或者多行;
lateral view再把结果组合,产生一个支持别名表的虚拟表。
-- 首先table_name表每行调用udtf_func,会把一行拆分成一或者多行
-- 再把结果组合,产生一个支持别名表tableAlias的虚拟表
select table_name.id, tableAlias.col1, tableAlias.col2 
from table_name 
lateral view udtf_func(properties) tableAlias as col1,col2;

示例:利用lateral view 查看表的数据

select t1.id, t2.name, t2.nickname
from udtf_table t1
lateral view udtf_split(name_nickname) t2 as name,nickname;

在示例1的基础上,分组查询统计每个id有多少条记录

select t3.id, count(*) as n from
(select t1.id, t2.name, t2.nickname
from udtf_table t1
lateral view udtf_split(name_nickname) t2 as name,nickname) t3 group by t3.id;

HIVE优化

一、合理设置reduce的数量

//参数1:hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,在Hive 0.14.0及更高版        本中默认为256M) 
//参数2:hive.exec.reducers.max(每个任务最大的reduce数,在Hive 0.14.0及更高版本中默认为1009)

二、job并行运行设置

hive.exec.parallel (默认是false, true:开启并行运行)
hive.exec.parallel.thread.number (最多可以并行执行多少个作业, 默认是 8)

三、小文件的问题优化

1.如果小文件多,在map输入时

hive默认是将小文件合并成大文件。
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; (默认)
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;(关闭小文件合并大文件)

2.如果map输出的小文件过多hive 默认是开启map 输出合并

set hive.merge.mapfiles=true (默认是 true)
hive.merge.size.per.task(合并文件的大小,默认 256M)
hive.merge.smallfiles.avgsize(文件的平均大小小于该值时,会启动一个MR任务执行merge,默认16M )

3.如果reduce输出的小文件过多

--hive需要手动设置开启reduce输出合并。
set hive.merge.mapredfiles=true (默认是 false)
hive.merge.size.per.task(合并文件的大小,默认 256M)
hive.merge.smallfiles.avgsize(文件的平均大小小于该值时,会启动一个MR任务执行merge,默认16M )

四、 join 操作优化

1.多表join,如果join字段一样,--只生成一个job 任务; Join 的字段类型要一致。

2.如果你有一张表非常非常小,而另一张关联的表非常非常大的时候,
你可以使用mapjoin此Join 操作在 Map 阶段完成,不再需要Reduce,hive默认开启mapjoin。
-- 将小表刷入内存中,默认是true 
set hive.auto.convert.join=true;
set hive.ignore.mapjoin.hint=true; 
-- 刷入内存表的大小(字节),根据自己的数据集加大
set hive.mapjoin.smalltable.filesize=2500000; 
-- 也可以手动设置  /*+mapjoin(c)+*/

五、hive的数据倾斜优化

1. 数据倾斜的原因

1)key分布不均匀
2)业务数据本身的特性
3)某些SQL语句本身就有数据倾斜
关键词 情形 后果
Join 其中一个表较小,但是key集中 分发到某一个或几个Reduce上的数据远高于平均值
大表与大表,但是分桶的判断字段0值或空值过多 这些空值都由一个reduce处理,非常慢
group by group by 维度过小,某值的数量过多 处理某值的reduce非常耗时
Count Distinct 某特殊值过多 处理此特殊值的reduce耗时

2.解决方案——参数调节

--对于group by 产生倾斜的问题
set hive.map.aggr=true; (默认是 true)
--开启map端combiner,减少reduce 拉取的数据量。  --(设置了负载均衡)
set hive.groupby.skewindata=true; (默认是 false)
--示例
如果开启负载均衡:
执行:
set mapred.reduce.tasks=3;
set hive.groupby.skewindata=true;
select sum(1) as n from 
( select country from panniu.user_install_status group by country) t ;
会有3个MapReduce任务
第一个:select country from user_install_status group by country, 将key放到不同的reduce里
第二个:将第一个任务生成的临时文件,按照key 进行group by,此时,会将数据放到一个reduce里。
第三个:用group by 的中间结果,进行sum。

六、SQL 语句调节

1.非法数据太多,比如null。

1.假如null值没有用处的话,可以将null值先过滤掉,再进行 union

2.把空值的key变成一随机数(随机值类型需要跟key的类型一致),

   把倾斜的数据分到不同的reduce上,由于null值关联不上,处理后并不影响最终结果。

2.count distinct 数据倾斜

原sql:
select count (distinct country) from user_install_status_limit;
优化后的sql:
select count(*) from (select distinct country from user_install_status_limit)t;

2.1多个count distinct 优化

groupingsets
版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海牛部落-123456789987654321,http://hainiubl.com/topics/75739
点赞
成为第一个点赞的人吧 :bowtie:
回复数量: 0
    暂无评论~~
    • 请注意单词拼写,以及中英文排版,参考此页
    • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
    • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
    • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
    • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
    Ctrl+Enter