hadoop大数据在线教程(大数据Hadoop下)(1)


Combiner组件

(1)Combiner 是MR 程序中Mapper和Reducer 之外的一种组件

(2)Combiner的组件的父类就是Reducer

(3)Combiner 和Reducer的区别在于运行位置

Combiner 是在每一个MapTask节点上 局部汇总 Reducer   是接收全局的Mapper的输出结果

(4)Combiner的意义就是对一个每一个MapTask的输出做局部汇总以减少网络传输量

(5)Combiner应用的前提是不能影响最终的业务结果 而且 Combiner输出KV 应该和Reducer 的输入KV相对应


应用场景

Combiner 并不适用与所有的业务

1. Combiner 适合进行累加的业务 2. 不适用avg()(求平均值)为什么? 例如:求0 ,20 ,10 ,25 ,15 的平均数 直接全部加起来除5等于 14 (全局汇总求平均数)         {(0,20,10)/3 = 10 第一次局部汇总计算 } {(25 15)/2 = 20 第二次局部汇总计算}   {(10 20)/2 = 15 全局计算}           根据上述计算直接进行全局运算是不影响最终结果的,但是如果局部运算后 平均数的结果会受到影响         所以Combiner 不适合求平均数

使用

(1)新建 CombinerClass 集成Reducer

(2)在驱动类(Job类)中使用job对象设置CombinerClass job.setCombinerClass();

案例

(1)WC计数案例

(2)job.setCombinerClass();

在没有设置CombinerClass之前:


在设置CombinerClass之后


MapReduce过程

首先由客户端节点提交Job InputFomat首先会做数据的切片,由InputFomat组件提供数据给Map函数 Map处理完成后 会交由Reduce函数进行处理,最终输出。


Shuffle


Shuffle 是MR 的核心,描述数据从MapTask输出到作为ReduceTask的输入的这段过程。

hadoop的大数据集群运行,在真正的生产计算中,大部分MapTask 和ReduceTask 都不在同一个节点上,Reduce就要去其他节点上取中间结果(通过网络)。那么集群运行多个Job时,ReduceTask的正常执行或许会严重消耗集群中的网络资源。虽说这种消耗是正常的,是不可避免,但是我们可以采取措施尽量就减少网络传输(可以使用Combiner)。

另外一个点:相比较于,磁盘io对于Job的运行效率影响很大。

所以 从以上分析 Shuffle过程的基本要求:


总结:Shuffle 是对Map输出结果的分区(partition),排序(Sort),合并(Combine),归并(merge)等操作 交由Reduce的过程

Map 端的Shuffle

(1)map结果写入缓冲区

(2)缓冲区达到阈值 溢写到磁盘

(3)分区内排序合并归并成大文件(key,value-list)


Reduce 端的Shuffle

(1)领取数据

(2)归并数据

(3)数据输入给Reduce任务



编程案例


排序输出

数据

15117826008 83 218 301 zz 15617126008 83 218 302 zz 16519756181 150 164 315 tj 16519756281 150 164 314 tj 18611781131 123 112 237 sh 18611781113 123 112 235 sh 18711671123 45 186 232 bj 18711671133 45 186 233 bj 18711671233 45 186 234 tw 18711631233 45 186 235 xy


package com.baizhi.test07; ​ import com.baizhi.test06.OwnPartitioner; import com.baizhi.test06.PJob; import com.baizhi.test06.PMapper; import com.baizhi.test06.PReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; ​ public class SortJob {    public static void main(String[] args) throws Exception { ​        //System.setProperty("HADOOP_USER_NAME", "root"); ​        Configuration conf = new Configuration(); // //       conf.addResource("conf2/core-site.xml"); //       conf.addResource("conf2/hdfs-site.xml"); //       conf.addResource("conf2/mapred-site.xml"); //       conf.addResource("conf2/Yarn-site.xml"); //       conf.set("mapreduce.app-submission.cross-platform", "true"); // //       conf.set(MRJobConfig.JAR, "F:\\大数据\\代码\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar"); // ​        Job job = Job.getInstance(conf); ​        /*         * 设置类加载器         * */        job.setJarByClass(SortJob.class); ​        job.setInputFormatClass(TextInputFormat.class);        job.setOutputFormatClass(TextOutputFormat.class);        /*         * 使用自定义的分区规则         * */        // job.setPartitionerClass(OwnPartitioner.class); ​ ​        // job.setNumReduceTasks(1); ​ ​        TextInputFormat.setInputPaths(job, new Path("F:\\大数据\\笔记\\Day02-Hadoop\\数据文件\\sumFlow.txt"));        //TextInputFormat.setInputPaths(job, new Path("/flow.txt"));        TextOutputFormat.setOutputPath(job, new Path("F:\\大数据\\笔记\\Day02-Hadoop\\数据文件\\outS111"));        //TextOutputFormat.setOutputPath(job, new Path("/out111111")); ​ ​        job.setMapperClass(SortMapper.class);        job.setReducerClass(SortReducer.class); ​ ​        job.setMapOutputKeyClass(FlowBean.class);        job.setMapOutputValueClass(NullWritable.class); ​ ​        job.setOutputKeyClass(FlowBean.class);        job.setOutputValueClass(NullWritable.class); ​        job.waitForCompletion(true); ​ ​   } } ​

package com.baizhi.test07; ​ import com.sun.org.apache.bcel.internal.generic.FLOAD; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; ​ import java.io.IOException; ​ public class SortMapper extends Mapper<LongWritable, Text, FlowBean, NullWritable> {    @Override    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        // 拿到数据 使用空格进行分割        String[] flowinfos = value.toString().split(" "); ​        /*         * 拿到下标为0的电话         * */        String phone = flowinfos[0];        /*         * 拿到下标为1的上传流量         * */        Long upFlow = Long.valueOf(flowinfos[1]); ​        /*         * 拿到下标为2的下载流量         * */        Long downFlow = Long.valueOf(flowinfos[2]);        Long sumFlow = Long.valueOf(flowinfos[3]); ​ ​ ​        /*         * 准备bean对象         * */        FlowBean flowBean = new FlowBean(); ​        /*         * */        flowBean.setPhone(phone);        flowBean.setUpFlow(upFlow);        flowBean.setDownFlow(downFlow);        /*         * 设置总流量 上传 下载         * */        flowBean.setSumFlow(sumFlow); ​        /*         * 数据写出         * */        context.write(flowBean, NullWritable.get()); ​ ​   } } ​

package com.baizhi.test07; ​ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Reducer; ​ import java.io.IOException; ​ public class SortReducer extends Reducer<FlowBean, NullWritable, FlowBean, NullWritable> {    @Override    protected void reduce(FlowBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {        context.write(key, NullWritable.get());   } } ​


package com.baizhi.test07; ​ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; ​ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; ​ public class FlowBean implements WritableComparable<FlowBean> { ​    private String phone;    private Long upFlow;    private Long downFlow;    private Long sumFlow; ​    public FlowBean() {   } ​    public FlowBean(String phone, Long upFlow, Long downFlow, Long sumFlow) {        this.phone = phone;        this.upFlow = upFlow;        this.downFlow = downFlow;        this.sumFlow = sumFlow;   } ​    public String getPhone() {        return phone;   } ​    public void setPhone(String phone) {        this.phone = phone;   } ​    public Long getUpFlow() {        return upFlow;   } ​    public void setUpFlow(Long upFlow) {        this.upFlow = upFlow;   } ​    public Long getDownFlow() {        return downFlow;   } ​    public void setDownFlow(Long downFlow) {        this.downFlow = downFlow;   } ​    public Long getSumFlow() {        return sumFlow;   } ​    public void setSumFlow(Long sumFlow) {        this.sumFlow = sumFlow;   } ​    @Override    public String toString() {        return "FlowBean{"                "phone='" phone '\''                ", upFlow=" upFlow                ", downFlow=" downFlow                ", sumFlow=" sumFlow                '}';   } ​    /*     * 往外写 序列化 编码     * */    public void write(DataOutput dataOutput) throws IOException { ​        dataOutput.writeUTF(this.phone);        dataOutput.writeLong(this.upFlow);        dataOutput.writeLong(this.downFlow);        dataOutput.writeLong(this.sumFlow);   } ​ ​    /*     * 往里读 反序列化 解码     * */    public void readFields(DataInput dataInput) throws IOException { ​        this.phone = dataInput.readUTF();        this.upFlow = dataInput.readLong();        this.downFlow = dataInput.readLong();        this.sumFlow = dataInput.readLong();   } ​    public int compareTo(FlowBean flowBean) { ​        if (this.sumFlow > flowBean.sumFlow) { ​            return 1;       } else if (this.sumFlow == flowBean.sumFlow) { ​            return 0;       } else {            return -1;       } ​        // return this.sumFlow > flowBean.sumFlow ? -1 : 1;   } } ​


排序分区输出

数据

15117826008 83 218 301 zz 15617126008 83 218 302 zz 16519756181 150 164 315 tj 16519756281 150 164 314 tj 18611781131 123 112 237 sh 18611781113 123 112 235 sh 18711671123 45 186 232 bj 18711671133 45 186 233 bj 18711671233 45 186 234 tw 18711631233 45 186 235 xy


hadoop大数据在线教程(大数据Hadoop下)(2)


package com.baizhi.test08; ​ import org.apache.hadoop.io.WritableComparable; ​ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; ​ public class FlowBean implements WritableComparable<FlowBean> { ​    private String phone;    private Long upFlow;    private Long downFlow;    private Long sumFlow; ​    public FlowBean() {   } ​    public FlowBean(String phone, Long upFlow, Long downFlow, Long sumFlow) {        this.phone = phone;        this.upFlow = upFlow;        this.downFlow = downFlow;        this.sumFlow = sumFlow;   } ​    public String getPhone() {        return phone;   } ​    public void setPhone(String phone) {        this.phone = phone;   } ​    public Long getUpFlow() {        return upFlow;   } ​    public void setUpFlow(Long upFlow) {        this.upFlow = upFlow;   } ​    public Long getDownFlow() {        return downFlow;   } ​    public void setDownFlow(Long downFlow) {        this.downFlow = downFlow;   } ​    public Long getSumFlow() {        return sumFlow;   } ​    public void setSumFlow(Long sumFlow) {        this.sumFlow = sumFlow;   } ​    @Override    public String toString() {        return "FlowBean{"                "phone='" phone '\''                ", upFlow=" upFlow                ", downFlow=" downFlow                ", sumFlow=" sumFlow                '}';   } ​    /*     * 往外写 序列化 编码     * */    public void write(DataOutput dataOutput) throws IOException { ​        dataOutput.writeUTF(this.phone);        dataOutput.writeLong(this.upFlow);        dataOutput.writeLong(this.downFlow);        dataOutput.writeLong(this.sumFlow);   } ​ ​    /*     * 往里读 反序列化 解码     * */    public void readFields(DataInput dataInput) throws IOException { ​        this.phone = dataInput.readUTF();        this.upFlow = dataInput.readLong();        this.downFlow = dataInput.readLong();        this.sumFlow = dataInput.readLong();   } ​    public int compareTo(FlowBean flowBean) { ​        if (this.sumFlow > flowBean.sumFlow) { ​            return -1;       } else if (this.sumFlow == flowBean.sumFlow) { ​            return 0;       } else {            return 1;       } ​        // return this.sumFlow > flowBean.sumFlow ? -1 : 1;   } } ​

package com.baizhi.test08; ​ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; ​ import java.util.HashMap; ​ public class OwnPartitioner extends Partitioner<FlowBean, Text> { ​ ​    private static HashMap<String, Integer> map = new HashMap<String, Integer>(); ​    static { ​        map.put("zz", 0);        map.put("bj", 1);        map.put("tj", 2);        map.put("sh", 3); ​   } ​ ​    public int getPartition(FlowBean key, Text value, int i) { ​        String areaName = value.toString(); ​        Integer num = map.get(areaName); ​        /*         * 如果num 没有值 为空则返回4 不为空则返回正常的值         * */        return num == null ? 4 : num; ​   } } ​

package com.baizhi.test08; ​ ​ import com.baizhi.test07.SortJob; import com.baizhi.test07.SortMapper; import com.baizhi.test07.SortReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; ​ public class SPJob {    public static void main(String[] args) throws Exception { ​        //System.setProperty("HADOOP_USER_NAME", "root"); ​        Configuration conf = new Configuration(); // //       conf.addResource("conf2/core-site.xml"); //       conf.addResource("conf2/hdfs-site.xml"); //       conf.addResource("conf2/mapred-site.xml"); //       conf.addResource("conf2/yarn-site.xml"); //       conf.set("mapreduce.app-submission.cross-platform", "true"); // //       conf.set(MRJobConfig.JAR, "F:\\大数据\\代码\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar"); // ​        Job job = Job.getInstance(conf); ​        /*         * 设置类加载器         * */        job.setJarByClass(SPJob.class); ​        job.setInputFormatClass(TextInputFormat.class);        job.setOutputFormatClass(TextOutputFormat.class);        /*         * 使用自定义的分区规则         * */        job.setPartitionerClass(OwnPartitioner.class); ​ ​        job.setNumReduceTasks(5); ​ ​        TextInputFormat.setInputPaths(job, new Path("F:\\大数据\\笔记\\Day02-Hadoop\\数据文件\\sumFlow.txt"));        //TextInputFormat.setInputPaths(job, new Path("/flow.txt"));        TextOutputFormat.setOutputPath(job, new Path("F:\\大数据\\笔记\\Day02-Hadoop\\数据文件\\outSP111"));        //TextOutputFormat.setOutputPath(job, new Path("/out111111")); ​ ​        job.setMapperClass(SPMapper.class);        //job.setReducerClass(SortReducer.class); ​ ​        job.setMapOutputKeyClass(FlowBean.class);        job.setMapOutputValueClass(Text.class); ​        //job.setOutputKeyClass(FlowBean.class);        //job.setOutputValueClass(Text.class); ​        job.waitForCompletion(true); ​ ​   } } ​

package com.baizhi.test08; ​ import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; ​ import java.io.IOException; ​ /* * * 在valkue中存储地区信息 * */ public class SPMapper extends Mapper<LongWritable, Text, FlowBean, Text> {    @Override    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { ​        // 拿到数据 使用空格进行分割        String[] flowinfos = value.toString().split(" "); ​        /*         * 拿到下标为0的电话         * */        String phone = flowinfos[0];        /*         * 拿到下标为1的上传流量         * */        Long upFlow = Long.valueOf(flowinfos[1]); ​        /*         * 拿到下标为2的下载流量         * */        Long downFlow = Long.valueOf(flowinfos[2]);        Long sumFlow = Long.valueOf(flowinfos[3]);        /*        * 拿到地区信息        * */        String areaName = flowinfos[4]; ​ ​ ​ ​        /*         * 准备bean对象         * */        FlowBean flowBean = new FlowBean(); ​        /*         * */        flowBean.setPhone(phone);        flowBean.setUpFlow(upFlow);        flowBean.setDownFlow(downFlow);        /*         * 设置总流量 上传 下载         * */        flowBean.setSumFlow(sumFlow); ​        /*         * 数据写出         * */        context.write(flowBean, new Text(areaName)); ​ ​   } } ​


成绩合并

学生信息

gjf 00001 gzy 00002 jzz 00003 zkf 00004

学生课程信息

00001 yuwen 00001 shuxue 00002 yinyue 00002 yuwen 00003 tiyu 00003 shengwu 00004 tiyu 00004 wuli

期望输出结果:

00001 gjf yuwem shuxue

package com.baizhi.test09; ​ ​ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; ​ public class MJob {    public static void main(String[] args) throws Exception { ​        //System.setProperty("HADOOP_USER_NAME", "root"); ​        Configuration conf = new Configuration(); // //       conf.addResource("conf2/core-site.xml"); //       conf.addResource("conf2/hdfs-site.xml"); //       conf.addResource("conf2/mapred-site.xml"); //       conf.addResource("conf2/yarn-site.xml"); //       conf.set("mapreduce.app-submission.cross-platform", "true"); // //       conf.set(MRJobConfig.JAR, "F:\\大数据\\代码\\BigData\\Hadoop_Test\\target\\Hadoop_Test-1.0-SNAPSHOT.jar"); // ​        Job job = Job.getInstance(conf); ​        /*         * 设置类加载器         * */        job.setJarByClass(MJob.class); ​        job.setInputFormatClass(TextInputFormat.class);        job.setOutputFormatClass(TextOutputFormat.class);        /*         * 使用自定义的分区规则         * */        // job.setPartitionerClass(OwnPartitioner.class); ​ ​        // job.setNumReduceTasks(5); ​ ​        TextInputFormat.setInputPaths(job, new Path("F:\\大数据\\代码\\BigData\\Hadoop_Test\\src\\main\\java\\com\\baizhi\\test09\\in"));        //TextInputFormat.setInputPaths(job, new Path("/flow.txt"));        TextOutputFormat.setOutputPath(job, new Path("F:\\大数据\\代码\\BigData\\Hadoop_Test\\src\\main\\java\\com\\baizhi\\test09\\out11"));        //TextOutputFormat.setOutputPath(job, new Path("/out111111")); ​ ​        job.setMapperClass(MMapper.class);        job.setReducerClass(MReducer.class); ​ ​        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(Text.class); ​        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(Text.class); ​        job.waitForCompletion(true); ​ ​   } } ​


hadoop大数据在线教程(大数据Hadoop下)(3)


package com.baizhi.test09; ​ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; ​ import java.io.IOException; ​ public class MMapper extends Mapper<LongWritable, Text, Text, Text> { ​    @Override    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        /*         * 通过context 获得当前读取的文件         * */        FileSplit fileSplit = (FileSplit) context.getInputSplit();        /*         * 获取文件路径         * */        Path path = fileSplit.getPath(); ​ ​        /*         * 当前行使用空格进行分割         * */        String[] infos = value.toString().split(" "); ​        /*准备存储key的变量         * */        String mapkey = ""; ​        /*         * 准备存储value的变量         * */        String mapValue = ""; ​        /*         * 判断当前行来自哪个文件         * */ ​        if (path.toString().contains("student_info.txt")) { ​            /*             * 名字             * 加上a 代表是名字             * */            mapValue = infos[0] " a"; ​            /*             * 学号             * */            mapkey = infos[1]; ​       } else {            /*             *学科             * 加上b 代表是学科             * */            mapValue = infos[1] " b"; ​ ​            /*             * 学号             * */            mapkey = infos[0]; ​ ​            /*(00001,gjf)            (00001,yuwen)            (00001,shuxue) */ ​       } ​ ​        /*         * 输出学号 key | 输出可能是name或者学科 value         * */        context.write(new Text(mapkey), new Text(mapValue)); ​   } } ​


package com.baizhi.test09; ​ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; ​ import java.io.IOException; import java.util.ArrayList; ​ public class MReducer extends Reducer<Text, Text, Text, Text> { ​     /*     (00001,gjf a)            (00001,yuwen b)            (00001,shuxue b)    */ ​    @Override    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {        /*         * 准备存放名字的变量         * */        String name = ""; ​        /*         * 准备存放 学科的集合         * */        ArrayList<String> classList = new ArrayList<String>(); ​ ​        /*         * 遍历当前学号的下的所有信息         * */        for (Text value : values) { ​            /*             * 使用 空格进行分割             * */            String[] infos = value.toString().split(" ");            /*             * 拿到flag 标识   表示是学科还是姓名             * */            String flag = infos[1]; ​ ​            /*             * 如果为a 则为姓名             * */            if (flag.contains("a")) { ​                /*                 * 拿到name的值 复制给name                 * */                name = infos[0]; ​                /*                 * 其他则为 学科                 * */           } else {                /*                 * 拿到学科的值 封装在list中                 * */                classList.add(infos[0]); ​ ​           } ​ ​       } ​ ​        /*         * 将集合中的值 取出 放在 字符串中         * */        String clasLine = "";        for (String s : classList) {            clasLine = s " ";       } ​        /*         * 最终进行输出         * */        context.write(new Text(key.toString().trim()), new Text(name " " clasLine.trim()));   } } ​


MR优化策略

(1)干预切片计算逻辑-CombineTextInputFormat(小文件优化)

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4M

(2)实现Partitioner策略防止数据倾斜,实现Reduce Task负载均衡

(3)适当调整YranChild内存参数,需要查阅Yarn参数配置手册,一般调整vcores和内存参数CPU使用

(4)适当调整溢写缓冲区大小和阈值

(5)适当调整合并文件并行度mapreduce.task.io.sort.factor=10'

(6)对Map端输出溢写文件使用gzip压缩,节省网络带宽

conf.setBoolean("mapreduce.map.output.compress", true); conf.setClass("mapreduce.map.output.compress.codec",                GzipCodec.class, CompressionCodec.class);


hadoop大数据在线教程(大数据Hadoop下)(4)


,