Storm 源码分析——工具篇
工欲善其事,必先利其器。
在开始分析storm源码之前,一个好的阅读代码的环境是非常有帮助的。
storm的代码包括:核心功能的Clojure代码,对外接口的java 代码,RPC通信的thrift代码,bin中客户端命令的python代码等。
虽然可以在命令行使用vim直接阅读,但是对于对storm还不熟悉的像我这样的小白,我的建议是使用IDE效率会更高。
Intellij idea + Cursive
我的推荐是:Intellij idea + Cursive (一开始我是用的是Eclipse,但是对于Clojure的支持并不是很好,直到我发现了Cursive)
Intellij idea的安装可以直接从其官网下载:https://www.jetbrains.com/idea/download/
免费的Community版本功能就足够使用;
我是CentOS系统,下载的linux版本,下载后解压运行即可:1
2
3#Unpack the idea idea-15.0.3.tar.gz file using the following command:
tar xfz idea-15.0.3.tar.gz
#Run idea.sh from the bin subdirectory.
然后安装Cursive 插件:
运行idea,点击configure,再点击setting。
然后点击plugins:
再点击底下的Browse repositories,
然后搜索cursive,点击Install ,我这边因为已经安装过了,所以没有install的按钮:
安装后重启idea,进入Cursive会提示需要license, 去他的官网https://cursive-ide.com/申请一个免费的即可:
点击首页的Get a License按钮,如下填写后:
会将License发送到你的邮箱。
至此安装完成。
安装storm编译依赖
按照storm的开发文档https://www.zybuluo.com/yanhealth/note/281972的说明,storm的编译依赖python, ruby 和nodejs,需要预先安装:
The ruby package manager rvm and nodejs package manager nvm are for convenience and are used in the tests which run on travis. (ruby的包管理器rvm和nodejs的包管理器nvm是用于在 travis (https://en.wikipedia.org/wiki/Travis_CI) 上的测试的) 。可以用如下命令安装:
1 | curl -L https://get.rvm.io | bash -s stable --autolibs=enabled && source ~/.profile |
1 | wget -qO- https://raw.githubusercontent.com/creationix/nvm/v0.26.1/install.sh | bash && source ~/.bashrc |
安装好以后运行如下命令:
1 | rvm use 2.1.5 --install |
导入Storm-core项目
在官网下载storm的源代码并解压后,在idea点击import,然后选择storm源码目录下的storm-core:
然后下一步,选择Maven:
next,默认就可以:
然后选择安装的jdk的位置,我选的版本1.7,没有安装需要先安装jdk;
导入后会开始解决依赖问题:
然后添加一个maven的打包操作:
此处-DskipTests=true ,是指跳过打包时的测试阶段,因为我的版本中测试会失败。同时也可以节省时间。
最后运行打包命令,会在storm-core项目根目录下的target中生成jar包文件:
如何编译storm生成可以发布的binary版本
以下命令可以将storm编译后安装到本地的.m2库中1
mvn clean install # 可加上-DskipTests=true 跳过测试
如果你需要对storm源代码进行改动,并且希望生成binary版本的话,可以按照storm的Develper教程https://www.zybuluo.com/yanhealth/note/281972来操作:
1 | # First, build the code. 首先在storm根目录下install |
至此,基本上storm的整个环境配置完成。可以开始storm源代码的阅读和二次开发。
storm的代码结构
Java (storm-core/src/jvm/)
backtype.storm.coordination: 实现了DRPC和事务性topology里用到的基于Storm的批处理功能。这个包里最重要得类是CoordinatedBolt
backtype.storm.drpc: DRPC的更高层次抽象的具体实现
backtype.storm.generated: 自动生成的Thrift代码(利用这里folk出来的Thrift版本生成的,主要是把org.apache.thrift包重命名成org.apache.thrift7来避免与其他Thrift版本的冲突)
backtype.storm.grouping: 包含了用户实现自定义stream分组类时需要用到的接口
backtype.storm.hooks: 定义了处理storm各种事件的钩子接口,例如当task发射tuple时、当tuple被ack时。关于钩子的手册详见这里
backtype.storm.serialization: storm序列化/反序列化tuple的实现。在Kryo之上构建。
backtype.storm.spout: spout及相关接口的定义(例如”SpoutOutputCollector”)。也包括了”ShellSpout”来实现非JVM语言定义spout的协议。
backtype.storm.task: bolt及相关接口的定义(例如”OutputCollector”)。也包括了”ShellBolt”来实现非JVM语言定义bolt的协议。最后,”TopologyContext”也是在这里定义的,用来在运行时供spout和bolt使用以获取topology的执行信息。
backtype.storm.testing: 包括了storm单元测试中用到的各种测试bolt及工具。
backtype.storm.topology: 在Thrift结构之上的Java层,用以提供一个纯Java API来使用Storm(用户不需要了解Thrift的细节)。”TopologyBuilder”及不同spout和bolt的基类们也在这里定义。稍高一层次的接口”IBasicBolt”也在这里定义,它会使得创建某些特定类型的bolt会更加简洁。
backtype.storm.transactional: 包括了事务性topology的实现。
backtype.storm.tuple: 包括Storm中tuple数据模型的实现。
backtype.storm.utils: 包含了Storm源码中用到的数据结构及各种工具类。
Clojure (storm-core/src/clj/)
backtype.storm.bootstrap: 包括了1个有用的宏来引入源码中用到的所有类及命名空间。
backtype.storm.clojure: 包括了利用Clojure为Storm定义的特定领域语言(DSL)。
backtype.storm.cluster: Storm守护进程中用到的Zookeeper逻辑都封装在这个文件中。这部分代码提供了API来将整个集群的运行状态映射到Zookeeper的”文件系统”上(例如哪里运行着怎样的task,每个task运行的是哪个spout/bolt)。
backtype.storm.command.: 这些命名空间包括了各种”storm xxx”开头的客户端命令行的命令实现。这些实现都很简短。
backtype.storm.config: Clojure中config的读取/解析实现。同时也包括了工具函数来告诉nimbus、supervisor等守护进程在各种情况下应该使用哪些本地目录。例如:”master-inbox”函数会返回本地目录告诉Nimbus应该将上传给它的jar包保存到哪里。
backtype.storm.daemon.acker: “acker” bolt的实现。这是Storm确保数据被完全处理的关键组成部分。
backtype.storm.daemon.common: Storm守护进程用到的公共函数,例如根据topology的名字获取其id,将1个用户定义的topology映射到真正运行的topology(真正运行的topology是在用户定义的topology基础上添加了ack stream及acker bolt,参见system-topology!函数),同时包括了各种心跳及Storm中其他数据结构的定义。
backtype.storm.daemon.drpc: 包括了DRPC服务器的实现,用来与DRPC topology一起使用。
backtype.storm.daemon.nimbus: 包括了Nimbus的实现。
backtype.storm.daemon.supervisor: 包括了Supervisor的实现。
backtype.storm.daemon.task: 包括了spout或bolt的task实例实现。包括了处理消息路由、序列化、为UI提供的统计集合及spout、bolt执行动作的实现。
backtype.storm.daemon.worker: 包括了worker进程(1个worker包含很多的task)的实现。包括了消息传输和task启动的实现。
backtype.storm.event: 包括了1个简单的异步函数的执行器。Nimbus和Supervisor很多场合都用到了异步函数执行器来避免资源竞争。
backtype.storm.log: 定义了用来输出log信息给log4j的函数。
backtype.storm.messaging.: 定义了1个高一层次的接口来实现点对点的消息通讯。工作在本地模式时Storm会使用内存中的Java队列来模拟消息传递。工作在集群模式时,消息传递使用的是ZeroMQ。通用的接口在protocol.clj中定义。
backtype.storm.stats: 实现了向Zookeeper中写入UI使用的统计信息时如何进行汇总。实现了不同粒度的聚合。
backtype.storm.testing: 包括了测试Storm topology的工具。包括时间仿真,运行一组固定数量的tuple然后获得输出快照的”complete-topology”,”tracker topology”可以在集群”空闲”时做更细粒度的控制操作,以及其他工具。
backtype.storm.thrift: 包括了自动生成的Thrift API的Clojure封装以使得使用Thrift结构更加便利。
backtype.storm.timer: 实现了1个后台定时器来延迟执行函数或者定时轮询执行。Storm不能使用Java里的Timer类,因为为了单测Nimbus和Supervisor,必须要与时间仿真集成起来使用。
backtype.storm.ui.*: Storm UI的实现。完全独立于其他的代码,通过Nimbus的Thrift API来获取需要的数据。
backtype.storm.util: 包括了Storm代码中用到的通用工具函数。
backtype.storm.zookeeper: 包括了Clojure对Zookeeper API的封装,同时也提供了一些高一层次的操作例如:”mkdirs”、”delete-recursive”
Thrift (storm-core/src/storm.thrift)
storm.thrift: RPC服务的通信接口定义
genthrift.sh: 将storm.thrift文件中定义的结构编译转换为java代码(backtype.storm.generated)的脚本文件
Web UI (storm-core/src/ui)
src/ui : 定义storm UI界面的网页文件资源
转载请标明文章出处。本文内容为实践后的原创整理,如果侵犯了您的版权,请联系我进行删除,邮件:yanhealth@163.com
参考
[1] https://github.com/apache/storm
[2] Storm源码结构 (来源Storm Github Wiki)