当前课程知识点:高级大数据系统 > MapReduce > Tutorial > Video
我们刚刚介绍了map reduce和hadoop
下面我们会告诉大家
怎么样在一个真正的
已经部署好hadoop的
这样一个集群上去运行一个
你们自己的程序
那么在这一部分
我们必然以大家
我们介绍过很多次的
这个word count为例
来告诉大家一步一步
怎么样准备数据
怎么样把数据上传到
分布式文件系统
怎么样运行hadoop的程序
在一个集群上进行mapreduce
程序的一个部署
实际上它的步骤大概
是分成几步
第一步
就是说我们得准备好数据
我们有了数据我们才能去运行
这样的一个合作的程序
第二步我们其实是需要在单机上
进行这样一个程序的开发
就是我们需要在单一的一个服务器上
把这个程序给简单的
把它给编制出来
然后我们需要把这个程序
给部署到hadoop的机群上
然后我们去运行hadoop的
这样一个程序
最后我们需要把结果
保存到分布式文件系统
然后把结果还要可能移到本地
然后进行进一步的处理
或者说数据的这样一个分析
那我们来看
这几步我们分别是怎么做的
我们后面会给大家介绍
怎么样把数据传输到
这个分布式文件系统
我们首先来看在本机上
我们编写的这个程序
到底是怎么样进行的
那么这页PPT给的是
mapper这个程序
我们是怎么写的
而这页PPT我们给的是
reducer是怎么写的
我们后面会给
大家介绍它的细节
然后有了这两个程序之后
实际上我们需要在本地
来做一个这个调试
然后在本地
它能够真正work之后
我们才需要再远程去运行
那么我们现在来看
在本地这个程序
是怎么样进行的
那么我这边已经登陆到一个
hadoop的集群
在这个集群我的一个目录底下
实际上我已经编写好了
一个简单的mapper
和一个简单的这个reducer
那我们来看
首先我们看这个mapper
它是怎么写的
那么这个mapper起到的作用
是去把我们要进行count word
进行这个word count的
这个程序中的单词
首先把它按照空格
进行一个切分
切分了之后
我们需要进一步把这些单词
按照key value的形式输出
输出单词和数字一
那这是我们课堂上已经讲了的
这样一个输出方式
那我们来看这是怎么做的
那么第一步
我们对所有的这个输入的行
从stdin输入的行
这是我们后面运行的时候
系统可以支持给我们的
那么把这些行的
这个头尾进行一个去除
那这一步可能是一个行的
这样一个预处理
我们可以把它头和尾
多余的这个空行给去掉
然后我们把行进行一个切分
根据它的这个空格
当然这一步切分
其实是有问题的
因为如果句子中包含标点符号
逗号 句号
或者说其它的这个符号
也许是需要被我们去掉的
那这一步
我们简单认为空格
可以把单词给割裂出来
然后对于每一个单词
我们输出中间Key和中间value
那中间Key和中间value
我们要的这个形式
就是由tab隔开
然后第一个元素是单词
第二个元素是数字1
那这一步我们就把
mapper做成了
那么我们在本地来看一看
mapper运行的结果
是一个什么样的样子
那么我们针对test这个文件
test已经包含了若干的
句子在里面
我们来把这个文件进行map
通过管道
那么在之前
我们也给大家说到了
管道的方式
那我们看到它的输出
正是我们想要的一个样子
单词和中间的
这个value是1
那么经过这一步之后
实际上hadoop它会
帮助我们把这些Key给
分配到不同的reducer上去
那么有的reduce会负责
有的Key的这个处理
总之
相同的Key对应的所有的
value都会到达
某一个reducer
那我们看reducer
又是怎么去进行的呢
同样我们写了
最简单的一个reduce
那么在这个reduce中
我们会对单词的这个频数
进行统计
我们来看这是怎么进行的
我们制作了一个word2count
这样一个字典的一个结构
在python里面
那么当它收到一行
当reducer收到一行之后
它会对这一行
同样进行一个strip
把它的前后进行一下整理
然后我们通过tab分割
把它切成了单词
和对应的这个数字1
那么对应的这个value
对应的这个count
那么有了这个count之后
我们会把count
转成一个数字
原来可能是字符串
现在我们把它转成了数字
那么有了这个数字之后
我们就会对刚刚的这个word
这个count进行更新
更新的过程就是
将这个count叠加到
原来已经加和的
这个count当中
最后我们会输出
这个每个单词统计出来的
一个结果
那这一步
我们的一个效果是
做到了什么
我们会把所有的
这个单词对应的数字1取出来
然后把数字1进行这个叠加
也就是说会对这个word的
这个结果它的频数
进行一个叠加
我们最终把结果输出
那么当然这不输出
我们使用了print
我们交给了hadoop
去帮我们存储的
分布式文件系统中
大家要注意的是
真正的一个完整的
hadoop的程序
可能你需要自己
去处理文件的输入和输出
我们在这里只是使用了
hadoop的一个特殊的编程模式
叫做streaming
它可以使用print和stdin来进行
这个原始数据的读入
和原始数据的输出
那么分别从这个分布式文件系统来
或者是到分布式文件系统里面去
那么我们在本地调试的时候
我们就想看
刚刚reducer的
这个mapper的这个结果
交给reducer之后
它到底能不能正确的处理出来
那我们来看一下
这一步实际上我们
已经对它进行了map
会输出上面的这样一个结果
那么我们看当它接上reducer之后
得到的结果是什么样的
我们发现有的单词出现了3次
那有的单词出现了2次
结果好像正是我们需要的
所以当我们在本地
把这样一个程序调试的差不多之后
我们就可以进行hadoop
程序的一个部署了
那这就是我们
最简单的一个hadoop的程序
那我们看下一步应该做什么
我们在本地已经有了程序
已经有了数据
那我们实际上是需要
把我们的这个数据
第一步先放到分布式文件系统上
那么我们才能进行相应的操作
那么下面我们就来演示一下
怎么在这个hdfs集群上
进行分布式文件的一个操作
我们在这边列到
进行分布式文件操作
实际上它的一个命令是hadoop fs
那么Ffs之后
我们通过参数
实际上是可以进行类似于在
linux上的这个文件的操作
比如说我们可以进行列目录
通过-ls
我们进行文件的这个复制
从分布式文件系统
到分布式文件系统
我们还可以创建目录
在分布式文件系统上
创建一个新目录
那么我们可以把本地的文件
从本地服务器复制到远程的
这个分布式文件系统服务器
也可以把分布式文件系统的文件
复制到本地
那我们来演示一下
这些操作是怎么样进行的
首先我们来列一下目录
因为我的这台服务器
已经配置好了这个hadoop
所以我们可以
直接把目录列出来
比如说我们可以ls
列一下根目录
那么我们可以发现
根目录底下
可能会有一些这个路径
比如说给user的
这个/user
然后用于hbase的
这个/hbase
还有一个dataset
我们放置了dataset
还有tmp
我们放置一些临时的数据文件
这是我们在分布式文件系统里面
可以做的一些
这个列目录的操作
那么我们可以创建目录
比如说我们可以在tmp底下
创建一个新目录
比如说我们叫做学堂X
那么我们可以在tmp底下
创建一个叫做学堂X的目录
这时候它会通过客户端
hadoop的这个fs的客户端
通知它的这个中心服务器
hadoop的这样
一个master的节点
那么我们会在这个节点上
去创建目录的结构
然后分配这些原数据信息
那么我们来看
在tmp底下这些目录的
一个情况
进一步使用列目录的方式
我们发现这个目录
会被添加到hadoop当中
在这里我们可以
找着学堂这个目录
好 我们还可以把文件放到远程
放到分布式文件系统中
我们现在可以通过
叫做copy from local这个命令
也就是说从本地复制到
远端的这样一个命令
我们本地的文件
比如说我们刚刚有一个
叫test.txt的文件
我们可以复制到
远端tmp/学堂X底下
我们可以放到
我们刚刚创建的这样
一个目录里面去
我们看这个操作已经完成
我们可以进一步列这个目录
那我们发现这个文件
应该是已经放到
分布式文件系统里面去
大家可以注意的是
当我们把这个文件放到
分布式文件系统之后
实际上这个文件已经通过
replication的策略
被复制到了集群中的
各个服务器上
那我们这边就不去给
大家演示这个过程了
好
我们有了这个
分布式文件系统的操作方式
我们也把数据
从本地移动到了这个
hadoop可以去操作的
这些分布式文件的目录底下
下面我们就可以去运行
这样一个程序了
那么我们在这边通过hadoop
这个命令可以去运行用java
编写好的这些hadoop的程序
那在这边我们通过python
写了两个streaming
运行的这样一个脚本
那么我们会用hadoop
它提供的一个
streaming的这样一个运行环境
已经编好的一个hadoop的一个程序
来对这种简单的程序
进行运行
那么它的命令就在PPT里面
大家可以看得到
那我们把这个命令
放到我们的命令行之后
实际上我们来看一下它的运行的
效果是怎么样的
那么这个命令它调用了
本地的一个jar包
这个jar包会让我们编写好的
这种程序以streaming的方式运行
也就是说
可以以stdin作为输入
而以这个stdout作为输出
分别对应到分布式文件系统上的
方式去运行
它的好处是编写简单
我们演示的时候可以给大家一个
比较直观的效果
同时它的本地和远程的程序
几乎不用做修改
就能够直接运行
那么我们来看运行这个程序
它需要参数是什么样
首先我们在这里给出了一个input
这个input是分布式文件
系统中的一个目录底下对应的
要处理的这些输入文件
那么我们已经在这里
准备好了一系列的英文小说
我们要对这些英文小说
进行word count
它的输出是我本地的一个目录
在这个目录底下
会输出我们需要的数据
那么它的mapper是什么呢
我们在这里指定了
使用本地的map.py
作为它的一个输入
使用本地的这个reducer.py
作为它reducer的一个程序
好
那我们来看这个程序运行的
一个效果是什么样的
我运行回车之后
那这个程序开始被调动到了
这个hadoop上
那么hadoop的job task
开始分配task tracker
也就是说可以调动起mapper
和reducer的这些节点
当然这个过程可能会比较慢
尤其是当系统中
有多个用户需要
对这个资源进行争抢的时候
我们来看这个过程
从这一步开始
reducer mapper经开始执行了
那我们的这个系统身上
现在可能还有多人在运行
所以这个mapper的过程是比较慢的
同时我们处理的数据也比较大
大概有好几个GB的小说
所以reducer
在不断的进行
那么我们通过
这个地方实际上
我们是可以在本地
就对我们当前运行的
一个job行一个trak
我们知道map进行了多少
我们知道reducer
进行了多少
那我们来等待这个任务完成
当然在这个过程中
大家还可以通过外部的portal
我们去访问整个job tracker
它所有job的情况
我们可以去监控整个hadoop
环境运行的健康情况
比如说
目前有总共有多少个job在运行
有多少个reducer在运行
那么是不是我们某一些小的job
被大的job给阻塞住了
这些都是我们可以在整个
portal上面看得到的
那这边讲一个题外话
就是hadoop今天
依然是工业界使用的
最重要的运行环境
尽管我们今天已经
有了spark
但是很多这种流程化的业务
依然是在spark上运行着的
它的一个好处就是
当你的业务已经成熟
你的流程化已经做好之后
使用hadoop它会得到一个
非常稳定的运行的支持
因为它不依赖于很大的内存
它可以从公司文件系统
到分布式文件系统
基本上依赖于磁盘的io
来进行数据的处理
那在这边我们看到
reducer也已经开始启动了
也就是说在我们的整个数据中
也经有一部分key中间key
对应的所有map上的数据
到达了他们指定的reducer上
这些reducer已经开始进行count的叠加
好
reducer的过程是比较快的
现在reducer也已经完成了
那么现在需要做什么呢
现在整个hadoop程序
会把结果写回到
分布式文件系统
那么大家也会意识到
在我们中间的过程
会有大量的数据
是写在mapper的本地磁盘的
也就是我们做
shuffle的那个过程
这个大家可以
再去查验我们之前的课程
好 我们现在来验证一下
这个结果是不是真的生成了
我们看这个结果
是输出到了我自己的一个
目录底下
我们来看底下是一些
什么样的文件
我们同样通过hadoop fs -ls
来查看这个目录
我们发现当我们
没有做任何约束的时候
实际上这些结果
以这种part的形式输出出来
实际上这个结构
是我们可以去定义的
我们随便把其中一个part
拿回来看看
它的样子是什么样的
我们用hadoop文件复制的功能
我们用copyTolocal从远程到local
我们把这个路径底下的
这个文件复制到当前目录
那么这个过程实际上
很可能是需要从网络然后到达本地
那我们来看本地这个文件
已经到了
我们来看这个文件里面
到底写了一些什么东西
好 们发现不同的单词
出现在这儿了
然后单词后面是他的频数
这边大家可以发现
我们刚刚说到的一些问题
比如这个标点符号被包含进去
然后可能并不是一个真实的单词
同一个单词可能会被统计成
多个不同的拷贝
那这是大家可以
在后面的练习当中
去尝试解决的问题
好
我们刚刚介绍了这个spark
那么在真实的一个集群上
部署了spark之后
我们怎么样在上面
去运行一个真实的
数据处理任务
让我们通过一些实际的操作
给大家介绍这件事情
那么在spark上
去运行一个真实的程序
大概分成几个步骤
第一个步骤就是
说我们要准备数据
和我们在spark上运行数据
运行一个程序实际上是类似的
我们需要把这个程序
放到分布式文件系统
或者是放到其它hadoop
能够读取的这样一个数据源上
然后我们需要去通过
这些分布式文件系统
来生成最初的RDD
大家回想
在spark里面
我们的数据处理实际上
是基于RDD的
基于RDD的变换
基于RDD的action
然后我们会去对原生的这些RDD
进行transformation
进行一些形变
比如说map
比如说flat map
或者说是reduceByKey
然后有了这一步之后
我们会去做action
我们最终想要它输出
我们可以action
让它输出到文件
我们可以action
让它输出到桌面
最后我们会去查看
我们的结果
如果结果不正确
我们需要迭代
如果结果正确
我们需要进一步
进行持续化的操作
或者说是反复进行
这个数据的查询
那么我们来看看这几步
在spark的这种实际代码的编写当中
它是怎么样完成的
准备数据
那么准备数据
我们在之前已经介绍过
怎么样从本机把数据
复制到分布式文件系统上
这边我们就列举
那么实际上spark
它既能访问本机的数据
也能访问
分布式文件系统上的数据
它大概是需要这样的
这种文件标识的方法
本机的文件以file开头
分布式文件系统上的文件
以hdfs开头
通过两种方式
我们可以把这个数据
作为原始的这个数据源
用来生成第一批的RDD
那么有了这个原始数据之后
我们需要去启动spark的程序
那么启动spark的程序
实际上大概会有两种模式
第一个我们编写
独立的hadoop spark程序
让它在spark这个
运行环境下去运行
那么它的过程有点类似于这个
hadoop的执行方式
大家可以去参考一下手册
看看怎么样运行
那在这个过程中
大家也可以使用不同的编程语言
包括java、python、scala的方式
来进行程序的编写
那么第二种模式
就是我们经常提到的这个
交互式的操作
我们会开启一个命令行窗口
那么以交互的这种形式
来进行数据的操作
那么我们在这个实验课当中
会重点给大家介绍这种模式
那么屏幕上给的这个命令行
实际上就启动了一个叫做
py-spark的这样一个终端
它是一个命令行交互的一个终端
那么在这里面大家可以运行
spark支持的
这些命令和语言
那么它的这个语言
是用python来编写的
所以
大家可以在这里面运行类似于
python的这样一个代码
那么在这个里面
我们指定了它的master
是一个yarn-cline
然后我们指定了他的
executor的数量是十
那么启动了之后
我们就可以去运行这个
spark程序了
那么我们第一步要运行
是需要创建一个
spark的context
这在spark的编程方式
和命令行方式它是一致的
只不过在命令行方式
这个context
会帮我们直接创建起来
通过context
我们可以把一个
分布式文件系统的
这样一个文件
或者是本地的文件
给转成第一个RDD在这里面
就是一个text file
这样一个RDD
有了这个之后
我们可以对它做一系列的操作
那么大家看到
前面的3个大于符号
就是刚刚提到的这个shell
它的一个提示符
在这里面我们可以对它进行flat map
我们可以进行
flat map之后来继续做map
然后我们还可以进行reduceByKey
等等这样的一系列的操作
最后我们可以进行action
比如说我们要看
前10个元素是什么
然后我们可以去
对里面的一些元素进行查找
通过一个叫做filter的操作
然后我们可以把它输出到文件
那么把它保存
结果保存出来
那么下面我们会通过一些例子
来给大家介绍一下这个过程
那么在我的这个集群上
我们运行过hadoop的程序
那现在我们可以进一步来看
如果我们运行
spark程序的时候
是怎么样的
我这边已经保存了一系列
这个spark的代码
为了输入的方便
我直接把这些代码复制过去
首先
我们启动一个spark的shell
那么启动了之后实际上它会给
我们一个交互的操作环境
我们下面可以看到
这样一个操作的交互环境
当然它在这个过程中
已经去把相应的一些资源
进行了分配
那么启动spark交互环境之后
我们还是使用我们
在hadoop中使用过的
单词的这样一个文件
来对单词的文件
进行视频的统计
那么我们产生第一个RDD
通过spark的context
然后从分布式文件系统中
把这样的一个文件读取出来
好
我们现在已经有了一个RDD
叫做textFile
大家注意这个过程
实际上很快就完成了
因为它并没有真正的
发生什么运算
或者是io
因为它只是让spark记录了
我们要做出这样一个RDD
而这个RDD由于它没有action
所以基于它的这个
lazy execution的原则
那么这一步操作实际上
会在真正我们
发生action的时候
才会去完成
我们可以做一些什么样的事情呢
我们首先把这个加法导入进来
因为我们后面reducer中
是需要用到的
然后我们会形成一个新的RDD
这个新的RDD是会对
这个行进行一个分割
那么有点像我们在map里面做的
单词的一个行的一个切分
得到一系列单词
那我们得到了单词的RDD
那么我们得到单词RDD之后
我们进一步
去进行一个新的RDD的生成
我们把它叫做maprdd
实际上这一步
我们会把刚刚的一系列单词
输出成单词 1这样的
新的这样一个数据集
叫做maprdd
同样这些操作也是非常快的
那么有了maprdd之后
我们可以进一步去尝试着模拟
mapreduce的reduce过程
我们形成一个reduceRDD
这步做的操作就是把
我们有的这个reducer
有的这个中间的这些
这个MAP RDD里面的
这个value进行叠加
叠加了之后
实际上就是我们统计好的
这个词频
那么有了这一步之后
实际上我们就可以
去简单的看一下我们
这个结果到底怎么样
现在我们用一个
我们用一个action
我们来提出前面的20个单词
和它的这个词频来看一眼
我们看看是什么样的情况
那么大家可以注意到
当我们敲这个命令之后
看起来好像spark
就已经开始工作了
的确它已经开始工作
因为take是需要
结果呈现在屏幕上的
那么用户执行
这个命令之后
实际上就已经触发了
之前我们做的
若干个transformation
那么最终要求把结果
投到屏幕上
那么我们可以通过这个进度看到
整个这个transformation和action执行的
这样一个进度的一个情况
那么当这个进度执行完了之后
实际上我们就可以
拿到这样一个结果
那么这边大家可以看到
它切分的一个情况
实际上我们通过
Context的一些控制
我们可以让
RDD的一个控制
我们可以让这个RDD
它并行的一个情况
以及它partition的一个情况
对它进行一个优化
比如说在我们的任务中
也许我们通过partition的优化
可以达到提高整个
action速度的一个目标
这个大家需要
在后面实际的运行中去尝试
好
我们刚刚的take
已经拿到了
我们拿到了前20个单词的
这样一个输出
那么我们现在
如果还想再进一步看多一些
我们想看前50个
我们只需要take 50
那我们发现take 50之后
这个过程就变得快多了
那这也依赖于这个spark
对in memory的RDD做了优化
我们的结果已经在内存中出现
所以它可以直接在
屏幕上得到呈现
当然如果大家需要
对RDD显示的要求
他在内存中得到缓存的话
大家是可以用相应的命令
把RDD调整到内存当中
那这样的话你可以满足
比如说我们对关键词
进行搜索这样的目标
达到一个快速的一个效果
-What is big data and what is big data system?
--Video
-Problems in big data systems?
--Video
-Overview of the course
--Video
-Principles of big data system design
--Video
-Manipulating Data on Linux
--Video
--Video
--Video
-Basics of Linux Data Processing--Manipulating Data
-Running Commands on a Single Machine
--Video
-Running Commands on a Single Machine--作业
-Using a Linux Cluster
--Video
-Using a Linux Cluster--作业
-Storage for Big Data Computing: Distributed file system
--Video
-Storage for Big Data Computing: Distributed file system--作业
-File system and GFS
--Video
-File system and GFS--作业
-Understanding HDFS using Legos
--Video
-Understanding HDFS using Legos--作业
-File System Implementation and DFS
--Video
--Video
-File System Implementation and DFS--作业
-What is MapReduce and why
--Video
-What is MapReduce and why
-Learn MapReduce by playing with cards
--Video
-Processing pattern
--Video
-Processing pattern--作业
-Hadoop
--Video
-Hadoop--作业
-Algorithms in MapReduce
--Video
-Algorithms in MapReduce--作业
-Tutorial
--Video
-Background
--Video
-Background--作业
-Spark
--Video
-Spark--作业
-Use Spark for data mining
--Video
-Use Spark for data mining--作业
-Spark data processing
--Video
-Spark data processing--作业
-Experiment in Spark
--Video
-Experiment in Spark--作业
-Introduction to streaming data processing
--Video
-Introduction to streaming data processing--作业
-Storm
--Video
--Video
--Video
-Storm--作业
-Spark streaming
--Video
--Video
-Spark streaming--作业
-NoSQL introduction
--Video
-NoSQL introduction--作业
-Common Advantages
--Video
-Common Advantages--作业
-Bigtable
--Video
-Bigtable--作业
-Master Startup
--Video
-Master Startup--作业
-HBase
--Video
-HBase--作业
-What is GraphDB and Graph data processing
--Video
-What is GraphDB and Graph data processing--作业
-Graph systems
--Video
-Graph systems
-Example of a GraphDB
--Video
-Example of a GraphDB--作业
-Mahout
--Video
-Mahout--作业
-Case Study: Recommendation
--Video
-Case Study: Recommendatio作业
-Recommendation in Mahout
--Video
-Recommendation in Mahout--作业