hadoop如何解决小文件?

2020-08-19 08:49发布

hadoop如何解决小文件?

hadoop如何解决小文件?

5条回答
我是大脸猫
2020-08-19 09:24

HDFS存储小文件的弊端:
每个文件均按照块存储,每个块的元数据存储在Namenode的内存中,因此HDFS的内存中,因此HDFS存储小文件会非常低效。因为大量小文件会消耗NameNode中的大部分内存。在后期大量的小文件如果不做处理的话,在进行mr运算时会开启大量的mapTask任务,每个小文件会开启独立的mapTask任务,造成资源的浪费。
但注意,存储小文件所需要的磁盘容量和数据块的大小无关系。

解决存储小文件的办法之一
HDFS存档文件或者HAR文件,是一个高效的文件存档工具,将文件存入HDFS块,在减少NameNode内存的同时,可以对文件进行透明的访问。具体来说,HDFS存档的还是一个一个独立的文件,对NameNode而言却是一个整体,减少了namenode的内存消耗。在这里插入图片描述
采用JVM重用
Hadoop的默认配置通常是使用派生JVM来执行map和Reduce任务的。这时JVM的启动过程可能会造成相当大的开销,尤其是执行的job包含有成百上千task任务的情况。JVM重用可以使得JVM实例在同一个job中重新使用N次。N的值可以在Hadoop的mapred-site.xml文件中进行配置。通常在10-20之间,具体多少需要根据具体业务场景测试得出。
具体见:https://blog.csdn.net/javastart/article/details/76724271

采用ConbineTextInputFormat
框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
CombineTextInputFormat用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片中,这样,多个小文件就可以交给一个MapTask处理。

ConbineTextInputFormat的虚拟存储过程和切边过程示意图:
在这里插入图片描述
自定义InputFormat实现归档小文件类似实现har文件的功能:
读入多个小文件
将小文件按照 k为文件名 v为文件内容的2进制数的形式读入;采用SequenceFileOutputFormat为输出的OutputFormat,将多个小文件归档为一个文件,读出时采用SequenceFileInputFormat即可。‘’
代码示例:

(1)自定义InputFromat
package com.atguigu.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

// 定义类继承FileInputFormat
public class WholeFileInputformat extends FileInputFormat{
 
 @Override
 protected boolean isSplitable(JobContext context, Path filename) {
  return false;
 }

 @Override
 public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
  
  WholeRecordReader recordReader = new WholeRecordReader();
  recordReader.initialize(split, context);
  
  return recordReader;
 }
}
(2)自定义RecordReader类
package com.atguigu.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeRecordReader extends RecordReader{

 private Configuration configuration;
 private FileSplit split;
 
 private boolean isProgress= true;
 private BytesWritable value = new BytesWritable();
 private Text k = new Text();

 @Override
 public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
  
  this.split = (FileSplit)split;
  configuration = context.getConfiguration();
 }

 @Override
 public boolean nextKeyValue() throws IOException, InterruptedException {
  
  if (isProgress) {

   // 1 定义缓存区
   byte[] contents = new byte[(int)split.getLength()];
   
   FileSystem fs = null;
   FSDataInputStream fis = null;
   
   try {
    // 2 获取文件系统
    fs = FileSystem.get(configuration);
    
    // 3 读取数据
Path path = split.getPath();
    fis = fs.open(path);
    
    // 4 读取文件内容
    IOUtils.readFully(fis, contents, 0, contents.length);
    
    // 5 输出文件内容
    value.set(contents, 0, contents.length);

// 6 获取文件路径及名称
String name = split.getPath().toString();

// 7 设置输出的key值
k.set(name);

   } catch (Exception e) {
    
   }finally {
    IOUtils.closeStream(fis);
   }
   
   isProgress = false;
   
   return true;
  }
  
  return false;
 }

 @Override
 public Text getCurrentKey() throws IOException, InterruptedException {
  return k;
 }

 @Override
 public BytesWritable getCurrentValue() throws IOException, InterruptedException {
  return value;
 }

 @Override
 public float getProgress() throws IOException, InterruptedException {
  return 0;
 }

 @Override
 public void close() throws IOException {
 }
}
(3)编写SequenceFileMapper类处理流程
package com.atguigu.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class SequenceFileMapper extends Mapper{
 
 @Override
 protected void map(Text key, BytesWritable value,   Context context)  throws IOException, InterruptedException {

  context.write(key, value);
 }
}
(4)编写SequenceFileReducer类处理流程
package com.atguigu.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SequenceFileReducer extends Reducer {

 @Override
 protected void reduce(Text key, Iterable values, Context context)  throws IOException, InterruptedException {

  context.write(key, values.iterator().next());
 }
}
(5)编写SequenceFileDriver类处理流程
package com.atguigu.mapreduce.inputformat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileDriver {

 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  
       // 输入输出路径需要根据自己电脑上实际的输入输出路径设置
  args = new String[] { "e:/input/inputinputformat", "e:/output1" };

       // 1 获取job对象
  Configuration conf = new Configuration();
  Job job = Job.getInstance(conf);

       // 2 设置jar包存储位置、关联自定义的mapper和reducer
  job.setJarByClass(SequenceFileDriver.class);
  job.setMapperClass(SequenceFileMapper.class);
  job.setReducerClass(SequenceFileReducer.class);

       // 7设置输入的inputFormat
  job.setInputFormatClass(WholeFileInputformat.class);

       // 8设置输出的outputFormat
  job.setOutputFormatClass(SequenceFileOutputFormat.class);
       
// 3 设置map输出端的kv类型
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(BytesWritable.class);
  
       // 4 设置最终输出端的kv类型
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(BytesWritable.class);

       // 5 设置输入输出路径
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

       // 6 提交job
  boolean result = job.waitForCompletion(true);
  System.exit(result ? 0 : 1);
 }
}


原文链接:https://blog.csdn.net/sghuu/article/details/101555816?ops_request_misc={"request_id":"159780014719724846436302","scm":"20140713.130102334.."}&request_id=159780014719724846436302&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduend~default-2-101555816.first_rank_ecpm_v3_pc_rank_v2&utm_term=hadoop对于小文件的处理方式&spm=1018.2118.3001.4187

一周热门 更多>