Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist dependencies so that Workflow#configure is not required on load #118

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions lib/gush/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -183,20 +183,25 @@ def find_job_by_klass(workflow_id, job_name)
end

def workflow_from_hash(hash, nodes = [])
flow = hash[:klass].constantize.new(
*hash[:arguments],
**hash[:kwargs],
globals: hash[:globals]
)
flow.jobs = []
flow.stopped = hash.fetch(:stopped, false)
flow.id = hash[:id]

flow.jobs = nodes.map do |node|
jobs = nodes.map do |node|
Gush::Job.from_hash(node)
end

flow
internal_state = {
persisted: true,
jobs: jobs,
# For backwards compatibility, setup can only be skipped for a persisted
# workflow if there is no data missing from the persistence.
# 2024-07-23: dependencies added to persistence
skip_setup: !hash[:dependencies].nil?
}.merge(hash)

hash[:klass].constantize.new(
*hash[:arguments],
**hash[:kwargs],
globals: hash[:globals],
internal_state: internal_state
)
end

def redis
Expand Down
18 changes: 10 additions & 8 deletions lib/gush/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@

module Gush
class Workflow
attr_accessor :id, :jobs, :stopped, :persisted, :arguments, :kwargs, :globals
attr_accessor :id, :jobs, :dependencies, :stopped, :persisted, :arguments, :kwargs, :globals

def initialize(*args, globals: nil, **kwargs)
@id = id
@jobs = []
@dependencies = []
@persisted = false
@stopped = false
def initialize(*args, globals: nil, internal_state: {}, **kwargs)
@arguments = args
@kwargs = kwargs
@globals = globals || {}

setup
@id = internal_state[:id] || id
@jobs = internal_state[:jobs] || []
@dependencies = internal_state[:dependencies] || []
@persisted = internal_state[:persisted] || false
@stopped = internal_state[:stopped] || false

setup unless internal_state[:skip_setup]
end

def self.find(id)
Expand Down Expand Up @@ -179,6 +180,7 @@ def to_hash
arguments: @arguments,
kwargs: @kwargs,
globals: @globals,
dependencies: @dependencies,
total: jobs.count,
finished: jobs.count(&:finished?),
klass: name,
Expand Down
4 changes: 2 additions & 2 deletions spec/features/integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ def configure
INTERNAL_CONFIGURE_SPY = double('configure spy')
expect(INTERNAL_SPY).to receive(:some_method).exactly(110).times

# One time when persisting, second time when reloading in the spec
expect(INTERNAL_CONFIGURE_SPY).to receive(:some_method).exactly(2).times
# One time when persisting; reloading does not call configure again
expect(INTERNAL_CONFIGURE_SPY).to receive(:some_method).exactly(1).time

class SimpleJob < Gush::Job
def perform
Expand Down
3 changes: 3 additions & 0 deletions spec/gush/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
it "returns Workflow object" do
expected_workflow = TestWorkflow.create
workflow = client.find_workflow(expected_workflow.id)
dependencies = workflow.dependencies

expect(workflow.id).to eq(expected_workflow.id)
expect(workflow.persisted).to eq(true)
expect(workflow.jobs.map(&:name)).to match_array(expected_workflow.jobs.map(&:name))
expect(workflow.dependencies).to eq(dependencies)
end

context "when workflow has parameters" do
Expand Down
37 changes: 37 additions & 0 deletions spec/gush/workflow_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,39 @@ def configure(*args, **kwargs)
flow = TestWorkflow.new(globals: { global1: 'foo' })
expect(flow.globals[:global1]).to eq('foo')
end

it "accepts internal_state" do
flow = TestWorkflow.new

internal_state = {
id: flow.id,
jobs: flow.jobs,
dependencies: flow.dependencies,
persisted: true,
stopped: true,
}

flow_copy = TestWorkflow.new(internal_state: internal_state)

expect(flow_copy.id).to eq(flow.id)
expect(flow_copy.jobs).to eq(flow.jobs)
expect(flow_copy.dependencies).to eq(flow.dependencies)
expect(flow_copy.persisted).to eq(true)
expect(flow_copy.stopped).to eq(true)
end

it "does not call #configure if needs_setup is false" do
INTERNAL_SETUP_SPY = double('configure spy')
klass = Class.new(Gush::Workflow) do
def configure(*args)
INTERNAL_SETUP_SPY.some_method
end
end

expect(INTERNAL_SETUP_SPY).to receive(:some_method).never

flow = TestWorkflow.new(internal_state: { needs_setup: false })
end
end

describe "#status" do
Expand Down Expand Up @@ -118,6 +151,10 @@ def configure(*args)
"started_at" => nil,
"finished_at" => nil,
"stopped" => false,
"dependencies" => [{
"from" => "FetchFirstJob",
"to" => job_with_id("PersistFirstJob"),
}],
"arguments" => ["arg1", "arg2"],
"kwargs" => {"arg3" => 123},
"globals" => {}
Expand Down