Giraph源码分析(七)—— 添加消息统计功能

网友投稿 233 2022-11-26

Giraph源码分析(七)—— 添加消息统计功能

作者|白松

1、 添加类,把每个超步发送的消息量大小写入Hadoop的Counter中。在org.apache.giraph.counters包下新建GiraphMessages类,来统计消息量。

源代码如下:

package org.apache.giraph.counters; import java.util.Iterator; import java.util.Map; import org.apache.hadoop.mapreduce.Mapper.Context; import com.google.common.collect.Maps; /** * Hadoop Counters in group "Giraph Messages" for counting every superstep * message count. */ public class GiraphMessages extends HadoopCountersBase { /** Counter group name for the giraph Messages */ public static final String GROUP_NAME = "Giraph Messages"; /** Singleton instance for everyone to use */ private static GiraphMessages INSTANCE; /** superstep time in msec */ private final Map superstepMessages; private GiraphMessages(Context context) { super(context, GROUP_NAME); superstepMessages = Maps.newHashMap(); } /** * Instantiate with Hadoop Context. * * @param context * Hadoop Context to use. */ public static void init(Context context) { INSTANCE = new GiraphMessages(context); } /** * Get singleton instance. * * @return singleton GiraphTimers instance. */ public static GiraphMessages getInstance() { return INSTANCE; } /** * Get counter for superstep messages * * @param superstep * @return */ public GiraphHadoopCounter getSuperstepMessages(long superstep) { GiraphHadoopCounter counter = superstepMessages.get(superstep); if (counter == null) { String counterPrefix = "Superstep- " + superstep+" "; counter = getCounter(counterPrefix); superstepMessages.put(superstep, counter); } return counter; } @Override public Iterator iterator() { return superstepMessages.values().iterator(); } }

2、在BspServiceMaster类中添加统计功能。Master在每次同步时候,会聚集每个Worker发送的消息量大小(求和),存储于GlobalStats中。因此只需要在每次同步后,从GlobalStats对象中取出总的通信量大小,然后写入GiraphMessages中。格式为,实际存储于上步GiraphMessages类中定义的Map superstepMessages 对象中。 在BspServiceMaster的构造方法中,最后面追加一行代码,对GiraphMessages进行初始化。

GiraphMessages.init(context);

在BspServiceMaster类的SuperstepState coordinateSuperstep()方法中,添加记录功能。片段代码如下:

…… // If the master is halted or all the vertices voted to halt and there // are no more messages in the system, stop the computation GlobalStats globalStats = aggregateWorkerStats(getSuperstep()); LOG.info("D-globalStats: "+globalStats+"\n\n"); //添加下面语句。从第0个超步起开始记录。 if(getSuperstep() != INPUT_SUPERSTEP) { GiraphMessages.getInstance().getSuperstepMessages(getSuperstep()).increment(globalStats.getMessageCount()); } ……

3、实验结果如下:

完!

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:浅谈Java中GuavaCache返回Null的注意事项
下一篇:学大数据的实训项目,符合这三个要求的才是有用的
相关文章

 发表评论

暂时没有评论,来抢沙发吧~