Table of Contents
  1. Starting the UI Process
  2. backtype.storm.ui.core

# Storm Source Code Analysis: The UI Display Code

For setting up a Storm development environment, see: Storm Source Code Analysis: Tools.

For an explanation of the fields shown in the Storm UI, see http://lbxc.iteye.com/blog/1522318

The Storm UI exposes a RESTful API. Its main endpoints are listed below (for detailed parameters and return values, see https://www.zybuluo.com/yanhealth/note/142919):

```
/api/v1/cluster/configuration (GET)
/api/v1/cluster/summary (GET)
/api/v1/supervisor/summary (GET)
/api/v1/topology/summary (GET)
/api/v1/topology/:id (GET)
/api/v1/topology/:id/component/:component (GET)
/api/v1/topology/:id/activate (POST)
/api/v1/topology/:id/deactivate (POST)
/api/v1/topology/:id/rebalance/:wait-time (POST)
/api/v1/topology/:id/kill/:wait-time (POST)
```
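To make the endpoint list concrete, here is a minimal Python sketch of a client that builds these URLs and calls them with the standard library. It assumes the UI is reachable at http://localhost:8080 (adjust `UI_BASE` to your `ui.host`/`ui.port`). The helper names `api_url`, `topology_op_url`, `get_json` and `post_json` are hypothetical. Sending POST bodies as JSON is also an assumption, based on the UI parsing JSON request parameters.

```python
import json
import urllib.parse
import urllib.request

# Assumption: the UI listens on localhost:8080; change this to match ui.host/ui.port.
UI_BASE = "http://localhost:8080"


def api_url(path, base=UI_BASE, **query):
    """Join the UI base URL, an API path, and optional query parameters."""
    url = base.rstrip("/") + path
    if query:
        url += "?" + urllib.parse.urlencode(query)
    return url


def topology_op_url(topology_id, op, wait_secs=None, base=UI_BASE):
    """Build the POST URL for activate/deactivate/rebalance/kill."""
    tid = urllib.parse.quote(topology_id, safe="")
    if op in ("activate", "deactivate"):
        return api_url("/api/v1/topology/%s/%s" % (tid, op), base)
    if op in ("rebalance", "kill"):
        if wait_secs is None:
            raise ValueError(op + " requires a wait time in seconds")
        return api_url("/api/v1/topology/%s/%s/%d" % (tid, op, int(wait_secs)), base)
    raise ValueError("unknown topology operation: " + op)


def get_json(path, base=UI_BASE, **query):
    """GET an endpoint and decode its JSON body (needs a running UI)."""
    with urllib.request.urlopen(api_url(path, base, **query)) as resp:
        return json.loads(resp.read().decode("utf-8"))


def post_json(url, body=None):
    """POST a JSON body and decode the JSON reply (needs a running UI)."""
    data = json.dumps(body or {}).encode("utf-8")
    req = urllib.request.Request(url, data=data, method="POST",
                                 headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

Against a live cluster you would call, for example, `get_json("/api/v1/cluster/summary")` or `post_json(topology_op_url("wordcount-1-1", "kill", 30))`. The topology id here is made up.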

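These endpoints are defined mainly in ui/core.clj. The rest of this post analyzes how the UI process starts and how the data returned by these endpoints is produced.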

## Starting the UI Process

The UI process is started with the following command:

```bash
bin/storm ui
```

bin/storm is actually a bash script. It hands off to the Python script bin/storm.py with the following command:

```bash
exec "$PYTHON" "${STORM_BIN_DIR}/storm.py" "$@"
```

When storm.py receives the ui argument, it runs the following function:

```python
def ui():
    """
    Syntax: [storm ui]
    Launches the UI daemon.
    """

    cppaths = [CLUSTER_CONF_DIR]
    jvmopts = parse_args(confvalue("ui.childopts", cppaths)) + [
        "-Dlogfile.name=ui.log",
        "-Dlog4j.configurationFile=" + os.path.join(get_log4j_conf_dir(), "cluster.xml")
    ]
    exec_storm_class(
        "backtype.storm.ui.core",
        jvmtype="-server",
        daemonName="ui",
        jvmopts=jvmopts,
        extrajars=[STORM_DIR, CLUSTER_CONF_DIR])
```

ui() first calls confvalue to obtain the JVM options for the UI process. confvalue is defined as follows:

```python
def confvalue(name, extrapaths, daemon=True):
    global CONFFILE
    # Run backtype.storm.command.config_value in a child JVM to look up `name`
    command = [
        JAVA_CMD, "-client", get_config_opts(), "-Dstorm.conf.file=" + CONFFILE,
        "-cp", get_classpath(extrapaths, daemon), "backtype.storm.command.config_value", name
    ]
    p = sub.Popen(command, stdout=sub.PIPE)
    output, errors = p.communicate()
    # python 3
    if not isinstance(output, str):
        output = output.decode('utf-8')
    # config_value prints "VALUE: <value>"; return everything after the marker
    lines = output.split(os.linesep)
    for line in lines:
        tokens = line.split(" ")
        if tokens[0] == "VALUE:":
            return " ".join(tokens[1:])
    return ""
```

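sub.Popen starts a child process that runs command. The command list launches a JVM that runs the class backtype.storm.command.config_value, passing ui.childopts as its argument. confvalue then scans the child's standard output for a line starting with "VALUE:" and returns the rest of that line.
The config_value class is in src/clj/backtype/storm/command/config_value.clj.
Its code is: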

```clojure
(defn -main [^String name]
  (let [conf (read-storm-config)]
    (println "VALUE:" (conf name))))
```

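So config_value simply reads the Storm configuration and prints the value of ui.childopts after a "VALUE:" prefix. The configuration is defaults.yaml, overridden by storm.yaml or by the file passed through -Dstorm.conf.file. The default ui.childopts is "-Xmx768m", which sets the JVM's maximum heap size to 768 MB.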
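The configuration layering and the line that config_value prints can be modelled in Python, with plain dicts standing in for the parsed YAML files. This is a simplified sketch that assumes a shallow merge in which keys from storm.yaml replace those from defaults.yaml. `read_storm_config` and `config_value_line` are hypothetical Python stand-ins, not Storm's code.

```python
def read_storm_config(defaults, user_conf):
    """Shallow merge: keys from the user's storm.yaml override defaults.yaml."""
    conf = dict(defaults)
    conf.update(user_conf)
    return conf


def config_value_line(conf, name):
    """Mimic (println "VALUE:" (conf name)) for string values; a missing key prints nil."""
    value = conf.get(name)
    return "VALUE: " + ("nil" if value is None else str(value))


defaults = {"ui.childopts": "-Xmx768m"}
print(config_value_line(read_storm_config(defaults, {}), "ui.childopts"))
```

Because the merge is shallow, setting `ui.childopts` in storm.yaml replaces the whole default string rather than appending to it.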

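So the UI process starts in this order:

  1. Run backtype.storm.command.config_value to read the UI's JVM options from the configuration;
  2. Launch java with those options, running backtype.storm.ui.core to start the UI process.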
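The two steps can be sketched in Python. This is a simplification, not storm.py itself:

- `parse_config_value` repeats the "VALUE:" scan from confvalue, using `splitlines()` instead of `os.linesep`.
- `shlex.split` stands in for storm.py's `parse_args`.
- `build_ui_command` keeps only the options relevant here; the real `exec_storm_class` adds more `-D` options.

All paths in the test are made up.

```python
import shlex


def parse_config_value(output):
    """Return the text after 'VALUE:' on the first line that starts with it, else ''."""
    for line in output.splitlines():
        tokens = line.split(" ")
        if tokens[0] == "VALUE:":
            return " ".join(tokens[1:])
    return ""


def build_ui_command(childopts, classpath, log4j_conf, java="java"):
    """Assemble a simplified java command line that starts backtype.storm.ui.core."""
    jvmopts = shlex.split(childopts) + [
        "-Dlogfile.name=ui.log",
        "-Dlog4j.configurationFile=" + log4j_conf,
    ]
    return [java, "-server", "-cp", classpath] + jvmopts + ["backtype.storm.ui.core"]
```

Step 1 yields a string such as "-Xmx768m", which step 2 splits into separate JVM arguments.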

The UI options that can be set in the configuration file include the following (defined in src/jvm/backtype/storm/Config.java):

```java
//Childopts for Storm UI Java process.
static String UI_CHILDOPTS

//A class implementing javax.servlet.Filter for authenticating/filtering UI requests
static String UI_FILTER

//Initialization parameters for the javax.servlet.Filter
static String UI_FILTER_PARAMS

//The size of the header buffer for the UI in bytes
static String UI_HEADER_BUFFER_BYTES

//Storm UI binds to this host/interface.
static String UI_HOST

//Class name of the HTTP credentials plugin for the UI.
static String UI_HTTP_CREDS_PLUGIN

//Storm UI binds to this port.
static String UI_PORT
```

## backtype.storm.ui.core

This section looks at the main functionality of src/clj/backtype/storm/ui/core.clj. Some of the Jetty configuration functions it uses are defined in ui/helpers.clj.

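The main job of backtype.storm.ui.core is to define and start a Jetty server that answers every API request with JSON data. It also serves the static UI pages via route/resources.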

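Start with the main function, which simply calls start-server!. By Clojure naming convention, a trailing ! marks a function or macro with side effects that is not safe to use inside an STM transaction.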

```clojure
(defn -main [] (start-server!))
```

start-server! is defined as follows:

```clojure
(defn start-server!
  []
  (try
    (let [conf *STORM-CONF*
          header-buffer-size (int (.get conf UI-HEADER-BUFFER-BYTES))
          filters-confs [{:filter-class (conf UI-FILTER)
                          :filter-params (conf UI-FILTER-PARAMS)}]
          https-port (if (not-nil? (conf UI-HTTPS-PORT)) (conf UI-HTTPS-PORT) 0)
          https-ks-path (conf UI-HTTPS-KEYSTORE-PATH)
          https-ks-password (conf UI-HTTPS-KEYSTORE-PASSWORD)
          https-ks-type (conf UI-HTTPS-KEYSTORE-TYPE)
          https-key-password (conf UI-HTTPS-KEY-PASSWORD)
          https-ts-path (conf UI-HTTPS-TRUSTSTORE-PATH)
          https-ts-password (conf UI-HTTPS-TRUSTSTORE-PASSWORD)
          https-ts-type (conf UI-HTTPS-TRUSTSTORE-TYPE)
          https-want-client-auth (conf UI-HTTPS-WANT-CLIENT-AUTH)
          https-need-client-auth (conf UI-HTTPS-NEED-CLIENT-AUTH)]
      (storm-run-jetty {:port (conf UI-PORT)
                        :host (conf UI-HOST)
                        :https-port https-port
                        :configurator (fn [server]
                                        (config-ssl server
                                                    https-port
                                                    https-ks-path
                                                    https-ks-password
                                                    https-ks-type
                                                    https-key-password
                                                    https-ts-path
                                                    https-ts-password
                                                    https-ts-type
                                                    https-need-client-auth
                                                    https-want-client-auth)
                                        (doseq [connector (.getConnectors server)]
                                          (.setRequestHeaderSize connector header-buffer-size))
                                        (config-filter server app filters-confs))}))
    (catch Exception ex
      (log-error ex))))
```

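start-server! first reads the HTTP and HTTPS settings from conf. It then packs them into a map with the keys :port, :host, :https-port and :configurator, and passes that map to storm-run-jetty (helpers.clj). storm-run-jetty and the jetty-create-server function it calls are defined as follows: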

```clojure
;; Modified from ring.adapter.jetty 1.3.0
(defn- jetty-create-server
  "Construct a Jetty Server instance."
  [options]
  (let [connector (doto (SelectChannelConnector.)
                    (.setPort (options :port 80))
                    (.setHost (options :host))
                    (.setMaxIdleTime (options :max-idle-time 200000)))
        server (doto (Server.)
                 (.addConnector connector)
                 (.setSendDateHeader true))
        https-port (options :https-port)]
    (if (and (not-nil? https-port) (> https-port 0)) (remove-non-ssl-connectors server))
    server))

(defn storm-run-jetty
  "Modified version of run-jetty
  Assumes configurator sets handler."
  [config]
  {:pre [(:configurator config)]}
  (let [#^Server s (jetty-create-server (dissoc config :configurator))
        configurator (:configurator config)]
    (configurator s)
    (.start s)))
```

How storm-run-jetty works:

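  1. Its input map has the keys :port, :host, :https-port and :configurator. :configurator is the function built in start-server! (shown below). It first calls config-ssl with the HTTPS settings, which creates an SSL connector and adds it to the server (only when https-port is greater than 0). It then sets the request header buffer size on every connector of the server. Finally it calls config-filter (helpers.clj), which installs the servlet filters and makes app the server's handler.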

    ```clojure
    (fn [server]
      (config-ssl server
                  https-port
                  https-ks-path
                  https-ks-password
                  https-ks-type
                  https-key-password
                  https-ts-path
                  https-ts-password
                  https-ts-type
                  https-need-client-auth
                  https-want-client-auth)
      (doseq [connector (.getConnectors server)]
        (.setRequestHeaderSize connector header-buffer-size))
      (config-filter server app filters-confs))
    ```
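  2. storm-run-jetty first calls jetty-create-server to create the Jetty Server object:

    • Its input is the map with :port, :host and :https-port (:configurator is removed with dissoc).
    • It creates a SelectChannelConnector and sets its port (default 80), its host, and its maximum idle time (default 200000 ms). It then creates a Server, adds the connector, and enables sending the Date header in responses. If a positive HTTPS port is configured, it removes every connector that is not an SslSocketConnector instance. At this point that is the plain HTTP connector just added, so with HTTPS enabled the UI listens on HTTPS only.
    • It returns the server.
  3. storm-run-jetty then takes the configurator function from its argument and applies it to the server. This adds the HTTPS connector and the filters, and sets the handler.
  4. Finally it starts the server with (.start s), i.e. server.start().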
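The steps above can be modelled in Python with a toy server object in place of Jetty. Everything here (`Connector`, `ToyServer`, `make_configurator`) is hypothetical. The model assumes, as described above, that remove-non-ssl-connectors runs before the configurator, and that config-ssl adds an SSL connector only for a positive HTTPS port.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Connector:
    kind: str                      # "plain" (SelectChannelConnector) or "ssl"
    port: int
    host: Optional[str] = None
    max_idle_ms: int = 200000
    header_size: Optional[int] = None


@dataclass
class ToyServer:
    connectors: List[Connector] = field(default_factory=list)
    handler: object = None
    send_date_header: bool = False
    started: bool = False


def jetty_create_server(options):
    """Model of jetty-create-server: one plain connector, dropped if HTTPS is on."""
    connector = Connector("plain", options.get("port", 80), options.get("host"),
                          options.get("max_idle_time", 200000))
    server = ToyServer(connectors=[connector], send_date_header=True)
    https_port = options.get("https_port")
    if https_port is not None and https_port > 0:
        # remove-non-ssl-connectors: keep only SSL connectors (none exist yet)
        server.connectors = [c for c in server.connectors if c.kind == "ssl"]
    return server


def storm_run_jetty(config):
    """Model of storm-run-jetty: create, let the configurator finish setup, start."""
    configurator = config.get("configurator")
    assert configurator, "storm-run-jetty requires a :configurator"  # the {:pre ...} check
    options = {k: v for k, v in config.items() if k != "configurator"}  # dissoc
    server = jetty_create_server(options)
    configurator(server)
    server.started = True  # (.start s)
    return server


def make_configurator(https_port, header_buffer_size, app):
    """Model of the configurator closure built in start-server!."""
    def configurator(server):
        if https_port > 0:  # config-ssl
            server.connectors.append(Connector("ssl", https_port))
        for c in server.connectors:  # setRequestHeaderSize on every connector
            c.header_size = header_buffer_size
        server.handler = app  # config-filter ends with setHandler
    return configurator
```

With `https_port=0` the plain connector survives. With a positive HTTPS port, only the SSL connector added by the configurator remains.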

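In Storm, the Jetty server's handler is built with Ring, the Clojure web library, together with Compojure, a routing library for Ring, through the app definition below. The -> macro first wraps the main-routes route table (described later) in several middleware layers:

  • wrap-json-params parses JSON request bodies into parameters.
  • wrap-multipart-params parses multipart requests.
  • wrap-reload reloads the backtype.storm.ui.core namespace before a request is handled.
  • catch-errors catches exceptions and returns them as JSON.

The result is then passed to handler/site: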

```clojure
(def app
  (handler/site (-> main-routes
                    (wrap-json-params)
                    (wrap-multipart-params)
                    (wrap-reload '[backtype.storm.ui.core])
                    catch-errors)))
```
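The order of the -> chain matters. Each step wraps the previous handler, so catch-errors ends up outermost, runs first on every request, and sees exceptions from every layer inside it. The Python sketch below models this with plain functions:

- `wrap_trace` is a hypothetical stand-in for the real middleware; it only records the order in which the layers run.
- `main_routes` is a toy route table.
- `catch_errors` is assumed to answer with status 500.

```python
import json


def main_routes(request):
    """Toy route table: one known path, anything else raises."""
    if request["path"] == "/api/v1/cluster/summary":
        return {"status": 200, "body": json.dumps({"ok": True})}
    raise LookupError("no route for " + request["path"])


def wrap_trace(name, trace):
    """Hypothetical middleware factory that records when its layer runs."""
    def middleware(handler):
        def wrapped(request):
            trace.append(name)
            return handler(request)
        return wrapped
    return middleware


def catch_errors(handler):
    """Outermost layer: turn any exception into a JSON error response."""
    def wrapped(request):
        try:
            return handler(request)
        except Exception as ex:
            return {"status": 500, "body": json.dumps({"error": str(ex)})}
    return wrapped


def thread_first(value, *fns):
    """(-> value f g h) for one-argument functions: h(g(f(value)))."""
    for fn in fns:
        value = fn(value)
    return value


trace = []
app = thread_first(main_routes,
                   wrap_trace("json-params", trace),
                   wrap_trace("multipart-params", trace),
                   wrap_trace("reload", trace),
                   catch_errors)
```

A request passes through the layers in the reverse of the order they appear in the -> form: reload, then multipart-params, then json-params, then the routes.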

compojure.handler/site takes the routes, adds a standard set of middleware, and returns a handler. Its docstring reads:

```
Create a handler suitable for a standard website. This adds the
following middleware to your routes:
- wrap-session
- wrap-flash
- wrap-cookies
- wrap-multipart-params
- wrap-params
- wrap-nested-params
- wrap-keyword-params
```

So the resulting app is a Ring handler. It is passed as the handler argument to config-filter, which is defined as follows:

```clojure
(defn config-filter [server handler filters-confs]
  (if filters-confs
    (let [servlet-holder (ServletHolder.
                           (ring.util.servlet/servlet handler))
          context (doto (org.eclipse.jetty.servlet.ServletContextHandler. server "/")
                    (.addServlet servlet-holder "/"))]
      (.addFilter context (cors-filter-handler) "/*" (EnumSet/allOf DispatcherType))
      (doseq [{:keys [filter-name filter-class filter-params]} filters-confs]
        (if filter-class
          (let [filter-holder (doto (org.eclipse.jetty.servlet.FilterHolder.)
                                (.setClassName filter-class)
                                (.setName (or filter-name filter-class))
                                (.setInitParameters (or filter-params {})))]
            (.addFilter context filter-holder "/*" FilterMapping/ALL))))
      (.setHandler server context))))
```

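The first part of this function wraps the handler in a servlet (ring.util.servlet/servlet) and mounts it at "/" in a ServletContextHandler. It then registers the CORS filter plus any configured filters. The key last line, (.setHandler server context), installs that context, and with it app, as the Jetty server's handler.

Note that the default filters-confs is [{:filter-class nil, :filter-params nil}]: a vector holding one map, not nil. Clojure treats only nil and false as logically false, so (if filters-confs ...) takes the true branch. The doseq loop then skips the entry because its :filter-class is nil.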
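Python's notion of truthiness differs from Clojure's (an empty list is falsy in Python but truthy in Clojure), so the sketch below spells out Clojure's rule explicitly. `clj_truthy` and `config_filter_plan` are hypothetical helpers. They only report which filters config-filter would register for a given filters-confs; the filter class name in the test is made up.

```python
def clj_truthy(value):
    """Clojure's rule: only nil (None) and false are logically false."""
    return value is not None and value is not False


def config_filter_plan(filters_confs):
    """List the filters config-filter would register, or None if it does nothing."""
    if not clj_truthy(filters_confs):
        return None  # (if filters-confs ...) fails: no context and no handler are set
    registered = ["cors"]  # the CORS filter is always added first
    for conf in filters_confs:
        filter_class = conf.get("filter_class")
        if clj_truthy(filter_class):
            registered.append(conf.get("filter_name") or filter_class)
    return registered


default_confs = [{"filter_class": None, "filter_params": None}]
```

With the default configuration only the CORS filter is installed, but the servlet context (and therefore app) is still set as the handler.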

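Finally, main-routes is defined as follows. defroutes is a macro provided by Compojure. Here it defines how requests to each API path are handled and what response is returned: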

```clojure
(defroutes main-routes
  (GET "/api/v1/cluster/configuration" [& m]
    (json-response (cluster-configuration)
                   (:callback m) :serialize-fn identity))
  (GET "/api/v1/cluster/summary" [:as {:keys [cookies servlet-request]} & m]
    (populate-context! servlet-request)
    (assert-authorized-user "getClusterInfo")
    (let [user (get-user-name servlet-request)]
      (json-response (cluster-summary user) (:callback m))))
  (GET "/api/v1/supervisor/summary" [:as {:keys [cookies servlet-request]} & m]
    (populate-context! servlet-request)
    (assert-authorized-user "getClusterInfo")
    (json-response (supervisor-summary) (:callback m)))
  (GET "/api/v1/topology/summary" [:as {:keys [cookies servlet-request]} & m]
    (populate-context! servlet-request)
    (assert-authorized-user "getClusterInfo")
    (json-response (all-topologies-summary) (:callback m)))
  (GET "/api/v1/topology/:id" [:as {:keys [cookies servlet-request scheme params]} id & m]
    (populate-context! servlet-request)
    (assert-authorized-user "getTopology" (topology-config id))
    (let [user (.getUserName http-creds-handler servlet-request)]
      (json-response (topology-page id (:window m) (check-include-sys? (:sys m)) user (= scheme :https)) (:callback m))))
  (GET "/api/v1/topology/:id/visualization" [:as {:keys [cookies servlet-request]} id & m]
    (populate-context! servlet-request)
    (assert-authorized-user "getTopology" (topology-config id))
    (json-response (mk-visualization-data id (:window m) (check-include-sys? (:sys m))) (:callback m)))
  (GET "/api/v1/topology/:id/component/:component" [:as {:keys [cookies servlet-request scheme]} id component & m]
    (populate-context! servlet-request)
    (assert-authorized-user "getTopology" (topology-config id))
    (let [user (.getUserName http-creds-handler servlet-request)]
      (json-response (component-page id component (:window m) (check-include-sys? (:sys m)) user (= scheme :https)) (:callback m))))
  (POST "/api/v1/topology/:id/activate" [:as {:keys [cookies servlet-request]} id & m]
    (populate-context! servlet-request)
    (assert-authorized-user "activate" (topology-config id))
    (with-nimbus nimbus
      (let [tplg (->> (doto
                        (GetInfoOptions.)
                        (.set_num_err_choice NumErrorsChoice/NONE))
                      (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
            name (.get_name tplg)]
        (.activate nimbus name)
        (log-message "Activating topology '" name "'")))
    (json-response (topology-op-response id "activate") (m "callback")))
  (POST "/api/v1/topology/:id/deactivate" [:as {:keys [cookies servlet-request]} id & m]
    (populate-context! servlet-request)
    (assert-authorized-user "deactivate" (topology-config id))
    (with-nimbus nimbus
      (let [tplg (->> (doto
                        (GetInfoOptions.)
                        (.set_num_err_choice NumErrorsChoice/NONE))
                      (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
            name (.get_name tplg)]
        (.deactivate nimbus name)
        (log-message "Deactivating topology '" name "'")))
    (json-response (topology-op-response id "deactivate") (m "callback")))
  (POST "/api/v1/topology/:id/rebalance/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time & m]
    (populate-context! servlet-request)
    (assert-authorized-user "rebalance" (topology-config id))
    (with-nimbus nimbus
      (let [tplg (->> (doto
                        (GetInfoOptions.)
                        (.set_num_err_choice NumErrorsChoice/NONE))
                      (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
            name (.get_name tplg)
            rebalance-options (m "rebalanceOptions")
            options (RebalanceOptions.)]
        (.set_wait_secs options (Integer/parseInt wait-time))
        (if (and (not-nil? rebalance-options) (contains? rebalance-options "numWorkers"))
          (.set_num_workers options (Integer/parseInt (.toString (rebalance-options "numWorkers")))))
        (if (and (not-nil? rebalance-options) (contains? rebalance-options "executors"))
          (doseq [keyval (rebalance-options "executors")]
            (.put_to_num_executors options (key keyval) (Integer/parseInt (.toString (val keyval))))))
        (.rebalance nimbus name options)
        (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs")))
    (json-response (topology-op-response id "rebalance") (m "callback")))
  (POST "/api/v1/topology/:id/kill/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time & m]
    (populate-context! servlet-request)
    (assert-authorized-user "killTopology" (topology-config id))
    (with-nimbus nimbus
      (let [tplg (->> (doto
                        (GetInfoOptions.)
                        (.set_num_err_choice NumErrorsChoice/NONE))
                      (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
            name (.get_name tplg)
            options (KillOptions.)]
        (.set_wait_secs options (Integer/parseInt wait-time))
        (.killTopologyWithOpts nimbus name options)
        (log-message "Killing topology '" name "' with wait time: " wait-time " secs")))
    (json-response (topology-op-response id "kill") (m "callback")))
  (GET "/" [:as {cookies :cookies}]
    (resp/redirect "/index.html"))
  (route/resources "/")
  (route/not-found "Page not found"))
```
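Compojure tries routes in the order they are declared and turns each `:name` segment of a path into a parameter. The Python sketch below imitates this for the API paths above. It is a simplification: here a parameter matches any run of characters other than `/`. `compile_route` and `dispatch` are hypothetical names, and the topology ids in the test are made up. Because /api/v1/topology/summary is declared before /api/v1/topology/:id, a request for the summary never reaches the `:id` route.

```python
import re

# (method, pattern, name) in the same order as main-routes
ROUTES = [
    ("GET", "/api/v1/cluster/configuration", "cluster-configuration"),
    ("GET", "/api/v1/cluster/summary", "cluster-summary"),
    ("GET", "/api/v1/supervisor/summary", "supervisor-summary"),
    ("GET", "/api/v1/topology/summary", "topology-summary"),
    ("GET", "/api/v1/topology/:id", "topology-page"),
    ("GET", "/api/v1/topology/:id/visualization", "visualization"),
    ("GET", "/api/v1/topology/:id/component/:component", "component-page"),
    ("POST", "/api/v1/topology/:id/activate", "activate"),
    ("POST", "/api/v1/topology/:id/deactivate", "deactivate"),
    ("POST", "/api/v1/topology/:id/rebalance/:wait-time", "rebalance"),
    ("POST", "/api/v1/topology/:id/kill/:wait-time", "kill"),
    ("GET", "/", "redirect-to-index"),
]


def compile_route(pattern):
    """Turn '/a/:id/b' into a regex with one named group per :param segment."""
    parts = []
    for segment in pattern.strip("/").split("/"):
        if segment.startswith(":"):
            # '-' is not allowed in group names, so :wait-time becomes wait_time
            parts.append("(?P<%s>[^/]+)" % segment[1:].replace("-", "_"))
        else:
            parts.append(re.escape(segment))
    return re.compile("^/" + "/".join(parts) + "$")


COMPILED = [(method, compile_route(pattern), name) for method, pattern, name in ROUTES]


def dispatch(method, path):
    """Return (route name, params) for the first matching route, or not-found."""
    for route_method, regex, name in COMPILED:
        match = regex.match(path)
        if route_method == method and match:
            return name, match.groupdict()
    return "not-found", {}
```

Declaration order is what makes the fixed "summary" path win over the `:id` parameter, so new fixed routes must be placed before the parameterized ones.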

In main-routes:

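  • json-response converts the result into a JSON response;
  • populate-context! fills Storm's request context, backtype.storm.security.auth.ReqContext (src/jvm/backtype/storm/security/auth/ReqContext.java), from the servlet-request object. Its purpose is to support pluggable user authentication: a plugin only needs to implement the IHttpCredentialsPlugin interface. I have not looked into this in detail.
  • assert-authorized-user likewise checks whether the user is authorized to perform the named operation;
  • with-nimbus is a macro that opens a client connection to the Nimbus Thrift server. As the code below shows, it first gets the current ReqContext and the user name from its principal. It then connects to the Nimbus Thrift server as that user and runs the operations in body. nimbus-sym is a symbol that with-nimbus-connection-as-user binds to the Nimbus$Client object, so the body code can use it. The connection is closed in a finally block once body finishes.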

    ```clojure
    ;; src/clj/backtype/storm/ui/core.clj
    (defmacro with-nimbus
      [nimbus-sym & body]
      `(let [context# (ReqContext/context)
             user# (if (.principal context#) (.getName (.principal context#)))]
         (thrift/with-nimbus-connection-as-user
           [~nimbus-sym (*STORM-CONF* NIMBUS-HOST) (*STORM-CONF* NIMBUS-THRIFT-PORT) user#]
           ~@body)))

    ;; src/clj/backtype/storm/thrift.clj
    (defmacro with-nimbus-connection-as-user
      [[client-sym host port as-user] & body]
      `(let [[^Nimbus$Client ~client-sym ^TTransport conn#] (nimbus-client-and-conn ~host ~port ~as-user)]
         (try
           ~@body
           (finally (.close conn#)))))
    ```

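    For example, the activate API is implemented as follows. Once the connection is established, the body receives the Nimbus client object nimbus. It calls getTopologyInfoWithOpts on it, with NumErrorsChoice/NONE so that no error records are fetched, to obtain a TopologyInfo object. It reads the topology name from that object and then calls activate to activate the topology. The lookup is needed because the URL carries the topology id, while Nimbus's activate takes the topology name.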

    ```clojure
    (with-nimbus nimbus
      (let [tplg (->> (doto
                        (GetInfoOptions.)
                        (.set_num_err_choice NumErrorsChoice/NONE))
                      (.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
            name (.get_name tplg)]
        (.activate nimbus name)
        (log-message "Activating topology '" name "'")))
    ```
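with-nimbus follows the familiar acquire, use, always-release pattern. The Python sketch below models it with `contextlib.contextmanager`. `FakeTransport` and `FakeNimbusClient` are hypothetical stand-ins for the Thrift transport and Nimbus$Client. The real client resolves the name through getTopologyInfoWithOpts, and the user argument of the real macro is omitted here. The sketch shows the id-to-name lookup followed by activate, and that the connection is closed even when the body throws.

```python
from contextlib import contextmanager


class FakeTransport:
    """Hypothetical stand-in for the Thrift TTransport."""
    def __init__(self):
        self.is_open = True

    def close(self):
        self.is_open = False


class FakeNimbusClient:
    """Hypothetical stand-in for Nimbus$Client, keyed by topology id."""
    def __init__(self, names_by_id):
        self.names_by_id = names_by_id
        self.active = set()

    def topology_name(self, topology_id):
        # Plays the role of (.get_name (.getTopologyInfoWithOpts nimbus id ...))
        return self.names_by_id[topology_id]

    def activate(self, name):
        self.active.add(name)


@contextmanager
def with_nimbus(client, transport):
    """Hand the client to the body and always close the transport afterwards."""
    try:
        yield client
    finally:
        transport.close()


def activate_topology(client, transport, topology_id):
    """Model of the activate route body: resolve id -> name, then activate by name."""
    with with_nimbus(client, transport) as nimbus:
        name = nimbus.topology_name(topology_id)
        nimbus.activate(name)
    return name
```

If the id is unknown, the lookup raises, yet the transport is still closed by the `finally` clause, just as `(finally (.close conn#))` does in the macro.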

Note: the Jetty server API used here is provided by the org.eclipse.jetty.server package.

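This completes the walkthrough of how the UI process starts and runs. The next step is to analyze where the data shown in the UI comes from; see the next post.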






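Please credit the source when reposting. This post is an original write-up based on hands-on practice; if it infringes your copyright, please contact me and I will remove it. Email: yanhealth@163.com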
