如何在 join 的 mapper 阶段 不同表来自多个输入时,如何指定对应的输入文件?

问答 羽翔 ⋅ 于 2020-07-29 17:39:53 ⋅ 最后回复由 青牛 2020-07-29 18:55:32 ⋅ 107 阅读

file
上图所示,不同的文件时 通过 两个数据有不同的列来区分的
如果 恰好两个表都有相同的列 ,我们如何区分?

成为第一个点赞的人吧 :bowtie:
回复数量: 4
  • 苏牛
    2020-07-29 18:04:26

    多目录输入配置即可

  • 羽翔
    2020-07-29 18:22:35

    @苏牛 如何进行多目录输入配置?

  • 青牛 国内首批大数据从业者,就职于金山,担任大数据团队核心研发工程师
    2020-07-29 18:26:22

    @羽翔 解决了吗?

  • 青牛 国内首批大数据从业者,就职于金山,担任大数据团队核心研发工程师
    2020-07-29 18:55:32

    @羽翔 给你两种多目录输入的方法,你参考一下

    第一种:

    /**
     * InnerJoin.java
     * com.hainiu.mapreducer.mr
     * Copyright (c) 2017, 海牛版权所有.
     * @author   青牛
    */
    
    package com.hainiu.mapreducer.mr;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * mr实现类似于sql的innerjoin
     * 
     * @author   青牛
     * @Date     2017年8月28日      
     */
    public class InnerJoin extends Configured implements Tool {
    
        public static final String SIGN1 = "\t";
    
        public static final String SIGN2 = "\001";
    
        public static class InnerJoinMapper extends Mapper<LongWritable, Text, Text, WordWritable> {
    
            private Text outKey = new Text();
    
            private WordWritable outValue = new WordWritable();
    
            /**
             * 可以从这个map任务的输入文件的目录名称来判断是属于那类数据从而进行数据的分类
             * map任务中使用context对象可以获得本次任务的输入地址
             */
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                FileSplit inputSplit = (FileSplit) context.getInputSplit();
                String path = inputSplit.getPath().toString();
                if (path.contains("minout")) {
                    outValue.setType("1");
                } else if (path.contains("maxout")) {
                    outValue.setType("2");
                }
            }
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String str = value.toString();
                String strs[] = str.split(SIGN1);
                if (strs.length != 2) {
                    return;
                }
                String word = strs[0];
                Long num = Long.parseLong(strs[1]);
                outKey.set(word);
                outValue.setN(num);
                outValue.setWord(outKey);
                context.write(outKey, outValue);
            }
    
        }
    
        public static class InnerJoinReducer extends Reducer<Text, WordWritable, Text, Text>{
    
            private Text outValue = new Text();
    
            private List<Long> firstList = new ArrayList<Long>();
    
            private List<Long> secondList = new ArrayList<Long>();
    
            @Override
            protected void reduce(Text key, Iterable<WordWritable> value,Context context) throws IOException, InterruptedException {
                //注意进行缓存的清理,不然下一个key的数据也会被追加到每次key的后面
                firstList.clear();
                secondList.clear();
                for(WordWritable wordWritable:value){
                    if(wordWritable.getType().equals("1")){
                        firstList.add(wordWritable.getN());
                    }else{
                        secondList.add(wordWritable.getN());
                    }
                }
    
                //时行数据的拼接输出
                for(Long max:secondList){
                    for(Long min:firstList){
                        outValue.set(max + SIGN1 + min);
                        context.write(key, outValue);
                    }
                }
            }
    
        }
    
        public int run(String[] args) throws Exception {
            Configuration conf = getConf();
            //定义Job名字并设置任务配置
            Job job = Job.getInstance(conf, "innerjoin");
            //设置Jar使用的Class
            job.setJarByClass(InnerJoin.class);
            //设置使用的Mapper Class
            job.setMapperClass(InnerJoinMapper.class);
            //设置使用的Reducer Class
            job.setReducerClass(InnerJoinReducer.class);
            //设置mapper任务的输出value类型
            job.setMapOutputValueClass(WordWritable.class);
            //设置任务的输出Key类型
            job.setOutputKeyClass(Text.class);
            //调协任务的输出Value类型
            job.setOutputValueClass(Text.class);
            //设置任务的输入地址,可以设置多个目录为输入,用逗号隔开/tmp/mulitipleoutmaxmin/maxout,/tmp/mulitipleoutmaxmin/minout
            FileInputFormat.addInputPaths(job, args[0]);;
            //设置任务的输出地址,对应的是一个目录
            Path outputDir = new Path(args[1]);
            FileOutputFormat.setOutputPath(job, outputDir);
    
            //删除输出目录
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outputDir)) {
                fs.delete(outputDir, true);
                System.out.println("out put delete finish");
            }
    
            //等待任务执行完成
            int i = job.waitForCompletion(true) ? 0 : 1;
            return i;
        }
    
        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new InnerJoin(), args));
        }
    
    }

    第二种:

    /**
     * InnerJoin.java
     * com.hainiu.mapreducer.mr
     * Copyright (c) 2017, 海牛版权所有.
     * @author   青牛
    */
    
    package com.hainiu.mapreducer.mr;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    /**
     * mr实现类似于sql的innerjoin
     * 
     * @author   青牛
     * @Date     2017年8月28日      
     */
    public class InnerJoin2 extends Configured implements Tool {
    
        public static final String SIGN1 = "\t";
    
        public static final String SIGN2 = "\001";
    
        public static class InnerJoinMaxMapper extends Mapper<LongWritable, Text, Text, WordWritable> {
    
            private Text outKey = new Text();
    
            private WordWritable outValue = new WordWritable();
    
            /**
             * 根据任务设置不同的数据输出类型
             */
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                outValue.setType("2");
            }
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String str = value.toString();
                String strs[] = str.split(SIGN1);
                if (strs.length != 2) {
                    return;
                }
                String word = strs[0];
                Long num = Long.parseLong(strs[1]);
                outKey.set(word);
                outValue.setN(num);
                outValue.setWord(outKey);
                context.write(outKey, outValue);
            }
    
        }
    
        public static class InnerJoinMinMapper extends Mapper<LongWritable, Text, Text, WordWritable> {
    
            private Text outKey = new Text();
    
            private WordWritable outValue = new WordWritable();
    
            /**
             * 根据任务设置不同的数据输出类型
             */
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                outValue.setType("1");
            }
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String str = value.toString();
                String strs[] = str.split(SIGN1);
                if (strs.length != 2) {
                    return;
                }
                String word = strs[0];
                Long num = Long.parseLong(strs[1]);
                outKey.set(word);
                outValue.setN(num);
                outValue.setWord(outKey);
                context.write(outKey, outValue);
            }
    
        }
    
        public static class InnerJoinReducer extends Reducer<Text, WordWritable, Text, Text> {
    
            private Text outValue = new Text();
    
            private List<Long> firstList = new ArrayList<Long>();
    
            private List<Long> secondList = new ArrayList<Long>();
    
            @Override
            protected void reduce(Text key, Iterable<WordWritable> value, Context context)
                    throws IOException, InterruptedException {
                //注意进行缓存的清理,不然下一个key的数据也会被追加到每次key的后面
                firstList.clear();
                secondList.clear();
                for (WordWritable wordWritable : value) {
                    if (wordWritable.getType().equals("1")) {
                        firstList.add(wordWritable.getN());
                    } else {
                        secondList.add(wordWritable.getN());
                    }
                }
    
                //时行数据的拼接输出
                for (Long max : secondList) {
                    for (Long min : firstList) {
                        outValue.set(max + SIGN1 + min);
                        context.write(key, outValue);
                    }
                }
            }
    
        }
    
        public int run(String[] args) throws Exception {
            Configuration conf = getConf();
            //定义Job名字并设置任务配置
            Job job = Job.getInstance(conf, "innerjoin2");
            //设置Jar使用的Class
            job.setJarByClass(InnerJoin2.class);
            //设置使用的Reducer Class
            job.setReducerClass(InnerJoinReducer.class);
            //设置mapper任务的输出value类型
            job.setMapOutputValueClass(WordWritable.class);
            //设置任务的输出Key类型
            job.setOutputKeyClass(Text.class);
            //调协任务的输出Value类型
            job.setOutputValueClass(Text.class);
    
            //设置任务的输入地址,并设置每个地址使用的mapper
            MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, InnerJoinMaxMapper.class);
            MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, InnerJoinMinMapper.class);
    
            //设置任务的输出地址,对应的是一个目录
            Path outputDir = new Path(args[2]);
            FileOutputFormat.setOutputPath(job, outputDir);
    
            //删除输出目录
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(outputDir)) {
                fs.delete(outputDir, true);
                System.out.println("out put delete finish");
            }
    
            //等待任务执行完成
            int i = job.waitForCompletion(true) ? 0 : 1;
            return i;
        }
    
        public static void main(String[] args) throws Exception {
            System.exit(ToolRunner.run(new InnerJoin2(), args));
        }
    
    }
暂无评论~~
  • 请注意单词拼写,以及中英文排版,参考此页
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`, 更多语法请见这里 Markdown 语法
  • 支持表情,可用Emoji的自动补全, 在输入的时候只需要 ":" 就可以自动提示了 :metal: :point_right: 表情列表 :star: :sparkles:
  • 上传图片, 支持拖拽和剪切板黏贴上传, 格式限制 - jpg, png, gif,教程
  • 发布框支持本地存储功能,会在内容变更时保存,「提交」按钮点击时清空
Ctrl+Enter