java怎么拦截某个对象
233
2022-11-26
Giraph源码分析(七)—— 添加消息统计功能
作者|白松
1、 添加类,把每个超步发送的消息量大小写入Hadoop的Counter中。在org.apache.giraph.counters包下新建GiraphMessages类,来统计消息量。
源代码如下:
package org.apache.giraph.counters; import java.util.Iterator; import java.util.Map; import org.apache.hadoop.mapreduce.Mapper.Context; import com.google.common.collect.Maps; /** * Hadoop Counters in group "Giraph Messages" for counting every superstep * message count. */ public class GiraphMessages extends HadoopCountersBase { /** Counter group name for the giraph Messages */ public static final String GROUP_NAME = "Giraph Messages"; /** Singleton instance for everyone to use */ private static GiraphMessages INSTANCE; /** superstep time in msec */ private final Map superstepMessages; private GiraphMessages(Context context) { super(context, GROUP_NAME); superstepMessages = Maps.newHashMap(); } /** * Instantiate with Hadoop Context. * * @param context * Hadoop Context to use. */ public static void init(Context context) { INSTANCE = new GiraphMessages(context); } /** * Get singleton instance. * * @return singleton GiraphTimers instance. */ public static GiraphMessages getInstance() { return INSTANCE; } /** * Get counter for superstep messages * * @param superstep * @return */ public GiraphHadoopCounter getSuperstepMessages(long superstep) { GiraphHadoopCounter counter = superstepMessages.get(superstep); if (counter == null) { String counterPrefix = "Superstep- " + superstep+" "; counter = getCounter(counterPrefix); superstepMessages.put(superstep, counter); } return counter; } @Override public Iterator iterator() { return superstepMessages.values().iterator(); } }
2、在BspServiceMaster类中添加统计功能。Master在每次同步时候,会聚集每个Worker发送的消息量大小(求和),存储于GlobalStats中。因此只需要在每次同步后,从GlobalStats对象中取出总的通信量大小,然后写入GiraphMessages中。格式为
GiraphMessages.init(context);
在BspServiceMaster类的SuperstepState coordinateSuperstep()方法中,添加记录功能。片段代码如下:
…… // If the master is halted or all the vertices voted to halt and there // are no more messages in the system, stop the computation GlobalStats globalStats = aggregateWorkerStats(getSuperstep()); LOG.info("D-globalStats: "+globalStats+"\n\n"); //添加下面语句。从第0个超步起开始记录。 if(getSuperstep() != INPUT_SUPERSTEP) { GiraphMessages.getInstance().getSuperstepMessages(getSuperstep()).increment(globalStats.getMessageCount()); } ……
3、实验结果如下:
完!
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~