Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISSUE[581] Add Built-in Execution Context Variables #654

Merged
merged 18 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ import (
"syscall"
"time"

"github.com/daguflow/dagu/internal/logger"
"github.com/daguflow/dagu/internal/persistence"

"github.com/daguflow/dagu/internal/client"
"github.com/daguflow/dagu/internal/constants"
"github.com/daguflow/dagu/internal/dag"
"github.com/daguflow/dagu/internal/dag/scheduler"
"github.com/daguflow/dagu/internal/logger"
"github.com/daguflow/dagu/internal/mailer"
"github.com/daguflow/dagu/internal/persistence"
"github.com/daguflow/dagu/internal/persistence/model"
"github.com/daguflow/dagu/internal/sock"
)
Expand Down Expand Up @@ -202,11 +202,7 @@ func (a *Agent) Run(ctx context.Context) error {
}()

// Start the DAG execution.
lastErr := a.scheduler.Schedule(
dag.NewContext(ctx, a.dag, a.dataStore.DAGStore()),
a.graph,
done,
)
lastErr := a.scheduler.Schedule(dag.NewContext(ctx, a.dag, a.dataStore.DAGStore()), a.graph, done)

// Update the finished status to the history database.
finishedStatus := a.Status()
Expand Down Expand Up @@ -313,12 +309,28 @@ func (a *Agent) HandleHTTP(w http.ResponseWriter, r *http.Request) {
}
}

func (a *Agent) setupEnvironmentVariable() error {
var (
err error
)

if err = os.Setenv(constants.DaguSchedulerLogPathKey, a.logFile); err != nil {
return err
}

return os.Setenv(constants.DaguRequestIDKey, a.requestID)
}

// setup the agent instance for DAG execution.
func (a *Agent) setup() error {
// Lock to prevent race condition.
a.lock.Lock()
defer a.lock.Unlock()

if err := a.setupEnvironmentVariable(); err != nil {
return err
}

a.scheduler = a.newScheduler()
a.reporter = newReporter(
mailer.New(&mailer.NewMailerArgs{
Expand Down Expand Up @@ -383,11 +395,7 @@ func (a *Agent) dryRun() error {

a.logger.Info("Dry-run started", "reqId", a.requestID)

lastErr := a.scheduler.Schedule(
dag.NewContext(context.Background(), a.dag, a.dataStore.DAGStore()),
a.graph,
done,
)
lastErr := a.scheduler.Schedule(dag.NewContext(context.Background(), a.dag, a.dataStore.DAGStore()), a.graph, done)

a.reporter.report(a.Status(), lastErr)

Expand Down
6 changes: 5 additions & 1 deletion internal/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,9 @@
package constants

var (
Version = "0.0.1"
Version = "0.0.1"
StepDaguExecutionLogPathKeySuffix = "DAGU_EXECUTION_LOG_PATH"
StepDaguExecutionLogPathKeyPrefix = "STEP"
DaguSchedulerLogPathKey = "DAGU_SCHEDULER_LOG_PATH"
DaguRequestIDKey = "DAGU_REQUEST_ID"
)
32 changes: 28 additions & 4 deletions internal/dag/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ package dag
import (
"context"
"errors"
"strconv"
"strings"

"github.com/daguflow/dagu/internal/constants"
)

// Finder finds a DAG by name.
Expand All @@ -30,17 +34,15 @@ type Finder interface {
type Context struct {
DAG *DAG
Finder Finder
Envs []string
}

// ctxKey is used as the key for storing the DAG in the context.
type ctxKey struct{}

// NewContext creates a new context with the DAG and Finder.
func NewContext(ctx context.Context, dag *DAG, finder Finder) context.Context {
return context.WithValue(ctx, ctxKey{}, Context{
DAG: dag,
Finder: finder,
})
return context.WithValue(ctx, ctxKey{}, Context{DAG: dag, Finder: finder, Envs: make([]string, 0)})
}

var (
Expand All @@ -56,3 +58,25 @@ func GetContext(ctx context.Context) (Context, error) {
}
return dagCtx, nil
}

func WithDagContext(ctx context.Context, dagContext Context) context.Context {
return context.WithValue(ctx, ctxKey{}, dagContext)
}

func GenGlobalStepLogEnvKey(stepID int) string {
var (
keyBuilder = strings.Builder{}
)

keyBuilder.WriteString(constants.StepDaguExecutionLogPathKeyPrefix)

keyBuilder.WriteString("_")

keyBuilder.WriteString(strconv.Itoa(stepID))

keyBuilder.WriteString("_")

keyBuilder.WriteString(constants.StepDaguExecutionLogPathKeySuffix)

return keyBuilder.String()
}
9 changes: 7 additions & 2 deletions internal/dag/executor/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ import (
"sync"
"syscall"

"github.com/daguflow/dagu/internal/util"

"github.com/daguflow/dagu/internal/dag"
"github.com/daguflow/dagu/internal/util"
)

type commandExecutor struct {
Expand All @@ -40,9 +39,15 @@ func newCommand(ctx context.Context, step dag.Step) (Executor, error) {
if len(step.Dir) > 0 && !util.FileExists(step.Dir) {
return nil, fmt.Errorf("directory %q does not exist", step.Dir)
}
dagContext, err := dag.GetContext(ctx)
if err != nil {
return nil, err
}

cmd.Dir = step.Dir
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Env = append(cmd.Env, step.Variables...)
cmd.Env = append(cmd.Env, dagContext.Envs...)
step.OutputVariables.Range(func(_, value any) bool {
cmd.Env = append(cmd.Env, value.(string))
return true
Expand Down
22 changes: 20 additions & 2 deletions internal/dag/scheduler/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ import (
"sync"
"time"

"golang.org/x/sys/unix"

"github.com/daguflow/dagu/internal/constants"
"github.com/daguflow/dagu/internal/dag"
"github.com/daguflow/dagu/internal/dag/executor"
"github.com/daguflow/dagu/internal/util"
"golang.org/x/sys/unix"
)

// Node is a node in a DAG. It executes a command.
Expand Down Expand Up @@ -129,6 +131,16 @@ func (n *Node) State() NodeState {

// Execute runs the command synchronously and returns error if any.
func (n *Node) Execute(ctx context.Context) error {
dagContext, err := dag.GetContext(ctx)
if err != nil {
return err
}
// set node special log path environment variable
if err = os.Setenv(dag.GenGlobalStepLogEnvKey(n.id), n.data.State.Log); err != nil {
return err
}
dagContext.Envs = append(dagContext.Envs, fmt.Sprintf("%s=%s", constants.StepDaguExecutionLogPathKeySuffix, n.data.State.Log))
ctx = dag.WithDagContext(ctx, dagContext)
cmd, err := n.setupExec(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -367,7 +379,6 @@ func (n *Node) setupLog() error {
n.logWriter = bufio.NewWriter(n.logFile)
return nil
}

func (n *Node) teardown() error {
if n.done {
return nil
Expand Down Expand Up @@ -433,6 +444,13 @@ func (n *Node) init() {
return
}
n.id = getNextNodeID()

n.data.Step.CmdWithArgs = strings.ReplaceAll(
n.data.Step.CmdWithArgs,
constants.StepDaguExecutionLogPathKeySuffix,
dag.GenGlobalStepLogEnvKey(n.id),
)

if n.data.Step.Variables == nil {
n.data.Step.Variables = []string{}
}
Expand Down
16 changes: 10 additions & 6 deletions internal/dag/scheduler/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@ import (
"github.com/stretchr/testify/require"
)

func nodeTextCtxWithDagContext() context.Context {
return dag.NewContext(context.Background(), nil, nil)
}

func TestExecute(t *testing.T) {
n := &Node{data: NodeData{
Step: dag.Step{
Command: "true",
OutputVariables: &dag.SyncMap{},
}}}
require.NoError(t, n.Execute(context.Background()))
require.NoError(t, n.Execute(nodeTextCtxWithDagContext()))
require.Nil(t, n.data.State.Error)
}

Expand All @@ -45,7 +49,7 @@ func TestError(t *testing.T) {
Command: "false",
OutputVariables: &dag.SyncMap{},
}}}
err := n.Execute(context.Background())
err := n.Execute(nodeTextCtxWithDagContext())
require.True(t, err != nil)
require.Equal(t, n.data.State.Error, err)
}
Expand All @@ -64,7 +68,7 @@ func TestSignal(t *testing.T) {
}()

n.setStatus(NodeStatusRunning)
err := n.Execute(context.Background())
err := n.Execute(nodeTextCtxWithDagContext())

require.Error(t, err)
require.Equal(t, n.State().Status, NodeStatusCancel)
Expand All @@ -85,7 +89,7 @@ func TestSignalSpecified(t *testing.T) {
}()

n.setStatus(NodeStatusRunning)
err := n.Execute(context.Background())
err := n.Execute(nodeTextCtxWithDagContext())

require.Error(t, err)
require.Equal(t, n.State().Status, NodeStatusCancel)
Expand Down Expand Up @@ -346,7 +350,7 @@ func TestRunScript(t *testing.T) {
require.Equal(t, n.data.Step.Script, string(b))

require.NoError(t, err)
err = n.Execute(context.Background())
err = n.Execute(nodeTextCtxWithDagContext())
require.NoError(t, err)
err = n.teardown()
require.NoError(t, err)
Expand Down Expand Up @@ -383,7 +387,7 @@ func runTestNode(t *testing.T, n *Node) {
err := n.setup(os.Getenv("HOME"),
fmt.Sprintf("test-request-id-%d", rand.Int()))
require.NoError(t, err)
err = n.Execute(context.Background())
err = n.Execute(nodeTextCtxWithDagContext())
require.NoError(t, err)
err = n.teardown()
require.NoError(t, err)
Expand Down
Loading
Loading