如何在 HDFS 上进行简单的批量解压操作

分享 leeston9 ⋅ 于 2020-07-17 22:00:50 ⋅ 最后回复由 leeston9 2020-07-17 23:30:38 ⋅ 5092 阅读

上次在做老苏的ETL项目的时候,为了方便,为了操作方便,老苏提供都是.gz文件和.tar.gz文件(至于这些怎么用xargs提取的,这里就不再详细说明) 大家都知道hdfs的命令并没有解压操作,如果我想在hdfs上解压400多个文件,该如何操作呢?此时我们就可以用一个非常牛逼的类叫做CompressionCodecFactory,里面提供的api可以帮我们实现

1.首先这个api只能对单个文件进行解压,并不能对一个文件夹下的所有文件操作,而且解压后的文件文件名和路径名也要给全才可以,因此我们需要先将这些文件追加到本地文件中:
file
file
lee2文件内容如下:
file
用sz命令导出lee2 文件到windows 中 再用notpad++ 操作一波:
去掉第一行数据,并将所有分隔符换成\t格式,
file
为了简化mr操作,我们把\也换成\t 分割,得到的文件是这样的
file
2.编写一个Mapreduce程序,将这个文件中的最后一个元素输出:

public class MR_LOG_CUT extends Configured implements Tool {
    //-Dmapreduce.job.reduces=4
    @Override
    public int run(String[] args) throws Exception {
        // 创建hadoop配置文件的加载对象
        Configuration myConf = this.getConf();

        Job job = Job.getInstance(myConf, "etl_cut");
        job.setJarByClass(MR_LOG_CUT.class);
        //数据片 -- 输入格式化 -- M ---  R ---输出格式化
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MyMapper.class);
        //reduce设置
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        //设置输出格式化 文本
        job.setOutputFormatClass(TextOutputFormat.class);
        //设置I/O路径
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        //输出路径必须不存在
        FileSystem fs = FileSystem.get(myConf);
        if (fs.exists(out)) {
            //true 表示 hadoop fs -rm -r -skipTrash
            fs.delete(out, true);
            System.out.println(job.getJobName() + "'s outputDir has been deleted ");
        }
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        long start = System.currentTimeMillis();
        //运行代码
        boolean con = job.waitForCompletion(true);
        long end = System.currentTimeMillis();
        System.out.println(con ? "JOB_STATUS : SUCCESS" : "JOB_STATUS : FILED");
        //运行时间
        System.out.println("JOB_COST : " + (end - start) / 1000 + " SECONDS");
        return 0;
    }

    private static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        private Text outKey = new Text();  //创建 key 对象 text类型
        private NullWritable outval = NullWritable.get();
        private String[] strs = null;   //存储切割的字符串数组

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            strs = value.toString().split("\t");    //不需要key值,key值这是行号,无用
            //最后一个元素就是我们需要的
            outKey.set(strs[strs.length - 1]);
            context.write(outKey, outval);
        }
    }

    //reduce的输入是map的输出,输出根据实际情况定
    private static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        //聚合输出,因为没重复的key所以不会聚合咯
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MR_LOG_CUT(), args));
    }
}

输出后的文件是这样的,然后我们新建一个解压类,放在resource目录下进行加压操作:
file
为什么要这么做呢,因为该方法只能一个文件一个文件地解压,所以我们需要一个一个遍历文件全路径名来读取每一个.gz/.tar.tar.gz文件,解压完成后我们还需要根据文件名来拼每一个文件的输出路径,所以先写一个根据文件名生成输出路径的工具类如图所示:
file

  1. 进行解压逻辑操作,主要思路是:先依次读取我们的压缩文件名文件 part-r-00000 ,再创建url 根据CompressionCodecFactory 来判断文件的压缩格式,进行解压,设置我们的解压后文件名,用IOUtils完成解压后输出,具体代码如下:

import com.lee.utils.FileOutputUtil;
import jline.internal.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

public class LeeFileDecompressor {

    //实现批量解压数据
    public static void main(String[] args) {
        LeeFileDecompressor fd = new LeeFileDecompressor();
        // 将文件加载到内存
        InputStream in = null;
        String filePath;
        BufferedReader reader = null;
        try {
            //加载资源
            in = LeeFileDecompressor.class.getClassLoader().getResourceAsStream("part-r-00000");
            // 因为是字符流 所以用 BufferedReader 进行解析
            reader = new BufferedReader(new InputStreamReader(in, "UTF-8"));
            String line;
            while ((line = reader.readLine()) != null) {
                // 获取到数据
                if (!"".equals(line.trim())) {
                    //构建文件路径,args[0]是输出路径名,需带 ‘/’
                    filePath = (args[0] + line).trim();
                    fd.UnzipOne(filePath);
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != reader) {
                    reader.close();
                }
                if (null != in) {
                    in.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void UnzipOne(String fileDir) {
        InputStream in = null;
        OutputStream out = null;
        String outputUri ;
        FileSystem fs;
        try {
            Configuration conf = new Configuration();
            // 输入路径,args[0] 给出
            String uri = fileDir;
            System.out.println(uri);
            // 得到输入文件系统
            fs = FileSystem.get(URI.create(uri), conf);
            // 得到输入路径的Path格式
            Path inputPath = new Path(uri);
            // System.out.println("输入路径是:" + inputPath);
            CompressionCodecFactory factory = new CompressionCodecFactory(conf);
            // 检测输入路径是否存在压缩类型
            CompressionCodec codec = factory.getCodec(inputPath);
            if (codec == null) {
                System.err.println("未发现压缩格式哦: " + uri);
                System.exit(1);
            }
            //注意,这里写的getOutputFile() 是获得在input的同级目录下新建一个output目录存放解压文件
            if (uri.endsWith(".tar.gz")) {
                //如果是 .tar.gz结尾 则直接去掉
                outputUri =
                        CompressionCodecFactory.removeSuffix(FileOutputUtil.getOutputFile(uri), ".tar.gz");
            } else{
                outputUri =
                        //否则就是.gz结尾的直接去除即可
                        CompressionCodecFactory.removeSuffix(FileOutputUtil.getOutputFile(uri), ".gz");
            }
            // 创建输入流
            in = codec.createInputStream(fs.open(inputPath));
            out = fs.create(new Path(outputUri));
            //输出到input同级目录下的output文件中
            IOUtils.copyBytes(in, out, conf);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(in);
            IOUtils.closeStream(out);
        }
    }
}
  1. 指明mainClass, 打jar包,直接运行,需要给出文件的输出路径,需要带上\:
  2. file

file

查看运行的控制台输出:

file

去hdfs查看我们的输出文件,.tar.gz和 .gz 结尾的文件全部去掉了后缀,

file

至此,大功告成!

file

总结: 因为解压过程不需要跑mapreduce,所以速度比较快,如有疑问,请联系我

版权声明:原创作品,允许转载,转载时务必以超链接的形式表明出处和作者信息。否则将追究法律责任。来自海汼部落-leeston9,http://hainiubl.com/topics/75214
本帖已被设为精华帖!
本帖由 青牛 于 3年前 加精
回复数量: 6
  • leeston9 上海工程技术大学自动化专业2019级毕业生
    2020-07-17 22:14:20

    控制台运行:

    file

  • leeston9 上海工程技术大学自动化专业2019级毕业生
    2020-07-17 22:16:01

    自动生成output文件目录并输出文件同时去除输出文件名的.tar.gz和.gz后缀:

    file

    file

  • 青牛 国内首批大数据从业者,就职于金山,担任大数据团队核心研发工程师
    2020-07-17 23:09:44

    @leeston9

    file

    这一步不用写mr,可以使用notepad++列编辑功能直接提取最后一列,不是啥都要写代码,咋快咋来。

    打赏10元,理由:爱思考有创新。

  • 青牛 国内首批大数据从业者,就职于金山,担任大数据团队核心研发工程师
    2020-07-17 23:12:47

    @leeston9 我帮你整理了一下帖子格式,markdown语法之间用换行分割就好了。

  • leeston9 上海工程技术大学自动化专业2019级毕业生
    2020-07-17 23:29:22

    @青牛 嗯嗯 多谢老师!

  • leeston9 上海工程技术大学自动化专业2019级毕业生
    2020-07-17 23:30:38

    @青牛 我之前没用过,你说了我才知道有这功能,谢老师:blush:

暂无评论~~
  • 请注意单词拼写,以及中英文排版,参考此页
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
  • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
  • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
  • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
Ctrl+Enter