Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changes so that event bus only reports step start/end on steps that run. #232

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 25 additions & 24 deletions src/drake/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -485,21 +485,35 @@
(when (not (realized? promise))
(deliver promise 0))))))

(defn- post
[^com.google.common.eventbus.EventBus event-bus event]
(when event-bus (.post event-bus event)))

(defn- sanitize-step
[step]
(dissoc step :function :promise :exception-promise))

(defn- function-for-step
"Returns an anonymous function that can be triggered in its own thread to execute a step.
Each step delivers its own promise. Dependent steps will block on that promise. "
[parse-tree steps promises-indexed step]
[parse-tree event-bus steps promises-indexed step]
(fn []
; wait for parent promises in the tree promises to be delivered
; accumulate successful parent tasks into a sum : successful-parent-steps
(let [{:keys [promise deps exception-promise]} step]
(let [{:keys [promise deps exception-promise]} step
sanitized-step (sanitize-step step)]
(try
(let [successful-parent-steps (reduce +
(map (fn [i]
@(promises-indexed i))
deps))]
(if (= successful-parent-steps (count deps))
(attempt-run-step parse-tree step)
(try
(post event-bus (EventStepBegin sanitized-step))
(attempt-run-step parse-tree step)
(finally (if (realized? (:exception-promise step))
(post event-bus (EventStepError sanitized-step @(:exception-promise step)))
(post event-bus (EventStepEnd sanitized-step)))))
(deliver promise 0)))
(catch Exception e
(deliver exception-promise e))
Expand All @@ -509,48 +523,35 @@

(defn- assoc-function
"Associates a future (anonymous function) for each step"
[parse-tree steps]
[parse-tree event-bus steps]
; for quickly accessing promises via a map :index => :promise
(let [promises-indexed (zipmap (map :index steps) (map :promise steps))]
; associate a :function on each step
(map (fn [step]
(assoc step
:function
(function-for-step parse-tree steps promises-indexed step)))
(function-for-step parse-tree event-bus steps promises-indexed step)))
steps)))

(defn- post
[^com.google.common.eventbus.EventBus event-bus event]
(when event-bus (.post event-bus event)))

(defn- sanitize-step
[step]
(dissoc step :function :promise :exception-promise))

(defn- trigger-futures-helper
[jobs lazy-steps state-atom event-bus]
[jobs lazy-steps state-atom]
(let [semaphore (new Semaphore jobs true)]
(loop [steps lazy-steps]
(.acquire semaphore)
(when (seq steps)
(let [step (first steps)
sanitized-step (sanitize-step step)]
(let [step (first steps)]
(future (try
(post event-bus (EventStepBegin sanitized-step))
((:function step))
(finally
(swap! state-atom update-state-atom-when-step-finishes step)
(when (realized? (:exception-promise step))
(post event-bus (EventStepError sanitized-step @(:exception-promise step))))
(post event-bus (EventStepEnd sanitized-step))
(.release semaphore))))
(recur (rest steps)))))))

(defn- trigger-futures
"Run all the steps in (jobs) number of threads"
[jobs steps event-bus]
[jobs steps]
(let [state-atom (create-state-atom steps)]
(trigger-futures-helper jobs (lazy-step-list state-atom) state-atom event-bus)))
(trigger-futures-helper jobs (lazy-step-list state-atom) state-atom)))

(defn- await-promises
"waits for all the promises to be fullfilled otherwise
Expand Down Expand Up @@ -586,11 +587,11 @@
steps-data
assoc-exception-promise
assoc-promise
(assoc-function parse-tree))]
(assoc-function parse-tree event-bus))]

(post event-bus (EventWorkflowBegin steps-data))

(trigger-futures jobs steps-future event-bus)
(trigger-futures jobs steps-future)

(let [successful-steps (await-promises steps-future)]
(info (format "Done (%d steps run)." successful-steps))
Expand Down