当前课程知识点:大数据平台核心技术 > 实践2:编写MR完成Group By+Join操作 > 主讲人:谢德军 > 实践2:编写MR完成Group By+Join操作(主讲人:谢德军)
大家好
我是来自阿里云
ODPS团队的谢德军
大家都知道
MapReduce是分布式
最基础的编程模型
实践2的内容就是用
最基础的MapReduce
来实现SQL Group By
和Left Outer Join操作
在大家真正开始实践之前
我会从几个方面
对实践2进行说明
首先我会介绍一下
实践2的具体内容
接下来会说明这个实践
是对哪些前序课程的检验
然后会介绍一下
MapReduce的背景知识
以及ODPS MapReduce的
接口设计
最后是一个事例
讲解如何用
ODPS MapReduce
实现一个Inner Join
我们可以看到
这里有两条SQL
第一个Group By是
为了找到
订单数大于等于3的客户
及其订单数
第二个Left Outer Join
是为了找到UK用户所下订单
同时保留其他国家用户信息
实践2的内容就是用
MapReduce分别实现
这两条SQL
前序课程分布式编程模型的
设计与演化中
我们有学习
MapReduce编程模型是什么
编程接口是什么样子
还学习了MapReduce
最基础的
事例程序WordCount
另外一个前序课程
分布式引擎的设计与实现中
我们有了解到
关系型计算的基本原理
还有两个最基础的算子
聚合和连接的分布式实现
实践2就是检查
大家对这两节课的掌握
下面我们一起重温一下
MapReduce编程模型
MapReduce是2004年
由Google提出的软件架构
用于大规模数据集的并行运算
其分成Map和Reduce
两个子过程
同时
框架会负责将Map的输出
按Key分区排序
并传给Reduce
也就是Shuffle过程
Shuffle过程又分成
Map端的Partition&Sort
Map和Reduce之间的Copy数据
以及Reduce端的Merge
几个子过程
所以有五个详细子过程
MapReduce有一些典型的
应用场景
比如日志分析 数据查询
数据仓库的ETL过程等
如果我是一个图书馆的管理员
我想清点一下
图书馆有多少本藏书
有很多志愿者可以帮我
我要怎么办呢
其实这就是一个最简单的
MapReduce事例
Map阶段
第一个志愿者数第一个书架
第二个志愿者数第二个书架
依此类推
每个志愿者告诉我
自己所数书架的书籍数
Reduce阶段
我汇总所有书架书籍数
得到最终结果
我们可以看到
如果志愿者越多
Map阶段就越快结束
MapReduce就是
用大量的计算资源
缩短数据处理过程的方法
MapReduce有哪些
基本的设计要点呢
前面提到Shuffle是
MapReduce的核心过程
围绕Shuffle过程
主要有两个设计要点
中间结果的设计
和数据以何种方式聚集
以右图Google MapReduce
论文中出现的
WordCount伪代码为例
程序的目的是要统计
页面中Word的数量
中间结果的Key value对
是Word和Count值
这里的Count值是常量一
我们期望相同的Word
能被分到同一个Reduce处理
所以中间结果
需要按Word来分区排序
ODPS MapReduce API
主要包括MapperBase
和ReducerBase两个基类
开发者需要实现两个类的函数
map和reduce
同时ODPS MapReduce
读写数据都是存在表中
所以map reduce函数的参数
都是Record
Map函数的输入是Key value
输出是Key value的列表
Reduce的输入已经按Key排序
是Key和Value的列表
输出和Map类似
也是Key value的列表
开发者可以设置
Shuffle的属性
包括数据分片列
排序列和分组列
对于WordCount的例子
Word既是分片列
也是排序列
分组列的使用场景
会在后续例子中说明
下面我们会详细讲解一下
Inter Join的MapReduce实现
这条SQL的功能特别简单
通过customer_id
连接orders和customers
两张表
找到order_id
和customer_name的对应
如果orders有五条记录
customers有三条记录
两表按customer_id连接
Join的输出有五条记录
为订单由哪个客户所下
前序课程分布式
SQL引擎的设计与实现中
已经讲到
如果分布式Join的物理算法
是MergeJoin的话
物理执行计划如右图所示
会有三个Task
其中两个Map分别读取
两个表的数据
然后按照customer_id
进行Shuffle
Reduce中Join Operate
能够按customer_id
有序地拿到左右两表数据
进行Join
但是ODPS MapReduce
不支持实现多个Map方法
所以Join在
ODPS MapReduce中的实现
只有一个Map
Map的输出会额外增加一列
tag列
用来标识数据来自哪张表
还是之前的示例数据
首先开发者可以设置
一个Worker读取的数据量
也就是Input split
这里假设Split.size很小
每一个Worker只处理
三条数据
Fuxi Job提交之后
会启动三个Map worker处理
两个表的数据
Map函数的输入是一条记录
红色部分为Map Key
是customer_id和tag
浅红色部分为Map value
是order_id
和customer_name
对于Order表Key的第二列
tag永远为零
orders的第二列
customer_name
一直为NULL
第二个worker同样处理
orders表
第三个worker处理
customer表
这里我们可以看到
tag order_id
以及customer_id
相对于oders表的变化
如果只有两个Reduce
每个Map的输入会按
Partition列分成两个分片
并且按照sort列排序
Copy阶段是将Map输出的
不同分片
分发到对应Reduce的过程
第一个Reduce
得到每个Map的第一个分片
同样
第二个Reduce
得到第二个分片
Merge阶段是归并排序的过程
归并排序的结果
仍然按sort列有序
这时Reduce worker启动
开发者实现的
Reduce函数被调用
默认情况下
Reduce的输入是Key
与对应的Value的List
这里的Key是
customer_id和tag
但是
我们期望每次Reduce的调用
能够拿到相同的
customer_id的数据
即使tag不同
这里需要用到前面提到的
Shuffle的Grouping属性
如果设置Grouping列
为customer_id的话
一次Reduce调用能够拿到
相同的customer_id
在两张表中对应数据
根据tag区分左右表进行连接
最终输出Join结果
图中数据流
实线为网络读写
虚线为本地读写和内存操作
下面让我们看一下
Inner Join的
ODPS MapReduce实现
首先
我们需要实现一个
JoinMapper类
并实现两个基类函数
Setup和Map
其中Setup函数同一个Worker
只会调用一次
用于初始化输出的MapKey和
MapValue record
同时
通过context得到
当前的数据来自哪一个Table
初始化tag变量
对于Table的每行数据
Map函数会被调用一次
根据tag选择输出的
Mapkey和Mapvalue
来自哪张源表的哪些列
然后我们还需要实现一个
Join Reduce类
同样实现两个基类函数
Setup和Reduce
其中
Setup初始化结果Record
对于相同的Grouping
Reduce函数被调用一次
根据tag区分左右表数据
Hold左表数据
遇到右表数据输出Join结果
Main函数首先构造
Job Count对象
SetMapperClass为之前实现的
JoinMapper类
Reduce同理
然后设置中间结果的
Key value对应的Schema
之后设置Partition列为
customer_id
设置Sort列为
customer_id+tag
设置Grouping列为
customer_id
最后添加输入输出表
并调用RunJob
这就是今天课程的全部内容
关于实践2
同学们有任何问题
都可以在MOOC平台提问
我会第一时间给您回复
-主讲人:武永卫
-主讲人:程永
-QUIZ--作业
-大纲
-初步认识大数据对分布式存储系统的需求
-理解大数据对分布式存储系统的需求
-具体说明大数据对分布式存储系统的需求
-大规模分布式存储的挑战
-小概率事件-Raid卡故障
-分布式存储系统举例
-分布式存储系统重要功能设计要点剖析
-链式写正常流程
-写流程的另一种常见方式:主从模式
-链式写异常流程
-写异常处理的另一种方法-Seal and New
--写异常处理的另一种方法-Seal and New(主讲人:姚文辉)
-读正常流程
-读流程优化-BackupRead
-IO QoS
-数据正确性:checksum
-数据可靠性-Replication
-数据均衡-Rebalance
-垃圾回收-Garbage collection
--垃圾回收-Garbage collection(主讲人:姚文辉)
-Erasure coding
-Erasure coding(3,2)写入和读取过程
--Erasure coding(3,2)写入和读取过程(主讲人:姚文辉)
-元数据管理的高可用性和可扩展性
-元数据管理的高可用性
-Paxos概要
-Raft
-元数据管理的可扩展性
-不同存储介质的特性
-盘古混合存储
-QUIZ--作业
-阿里云飞天分布式调度
-任务调度
-资源调度
-容错机制
-规模挑战
-安全域性能隔离
-分布式调度的发展方向
-QUIZ--作业
-数据格式和抽象
-分布式编程模型
-MapReuduce编程模型
-关系型数据编程模型
-分布式图计算模型
-分布式编程未来展望
-QUIZ--作业
-分布式事务
-分布式一致性算法
-两阶段提交与三阶段提交
-实践--介绍
-关系型计算基本原理_1
-关系型计算基本原理_2
-分布式环境中的连接计算和聚合计算
-其他计算和物理优化
-QUIZ--作业
-提纲
-课程背景介绍
-前序知识
-分布式节点距离计算法则
-数据分布策略
-分布式计算调度
-数据就近原则计算如何容错
-ODPS跨集群数据依赖
-QUIZ--作业
-主讲人:谢德军
--实践2:编写MR完成Group By+Join操作(主讲人:谢德军)
-增量计算和流式计算
-与批量计算的区别
-业界典型系统技术概要分析
-核心技术
-消息机制
-有状态计算、并行DAG、抢占式调度和资源隔离、Failover机制
--有状态计算、并行DAG、抢占式调度和资源隔离、Failover机制(主讲人:强琦)
-StreamSQL
-QUIZ--作业
-软硬件趋势、分布式计算简史与内存计算
-分布式计算
-内存计算
-统一的计算框架
-业界经典系统技术分析-spark&flink
--业界经典系统技术分析-spark&flink(主讲人:强琦)
-QUIZ--作业
-主讲人:褚葳
-QUIZ--作业
-分布式环境下的新问题
-工程实现范例
-课程设计相关问题