跑完的结果是这样的,不知道哪里出了问题,找了一晚上也没找到
代码:
package org.prac.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * @author ALIENWARE
 * @version 1.0
 * @since 2021/2/22 20:35
 */
public class MapReduce01 extends Configured implements Tool {//定义数据分隔符
private static final String SPRLIT_STR = "/t";//map
//inputformat -->(一行数据的起始 和一行数据的值) 0 aabbc--> a 1 a 1 b 1 b 1 c 1
private static class MyMapper extends Mapper<LongWritable, Text, Text,LongWritable>{// 定义map函数需要用到的变量 private Text outkey = new Text(); //map给reduce的输出key private LongWritable outvalue = new LongWritable(); //map给reduce得value private String[] strs = null; //定义数据 //手动实现map方法 @Override public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException { //map中不要定义变量 父类中会一行一行执行,消耗资源 //1.拆分数据 value -->读取到的数据 strs = value.toString().split(SPRLIT_STR); //2.遍历数据 for (String s : strs) { outkey.set(s); outvalue.set(1); context.write(outkey, outvalue); } }
}
//reduce阶段
/**
- a 1 a 1 b 1 b 1 c 1
- a 1 a 1 b 1 b 1 c 1
- map --> reduce 聚合
- 相同的key value -- >list
-
reduce聚合后 累加操作
*/
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>
{
//定义reduce需要用到的环境变量
private LongWritable outval = new LongWritable();
private Long sum = 0L;//map传过来的累加的数据/**
- @param values map传过来reduce聚合后的list
- @param context
- @throws IOException
-
@throws InterruptedException
*/
@Override
public void reduce(Text outkey, Iterable values,
Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {//1. 初始化累加的变量
sum = 0L;
//2. 进行值的累加
for (LongWritable value : values) {
sum += value.get();
}
//3.保存累加值
outval.set(sum);
//4. 输出
context.write(outkey, outval);
}
}
//mapreduce主要执行的任务
@Override
public int run(String[] args)
{
try {
//获取已经加载好的配置的conf
Configuration conf = this.getConf();
//编写本次job
Job job = Job.getInstance(conf);
//job开始进行 固定三部配置//1. 类的配置 主执行类设置,谁有main方法就设置谁 job.setJarByClass(MapReduce01.class); //设置数据的输入格式化类 job.setInputFormatClass(TextInputFormat.class); job.setMapperClass(MyMapper.class);//设置map //如果map和 reduce的输出来星一样.可以用一个输出 //job.setMapOutputKeyClass(Text.class);//map key的输出 固定的 //job.setOutputValueClass(LongWritable.class);//map的 value输出 固定的 job.setReducerClass(MyReducer.class);//设置reduce job.setOutputKeyClass(Text.class);//reduce的key job.setOutputValueClass(LongWritable.class);//reduce的vcalue job.setOutputFormatClass(TextOutputFormat.class);//设置输出 //2. 路径设置 //输入路径 //FileInputFormat.addInputPath(job,new Path(args[0])); //保证输出路径必须没有 Path in = new Path(args[0]); Path out = new Path(args[1]); FileSystem fs = FileSystem.get(conf); if(fs.exists(out)){ fs.delete(out,true ); System.out.println(job.getJobName() + "路径已经被删除了!"); } FileInputFormat.addInputPath(job, in); FileOutputFormat.setOutputPath(job,out); // 3.执行配置 long start = System.currentTimeMillis(); boolean cons = job.waitForCompletion(true); long end = System.currentTimeMillis(); String msg = "job状态" + (cons? "SUCCESS!":"FAILE!"); System.out.println(msg); System.out.println(Math.abs(end-start)/1000+"秒!"); }catch (Exception e){ e.printStackTrace(); } return 0;
}
//运行mapreduce
/**- mapreduce运行流程
- 1.ToolRunner.run 获取tool.getConf() tool接口的configretion
- 2.extends Configured 获取Configuration 对象,加载hadoop配置文件
- 3.ToolRunner.run接管mapreduce执行,进行参数设置
*/
public static void main(String[] args)
{
try {
System.exit(ToolRunner.run(new MapReduce01(), args));
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 使用 IDEA 编写
// 在 Edit Configurations 中添加程序参数(输入路径 输出路径), 例如:
// E:\MAPREDUCEFILE\FILE\INPUT\file1 E:\MAPREDUCEFILE\FILE\OUTPUT