当前课程知识点:大数据系统基础 > 7. 流计算 > 授课视屏 > Video
我们现在来介绍
另一个流计算的实例storm
Storm也是目前最为流行的
流计算系统
Storm首先它的数据类型呢
叫做tuple
它叫做named list of values
比如说在这个例子里面
它就是说Name 是chen
然后Age是40
实际上
这和这个key value
是非常接近的这样一种方式
Storm的基本概念呢
分为四个基本概念
第一个是stream
stream实际上就是
刚才讲到了这个tuple的
这样一个流
就是一个又一个的tuple流进来
第二个叫做spout
Spout类似于stream的一个源头
一个水龙头
大家用一个水龙头来代表
Bolt其实就是类似于刚才的
worker或者PE
当然storm里面的bolt
实际上和S4里面的PE
实际上是有有一些明显的区别的
我们一会儿会讲到这一点
那么最后呢
整个的这个东西叫做tOPology
实际上就是Storm的程序
就叫做topolopy
Stream里面
支持哪些数据类型
首先它支持
所有的primitiVw type的类型
比如说整数
字符串呀等等
那么字符串和字节数组
都可以作为tuple和value的类型
然后用户也可以
自己定义自己的对象
作为value的类型
只要你自己去实现
它的序列化方法就可以
Bolts实际上是主要的这个处理单元
那么它可以有这样几种
一个是filters对吧
一个流进来
把一些符合条件的
把它再输出出去
这就是filters的一个基本概念
另外呢像aggregation
就是有点类似于
我们刚才说的wordcount之类的
那么你流进来一些东西
我把它的统计的
把它聚合的信息
把它流下来就可以了
Joins就类似于说我们有两个流
流进来
我们可以把它变成一个流
流出去
bolts也可以访问数据库
那么像刚才我们说的那个worker
它要去这个
解数据库
把里面的这个对应的值更新
或者读取都是可以的
然后bolts里面
也可以运行自定义的函数
所以它的功能相当的强大和
丰富的
刚才其实提到了一个
很重要的问题
实际上就是说
处理速率的问题
比如说一个bolts和这个storm
如果说这样流出来的速率是1
我们处理的速度2
实际上我们一定要保证这个
2要大于这个1的
那么具体到每一个bolts
实际上它里面
都有一些实际的处理过程
那么如果这个处理过程
相对来说比较复杂的话
我们就不仅仅需要一个进程
或者一个线程
来做这个处理
而需要并行的需要多个
来完成同一个bolts的功能
所以我们这里就看到
这个bolts里面有三个这个元素
可以并行的来执行
那么这个bolts里面有两个
那直观上有另一个问题
实际上就是一个连接的问题
在我们讲这个
worker+queue模式的时候
其实我们就提到了
如果有些这个连接
它希望考虑到这个数据库的
这个不加锁的这样一个特征的话
并不能够让所有的这个bolts之间
并行元素之间可以
做完整的连接
有可能你希望做某一种连接
也就是说这个bolts里边的
并行的元组和另一个bolts里面
它们之间怎么样相连
这本身是一个
是一个问题
在这个strom里面呢
它实际上定义了这样几种
一种是随机的
也就是说一个可以连接给任意一个
Stream grouping
这个其实就有点
刚才我们讲的那个问题
也就是说我们可以考虑根据一个
hash这个方式
来选取我们要如何
把一个这个bolts里面的东西
连到另外一个东西上面
还有呢all grouping
要发给这个就是
这个里面所有的任务
还有一些global grouping
等等
不同的策略可以方便的
采用一种描述性的方式
而不需要你在代码里面
进一步去编写这个连接的方式
而采用这个描述性的方式
升值性的方式就可以做到
这种任务之间的这种连接
好
那么这里面呢
实际上就可以举一个
storm程序的例子
这个例子首先呢
你可以看到说它
它首先做了一个Topology builder
也就是实际上
就是这个大的程序这个storm程序
storm程序它首先set一个spout
这个spout
就是这个水龙头
是这个消息就从它这流出来了
那么它说
它这个spoutde 代码是这样的
它有十个
相当于这个spout
有十个这个圆圈在这
那么后面呢
它是一个exclamationbolt
是一个三个exclaim
1
3
然后这个呢
Setbolt这个exclamationbolt2
这样
然后呢后面呢
Shufflegrouping words
这个Shufflegrouping words
是这个之间的这个Shuffle
也就是说这个spout
这十个都可以和这个exclamation1
的这个
三个可以做一些这个任意的一个连接
那么exclamation2
实际上它可以
前面它可以Shufflegrouping exclamation1
就是说
其实我们把它挪到这来更合适
就是可以这样的连接
也就是说
这叫做Shufflegrouping
这样就定义了一个整个
storm这样的一个结构
这个就是刚才讲的这个
Test word spout
就是刚才这个我们生成
生成这个word
应该怎么样去生成这个数组
就是实际上
刚才我们讲到的这个spout
就是这个spout这部分
它是怎么产生消息的
实际上就是它
它里面有一个数组
然后它随机的从里面去
发射一个词出去
也就是说从这里面随机的
产生一个词出去
exclamationbolt
实际上它做的是什么事情呢
它也很简单
它就输出了一个
它的输入是这个tuple
也就是说这个exclamat
它输入一个tuple
输出的是一个tuple
里面的stream
刚才我们不是说
看到说这个里面我们可能会输入一个
比如说输入一个Mike
它会把这个Mike
把它读进来
这个tuple输入以后呢
它输出的是什么呢
它输出了一个
一个是这个输入的
这个tuple本身
这个我们叫做一个tuple
对吧
然后它输出的是什么呢
输出的是这个输入的这个tuple
然后和一个这个Mike
这里边可能需要注意的是说
为什么它这要emit这个tuple
这个东西是吧
实际上从原理上来讲
它直接去输出一个这个
Mike叹号就行
这个地方主要问题呢
实际上和后面的讨论有关
实际上和容错有关的
实际上我们注意到
在这个地方
Collectoracktuple
也就是说在一个tuple
在这个bolt
被处理完了以后
它可以发一个
Ack的这个消息
那么这个时候这个系统
就知道说
这个tuple
已经被这个exclamation
这个bolt已经被它处理完毕了
这个我们后面在讲
容错的时候
会讲到这个方式的这个重要性
好
这是一个
在storm里面的
的一个wordcount的一个 实例
那么在这个例子里面呢
大家可以看到说
这个wordcount
它首先内部的一个变量
它是一个map
也就是说
它这里面实际上相当于
它里面放了一个表
放了一个表
那么这个表呢
比如说这个是I
这是4
那么word34等等
也就是说
这个东西在它的这个类的实现里面
它并不是给每一个词
我们还能回想到S4系统里面的那个PE
PE里面呢这个它对每一个词
比如说I
它就有一个PE
这里面是四
然后word又有一个PE
是3
那么在这个PE里面
它实际上很简单写这个代码
它如果要加一的时候
它直接在PE的代码
里面写count++就可以了
而在这个storm的这个里面
由于它实际上并没有给
每一个key产生一个PE
也就是说它的一个bolt里面呢
实际上包含了
很多的这个信息
那么在例子里面
实际上包含了很多词的
这样一个计数
所以呢
它需要做什么事呢
它需要从这个tuple里面
把这个key先找出来
然后呢
它到这个hash tuple
里面
这个里面word
找出来的是I
然后它需要到这个
到这个里面getword
去找这个东西
去countgetword里边
那么这个时候
会返回一个4
返回一个4
那么如果这个
这个返回是no
就说明它这个
原来它这个表里面没有
就让它等于0对吧
这个count++
如果这里面是有的
那么我们说再把它写回去
实际上把它再改写成
这样的一个状况
那么这个时候它再去
Emit一个新的tuple
这个tuple的key呢
还是这个word就是I
然后count15
把它输出出去
就这样的一个方式
所以也就是说在
Storm的处理的模型里面
这个bolt
它的这个做法
它内部的这个状态
就是它并不给每一个key
产生一个PE
和S4的这种方式是不一样的
所以它相应的
这个代码的这个编写
也会要复杂很多
运行STORM呢
这里面给出了它的
一个运行的这个方式
那首先要产生一个这个
Cluster这个(10:56)
然后产生一个conf
然后然后呢这个topology debug
是说这样一个执行呢
是一个调适模式的执行
当然还有实际的这个其它的执行方式
那么Cluster submit topology
这样的一个方式
就是最后去执行
这样的一个storm程序
好 那么我们前面一直提到这个
在流处理器中的容错问题
那么storm
它是怎么处理容错的呢
首先storm
它提供了一个方式叫做
一个消息会被处理
At least once
就是如果我们认为这是一个spout
输入的这样一个消息
The cow jumped over the moon
那么它会保证这个消息被处理至少一次
那么它是怎么做到这一点呢
实际上它有一个叫做消息处理完成
这样一个功能
也就是说像这样一个消息
它可能首先被分词
(12:04)
然后再输入到这个计数的
这个里面去
那么我们还记得刚才的这个例子里面
我们都还有一个叫tuple.ack
这样一个调用
那么通过这样的调用呢
实际上就是可以知道说
这一条消息
如果所有的这些
所有这部分都执行完了
所有的这个都调用了这个ack
这样一个函数以后
我们就可以知道
这一条消息被处理完毕了
那么相应的如果说
这个消息没有完整的
被处理完毕
比如说这些都执行完了
那么在执行这个的时候
这三项可能没有被执行
那这个时候在一段时间内
如果还没有得到这三个的ack
都没有得到的话
那么有一个time out的时间
就是说如果在一个给定的
Timeout
的时间里面没有收到这样几个
ack的话
那么storm系统就判断说
这一条消息没有被完整的执行
所以它会重复的
像这个前面还有queue的一个系统
会向前面再要这个消息
然后这个消息
会输入到这个storm的这个系统
再执行一次
那这个时候大家其实已经可以看到了
这个中间的
就是这个storm的这个模型
实际上是有问题的
那么它这个
At least once
这件事情呢
实际上你还是说
那么这个消息再执行一次
实际上这个地方的count
可能已经被加过1了
就这个the cow 和jumped
在前面一次可能已经被加1了
那么再执行一次的话
实际上他们就被多加了
实际上这就会引入一个错误
当然这个错误是不是可以容忍
这个是和实际业务相关的
但是这确实不是一个
用户真正希望的一个方式
那么怎么样处理这个问题呢
有几个提出了一些方法
比如说第一个方法叫做
给每个tuple编号
那么如果我们每次
只处理一个tuple
那么这一个tuple
如果没有完整的实现
我们假设说刚才说的这计数的问题
他们最后都会写数据库这里边去
那么如果写数据库
那么如果最后写数据库这件事情
每次我们只处理一个词
比如说word(I,1)
这样
如果这个东西写进去了
那么 那么这个时候
我们就算这个tuple处理完毕 了
我们就不会再去处理这个tuple
那么我们再去处理
处理下一个tuple
二tuple3
等等
也就是说通过每次只处理一个tuple
而最后有一个commit
有一个
就是向数据库的一个提交
如果这个提交没有完成
那么我们就说这个整个的
这个没有成功的处理
那么我们就再执行一次
那么由于没有向数据库提交
所以它整个的状态并没有变化
再重新执行也不会引入新的问题
这种方法
给每一个tuple
进行编号的这种方法
实际上是可以达到我们所期望的
就是说一个消息被处理一次
且仅被处理一次
这样的一个
一个目标的
但是这个方式同时
也会引入一个巨大的开销
因为相当于在一个这个storm
这样一个程序里面
同时只能处理一个tuple
实际上会是一个效率
比较低的这样的一个事情
那么比这个好一些的呢
实际上就是一个批处理
也就是说我们把一些tuple
把它打成一个包
这是第一个包
第二个包
第三个包
那么对这一批这个tuple
都进行这个
都进行这些处理
那么最后如果这
所有的都处理成功
而且都写入到这个数据库里面
把这个状态改变的话
那么这个batch
这一批就算成功了
那么如果这个不成功呢
那这一批所有中间做过的事情
都可以被放弃掉
那么我们重新再处理
batch这个事情
重新处理这一批东西
那这种方式呢
应该说比前一种方式呢
又好了一些
也就是说它一次
可以在这个bolt上面
可以同时处理很多的任务
效率可以提高很多
但是即使是这样呢
实际上还是会有很大的的效率的损失
所以storm它采用的方法
实际上它是区分了bolt中
的计算和提交的部分
计算的部分实际上
你可以认为它是一个无状态的
就是它并不会重复执行
并不会引入这种状态的不一致
而提交的部分呢
它实际上会改变整个系统状况的
那么这一部分呢
就不能够并行的执行
所以它实际上是可以说
提交部分还是要按照这个编号
串行来执行
而计算部分
可以采用这种流水线的并行
这样的话就能够达到一种
既提高效率
又可以达到这个我们希望的
可以容错的这样一种效果
-授课视频
--什么是大数据
--大数据典型应用
--大数据的特点
--大数据技术体系
--大数据生态系统
--大数据技术挑战
--课程内容
-1. 绪论--Quiz 1
-授课视频
--2.2并行化理念
--2.9计算虚拟化
-2.云计算--Quiz 2
-授课视频
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
-3.文件存储--Quiz3
-授课视频
--4.13类似框架
--4.14章节总结
-4. 处理框架--Quiz4
-授课视频
-5.内存计算--Quiz5
-授课视频
--数据副本及一致性
--节点本地数据存储
-6. NoSQL--Quiz6
-授课视屏
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
-7. 流计算--Quiz7