From c96287f7bb79c18ffb932433d77cb44bd8c5a825 Mon Sep 17 00:00:00 2001 From: Noah Harrison Date: Tue, 23 Jul 2024 10:30:19 -0400 Subject: [PATCH] Persist dependencies so that `Workflow#configure` is not required on load Previously, `Workflow#configure` was called every time a Workflow was instantiated. This could create broken dependency graphs, e.g. if a configure method sets up job dependencies based on some mutable data like a timestamp argument or a database value. Instead, serialize workflow dependencies along with the rest of a workflow's data and reload it via `Client#workflow_from_hash` and tell `Workflow#initialize` not to run setup/configure. Note that for backwards compatibility with workflows persisted before this change, the setup method will still be called if dependencies in the deserialized hash are nil. --- lib/gush/client.rb | 27 +++++++++++++--------- lib/gush/workflow.rb | 18 ++++++++------- spec/features/integration_spec.rb | 4 ++-- spec/gush/client_spec.rb | 3 +++ spec/gush/workflow_spec.rb | 37 +++++++++++++++++++++++++++++++ 5 files changed, 68 insertions(+), 21 deletions(-) diff --git a/lib/gush/client.rb b/lib/gush/client.rb index c8e36d6..20c273a 100644 --- a/lib/gush/client.rb +++ b/lib/gush/client.rb @@ -183,20 +183,25 @@ def find_job_by_klass(workflow_id, job_name) end def workflow_from_hash(hash, nodes = []) - flow = hash[:klass].constantize.new( - *hash[:arguments], - **hash[:kwargs], - globals: hash[:globals] - ) - flow.jobs = [] - flow.stopped = hash.fetch(:stopped, false) - flow.id = hash[:id] - - flow.jobs = nodes.map do |node| + jobs = nodes.map do |node| Gush::Job.from_hash(node) end - flow + internal_state = { + persisted: true, + jobs: jobs, + # For backwards compatibility, setup can only be skipped for a persisted + # workflow if there is no data missing from the persistence. + # 2024-07-23: dependencies added to persistence + skip_setup: !hash[:dependencies].nil? + }.merge(hash) + + hash[:klass].constantize.new( + *hash[:arguments], + **hash[:kwargs], + globals: hash[:globals], + internal_state: internal_state + ) end def redis diff --git a/lib/gush/workflow.rb b/lib/gush/workflow.rb index e00c77f..55ade08 100644 --- a/lib/gush/workflow.rb +++ b/lib/gush/workflow.rb @@ -2,19 +2,20 @@ module Gush class Workflow - attr_accessor :id, :jobs, :stopped, :persisted, :arguments, :kwargs, :globals + attr_accessor :id, :jobs, :dependencies, :stopped, :persisted, :arguments, :kwargs, :globals - def initialize(*args, globals: nil, **kwargs) - @id = id - @jobs = [] - @dependencies = [] - @persisted = false - @stopped = false + def initialize(*args, globals: nil, internal_state: {}, **kwargs) @arguments = args @kwargs = kwargs @globals = globals || {} - setup + @id = internal_state[:id] || id + @jobs = internal_state[:jobs] || [] + @dependencies = internal_state[:dependencies] || [] + @persisted = internal_state[:persisted] || false + @stopped = internal_state[:stopped] || false + + setup unless internal_state[:skip_setup] end def self.find(id) @@ -179,6 +180,7 @@ def to_hash arguments: @arguments, kwargs: @kwargs, globals: @globals, + dependencies: @dependencies, total: jobs.count, finished: jobs.count(&:finished?), klass: name, diff --git a/spec/features/integration_spec.rb b/spec/features/integration_spec.rb index e7544d9..45c96a2 100644 --- a/spec/features/integration_spec.rb +++ b/spec/features/integration_spec.rb @@ -169,8 +169,8 @@ def configure INTERNAL_CONFIGURE_SPY = double('configure spy') expect(INTERNAL_SPY).to receive(:some_method).exactly(110).times - # One time when persisting, second time when reloading in the spec - expect(INTERNAL_CONFIGURE_SPY).to receive(:some_method).exactly(2).times + # One time when persisting; reloading does not call configure again + expect(INTERNAL_CONFIGURE_SPY).to receive(:some_method).exactly(1).time class SimpleJob < Gush::Job def perform diff --git a/spec/gush/client_spec.rb b/spec/gush/client_spec.rb index 2fe72f4..415dd08 100644 --- a/spec/gush/client_spec.rb +++ b/spec/gush/client_spec.rb @@ -19,9 +19,12 @@ it "returns Workflow object" do expected_workflow = TestWorkflow.create workflow = client.find_workflow(expected_workflow.id) + dependencies = workflow.dependencies expect(workflow.id).to eq(expected_workflow.id) + expect(workflow.persisted).to eq(true) expect(workflow.jobs.map(&:name)).to match_array(expected_workflow.jobs.map(&:name)) + expect(workflow.dependencies).to eq(dependencies) end context "when workflow has parameters" do diff --git a/spec/gush/workflow_spec.rb b/spec/gush/workflow_spec.rb index 720323d..8bbb413 100644 --- a/spec/gush/workflow_spec.rb +++ b/spec/gush/workflow_spec.rb @@ -32,6 +32,39 @@ def configure(*args, **kwargs) flow = TestWorkflow.new(globals: { global1: 'foo' }) expect(flow.globals[:global1]).to eq('foo') end + + it "accepts internal_state" do + flow = TestWorkflow.new + + internal_state = { + id: flow.id, + jobs: flow.jobs, + dependencies: flow.dependencies, + persisted: true, + stopped: true, + } + + flow_copy = TestWorkflow.new(internal_state: internal_state) + + expect(flow_copy.id).to eq(flow.id) + expect(flow_copy.jobs).to eq(flow.jobs) + expect(flow_copy.dependencies).to eq(flow.dependencies) + expect(flow_copy.persisted).to eq(true) + expect(flow_copy.stopped).to eq(true) + end + + it "does not call #configure if needs_setup is false" do + INTERNAL_SETUP_SPY = double('configure spy') + klass = Class.new(Gush::Workflow) do + def configure(*args) + INTERNAL_SETUP_SPY.some_method + end + end + + expect(INTERNAL_SETUP_SPY).to receive(:some_method).never + + flow = TestWorkflow.new(internal_state: { needs_setup: false }) + end end describe "#status" do @@ -118,6 +151,10 @@ def configure(*args) "started_at" => nil, "finished_at" => nil, "stopped" => false, + "dependencies" => [{ + "from" => "FetchFirstJob", + "to" => job_with_id("PersistFirstJob"), + }], "arguments" => ["arg1", "arg2"], "kwargs" => {"arg3" => 123}, "globals" => {}