Storm Source Code Reading Notes, Part 1 (to be organized)
The for and doseq macros can be used for list comprehension. They iterate over multiple collections (the rightmost binding iterates fastest) and support filtering with :when and :while. The for macro takes only a single body expression and returns a lazy sequence as its result; doseq accepts any number of body expressions, executes them for their side effects, and returns nil.
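For example (standard Clojure, not Storm-specific):

```clojure
;; `for` yields a lazy sequence; `doseq` runs for side effects and returns nil.
(for [x [1 2 3]
      y [:a :b]
      :when (odd? x)]
  [x y])
;;=> ([1 :a] [1 :b] [3 :a] [3 :b])

(doseq [x [1 2]
        y [:a :b]]
  (println x y))
;; prints 1 :a, 1 :b, 2 :a, 2 :b and returns nil
```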
The $ symbol is used in place of the dot when referring to Java nested classes (e.g. Watcher$Event$KeeperState).
Definition of the ZooKeeper connection states:
```clojure
(def zk-keeper-states
  {Watcher$Event$KeeperState/Disconnected :disconnected
   Watcher$Event$KeeperState/SyncConnected :connected
   Watcher$Event$KeeperState/AuthFailed :auth-failed
   Watcher$Event$KeeperState/Expired :expired})
```

Definition of the ZooKeeper watcher event types:
```clojure
(def zk-event-types
  {Watcher$Event$EventType/None :none
   Watcher$Event$EventType/NodeCreated :node-created
   Watcher$Event$EventType/NodeDeleted :node-deleted
   Watcher$Event$EventType/NodeDataChanged :node-data-changed
   Watcher$Event$EventType/NodeChildrenChanged :node-children-changed})
```

defn- defines a private function.
Storm makes extensive use of defnk. Unlike a normal defn, it lets you declare keyword parameters (k/v pairs) in the parameter list, give each key a default value, and refer to the key directly by name inside the function body. Under the hood it is implemented by packing the keyword arguments into a hash map.
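A sketch of how such keyword arguments are used, modeled on the defnk signatures that appear later in these notes (connect-zk and its parameters are made-up names, and the code assumes Storm's backtype.storm.util namespace is referred so that defnk is available):

```clojure
;; Sketch only: positional parameters come first; each ":key default" pair
;; declares a keyword argument that is referenced by name in the body.
(defnk connect-zk [conf :timeout-ms 15000 :root "/storm"]
  (str "connect with timeout " timeout-ms " under " root))

;; (connect-zk conf)                    ; uses both defaults
;; (connect-zk conf :timeout-ms 30000)  ; overrides one keyword argument
```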
Storm's zookeeper.clj uses Curator as the ZooKeeper client. Curator is a ZooKeeper client open-sourced by Netflix; compared with the native client shipped with ZooKeeper, Curator operates at a higher level of abstraction and greatly reduces the amount of client code you have to write.
backtype.storm.utils defines the newCurator factory method in Java. Today's work was mainly importing Storm into Eclipse, which ran into many problems.
When building a WAR with "lein ring uberwar", if ring-servlet is only in the lib/dev directory and not in the lib directory, ring-servlet is not added to the WAR. When deploying the WAR, for instance on Jetty, you get the following error:
"Could not locate ring/util/servlet__init.class or ring/util/servlet.clj on classpath".
A temporary workaround is to explicitly add [ring/ring-servlet "1.0.1"] to the list of dependencies in project.clj. I added ring-servlet to the pom and the error disappeared.

reify is used to implement a Java interface, as the following example shows:
```java
myJButton.addActionListener(new ActionListener() {
    public void actionPerformed(ActionEvent e) {
        // do something here
    }
});
```
```clojure
(.addActionListener my-jbutton
  (proxy [ActionListener] []
    (actionPerformed [evt]
      ;; (do something here)
      )))

(.addActionListener my-jbutton
  (reify ActionListener
    (actionPerformed [this evt]
      ;; (do something here)
      )))
```

".." is shorthand for chaining method calls: successive Java method calls can be strung together with the .. macro.
Curator framework event types:
Enum CuratorEventType:
- CHILDREN: corresponds to CuratorFramework.getChildren()
- CLOSING: event sent when the client is being closed
- CREATE: corresponds to CuratorFramework.create()
- DELETE: corresponds to CuratorFramework.delete()
- EXISTS: corresponds to CuratorFramework.checkExists()
- GET_ACL: corresponds to CuratorFramework.getACL()
- GET_DATA: corresponds to CuratorFramework.getData()
- SET_ACL: corresponds to CuratorFramework.setACL()
- SET_DATA: corresponds to CuratorFramework.setData()
- SYNC: corresponds to CuratorFramework.sync(String, Object)
- WATCHED: corresponds to Watchable.usingWatcher(Watcher) or Watchable.watched()

In Clojure, a function parameter can be type-hinted by prefixing it with ^ and the type.
Clojure method calls can be chained with the .. macro; no matter how many calls are chained, it is always exactly two dots, not one dot per call.
```clojure
(.getLocation (.getCodeSource (.getProtectionDomain (.getClass '(1 2)))))
```

can be abbreviated to:

```clojure
(.. '(1 2) getClass getProtectionDomain getCodeSource getLocation)
```

Tip: in Eclipse, Ctrl+H searches the whole project, a very useful feature.
Twitter Storm source code analysis of TimeCacheMap: see a separate note.
INimbus is an interface; its main methods query the available slots, assign slots, get a supervisor node's host name, and get the (forced) scheduler.
The Clojure cond macro takes a series of test/expression pairs. It evaluates the tests one at a time; as soon as a test returns true, cond evaluates and returns the expression paired with that test and does not evaluate the remaining tests. For example:
```clojure
(defn type-of-number [n]
  (cond (> n 0) "positive number"
        (< n 0) "negative number"
        :else "zero"))
```
18. The -> threading syntax used in Storm:
->, (-> x form & more)
It linearizes nested calls to make them more readable: it inserts x as the second item in the first form. As the example below shows, the first argument (x) is used as the initial input to the second argument (a fn), and the return value is then fed into the subsequent forms. Its purpose is similar to .., except that .. can only be used for Java interop calls.
```clojure
(first (.split (.replace (.toUpperCase "a b c d") "A" "X") " "))
```

can be rewritten as:

```clojure
(-> "a b c d"
    .toUpperCase
    (.replace "A" "X")
    (.split " ")
    first)
```

The nimbus-data structure (see nimbus.clj) mainly contains:
```clojure
(defn nimbus-data [conf inimbus]
(let [forced-scheduler (.getForcedScheduler inimbus)]
{:conf conf
:nimbus-host-port-info (NimbusInfo/fromConf conf)
:inimbus inimbus
:authorization-handler (mk-authorization-handler (conf NIMBUS-AUTHORIZER) conf)
:impersonation-authorization-handler (mk-authorization-handler (conf NIMBUS-IMPERSONATION-AUTHORIZER) conf)
:submitted-count (atom 0)
:storm-cluster-state (cluster/mk-storm-cluster-state conf :acls (when (Utils/isZkAuthenticationConfiguredStormServer conf) NIMBUS-ZK-ACLS))
:submit-lock (Object.)
:cred-update-lock (Object.)
:log-update-lock (Object.)
:heartbeats-cache (atom {})
:downloaders (file-cache-map conf)
:uploaders (file-cache-map conf)
:uptime (uptime-computer)
:validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))
:timer (mk-timer :kill-fn (fn [t]
(log-error t "Error when processing event")
(exit-process! 20 "Error when processing an event")
))
:scheduler (mk-scheduler conf inimbus)
:leader-elector (zk-leader-elector conf)
:code-distributor (mk-code-distributor conf)
:id->sched-status (atom {})
:cred-renewers (AuthUtils/GetCredentialRenewers conf)
:nimbus-autocred-plugins (AuthUtils/getNimbusAutoCredPlugins conf)
}))
```
20. **org.apache.commons.io.FileUtils**: the forceMkdir method:
public static void forceMkdir(File directory) throws IOException
Makes a directory, including any necessary but nonexistent parent directories. If a file already exists with the specified name but it is not a directory, then an IOException is thrown. If the directory cannot be created, an IOException is also thrown.
21. **doseq** : Repeatedly executes body (presumably for side-effects) with bindings and filtering as provided by "for". Does not retain the head of the sequence. Returns nil.
22. Clojure's **symbol**: Returns a Symbol with the given namespace and name.
23. The **defmulti** and **defmethod** macros are usually used together to define a multimethod. defmulti takes a method name and a dispatch function, and the dispatch function's return value decides which implementation is called. defmethod takes the method name, a dispatch value, a parameter list, and a body. The special dispatch value :default marks the default case, called when no other dispatch value matches. All defmethod implementations registered under the same name must take the same number of arguments, and the arguments passed to the multimethod are the same arguments passed to the dispatch function.
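A minimal sketch with standard Clojure (the area multimethod and the shape maps are made up for illustration):

```clojure
;; dispatch on the :shape key of the argument map
(defmulti area :shape)

(defmethod area :circle [{:keys [radius]}]
  (* Math/PI radius radius))

(defmethod area :rectangle [{:keys [width height]}]
  (* width height))

;; :default handles any unmatched dispatch value
(defmethod area :default [shape]
  (throw (IllegalArgumentException. (str "Unknown shape: " shape))))

;; (area {:shape :circle :radius 2.0})            ;=> ~12.566
;; (area {:shape :rectangle :width 3 :height 4})  ;=> 12
```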
24. **->>, (->> x form & more)**
Inserts x as the last item in the first form. The difference from -> is where x is inserted: -> inserts it as the second item, right after the function name, whereas ->> inserts it as the last item.
```clojure
user=> (->> (range)
            (map #(* % %))
            (filter even?)
            (take 10)
            (reduce +))
```

is equivalent to:

```clojure
user=> (reduce +
         (take 10
           (filter even?
             (map #(* % %)
               (range)))))
```

backtype.storm.config.clj mainly implements extraction of values from the conf map; backtype.storm.Config.java defines all of Storm's configurable parameters and also defines the serialization registrations.
Clojure syntax: with-meta. (with-meta obj m) Returns an object of the same type and value as obj, with map m as its metadata.
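A quick illustration with standard Clojure (not from the Storm source):

```clojure
;; the value and type are unchanged; only the metadata map is attached
(def v (with-meta [1 2 3] {:source "example"}))
(meta v) ;=> {:source "example"}
v        ;=> [1 2 3]
```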
into, (into to from)
Joins from onto to. For a list, a vector, and a set the resulting order differs, which was confusing at first; in fact into simply reads the items from from one by one and appends them onto to, and the final order differs because each data structure handles appending differently. '() is a list, similar to Java's LinkedList; [] is a vector; #{} defines a set; {} defines a map.
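For example (standard Clojure): conj prepends to a list but appends to a vector, so the order of the result differs:

```clojure
(into '(1 2 3) [4 5 6])       ;=> (6 5 4 1 2 3)
(into [1 2 3] '(4 5 6))       ;=> [1 2 3 4 5 6]
(into #{1 2 3} [3 4 5])       ;=> #{1 2 3 4 5}  (element order may vary)
(into {:a 1} [[:b 2] [:c 3]]) ;=> {:a 1, :b 2, :c 3}
```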
merge, (merge & maps): merges several maps into one; when a key appears in more than one map, the value from the later map wins.
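For example (standard Clojure):

```clojure
(merge {:a 1 :b 2} {:b 3 :c 4}) ;=> {:a 1, :b 3, :c 4}  ; the later value for :b wins
```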
LocalState implements a lightweight, persistent key/value store. It is not efficient, since every read and write goes to disk, so it is only suitable for scenarios where reads and writes are infrequent.
fn defines an anonymous function.
The ClusterState protocol in cluster.clj defines a set of functions for operating on ZooKeeper.
defprotocol: a protocol is Clojure's counterpart to an interface. Note that the first parameter of every protocol function is the self/this parameter (similar to Python); the arguments supplied by the caller start from the second position.
The mk-distributed-cluster-state function in cluster.clj returns an object implementing the ClusterState protocol; its basic operations are implemented with the functions defined in zookeeper.clj.
The StormClusterState protocol in cluster.clj defines the Storm-specific ZooKeeper operations, such as reading a topology's task assignment and writing heartbeat information to ZooKeeper.
The mk-storm-cluster-state function in cluster.clj returns an object implementing the StormClusterState protocol.
In short, cluster.clj mainly defines these two protocols together with the two functions that return objects implementing them.
register:
defrecord: defines a record type with a fixed set of named fields; a record behaves like a map whose keys are those fields.
SupervisorInfo is defined in common.clj; converter.clj defines how a SupervisorInfo is used, i.e. how it is converted for transmission over Thrift.
```clojure
(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs version])
```
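For illustration only (the values below are made up), a record defined with defrecord gets a positional constructor and reads like a map:

```clojure
(def info (->SupervisorInfo 1700000000 "host-1" "super-1" [6700 6701] {} nil 120 "0.11.0"))
(:hostname info)   ;=> "host-1"
(:used-ports info) ;=> [6700 6701]
```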
common.clj defines StormBase, which holds a topology's basic information:
```clojure
(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors owner topology-action-options prev-status])
;; component->executors stores <component-id, parallelism>
```

common.clj also defines Assignment, which describes a topology's current task assignment:
```clojure
(defrecord Assignment [master-code-dir node->host executor->node+port executor->start-time-secs])
;; master-code-dir: the local path where nimbus stores this topology's files (stormjar, stormcode, stormconf)
;; node->host: the <supervisor-id, hostname> pairs this topology has been assigned to
;; executor->node+port: how this topology's executors are assigned; node is a supervisor-id, port is one of that supervisor's ports
```

A trailing exclamation mark on a Clojure function name is an ordinary word character, just like an underscore, and you may add it when defining a function. By coding-style convention it signals that the function has obvious side effects, but this is only a style recommendation and carries no semantic meaning. See: http://dev.clojure.org/display/community/Library+Coding+Standards
storm.thrift is an RPC interface file written in the Thrift IDL; it defines all the services registered on the Nimbus service. Because Storm runs on the JVM, the structures defined there are converted by Thrift into the corresponding Java code, namely the backtype.storm.generated.Nimbus.Iface interface.
The data structures used by nimbus fall into two groups: those defined in Java and those defined in Clojure. The Java structures are mainly used for task assignment, while the Clojure structures mostly serve as Storm metadata. The Java structures are packaged in backtype.storm.scheduler.
nimbus.clj defines the mk-assignments function, which runs a new round of task scheduling over every topology in the cluster: on one hand it checks the resources occupied by already running topologies and whether they need to be reassigned; on the other hand it assigns tasks to newly submitted topologies based on the currently available resources.
log.clj defines log message macros for every log level.
Clojure apply: Applies fn f to the argument list formed by prepending intervening arguments to args.
```clojure
(apply f args)
(apply f x args)
(apply f x y args)
(apply f x y z args)
(apply f a b c d & args)

;; Example:
(apply str ["str1" "str2" "str3"]) ;;=> "str1str2str3"
```

The do-cleanup function:
```clojure
;; cleanup-storm-ids determines which topologies need to be cleaned up,
;; and then performs the corresponding operations on them:
;; delete their heartbeat and error information from ZooKeeper,
;; try to remove the related files from nimbus's local directory,
;; and remove the corresponding entries from nimbus's heartbeat cache.
(defn do-cleanup [nimbus]
(let [storm-cluster-state (:storm-cluster-state nimbus)
conf (:conf nimbus)
submit-lock (:submit-lock nimbus)]
(let [to-cleanup-ids (locking submit-lock
(cleanup-storm-ids conf storm-cluster-state))]
(when-not (empty? to-cleanup-ids)
(doseq [id to-cleanup-ids]
(log-message "Cleaning up " id)
(.teardown-heartbeats! storm-cluster-state id)
(.teardown-topology-errors! storm-cluster-state id)
(rmr (master-stormdist-root conf id))
(swap! (:heartbeats-cache nimbus) dissoc id))
))))
```

The clean-inbox function:
```clojure
;; clean-inbox cleans up nimbus's inbox directory.
;; A jar file is deleted when the time since its last modification exceeds `seconds`.
(defn clean-inbox [dir-location seconds]
"Deletes jar files in dir older than seconds."
(let [now (current-time-secs)
pred #(and (.isFile %) (file-older-than? now seconds %))
files (filter pred (file-seq (File. dir-location)))]
(doseq [f files]
(if (.delete f)
(log-message "Cleaning inbox ... deleted: " (.getName f))
;; This should never happen
(log-error "Cleaning inbox ... error deleting: " (.getName f))
))))
```

The transition-name! function in nimbus.clj:
```clojure
;; transition-name! applies the given transition event to the topology identified by
;; storm-name, converting the name-based state change into the corresponding
;; storm-id-based state change.
(defn transition-name! [nimbus storm-name event & args]
(let [storm-id (get-storm-id (:storm-cluster-state nimbus) storm-name)]
(when-not storm-id
(throw (NotAliveException. storm-name)))
(apply transition! nimbus storm-id event args)))
```

The transition! function in nimbus.clj:
```clojure
;; transition! looks up the state-transition function corresponding to the topology's
;; current status and the given transition event, executes it, and obtains the new status.
(defn transition!
([nimbus storm-id event]
(transition! nimbus storm-id event false))
([nimbus storm-id event error-on-no-transition?]
(locking (:submit-lock nimbus)
(let [system-events #{:startup}
[event & event-args] (if (keyword? event) [event] event)
storm-base (-> nimbus :storm-cluster-state (.storm-base storm-id nil))
status (:status storm-base)]
;; handles the case where event was scheduled but topology has been removed
(if-not status
(log-message "Cannot apply event " event " to " storm-id " because topology no longer exists")
(let [get-event (fn [m e]
(if (contains? m e)
(m e)
(let [msg (str "No transition for event: " event
", status: " status,
" storm-id: " storm-id)]
(if error-on-no-transition?
(throw-runtime msg)
(do (when-not (contains? system-events event)
(log-message msg))
nil))
)))
transition (-> (state-transitions nimbus storm-id status storm-base)
(get (:type status))
(get-event event))
transition (if (or (nil? transition)
(keyword? transition))
(fn [] transition)
transition)
storm-base-updates (apply transition event-args)
storm-base-updates (if (keyword? storm-base-updates) ;if it's just a symbol, that just indicates new status.
{:status {:type storm-base-updates}}
storm-base-updates)]
(when storm-base-updates
(.update-storm! (:storm-cluster-state nimbus) storm-id storm-base-updates)))))
)))
```

The kill-transition function in nimbus.clj:
```clojure
;; kill-transition kills the topology after a delay of kill-time seconds:
;; it schedules a :remove transition event to run once the topology is in the :killed state.
(defn kill-transition [nimbus storm-id]
(fn [kill-time]
(let [delay (if kill-time
kill-time
(get (read-storm-conf (:conf nimbus) storm-id)
TOPOLOGY-MESSAGE-TIMEOUT-SECS))]
(delay-event nimbus
storm-id
delay
:remove)
{
:status {:type :killed}
:topology-action-options {:delay-secs delay :action :kill}})
))
```

The rebalance-transition function in nimbus.clj:
```clojure
;; rebalance-transition rebalances the topology after the given delay.
;; The returned update sets the topology status to :rebalancing;
;; the rebalance only truly completes once the timer thread has executed
;; the :do-rebalance transition event.
(defn rebalance-transition [nimbus storm-id status]
(fn [time num-workers executor-overrides]
(let [delay (if time
time
(get (read-storm-conf (:conf nimbus) storm-id)
TOPOLOGY-MESSAGE-TIMEOUT-SECS))]
(delay-event nimbus
storm-id
delay
:do-rebalance)
{:status {:type :rebalancing}
:prev-status status
:topology-action-options (-> {:delay-secs delay :action :rebalance}
(assoc-non-nil :num-workers num-workers)
(assoc-non-nil :component->executors executor-overrides))
})))
```

The state-transitions function in nimbus.clj:
```clojure
;; state-transitions defines a state-transition matrix whose keys are the states
;; :active, :inactive, :killed, and :rebalancing. Each value maps the transition
;; events allowed in that state to the corresponding transition function (or target state).
(defn state-transitions [nimbus storm-id status storm-base]
{:active {:inactivate :inactive
:activate nil
:rebalance (rebalance-transition nimbus storm-id status)
:kill (kill-transition nimbus storm-id)
}
:inactive {:activate :active
:inactivate nil
:rebalance (rebalance-transition nimbus storm-id status)
:kill (kill-transition nimbus storm-id)
}
:killed {:startup (fn [] (delay-event nimbus
storm-id
(-> storm-base
:topology-action-options
:delay-secs)
:remove)
nil)
:kill (kill-transition nimbus storm-id)
:remove (fn []
(log-message "Killing topology: " storm-id)
(.remove-storm! (:storm-cluster-state nimbus)
storm-id)
nil)
}
:rebalancing {:startup (fn [] (delay-event nimbus
storm-id
(-> storm-base
:topology-action-options
:delay-secs)
:do-rebalance)
nil)
:kill (kill-transition nimbus storm-id)
:do-rebalance (fn []
(do-rebalance nimbus storm-id status storm-base)
(:type (:prev-status storm-base)))
}})
```

The compute-new-topology->executor->node+port function in nimbus.clj:
```clojure
; public so it can be mocked out
;; Based on the existing assignments and the current state of the cluster, this function
;; finds the set of topologies that need task (re)assignment, assigns tasks to them, and
;; computes each topology's assignment after this round of scheduling.
;; scratch-topology-id is the id of the topology whose assignment must be redone.
(defn compute-new-topology->executor->node+port [nimbus existing-assignments topologies scratch-topology-id]
(let [conf (:conf nimbus)
storm-cluster-state (:storm-cluster-state nimbus)
topology->executors (compute-topology->executors nimbus (keys existing-assignments))
;; update the executors heartbeats first.
_ (update-all-heartbeats! nimbus existing-assignments topology->executors)
topology->alive-executors (compute-topology->alive-executors nimbus
existing-assignments
topologies
topology->executors
scratch-topology-id)
supervisor->dead-ports (compute-supervisor->dead-ports nimbus
existing-assignments
topology->executors
topology->alive-executors)
topology->scheduler-assignment (compute-topology->scheduler-assignment nimbus
existing-assignments
topology->alive-executors)
;; Filter condition: the topology's executor set is empty, or its executor set differs
;; from its alive executors, or its number of used workers is less than the requested
;; number of workers
missing-assignment-topologies (->> topologies
.getTopologies ;; Collection<TopologyDetails>
(map (memfn getId))
(filter (fn [t]
(let [alle (get topology->executors t)
alivee (get topology->alive-executors t)]
(or (empty? alle)
(not= alle alivee)
(< (-> topology->scheduler-assignment
(get t)
num-used-workers )
(-> topologies (.getById t) .getNumWorkers)
))
))))
all-scheduling-slots (->> (all-scheduling-slots nimbus topologies missing-assignment-topologies)
(map (fn [[node-id port]] {node-id #{port}}))
(apply merge-with set/union))
supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports)
cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment)
;; call scheduler.schedule to schedule all the topologies
;; the new assignments for all the topologies are in the cluster object.
_ (.schedule (:scheduler nimbus) topologies cluster)
new-scheduler-assignments (.getAssignments cluster)
;; add more information to convert SchedulerAssignment to Assignment
new-topology->executor->node+port (compute-topology->executor->node+port new-scheduler-assignments)]
(reset! (:id->sched-status nimbus) (.getStatusMap cluster))
;; print some useful information.
(doseq [[topology-id executor->node+port] new-topology->executor->node+port
:let [old-executor->node+port (-> topology-id
existing-assignments
:executor->node+port)
reassignment (filter (fn [[executor node+port]]
(and (contains? old-executor->node+port executor)
(not (= node+port (old-executor->node+port executor)))))
executor->node+port)]]
(when-not (empty? reassignment)
(let [new-slots-cnt (count (set (vals executor->node+port)))
reassign-executors (keys reassignment)]
(log-message "Reassigning " topology-id " to " new-slots-cnt " slots")
(log-message "Reassign executors: " (vec reassign-executors)))))
new-topology->executor->node+port))
```

Clojure's mapcat function: Returns the result of applying concat to the result of applying map to f and colls. Thus function f should return a collection. Returns a transducer when no collections are provided.
```clojure
user=> (mapcat (fn [[k v]]
                 (for [[k2 v2] v]
                   (concat [k k2] v2)))
               '{:a {:x (1 2) :y (3 4)}
                 :b {:x (1 2) :z (5 6)}})
((:a :x 1 2) (:a :y 3 4) (:b :x 1 2) (:b :z 5 6))
```

if-let: a let combined with an if. In the example below, if nums is neither false nor nil the sum is computed; otherwise the list contains no even numbers and "No even numbers found." is returned. It is useful when the two outcomes of a binding need different handling.
```clojure
user=> (defn sum-even-numbers [nums]
         (if-let [nums (seq (filter even? nums))]
           (reduce + nums)
           "No even numbers found."))
user=> (sum-even-numbers [1 3 5 7 9])
"No even numbers found."
user=> (sum-even-numbers [1 3 5 7 9 10 12])
22
```

when-let: same idea; when the bound value is neither false nor nil the body is executed, otherwise nil is returned.
```clojure
(defn drop-one
  [coll]
  (when-let [s (seq coll)]
    (rest s)))
user=> (drop-one [1 2 3])
(2 3)
user=> (drop-one [])
nil
```

Nimbus local working directory layout:
- workdir
  - nimbus
    - inbox
    - stormdist
      - a folder named by topology id
        - **stormcode.ser** (serialized StormTopology object from the backtype.storm.generated package)
        - **stormconf.ser** (serialized topology conf)
        - **stormjar.jar** (the jar submitted by the user)
  - supervisor
    - isupervisor
    - localstate
    - tmp

Data structures in the TopologyDetails object:
```java
public class TopologyDetails {
    String topologyId;
    Map topologyConf;
    StormTopology topology;
    Map<ExecutorDetails, String> executorToComponent;
    int numWorkers;
    ......
}
```

The ExecutorDetails object records the startTask and endTask of each executor; it is defined this way so that when Storm computes executors, each executor is a contiguous range of tasks.
What Clojure's swap! function does:
(swap! atom f) (swap! atom f x) (swap! atom f x y) (swap! atom f x y & args)
Atomically swaps the value of atom to be:
(apply f current-value-of-atom args). Note that f may be called multiple times, and thus should be free of side effects. Returns the value that was swapped in.
In other words, swap! applies the function f to the atom's current value plus the extra args and installs the result as the new value.
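A standard-Clojure illustration of swap!, in the same spirit as the :submitted-count and :heartbeats-cache atoms in nimbus-data above:

```clojure
(def counter (atom 0))
(swap! counter inc)   ;=> 1
(swap! counter + 10)  ;=> 11

(def cache (atom {}))
(swap! cache assoc :t1 :alive)  ;=> {:t1 :alive}
(swap! cache dissoc :t1)        ;=> {}
```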
SchedulerAssignment is an interface, and SchedulerAssignmentImpl is its implementation:

```java
public interface SchedulerAssignment {
// Does this slot occupied by this assignment?
public boolean isSlotOccupied(WorkerSlot slot);
//is the executor assigned?
public boolean isExecutorAssigned(ExecutorDetails executor);
// get the topology-id this assignment is for.
public String getTopologyId();
//get the executor -> slot map.
public Map<ExecutorDetails, WorkerSlot> getExecutorToSlot();
// Return the executors covered by this assignments
public Set<ExecutorDetails> getExecutors();
public Set<WorkerSlot> getSlots();
}
```

```java
package backtype.storm.scheduler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
//TODO: improve this by maintaining slot -> executors as well for more efficient operations
public class SchedulerAssignmentImpl implements SchedulerAssignment {
//topology-id this assignment is for.
String topologyId;
// assignment detail, a mapping from executor to WorkerSlot
Map<ExecutorDetails, WorkerSlot> executorToSlot;
public SchedulerAssignmentImpl(String topologyId, Map<ExecutorDetails, WorkerSlot> executorToSlots) {
this.topologyId = topologyId;
this.executorToSlot = new HashMap<ExecutorDetails, WorkerSlot>(0);
if (executorToSlots != null) {
this.executorToSlot.putAll(executorToSlots);
}
}
public Set<WorkerSlot> getSlots() {
return new HashSet(executorToSlot.values());
}
//Assign the slot to executors.
public void assign(WorkerSlot slot, Collection<ExecutorDetails> executors) {
for (ExecutorDetails executor : executors) {
this.executorToSlot.put(executor, slot);
}
}
//Release the slot occupied by this assignment.
public void unassignBySlot(WorkerSlot slot) {
List<ExecutorDetails> executors = new ArrayList<ExecutorDetails>();
for (ExecutorDetails executor : this.executorToSlot.keySet()) {
WorkerSlot ws = this.executorToSlot.get(executor);
if (ws.equals(slot)) {
executors.add(executor);
}
}
// remove
for (ExecutorDetails executor : executors) {
this.executorToSlot.remove(executor);
}
}
//Does this slot occupied by this assignment?
public boolean isSlotOccupied(WorkerSlot slot) {
return this.executorToSlot.containsValue(slot);
}
public boolean isExecutorAssigned(ExecutorDetails executor) {
return this.executorToSlot.containsKey(executor);
}
public String getTopologyId() {
return this.topologyId;
}
public Map<ExecutorDetails, WorkerSlot> getExecutorToSlot() {
return this.executorToSlot;
}
// Return the executors covered by this assignments
public Set<ExecutorDetails> getExecutors() {
return this.executorToSlot.keySet();
}
}
```

The mk-assignments function in nimbus.clj:
```clojure
;; get existing assignment (just the executor->node+port map) -> default to {}
;; filter out ones which have a executor timeout
;; figure out available slots on cluster. add to that the used valid slots to get total slots. figure out how many executors should be in each slot (e.g., 4, 4, 4, 5)
;; only keep existing slots that satisfy one of those slots. for rest, reassign them across remaining slots
;; edge case for slots with no executor timeout but with supervisor timeout... just treat these as valid slots that can be reassigned to. worst comes to worse the executor will timeout and won't assign here next time around
;; mk-assignments runs a new round of task scheduling over all topologies in the cluster:
;; it checks the resources occupied by running topologies and decides whether they need to
;; be reassigned, assigns tasks to newly submitted topologies based on the currently
;; available resources, and then saves or updates all assignment information in ZooKeeper.
(defnk mk-assignments [nimbus :scratch-topology-id nil]
(let [conf (:conf nimbus)
storm-cluster-state (:storm-cluster-state nimbus)
^INimbus inimbus (:inimbus nimbus)
;; read all the topologies
topology-ids (.active-storms storm-cluster-state)
topologies (into {} (for [tid topology-ids]
{tid (read-topology-details nimbus tid)}))
topologies (Topologies. topologies)
;; read all the assignments
assigned-topology-ids (.assignments storm-cluster-state nil)
existing-assignments (into {} (for [tid assigned-topology-ids]
;; for the topology which wants rebalance (specified by the scratch-topology-id)
;; we exclude its assignment, meaning that all the slots occupied by its assignment
;; will be treated as free slot in the scheduler code.
(when (or (nil? scratch-topology-id) (not= tid scratch-topology-id))
{tid (.assignment-info storm-cluster-state tid nil)})))
;; make the new assignments for topologies
topology->executor->node+port (compute-new-topology->executor->node+port
nimbus
existing-assignments
topologies
scratch-topology-id)
topology->executor->node+port (merge (into {} (for [id assigned-topology-ids] {id nil})) topology->executor->node+port)
now-secs (current-time-secs)
basic-supervisor-details-map (basic-supervisor-details-map storm-cluster-state)
;; construct the final Assignments by adding start-times etc into it
new-assignments (into {} (for [[topology-id executor->node+port] topology->executor->node+port
:let [existing-assignment (get existing-assignments topology-id)
all-nodes (->> executor->node+port vals (map first) set)
node->host (->> all-nodes
(mapcat (fn [node]
(if-let [host (.getHostName inimbus basic-supervisor-details-map node)]
[[node host]]
)))
(into {}))
all-node->host (merge (:node->host existing-assignment) node->host)
reassign-executors (changed-executors (:executor->node+port existing-assignment) executor->node+port)
start-times (merge (:executor->start-time-secs existing-assignment)
(into {}
(for [id reassign-executors]
[id now-secs]
)))]]
{topology-id (Assignment.
(master-stormdist-root conf topology-id)
(select-keys all-node->host all-nodes)
executor->node+port
start-times)}))]
;; tasks figure out what tasks to talk to by looking at topology at runtime
;; only log/set when there's been a change to the assignment
(doseq [[topology-id assignment] new-assignments
:let [existing-assignment (get existing-assignments topology-id)
topology-details (.getById topologies topology-id)]]
(if (= existing-assignment assignment)
(log-debug "Assignment for " topology-id " hasn't changed")
(do
(log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment))
(.set-assignment! storm-cluster-state topology-id assignment)
)))
(->> new-assignments
(map (fn [[topology-id assignment]]
(let [existing-assignment (get existing-assignments topology-id)]
[topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))]
)))
(into {})
(.assignSlots inimbus topologies))
))
```

The WorkerTopologyContext context object:
```java
public class WorkerTopologyContext extends GeneralTopologyContext {
public static final String SHARED_EXECUTOR = "executor";
private Integer _workerPort;
private List<Integer> _workerTasks;
private String _codeDir;
private String _pidDir;
Map<String, Object> _userResources;
Map<String, Object> _defaultResources;
public WorkerTopologyContext(
StormTopology topology,
Map stormConf,
Map<Integer, String> taskToComponent,
Map<String, List<Integer>> componentToSortedTasks,
Map<String, Map<String, Fields>> componentToStreamToFields,
String stormId,
String codeDir,
String pidDir,
Integer workerPort,
List<Integer> workerTasks,
Map<String, Object> defaultResources,
Map<String, Object> userResources
) {
super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId);
_codeDir = codeDir;
_defaultResources = defaultResources;
_userResources = userResources;
try {
if(pidDir!=null) {
_pidDir = new File(pidDir).getCanonicalPath();
} else {
_pidDir = null;
}
} catch (IOException e) {
throw new RuntimeException("Could not get canonical path for " + _pidDir, e);
}
_workerPort = workerPort;
_workerTasks = workerTasks;
}
```

common.clj defines the topology validation functions and the functions that add the acker, metric, and system bolts to a topology.
In Clojure, a function whose name begins with - (such as -main) implements a method exposed through :gen-class; the leading - is only the gen-class prefix, and the method is invoked without it, simply as functionname.
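A minimal sketch of this convention with :gen-class (standard Clojure, not taken from the Storm source; example.core is a made-up namespace):

```clojure
;; -main implements the generated class's main method; the dash is dropped
;; when the method is called from Java.
(ns example.core
  (:gen-class))

(defn -main [& args]
  (println "started with" (count args) "args"))
```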
The read-storm-config function that nimbus uses to read Storm's configuration file is defined in config.clj, whereas the read-storm-conf defined in nimbus reads the configuration of a specific topology.
In read-storm-config the actual reading of storm.yaml is done by Java code: readStormConfig is defined in Utils.java, which parses the yaml file; the Yaml class it uses comes from org.yaml.snakeyaml.Yaml.
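As a quick interop illustration (not the Storm code itself), SnakeYAML parses a YAML string into Java maps, which is the mechanism Utils.readStormConfig relies on:

```clojure
(import 'org.yaml.snakeyaml.Yaml)
;; load returns a java.util.Map for a YAML mapping
(.load (Yaml.) "storm.zookeeper.port: 2181")
;;=> {"storm.zookeeper.port" 2181}
```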
Steps for using Apache Thrift:
- Write a file with the .thrift suffix and use the thrift compiler to generate source code for the target language; Thrift supports many languages, including C/C++, Java, Python, and others.
- Implement the Thrift client.
- Implement the Thrift server; the server must implement the service interface defined in the .thrift file.
When the supervisor starts, it first installs an uncaught-exception handler for its threads. Thread.UncaughtExceptionHandler explained:
It is a handler interface invoked when a thread terminates abruptly because of an uncaught exception. When a thread is about to terminate due to such an exception, the JVM looks up the thread's UncaughtExceptionHandler via getUncaughtExceptionHandler and calls that handler's uncaughtException method. If the thread has no explicit handler, its ThreadGroup acts as the handler, because ThreadGroup implements Thread.UncaughtExceptionHandler; ultimately the default uncaughtException method of the top-level ThreadGroup is called.

The standalone-supervisor function instantiates an object that implements the ISupervisor interface.
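For illustration, installing a default uncaught-exception handler from Clojure looks roughly like this (a generic interop sketch, not the Storm code itself):

```clojure
;; register a JVM-wide default handler for threads without their own handler
(Thread/setDefaultUncaughtExceptionHandler
  (reify Thread$UncaughtExceptionHandler
    (uncaughtException [this thread throwable]
      (println "thread" (.getName thread) "died:" (.getMessage throwable)))))
```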
Storm's event manager is defined in event.clj. Its main job is to execute "event handler functions" on a dedicated thread: handler functions are added to the EventManager's blocking queue, and the EventManager's event-processing thread keeps taking them off the queue and executing them.
The EventManager protocol: a protocol is simply a set of function declarations, and the first argument of every protocol function must be the instance implementing the protocol, much like the implicit this of a Java instance method; a protocol is analogous to a Java interface.

```clojure
(defprotocol EventManager
  (add [this event-fn])
  (waiting? [this])
  (shutdown [this]))
```

The event-manager function is defined as follows:
```clojure
(defn event-manager
"Creates a thread to respond to events. Any error will cause process to halt"
;; daemon? indicates whether the event-processing thread should be a daemon thread
[daemon?]
;; added counts how many event handler functions have been added
(let [added (atom 0)
;; processed counts how many event handler functions have been executed
processed (atom 0)
;; queue is the event manager's LinkedBlockingQueue
^LinkedBlockingQueue queue (LinkedBlockingQueue.)
;; mark the event manager's state as "running"
running (atom true)
;; create the event-processing thread. Clojure functions implement Runnable and Callable, so a Clojure function can be passed directly to the java.lang.Thread constructor
runner (Thread.
;; the thread loops while the manager is "running", takes an event handler function from the blocking queue, executes it, and increments processed
(fn []
(try-cause
(while @running
(let [r (.take queue)]
(r)
(swap! processed inc)))
(catch InterruptedException t
(log-message "Event manager interrupted"))
(catch Throwable t
(log-error t "Error when processing event")
(exit-process! 20 "Error when processing an event")))))]
(.setDaemon runner daemon?)
;; start the event-processing thread
(.start runner)
;; return an instance that implements the EventManager protocol
(reify
EventManager
;; add puts an event handler function onto the manager's blocking queue
(add
[this event-fn]
;; should keep track of total added and processed to know if this is finished yet
(when-not @running
(throw (RuntimeException. "Cannot add events to a shutdown event manager")))
(swap! added inc)
(.put queue event-fn))
;; waiting? checks whether the event-processing thread is idle
(waiting?
[this]
(or (Time/isThreadWaiting runner)
(= @processed @added)))
;; shut down the event manager
(shutdown
[this]
(reset! running false)
(.interrupt runner)
(.join runner)))))
```

The information shown in the UI is produced in ui.core.clj, which implements all of the UI API functions used to retrieve the data displayed in the UI.
Storm 0.11.0 uses Thrift 0.9.2. I generated the code with Thrift 0.9.3 and compared the results with the following command:

```
for i in `ls ./` ; do diff $i "/home/yjk/storm-master-copy/storm-core/src/jvm/backtype/storm/generated/$i"; done;
```

The results are essentially identical, with only minor differences; many conversions of the following form appear:
```
< return get_num_tasks();
---
> return Integer.valueOf(get_num_tasks());
```
To be continued.
Please credit the source when reposting. This article is an original write-up based on my own practice; if it infringes your copyright, please contact me for removal: yanhealth@163.com