当前课程知识点:大数据系统基础 > 7. 流计算 > 授课视屏 > Video
我们现在介绍
一个流计算系统的实例叫做S4
S4是
Simple Salable Streaming System
它首字母正好是4个s
所以叫做S4
它提供了一个非常简单的
流处理的一个编程接口
这个接口和Mapreduce类似
都是处理这个key-value pair的
那么它提供了一个
有限容错的功能
系统结点出错后
会重新的在这个
备用结点上启用进程
但是如果出错的话
当前的这个进程状态会丢失
但是S4它支持非协调式的检查点
也就是说每一个结点
可能保存自己的状态
就是你可以在出错后
恢复到这个你保存的这个状态
但是为什么叫做
非协调式
也就是说如果这样一个系统
中间是有消息传播的
那么这个结点
它保存自己的状态到硬盘上
这个结点也
保存自己的状态到硬盘上
我们把这个叫checkpoint1
这个叫checkpoint2
这两个之间并没有做任何协调
也就是说 有可能出现什么状况
比如说这中间是一个消息
那个这个结点认为
它的消息已经发出去了
所以在它的c2里面
它这个结点是发出去的一个状态
而在这个C1里面
它还没有收到这个消息
所以这个里面可能还是
一个消息还没收到的状态
所以C1和C2之间
并不构成一个一致性的系统状态
因为在整个的系统状况里面
实际上是一个
这边就是
这个消息的接收并没有被确认
那么如果一个协调性的状态
应该是C2已经确保说
这个消息已经收到了
那么C1认为
这个消息收到的情况下
C2才能认为这个消息是发出去的
那目前并不是这样的
S4并不提供这样的一个功能
S4还有一个设计的决策
是说在运行期间
不能够增加或删除系统结点
所以这个实际上
对应我们刚才讲的这个扩展性
实际上也是一个问题
等于说你不能够动态地
增加或删除系统结点的话
你就不能够根据负载做这种
非常动态的系统能力的调整扩展
S4的处理模型
提供了一个非常经典的
这个定型计算的模型叫做Actor
Actor这个模型它通过PE
它定义了这个PE
就是Processing Element
也就是实际上是做计算
跟我们刚才的那个worker的
那个概念是像的
那么PE之间通过event进行通信
PE之间的状态是互不可见
比如说这里面
PE里面写了一个这个N=5
那么在另外一个PE里面
它并不能够知道这个N=5
并不知道这件事情
由S4的框架负责产生创建这个PE
还有这个消息路由
也就是说
这个PE它想把消息传给这个PE
那么这个事情是
由这个框架来完成的
那么对于这个PE来说
它只需要能够通过
命名来对这个目标的PE
进行编制就好了
S4的设计它的输入是一个
就是keyvalue的这样一个流
也是叫keyAttribute的
这样一个流
也就是这样
那么你可以想象它有KA
还有ka1等等一直到这边
就很多这样的pair
也输入到S4的系统当中
产生这个中间结果
它的结果
也有可能去输出了另外一个流
那么k撇a等等
所以 你可以看成是一个输入流
或者是
当然它有可能不输出一个流
也有可能输出一个数字
或者一些非流的内容也是可能的
我们从这个例子来看一下
S4的这样的一个基本的框架
那么这是一个处理说
在这个文档里面
出现的词
词频最高的这个(4:12)
叫做topK
那么topK
首先输入的
是一个因为我们说输入的都是
key value pair
那么对第一个P1
它实际上是一个分词
和计数的这样一个PE
那么对于它来讲
输入的是一个句子
没有K就只有value
这个value就是这个
I meant what i said
and i said what i meant
这样一个东西输入给PE
它输出是什么
它会把这里面的首先做分词
然后它会对每一个词做统计
那么可以看到这个里面
这个said出现了几次
said出现了两次 对吧
所以它的一个输出就是说
said输出一个word said
然后count是2
那么我们还可以看到
这里面这个I
I一共出现了四次 对吧
那么所以它会输出一个消息
这个消息的key
key是这个word就是这个I
然后count是4
那么输出了这样一个消息
当然同样的它
还要输入这个meant
还有what等等这些
我们就不重复举例了
那么在它输出了这样一个
key和这个value以后
它输出给谁
那么它输出的是以这个key作为
一个路由的在这里面
那么也就是说这个P2在这层
这层呢实际上保留了
这层的名字叫做wordcountPE
那么这一层
它保留了过去
就是我们刚才讲的
这只有这一句话对吧
那么这个
这一层里面
这个wordcountPE
它保留了过去我们
还输入过很多句
那么每一句话
都会有类似的这样的消息到来
那么这个PE2里面
在这个said到来之前
它实际上里面存了一个
就是说在个过去的已经
输入过的这些句子里面
这个said一共出现了多少次
那么比如说过去一共出现了7次
那么在大家接到了这个消息以后
它就会把这个7再加2
也就是说输出9次
一共这个said在
迄今为止所有的文档里面
输入了9次
所以这个时候
它就会也输出一个消息
这个处理完了以后
输出一个消息呢
给下面一层的PE
叫做sortPE
那么这个里面
大家可以看到它的输出
叫sortID等于2
然后呢value是word等于said
count等于9
也就是说大家可以去要注意的
就是说在S4这个系统里面
这个key的功能是非常重要的
这个key实际上是它一个
在PE之间做路由的
这样的一个地址
也就是说sortID是2这件事情
就使得它首先去找这种
sortPE这种类型的这个PE
那么它这些PE有这么多个
那么它找哪个
它这个ID是2的这样一个PE
它去找这样一个PE
那么同样的道理
从这个PE1到PE2
这层的这个转换
大家可以看到所有的key都是word
实际上这样的一个结构
隐含了一个什么样的结果
也就是说 实际上对每一个词
对每一个词
比如说对said 对这个I
实际上都有一个对应的PE
因为我们可以看到
这个PE里面 它的状态
实际上只有它的一个count 对吧
所以说这个里面它并不管多个词
每一个词都会有一个对应的PE
好那么我们
我们现在来看PE5
PE5起到了一个什么作用
PE5实际上大家可以想像
它存了一部分词的计数
也就是说除了这个said以外
可能还有一些其它此的计计数
那么PE7
它存了另外一些词的计数
PE5和PE7
还有PE6它们之间的
这个词的计数
他们之间都是不会重叠的 对吧
根据我们现在的这样一个结构
因为PE2 PE3 PE4这一层
每一个PE它所对应的
这个词都是不一样的
而每一个PE2这一层
对PE5这一层又只有
它是一个多对一的这样一个关系
一个PE2只能对一个PE5
它不会再去对PE6了
所以在PE5 PE6 PE7里面
这些词是不会重叠的
那在这一层sortPE
就是PE5-PE7的这一层里面
它实际上会对它这里面
比如说它这里面有said是9
然后meant
假如说也在它这里面
这里面可能出现6次
可能还有些其它的词
那么在它这里面
它会对里面所有的
它里面包含的所有的这些个词
根据它的词频做一个排序
而输出在阶段性的
它并不是每一次
来了一个消息它就会输出
它会在一定的条件下
比如说 一定的时间内
或者是一定的消息数之后
多少个词到来以后
他会输出一次
那么输出什么东西
它输出的叫做partialTapKEV
的这么一个内容
那么这个内容是说在我这里面
词频出现在前K个的词
有哪几个
那么把它输入给PE8
那么同样的PE7 PE6
也干同样的事
那么PE8 它就是一个merge
那么也就是说
多个partialtopk到PE8里后面
把它整合成一个整体的topK
来输出
那么也就是说
整个的这样一个过程
大家可以看到每输入这样一句话
那么就通过这个wordcount
首先通过这个把它分词
并在这一句话里面计数
然后把它累加到
所有句话里面的计数
然后把它对应到
一个可以对部分的词进行排序
选出partialtopk的
这样的一个PE层
然后对最后再对应到一个
输出整体的这个topk
啊前k个词频最高的词
这样的一个工作
就可以通过这样的
一个S4定义的一个流计算框架
可能很好地完成
那么PE
实际上我们刚才已经讲到了
这个它的作用
它他是油配的代码和配置文件
两方面来定义的
一会我们可以看到一个例子
它处理的时间和类型
是一个(K,A)这样一个东西
这个key的作用
是非常重要的
我们刚才已经讲到
每个key的值对应一个PE
那么在我们这个例子里面
每个wordceunt里面
在wordcount里面如果遇到一个词
它就一定会有一个对应的PE
那如果遇到一个新词会怎么样
那系统实际上会创建一个新的PE
当然大家可以想象
在这样的一个
这样的一种模式里面
PE实际上会产生
非常非常多的PE 对吧
那如果系统内存不够了 对吧
如果长期地执行
总会出现这种情况
内存可能会不够了
不够的话
那实际上需要对PE进行垃圾收集
因为有可能很多用过的PE
后面很长时间都不用了
或者已经也不会再用了
那这个时候怎么样去处理它
比如说
是不是一段时间内不用了
我们就可能来做垃圾收集
还是
但是有的PE又是有状态的
比如说这个
我们里面存了这个
said它已经出现了7次
像这种PE
你又不能很简单地
去把它抛弃掉
因为抛弃掉了以后
这个词就坏掉了 对吧
这个它已经存在的状态
就损失掉了
这是一个比较复杂的问题
我们不再我们的课程里面
进行仔细的介绍
如果大家有兴趣的话
可以去参阅相关的文档
那么在物理的实现上
实际上S4
它会还是会在实际的结点上
机器的结点上面去执行
那么它有一个
叫做ProcessingNode的概念
当然这还是一个逻辑的概念
那么我们刚才讲到的PE
它们到底是怎么放的
实际上就是多个P
在这个
多个PE 它可能被一个
processingelementcontainer所包含
那么每一个
processingelementcontainer
又属于一个processingeode
这个processingeode里面
它实际上有这个EVENTLISTENER
然后可以听这个事件
然后听到这个事件以后
它可以把它转给这个适当的PE
那么也有
如果PE要发消息给别人
也有一个DISPATCHER
然后后面决定说
它发到那里到哪里去
那么需要注意的一点
是说S4的路由
是先到PN
processingeode再到PE的
那么PN到物理结点的映射
是不可以修改的
所以就是说如果某个结点坏掉了
那么我们可以新产生一个结点
然后把PN对应的物理结点
把它改掉
所以就可以从这种方式
就可以容错
另外这个processingnode这个S4
它还采用了zookeeper
这也是HODP里面
非常重要的一个分布式组件
来保存这个全局的信息
协调结点的行为
这里面呢
实际上就是一个例子的S4的程序
这个程序
大家其实可以看到个就是
这是一个query query count
大家可以看一个是
对于这个key啊
比如说
query实际上
大家就可以当做一个词
大家可以看这句话
query等于这个getkey value
get(0)也就是说这个实际上
就可以把这个query
当做一个query stream
或者是 就是查询的字符串
那么它实际上是说
这个query有多少次
出现了多少次
那么每一次
一个这个 我们知道每个query
实际上它都会对应一个
这个独立的PE
那个这个PE的代码就写在这里面
那么大家可以看
这个querycount++
也就是说
如果对于我们这个querycountPE
如果有一个消息
key是这个query的话
key是某一个querystream话
我到这个PE的话
它里面的这个还有个count
querycount是它的状态
那么每次到它的时候
它就把它加1
这个功能就是这么简单
而且它非常巧妙地
利用了它每一个key
会对应一个PE的这样一个特点
所以你要计算
每一个querykey它出现了多少次
那么非常简单
只要把她里面的这个querycount
也就是一个(14:44)的一个变量
把它加1就好了
那个这个变量什么时候会输出
实际桑他在这个配置文件里面
给了一个定义
output frequency by
time boundary vabe=600
这个单位呢实际上是秒
也就是说
这个OUTPUT函数
每600秒调用一次
输出
输出实际上就是说到现在为止
这个querystream被
这个querystream
一共出现了多少次
实际上就是这样一个例子
-授课视频
--什么是大数据
--大数据典型应用
--大数据的特点
--大数据技术体系
--大数据生态系统
--大数据技术挑战
--课程内容
-1. 绪论--Quiz 1
-授课视频
--2.2并行化理念
--2.9计算虚拟化
-2.云计算--Quiz 2
-授课视频
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
-3.文件存储--Quiz3
-授课视频
--4.13类似框架
--4.14章节总结
-4. 处理框架--Quiz4
-授课视频
-5.内存计算--Quiz5
-授课视频
--数据副本及一致性
--节点本地数据存储
-6. NoSQL--Quiz6
-授课视屏
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
--Video
-7. 流计算--Quiz7