MapReduce设计模式之概要设计模式

42次阅读

共计 18706 个字符,预计需要花费 47 分钟才能阅读完成。

什么是概要设计模式

概要分析师将相似数据分组到一起并执行类似统计计算、索引生成或简单计数等后续的分析操作。

概要设计模式有哪些分类?

(1) 数值概要 (2) 倒排索引概要 (3) 计数器计数等等。

数值概要

MapReduce 的内容结果输出,如果父文件夹已经存在,会报文件已存在错误,每次重新输出文件,如果都手动删除,会比较麻烦,可以自己写一个删除文件的工具类。

import java.io.File;

/**
 * @Author bluesnail95
 * @Date 2019/7/14 23:31
 * @Description
 */
public class FileUtil {

    /**
     * 删除文件
     * @param fileName 文件名称
     */
    public static void deleteFile(String fileName) {File file = new File(fileName);
        if(!file.exists()) {return;}
        if(file.isFile()) {file.delete();
        }else if(file.isDirectory()) {File[] fileList = file.listFiles();
            for (int i = 0; i < fileList.length; i++) {fileList[i].delete();}
            file.delete();}
    }
}

1. 最大值 / 最小值 / 计数

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Author bluesnail95
 * @Date 2019/7/14 9:57
 * @Description
 */
public class MinMaxCountData implements Writable {

    // 日期
    private Date createDate;
    // 用户标识
    private String userId;

    private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public MinMaxCountData() {}

    public MinMaxCountData(Date createDate, String userId) {
        this.createDate = createDate;
        this.userId = userId;
    }

    public Date getCreateDate() {return createDate;}

    public void setCreateDate(Date createDate) {this.createDate = createDate;}

    public String getUserId() {return userId;}

    public void setUserId(String userId) {this.userId = userId;}

    public void write(DataOutput dataOutput) throws IOException {dataOutput.writeLong(createDate.getTime());
        dataOutput.writeBytes(userId);
    }

    public void readFields(DataInput dataInput) throws IOException {createDate = new Date(dataInput.readLong());
        userId = dataInput.readLine();}

    @Override
    public String toString() {
        return "MinMaxCountData{" +
                "createDate=" + createDate +
                ", userId='" + userId + '\'' +
                '}';
    }
}
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Author bluesnail95
 * @Date 2019/7/14 9:36
 * @Description
 */
public class MinMaxCountTuple implements Writable {
    // 最小日期
    private Date min = null;
    // 最大日期
    private Date max = null;
    // 计数
    private long count = 0;

    private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public Date getMin() {return min;}

    public void setMin(Date min) {this.min = min;}

    public Date getMax() {return max;}

    public void setMax(Date max) {this.max = max;}

    public long getCount() {return count;}

    public void setCount(long count) {this.count = count;}

    public void write(DataOutput dataOutput) throws IOException {dataOutput.writeLong(min.getTime());
        dataOutput.writeLong(max.getTime());
        dataOutput.writeLong(count);
    }

    public void readFields(DataInput dataInput) throws IOException {min = new Date(dataInput.readLong());
        max = new Date(dataInput.readLong());
        count  = dataInput.readLong();}

    public String toString() {return frmt.format(min) + "\t" + frmt.format(max) + "\t" + count;
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.File;
import java.text.SimpleDateFormat;

/**
 * @Author bluesnail95
 * @Date 2019/7/14 10:02
 * @Description
 */
public class MinMaxCountMain {

    public static class MinMaxCountMapper extends Mapper<Object,Text,Text,MinMaxCountTuple> {private Text userId = new Text();
        private MinMaxCountTuple minMaxCountTuple = new MinMaxCountTuple();
        private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

        public void map(Object key,Text value,Context context){
            try {ObjectMapper objectMapper = new ObjectMapper();
                objectMapper.setDateFormat(frmt);
                MinMaxCountData minMaxCountData = objectMapper.readValue(value.toString(), MinMaxCountData.class);
                minMaxCountTuple.setCount(1);
                minMaxCountTuple.setMin(minMaxCountData.getCreateDate());
                minMaxCountTuple.setMax(minMaxCountData.getCreateDate());
                userId.set(minMaxCountData.getUserId());
                context.write(userId, minMaxCountTuple);
            } catch (Exception e) {e.printStackTrace();
            }
        }
    }

    public static class MinMaxCountReducer extends Reducer<Text, MinMaxCountTuple, Text, MinMaxCountTuple> {private MinMaxCountTuple minMaxCountTuple = new MinMaxCountTuple();

        public void reduce(Text key,Iterable<MinMaxCountTuple> values,Context context) {
            try {
                long sum = 0;
                for (MinMaxCountTuple value : values) {if(minMaxCountTuple.getMin() == null || value.getMin().compareTo(minMaxCountTuple.getMin()) < 0 ) {minMaxCountTuple.setMin(value.getMin());
                    }
                    if(minMaxCountTuple.getMax() == null || value.getMax().compareTo(minMaxCountTuple.getMax()) > 0 ) {minMaxCountTuple.setMax(value.getMax());
                    }
                    sum += value.getCount();}
                minMaxCountTuple.setCount(sum);
                context.write(key, minMaxCountTuple);
            } catch (Exception e) {e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {Configuration conf = new Configuration();
        try {Job job = Job.getInstance(conf, "NumericalSummarization:MinMaxCount");
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(MinMaxCountTuple.class);
            job.setJarByClass(MinMaxCountMain.class);
            job.setMapperClass(MinMaxCountMapper.class);
            job.setCombinerClass(MinMaxCountReducer.class);
            job.setReducerClass(MinMaxCountReducer.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            File outputFile = new File(args[1]);
            if(outputFile.exists()){outputFile.delete();
            }
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {e.printStackTrace();
        }
    }
}

2. 平均值

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Author bluesnail95
 * @Date 2019/7/14 21:51
 * @Description
 */
public class CountAverageData implements Writable {

    // 日期
    private Date creationDate;

    // 文本
    private String text;

    private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public CountAverageData() {}

    public CountAverageData(Date creationDate, String text) {
        this.creationDate = creationDate;
        this.text = text;
    }

    public Date getCreationDate() {return creationDate;}

    public void setCreationDate(Date creationDate) {this.creationDate = creationDate;}

    public String getText() {return text;}

    public void setText(String text) {this.text = text;}

    public void write(DataOutput dataOutput) throws IOException {dataOutput.writeBytes(frmt.format(creationDate));
        dataOutput.writeBytes(text);
    }

    public void readFields(DataInput dataInput) throws IOException {
        try {System.out.println(dataInput);
            creationDate = frmt.parse(dataInput.toString());
            text = dataInput.readLine();} catch (ParseException e) {e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return "{" +
                "creationDate=" + creationDate +
                ", text='" + text + '\'' +
                '}';
    }
}
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @Author bluesnail95
 * @Date 2019/7/14 21:36
 * @Description
 */
public class CountAverageTuple implements Writable {

    // 计数
    private long count;

    // 平均值
    private float average;

    public long getCount() {return count;}

    public void setCount(long count) {this.count = count;}

    public float getAverage() {return average;}

    public void setAverage(float average) {this.average = average;}

    public void write(DataOutput dataOutput) throws IOException {dataOutput.writeLong(count);
        dataOutput.writeFloat(average);
    }

    public void readFields(DataInput dataInput) throws IOException {count = dataInput.readLong();
        average = dataInput.readFloat();}

    @Override
    public String toString() {
        return "{" +
                "count=" + count +
                ", average=" + average +
                '}';
    }
}
import file.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

/**
 * @Author bluesnail95
 * @Date 2019/7/14 21:40
 * @Description
 */
public class CountAverageMain {

    public static class CountAverageMapper extends Mapper<Object, Text, IntWritable, CountAverageTuple> {private IntWritable outHour = new IntWritable();
        private CountAverageTuple countAverageTuple = new CountAverageTuple();
        private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

        public void map(Object key,Text value,Context context) {ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.setDateFormat(frmt);
            try {CountAverageData countAverageData = objectMapper.readValue(value.toString(), CountAverageData.class);
                Calendar calendar = Calendar.getInstance();
                Date creationDate = countAverageData.getCreationDate();
                calendar.setTime(creationDate);
                int hour = calendar.get(Calendar.HOUR_OF_DAY);
                outHour.set(hour);
                countAverageTuple.setAverage(countAverageData.getText().length());
                countAverageTuple.setCount(1);
                context.write(outHour, countAverageTuple);
            } catch (Exception e) {e.printStackTrace();
            }
        }
    }

    public static class CountAverageReducer extends Reducer<IntWritable, CountAverageTuple,IntWritable, CountAverageTuple> {private CountAverageTuple result = new CountAverageTuple();

        public void reduce(IntWritable key, Iterable<CountAverageTuple> values,Context context) {
            float sum = 0;
            long count = 0;
            for(CountAverageTuple countAverageTuple : values) {count += countAverageTuple.getCount();
                sum += countAverageTuple.getCount() * countAverageTuple.getAverage();
            }
            result.setAverage(sum / count);
            result.setCount(count);
            try {context.write(key, result);
            } catch (IOException e) {e.printStackTrace();
            } catch (InterruptedException e) {e.printStackTrace();
            }
        }
    }


    public static void main(String[] args) {Configuration configuration = new Configuration();
        try {Job job = Job.getInstance(configuration, "CountAverage");
            job.setJarByClass(CountAverageMain.class);
            job.setMapperClass(CountAverageMapper.class);
            job.setCombinerClass(CountAverageReducer.class);
            job.setReducerClass(CountAverageReducer.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(CountAverageTuple.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.deleteFile(args[1]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true)? 0 : 1);
        } catch (Exception e) {e.printStackTrace();
        }
    }
}

3. 中位数和方差

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @Author bluesnail95
 * @Date 2019/7/16 6:33
 * @Description
 */
public class MedianStdDevTuple implements Writable {

    private float median;

    private float stdDev;

    public float getMedian() {return median;}

    public void setMedian(float median) {this.median = median;}

    public float getStdDev() {return stdDev;}

    public void setStdDev(float stdDev) {this.stdDev = stdDev;}

    public void write(DataOutput dataOutput) throws IOException {dataOutput.writeFloat(median);
        dataOutput.writeFloat(stdDev);
    }

    public void readFields(DataInput dataInput) throws IOException {median = dataInput.readFloat();
        stdDev = dataInput.readFloat();}

    @Override
    public String toString() {
        return "{" +
                "median=" + median +
                ", stdDev=" + stdDev +
                '}';
    }
}
import file.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;

/**
 * @Author bluesnail95
 * @Date 2019/7/16 6:18
 * @Description
 */
public class MedianStdDevMain {private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public static class MedianStdDevMapper extends Mapper<Object, Text,IntWritable, IntWritable> {private IntWritable outhour = new IntWritable();
        private IntWritable outlength = new IntWritable();

        public void map(Object key,Text value,Context context) {ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.setDateFormat(frmt);
            try {CountAverageData countAverageData = objectMapper.readValue(value.toString(), CountAverageData.class);
                Date creationDate = countAverageData.getCreationDate();
                Calendar calendar = Calendar.getInstance();
                calendar.setTime(creationDate);
                int hour = calendar.get(Calendar.HOUR_OF_DAY);
                int length = countAverageData.getText().length();
                outhour.set(hour);
                outlength.set(length);
                context.write(outhour, outlength);
            } catch (Exception e) {e.printStackTrace();
            }
        }
    }

    public static class MadianStdDevReducer extends Reducer<IntWritable, IntWritable, IntWritable, MedianStdDevTuple> {private ArrayList<Float> lengths = new ArrayList<Float>();
        private MedianStdDevTuple medianStdDevTuple = new MedianStdDevTuple();

        public void reduce(IntWritable key, Iterable<IntWritable> values, Context context) {
            int sum = 0;
            int count = 0;
            try {for (IntWritable value : values) {sum += value.get();
                    count++;
                    lengths.add((float) value.get());
                }

                // 进行排序
                Collections.sort(lengths);
                // 求中位数
                if(count == 1 || count % 2 == 0) {medianStdDevTuple.setMedian(lengths.get(count/2));
                }else {medianStdDevTuple.setMedian((lengths.get(count / 2 - 1) + lengths.get(count / 2)) / 2.0f);
                }
                // 求平均值
                float mean = sum / count;
                float sumOfSquare = 0.0f;
                // 求标准差
                for(Float value: lengths) {sumOfSquare += (value - mean) * (value - mean);
                }
                if(count == 1) {medianStdDevTuple.setStdDev(0);
                }else{medianStdDevTuple.setStdDev((float)Math.sqrt(sumOfSquare / (count - 1)));
                }

                context.write(key, medianStdDevTuple);
            } catch (Exception e) {e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {Configuration configuration = new Configuration();
        try {Job job = Job.getInstance(configuration, "CountAverage");
            job.setJarByClass(MedianStdDevMain.class);
            job.setMapperClass(MedianStdDevMapper.class);
            job.setReducerClass(MadianStdDevReducer.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.deleteFile(args[1]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true)? 0 : 1);
        } catch (Exception e) {e.printStackTrace();
        }
    }
}

升级版

import file.FileUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;


import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * @Author bluesnail95
 * @Date 2019/7/16 21:28
 * @Description
 */
public class MedianStdDevUpgradeMain {private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");

    public static class MedianStdDevUpgradeMapper extends Mapper<Object, Text, IntWritable, SortedMapWritable> {private IntWritable outHour = new IntWritable();
        private LongWritable one = new LongWritable(1);
        private IntWritable lengths = new IntWritable();

        public void map(Object key,Text value,Context context) {ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.setDateFormat(frmt);
            try {CountAverageData countAverageData = objectMapper.readValue(value.toString(), CountAverageData.class);
                Date creationDate = countAverageData.getCreationDate();
                Calendar calendar = Calendar.getInstance();
                calendar.setTime(creationDate);
                outHour.set(calendar.get(Calendar.HOUR_OF_DAY));
                lengths.set(countAverageData.getText().length());
                SortedMapWritable sortedMapWritable = new SortedMapWritable();
                sortedMapWritable.put(lengths,one);
                context.write(outHour, sortedMapWritable);
            } catch (Exception e) {e.printStackTrace();
            }
        }
    }

    public static class MedianStdDevUpgradeCombiner extends Reducer<IntWritable,SortedMapWritable,IntWritable,SortedMapWritable> {protected void reduce(IntWritable key,Iterable<SortedMapWritable> values,Context context) {SortedMapWritable outValue = new SortedMapWritable();

            try {for (SortedMapWritable sortedMapWritable : values) {Set<Map.Entry<WritableComparable,Writable>> set = sortedMapWritable.entrySet();
                    Iterator<Map.Entry<WritableComparable,Writable>> iterator = set.iterator();
                    while(iterator.hasNext()) {Map.Entry<WritableComparable,Writable> entry = iterator.next();

                       LongWritable count = (LongWritable) outValue.get(entry.getKey());
                       if(count != null) {count.set(count.get() + ((LongWritable)entry.getValue()).get());
                            outValue.put(entry.getKey(), count);
                       }else{outValue.put(entry.getKey(),new LongWritable(((LongWritable)entry.getValue()).get()));
                       }

                    }
                }
                context.write(key, outValue);
            } catch (Exception e) {e.printStackTrace();
            }
        }
    }

    public static class MedianStdDevUpgradeReducer extends Reducer<IntWritable,SortedMapWritable,IntWritable,MedianStdDevTuple> {private MedianStdDevTuple medianStdDevTuple = new MedianStdDevTuple();
        private TreeMap<Integer, Long> lengthCounts = new TreeMap<Integer, Long>();

        public void reduce(IntWritable key,Iterable<SortedMapWritable> values,Context context) {
            float sum = 0;
            long total = 0;
           lengthCounts.clear();
            medianStdDevTuple.setStdDev(0);
            medianStdDevTuple.setMedian(0);

            for(SortedMapWritable sortedMapWritable : values) {Set<Map.Entry<WritableComparable,Writable>> set = sortedMapWritable.entrySet();
                Iterator<Map.Entry<WritableComparable,Writable>> iterator =  set.iterator();
               while (iterator.hasNext()) {Map.Entry<WritableComparable,Writable> writableEntry = iterator.next();
                   int length = ((IntWritable)writableEntry.getKey()).get();
                   long count = ((LongWritable)writableEntry.getValue()).get();

                   total += count;
                   sum += count * length;
                   Long sortedCount = lengthCounts.get(length);
                   if(sortedCount == null) {lengthCounts.put(length, count);
                   }else{lengthCounts.put(length, count + sortedCount);
                   }
               }
            }

            long medianIndex = total / 2;
            long previousCount = 0;
            long count = 0;
            long prevKey = 0;
            for(Map.Entry<Integer, Long> entry:lengthCounts.entrySet()) {count = previousCount + entry.getValue();
                if(previousCount <= medianIndex && medianIndex < count) {if(total % 2 == 0 && previousCount == medianIndex) {medianStdDevTuple.setMedian((entry.getKey() + prevKey) / 2.0f);
                    }else{medianStdDevTuple.setMedian(entry.getKey());
                    }
                    break;
                }
                previousCount = count;
                prevKey = entry.getKey();}

            float mean = sum / total;
            float sumOfSquares = 0.0f;
            for(Map.Entry<Integer, Long> entry:lengthCounts.entrySet()) {sumOfSquares += (entry.getKey() - mean) * (entry.getKey() - mean) * entry.getValue();}
            if(total == 1) {medianStdDevTuple.setStdDev(0);
            }else{medianStdDevTuple.setStdDev((float)Math.sqrt((sumOfSquares / (total - 1))));
            }
            try {context.write(key, medianStdDevTuple);
            } catch (Exception e) {e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {Configuration configuration = new Configuration();
        try {Job job = Job.getInstance(configuration, "MedianStdDevUpgrade");
            job.setJarByClass(MedianStdDevUpgradeMain.class);
            job.setMapperClass(MedianStdDevUpgradeMapper.class);
            job.setCombinerClass(MedianStdDevUpgradeCombiner.class);
            job.setReducerClass(MedianStdDevUpgradeReducer.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(SortedMapWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.deleteFile(args[1]);
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true)? 0 : 1);
        } catch (Exception e) {e.printStackTrace();
        }
    }
}

正文完
 0