2020-08-19 08:49发布
hadoop如何解决小文件?
HDFS存储小文件的弊端:每个文件均按照块存储,每个块的元数据存储在Namenode的内存中,因此HDFS的内存中,因此HDFS存储小文件会非常低效。因为大量小文件会消耗NameNode中的大部分内存。在后期大量的小文件如果不做处理的话,在进行mr运算时会开启大量的mapTask任务,每个小文件会开启独立的mapTask任务,造成资源的浪费。但注意,存储小文件所需要的磁盘容量和数据块的大小无关系。
解决存储小文件的办法之一HDFS存档文件或者HAR文件,是一个高效的文件存档工具,将文件存入HDFS块,在减少NameNode内存的同时,可以对文件进行透明的访问。具体来说,HDFS存档的还是一个一个独立的文件,对NameNode而言却是一个整体,减少了namenode的内存消耗。采用JVM重用Hadoop的默认配置通常是使用派生JVM来执行map和Reduce任务的。这时JVM的启动过程可能会造成相当大的开销,尤其是执行的job包含有成百上千task任务的情况。JVM重用可以使得JVM实例在同一个job中重新使用N次。N的值可以在Hadoop的mapred-site.xml文件中进行配置。通常在10-20之间,具体多少需要根据具体业务场景测试得出。具体见:https://blog.csdn.net/javastart/article/details/76724271
采用ConbineTextInputFormat框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。
ConbineTextInputFormat的虚拟存储过程和切边过程示意图:自定义InputFormat实现归档小文件类似实现har文件的功能:读入多个小文件将小文件按照 k为文件名 v为文件内容的2进制数的形式读入;采用SequenceFileOutputFormat为输出的OutputFormat,将多个小文件归档为一个文件,读出时采用SequenceFileInputFormat即可。‘’代码示例:
(1)自定义InputFromat package com.atguigu.mapreduce.inputformat; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; // 定义类继承FileInputFormat public class WholeFileInputformat extends FileInputFormat{ @Override protected boolean isSplitable(JobContext context, Path filename) { return false; } @Override public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { WholeRecordReader recordReader = new WholeRecordReader(); recordReader.initialize(split, context); return recordReader; } } (2)自定义RecordReader类 package com.atguigu.mapreduce.inputformat; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class WholeRecordReader extends RecordReader{ private Configuration configuration; private FileSplit split; private boolean isProgress= true; private BytesWritable value = new BytesWritable(); private Text k = new Text(); @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.split = (FileSplit)split; configuration = context.getConfiguration(); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (isProgress) { // 1 定义缓存区 byte[] contents = new byte[(int)split.getLength()]; FileSystem fs = null; FSDataInputStream fis = null; try { // 2 获取文件系统 fs = FileSystem.get(configuration); // 3 读取数据 Path path = split.getPath(); fis = fs.open(path); // 4 读取文件内容 IOUtils.readFully(fis, contents, 0, contents.length); // 5 输出文件内容 value.set(contents, 0, contents.length); // 6 获取文件路径及名称 String name = split.getPath().toString(); // 7 设置输出的key值 k.set(name); } catch (Exception e) { }finally { IOUtils.closeStream(fis); } isProgress = false; return true; } return false; } @Override public Text getCurrentKey() throws IOException, InterruptedException { return k; } @Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { return 0; } @Override public void close() throws IOException { } } (3)编写SequenceFileMapper类处理流程 package com.atguigu.mapreduce.inputformat; import java.io.IOException; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class SequenceFileMapper extends Mapper{ @Override protected void map(Text key, BytesWritable value, Context context) throws IOException, InterruptedException { context.write(key, value); } } (4)编写SequenceFileReducer类处理流程 package com.atguigu.mapreduce.inputformat; import java.io.IOException; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class SequenceFileReducer extends Reducer { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { context.write(key, values.iterator().next()); } } (5)编写SequenceFileDriver类处理流程 package com.atguigu.mapreduce.inputformat; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; public class SequenceFileDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 输入输出路径需要根据自己电脑上实际的输入输出路径设置 args = new String[] { "e:/input/inputinputformat", "e:/output1" }; // 1 获取job对象 Configuration conf = new Configuration(); Job job = Job.getInstance(conf); // 2 设置jar包存储位置、关联自定义的mapper和reducer job.setJarByClass(SequenceFileDriver.class); job.setMapperClass(SequenceFileMapper.class); job.setReducerClass(SequenceFileReducer.class); // 7设置输入的inputFormat job.setInputFormatClass(WholeFileInputformat.class); // 8设置输出的outputFormat job.setOutputFormatClass(SequenceFileOutputFormat.class); // 3 设置map输出端的kv类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(BytesWritable.class); // 4 设置最终输出端的kv类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(BytesWritable.class); // 5 设置输入输出路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 6 提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); } }
原文链接:https://blog.csdn.net/sghuu/article/details/101555816?ops_request_misc={"request_id":"159780014719724846436302","scm":"20140713.130102334.."}&request_id=159780014719724846436302&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduend~default-2-101555816.first_rank_ecpm_v3_pc_rank_v2&utm_term=hadoop对于小文件的处理方式&spm=1018.2118.3001.4187
最多设置5个标签!
HDFS存储小文件的弊端:
每个文件均按照块存储,每个块的元数据存储在Namenode的内存中,因此HDFS的内存中,因此HDFS存储小文件会非常低效。因为大量小文件会消耗NameNode中的大部分内存。在后期大量的小文件如果不做处理的话,在进行mr运算时会开启大量的mapTask任务,每个小文件会开启独立的mapTask任务,造成资源的浪费。
但注意,存储小文件所需要的磁盘容量和数据块的大小无关系。
解决存储小文件的办法之一
HDFS存档文件或者HAR文件,是一个高效的文件存档工具,将文件存入HDFS块,在减少NameNode内存的同时,可以对文件进行透明的访问。具体来说,HDFS存档的还是一个一个独立的文件,对NameNode而言却是一个整体,减少了namenode的内存消耗。
采用JVM重用
Hadoop的默认配置通常是使用派生JVM来执行map和Reduce任务的。这时JVM的启动过程可能会造成相当大的开销,尤其是执行的job包含有成百上千task任务的情况。JVM重用可以使得JVM实例在同一个job中重新使用N次。N的值可以在Hadoop的mapred-site.xml文件中进行配置。通常在10-20之间,具体多少需要根据具体业务场景测试得出。
具体见:https://blog.csdn.net/javastart/article/details/76724271
采用ConbineTextInputFormat
框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。
ConbineTextInputFormat的虚拟存储过程和切边过程示意图:
自定义InputFormat实现归档小文件类似实现har文件的功能:
读入多个小文件
将小文件按照 k为文件名 v为文件内容的2进制数的形式读入;采用SequenceFileOutputFormat为输出的OutputFormat,将多个小文件归档为一个文件,读出时采用SequenceFileInputFormat即可。‘’
代码示例:
原文链接:https://blog.csdn.net/sghuu/article/details/101555816?ops_request_misc={"request_id":"159780014719724846436302","scm":"20140713.130102334.."}&request_id=159780014719724846436302&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduend~default-2-101555816.first_rank_ecpm_v3_pc_rank_v2&utm_term=hadoop对于小文件的处理方式&spm=1018.2118.3001.4187
一周热门 更多>