案例使用MapReduce实现join操作

32次阅读

共计 8046 个字符,预计需要花费 21 分钟才能阅读完成。

哈喽~各位小伙伴们中秋快乐,好久没更新新的文章啦,今天分享如何使用 mapreduce 进行 join 操作。

在离线计算中,我们常常不只是会对单一一个文件进行操作,进行需要进行两个或多个文件关联出更多数据,类似与 sql 中的 join 操作。
今天就跟大家分享一下如何在 MapReduce 中实现 join 操作

需求


现有两张,一张是产品信息表,一张是订单表。订单表中只表存了产品 ID,如果想要查出订单以及产品的相关信息就必须使用关联。

实现


根据 MapReduce 特性,大家都知道在 reduce 端,相同 key 的 key,value 对会被放到同一个 reduce 方法中(不设置 partition 的话)。利用这个特点我们可以轻松实现 join 操作,请看下面示例。

产品表

ID brand model
p0001 苹果 iphone11 pro max
p0002 华为 p30
p0003 小米 mate10

订单表

id name address produceID num
00001 kris 深圳市福田区 p0001 1
00002 pony 深圳市南山区 p0001 2
00003 jack 深圳市坂田区 p0001 3

假如数据量巨大,两表的数据是以文件的形式存储在 HDFS 中,需要用 mapreduce 程序来实现一下 SQL 查询运算:

select a.id,a.name,a.address,a.num from t_orders a join t_products on a.productID=b.ID

MapReduce 实现思路

通过将关联的条件 (prodcueID) 作为 map 输出的 key,将两表满足 join 条件的数据并携带数据所来源的文件信息,发往同一个
reduce task,在 reduce 中进行数据的串联

实现方式一 -reduce 端 join

定义一个 Bean

public class RJoinInfo implements Writable{
    private String customerName="";
    private String customerAddr="";
    private String orderID="";
    private int orderNum;
    private String productID="";
    private String productBrand="";
    private String productModel="";
//    0 是产品,1 是订单
    private int flag;
    
    setter/getter

编写 Mapper

public class RJoinMapper extends Mapper<LongWritable,Text,Text,RJoinInfo> {private static Logger logger = LogManager.getLogger(RJoinMapper.class);
    private RJoinInfo rJoinInfo = new RJoinInfo();
    private Text k = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//        输入方式支持很多中包括数据库等等。这里用的是文件,因此可以直接强转为文件切片
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
//        获取文件名称
        String name = fileSplit.getPath().getName();
        logger.info("splitPathName:"+name);

        String line = value.toString();
        String[] split = line.split("\t");


        String productID = "";

            if(name.contains("product")){productID = split[0];
                String setProductBrand = split[1];
                String productModel = split[2];

                rJoinInfo.setProductID(productID);
                rJoinInfo.setProductBrand(setProductBrand);
                rJoinInfo.setProductModel(productModel);
                rJoinInfo.setFlag(0);
            }else if(name.contains("orders")){String orderID = split[0];
                String customerName = split[1];
                String cutsomerAddr = split[2];
                productID = split[3];
                String orderNum = split[4];

                rJoinInfo.setProductID(productID);
                rJoinInfo.setCustomerName(customerName);
                rJoinInfo.setCustomerAddr(cutsomerAddr);
                rJoinInfo.setOrderID(orderID);
                rJoinInfo.setOrderNum(Integer.parseInt(orderNum));
                rJoinInfo.setFlag(1);
            }

        k.set(productID);
        context.write(k,rJoinInfo);
    }
}

代码解释,这里根据 split 的文件名,判断是 products 还是 orders,
然后根据是 product 还是 orders 获取不同的数据,最用都以 productID 为 Key 发送给 Reduce 端

编写 Reducer

public class RJoinReducer extends Reducer<Text,RJoinInfo,RJoinInfo,NullWritable> {private static Logger logger = LogManager.getLogger(RJoinReducer.class);
    @Override
    protected void reduce(Text key, Iterable<RJoinInfo> values, Context context) throws IOException, InterruptedException {List<RJoinInfo> orders = new ArrayList<>();

        String productID = key.toString();
        logger.info("productID:"+productID);
        RJoinInfo rJoinInfo = new RJoinInfo();

        for (RJoinInfo value : values) {int flag = value.getFlag();
            if (flag == 0) {
//                产品
                try {BeanUtils.copyProperties(rJoinInfo,value);
                } catch (IllegalAccessException e) {logger.error(e.getMessage());
                } catch (InvocationTargetException e) {logger.error(e.getMessage());
                }
            }else {
//                订单
                RJoinInfo orderInfo = new RJoinInfo();
                try {BeanUtils.copyProperties(orderInfo,value);
                } catch (IllegalAccessException e) {logger.error(e.getMessage());
                } catch (InvocationTargetException e) {logger.error(e.getMessage());
                }
                orders.add(orderInfo);
            }
        }

        for (RJoinInfo order : orders) {rJoinInfo.setOrderNum(order.getOrderNum());
            rJoinInfo.setOrderID(order.getOrderID());
            rJoinInfo.setCustomerName(order.getCustomerName());
            rJoinInfo.setCustomerAddr(order.getCustomerAddr());

//          只输出 key 即可,value 可以使用 nullwritable
            context.write(rJoinInfo,NullWritable.get());
        }
    }
}

代码解释: 根据 productID 会分为不同的组发到 reduce 端,reduce 端拿到后一组数据后,其中有一个产品对象和多个订单对象。
遍历每一个对象,根据 flag 区分产品和订单。保存产品对象,获取每个订单对象到一个集合中。当我们对每个对象都分好
类后,遍历订单集合将订单和产品信息集合,然后输出。

注意: 我们这里效率虽然不是最高的,主要是想说明 join 的思路。

编写 Driver

public class RJoinDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();
//        conf.set("mapreduce.framework.name","yarn");
//        conf.set("yarn.resourcemanager.hostname","server1");
//        conf.set("fs.defaultFS","hdfs://server1:9000");
        conf.set("mapreduce.framework.name","local");
        conf.set("fs.defaultFS","file:///");

        Job job = Job.getInstance(conf);

//       如果是本地运行,可以不用设置 jar 包的路径,因为不用拷贝 jar 到其他地方
        job.setJarByClass(RJoinDriver.class);
//        job.setJar("/Users/kris/IdeaProjects/bigdatahdfs/target/rjoin.jar");

        job.setMapperClass(RJoinMapper.class);
        job.setReducerClass(RJoinReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(RJoinInfo.class);
        job.setOutputKeyClass(RJoinInfo.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job,new Path("/Users/kris/Downloads/rjoin/input"));
        FileOutputFormat.setOutputPath(job,new Path("/Users/kris/Downloads/rjoin/output"));

        boolean waitForCompletion = job.waitForCompletion(true);
        System.out.println(waitForCompletion);
    }
}

== 上面实现的这种方式有个缺点,就是 join 操作是在 reduce 阶段完成的,reduce 端的处理压力太大,map 节点的运算负载则很低,资源利用率不高,且在 reduce 阶段极易产生数据倾斜 ==

实现方式二 -map 端 join


这种方式适用于关联表中有小表的情形:
可以将小表分发到所有的 map 节点,这样,map 节点就可以在本地对自己所读到的大表数据进行 join 操作并输出结果,可以大大提高 join 操作的并发度,加快处理速度。

编写 Mapper

在 Mapper 端我们一次性加载数据或者用 Distributedbache 将文件拷贝到每一个运行的 maptask 的节点上加载

这里我们使用第二种,在 mapper 类中定义好小表进行 join

static class RjoinMapper extends Mapper<LongWritable,Text,RJoinInfo,NullWritable>{private static Map<String, RJoinInfo> productMap = new HashMap<>();

//      在循环调用 map 方法之前会先调用 setup 方法。因此我们可以在 setup 方法中,先对文件进行处理
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {

            // 通过这几句代码可以获取到 cache file 的本地绝对路径,测试验证用
            URI[] cacheFiles = context.getCacheFiles();
            System.out.println(Arrays.toString(new URI[]{cacheFiles[0]}));

//          直接指定名字,默认在工作文件夹的目录下查找 1⃣
            try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream("products.txt")))){

                String line;
                while ((line = bufferedReader.readLine())!=null){String[] split = line.split("\t");
                    String productID = split[0];
                    String setProductBrand = split[1];
                    String productModel = split[2];

                    RJoinInfo rJoinInfo = new RJoinInfo();
                    rJoinInfo.setProductID(productID);
                    rJoinInfo.setProductBrand(setProductBrand);
                    rJoinInfo.setProductModel(productModel);
                    rJoinInfo.setFlag(0);
                    productMap.put(productID, rJoinInfo);
                }
            }

            super.setup(context);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {FileSplit fileSplit = (FileSplit)context.getInputSplit();

            String name = fileSplit.getPath().getName();

            if (name.contains("orders")) {String line = value.toString();

                String[] split = line.split("\t");
                String orderID = split[0];
                String customerName = split[1];
                String cutsomerAddr = split[2];
                String productID = split[3];
                String orderNum = split[4];

                RJoinInfo rJoinInfo = productMap.get(productID);
                rJoinInfo.setProductID(productID);
                rJoinInfo.setCustomerName(customerName);
                rJoinInfo.setCustomerAddr(cutsomerAddr);
                rJoinInfo.setOrderID(orderID);
                rJoinInfo.setOrderNum(Integer.parseInt(orderNum));
                rJoinInfo.setFlag(1);

                context.write(rJoinInfo, NullWritable.get());
            }
        }
    }

代码解释: 这里我们又重写了一个 setup()方法,这个方法会在执行 map()方法前先执行,因此我们可以在这个方法中事先加载好数据。
在上述代码中,我们直接指定名字就拿到了 product.txt 文件,这个究竟这个文件是怎么复制在 maptask 的节点上的呢,还要看下面的 driver

编写 Driver

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {Configuration conf = new Configuration();
        conf.set("mapreduce.framework.name","local");
        conf.set("fs.defaultFS","file:///");

        Job job = Job.getInstance(conf);
        job.setJarByClass(RJoinDemoInMapDriver.class);

        job.setMapperClass(RjoinMapper.class);
        job.setOutputKeyClass(RJoinInfo.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job,new Path("/Users/kris/Downloads/rjoin/input"));
        FileOutputFormat.setOutputPath(job,new Path("/Users/kris/Downloads/rjoin/output2"));

//        指定需要缓存一个文件到所有的 maptask 运行节点工作目录
//        job.addFileToClassPath(); 将普通文件缓存到 task 运行节点的 classpath 下
//        job.addArchiveToClassPath(); 缓存 jar 包到 task 运行节点的 classpath 下
//        job.addCacheArchive(); 缓存压缩包文件到 task 运行节点的工作目录
//        job.addCacheFile(); 将普通文件 1⃣
        job.addCacheFile(new URI("/Users/kris/Downloads/rjoin/products.txt"));

//      设置 reduce 的数量为 0
        job.setNumReduceTasks(0);


        boolean waitForCompletion = job.waitForCompletion(true);
        System.out.println(waitForCompletion);

    }

代码解释: 上述 Driver 中,我们通过 job.addCacheFile()指定了一个 URI 本地地址,运行时 mapreduce 就会将这个文件拷贝到 maptask 的运行工作目录中。

好啦~本期分享代码量偏多,主要是想分享如何使用 mapreduce 进行 join 操作的思路。下一篇我会再讲一下 计算共同好友的思路以及代码~

        公众号搜索: 喜讯 XiCent    获取更多福利资源~~~~

正文完
 0