Giraph源码分析七-添加消息统计功能

40次阅读

共计 2211 个字符,预计需要花费 6 分钟才能阅读完成。

作者 | 白松

1、添加类,把每个超步发送的消息量大小写入 Hadoop 的 Counter 中。在 org.apache.giraph.counters 包下新建 GiraphMessages 类,来统计消息量。

源代码如下:

package org.apache.giraph.counters;

import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.mapreduce.Mapper.Context;
import com.google.common.collect.Maps;

/**
 * Hadoop Counters in group "Giraph Messages" for counting every superstep
 * message count.
 */
public class GiraphMessages extends HadoopCountersBase {
    /** Counter group name for the giraph Messages */
    public static final String GROUP_NAME = "Giraph Messages";

    /** Singleton instance for everyone to use */
    private static GiraphMessages INSTANCE;

    /** superstep time in msec */
    private final Map superstepMessages;

    private GiraphMessages(Context context) {super(context, GROUP_NAME);
        superstepMessages = Maps.newHashMap();}

    /**
     * Instantiate with Hadoop Context.
     * 
     * @param context
     *            Hadoop Context to use.
     */
    public static void init(Context context) {INSTANCE = new GiraphMessages(context);
    }

    /**
     * Get singleton instance.
     * 
     * @return singleton GiraphTimers instance.
     */
    public static GiraphMessages getInstance() {return INSTANCE;}

    /**
     * Get counter for superstep messages
     * 
     * @param superstep
     * @return
     */
    public GiraphHadoopCounter getSuperstepMessages(long superstep) {GiraphHadoopCounter counter = superstepMessages.get(superstep);
        if (counter == null) {
            String counterPrefix = "Superstep-" + superstep+" ";
            counter = getCounter(counterPrefix);
            superstepMessages.put(superstep, counter);
        }
        return counter;
    }

    @Override
    public Iterator iterator() {return superstepMessages.values().iterator();}
}

2、在 BspServiceMaster 类中添加统计功能。Master 在每次同步时候,会聚集每个 Worker 发送的消息量大小(求和),存储于 GlobalStats 中。因此只需要在每次同步后,从 GlobalStats 对象中取出总的通信量大小,然后写入 GiraphMessages 中。格式为 <SuperStep-Number,TotalMessagesCount>,实际存储于上步 GiraphMessages 类中定义的 Map<Long, GiraphHadoopCounter> superstepMessages 对象中。在 BspServiceMaster 的构造方法中,最后面追加一行代码,对 GiraphMessages 进行初始化。

GiraphMessages.init(context);

在 BspServiceMaster 类的 SuperstepState coordinateSuperstep() 方法中,添加记录功能。片段代码如下:

……
// If the master is halted or all the vertices voted to halt and there
// are no more messages in the system, stop the computation
GlobalStats globalStats = aggregateWorkerStats(getSuperstep());  

LOG.info("D-globalStats:"+globalStats+"\n\n");
// 添加下面语句。从第 0 个超步起开始记录。if(getSuperstep() != INPUT_SUPERSTEP) {GiraphMessages.getInstance().getSuperstepMessages(getSuperstep()).increment(globalStats.getMessageCount());
}
……

3、实验结果如下:

完!

正文完
 0