Storm 源码分析——UI界面显示代码分析(上)
#Storm源码分析——UI界面显示代码分析
storm开发环境的配置请参考:storm源码分析——工具篇
storm的UI界面的参数解释可以参考http://lbxc.iteye.com/blog/1522318
storm提供了ui界面的Restful API,主要API如下,详细的API的参数和返回值请参考https://www.zybuluo.com/yanhealth/note/142919:1
2
3
4
5
6
7
8
9
10/api/v1/cluster/configuration (GET)
/api/v1/cluster/summary (GET)
/api/v1/supervisor/summary (GET)
/api/v1/topology/summary (GET)
/api/v1/topology/:id (GET)
/api/v1/topology/:id/component/:component (GET)
/api/v1/topology/:id/activate (POST)
/api/v1/topology/:id/deactivate (POST)
/api/v1/topology/:id/rebalance/:wait-time (POST)
/api/v1/topology/:id/kill/:wait-time (POST)
这些接口函数的定义主要是在ui/core.clj文件中。下面主要分析UI进程的启动过程和以上接口数据的产生。
UI进程的启动
我们知道,UI进程是通过以下命令启动的:1
bin/storm ui
其中storm实际上是一个bash的脚本文件,storm 会通过以下命令调用bin/storm.py的python文件来执行:1
exec "$PYTHON" "${STORM_BIN_DIR}/storm.py" "$@"
storm.py中,接收到ui参数后,执行的函数代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16def ui():
"""
Syntax: [storm ui]
Launches the UI daemon.
"""
cppaths = [CLUSTER_CONF_DIR]
jvmopts = parse_args(confvalue("ui.childopts", cppaths)) + [
"-Dlogfile.name=ui.log",
"-Dlog4j.configurationFile=" + os.path.join(get_log4j_conf_dir(), "cluster.xml")
]
exec_storm_class(
"backtype.storm.ui.core",
jvmtype="-server",
daemonName="ui",
jvmopts=jvmopts,
extrajars=[STORM_DIR, CLUSTER_CONF_DIR])
其中首先会调用confvalue函数来获取UI进程执行的参数项,confvalue代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17def confvalue(name, extrapaths, daemon=True):
global CONFFILE
command = [
JAVA_CMD, "-client", get_config_opts(), "-Dstorm.conf.file=" + CONFFILE,
"-cp", get_classpath(extrapaths, daemon), "backtype.storm.command.config_value", name
]
p = sub.Popen(command, stdout=sub.PIPE)
output, errors = p.communicate()
# python 3
if not isinstance(output, str):
output = output.decode('utf-8')
lines = output.split(os.linesep)
for line in lines:
tokens = line.split(" ")
if tokens[0] == "VALUE:":
return " ".join(tokens[1:])
return ""
其中sub.Popen是启动一个子进程,执行command命令,而command数组实际上是执行backtype.storm.command.config_value这个类,并将传入的ui.childopts作为参数。
config_value类的路径为:src/clj/backtype/storm/command/config_value.clj
config_value.clj的代码如下:1
2
3
4(defn -main [^String name]
(let [conf (read-storm-config)]
(println "VALUE:" (conf name))
))
因此,实际上就是读取storm的配置文件storm.conf,并得到ui.childopts参数。默认的ui.childopts参数是”-Xmx768m”,设置虚拟机内存堆的最大可用大小为768M.
因此UI进程启动的顺序为:
- 先调用backtype.storm.command.config_value获取读取到的配置文件中的ui启动项;
- 然后执行java命令行参数后调用backtype.storm.ui.core启动UI进程。
其中配置文件中可以配置的UI参数如下(定义在src/jvm/backtype/storm/Config.java):1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20//Childopts for Storm UI Java process.
static String UI_CHILDOPTS
//A class implementing javax.servlet.Filter for authenticating/filtering UI requests
static String UI_FILTER
//Initialization parameters for the javax.servlet.Filter
static String UI_FILTER_PARAMS
//The size of the header buffer for the UI in bytes
static String UI_HEADER_BUFFER_BYTES
//Storm UI binds to this host/interface.
static String UI_HOST
//Class name of the HTTP credentials plugin for the UI.
static String UI_HTTP_CREDS_PLUGIN
//Storm UI binds to this port.
static String UI_PORT
backtype.storm.ui.core
这一节看一下src/backtype/storm/ui/core.clj这个文件的主要功能(其中部分关于jetty服务器配置的函数定义在ui/helper.clj文件中):
backtype.storm.ui.core的主要功能是定义并启动一个jetty服务器,用于实现所有API的响应并返回json数据。
首先看main函数, 就是调用start-server! 函数(clojure中!表示函数是突变函数,表示方法或宏不能在STM中安全使用):1
(defn -main [] (start-server!))
start-server!函数定义如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37(defn start-server!
[]
(try
(let [conf *STORM-CONF*
header-buffer-size (int (.get conf UI-HEADER-BUFFER-BYTES))
filters-confs [{:filter-class (conf UI-FILTER)
:filter-params (conf UI-FILTER-PARAMS)}]
https-port (if (not-nil? (conf UI-HTTPS-PORT)) (conf UI-HTTPS-PORT) 0)
https-ks-path (conf UI-HTTPS-KEYSTORE-PATH)
https-ks-password (conf UI-HTTPS-KEYSTORE-PASSWORD)
https-ks-type (conf UI-HTTPS-KEYSTORE-TYPE)
https-key-password (conf UI-HTTPS-KEY-PASSWORD)
https-ts-path (conf UI-HTTPS-TRUSTSTORE-PATH)
https-ts-password (conf UI-HTTPS-TRUSTSTORE-PASSWORD)
https-ts-type (conf UI-HTTPS-TRUSTSTORE-TYPE)
https-want-client-auth (conf UI-HTTPS-WANT-CLIENT-AUTH)
https-need-client-auth (conf UI-HTTPS-NEED-CLIENT-AUTH)]
(storm-run-jetty {:port (conf UI-PORT)
:host (conf UI-HOST)
:https-port https-port
:configurator (fn [server]
(config-ssl server
https-port
https-ks-path
https-ks-password
https-ks-type
https-key-password
https-ts-path
https-ts-password
https-ts-type
https-need-client-auth
https-want-client-auth)
(doseq [connector (.getConnectors server)]
(.setRequestHeaderSize connector header-buffer-size))
(config-filter server app filters-confs))}))
(catch Exception ex
(log-error ex))))
首先从conf配置项中取出http的相应参数,然后将这些配置项封装为map作为参数去调用storm-run-jetty函数(helper.clj),参数有:port,:host,:https-port,:configurator。storm-run-jetty函数定义如下:
1 | ;; Modified from ring.adapter.jetty 1.3.0 |
storm-run-jetty的运行流程:
传入参数为::port,:host,:https-port,:configurator。 其中:configurator是在start-server!中定义的参数,返回的是如下的函数,该函数首先利用https的传入参数初始化一个ssl connector,并将其添加到server中。然后对server中的每一个connector,设置HTTP请求头的buffer size,设置server的过滤器,再设置server的handle为app,主要通过config-filter函数(helper.clj)实现。。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15(fn [server]
(config-ssl server
https-port
https-ks-path
https-ks-password
https-ks-type
https-key-password
https-ts-path
https-ts-password
https-ts-type
https-need-client-auth
https-want-client-auth)
(doseq [connector (.getConnectors server)]
(.setRequestHeaderSize connector header-buffer-size))
(config-filter server app filters-confs))在storm-run-jetty函数中首先调用jetty-create-server函数来创建一个jetty服务器对象:
- 传入的参数为:port,:host,:https-port 。
- 首先初始化一个SelectChannelConnector,设置端口(默认为80),设置host, 设置最大空闲时间(默认为200000ms),然后初始化一个Server, 添加连接器Connector,设置Http Header中发送日期。然后判断参数中是否存在https端口,如果存在的话,将Server中的所有不是SslSocketConnector实例的连接器删除。
- 返回server
- 然后从storm-run-jetty函数参数中取出configurator函数,配置server对象,添加https连接器和过滤器,设置handle;
- 最后启动服务器server.start()。
storm中的jetty服务器的handle的实现是借助于Clojure中的web开发框架ring及其routing库Compojure来实现的,主要通过以下app代码来实现。即首先对main-routes路由函数(后面介绍)添加中间件:包括request参数json化,parse multipart, 在发送request前重载namespace;捕捉错误并json化,然后作为参数调用handler/site函数;
1 | (def app |
compojure.handler.site函数的功能为:传入route函数,添加中间件,返回handle。
1 | Create a handler suitable for a standard website. This adds the |
因此最后得到的app是一个handle。将app作为config-filter的参数执行以下代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15(defn config-filter [server handler filters-confs]
(if filters-confs
(let [servlet-holder (ServletHolder.
(ring.util.servlet/servlet handler))
context (doto (org.eclipse.jetty.servlet.ServletContextHandler. server "/")
(.addServlet servlet-holder "/"))]
(.addFilter context (cors-filter-handler) "/*" (EnumSet/allOf DispatcherType))
(doseq [{:keys [filter-name filter-class filter-params]} filters-confs]
(if filter-class
(let [filter-holder (doto (org.eclipse.jetty.servlet.FilterHolder.)
(.setClassName filter-class)
(.setName (or filter-name filter-class))
(.setInitParameters (or filter-params {})))]
(.addFilter context filter-holder "/*" FilterMapping/ALL))))
(.setHandler server context))))
这段代码的前面主要是添加filter。关键的最后一句server.setHandler(context) 函数会将前面得到的app作为jetty服务器的handle。这里注意:虽然默认的filters-confs参数是 {:filter-class nil, :filter-params nil},但其并不是nil,所以if filters-confs为true。
最后再来看一下main-routes函数,其定义如下。这是ring框架提供的macro定义,主要定义了不同API路径的请求处理和响应返回。
1 | (defroutes main-routes |
其中:
- json-response函数用于将得到的response转换为json格式;
- populate-context!函数用于将servlet-request对象转换为storm的RequestContext对象,即backtype.storm.security.auth.ReqContext (src/jvm/backtype/storm/security/auth/ReqContext.java),主要是用于添加用户认证的插件,插件只需要实现IHttpCredentialsPlugin接口即可。没有做详细了解。
- assert-authorized-user也是验证用户操作权限的;
with-nimbus是一个宏,用于建立一个与thrift server的客户端连接,代码如下, 首先初始化一个ReqContext上下文对象,获取用户名,然后与storm thirift server建立连接,执行body所代表的操作。其中nimbus-sym是一个符号参数,其会在with-nimbus-connection-as-user中被赋值为NimbusClient对象,用于在执行body代码是作为参数;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16;; src/clj/backtype/storm/ui/core.clj
(defmacro with-nimbus
[nimbus-sym & body]
`(let [context# (ReqContext/context)
user# (if (.principal context#) (.getName (.principal context#)))]
(thrift/with-nimbus-connection-as-user
[~nimbus-sym (*STORM-CONF* NIMBUS-HOST) (*STORM-CONF* NIMBUS-THRIFT-PORT) user#]
~@body)))
;; src/clj/backtype/storm/thrift.clj
(defmacro with-nimbus-connection-as-user
[[client-sym host port as-user] & body]
`(let [[^Nimbus$Client ~client-sym ^TTransport conn#] (nimbus-client-and-conn ~host ~port ~as-user)]
(try
~@body
(finally (.close conn#)))))比如active API的代码如下:在建立与服务器的连接后得到了NimbusCilent对象nimbus,调用其.getTopologyInfoWithOpts函数,得到TopologyInfo 对象,进而得到topology name,然后调用active方法激活topology;
1
2
3
4
5
6
7
8(with-nimbus nimbus
(let [tplg (->> (doto
(GetInfoOptions.)
(.set_num_err_choice NumErrorsChoice/NONE))
(.getTopologyInfoWithOpts ^Nimbus$Client nimbus id))
name (.get_name tplg)]
(.activate nimbus name)
(log-message "Activating topology '" name "'")))
note:jetty服务器API是org.eclipse.jetty.server这个包提供的。
至此整个UI进程的启动和运行过程已经解析完毕。下一步是解析UI界面上的数据来源。 请参考下一篇博客。
转载请标明文章出处。本文内容为实践后的原创整理,如果侵犯了您的版权,请联系我进行删除,邮件:yanhealth@163.com