Storm并行度详解
##基本概念##
Spout是数据源;
Bolt封装了数据处理逻辑;
Worker是工作进程,一个工作进程中可以含有一个或者多个Executor线程;
Executor是运行Spout或者Bolt处理逻辑的线程;
Task是Storm中的最小处理单元,一个Executor中可以包含一个或者多个Task,消息的分发都是从一个task到另一个task进行的;
Stream Grouping定义了消息分发策略,定义了Bolt节点以何种方式接收数据;
Topology就是由消息分组方式连接起来的Spout和Bolt节点网络;
##关系##
Worker由Supervisor负责启动,一个Worker中可以含有一个或者多个Executor线程,每个Executor线程都会启动一个消息循环线程,用于接收、处理和发送消息,当Executor收到其下某一task的消息后,就会调用该Task对应的处理逻辑对消息进行处理。
每个executor只会运行1个topology的1个component(spout或bolt)的task(注:task可以是1个或多个,storm默认是1个component只生成1个task,executor线程里会在每次循环里顺序调用所有task实例)。
supervisor和node是一一对应的关系,而worker就是process(进程),executor就是thread(线程),task就是在spout或bolt中定义的函数。
##storm的并行度解释##
下图为上述概念的关系图
下面是storm并发度的一个计算方式。
其中是一个包含有两个 worker 进程的拓扑。其中,蓝色的 BlueSpout 有两个 executor,每个 executor 中有一个 task,并行度为 2;绿色的 GreenBolt 有两个 executor,每个 executor 有两个 task,并行度也为2;而黄色的 YellowBolt 有 6 个 executor,每个 executor 中有一个 task,并行度为 6,因此,这个拓扑的总并行度就是 2 + 2 + 6 = 10。具体分配到每个 worker 就有 10 / 2 = 5 个 executor。
并行度是基于线程数量来确定的,线程数被平均分配到Worker进程中。
提高storm并行度的方法:增加work进程,增加executor线程,增加task实例
##storm的并行度设置##
###配置worker数量###
增加额外的worker 是增加topology 计算能力的简单方法。为此Storm 提供了API 和修改配置项两种修改方法。无论采取哪种方法,spout 和bolt 组件都不需要做变更,可以直接复用。
1 配置文件storm.yaml
storm.yaml中,如下指定了worker进程的端口,以及当前机器下能运行的work数量,每个端口用于对应进程对外通讯的。1
2
3
4
5supervisor.slots.ports: // 指定storm通讯端口
- 6701
- 6702
- 6703
- 6704
默认的worker是4个,同时需要注意一个Slot的概念,Slots的数目默认是跟worker的允许端口数相同,但是如果机器是超线程的,那么Slots数目也会翻倍;
2 代码中配置
worker进程数量也可以通过代码设置。(优先级更高)1
2Config stormConf = new Config();
config.setNumWorkers(workers);
###配置executor数量###
如何每个组件需要的执行线程数?
代码中设置1
2builder.setSpout(id, spout, parallelism_hint) // parallelism_hint设置spout的线程数量
builder.setBolt(id, bolt, parallelism_hint) // parallelism_hint设置bolt的线程数量
注意,不设置的话,storm中默认是每个组件(spout或bolt)一个executor;
例如:1
2
3
4
5
6
7
8
9
10
11
12TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout",
new RandomSentenceSpout(), 2);
builder.setBolt("split",
new SplitSentence(),2)
.shuffleGrouping("spout");
builder.setBolt("count",
new WordCount(),4)
.shuffleGrouping("split");
builder.setBolt("report",
new ReportBolt())
.globalGrouping("count");
其中spout为2,split为2, count为4,report为1
###配置task数量###
task是通过 spout/bolt的声明中setNumTasks(num)设置对应spout/bolt的task个数。
代码中设置1
2
3
4
5
6
7
8
9
10
11
12
13
14
15TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout",
new RandomSentenceSpout()
.setNumTasks(2), 2);
builder.setBolt("split",
new SplitSentence()
.setNumTasks(4),2)
.shuffleGrouping("spout");
builder.setBolt("count",
new WordCount()
.setNumTasks(4),4)
.shuffleGrouping("split");
builder.setBolt("report",
new ReportBolt())
.globalGrouping("count");
如果你在设置 bolt 的时候不指定 task 的数量,那么每个 executor 的 task 数会默认设置为 1。
##其他知识##
###worker调优###
Config类对象可设置如下参数,来调整worker进入数据,处理数据的容量大小:
1 | conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8); |
###rebalance调整topology并行度
Storm 的一个很有意思的特点是你可以随时增加或者减少 worker 或者 executor 的数量,而不需要重启集群或者拓扑。这个方法就叫做再平衡(rebalance)。
有两种方法可以对一个拓扑执行再平衡操作:
- 使用 Storm UI
- 使用命令行如下
1
2
3
4## 重新配置拓扑 "mytopology",使得该拓扑拥有 5 个 worker processes
##另外,配置名为 "blue-spout" 的 spout 使用 3 个 executor,配置名为 "yellow-bolt" 的 bolt 使用 10 个 executor。
$ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
##参考##
- http://chengjianxiaoxue.iteye.com/blog/2188864
- http://www.superwu.cn/2015/05/22/2316/
- http://weyo.me/pages/techs/storm-translations-understanding-the-parallelism-of-a-storm-topology/
转载请标明文章出处。本文内容为实践后的原创整理,如果侵犯了您的版权,请联系我进行删除,邮件:yanhealth@163.com