Source Code Reading: How Storm Operates on ZooKeeper

Reader submission, 2023-03-26



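The main functions Storm uses to operate on ZooKeeper are defined in the namespace backtype.storm.cluster (that is, in the file cluster.clj). backtype.storm.cluster defines two important protocols: ClusterState and StormClusterState.

A Clojure protocol can be thought of as a Java interface: it groups a set of methods. The ClusterState protocol groups the basic functions for interacting with ZooKeeper, such as fetching a node's children or a node's data. It is defined as follows:

The ClusterState protocol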

(defprotocol ClusterState
  (set-ephemeral-node [this path data])
  (delete-node [this path])
  (create-sequential [this path data])
  ;; if node does not exist, create persistent with this data
  (set-data [this path data])
  (get-data [this path watch?])
  (get-version [this path watch?])
  (get-data-with-version [this path watch?])
  (get-children [this path watch?])
  (mkdirs [this path])
  (close [this])
  (register [this callback])
  (unregister [this id]))
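To make the protocol/interface analogy concrete, here is a minimal sketch of defining a protocol and implementing it with reify. The protocol KVStore, its functions, and mk-in-memory-store are hypothetical names invented for this illustration; they are not part of Storm, and the store is an in-memory map rather than ZooKeeper.

```clojure
;; Hypothetical toy protocol (NOT Storm's ClusterState): a tiny path -> data store.
(defprotocol KVStore
  (put-data [this path data])
  (fetch-data [this path])
  (child-names [this path]))

(defn mk-in-memory-store
  "Returns an object implementing KVStore, backed by a sorted map in an atom."
  []
  (let [nodes (atom (sorted-map))]
    (reify KVStore
      (put-data [this path data]
        (swap! nodes assoc path data)
        nil)
      (fetch-data [this path]
        (get @nodes path))
      (child-names [this path]
        ;; direct children only: keys of the form <path>/<name> with no further "/"
        (let [prefix (str path "/")]
          (vec (for [^String p (keys @nodes)
                     :when (.startsWith p prefix)
                     :let [tail (subs p (count prefix))]
                     :when (neg? (.indexOf ^String tail "/"))]
                 tail)))))))

(def store (mk-in-memory-store))
(put-data store "/storms/topo-1" "base-1")
(put-data store "/storms/topo-2" "base-2")
(println (fetch-data store "/storms/topo-1"))
(println (child-names store "/storms"))
(println (satisfies? KVStore store))
```

Callers only see the protocol functions; the atom that holds the data is closed over by reify and is not reachable from outside, much like private state behind a Java interface.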

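The StormClusterState protocol groups the functions through which Storm itself interacts with ZooKeeper. Its functions can be seen as "compositions" of the functions in the ClusterState protocol. It is defined as follows:

The StormClusterState protocol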

(defprotocol StormClusterState
  (assignments [this callback])
  (assignment-info [this storm-id callback])
  (assignment-info-with-version [this storm-id callback])
  (assignment-version [this storm-id callback])
  (active-storms [this])
  (storm-base [this storm-id callback])
  (get-worker-heartbeat [this storm-id node port])
  (executor-beats [this storm-id executor->node+port])
  (supervisors [this callback])
  (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
  (setup-heartbeats! [this storm-id])
  (teardown-heartbeats! [this storm-id])
  (teardown-topology-errors! [this storm-id])
  (heartbeat-storms [this])
  (error-topologies [this])
  (worker-heartbeat! [this storm-id node port info])
  (remove-worker-heartbeat! [this storm-id node port])
  (supervisor-heartbeat! [this supervisor-id info])
  (activate-storm! [this storm-id storm-base])
  (update-storm! [this storm-id new-elems])
  (remove-storm-base! [this storm-id])
  (set-assignment! [this storm-id info])
  (remove-storm! [this storm-id])
  (report-error [this storm-id task-id node port error])
  (errors [this storm-id task-id])
  (disconnect [this]))
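The idea that the higher-level protocol is a "composition" of the lower-level one can be sketched as follows. NodeStore and TopologyState are hypothetical stand-ins for ClusterState and StormClusterState, and pr-str/read-string stands in for Storm's serialization; none of these names come from Storm.

```clojure
;; Low-level protocol: generic node operations (stand-in for ClusterState).
(defprotocol NodeStore
  (write-node [this path data])
  (read-node [this path])
  (remove-node [this path]))

;; High-level protocol: domain operations (stand-in for StormClusterState).
(defprotocol TopologyState
  (save-assignment! [this topo-id info])
  (assignment-of [this topo-id])
  (drop-topology! [this topo-id]))

(defn mk-node-store
  "In-memory NodeStore used only for this illustration."
  []
  (let [nodes (atom {})]
    (reify NodeStore
      (write-node [this path data] (swap! nodes assoc path data) nil)
      (read-node [this path] (get @nodes path))
      (remove-node [this path] (swap! nodes dissoc path) nil))))

(defn mk-topology-state
  "Each TopologyState function is a path computation plus NodeStore calls."
  [store]
  (reify TopologyState
    (save-assignment! [this topo-id info]
      (write-node store (str "/assignments/" topo-id) (pr-str info)))
    (assignment-of [this topo-id]
      ;; returns nil when the node does not exist
      (some-> (read-node store (str "/assignments/" topo-id))
              read-string))
    (drop-topology! [this topo-id]
      (remove-node store (str "/assignments/" topo-id))
      (remove-node store (str "/storms/" topo-id)))))

(def state (mk-topology-state (mk-node-store)))
(save-assignment! state "topo-1" {:node "host-a" :port 6700})
(println (assignment-of state "topo-1"))
(drop-topology! state "topo-1")
(println (assignment-of state "topo-1"))
```

Because the high-level object only depends on the low-level protocol, the backing store can be swapped (for example, for a test double) without touching the domain functions.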

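Besides the two protocols ClusterState and StormClusterState, the namespace backtype.storm.cluster also defines two important functions: mk-distributed-cluster-state and mk-storm-cluster-state.

mk-distributed-cluster-state returns an object that implements the ClusterState protocol; interaction with ZooKeeper goes through this object. It is defined as follows:

The mk-distributed-cluster-state function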

(defn mk-distributed-cluster-state
  ;; conf is a map holding the configuration read from storm.yaml
  [conf]
  ;; zk is bound to a zk client; Storm uses CuratorFramework to talk to ZooKeeper
  (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf conf)]
    ;; create the root node of the Storm cluster on ZooKeeper; the default is /storm
    (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))
    (.close zk))
  ;; callbacks is a map holding the registered callback functions
  (let [callbacks (atom {})
        ;; active flags whether this ClusterState is still open; close resets it to false
        active (atom true)
        ;; zk is rebound to a new zk client that has a watcher set. When the state of the
        ;; ZooKeeper cluster changes, the zk server sends the corresponding event to the
        ;; zk client, and the client's watcher calls the functions in callbacks to handle it.
        ;; When nimbus starts it supplies no callback functions, so nimbus invokes nothing
        ;; on receiving an event; a supervisor does register callbacks when it starts, so
        ;; an event sent by the zk server triggers the corresponding callbacks.
        ;; mk-client is defined in zookeeper.clj; see its definition further down
        zk (zk/mk-client conf
                         (conf STORM-ZOOKEEPER-SERVERS)
                         (conf STORM-ZOOKEEPER-PORT)
                         :auth-conf conf
                         :root (conf STORM-ZOOKEEPER-ROOT)
                         ;; :watcher is the default watcher function of the zk client. state is the
                         ;; current state of the zk client, type is the event type, and path is the
                         ;; znode on which the event occurred.
                         ;; The watcher simply runs the functions in callbacks, which are added in
                         ;; mk-storm-cluster-state by calling the register function of ClusterState
                         :watcher (fn [state type path]
                                    (when @active
                                      (when-not (= :connected state)
                                        (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))
                                      (when-not (= :none type)
                                        (doseq [callback (vals @callbacks)]
                                          (callback type path))))))]

    ;; reify plays the role of Java's implements; here it implements a protocol
    (reify
      ClusterState
      ;; register adds a callback function to callbacks; the key is a UUID string returned by uuid
      (register
        [this callback]
        (let [id (uuid)]
          (swap! callbacks assoc id callback)
          id))
      ;; unregister removes the callback stored under the given key from callbacks
      (unregister
        [this id]
        (swap! callbacks dissoc id))
      ;; create an ephemeral node on ZooKeeper, or update its data if it already exists
      (set-ephemeral-node
        [this path data]
        (zk/mkdirs zk (parent-path path))
        (if (zk/exists zk path false)
          (try-cause
            (zk/set-data zk path data) ; should verify that it's ephemeral
            (catch KeeperException$NoNodeException e
              (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")
              (zk/create-node zk path data :ephemeral)))
          (zk/create-node zk path data :ephemeral)))

      ;; create a sequential node on ZooKeeper
      (create-sequential
        [this path data]
        (zk/create-node zk path data :sequential))
      ;; set the data of a node, creating it as a persistent node if it does not exist
      (set-data
        [this path data]
        ;; note: this does not turn off any existing watches
        (if (zk/exists zk path false)
          (zk/set-data zk path data)
          (do
            (zk/mkdirs zk (parent-path path))
            (zk/create-node zk path data :persistent))))
      ;; delete the given node
      (delete-node
        [this path]
        (zk/delete-recursive zk path))
      ;; get the data of the given node. path is the node path; watch? is a boolean saying
      ;; whether the node should be watched. If watch? is true, then after set-data changes
      ;; the node's data an event is sent to the zk client, which on receipt calls the default
      ;; watcher given when the client was created (the function bound to :watcher)
      (get-data
        [this path watch?]
        (zk/get-data zk path watch?))

      ;; like get-data, but also returns the version of the node's data;
      ;; the version counts how many times the data has been modified
      (get-data-with-version
        [this path watch?]
        (zk/get-data-with-version zk path watch?))
      ;; get the version of the given node; watch? has the same meaning as in get-data
      (get-version
        [this path watch?]
        (zk/get-version zk path watch?))
      ;; get the list of children of the given node; watch? has the same meaning as in get-data
      (get-children
        [this path watch?]
        (zk/get-children zk path watch?))
      ;; create a node on ZooKeeper
      (mkdirs
        [this path]
        (zk/mkdirs zk path))
      ;; close the zk client
      (close
        [this]
        (reset! active false)
        (.close zk)))))
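The register/unregister/watcher interplay above can be tried out without ZooKeeper. The sketch below keeps the same two atoms (callbacks and active) but returns plain functions in a map instead of a ClusterState object; mk-callback-registry and the event keywords used in the calls are illustrative assumptions, not Storm code.

```clojure
(defn mk-callback-registry
  "Hypothetical stand-in for the callback handling in mk-distributed-cluster-state."
  []
  (let [callbacks (atom {})
        active    (atom true)]
    {:register   (fn [callback]
                   ;; a random UUID string serves as the key, as (uuid) does above
                   (let [id (str (java.util.UUID/randomUUID))]
                     (swap! callbacks assoc id callback)
                     id))
     :unregister (fn [id] (swap! callbacks dissoc id) nil)
     ;; plays the role of the :watcher function handed to the zk client
     :watcher    (fn [state type path]
                   (when @active
                     (when-not (= :none type)
                       (doseq [callback (vals @callbacks)]
                         (callback type path)))))
     :close      (fn [] (reset! active false) nil)}))

(def registry (mk-callback-registry))
(def seen (atom []))
(def id ((:register registry) (fn [type path] (swap! seen conj [type path]))))

;; delivered to the registered callback
((:watcher registry) :connected :node-data-changed "/assignments/topo-1")
;; ignored because the event type is :none
((:watcher registry) :connected :none nil)
((:unregister registry) id)
;; no callbacks are left, so nothing happens
((:watcher registry) :connected :node-children-changed "/supervisors")
(println @seen)
```

The watcher itself contains no business logic: what happens on an event is decided entirely by whatever has been registered, which is the property the rest of the article relies on.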

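mk-storm-cluster-state is a very important function. It returns an instance that implements the StormClusterState protocol, through which Storm can interact with ZooKeeper more conveniently.

Both the function that starts nimbus and the function that starts a supervisor call mk-storm-cluster-state. The startup of nimbus and the supervisor will be covered in later articles. The function is defined as follows:

The mk-storm-cluster-state function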

(defn mk-storm-cluster-state
  [cluster-state-spec]
  ;; the satisfies? predicate is like Java's instanceof: it checks whether
  ;; cluster-state-spec is a ClusterState instance
  (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)
                                [false cluster-state-spec]
                                [true (mk-distributed-cluster-state cluster-state-spec)])
        ;; a map of topology id -> callback; when the data of /assignments/{topology id} changes,
        ;; the zk client runs the callback stored under that topology id
        assignment-info-callback (atom {})
        ;; assignment-info-with-version-callback is analogous to assignment-info-callback
        assignment-info-with-version-callback (atom {})
        ;; assignment-version-callback is analogous to assignment-info-callback
        assignment-version-callback (atom {})
        ;; when the children of the /supervisors znode change, the zk client runs
        ;; the function held by supervisors-callback
        supervisors-callback (atom nil)
        ;; when the children of the /assignments znode change, the zk client runs
        ;; the function held by assignments-callback
        assignments-callback (atom nil)
        ;; when the data of the /storms/{topology id} znode changes, the zk client runs
        ;; the callback stored under that topology id in storm-base-callback
        storm-base-callback (atom {})
        ;; register adds the callback (fn ...) to the callbacks of cluster-state
        ;; and returns the uuid identifying it
        state-id (register
                   cluster-state
                   ;; the callback itself: type is the event type, path is the znode
                   (fn [type path]
                     ;; subtree is bound to the first path segment, such as "assignments",
                     ;; "storms" or "supervisors"; args holds the rest, i.e. the topology id
                     (let [[subtree & args] (tokenize-path path)]
                       ;; condp is similar to Java's switch
                       (condp = subtree
                         ;; subtree = "assignments": if args is empty, the children of /assignments
                         ;; changed, so run assignments-callback; otherwise the data of
                         ;; /assignments/{topology id} changed, so run the matching function
                         ;; in assignment-info-callback
                         ASSIGNMENTS-ROOT (if (empty? args)
                                            (issue-callback! assignments-callback)
                                            (issue-map-callback! assignment-info-callback (first args)))
                         ;; subtree = "supervisors": the children of /supervisors changed,
                         ;; so run supervisors-callback
                         SUPERVISORS-ROOT (issue-callback! supervisors-callback)
                         ;; subtree = "storms": the data of /storms/{topology id} changed,
                         ;; so run the matching function in storm-base-callback
                         STORMS-ROOT (issue-map-callback! storm-base-callback (first args))
                         ;; this should never happen
                         (exit-process! 30 "Unknown callback for subtree " subtree args)))))]

    ;; create the znodes on ZooKeeper that Storm needs in order to run topologies
    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]]
      (mkdirs cluster-state p))
    ;; return an instance that implements the StormClusterState protocol
    (reify
      StormClusterState
      ;; get the children of /assignments; if callback is not nil, store it in
      ;; assignments-callback and set a child watch on /assignments
      (assignments
        [this callback]
        (when callback
          (reset! assignments-callback callback))
        (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
      ;; get the data of /assignments/{storm-id}, i.e. the assignment of storm-id; if callback
      ;; is not nil, add it to assignment-info-callback and set a data watch on the node
      (assignment-info
        [this storm-id callback]
        (when callback
          (swap! assignment-info-callback assoc storm-id callback))
        (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))))
      ;; get the data of /assignments/{storm-id} together with its version; if callback is not
      ;; nil, add it to assignment-info-with-version-callback and set a data watch on the node
      (assignment-info-with-version
        [this storm-id callback]
        (when callback
          (swap! assignment-info-with-version-callback assoc storm-id callback))
        (let [{data :data version :version}
              (get-data-with-version cluster-state (assignment-path storm-id) (not-nil? callback))]
          {:data (maybe-deserialize data)
           :version version}))
      ;; get only the version of the data of /assignments/{storm-id}; if callback is not nil,
      ;; add it to assignment-version-callback and set a data watch on the node
      (assignment-version
        [this storm-id callback]
        (when callback
          (swap! assignment-version-callback assoc storm-id callback))
        (get-version cluster-state (assignment-path storm-id) (not-nil? callback)))
      ;; get the ids of the topologies running in the cluster, i.e. the children of /storms
      (active-storms
        [this]
        (get-children cluster-state STORMS-SUBTREE false))
      ;; get the ids of all topologies that have heartbeats, i.e. the children of /workerbeats
      (heartbeat-storms
        [this]
        (get-children cluster-state WORKERBEATS-SUBTREE false))
      ;; get the ids of all topologies that have errors, i.e. the children of /errors
      (error-topologies
        [this]
        (get-children cluster-state ERRORS-SUBTREE false))

      ;; get the heartbeat of one worker process of storm-id, i.e. the data of
      ;; /workerbeats/{storm-id}/{node-port}
      (get-worker-heartbeat
        [this storm-id node port]
        (-> cluster-state
            (get-data (workerbeat-path storm-id node port) false)
            maybe-deserialize))
      ;; get the heartbeats of all executors (threads) running in the given worker processes
      (executor-beats
        [this storm-id executor->node+port]
        ;; need to take executor->node+port in explicitly so that we don't run into a situation where a
        ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
        ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
        ;; we avoid situations like that
        (let [node+port->executors (reverse-map executor->node+port)
              all-heartbeats (for [[[node port] executors] node+port->executors]
                               (->> (get-worker-heartbeat this storm-id node port)
                                    (convert-executor-beats executors)))]
          (apply merge all-heartbeats)))
      ;; get the children of /supervisors; if callback is not nil, store it in
      ;; supervisors-callback and set a child watch on /supervisors
      (supervisors
        [this callback]
        (when callback
          (reset! supervisors-callback callback))
        (get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))
      ;; get the data of /supervisors/{supervisor-id}, i.e. the supervisor's heartbeat
      (supervisor-info
        [this supervisor-id]
        (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)))
      ;; write the heartbeat of a worker process
      (worker-heartbeat!
        [this storm-id node port info]
        (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info)))
      ;; delete the heartbeat of a worker process
      (remove-worker-heartbeat!
        [this storm-id node port]
        (delete-node cluster-state (workerbeat-path storm-id node port)))
      ;; create the node that holds the heartbeats of the topology storm-id
      (setup-heartbeats!
        [this storm-id]
        (mkdirs cluster-state (workerbeat-storm-root storm-id)))
      ;; delete the heartbeat node of the topology storm-id
      (teardown-heartbeats!
        [this storm-id]
        (try-cause
          (delete-node cluster-state (workerbeat-storm-root storm-id))
          (catch KeeperException e
            (log-warn-error e "Could not teardown heartbeats for " storm-id))))
      ;; delete the error node of the topology storm-id
      (teardown-topology-errors!
        [this storm-id]
        (try-cause
          (delete-node cluster-state (error-storm-root storm-id))
          (catch KeeperException e
            (log-warn-error e "Could not teardown errors for " storm-id))))

      ;; create an ephemeral node that holds the supervisor's heartbeat
      (supervisor-heartbeat!
        [this supervisor-id info]
        (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info)))
      ;; create the /storms/{storm-id} node
      (activate-storm!
        [this storm-id storm-base]
        (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)))
      ;; update the StormBase object of the topology, i.e. update the /storms/{storm-id} node
      (update-storm!
        [this storm-id new-elems]
        ;; base is the StormBase object stored on ZooKeeper for storm-id
        (let [base (storm-base this storm-id nil)
              ;; executors is the map of component name -> parallelism
              executors (:component->executors base)
              ;; new-elems is rebound with the merged parallelism map: update merges
              ;; the new parallelism values into the old map
              new-elems (update new-elems :component->executors (partial merge executors))]
          ;; merge the changes into the StormBase object and write it to /storms/{storm-id}
          (set-data cluster-state (storm-path storm-id)
                    (-> base
                        (merge new-elems)
                        Utils/serialize))))
      ;; get the StormBase object of storm-id, i.e. read the data of /storms/{storm-id}; if callback
      ;; is not nil, add it to storm-base-callback and set a data watch on /storms/{storm-id}
      (storm-base
        [this storm-id callback]
        (when callback
          (swap! storm-base-callback assoc storm-id callback))
        (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))))
      ;; delete the StormBase object of storm-id, i.e. delete the /storms/{storm-id} node
      (remove-storm-base!
        [this storm-id]
        (delete-node cluster-state (storm-path storm-id)))
      ;; update the assignment of storm-id, i.e. the data of /assignments/{storm-id}
      (set-assignment!
        [this storm-id info]
        (set-data cluster-state (assignment-path storm-id) (Utils/serialize info)))
      ;; delete the assignment of storm-id together with its StormBase, i.e. delete
      ;; /assignments/{storm-id} and /storms/{storm-id}
      (remove-storm!
        [this storm-id]
        (delete-node cluster-state (assignment-path storm-id))
        (remove-storm-base! this storm-id))

      ;; write a component's error information to ZooKeeper
      (report-error
        [this storm-id component-id node port error]
        ;; path is "/errors/{storm-id}/{component-id}"
        (let [path (error-path storm-id component-id)
              ;; data is the error information: time, stack trace, host and port
              data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}
              ;; create the /errors/{storm-id}/{component-id} node
              _ (mkdirs cluster-state path)
              ;; create a sequential child of /errors/{storm-id}/{component-id} holding the error
              _ (create-sequential cluster-state (str path "/e") (Utils/serialize data))
              ;; to-kill is every child except the 10 with the highest sequence numbers
              to-kill (->> (get-children cluster-state path false)
                           (sort-by parse-error-path)
                           reverse
                           (drop 10))]
          ;; delete the nodes in to-kill
          (doseq [k to-kill]
            (delete-node cluster-state (str path "/" k)))))
      ;; get the errors recorded for the given storm-id and component-id
      (errors
        [this storm-id component-id]
        (let [path (error-path storm-id component-id)
              _ (mkdirs cluster-state path)
              children (get-children cluster-state path false)
              errors (dofor [c children]
                       (let [data (-> (get-data cluster-state (str path "/" c) false)
                                      maybe-deserialize)]
                         (when data
                           (struct TaskError (:error data) (:time-secs data) (:host data) (:port data)))))]
          (->> (filter not-nil? errors)
               (sort-by (comp - :time-secs)))))
      ;; close the connection; before closing, remove the callback from the callbacks of cluster-state
      (disconnect
        [this]
        (unregister cluster-state state-id)
        (when solo?
          (close cluster-state))))))
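The pruning step in report-error (keep only the newest ten error nodes) is easy to check in isolation. In the sketch below, parse-seq-number is a hypothetical stand-in for parse-error-path, assuming child names consist of the "e" prefix followed by ZooKeeper's zero-padded sequence number; nodes-to-delete is likewise an illustrative helper, not a Storm function.

```clojure
(defn parse-seq-number
  "Hypothetical stand-in for parse-error-path: \"e0000000012\" -> 12."
  [child-name]
  (Long/parseLong (subs child-name 1)))

(defn nodes-to-delete
  "Returns the children that fall outside the keep-count highest sequence
   numbers, using the same sort-by / reverse / drop pipeline as report-error."
  [children keep-count]
  (->> children
       (sort-by parse-seq-number)
       reverse
       (drop keep-count)))

;; thirteen error nodes, e0000000000 through e0000000012
(def children (map #(format "e%010d" %) (range 13)))
(println (nodes-to-delete children 10))
```

Sorting by the parsed number rather than relying on the order returned by get-children matters because ZooKeeper does not guarantee any ordering of the child list.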

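The mk-client function in zookeeper.clj

mk-client creates a CuratorFramework instance and registers a CuratorListener on it. When a background operation completes or a watch fires, the listener's eventReceived() is executed. The watcher function called inside eventReceived is the function bound to :watcher in mk-distributed-cluster-state.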

(defnk mk-client
  [conf servers port
   :root ""
   :watcher default-watcher
   :auth-conf nil]
  (let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))]
    (.. fk
        (getCuratorListenable)
        (addListener
          (reify CuratorListener
            (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]
              (when (= (.getType e) CuratorEventType/WATCHED)
                (let [^WatchedEvent event (.getWatchedEvent e)]
                  (watcher (zk-keeper-states (.getState event))
                           (zk-event-types (.getType event))
                           (.getPath event))))))))
    (.start fk)
    fk))

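That covers the source code through which Storm interacts with ZooKeeper. In my view the most important part is how a "watcher" is attached to the zk client, because many Storm features, such as picking up assignments, are built on ZooKeeper's watcher mechanism. Attaching a watcher roughly takes the following steps:

1. mk-distributed-cluster-state creates a zk client and gives it a watcher function through :watcher. This watcher function simply calls the functions in the callbacks collection of the ClusterState, so which functions it runs is decided by the ClusterState instance.

2. The ClusterState instance offers register to update the callbacks collection. The instance is passed to mk-storm-cluster-state, which calls register to add a function (fn [type path] ... ). This function carries the full logic of the watcher.

3. What the function registered in mk-storm-cluster-state actually does is decided by the StormClusterState instance, and watches on ZooKeeper nodes are also set through that instance. So through the StormClusterState instance we can attach a watch and a callback function to the nodes we care about. When a node or its data changes, the zk server sends a notification to the zk client, the client's watcher function is called, and the callback function we registered is then executed.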
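The steps above can be sketched end to end without ZooKeeper. The article does not show tokenize-path, issue-callback! or issue-map-callback!, so the definitions below are assumptions about their behaviour: the path is split into segments, and a callback is removed from its atom before being called, which mirrors the fact that a ZooKeeper watch fires only once and has to be set again. mk-dispatcher is a hypothetical helper that covers only the "assignments" subtree.

```clojure
(require '[clojure.string :as str])

(defn tokenize-path
  "Assumed behaviour: \"/assignments/topo-1\" -> [\"assignments\" \"topo-1\"]."
  [path]
  (vec (remove str/blank? (str/split path #"/"))))

(defn issue-callback!
  "Assumed behaviour: clears the atom first, so the callback fires at most once."
  [cb-atom]
  (let [cb @cb-atom]
    (reset! cb-atom nil)
    (when cb (cb))))

(defn issue-map-callback!
  "Assumed behaviour: removes the callback stored under id, then calls it with id."
  [cb-atom id]
  (let [cb (get @cb-atom id)]
    (swap! cb-atom dissoc id)
    (when cb (cb id))))

(defn mk-dispatcher
  "Builds the (fn [type path] ...) that would be passed to register."
  [assignments-callback assignment-info-callback]
  (fn [type path]
    (let [[subtree & args] (tokenize-path path)]
      (condp = subtree
        "assignments" (if (empty? args)
                        (issue-callback! assignments-callback)
                        (issue-map-callback! assignment-info-callback (first args)))
        (throw (IllegalStateException. (str "Unknown subtree " subtree)))))))

(def assignments-callback (atom nil))
(def assignment-info-callback (atom {}))
(def log (atom []))
(def dispatch (mk-dispatcher assignments-callback assignment-info-callback))

;; what (assignments state cb) and (assignment-info state "topo-1" cb) would store
(reset! assignments-callback #(swap! log conj :children-changed))
(swap! assignment-info-callback assoc "topo-1" #(swap! log conj [:data-changed %]))

(dispatch :node-children-changed "/assignments")
(dispatch :node-data-changed "/assignments/topo-1")
(dispatch :node-data-changed "/assignments/topo-1") ; callback already consumed
(println @log)
```

Under these assumptions a caller that wants continuous notifications has to pass its callback again on every read, which is the usual pattern with one-shot watches.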

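Summary

This part of the source code is tightly coupled to ZooKeeper and touches many of its concepts and features, such as "data watches" and "child watches". For the ZooKeeper watcher mechanism, see //jb51.net/article/124295.htm. Storm does not use the ZooKeeper API directly; it uses the Curator framework, which simplifies access to ZooKeeper. For the Curator framework, see //jb51.net/article/125785.htm.

That is all for this reading of cluster.clj and how Storm operates on ZooKeeper. Related reading: understanding the ZooKeeper watch mechanism, Apache ZooKeeper usage examples, and configuring ACL permissions for ZooKeeper. I hope it helps, and thanks for reading!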

