Hive 调优
**Hadoop 框架计算特性**
1. 数据量大不是问题,数据倾斜是个问题
2. jobs 数比较多的作业运行效率相对比较低,比如即使有几百行的表,如果多次关联多次汇总,产生十几个 jobs,耗时很长。原因是 map reduce 作业初始化的时间是比较长的
3. sum、count、max、min 等 UDAF,不怕数据倾斜问题,hadoop 在 map 端的汇总合并优化,这使得数据倾斜不成问题
4. count(distinct userid),在数据量大的情况下,效率较低,如果是多 count(distinct userid,month)效率更低,因为 count(distinct)是按 group by 字段分组,按 distinct 字段排序, 一般这种分布方式是很容易引起倾斜的,比如 PV 数据,淘宝一天 30 亿的 pv,如果按性别分组,分配 2 个 reduce,每个 reduce 期望处理 15 亿数据,但现实必定是男少女多
# 1、压缩和存储调优
> 一般在hadoop集群上运行一个MapReduce会有以下步骤:
>input-> Map-> shuffle -> reduce -> output
>如果我们采用了数据压缩,在map阶段产生的数据大小就会减少,会减少磁盘的IO,同时还能够减少网络的IO。
压缩可以显著减少中间数据量,从而在内部减少了Map和Reduce之间的数据传输量。

- map阶段输出数据压缩 ,在这个阶段,优先选择一个低CPU开销的算法。
```
set hive.exec.compress.intermediate=true
set mapred.map.output.compression.codec= org.apache.hadoop.io.compress.SnappyCodec
set mapred.map.output.compression.codec=com.hadoop.compression.lzo.LzoCodec;
```
- 对最终输出结果压缩
```
set hive.exec.compress.output=true
set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec
## 当然,也可以在hive建表时指定表的文件格式和压缩编码
```
**结论:** 一般选择orcfile/parquet + snappy 方式
**详细:** [Hive高阶之数据压缩详解](https://zhuanlan.zhihu.com/p/59767485)
# 2、参数调优
```
// 让可以不走mapreduce任务的,就不走mapreduce任务
hive> set hive.fetch.task.conversion=more;
```
**详细**:[Hive笔记之Fetch Task](https://www.cnblogs.com/cc11001100/p/9434076.html)
```
// 开启任务并行执行
hive> set hive.exec.parallel=true;
// 解释:当一个sql中有多个job(mapreduce job)时候,且这多个job之间没有依赖,则可以让顺序执行变为并行执行(一般为用到union all的时候)
// 同一个sql允许并行任务的最大线程数
hive> set hive.exec.parallel.thread.number=8;
```
```
// 设置JVM重用
// JVM重用对hive的性能具有非常大的影响,特别是对于很难避免小文件的场景或者task特别多的场景(一个Map task或Reduce task),
// 这类场景大多数执行时间都很短。jvm的启动过程可能会造成相当大的开销,尤其是执行的job包含有成千上万个task任务的情况。
hive> set mapred.job.reuse.jvm.num.tasks=10;
```
**详细**:[Hadoop的JVM重用](https://blog.csdn.net/javastart/article/details/76724271)+[任务JVM重用](http://reader.epubee.com/books/mobile/20/20c7350debd2b788284bcea356fa70d7/text00105.html)
```
// 合理设置reduce的数目
// 方法1:调整每个reduce所接受的数据量大小
hive> set hive.exec.reducers.bytes.per.reducer=500000000; (500M)
// 方法2:直接设置reduce数量
hive> set mapred.reduce.tasks = 20
```
```
// map端聚合,降低传给reduce的数据量
hive> set hive.map.aggr=true
```
```
// 开启hive内置的数倾优化机制(负载均衡)
hive> set hive.groupby.skewindata=true
```
[Hive设置map和reduce的个数](https://blog.csdn.net/B11050101/article/details/78754652)
# 3、SQL调优
**谓词下推**
默认生成的执行计划会在可见的位置执行过滤器,但在某些情况下,某些过滤器表达式可以被推到更接近首次看到此特定数据的运算符的位置。
比如下面的查询:
```
select a.*, b.*
from a join b
on (a.col1 = b.col1)
where a.col1 > 15 and b.col2 > 16
```
**如果没有谓词下推**,则在完成JOIN处理之后将执行过滤条件 **(a.col1> 15和b.col2> 16)** 。因此,在这种情况下,JOIN将首先发生,并且可能产生更多的行,然后在进行过滤操作。
**使用谓词下推**,这两个谓词 **(a.col1> 15和b.col2> 16)** 将在JOIN之前被处理,因此它可能会从a和b中过滤掉连接中较早处理的大部分数据行,因此,建议启用谓词下推。
通过将hive.optimize.ppd设置为true可以启用谓词下推。
`SET hive.optimize.ppd=true`
---
**union优化**
尽量不要使用union (union 去掉重复的记录)而是使用 union all 然后再用group by 去重
---
**用Left semi join优化in/exists**
(a表和b表通过user_id关联)
a表数据
`select * from wedw_dw.t_user;`

b表数据
`select * from wedw_dw.t_order;`

left semi join
```
Select *
from wedw_dw.t_user t1
left semi join wedw_dw.t_order t2
on t1.user_id = t2.user_id;
```
如图所示:只能展示a表的字段,因为left semi join 只传递表的 join key 给 map 阶段

>**总结:**
>LEFT SEMI JOIN 是 IN/EXISTS 子查询的一种更高效的实现。
>
>LEFT SEMI JOIN 的限制是, JOIN 子句中右边的表只能在 ON 子句中设置过滤条件,在 WHERE 子句、SELECT 子句或其他地方都不行。
>
>因为 left semi join 是 in(keySet) 的关系,遇到右表重复记录,左表会跳过,而 join 则会一直遍历。这就导致右表有重复值得情况下 left semi join 只产生一条,join 会产生多条,也会导致 left semi join 的性能更高。
>
>left semi join 是只传递表的 join key 给 map 阶段,因此left semi join 中最后 select 的结果只许出现左表。因为右表只有 join key 参与关联计算了,而left join on 默认是整个关系模型都参与计算了
---
**大表join小表优化(MapJoin)**
Shuffle 阶段代价非常昂贵,因为它需要排序和合并。减少 Shuffle 和 Reduce 阶段的代价可以提高任务性能。
MapJoin通常用于一个很小的表和一个大表进行join的场景,具体小表有多小,由参数`hive.mapjoin.smalltable.filesize`来决定,该参数表示小表的总大小,默认值为25000000字节,即25M。
Hive0.7之前,需要使用hint提示 /*+ mapjoin(table) */才会执行MapJoin,否则执行Common Join,但在0.7版本之后,默认自动会转换Map Join,由参数`hive.auto.convert.join`来控制,默认为true.
假设a表为一张大表,b为小表,并且`hive.auto.convert.join=true`,那么Hive在执行时候会自动转化为MapJoin。
MapJoin简单说就是在Map阶段将小表数据从 HDFS 上读取到内存中的哈希表中,读完后将内存中的哈希表序列化为哈希表文件,在下一阶段,当 MapReduce 任务启动时,会将这个哈希表文件上传到 Hadoop 分布式缓存中,该缓存会将这些文件发送到每个 Mapper 的本地磁盘上。因此,所有 Mapper 都可以将此持久化的哈希表文件加载回内存,并像之前一样进行 Join。顺序扫描大表完成Join。减少昂贵的shuffle操作及reduce操作
MapJoin分为两个阶段:
- 通过MapReduce Local Task,将小表读入内存,生成HashTableFiles上传至Distributed Cache中,这里会HashTableFiles进行压缩。
- MapReduce Job在Map阶段,每个Mapper从Distributed Cache读取HashTableFiles到内存中,顺序扫描大表,在Map阶段直接进行Join,将数据传递给下一个MapReduce任务

注意:map端join没有reduce任务,所以map直接输出结果,即有多少个map任务就会产生多少个结果文件。
**详细**:[Map join和Common join详解](https://blog.csdn.net/weixin_39216383/article/details/79043299)
---
**大表join大表优化**
空key过滤
- 有时join超时是因为某些key对应的数据太多,而相同key对应的数据都会发送到相同的reducer上,从而导致内存不够。此时我们应该仔细分析这些异常的key,很多情况下,这些key对应的数据是异常数据,我们需要在SQL语句中进行过滤。例如key对应的字段为空
空key转换
- 有时虽然某个key为空对应的数据很多,但是相应的数据不是异常数据,必须要包含在join的结果中,此时我们可以表a中key为空的字段赋一个随机的值,使得数据随机均匀地分不到不同的reducer上。例如:
```
insert overwrite table test
select n.*
from test1 n
full join test2 o
on case when n.id is null then concat('hive', rand()) else n.id end = o.id;
```
---
**Join 优化**
总体原则:
「**1、 优先过滤后再进行 Join 操作,最大限度的减少参与 join 的数据量 2、 小表 join 大表,最好启动 mapjoin 3、 Join on 的条件相同的话,最好放入同一个 job,并且 join 表的排列顺序从小到大**」
**「在使用写有 Join 操作的查询语句时有一条原则:应该将条目少的表/子查询放在 Join 操作符的左边」。**
原因是在 Join 操作的 Reduce 阶段,位于 Join 操作符左边的表的内容会被加载进内存,将条目少的表放在左边,可以有效减少发生 OOM 错误的几率。对于一条语句中有多个 Join 的情况,如果 Join 的条件相同,比如查询
```
INSERT OVERWRITE TABLE pv_users
SELECT pv.pageid, u.age FROM page_view p
JOIN user u ON (pv.userid = u.userid)
JOIN newuser x ON (u.userid = x.userid);
```
如果 Join 的 key 相同,不管有多少个表,都会合并为一个 Map-Reduce 任务,而不是”n”个,在做 OUTER JOIN 的时候也是一样。
如果 join 的条件不相同,比如:
```
INSERT OVERWRITE TABLE pv_users
SELECT pv.pageid, u.age FROM page_view p
JOIN user u ON (pv.userid = u.userid)
JOIN newuser x on (u.age = x.age);
```
Map-Reduce 的任务数目和 Join 操作的数目是对应的,上述查询和以下查询是等价的
```
--先 page_view 表和 user 表做链接
INSERT OVERWRITE TABLE tmptable
SELECT * FROM page_view p JOIN user u ON (pv.userid = u.userid);
-- 然后结果表 temptable 和 newuser 表做链接
INSERT OVERWRITE TABLE pv_users
SELECT x.pageid, x.age FROM tmptable x JOIN newuser y ON (x.age = y.age);
```
在编写 Join 查询语句时,如果确定是由于 join 出现的数据倾斜,那么请做如下设置:
```
// 这个是 join 的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置
set hive.skewjoin.key=100000;
// 如果是 join 过程出现倾斜应该设置为 true
set hive.optimize.skewjoin=true;
```
[skewjoin参数设置解决Hive由于join产生的数据倾斜问题](https://www.pinggu.com/post/details/5ef4b5dae76c715bf35703e4)
---
**列裁剪**
只读取需要查询的行,列很多或者数据量很大时,select * 效率很低
---
**查看sql的执行计划**
基本语法:EXPLAIN [EXTENDED | DEPENDENCY | AUTHORIZATION] query
查看执行计划:
`explain select * from wedw_tmp.t_sum_over;`

学会查看sql的执行计划,优化业务逻辑 ,减少job的数据量。对调优也非常重要
查看详细执行计划:
`explain extended select * from wedw_tmp.t_sum_over;`

# 4、数据倾斜调优
**表现:** 任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大。
**原因:** 某个reduce的数据输入量远远大于其他reduce数据的输入量
**sql导致的倾斜**
---
**Group By 优化**
1、Map 端部分聚合
并不是所有的聚合操作都需要在 Reduce 端完成,很多聚合操作都可以先在 Map 端进行部分聚合,最后在 Reduce 端得出最终结果。
MapReduce 的 combiner 组件参数包括:
```
set hive.map.aggr = true //是否在 Map 端进行聚合,默认为 True
set hive.groupby.mapaggr.checkinterval = 100000 //在 Map 端进行聚合操作的条目数目
```
2、使用 Group By 有数据倾斜的时候进行负载均衡
`set hive.groupby.skewindata = true`
当 sql 语句使用 groupby 时数据出现倾斜时,如果该变量设置为 true,那么 Hive 会自动进行 负载均衡。「**策略就是把 MR 任务拆分成两个:第一个先做预汇总,第二个再做最终汇总**」。
在 MR 的第一个阶段中,Map 的输出结果集合会缓存到 maptaks 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同 Group By Key 有可能被分发到不同的 Reduce 中, 从而达到负载均衡的目的;第二个阶段 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成 最终的聚合操作。
---
**count distinct优化**
数据量小的时候无所谓,数据量大的情况下,由于COUNT DISTINCT操作需要用一个Reduce Task来完成,这一个Reduce需要处理的数据量太大,就会导致整个Job很难完成,一般COUNT DISTINCT使用先GROUP BY再COUNT的方式替换:
不要使用count (distinct cloumn) ,使用子查询
`select count(1) from (select id from tablename group by id) tmp;`
虽然会多用一个Job来完成,但在数据量大的情况下,这个绝对是值得的。
又或者有:
**情形**:某特殊值过多
**后果**:处理此特殊值的 reduce 耗时;只有一个 reduce 任务
**解决方式**:count distinct 时,将值为空的情况单独处理,比如可以直接过滤空值的行,在最后结果中加 1。如果还有其他计算,需要进行 group by,可以先将值为空的记录单独处理,再和其他计算结果进行 union。
---
**避免笛卡尔积**
尽量避免笛卡尔积,join的时候不加on条件,或者无效的on条件,Hive只能使用1个reducer来完成笛卡尔积。
**若无法避免时:**
当 Hive 设定为严格模式(hive.mapred.mode=strict)时,不允许在 HQL 语句中出现笛卡尔积, 这实际说明了 Hive 对笛卡尔积支持较弱。因为找不到 Join key,Hive 只能使用 1 个 reducer 来完成笛卡尔积。
当然也可以使用 limit 的办法来减少某个表参与 join 的数据量,但对于需要笛卡尔积语义的 需求来说,经常是一个大表和一个小表的 Join 操作,结果仍然很大(以至于无法用单机处 理),这时 MapJoin才是最好的解决办法。MapJoin,顾名思义,会在 Map 端完成 Join 操作。这需要将 Join 操作的一个或多个表完全读入内存。
PS:MapJoin 在子查询中可能出现未知 BUG。在大表和小表做笛卡尔积时,规避笛卡尔积的 方法是,给 Join 添加一个 Join key。
**「原理很简单:将小表扩充一列 join key,并将小表的条目复制数倍,join key 各不相同;将大表扩充一列 join key 为随机数。」**
精髓就在于复制几倍,最后就有几个 reduce 来做,而且大表的数据是前面小表扩张 key 值 范围里面随机出来的,所以复制了几倍 n,就相当于这个随机范围就有多大 n,那么相应的, 大表的数据就被随机的分为了 n 份。并且最后处理所用的 reduce 数量也是 n,而且也不会 出现数据倾斜。
**例如**: 假设小 表中只有1条数据,大表中1000条数据,给小表添加一列join key,设其值为1,将该数据扩 充四倍,其join key的值分别为2〜5,大表中的join key,使用1〜5之间的随机数,比如有 200个1,200个2,200个3,200个4和200个5,此时,根据join key进行两个表的连接操作时, 就会产生5个reducer,同时将大表的数据随机分成了5份(小表扩展后的倍数),从而解决了 上述问题。但是,该解决方案的的本质仍然与笛卡尔积操作相同,其弊端是会造成数据的冗 余,且实际执行起来比较繁琐、效率低下。
# 5、小文件问题的调优
**合并小文件**
默认情况下一个小文件会启动一个maptask来处理数据,这样的话会启动大量的maptask,但是每个maptask处理的数据量又非常少,task启动和销毁也是非常消耗时间和资源,所以这个时候我们就需要想办法来合并这些小文件
小文件的产生有三个地方,map输入,map输出,reduce输出,小文件过多也会影响hive的分析效率:
设置map输入的小文件合并
```
set mapred.max.split.size=256000000;
//一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并)
set mapred.min.split.size.per.node=100000000;
//一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并)
set mapred.min.split.size.per.rack=100000000;
//一个机架下split的至少的大小(这个值决定了多个机架上的文件是否需要合并)
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
//执行Map前进行小文件合并
```
设置map输出和reduce输出进行合并的相关参数:
```
//设置map端输出进行合并,默认为true
set hive.merge.mapfiles = true
//设置reduce端输出进行合并,默认为false
set hive.merge.mapredfiles = true
//设置合并文件的大小
set hive.merge.size.per.task = 256000000
//当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge。
set hive.merge.smallfiles.avgsize=16000000
```
# 6、合理利用分区分桶
分区是将表的数据在物理上分成不同的文件夹,以便于在查询时可以精准指定所要读取的分区目录,从来降低读取的数据量
分桶是将表数据按指定列的hash散列后分在了不同的文件中,将来查询时,hive可以根据分桶结构,快速定位到一行数据所在的分桶文件,从来提高读取效率
>**分区表**:原来的一个大表存储的时候分成不同的数据目录进行存储。如果说是单分区表,那么在表的目录下就只有一级子目录,如果说是多分区表,那么在表的目录下有多少分区就有多少级子目录。不管是单分区表,还是多分区表,在表的目录下,和非最终分区目录下是不能直接存储数据文件的
>
>**分桶表**:原理和hashpartitioner 一样,将hive中的一张表的数据进行归纳分类的时候,归纳分类规则就是hashpartitioner。(需要指定分桶字段,指定分成多少桶)
**分区表和分桶的区别除了存储的格式不同外,最主要的是作用:**
>**分区表**:细化数据管理,缩小mapreduce程序 需要扫描的数据量。
>
>**分桶表**:提高join查询的效率,在一份数据会被经常用来做连接查询的时候建立分桶,分桶字段就是连接字段;提高采样的效率。
**有了分区为什么还要分桶?**
1. 获得更高的查询处理效率。桶为表加上了额外的结构,Hive在处理有些查询时能利用这个结构。
2. 使取样( sampling)更高效。在处理大规模数据集时,在开发和修改査询的阶段,如果能在数据集的一小部分数据上试运行查询,会带来很多方便。
分桶是相对分区进行更细粒度的划分。分桶将表或者分区的某列值进行hash值进行区分,如要按照name属性分为3个桶,就是对name属性值的hash值对3取摸,按照取模结果对数据分桶。
与分区不同的是,分区依据的不是真实数据表文件中的列,而是我们指定的伪列,但是分桶是依据数据表中真实的列而不是伪列
> 该使用哪些列进行分区呢?一条基本的法则是:选择低基数属性作为“分区键”,比如“地区”或“日期”等。
>
> 一些常见的分区字段可以是:
>
> 日期或者时间
比如year、month、day或者hour,当表中存在时间或者日期字段时,可以使用些字段。
>
> 地理位置
比如国家、省份、城市等
>
> 业务逻辑
比如部门、销售区域、客户等等
# 7、全局排序的优化
Order By 只能是在一个reduce进程中进行,所以如果对一个大数据集进行全局排序,会非常慢
如果是取排序后的N条数据,可以使用 distribute by 和 sort by在多个reduce上进行排序然后取每个分区的前N条数据进行合并后最后再一个reduce中排序取前N条,效率会有很大的提升
**「order by」**:全局排序,缺陷是只能使用一个 reduce
hive中的order by 会对查询结果集执行一个全局排序,这也就是说所有的数据都通过一个reduce进行处理的过程,对于大数据集,这个过程将消耗很大的时间来执行。

**「distribute by」**:
distribute by是控制在map端如何拆分数据给 reduce端的。hive会根据 distribute by后面列,对应 reduce的个数进行分发,默认是采用hash算法。sort by为每个 reduce产生一个排序文件。在有些情况下,你需要控制某个特定行应该到哪个 reducer,这通常是为了进行后续的聚集操作。distribute by刚好可以做这件事。因此, distribute by经常和 sort by配合使用。并且hive规定distribute by 语句要写在sort by语句之前

**「sort by」**:单机排序,单个 reduce 结果有序
hive的sort by 也就是执行一个局部排序过程。这可以保证每个reduce的输出数据都是有序的(但并非全局有效)。这样就可以提高后面进行的全局排序的效率了。对于这两种情况,语法区别仅仅是,一个关键字是order,另一个关键字是sort。用户可以指定任意期望进行排序的字段,并可以在字段后面加上asc关键字(默认)表示升序,desc关键字是降序排序。
在使用sort by之前,需要先设置Reduce的数量>1,才会做局部排序,如果Reduce数量是1,作用与order by一样,全局排序。

**「distribute by + sort by」**:分桶,保证同一字段值只存在一个结果文件当中,结合 sort by 保证 每个 reduceTask 结果有序
**「cluster by」**:对同一字段分桶并排序,不能和 sort by 连用
当distribute by 和 sort by 所指定的字段相同时,即可以使用cluster by。注意:cluster by指定的列只能是升序,不能指定asc和desc。
cluster by 除了distribute by 的功能外,还会对该字段进行排序,所以cluster by = distribute by +sort by 。
eg:`select * from table cluster by year;`
等价于:`select * from table distribute by year sort by year;`
