MapReduce设计模式之过滤模式

76次阅读

共计 8553 个字符,预计需要花费 22 分钟才能阅读完成。

过滤模式

过滤 (正则过滤和随机抽样)

应用场景

可以从一个大的数据集中筛选出具有某些特征的小的数据集。

代码实现

在 Mapper 阶段,用正则表达式对值进行过滤。在 Ruducer 阶段,生成 double 类型的随机数,判断是否小于给定的数进行模拟简单的随机取样。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * @Author bluesnail95
 * @Date 2019/7/20 7:46
 * @Description
 */
public class GrepMain {

    public static class GrepMapper extends Mapper<Object, Text, NullWritable,Text> {
        private String matchGrep = null;

        public void map(Object key,Text value,Context context) {matchGrep = context.getConfiguration().get("matchGrep");
            Pattern pattern = Pattern.compile(matchGrep);
            Matcher matcher = pattern.matcher(value.toString());
            if(matcher.matches()) {
                try {context.write(NullWritable.get(), value);
                } catch (Exception e) {e.printStackTrace();
                }
            }
        }
    }

    public static class GrepReducer extends Reducer<NullWritable,Text,NullWritable,Text> {private Random random = new Random();
        private Double percentage;

        public void reduce(NullWritable key,Iterable<Text> values,Context context) {String strPercentage = context.getConfiguration().get("filter_percentage");
            percentage = Double.valueOf(strPercentage);

            for(Text value:values) {double rand = random.nextDouble();
                if(rand < percentage) {
                    try {context.write(NullWritable.get(), value);
                    } catch (Exception e) {e.printStackTrace();
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        try {Configuration conf = new Configuration();
            // 设置正则表达式:匹配全是字母的字符串
            conf.set("matchGrep","^[a-zA-Z]+$");
            conf.setDouble("filter_percentage",0.5);
            Job job = Job.getInstance(conf, "Grep");
            // 与自己定义的类名保持一致
            job.setJarByClass(GrepMain.class);
            // 与自己定义的 Mapper 类和 Reducer 类保持一致
            job.setMapperClass(GrepMapper.class);
            job.setCombinerClass(GrepReducer.class);
            job.setReducerClass(GrepReducer.class);
            // 设置的输出键和输出值和 mapper 定义的需要保持一致。job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            // 输入输出路径
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true)?0:1);
        } catch (Exception e) {e.printStackTrace();
        }
    }
}

入参出参

运行结果

布隆过滤

应用场景

判断某个记录是否存在于某个预判的集合中,存在失误。

代码实现

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

import java.io.File;
import java.net.URI;

/**
 * @Author bluesnail95
 * @Date 2019/7/20 15:35
 * @Description
 */
public class BloomFilterMain {

    public static class BloomFilterMapper extends Mapper<Object, Text,Text, NullWritable> {int vectorSize = BloomFilterUtil.getOptimalBloomFilterSize(10,0.1f);
        int nbHash = BloomFilterUtil.getOptimalK(10,vectorSize);
        BloomFilter bloomFilter = new BloomFilter(vectorSize, nbHash, Hash.MURMUR_HASH);

        // 建立预定义集合
        protected void setup(Context context) {
            try {bloomFilter.add(new Key("BeiJing".getBytes()));
                bloomFilter.add(new Key("ShangHai".getBytes()));
            } catch (Exception e) {e.printStackTrace();
            }
        }

        public void map(Object key,Text value,Context context) {String word = value.toString();
            // 判断值是否在预判的集合中
            if(bloomFilter.membershipTest(new Key(word.getBytes()))) {
                try {context.write(value,NullWritable.get());
                } catch (Exception e) {e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        try {Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Grep");
            // 与自己定义的类名保持一致
            job.setJarByClass(BloomFilterMain.class);
            // 与自己定义的 Mapper 类和 Reducer 类保持一致
            job.setMapperClass(BloomFilterMapper.class);
            // 设置的输出键和输出值和 mapper 定义的需要保持一致。job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            // 输入输出路径
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true)?0:1);
        } catch (Exception e) {e.printStackTrace();
        }
    }
}

出参入参

运行结果

top K 问题

应用场景

查找出最热门的 K 条记录等等。

代码实现

/**
 * @Author bluesnail95
 * @Date 2019/7/20 17:09
 * @Description
 */
public class Top10Data {

    private String userId;

    private Integer reputation;

    public String getUserId() {return userId;}

    public void setUserId(String userId) {this.userId = userId;}

    public Integer getReputation() {return reputation;}

    public void setReputation(Integer reputation) {this.reputation = reputation;}
}
/**
 * @Author bluesnail95
 * @Date 2019/7/20 17:09
 * @Description
 */
public class Top10Main {

    public static class Top10Mapper extends Mapper<Object, Text, NullWritable,Text> {private TreeMap<Integer,Text> sortedMap = new TreeMap<Integer,Text>();

        public void map(Object key,Text value,Context context) {ObjectMapper objectMapper = new ObjectMapper();
            try {Top10Data top10Data = objectMapper.readValue(value.toString(),Top10Data.class);
                Integer reputation = top10Data.getReputation();
                String userId = top10Data.getUserId();
                sortedMap.put(reputation,new Text(value));
                if(sortedMap.size() > 10) {sortedMap.remove(sortedMap.firstKey());
                }
            } catch (IOException e) {e.printStackTrace();
            }
        }

        protected void cleanup(Context context) {for(Text t:sortedMap.values()) {
                try {context.write(NullWritable.get(),t);
                } catch (Exception e) {e.printStackTrace();
                }
            }
        }
    }

    public static class Top10Reducer extends Reducer<NullWritable,Text,NullWritable,Text> {private TreeMap<Integer,Text> sortedMap = new TreeMap<Integer,Text>();

        public void reduce(NullWritable key,Iterable<Text> values,Context context) {for(Text value:values) {System.out.println(value.toString());
                ObjectMapper objectMapper = new ObjectMapper();
                try {Top10Data top10Data = objectMapper.readValue(value.toString(), Top10Data.class);
                    int reputation = top10Data.getReputation();
                    String userId = top10Data.getUserId();
                    sortedMap.put(reputation,new Text(value));

                    if(sortedMap.size() > 10) {sortedMap.remove(sortedMap.firstKey());
                    }
                } catch (IOException e) {e.printStackTrace();
                }
            }

            for (Text t:sortedMap.values()) {
                try {context.write(NullWritable.get(), t);
                } catch (Exception e) {e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        try {Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Top 10");
            // 与自己定义的类名保持一致
            job.setJarByClass(Top10Main.class);
            // 与自己定义的 Mapper 类和 Reducer 类保持一致
            job.setMapperClass(Top10Mapper.class);
            job.setReducerClass(Top10Reducer.class);
            // 设置的输出键和输出值和 mapper 定义的需要保持一致。job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            // 输入输出路径
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true)?0:1);
        } catch (Exception e) {e.printStackTrace();
        }
    }
}

注意输出的时候是 new Text(value) 重新建立了一个对象。

只找出 K 条记录,不进行全排序。每次在 TreeMap 集合中加入新的元素后,判断 Map 的大小是否大于 K,如果是就移除第一个元素。

入参出参

运行结果

去重

应用场景

集合中存在重复的数据需要剔除,以简化统计统计等。

代码实现

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.File;

/**
 * @Author bluesnail95
 * @Date 2019/7/20 17:09
 * @Description
 */
public class DistinctMain {

    public static class DistinctMapper extends Mapper<Object, Text,Text, NullWritable> {public void map(Object key,Text value,Context context) {
            try {context.write(value, NullWritable.get());
            } catch (Exception e) {e.printStackTrace();
            }
        }
    }

    public static class DistinctReducer extends Reducer<Text, NullWritable,Text, NullWritable> {public void reduce(Text key,Iterable<NullWritable> values,Context context) {
            try {context.write(key,NullWritable.get());
            } catch (Exception e) {e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        try {Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Distinct");
            // 与自己定义的类名保持一致
            job.setJarByClass(DistinctMapper.class);
            // 与自己定义的 Mapper 类和 Reducer 类保持一致
            job.setMapperClass(DistinctMapper.class);
            job.setReducerClass(DistinctReducer.class);
            // 设置的输出键和输出值和 mapper 定义的需要保持一致。job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            // 输入输出路径
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true)?0:1);
        } catch (Exception e) {e.printStackTrace();
        }
    }
}

出参入参

运行结果

参考资料

《MapReduce 设计模式》

正文完
 0

MapReduce设计模式之过滤模式

76次阅读

共计 10618 个字符,预计需要花费 27 分钟才能阅读完成。

过滤

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * @Author bluesnail95
 * @Date 2019/7/20 7:46
 * @Description
 */
public class GrepMain {

    public static class GrepMapper extends Mapper<Object, Text, NullWritable,Text> {
        private String matchGrep = null;

        public void map(Object key,Text value,Context context) {matchGrep = context.getConfiguration().get("matchGrep");
            Pattern pattern = Pattern.compile(matchGrep);
            Matcher matcher = pattern.matcher(value.toString());
            if(matcher.matches()) {
                try {context.write(NullWritable.get(), value);
                } catch (Exception e) {e.printStackTrace();
                }
            }
        }
    }

    public static class GrepReducer extends Reducer<NullWritable,Text,NullWritable,Text> {private Random random = new Random();
        private Double percentage;

        public void reduce(NullWritable key,Iterable<Text> values,Context context) {String strPercentage = context.getConfiguration().get("filter_percentage");
            percentage = Double.valueOf(strPercentage);

            for(Text value:values) {double rand = random.nextDouble();
                if(rand < percentage) {
                    try {context.write(NullWritable.get(), value);
                    } catch (Exception e) {e.printStackTrace();
                    }
                }
            }
        }
    }

    public static void main(String[] args) {
        try {Configuration conf = new Configuration();
            conf.set("matchGrep","^[a-zA-Z]+$");
            conf.setDouble("filter_percentage",0.5);
            Job job = Job.getInstance(conf, "Grep");
            // 与自己定义的类名保持一致
            job.setJarByClass(GrepMain.class);
            // 与自己定义的 Mapper 类和 Reducer 类保持一致
            job.setMapperClass(GrepMapper.class);
            job.setCombinerClass(GrepReducer.class);
            job.setReducerClass(GrepReducer.class);
            // 设置的输出键和输出值和 mapper 定义的需要保持一致。job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            // 输入输出路径
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true)?0:1);
        } catch (Exception e) {e.printStackTrace();
        }
    }
}

布隆过滤

/**
 * @Author bluesnail95
 * @Date 2019/7/20 16:19
 * @Description
 */
public class BloomFilterUtil {public static int getOptimalBloomFilterSize(int numMembers,float falsePosRate) {int size = (int) (-numMembers * (float) Math.log(falsePosRate) /Math.pow(Math.log(2) , 2));
        return size;
    }

    public static int getOptimalK(float numMembers,float vectorSize) {return (int)Math.round(vectorSize / numMembers * Math.log(2));
    }

}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

import java.io.*;

/**
 * @Author bluesnail95
 * @Date 2019/7/20 8:25
 * @Description
 */
public class BloomFilterDriver {public static void main(String[] args) {Path inputFile =  new Path(args[0]);
        int numMembers =  Integer.parseInt(args[1]);
        float falsePosRate = Float.parseFloat(args[2]);
        Path bfFile = new Path(args[3]);

        int vectorSize = BloomFilterUtil.getOptimalBloomFilterSize(numMembers,falsePosRate);
        int nbHash = BloomFilterUtil.getOptimalK(numMembers,vectorSize);
        BloomFilter bloomFilter = new BloomFilter(vectorSize, nbHash, Hash.MURMUR_HASH);

        System.out.println("Training Bloom filter of size" + vectorSize + "with" + nbHash + "hash functions," + numMembers
                      + "approximate number of records, and" + falsePosRate + "false positive rate");

        String line = null;
        int numElements = 0;
        try {FileSystem fs = FileSystem.get(new Configuration());
            for(FileStatus status:fs.listStatus(inputFile)) {BufferedReader reader = new BufferedReader(new InputStreamReader(new FSDataInputStream(fs.open(status.getPath()))));

                System.out.println("Reading" + status.getPath());
                while((line = reader.readLine()) != null) {bloomFilter.add(new Key(line.getBytes()));
                    ++numElements;
                }
            }
            fs.close();
            System.out.println("Trained Bloom filter with" + numElements + "entries");
            System.out.println("Serializing Bloom filter to HDFS at" + bfFile);
            FSDataOutputStream strm = fs.create(bfFile);
            bloomFilter.write(strm);
            strm.flush();
            strm.close();
            System.exit(0);
        } catch (Exception e) {e.printStackTrace();
        }
    }
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

import java.io.File;
import java.net.URI;

/**
 * @Author bluesnail95
 * @Date 2019/7/20 15:35
 * @Description
 */
public class BloomFilterMain {

    public static class BloomFilterMapper extends Mapper<Object, Text,Text, NullWritable> {int vectorSize = BloomFilterUtil.getOptimalBloomFilterSize(10,0.1f);
        int nbHash = BloomFilterUtil.getOptimalK(10,vectorSize);
        BloomFilter bloomFilter = new BloomFilter(vectorSize, nbHash, Hash.MURMUR_HASH);

        protected void setup(Context context) {
            try {bloomFilter.add(new Key("BeiJing".getBytes()));
                bloomFilter.add(new Key("ShangHai".getBytes()));
            } catch (Exception e) {e.printStackTrace();
            }
        }

        public void map(Object key,Text value,Context context) {String word = value.toString();
            if(bloomFilter.membershipTest(new Key(word.getBytes()))) {
                try {context.write(value,NullWritable.get());
                } catch (Exception e) {e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        try {Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Grep");
            job.addCacheFile(new URI(args[2]));
            // 与自己定义的类名保持一致
            job.setJarByClass(BloomFilterMain.class);
            // 与自己定义的 Mapper 类和 Reducer 类保持一致
            job.setMapperClass(BloomFilterMapper.class);
            // 设置的输出键和输出值和 mapper 定义的需要保持一致。job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            // 输入输出路径
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true)?0:1);
        } catch (Exception e) {e.printStackTrace();
        }
    }
}

TOP N

/**
 * @Author bluesnail95
 * @Date 2019/7/20 17:09
 * @Description
 */
public class Top10Data {

    private String userId;

    private Integer reputation;

    public String getUserId() {return userId;}

    public void setUserId(String userId) {this.userId = userId;}

    public Integer getReputation() {return reputation;}

    public void setReputation(Integer reputation) {this.reputation = reputation;}
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.File;
import java.io.IOException;
import java.util.TreeMap;

/**
 * @Author bluesnail95
 * @Date 2019/7/20 17:09
 * @Description
 */
public class Top10Main {

    public static class Top10Mapper extends Mapper<Object, Text, NullWritable,Text> {private TreeMap<Integer,Text> sortedMap = new TreeMap<Integer,Text>();

        public void map(Object key,Text value,Context context) {ObjectMapper objectMapper = new ObjectMapper();
            try {Top10Data top10Data = objectMapper.readValue(value.toString(),Top10Data.class);
                Integer reputation = top10Data.getReputation();
                String userId = top10Data.getUserId();
                sortedMap.put(reputation,new Text(value));
                if(sortedMap.size() > 10) {sortedMap.remove(sortedMap.firstKey());
                }
            } catch (IOException e) {e.printStackTrace();
            }
        }

        protected void cleanup(Context context) {for(Text t:sortedMap.values()) {
                try {context.write(NullWritable.get(),t);
                } catch (Exception e) {e.printStackTrace();
                }
            }
        }
    }

    public static class Top10Reducer extends Reducer<NullWritable,Text,NullWritable,Text> {private TreeMap<Integer,Text> sortedMap = new TreeMap<Integer,Text>();

        public void reduce(NullWritable key,Iterable<Text> values,Context context) {for(Text value:values) {System.out.println(value.toString());
                ObjectMapper objectMapper = new ObjectMapper();
                try {Top10Data top10Data = objectMapper.readValue(value.toString(), Top10Data.class);
                    int reputation = top10Data.getReputation();
                    String userId = top10Data.getUserId();
                    sortedMap.put(reputation,value);

                    if(sortedMap.size() > 10) {sortedMap.remove(sortedMap.firstKey());
                    }
                } catch (IOException e) {e.printStackTrace();
                }
            }

            for (Text t:sortedMap.values()) {
                try {context.write(NullWritable.get(), t);
                } catch (Exception e) {e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {
        try {Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Top 10");
            // 与自己定义的类名保持一致
            job.setJarByClass(Top10Main.class);
            // 与自己定义的 Mapper 类和 Reducer 类保持一致
            job.setMapperClass(Top10Mapper.class);
            job.setReducerClass(Top10Reducer.class);
            // 设置的输出键和输出值和 mapper 定义的需要保持一致。job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            // 输入输出路径
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true)?0:1);
        } catch (Exception e) {e.printStackTrace();
        }
    }
}

去重

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.File;

/**
 * @Author bluesnail95
 * @Date 2019/7/20 17:09
 * @Description
 */
public class DistinctMain {

    public static class DistinctMapper extends Mapper<Object, Text,Text, NullWritable> {public void map(Object key,Text value,Context context) {
            try {context.write(value, NullWritable.get());
            } catch (Exception e) {e.printStackTrace();
            }
        }
    }

    public static class DistinctReducer extends Reducer<Text, NullWritable,Text, NullWritable> {public void reduce(Text key,Iterable<NullWritable> values,Context context) {
            try {context.write(key,NullWritable.get());
            } catch (Exception e) {e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        try {Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "Distinct");
            // 与自己定义的类名保持一致
            job.setJarByClass(DistinctMapper.class);
            // 与自己定义的 Mapper 类和 Reducer 类保持一致
            job.setMapperClass(DistinctMapper.class);
            job.setReducerClass(DistinctReducer.class);
            // 设置的输出键和输出值和 mapper 定义的需要保持一致。job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            // 输入输出路径
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileUtil.fullyDelete(new File(args[1]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true)?0:1);
        } catch (Exception e) {e.printStackTrace();
        }
    }
}

正文完
 0