接着 Hadoop之MapReduce分区 这一篇的内容。
介绍计数器是收集作业统计信息的有效手段之一,用于质量控制或应用级统计。计数器还可辅助诊断系统故障。如果需要将日志信息传输到 map 或 reduce 任务, 更好的方法通常是看能否用一个计数器值来记录某一特定事件的发生。对于大型分布式作业而言,使用计数器更为方便。除了因为获取计数器值比输出日志更方便,还有根据计数器值统计特定事件的发生次数要比分析一堆日志文件容易得多。
hadoop内置计数器列表
MapReduce任务计数器 |
org.apache.hadoop.mapreduce.TaskCounter |
文件系统计数器 |
org.apache.hadoop.mapreduce.FileSystemCounter |
FileInputFormat计数器 |
org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter |
FileOutputFormat计数器 |
org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter |
作业计数器 |
org.apache.hadoop.mapreduce.JobCounter |
每次mapreduce执行完成之后,我们都会看到一些日志记录出来,其中最重要的一些日志记录如下截图
所有的这些都是MapReduce的计数器的功能,既然MapReduce当中有计数器的功能,我们如何实现自己的计数器?
需求:以分区代码为案例,统计map接收到的数据记录条数
第一种方式定义计数器,通过context上下文对象可以获取我们的计数器,进行记录
通过context上下文对象,在map端使用计数器进行统计。
增加PartitionMapper.java
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class PartitionMapper extends Mapper<LongWritable, Text,Text, NullWritable> { /** * map方法将K1和V1转为K2和V2 */ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException { Counter counter = context.getCounter("MR_COUNT", "MyRecordCounter"); counter.increment(1L); context.write(value,NullWritable.get()); } }
同时在主类中增加设置:
job.setMapperClass(PartitionMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(NullWritable.class);
如下图所示:
运行程序之后就可以看到我们自定义的计数器在map阶段读取了15213条数据:
第二种方式,通过enum枚举类型来定义计数器,统计reduce端数据的输入的key有多少个
增加PartitionerReducer.java类
import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class PartitionerReducer extends Reducer<Text, NullWritable,Text,NullWritable> { public static enum Counter{ MY_REDUCE_INPUT_RECORDS,MY_REDUCE_INPUT_BYTES } @Override protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException { context.getCounter(Counter.MY_REDUCE_INPUT_RECORDS).increment(1L); context.write(key, NullWritable.get()); } }
同时在主类中增加设置:
job.setReducerClass(PartitionerReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class);
如下图所示:
执行效果如下:
,