当前课程知识点:大数据系统基础 >  7. 流计算 >  授课视屏 >  Video

返回《大数据系统基础》慕课在线视频课程列表

Video在线视频

Video

下一节:Video

返回《大数据系统基础》慕课在线视频列表

Video课程教案、知识点、字幕

我们现在来介绍

另一个流计算的实例storm

Storm也是目前最为流行的

流计算系统

Storm首先它的数据类型呢

叫做tuple

它叫做named list of values

比如说在这个例子里面

它就是说Name 是chen

然后Age是40

实际上

这和这个key value

是非常接近的这样一种方式

Storm的基本概念呢

分为四个基本概念

第一个是stream

stream实际上就是

刚才讲到了这个tuple的

这样一个流

就是一个又一个的tuple流进来

第二个叫做spout

Spout类似于stream的一个源头

一个水龙头

大家用一个水龙头来代表

Bolt其实就是类似于刚才的

worker或者PE

当然storm里面的bolt

实际上和S4里面的PE

实际上是有有一些明显的区别的

我们一会儿会讲到这一点

那么最后呢

整个的这个东西叫做tOPology

实际上就是Storm的程序

就叫做topolopy

Stream里面

支持哪些数据类型

首先它支持

所有的primitiVw type的类型

比如说整数

字符串呀等等

那么字符串和字节数组

都可以作为tuple和value的类型

然后用户也可以

自己定义自己的对象

作为value的类型

只要你自己去实现

它的序列化方法就可以

Bolts实际上是主要的这个处理单元

那么它可以有这样几种

一个是filters对吧

一个流进来

把一些符合条件的

把它再输出出去

这就是filters的一个基本概念

另外呢像aggregation

就是有点类似于

我们刚才说的wordcount之类的

那么你流进来一些东西

我把它的统计的

把它聚合的信息

把它流下来就可以了

Joins就类似于说我们有两个流

流进来

我们可以把它变成一个流

流出去

bolts也可以访问数据库

那么像刚才我们说的那个worker

它要去这个

解数据库

把里面的这个对应的值更新

或者读取都是可以的

然后bolts里面

也可以运行自定义的函数

所以它的功能相当的强大和

丰富的

刚才其实提到了一个

很重要的问题

实际上就是说

处理速率的问题

比如说一个bolts和这个storm

如果说这样流出来的速率是1

我们处理的速度2

实际上我们一定要保证这个

2要大于这个1的

那么具体到每一个bolts

实际上它里面

都有一些实际的处理过程

那么如果这个处理过程

相对来说比较复杂的话

我们就不仅仅需要一个进程

或者一个线程

来做这个处理

而需要并行的需要多个

来完成同一个bolts的功能

所以我们这里就看到

这个bolts里面有三个这个元素

可以并行的来执行

那么这个bolts里面有两个

那直观上有另一个问题

实际上就是一个连接的问题

在我们讲这个

worker+queue模式的时候

其实我们就提到了

如果有些这个连接

它希望考虑到这个数据库的

这个不加锁的这样一个特征的话

并不能够让所有的这个bolts之间

并行元素之间可以

做完整的连接

有可能你希望做某一种连接

也就是说这个bolts里边的

并行的元组和另一个bolts里面

它们之间怎么样相连

这本身是一个

是一个问题

在这个strom里面呢

它实际上定义了这样几种

一种是随机的

也就是说一个可以连接给任意一个

Stream grouping

这个其实就有点

刚才我们讲的那个问题

也就是说我们可以考虑根据一个

hash这个方式

来选取我们要如何

把一个这个bolts里面的东西

连到另外一个东西上面

还有呢all grouping

要发给这个就是

这个里面所有的任务

还有一些global grouping

等等

不同的策略可以方便的

采用一种描述性的方式

而不需要你在代码里面

进一步去编写这个连接的方式

而采用这个描述性的方式

升值性的方式就可以做到

这种任务之间的这种连接

那么这里面呢

实际上就可以举一个

storm程序的例子

这个例子首先呢

你可以看到说它

它首先做了一个Topology builder

也就是实际上

就是这个大的程序这个storm程序

storm程序它首先set一个spout

这个spout
就是这个水龙头

是这个消息就从它这流出来了

那么它说

它这个spoutde 代码是这样的

它有十个

相当于这个spout

有十个这个圆圈在这

那么后面呢

它是一个exclamationbolt

是一个三个exclaim

1

3

然后这个呢

Setbolt这个exclamationbolt2

这样

然后呢后面呢

Shufflegrouping words

这个Shufflegrouping words

是这个之间的这个Shuffle

也就是说这个spout

这十个都可以和这个exclamation1

的这个

三个可以做一些这个任意的一个连接

那么exclamation2

实际上它可以

前面它可以Shufflegrouping exclamation1

就是说

其实我们把它挪到这来更合适

就是可以这样的连接

也就是说

这叫做Shufflegrouping

这样就定义了一个整个

storm这样的一个结构

这个就是刚才讲的这个

Test word spout

就是刚才这个我们生成

生成这个word

应该怎么样去生成这个数组

就是实际上

刚才我们讲到的这个spout

就是这个spout这部分

它是怎么产生消息的

实际上就是它

它里面有一个数组

然后它随机的从里面去

发射一个词出去

也就是说从这里面随机的

产生一个词出去

exclamationbolt

实际上它做的是什么事情呢

它也很简单

它就输出了一个

它的输入是这个tuple

也就是说这个exclamat

它输入一个tuple

输出的是一个tuple

里面的stream

刚才我们不是说

看到说这个里面我们可能会输入一个

比如说输入一个Mike

它会把这个Mike

把它读进来

这个tuple输入以后呢

它输出的是什么呢

它输出了一个

一个是这个输入的

这个tuple本身

这个我们叫做一个tuple

对吧

然后它输出的是什么呢

输出的是这个输入的这个tuple

然后和一个这个Mike

这里边可能需要注意的是说

为什么它这要emit这个tuple

这个东西是吧

实际上从原理上来讲

它直接去输出一个这个

Mike叹号就行

这个地方主要问题呢

实际上和后面的讨论有关

实际上和容错有关的

实际上我们注意到

在这个地方

Collectoracktuple

也就是说在一个tuple

在这个bolt

被处理完了以后

它可以发一个

Ack的这个消息

那么这个时候这个系统

就知道说

这个tuple

已经被这个exclamation

这个bolt已经被它处理完毕了

这个我们后面在讲

容错的时候

会讲到这个方式的这个重要性

这是一个

在storm里面的

的一个wordcount的一个 实例

那么在这个例子里面呢

大家可以看到说

这个wordcount

它首先内部的一个变量

它是一个map

也就是说

它这里面实际上相当于

它里面放了一个表

放了一个表

那么这个表呢

比如说这个是I

这是4

那么word34等等

也就是说

这个东西在它的这个类的实现里面

它并不是给每一个词

我们还能回想到S4系统里面的那个PE

PE里面呢这个它对每一个词

比如说I

它就有一个PE

这里面是四

然后word又有一个PE

是3

那么在这个PE里面

它实际上很简单写这个代码

它如果要加一的时候

它直接在PE的代码

里面写count++就可以了

而在这个storm的这个里面

由于它实际上并没有给

每一个key产生一个PE

也就是说它的一个bolt里面呢

实际上包含了

很多的这个信息

那么在例子里面

实际上包含了很多词的

这样一个计数

所以呢

它需要做什么事呢

它需要从这个tuple里面

把这个key先找出来

然后呢

它到这个hash tuple

里面

这个里面word

找出来的是I

然后它需要到这个

到这个里面getword

去找这个东西

去countgetword里边

那么这个时候

会返回一个4

返回一个4

那么如果这个

这个返回是no

就说明它这个

原来它这个表里面没有

就让它等于0对吧

这个count++

如果这里面是有的

那么我们说再把它写回去

实际上把它再改写成

这样的一个状况

那么这个时候它再去

Emit一个新的tuple

这个tuple的key呢

还是这个word就是I

然后count15

把它输出出去

就这样的一个方式

所以也就是说在

Storm的处理的模型里面

这个bolt

它的这个做法

它内部的这个状态

就是它并不给每一个key

产生一个PE

和S4的这种方式是不一样的

所以它相应的

这个代码的这个编写

也会要复杂很多

运行STORM呢

这里面给出了它的

一个运行的这个方式

那首先要产生一个这个

Cluster这个(10:56)

然后产生一个conf

然后然后呢这个topology debug

是说这样一个执行呢

是一个调适模式的执行

当然还有实际的这个其它的执行方式

那么Cluster submit topology

这样的一个方式

就是最后去执行

这样的一个storm程序

好 那么我们前面一直提到这个

在流处理器中的容错问题

那么storm

它是怎么处理容错的呢

首先storm

它提供了一个方式叫做

一个消息会被处理

At least once

就是如果我们认为这是一个spout

输入的这样一个消息

The cow jumped over the moon

那么它会保证这个消息被处理至少一次

那么它是怎么做到这一点呢

实际上它有一个叫做消息处理完成

这样一个功能

也就是说像这样一个消息

它可能首先被分词

(12:04)

然后再输入到这个计数的

这个里面去

那么我们还记得刚才的这个例子里面

我们都还有一个叫tuple.ack

这样一个调用

那么通过这样的调用呢

实际上就是可以知道说

这一条消息

如果所有的这些

所有这部分都执行完了

所有的这个都调用了这个ack

这样一个函数以后

我们就可以知道

这一条消息被处理完毕了

那么相应的如果说

这个消息没有完整的

被处理完毕

比如说这些都执行完了

那么在执行这个的时候

这三项可能没有被执行

那这个时候在一段时间内

如果还没有得到这三个的ack

都没有得到的话

那么有一个time out的时间

就是说如果在一个给定的

Timeout

的时间里面没有收到这样几个

ack的话

那么storm系统就判断说

这一条消息没有被完整的执行

所以它会重复的

像这个前面还有queue的一个系统

会向前面再要这个消息

然后这个消息

会输入到这个storm的这个系统

再执行一次

那这个时候大家其实已经可以看到了

这个中间的

就是这个storm的这个模型

实际上是有问题的

那么它这个

At least once

这件事情呢

实际上你还是说

那么这个消息再执行一次

实际上这个地方的count

可能已经被加过1了

就这个the cow 和jumped

在前面一次可能已经被加1了

那么再执行一次的话

实际上他们就被多加了

实际上这就会引入一个错误

当然这个错误是不是可以容忍

这个是和实际业务相关的

但是这确实不是一个

用户真正希望的一个方式

那么怎么样处理这个问题呢

有几个提出了一些方法

比如说第一个方法叫做

给每个tuple编号

那么如果我们每次

只处理一个tuple

那么这一个tuple

如果没有完整的实现

我们假设说刚才说的这计数的问题

他们最后都会写数据库这里边去

那么如果写数据库

那么如果最后写数据库这件事情

每次我们只处理一个词

比如说word(I,1)

这样

如果这个东西写进去了

那么 那么这个时候

我们就算这个tuple处理完毕 了

我们就不会再去处理这个tuple

那么我们再去处理

处理下一个tuple

二tuple3

等等

也就是说通过每次只处理一个tuple

而最后有一个commit

有一个

就是向数据库的一个提交

如果这个提交没有完成

那么我们就说这个整个的

这个没有成功的处理

那么我们就再执行一次

那么由于没有向数据库提交

所以它整个的状态并没有变化

再重新执行也不会引入新的问题

这种方法

给每一个tuple

进行编号的这种方法

实际上是可以达到我们所期望的

就是说一个消息被处理一次

且仅被处理一次

这样的一个

一个目标的

但是这个方式同时

也会引入一个巨大的开销

因为相当于在一个这个storm

这样一个程序里面

同时只能处理一个tuple

实际上会是一个效率

比较低的这样的一个事情

那么比这个好一些的呢

实际上就是一个批处理

也就是说我们把一些tuple

把它打成一个包

这是第一个包

第二个包

第三个包

那么对这一批这个tuple

都进行这个

都进行这些处理

那么最后如果这

所有的都处理成功

而且都写入到这个数据库里面

把这个状态改变的话

那么这个batch

这一批就算成功了

那么如果这个不成功呢

那这一批所有中间做过的事情

都可以被放弃掉

那么我们重新再处理

batch这个事情

重新处理这一批东西

那这种方式呢

应该说比前一种方式呢

又好了一些

也就是说它一次

可以在这个bolt上面

可以同时处理很多的任务

效率可以提高很多

但是即使是这样呢

实际上还是会有很大的的效率的损失

所以storm它采用的方法

实际上它是区分了bolt中

的计算和提交的部分

计算的部分实际上

你可以认为它是一个无状态的

就是它并不会重复执行

并不会引入这种状态的不一致

而提交的部分呢

它实际上会改变整个系统状况的

那么这一部分呢

就不能够并行的执行

所以它实际上是可以说

提交部分还是要按照这个编号

串行来执行

而计算部分

可以采用这种流水线的并行

这样的话就能够达到一种

既提高效率

又可以达到这个我们希望的

可以容错的这样一种效果

大数据系统基础课程列表:

1. 绪论

-授课视频

--什么是大数据

--大数据典型应用

--大数据的特点

--大数据技术体系

--大数据生态系统

--大数据技术挑战

--课程内容

-1. 绪论--Quiz 1

2.云计算

-授课视频

--2.1大数据和云计算关系概述

--2.2并行化理念

--2.3规模经济理念

--2.4从仓库规模计算机到云

--2.5云计算商业模式概述

--2.6云计算带来的价值

--2.7云计算的分类

--2.8虚拟化技术概述

--2.9计算虚拟化

--2.10网络虚拟化:基础

--2.11网络虚拟化:软件定义网络

--2.12软件定义网络实现

--2.13存储虚拟化:用户接口

--2.14存储虚拟化:分布式存储实现方式

--2.15虚拟化技术总结

--2.16OPENSTACK

--2.17云计算小结

-2.云计算--Quiz 2

3.文件存储

-授课视频

--Video

--Video

--Video

--Video

--Video

--Video

--Video

--Video

--Video

--Video

--Video

--Video

--Video

--Video

--Video

--Video

-3.文件存储--Quiz3

4. 处理框架

-授课视频

--4.1大数据的处理框架

--4.2MapReduce编程模型

--MapReduce执行过程

--4.4MapReduce数据流

--4.5MapReduce性能优化与容错

--4.6Hadoop

--4.7MapReduce总结

--4.8Pig Latin

--4.9Pig Latin语法

--4.10Pig Latin 嵌套数据类型

--4.11Pig Latin 实现与优化

--Pig Latin 实现与优化(2)

--4.13类似框架

--4.14章节总结

-4. 处理框架--Quiz4

5.内存计算

-授课视频

--5.1内存计算概述

--5.2并行计算挑战

--5.3并行计算的局限性

--5.4大数据处理并行系统

--5.5内存计算需求

--5.6MapReduce文件传递数据

--5.7内存计算的可行性

--5.8内存层次的延迟

--5.9内存计算实例-spark

--5.10SPARK-RDD

--5.11大数据并行系统

--5.12Spark编程接口

--5.13Spark编程实例——Log挖掘

--5.14Spark编程实例——WorkCount

--5.15Spark实现技术

--5.16复杂的DAG示例

--5.17RDD性能的提高

--5.18Spark应用和生态环境

--5.19Spark的局限性

-5.内存计算--Quiz5

6. NoSQL

-授课视频

--NoSQL与Cassandra

--数据模型、接口、语言

--系统架构与Gossip协议

--一致性哈希与数据分区

--数据副本及一致性

--节点本地数据存储

-6. NoSQL--Quiz6

7. 流计算

-授课视屏

--Video

--Video

--Video

--Video

--Video

--Video

--Video

--Video

--Video

--Video

-7. 流计算--Quiz7

Video笔记与讨论

也许你还感兴趣的课程:

© 柠檬大学-慕课导航 课程版权归原始院校所有,
本网站仅通过互联网进行慕课课程索引,不提供在线课程学习和视频,请同学们点击报名到课程提供网站进行学习。