关于hive调优的笔记


由于面试中经常被问到hiveSQL如何调优,但是仅仅对优化有一个模糊的概念,始终没有对hive SQL调优有具体的总结。所以抽这个节点总结一下如何优化自己的sql。达到事半功倍的效果。


fetch抓取

理论分析:

Fetch抓取是指,Hive中对某些情况的查询可以不必使用MapReduce计算。例如:SELECT * FROM employees;在这种情况下,Hive可以简单地读取employee对应的存储目录下的文件,然后输出查询结果到控制台。

在hive-default.xml.template文件中hive.fetch.task.conversion默认是more,老版本hive默认是minimal,该属性修改为more以后,在全局查找、字段查找、limit查找等都不走mapreduce

xml配置 效果
set hive.fetch.task.conversion=none; 执行查询语句,会走mapreduce程序
set hive.fetch.task.conversion=more; 执行查询语句,不会走mapreduce程序

本地模式

理论分析

Hive 在集群上查询时,默认是在集群上 N 台机器上运行, 需要多个机器进行协调运行,这 个方式很好地解决了大数据量的查询问题。但是当 Hive 查询处理的数据量比较小时,其实没有必要启动分布式模式去执行,因为以分布式方式执行就涉及到跨网络传输、多节点协调 等,并且消耗资源。这个时间可以只使用本地模式来执行 mapreduce job,只在一台机器上执行,速度会很快。启动本地模式涉及到三个参数:

参数名 默认值 备注
hive.exec.mode.local.aotu false 让hive决定是否在本地模式自动运行
hive.exec.mode.local.aotu.input.files.max 4 不启用本地模式的task最大个数
hive.exec.mode.local.aotu.inputbytes.max 128m 不启用本地模式的最大输入文件大小

set hive.exec.mode.local.auto=true 是打开 hive 自动判断是否启动本地模式的开关,但是只 是打开这个参数并不能保证启动本地模式,要当 map 任务数不超过

hive.exec.mode.local.auto.input.files.max 的个数并且 map 输入文件大小不超过

hive.exec.mode.local.auto.inputbytes.max 所指定的大小时,才能启动本地模式。


Hive的压缩存储

1. 合理利用文件存储格式

创建表时,尽量使用 orc、parquet 这些列式存储格式,因为列式存储的表,每一列的数据在物理上是存储在一起的,Hive查询时会只遍历需要列数据,大大减少处理的数据量

2. 压缩的原因

Hive 最终是转为 MapReduce 程序来执行的,而MapReduce 的性能瓶颈在于网络 IO 和 磁盘 IO,要解决性能瓶颈,最主要的是减少数据量,对数据进行压缩是个好的方式。压缩 虽然是减少了数据量,但是压缩过程要消耗CPU的,但是在Hadoop中, 往往性能瓶颈不在于CPU,CPU压力并不大,所以压缩充分利用了比较空闲的 CPU

3. 常用压缩方法对比

压缩格式 是否可拆分 是否自带 压缩率 速度 是否hadoop自带
gzip 很高 比较快
lzo 比较高 很快 否,要安装
snappy 比较高 很快 否,要安装
bzip2 最高

==各个压缩方式所对应的class类==

压缩格式
Zlib org.apache.hadoop.io.compress.DefaultCodec
Gzip org.apache.hadoop.io.compress.GzipCodec
Bzip2 org.apache.hadoop.io.compress.Bzip2Codec
Lzo org.apache.hadoop.io.compress.lzo.LzoCodec
Lz4 org.apache.hadoop.io.compress.Lz4Codec
Snappy org.apache.hadoop.io.compress.SnappyCodec

4. 压缩方式的选择

  • 压缩比率

  • 压缩解压缩速度

  • 是否支持 Split

5. 压缩使用

Job 输出文件按照 block 以 GZip 的方式进行压缩:

1
2
3
4
5
set mapreduce.output.fileoutputformat.compress=true; // 默认值是 false

set mapreduce.output.fileoutputformat.compress.type=BLOCK; // 默认值是 Record

set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec; // 默认值是 org.apache.hadoop.io.compress.DefaultCodec

Map 输出结果也以 Gzip 进行压缩:

1
2
3
set mapred.map.output.compress=true

set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec // 默认值是 org.apache.hadoop.io.compress.DefaultCodec

对 Hive 输出结果和中间都进行压缩

1
2
3
set hive.exec.compress.output=true // 默认值是 false,不压缩

set hive.exec.compress.intermediate=true // 默认值是 false,为 true 时 MR 设置的压缩才启用


表的优化

1. 小表、大表Join

  • 理论分析

将key相对分散,并且数据量小的表放在join的左边,这样可以有效减少内存溢出错误发生的几率;再进一步,可以使用Group让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce。

实际测试发现:新版的hive已经对小表JOIN大表和大表JOIN小表进行了优化。小表放在左边和右边已经没有明显区别。

hive.auto.convert.join的默认值是true,即默认开启mapjoin优化

hive.mapjoin.smalltable.filesize 默认值为2500000(25M),通过配置该属性来确定使用该优化的表的大小,如果表的大小小于此值就会被加载进内存中

  • 参数调优
    1
    2
    hive.auto.convert.join=false(关闭自动MAPJOIN转换操作)
    hive.ignore.mapjoin.hint=false(不忽略MAPJOIN标记)

2. 大表join大表

  • 空key过滤

我们在工作中时常会遇到两个大表之间的join,但是有的时候join会超时,或者报内存溢出的错误(Out of Memery)造成这样的原因很有可能是key对应的数据太多,而相同的key对应的数据会发送到同一个reduce上,导致内存不够。此时我们应该仔细分析这些异常的key,很多情况下,这些key对应的数据属于异常数据,我们需要在sql中进行过滤。例如key对应的字段为空时,将key为空值的数据先过滤出去再进行join。

  • 空key转换

有时候某个key为空对应的数据很多,但是相应的数据不是异常数据,必须包含在join结果中,此时我们可以将表中的key为空的字段赋一个随机的值,使得数据随机均匀的分到不同的reduce上

hive大表join大表案例


3. Map join

  • 理论分析

如果不指定Mapjoin 或者不满足Mapjoin的条件,那么Hive解析器会将Join操作转化为Common join,而Common join会在Reduce端完成Join,容易发生数据倾斜,而使用MapJoin会将小表写入内存。在Map端进行join,避免reduce处理

  • Mapjoin工作机制

image

首先是Task A,它是一个Local Task(在客户端本地执行的Task),负责扫描小表b的数据,将其转换成一个HashTable的数据结构,并写入本地的文件中,之后将该文件加载到DistributeCache中。

接下来是Task B,该任务是一个没有Reduce的MR,启动MapTasks扫描大表a,在Map阶段,根据a的每一条记录去和DistributeCache中b表对应的HashTable关联,并直接输出结果。

由于MapJoin没有Reduce,所以由Map直接输出结果文件,有多少个Map Task,就有多少个结果文件。


4. Group by

默认情况下,Map阶段同一Key数据分发给一个reduce,当一个Key数据过大时就倾斜了,并不是所有的聚合操作都需要在Reduce端完成,很多聚合操作都可以在Map端进行部分聚合,最后在Reduce端得出最终结果。

  • 开启Map端聚合参数设置

    (1) 是否在Map端进行聚合,默认为True

    1
    set hive.map.aggr = true;

    (2) 在Map端进行聚合操作的条数数目

    1
    set hive.groupby.mapeggr.checkinterval = 100000

(3) 有数据倾斜时进行负载均衡,默认为false

1
set hive.groupby.skewindata = true;

当选项设定为 true,生成的查询计划会有两个MR Job。第一个MR Job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key被分布到同一个Reduce中),最后完成最终的聚合操作。


5. Count(distinct)

在进行Count(distinct)操作的时候,数据量小的时候无所谓,数据量大的情况下,由于COUNT DISTINCT操作需要用一个Reduce Task来完成,这一个Reduce需要处理的数据量太大,就会导致整个Job很难完成,一般COUNT DISTINCT使用先GROUP BY再COUNT的方式替换:


6. 动态分区调整

关系型数据库中,对分区表Insert数据时候,数据库自动会根据分区字段的值,将数据插入到相应的分区中,Hive中也提供了类似的机制,即动态分区(Dynamic Partition),只不过,使用Hive的动态分区,需要进行相应的配置。

  • 开启动态分区功能,默认为true(开启)
1
set hive.dynamic.partition = true;
  • 设置为非严格模式(动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示允许所有的分区字段都可以使用动态分区)
1
set hive.exec.dynamic.partition.mode = nonstrict;
  • 在所有执行MR的节点上,最大一共可以创建多少动态分区。

    1
    set hive.exec.dynamic.partitions = 1000;
  • 在每个执行MR的节点上,最大可以创建多少个动态分区。该参数需要根据实际的数据来设定。比如:源数据中包含了一年的数据,即day字段有365个值,那么该参数就需要设置成大于365,如果使用默认值100,则会报错。

    1
    set hive.exec.max.dynamic.partitions.pernode = 100;
  • 整个MR Job中,最大可以创建多少个HDFS文件。

    1
    set hive.exec.max.created.files = 100000;
  • 当有空分区生成时,是否抛出异常。一般不需要设置。

    1
    set hive.error.on.empty.partition=false;

7. 优化 in/exists 语句

hive虽然支持in和exists操作,但还是推荐使用hive的一个高效替代方案:==left semi join==

1
2
3
4
5
6
7

--in/exists
select a.id, a.name from a where a.id in (select b.id from b);
select a.id, a.name from a where exists (select id from b where a.id = b.id);

--hive left semi join
select a.id, a.name from a left semi join b on a.id = b.id;

8. 排序选择

cluster by:对同一字段分桶并排序,不能和 sort by 连用

distribute by + sort by:分桶,保证同一字段值只存在一个结果文件当中,结合 sort by 保证 每个 reduceTask 结果有序

sort by:单机排序,单个 reduce 结果有序

order by:全局排序,缺陷是只能使用一个 reduce


9. 合并 MapReduce操作

Multi-group by 是hive的一个非常好的特性,它使得Hive利用中间结果变得非常方便

1
2
3
4
5
6
7
8
/*下列查询语句使用了 multi-group by 特性连续 group by 了 2 次数据,
使用不同的 group by key。 这一特性可以减少一次 MapReduce 操作*/
FROM (SELECT a.status, b.school, b.gender FROM status_updates a JOIN profiles b ON (a.userid =
b.userid and a.ds='2009-03-20' ) ) subq1
INSERT OVERWRITE TABLE gender_summary PARTITION(ds='2009-03-20')
SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
INSERT OVERWRITE TABLE school_summary PARTITION(ds='2009-03-20')
SELECT subq1.school, COUNT(1) GROUP BY subq1.school

10. 合理利用分桶:Bucketing 和 Sampling

Bucket 是指将数据以指定列的值为 key 进行 hash,hash 到指定数目的桶中。这样就可以支持高效采样了。如下例就是以 userid 这一列为 bucket 的依据,共设置 32 个 buckets

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
CREATE TABLE PAGE_VIEW(
VIEWTIME INT,
USERID BIGINT,
PAGE_URL STRING,
REFERRER_URL STRING,
IP STRING COMMENT 'IP ADDRESS OF THE USER'
)
COMMENT 'THIS IS THE PAGE VIEW TABLE'
PARTITIONED BY( DT STRING,
COUNTRY STRING
)
CLUSTERED BY(USERID)
SORTED BY(VIEWTIME) INTO 32 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '1'
COLLECTION ITEMS TERMINATED BY '2'
MAP KEYS TERMINATED BY '3'
STORED AS SEQUENCEFILE;

通常情况下,Sampling 在全体数据上进行采样,这样效率自然就低,它要去访问所有数据。 而如果一个表已经对某一列制作了 bucket,就可以采样所有桶中指定序号的某个桶,这就减少了访问量。

如下例所示就是采样了 page_view 中 32 个桶中的第三个桶的全部数据:

1
SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 32);

如下例所示就是采样了 page_view 中 32 个桶中的第三个桶的一半数据:

1
SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 64);


数据倾斜

1. Map数

  • 通常情况下,作业会通过input的目录产生一个或者多个map任务。主要的决定因素有:input的文件总个数,input的文件大小,集群设置的文件块大小。

在 MapReduce 的编程案例中,我们得知,一个MR Job的 MapTask 数量是由输入分片 InputSplit 决定的。而输入分片是由 FileInputFormat.getSplit()决定的。一个输入分片对应一个 MapTask, 而输入分片是由三个参数决定的:
image
输入分片大小的计算是这么计算出来的:

1
long splitSize = Math.max(minSize,Math.min(maxSize,blockSize))

默认情况下,输入分片大小和 HDFS 集群默认数据块大小一致,也就是默认一个数据块,启用一个 MapTask 进行处理,这样做的好处是避免了服务器节点之间的数据传输,提高 job 处理效率。

  • 是不是map数越多越好?(Map 数过大)

答案是否定的。如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。

  • 是不是保证每个map处理接近128m的文件块,就高枕无忧了?(Map 数过小)

答案也是不一定。比如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。

2. 小文件进行合并

在map执行前合并小文件,减少map数:CombineHiveInputFormat具有对小文件进行合并的功能(系统默认的格式)。HiveInputFormat没有对小文件合并功能

1
2
3
4
5
6
set hive.merge.mapfiles = true                   ##在 map only 的任务结束时合并小文件
set hive.merge.mapredfiles = false ## true 时在 MapReduce 的任务结束时合并小文件
set hive.merge.size.per.task = 256*1000*1000 ##合并文件的大小
set mapred.max.split.size=256000000; ##每个 Map 最大分割大小
set mapred.min.split.size.per.node=1; ##一个节点上 split 的最少值
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; ##执行Map前进行小文件合并

3. 复杂文件增加Map数

当input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,来使得每个map处理的数据量减少,从而提高任务的执行效率。

增加map的方法为:根据computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M公式,调整maxSize最大值。让maxSize最大值低于blocksize就可以增加map的个数。

4. Reduce数

Hadoop MapReduce 程序中,reducer 个数的设定极大影响执行效率,这使得 Hive 怎样决定 reducer 个数成为一个关键问题。遗憾的是 Hive 的估计机制很弱,不指定 reducer 个数的情况下,Hive 会猜测确定一个 reducer 个数,基于以下两个设定:

1
2
3
4
5
1、hive.exec.reducers.bytes.per.reducer(默认为 256000000)

2、hive.exec.reducers.max(默认为 1009)

3、mapreduce.job.reduces=-1(设置一个常量 reducetask 数量)

计算 reducer 数的公式很简单: N=min(参数 2,总输入数据量/参数 1) 通常情况下,有必要手动指定 reducer 个数。考虑到 map 阶段的输出数据量通常会比输入有 大幅减少,因此即使不设定 reducer 个数,重设参数 2 还是必要的。

==依据 Hadoop 的经验,可以将参数 2 设定为 0.95*(集群中 datanode 个数)。==

  • 调整reduce个数方法
1
2
3
4
5
6
7
8
--每个Reduce处理的数据量默认是256MB
hive.exec.reducers.bytes.per.reducer=256000000

--每个任务最大的reduce数,默认为1009
hive.exec.reducers.max=1009

--计算reducer数的公式
N=min(参数2,总输入数据量/参数1)
  • 调整reduce个数方法二

在hadoop的mapred-default.xml文件中修改

设置每个job的Reduce个数

1
set mapreduce.job.reduces = 15;

  • reduce个数并不是越多越好

1)过多的启动和初始化reduce也会消耗时间和资源;

2)另外,有多少个reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;

5. 并行执行

Hive会将一个查询转化成一个或者多个阶段。这样的阶段可以是MapReduce阶段、抽样阶段、合并阶段、limit阶段。或者Hive执行过程中可能需要的其他阶段。默认情况下,Hive一次只会执行一个阶段。不过,某个特定的job可能包含众多的阶段,而这些阶段可能并非完全互相依赖的,也就是说有些阶段是可以并行执行的,这样可能使得整个job的执行时间缩短。不过,如果有更多的阶段可以并行执行,那么job可能就越快完成。

==通过设置参数hive.exec.parallel值为true,就可以开启并发执行。不过,在共享集群中,需要注意下,如果job中并行阶段增多,那么集群利用率就会增加==

1
2
3
set hive.exec.parallel=true;              //打开任务并行执行

set hive.exec.parallel.thread.number=16; //同一个sql允许最大并行度,默认为8。

当然,得是在系统资源比较空闲的时候才有优势,否则,没资源,并行也起不来。