压缩概述

压缩技术能够有效减少底层存储系统(HDFS)读写字节数。压缩提高了网络带宽和磁盘空间的效率。在运行MR程序时,I/O操作、网络数据传输、 Shuffle 和 Merge 要花大量的时间,尤其是数据规模很大和工作负载密集的情况下,因此,使用数据压缩显得非常重要。

由于磁盘 I/O 和网络带宽是 Hadoop 的宝贵资源,数据压缩对于节省资源、最小化磁盘 I/O 和网络传输非常有帮助。可以在任意 MapReduce 阶段启用压缩。不过,尽管压缩与解压操作的CPU开销不高,其性能的提升和资源的节省并非没有代价。

由此压缩的优缺点也较为明显:

  1. 优点:以减少磁盘 IO、减少磁盘存储空间
  2. 缺点:增加 CPU 开销

压缩策略和原则

压缩是提高 Hadoop 运行效率的一种优化策略。

通过对 Mapper、Reducer 运行过程的数据进行压缩,以减少磁盘IO,提高MR程序运行速度。

注意:采用压缩技术减少了磁盘IO,但同时增加了CPU运算负担。所以,压缩特性运用得当能提高性能,但运用不当也可能降低性能。

压缩基本原则:

  1. 运算密集型的job,少用压缩
  2. IO密集型的job,多用压缩
  3. 对于 MapReduce,需要分为3个阶段来分析:
    1). Map Input:
    从HDFS读取数据,如果是大文件,压缩可以节省读取过程的IO开销,此过程中使用压缩应需要支持split(如:Bzip2)
    2). Map Out:
    写入到磁盘(使用网络传输),此过程使用压缩可以减少写入过程的IO开销和网络传输
    作为Reduce的输入,使用压缩时速度一定要快而且支持split(如:snappy, lzo),因为reduce阶段需要立即响应
    3). Reduce Out:
    写入磁盘, 此过程使用压缩可以减少写入过程的IO开销和网络传输(如:BZip2, LZO)
    作为下一个job的输入,使用压缩时速度一定要快而且支持split

Hadoop常用的压缩方式

压缩格式 扩展名 可分割性 算法 hadoop自带 native 换成压缩格式后,原来的程序是否需要修改
DEFLATE .deflate deflate 是,直接使用 和文本处理一样,不需要修改
Gzip .gz deflate 是,直接使用 和文本处理一样,不需要修改
bzip2 .bz2 bzip2 是,直接使用 和文本处理一样,不需要修改
LZO .lzo lzo 否,需要安装 需要建索引,还需要指定输入格式
Snappy .snappy snappy 否,需要安装 和文本处理一样,不需要修改

压缩性能对比
image.png

工作中一般后两种使用较多。

压缩方式使用场景及选择

1. Gzip压缩
优点:
  压缩率比较高,而且压缩/解压速度也比较快;Hadoop本身支持,在应用中处理Gzip格式的文件就和直接处理文本一样;大部分Linux系统都自带Gzip命令,使用方便。

缺点:
  不支持Split。

应用场景:
  当每个文件压缩之后在130M以内的(1个块大小内),都可以考虑用Gzip压缩格式。例如:一天或者一个小时的日志压缩成一个gzip文件,运行mapreduce程序的时候通过多个gzip文件达到并发。

2. Bzip2压缩
优点:
  支持Split;具有很高的压缩率,比Gzip压缩率都高;Hadoop本身自带,使用方便。

缺点:
  压缩/解压速度慢。

应用场景:
  适合对速度要求不高,但需要较高的压缩率的时候;或者输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情况;或者对单个很大的文本文件想压缩减少存储空间,同时又需要支持Split,而且兼容之前的应用程序的情况。

3. Lzo压缩
优点:
  压缩/解压速度也比较快,合理的压缩率;支持Split,是Hadoop中最流行的压缩格式;可以在Linux系统下安装lzop命令,使用方便。

缺点:
  压缩率比Gzip要低一些;Hadoop本身不支持,需要安装;在应用中对Lzo格式的文件需要做一些特殊处理(为了支持Split需要建索引,还需要指定InputFormat为Lzo格式)。

应用场景:
  一个很大的文本文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大,Lzo优点越越明显。

4. Snappy压缩
优点:
  高速压缩速度和合理的压缩率。

缺点:
  不支持Split;压缩率比Gzip要低;Hadoop本身不支持,需要安装。

应用场景:
  当MapReduce作业的Map输出的数据比较大的时候,作为Map到Reduce的中间数据的压缩格式;或者作为一个MapReduce作业的输出和另外一个MapReduce作业的输入。

压缩位置的选择

image.png

压缩参数配置

编码/解码类

压缩格式 对应的编码/解码器
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
Gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
Snappy org.apache.hadoop.io.compress.SnappyCodec

Hadoop中压缩参数配置

image.png

Hadoop API应用实例

案例一:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();

// start
// 开启 map 端输出压缩
conf.setBoolean("mapreduce.map.output.compress", true);
// 设置 map 端输出压缩方式
conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class,CompressionCodec.class);
// end

Job job = Job.getInstance(conf);

job.setJarByClass(WordCountDriver.class);

job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// start
// 设置 reduce 端输出压缩开启
FileOutputFormat.setCompressOutput(job, true);
// 设置压缩的方式
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
// FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
// FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
// end

boolean result = job.waitForCompletion(true);

System.exit(result ? 0 : 1);
}
}

案例二:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class CodecTest {
public static void main(String[] args) throws Exception {
compress("org.apache.hadoop.io.compress.BZip2Codec");
// compress("org.apache.hadoop.io.compress.GzipCodec");
// compress("org.apache.hadoop.io.compress.Lz4Codec");
// compress("org.apache.hadoop.io.compress.SnappyCodec");
// uncompress("text");
// uncompress1("hdfs://master:9000/user/hadoop/text.gz");
}

// 压缩文件
public static void compress(String codecClassName) throws Exception {
Class<?> codecClass = Class.forName(codecClassName);
Configuration conf = new Configuration();

FileSystem fs = FileSystem.get(conf);
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
//输入和输出均为hdfs路径
FSDataInputStream in = fs.open(new Path("/test.log"));
FSDataOutputStream outputStream = fs.create(new Path("/test1.bz2"));

System.out.println("compress start !");

// 创建压缩输出流
CompressionOutputStream out = codec.createOutputStream(outputStream);
IOUtils.copyBytes(in, out, conf);
IOUtils.closeStream(in);
IOUtils.closeStream(out);
System.out.println("compress ok !");
}

// 解压缩
public static void uncompress(String fileName) throws Exception {
Class<?> codecClass = Class
.forName("org.apache.hadoop.io.compress.GzipCodec");
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
CompressionCodec codec = (CompressionCodec) ReflectionUtils
.newInstance(codecClass, conf);
FSDataInputStream inputStream = fs
.open(new Path("/user/hadoop/text.gz"));
// 把text文件里到数据解压,然后输出到控制台
InputStream in = codec.createInputStream(inputStream);
IOUtils.copyBytes(in, System.out, conf);
IOUtils.closeStream(in);
}

// 使用文件扩展名来推断二来的codec来对文件进行解压缩
public static void uncompress1(String uri) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);

Path inputPath = new Path(uri);
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec codec = factory.getCodec(inputPath);
if (codec == null) {
System.out.println("no codec found for " + uri);
System.exit(1);
}
String outputUri = CompressionCodecFactory.removeSuffix(uri,
codec.getDefaultExtension());
InputStream in = null;
OutputStream out = null;
try {
in = codec.createInputStream(fs.open(inputPath));
out = fs.create(new Path(outputUri));
IOUtils.copyBytes(in, out, conf);
} finally {
IOUtils.closeStream(out);
IOUtils.closeStream(in);
}
}

}

在Hive中临时设置

1. 指定map out

1
2
3
4
5
6
7
8
9
10
11
1.开启hive中间传输数据压缩功能
set hive.exec.compress.intermediate=true;

2.开启mapreduce中map输出压缩功能
set mapreduce.map.output.compress=true;

3.设置mapreduce中map输出数据的压缩方式
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

4.执行查询语句
select count(ename) name from emp;

2. 指定reduce out

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
1.开启hive最终输出数据压缩功能
set hive.exec.compress.output=true;

2.开启mapreduce最终输出数据压缩
set mapreduce.output.fileoutputformat.compress=true;

3.设置mapreduce最终数据输出压缩方式
set mapreduce.output.fileoutputformat.compress.codec =org.apache.hadoop.io.compress.SnappyCodec;

4.设置mapreduce最终数据输出压缩为块压缩
set mapreduce.output.fileoutputformat.compress.type=BLOCK;

5.测试一下输出结果是否是压缩文件
insert overwrite local directory
'/opt/module/datas/distribute-result' select * from emp distribute by deptno sort by empno desc