慎用api

  我们知道大数据场景下不害怕数据量大,害怕的是数据倾斜,怎样避免数据倾斜,找到可能产生数据倾斜的函数尤为关键,数据量较大的情况下,慎用 count(distinct),count(distinct) 容易产生倾斜问题。

设置合理的map reduce的task数量

map阶段优化

1
2
3
4
mapred.min.split.size: -- 指的是数据的最小分割单元大小;min的默认值是1B
mapred.max.split.size: -- 指的是数据的最大分割单元大小;max的默认值是256MB
通过调整max可以起到调整map数的作用,减小max可以增加map数,增大max可以减少map数。
需要提醒的是,直接调整mapred.map.tasks这个参数是没有效果的。

举例:
  a. 假设input目录下有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个块(6个128M的块和1个12M的块),从而产生7个map书;
  b. 假设input目录下有3个文件a,b,c,大小分别为10M,20M,130M,那么hadoop会分隔成4个块(10M,20M,128M,2M),从而产生4个map数;

  注意:如果文件大于块大小(128M),那么会拆分,如果小于块大小,则把该文件当成一个块。
  其实这就涉及到小文件的问题:如果一个任务有很多小文件(远远小于块大小128M),则每个小文件也会当做一个块,用一个map任务来完成。

  而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。那么,是不是保证每个map处理接近128M的文件块,就高枕无忧了?答案也是不一定。比如有一个127M的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。我们该如何去解决呢?

  我们需要采取两种方式来控制map和reduce的数量:

1. 减少map数量
  hive中默认的 hive.input.format 是 org.apache.hadoop.hive.ql.io.CombineHiveInputFormat,表示执行前进行小文件合并,对于 combineHiveInputFormat,它的输入的map数量由三个配置决定:

1
2
3
4
5
6
7
/** map输入合并小文件的相关参数:*/
-- 一个split最大的大小(这个值决定了合并后文件的数量)
mapred.max.split.size
-- 一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并)
mapred.min.split.size.per.node
-- 一个交换机下split至少的大小(这个值决定了多个交换机上的文件是否需要合并)
mapred.min.split.size.per.rack

主要思路是把输入目录下的大文件分成多个map的输入, 并合并小文件, 做为一个map的输入. 具体的原理是下述三步:
 a. 根据输入目录下的每个文件,如果其长度超过 mapred.max.split.size,以 mapred.max.split.size 为单位分成多个split(一个split是一个map的输入), 此文件剩下的长度如果大于 mapred.min.split.size.per.node,则再切分生成一个split,否则先暂时保留;
 b. 现在剩下的都是一些长度效短的碎片,把每个rack下碎片合并, 只要长度超过 mapred.max.split.size 就合并成一个split,最后如果剩下的碎片比 mapred.min.split.size.per.rack 大,就合并成一个split,,否则暂时保留;
 c. 把不同rack下的碎片合并,只要长度超过 mapred.max.split.size 就合并成一个split,剩下的碎片无论长度,合并成一个split。

mapred.max.split.size=1000;
mapred.min.split.size.per.node=300;
mapred.min.split.size.per.rack=100;

举例:
  输入目录下五个文件,
  rack1下三个文件,长度为2050,1499,10,
  rack2下两个文件,长度为1010,80,另外blockSize为500。
  经过第一步,生成五个split: 1000,1000,1000,499,1000,剩下的碎片为rack1下:50,10;rack2下10:80
  由于两个rack下的碎片和都不超过100,所以经过第二步,split和碎片都没有变化。
  第三步,合并四个碎片成一个split,长度为150。

  如果要减少map数量,可以调大 mapred.max.split.size,否则调小即可。
  其特点是:一个块至多作为一个map的输入,一个文件可能有多个块,一个文件可能因为块多分给做为不同map的输入,一个map可能处理多个块,可能处理多个文件。

2. 增大map数量
  如何适当的增加map数?
  当input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,来使得每个map处理的数据量减少,从而提高任务的执行效率。

  假设有这样一个任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
select 
data_desc,
count(1),
count(distinct id),
sum(case when ...),
sum(case when ...),
sum(...)
from a group by data_desc;

-- 如果 表a 只有一个文件,大小为120M,但包含几千万的记录,如果用1个map去完成这个任务,肯定是比较耗时的,
-- 这种情况下,我们要考虑将这一个文件合理的拆分成多个,这样就可以用多个map任务去完成。
set mapred.reduce.tasks=10;

create table a_1 as
select * from a
distribute by rand(123);

-- 这样会将 a表 的记录,随机的分散到包含10个文件的 a_1表 中,再用 a_1 代替上面sql中的 a表,则会用10个map任务去完成。
-- 每个map任务处理大于12M(几百万记录)的数据,效率肯定会好很多。

注意:看上去,貌似这两种有些矛盾,一个是要合并小文件,一个是要把大文件拆成小文件,这点正是重点需要关注的地方,使单个map任务处理合适的数据量;

reduce阶段优化

  Reduce的个数对整个作业的运行性能有很大影响。如果Reduce设置的过大,那么将会产生很多小文件,对NameNode会产生一定的影响,而且整个作业的运行时间未必会减少;如果Reduce设置的过小,那么单个Reduce处理的数据将会加大,很可能会引起OOM异常。
  如果设置了 mapred.reduce.tasks/mapreduce.job.reduces 参数,那么Hive会直接使用它的值作为Reduce的个数;如果 mapred.reduce.tasks/mapreduce.job.reduces 的值没有设置(也就是-1),那么Hive会根据输入文件的大小估算出Reduce的个数。根据输入文件估算Reduce的个数可能未必很准确,因为Reduce的输入是Map的输出,而Map的输出可能会比输入要小,所以最准确的数根据Map的输出估算Reduce的个数。

1. Hive自己如何确定reduce数:
  reduce个数的设定极大影响任务执行效率,不指定reduce个数的情况下,Hive会猜测确定一个reduce个数,基于以下两个设定:
  hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,默认为1000^3=1G)
  hive.exec.reducers.max(每个任务最大的reduce数,默认为999)
  计算reducer数的公式很简单N=min(参数2,总输入数据量/参数1)
  即,如果reduce的输入(map的输出)总大小不超过1G,那么只会有一个reduce任务;

select pt, count(1) from tb where pt = ‘2012-07-04’ group by pt;
其中/data/path/pt=2012-07-04 总大小为9G多,
因此这句有10个reduce

2. 调整reduce个数方法一:
  调整 hive.exec.reducers.bytes.per.reducer 参数的值;
  set hive.exec.reducers.bytes.per.reducer=500000000; (500M)

select pt, count(1) from tb where pt = ‘2012-07-04’ group by pt;
这次有20个reduce

3. 调整reduce个数方法二:
  set mapred.reduce.tasks=15;

select pt,count(1) from tb where pt = ‘2012-07-04’ group by pt;
这次有15个reduce

4. reduce个数并不是越多越好:
  同map一样,启动和初始化reduce也会消耗时间和资源;
  另外,有多少个reduce,就会有个多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;

5. 什么情况下只有一个reduce:
  很多时候你会发现任务中不管数据量多大,不管你有没有调整reduce个数的参数,任务中一直都只有一个reduce任务;其实只有一个reduce任务的情况,除了数据量小于 hive.exec.reducers.bytes.per.reducer 参数值的情况外,还有以下原因:
  a. 没有group by的汇总,比如把 select pt,count(1) from tb where pt = ‘2012-07-04’ group by pt; 写成 select count(1) from tb where pt = ‘2012-07-04’; 这点非常常见,希望大家尽量改写。
  b. 用了Order by
  c. 有笛卡尔积。

  注意:在设置reduce个数的时候也需要考虑这两个原则:使大数据量利用合适的reduce数;是单个reduce任务处理合适的数据量;

小文件合并优化

  我们知道文件数目小,容易在文件存储端造成瓶颈,给HDFS带来压力,影响处理效率。对此,可以通过合并Map和Reduce的结果文件来消除这样的影响。
用于设置合并的参数有:

小文件是如何产生的:
 a. 动态分区插入数据,产生大量的小文件,从而导致map数量剧增;
 b. reduce数量越多,小文件也越多(reduce的个数和输出文件是对应的);
 c. 数据源本身就包含大量的小文件。

小文件问题的影响:
 a. 从Hive的角度看,小文件会开很多map,一个map开一个JVM去执行,所以这些任务的初始化,启动,执行会浪费大量的资源,严重影响性能。
 b. 在HDFS中,每个小文件对象约占150byte,如果小文件过多会占用大量内存。这样NameNode内存容量严重制约了集群的扩展。

小文件问题的解决方案:
 1. 从小文件产生的途径就可以从源头上控制小文件数量,方法如下:
  a. 使用Sequencefile作为表存储格式,不要用textfile,在一定程度上可以减少小文件;
  b. 减少reduce的数量(可以使用参数进行控制);
  c. 少用动态分区,用时记得按distribute by分区;

 2. 对于已有的小文件,我们可以通过以下几种方案解决:
  a. 使用hadoop archive命令把小文件进行归档;
  b. 重建表,建表时减少reduce数量;
  c. 通过参数进行调节,设置map/reduce端的相关参数,如下:

小文件是如何产生的:
 a. 动态分区插入数据,产生大量的小文件,从而导致map数量剧增;
 b. reduce数量越多,小文件也越多(reduce的个数和输出文件是对应的);
 c. 数据源本身就包含大量的小文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
-- 每个Map最大输入大小(这个值决定了合并后文件的数量)
set mapred.max.split.size=256000000;
-- 一个节点上split的至少的大小(这个值决定了多个DataNode上的文件是否需要合并)
set mapred.min.split.size.per.node=100000000;
-- 一个交换机下split的至少的大小(这个值决定了多个交换机上的文件是否需要合并)
set mapred.min.split.size.per.rack=100000000;
-- 执行Map前进行小文件合并
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

-- 设置map输出和reduce输出进行合并的相关参数:
-- 是否合并Map输出文件:(默认值为true)
hive.merge.mapfiles=true
-- 是否合并Reduce端输出文件:(默认值为false)
hive.merge.mapredfiles=false
-- 合并操作后的单个文件大小:(默认值为256000000)
hive.merge.size.per.task=256*1000*1000
-- 当输出文件的平均大小小于该值时,启动一个独立的 map-reduce 任务进行文件 merge
-- 默认值为16M
-- 这一设定只有当hive.merge.mapfiles或hive.merge.mapredfiles设定为true时,才会对相应的操作有效
hive.merge.smallfiles.avgsize=16000000;


/** 常用参数 */
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
-- 所有MR节点上最大动态分区数
set hive.exec.max.dynamic.partitions.pernode=10000;
-- 每个MR节点上最大动态分区数
set hive.exec.max.dynamic.partitions.partitions=50000;
-- MR Job 过程中最大创建的HDFS文件数量(默认)
set hive.exec.max.created.files=100000;
-- JVM重用
set mapred.job.reuse.jvm.num.tasks=50;
-- 并行执行
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=16; -- 默认8
-- 开启数据传输中的压缩
set hive.exec.compress.intermediate=true;
-- 开启map端输出压缩
set hive.exec.compress.output=true;
-- 合并map端输出
set hive.merge.mapfiles=true;
-- 合并reduce端输出
set hive.merge.mapredfiles=true;

-- 这个是给join优化的 0.6官方版本好像有个bug悲哀啊
hive.optimize.skewjoin=true;
-- 这个是给groupby优化的
hive.groupby.skewindata=true;