实体类
public class Week2Data implements WritableComparable<Week2Data>{ private String day; private Integer balance; public Week2Data() { } public Week2Data(String day,Integer balance) { this.balance = balance; this.day = day; } public String getDay() { return day; } public void setDay(String day) { this.day = day; } public Integer getBalance() { return balance; } public void setBalance(Integer balance) { this.balance = balance; } @Override public int compareTo(Week2Data o) { return o.balance.compareTo(this.balance); } @Override public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(day); dataOutput.writeInt(balance); } @Override public void readFields(DataInput dataInput) throws IOException { this.day=dataInput.readUTF(); this.balance=dataInput.readInt(); } }
主要
public class Week2 { static { System.setProperty("hadoop.home.dir","D:\\hadoop\\hadoop-2.8.3"); } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //创建作业 Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "week2"); //输入文件 FileInputFormat.addInputPath(job,new Path(args[0])); //mapper计算 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Week2Data.class); //reduce job.setNumReduceTasks(2); job.setReducerClass(MyReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //输出文件 FileOutputFormat.setOutputPath(job,new Path(args[1])); //提交 boolean b = job.waitForCompletion(true); System.out.println(b?1:0); } public static class MyMapper extends Mapper<LongWritable,Text,Text, Week2Data>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String s = value.toString(); String[] split = s.split("\\s"); String year = split[0].substring(0, 4); context.write(new Text(year),new Week2Data(split[0], Integer.parseInt(split[1])-Integer.parseInt(split[2]))); } } public static class MyReduce extends Reducer<Text,Week2Data,Text,Text>{ @Override protected void reduce(Text key, Iterable<Week2Data> values, Context context) throws IOException, InterruptedException { for (Week2Data value:values){ System.out.println(value); context.write(new Text(value.getDay()+" "+value.getBalance()),new Text("")); } } } }