乐趣区

在MaxCompute中利用bitmap进行数据处理

很多数据开发者使用 bitmap 技术对用户数据进行编码和压缩,然后利用 bitmap 极快的与 / 或 / 非运算,实现用户画像标签的人群筛选、运营分析中的 7 日活跃统计等分析。
本文给出了一个使用 MaxCompute MapReduce 对不同日期的活跃用户 ID 进行 bitmap 编码和计算的样例,供感兴趣的用户进一步了解、分析,并应用到自己的场景中。

import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.MapperBase;
import com.aliyun.odps.mapred.ReducerBase;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Iterator;

/**
 * MaxCompute MapReduce sample that builds one RoaringBitmap of user ids per
 * {@code active_date}.
 *
 * <p>The mapper turns every input row into a single-element bitmap keyed by the
 * row's {@code active_date}; the reducer ORs all bitmaps that share a key and
 * writes the result, Base64-encoded, to the output table.
 */
public class bitmapDemo2
{

    /** Largest id a RoaringBitmap can hold: it stores ints interpreted as unsigned 32-bit values. */
    private static final long MAX_UNSIGNED_INT = 0xFFFFFFFFL;

    /**
     * Serializes a bitmap in the RoaringBitmap portable format and Base64-encodes the bytes.
     *
     * @param bitmap the bitmap to encode
     * @return the Base64 text of the serialized bitmap
     * @throws IOException if serialization fails
     */
    private static String encodeBitmap(RoaringBitmap bitmap) throws IOException {
        // The buffer is sized exactly, so buffer.array() holds the serialized form with no slack.
        final ByteBuffer buffer = ByteBuffer.allocate(bitmap.serializedSizeInBytes());
        OutputStream sink = new OutputStream() {
            @Override
            public void write(int b) {
                buffer.put((byte) b);
            }

            @Override
            public void write(byte[] b, int off, int len) {
                buffer.put(b, off, len);
            }
        };
        // The sink is purely in-memory; there is nothing to flush or close.
        bitmap.serialize(new DataOutputStream(sink));
        return Base64.getEncoder().encodeToString(buffer.array());
    }

    /**
     * Rebuilds a bitmap from the text produced by {@link #encodeBitmap(RoaringBitmap)}.
     *
     * @param encoded Base64 text of a serialized bitmap
     * @return a mutable copy of the decoded bitmap
     */
    private static RoaringBitmap decodeBitmap(String encoded) {
        ByteBuffer bytes = ByteBuffer.wrap(Base64.getDecoder().decode(encoded));
        return new RoaringBitmap(new ImmutableRoaringBitmap(bytes));
    }

    /** Emits {@code (active_date, Base64 bitmap containing the row's id)} for every input row. */
    public static class BitMapper extends MapperBase {

        Record key;
        Record value;

        @Override
        public void setup(TaskContext context) throws IOException {
            key = context.createMapOutputKeyRecord();
            value = context.createMapOutputValueRecord();
        }

        /**
         * Encodes one row's {@code id} as a single-element bitmap.
         *
         * @throws IOException if the id does not fit in the bitmap's unsigned 32-bit range,
         *                     or if serialization or the write fails
         */
        @Override
        public void map(long recordNum, Record record, TaskContext context)
                throws IOException
        {
            Long id = record.getBigint("id");
            if (id == null) {
                // A row without an id has nothing to encode; skip it instead of failing on unboxing.
                return;
            }
            if (id < 0 || id > MAX_UNSIGNED_INT) {
                // A plain (int) cast would silently alias this id onto another user's bit.
                throw new IOException("id out of bitmap range [0, " + MAX_UNSIGNED_INT + "]: " + id);
            }

            RoaringBitmap single = new RoaringBitmap();
            // Values above Integer.MAX_VALUE wrap to negative ints, which the bitmap reads as unsigned.
            single.add((int) id.longValue());

            key.set(0, record.getString("active_date"));
            value.set(0, encodeBitmap(single));
            context.write(key, value);
        }
    }

    /** ORs together all bitmaps received for one {@code active_date} and writes a single row. */
    public static class BitReducer extends ReducerBase {
        private Record result = null;

        @Override
        public void setup(TaskContext context) throws IOException {
            result = context.createOutputRecord();
        }

        /**
         * Merges the Base64-encoded bitmaps of one key with a bitwise OR.
         *
         * @param key    the map output key (column 0 is written through to the output row)
         * @param values map output values whose column 0 is a Base64-encoded bitmap
         */
        @Override
        public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
            RoaringBitmap merged = new RoaringBitmap();
            while (values.hasNext()) {
                Record val = values.next();
                merged.or(decodeBitmap((String) val.get(0)));
            }
            result.set(0, key.get(0));
            result.set(1, encodeBitmap(merged));
            context.write(result);
        }
    }

    /**
     * Configures and runs the job: reads {@code bitmap_source}, writes {@code bitmap_target}.
     *
     * @param args unused
     * @throws OdpsException if the job fails
     */
    public static void main(String[] args ) throws OdpsException
    {
        System.out.println("begin.........");
        JobConf job = new JobConf();

        job.setMapperClass(BitMapper.class);
        job.setReducerClass(BitReducer.class);

        job.setMapOutputKeySchema(SchemaUtils.fromString("active_date:string"));
        // The map output value column is named "id" but carries the Base64-encoded bitmap.
        job.setMapOutputValueSchema(SchemaUtils.fromString("id:string"));

        // Sample input:
        //        +------------+-------------+
        //        | id         | active_date |
        //        +------------+-------------+
        //        | 1          | 20190729    |
        //        | 2          | 20190729    |
        //        | 3          | 20190730    |
        //        | 4          | 20190801    |
        //        | 5          | 20190801    |
        //        +------------+-------------+
        InputUtils.addTable(TableInfo.builder().tableName("bitmap_source").cols(new String[] {"id","active_date"}).build(), job);

        // Sample output for the input above:
        //        +-------------+------------------------------+
        //        | active_date | bit_map                      |
        //        +-------------+------------------------------+
        //        20190729,OjAAAAEAAAAAAAEAEAAAAAEAAgA=
        //        20190730,OjAAAAEAAAAAAAAAEAAAAAMA
        //        20190801,OjAAAAEAAAAAAAEAEAAAAAQABQA=
        OutputUtils.addTable(TableInfo.builder().tableName("bitmap_target").build(), job);

        JobClient.runJob(job);
    }
}

对 Java 应用打包后,上传到 MaxCompute 项目中,即可在 MaxCompute 中调用该 MR 作业:以日期作为 key 对输入表中的用户 id 进行 bitmap 编码,再对相同日期的 bitmap 取 OR 操作(根据需要也可以取 AND,例如留存场景),并将处理后的数据写入目标表,供后续处理使用。


本文作者:圣远

阅读原文

本文为云栖社区原创内容,未经允许不得转载。

退出移动版