Filtering Patterns
Filtering (regex filtering and random sampling)
Use Case
Extract a small dataset with certain characteristics from a large dataset.
Implementation
In the Mapper stage, each value is filtered with a regular expression. In the Reducer stage, a random double is generated for each record and compared against a given threshold to simulate simple random sampling.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * @Author bluesnail95
 * @Date 2019/7/20 7:46
 * @Description
 */
public class GrepMain {

    public static class GrepMapper extends Mapper<Object, Text, NullWritable, Text> {

        private Pattern pattern;

        @Override
        protected void setup(Context context) {
            // Compile the regex once instead of on every map() call
            pattern = Pattern.compile(context.getConfiguration().get("matchGrep"));
        }

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            Matcher matcher = pattern.matcher(value.toString());
            // Emit only the records that match the regex
            if (matcher.matches()) {
                context.write(NullWritable.get(), value);
            }
        }
    }

    public static class GrepReducer extends Reducer<NullWritable, Text, NullWritable, Text> {

        private Random random = new Random();
        private double percentage;

        @Override
        protected void setup(Context context) {
            percentage = Double.parseDouble(context.getConfiguration().get("filter_percentage"));
        }

        @Override
        public void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text value : values) {
                // Keep each record with probability `percentage` (simple random sampling)
                if (random.nextDouble() < percentage) {
                    context.write(NullWritable.get(), value);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Regex: match strings that consist entirely of letters
        conf.set("matchGrep", "^[a-zA-Z]+$");
        conf.setDouble("filter_percentage", 0.5);
        Job job = Job.getInstance(conf, "Grep");
        // Must match your own driver class
        job.setJarByClass(GrepMain.class);
        job.setMapperClass(GrepMapper.class);
        // Do not register GrepReducer as a combiner here: the sampling would then
        // run twice (combine and reduce), keeping roughly p^2 of the matches instead of p.
        job.setReducerClass(GrepReducer.class);
        // Output key/value classes must match what the mapper emits
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        // Input/output paths; delete the output directory first (local mode)
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileUtil.fullyDelete(new File(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Input and Output
Run Results
Bloom Filtering
Use Case
Test whether a record belongs to a predefined set. False positives are possible (a record may be reported as present when it is not), but false negatives are not.
Implementation
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;
import org.apache.hadoop.util.hash.Hash;

import java.io.File;
import java.io.IOException;

/**
 * @Author bluesnail95
 * @Date 2019/7/20 15:35
 * @Description
 */
public class BloomFilterMain {

    public static class BloomFilterMapper extends Mapper<Object, Text, Text, NullWritable> {

        // Size the filter for ~10 members with a 10% false-positive rate
        private int vectorSize = BloomFilterUtil.getOptimalBloomFilterSize(10, 0.1f);
        private int nbHash = BloomFilterUtil.getOptimalK(10, vectorSize);
        private BloomFilter bloomFilter = new BloomFilter(vectorSize, nbHash, Hash.MURMUR_HASH);

        // Build the predefined set
        @Override
        protected void setup(Context context) {
            bloomFilter.add(new Key("BeiJing".getBytes()));
            bloomFilter.add(new Key("ShangHai".getBytes()));
        }

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String word = value.toString();
            // Test whether the value is (probably) in the predefined set
            if (bloomFilter.membershipTest(new Key(word.getBytes()))) {
                context.write(value, NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "BloomFilter");
        // Must match your own driver class
        job.setJarByClass(BloomFilterMain.class);
        job.setMapperClass(BloomFilterMapper.class);
        // Map-only job: skip the reduce phase entirely
        job.setNumReduceTasks(0);
        // Output key/value classes must match what the mapper emits
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Input/output paths; delete the output directory first (local mode)
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileUtil.fullyDelete(new File(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
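BloomFilterUtil is not a Hadoop class; it comes from the MapReduce Design Patterns sample code and its source is not shown above. A minimal sketch of such a helper, assuming the standard Bloom filter sizing formulas m = -n * ln(p) / (ln 2)^2 and k = (m / n) * ln 2:

public class BloomFilterUtil {

    // Optimal bit-vector size m for n members and false-positive rate p:
    // m = -n * ln(p) / (ln 2)^2
    public static int getOptimalBloomFilterSize(int numMembers, float falsePosRate) {
        return (int) (-numMembers * Math.log(falsePosRate) / Math.pow(Math.log(2), 2));
    }

    // Optimal number of hash functions k for n members and bit-vector size m:
    // k = (m / n) * ln 2
    public static int getOptimalK(float numMembers, float vectorSize) {
        return (int) Math.round(vectorSize / numMembers * Math.log(2));
    }
}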
Input and Output
Run Results
Top-K Problem
Use Case
Find the K hottest records and similar "top K" tasks.
Implementation
/**
 * @Author bluesnail95
 * @Date 2019/7/20 17:09
 * @Description
 */
public class Top10Data {

    private String userId;
    private Integer reputation;

    public String getUserId() { return userId; }

    public void setUserId(String userId) { this.userId = userId; }

    public Integer getReputation() { return reputation; }

    public void setReputation(Integer reputation) { this.reputation = reputation; }
}
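Each input line is expected to be one JSON object whose fields match Top10Data. A made-up record for illustration:

{"userId":"1001","reputation":4325}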
// Jackson's ObjectMapper for parsing the JSON input (assuming the fasterxml artifact)
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;
import java.util.TreeMap;

/**
 * @Author bluesnail95
 * @Date 2019/7/20 17:09
 * @Description
 */
public class Top10Main {

    public static class Top10Mapper extends Mapper<Object, Text, NullWritable, Text> {

        // Holds at most 10 records, sorted by reputation (the TreeMap key)
        private TreeMap<Integer, Text> sortedMap = new TreeMap<Integer, Text>();
        private ObjectMapper objectMapper = new ObjectMapper();

        @Override
        public void map(Object key, Text value, Context context) {
            try {
                Top10Data top10Data = objectMapper.readValue(value.toString(), Top10Data.class);
                sortedMap.put(top10Data.getReputation(), new Text(value));
                // Once the map holds more than 10 entries, drop the smallest key
                if (sortedMap.size() > 10) {
                    sortedMap.remove(sortedMap.firstKey());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        // Emit this mapper's local top 10 after all of its input has been consumed
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            for (Text t : sortedMap.values()) {
                context.write(NullWritable.get(), t);
            }
        }
    }

    public static class Top10Reducer extends Reducer<NullWritable, Text, NullWritable, Text> {

        private TreeMap<Integer, Text> sortedMap = new TreeMap<Integer, Text>();

        @Override
        public void reduce(NullWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            ObjectMapper objectMapper = new ObjectMapper();
            // Merge the local top-10 lists from all mappers into the global top 10
            for (Text value : values) {
                try {
                    Top10Data top10Data = objectMapper.readValue(value.toString(), Top10Data.class);
                    sortedMap.put(top10Data.getReputation(), new Text(value));
                    if (sortedMap.size() > 10) {
                        sortedMap.remove(sortedMap.firstKey());
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            for (Text t : sortedMap.values()) {
                context.write(NullWritable.get(), t);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Top 10");
        // Must match your own driver class
        job.setJarByClass(Top10Main.class);
        job.setMapperClass(Top10Mapper.class);
        job.setReducerClass(Top10Reducer.class);
        // Output key/value classes must match what the mapper emits
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        // Input/output paths; delete the output directory first (local mode)
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileUtil.fullyDelete(new File(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Note that the output writes new Text(value), creating a fresh object each time. Hadoop reuses the Text instance it passes into map() and reduce(), so storing the reference itself in the TreeMap would leave every entry pointing at the same, last-seen value.
Only K records are found; no full sort is performed. Each time a new element is added to the TreeMap, check whether the map's size exceeds K; if so, remove the first (smallest-key) entry. One caveat of this approach is handled in the sketch below.
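A TreeMap<Integer, Text> keeps exactly one value per reputation, so records tied on reputation silently overwrite one another. A minimal tie-safe variant (a hypothetical helper, not part of the original post) keeps a list of records per key:

import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class TopKBuffer {

    private final int k;
    // One list of records per reputation value, so ties are preserved
    private final TreeMap<Integer, List<String>> buffer = new TreeMap<Integer, List<String>>();
    private int size = 0;

    public TopKBuffer(int k) { this.k = k; }

    public void add(int reputation, String record) {
        List<String> bucket = buffer.get(reputation);
        if (bucket == null) {
            bucket = new ArrayList<String>();
            buffer.put(reputation, bucket);
        }
        bucket.add(record);
        size++;
        // Evict one record with the smallest reputation once we exceed K
        if (size > k) {
            List<String> smallest = buffer.get(buffer.firstKey());
            smallest.remove(smallest.size() - 1);
            if (smallest.isEmpty()) {
                buffer.remove(buffer.firstKey());
            }
            size--;
        }
    }

    // Records in ascending reputation order, at most K of them
    public List<String> values() {
        List<String> result = new ArrayList<String>();
        for (List<String> bucket : buffer.values()) {
            result.addAll(bucket);
        }
        return result;
    }
}

Each call to add() keeps the buffer at no more than K records while preserving ties; the same put/check/evict pattern as above, just with a list per key.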
Input and Output
Run Results
Deduplication
Use Case
Duplicate records in a dataset need to be removed, for example to simplify subsequent statistics.
Implementation
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.File;
import java.io.IOException;

/**
 * @Author bluesnail95
 * @Date 2019/7/20 17:09
 * @Description
 */
public class DistinctMain {

    public static class DistinctMapper extends Mapper<Object, Text, Text, NullWritable> {

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Emit the whole record as the key; duplicates land in the same group
            context.write(value, NullWritable.get());
        }
    }

    public static class DistinctReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

        @Override
        public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // Each distinct record arrives exactly once as a key; write it once
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Distinct");
        // Must match your own driver class
        job.setJarByClass(DistinctMain.class);
        job.setMapperClass(DistinctMapper.class);
        job.setReducerClass(DistinctReducer.class);
        // Output key/value classes must match what the mapper emits
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // Input/output paths; delete the output directory first (local mode)
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileUtil.fullyDelete(new File(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
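Because the reduce step here is idempotent (each key is written exactly once, no matter how many duplicates it groups), the reducer can safely double as a combiner to cut shuffle traffic. This is a standard optimization for the dedup pattern and would be a one-line addition to main():

job.setCombinerClass(DistinctReducer.class);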
Input and Output
Run Results
References
MapReduce Design Patterns (《MapReduce 设计模式》)