输入数据文件 AvgTemperature.txt:
DATE,HOUR,COND,PRES,HUM,TMP,AQI,PM2.5,PM10
20160602,00,霾,1984,130,9,390,348,300
20160802,01,霾,1163,81,8,393,368,302
20160706,02,霾,1079,108,17,360,394,306
20160706,03,霾,1116,79,6,339,387,303
20160502,04,霾,1198,98,16,357,325,307
20160602,05,霾,1762,126,9,324,316,301
20160408,06,霾,1996,131,3,349,344,301
20160604,07,霾,1952,119,26,347,300,309
20160105,08,霾,1410,81,8,350,395,307
20160104,09,霾,1718,130,4,352,335,308
20160501,10,霾,1714,119,27,310,336,307
20160601,11,霾,1660,130,23,311,364,302
20160606,12,霾,1598,96,12,369,346,309
20160602,13,霾,1673,127,2,343,346,303
20160706,14,霾,1578,122,8,360,323,307
20160707,15,霾,1237,118,12,384,384,301
20160205,16,霾,1231,78,9,361,357,302
20160605,17,霾,1166,86,30,350,388,307
20160506,18,霾,1426,94,2,378,372,305
20160805,19,霾,1874,144,20,376,327,302
20160405,20,霾,1778,94,22,360,335,304
20160104,21,霾,1055,64,22,376,361,305
20160304,22,霾,1349,78,15,367,384,308
20160203,23,霾,2004,110,2,359,371,304
20160603,24,霾,1375,115,19,308,301,308
20160402,25,霾,1201,69,5,387,342,305
20160707,26,霾,1272,112,23,348,333,307
20160702,27,霾,1738,60,12,393,300,303
20160301,28,霾,1752,107,12,364,331,301
20160704,29,霾,1442,65,9,332,369,308

第一题:编写月平均气温统计程序
import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;public class AvgTemperature { public static class StatMapper extends Mapper<Object, Text, Text, IntWritable> { private IntWritable intValue = new IntWritable(); private Text dateKey = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { String[] items = value.toString().split(","); String date = items[0]; String tmp = items[5]; if(!"DATE".equals(date) && !"N/A".equals(tmp)){//排除第一行阐明以及未取到数据的行 dateKey.set(date.substring(0, 6)); intValue.set(Integer.parseInt(tmp)); context.write(dateKey, intValue); } } } public static class StatReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int tmp_sum = 0; int count = 0; for(IntWritable val : values){ tmp_sum += val.get(); count++; } int tmp_avg = tmp_sum/count; result.set(tmp_avg); context.write(key, result); } } public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Job job = new Job(conf, "AvgTemperature"); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setJarByClass(AvgTemperature.class); job.setMapperClass(StatMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setReducerClass(StatReducer.class); 
job.setPartitionerClass(HashPartitioner.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); TextOutputFormat.setOutputPath(job, new Path(args[1])); TextInputFormat.setInputPaths(job, args[0]); job.setNumReduceTasks(Integer.parseInt(args[2])); System.exit(job.waitForCompletion(true) ? 0 : 1); }}
...