Skip to content

Commit

Permalink
Merge pull request #31 from klothoplatform/orchestration-rework
Browse files Browse the repository at this point in the history
orchestration-rework
  • Loading branch information
DavidSeptimus authored Jun 12, 2024
2 parents 4be1069 + 91523c6 commit 167e9c3
Show file tree
Hide file tree
Showing 31 changed files with 1,359 additions and 1,101 deletions.
4 changes: 2 additions & 2 deletions cmd/k2/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ func planCmd() string {
func irCmd(filePath string) string {
ir, err := model.ReadIRFile(filePath)
if err != nil {
return fmt.Sprintf("Error reading IR file: %s", err)
return fmt.Sprintf("error reading IR file: %s", err)
}

res, err := yaml.Marshal(ir)
if err != nil {
return fmt.Sprintf("Error marshalling IR: %s", err)
return fmt.Sprintf("error marshalling IR: %s", err)
}
return string(res)
}
43 changes: 28 additions & 15 deletions cmd/k2/down.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,22 @@ package main

import (
"fmt"
"log"
"os"
"path/filepath"

"github.com/klothoplatform/klotho/pkg/k2/deployment"
"github.com/klothoplatform/klotho/pkg/k2/model"
"github.com/klothoplatform/klotho/pkg/k2/orchestrator"
"github.com/klothoplatform/klotho/pkg/k2/orchestration"
"github.com/klothoplatform/klotho/pkg/k2/pulumi"
"github.com/spf13/cobra"
"go.uber.org/zap"
"log"
"os"
"path/filepath"
)

var downConfig struct {
inputPath string
outputPath string
project string
app string
env string
}

func newDownCmd() *cobra.Command {
Expand All @@ -34,13 +35,15 @@ func newDownCmd() *cobra.Command {
fmt.Println("couldn't convert to absolute path")
os.Exit(1)
}
downConfig.inputPath = absolutePath
downConfig.project = args[1]
downConfig.app = args[2]
downConfig.env = args[3]

if downConfig.outputPath == "" {
(&downConfig).outputPath = filepath.Join(filepath.Dir(absolutePath), ".k2")
}

downCmd(downConfig.outputPath)
downCmd(downConfig)
},
}
flags := downCommand.Flags()
Expand All @@ -49,30 +52,40 @@ func newDownCmd() *cobra.Command {

}

func downCmd(outputPath string) string {
entries, err := os.ReadDir(outputPath)
func downCmd(args struct {
outputPath string
project string
app string
env string
}) string {

projectPath := filepath.Join(args.outputPath, args.project, args.app, args.env)

entries, err := os.ReadDir(projectPath)
if err != nil {
zap.L().Error("failed to read directory", zap.Error(err))
log.Fatalf("failed to read directory: %v", err)
return "failure"
}

var stackReferences []pulumi.StackReference
for _, entry := range entries {
if entry.IsDir() {
constructPath := filepath.Join(projectPath, entry.Name())

stackReference := pulumi.StackReference{
ConstructURN: &model.URN{},
ConstructURN: model.URN{},
Name: entry.Name(),
IacDirectory: filepath.Join(outputPath, entry.Name()),
IacDirectory: constructPath,
}
stackReferences = append(stackReferences, stackReference)
}
}

var o orchestrator.Orchestrator
var o orchestration.Orchestrator
err = o.RunDownCommand(deployment.DownRequest{StackReferences: stackReferences, DryRun: commonCfg.dryRun})

if err != nil {
log.Fatalf("failed to run down command: %v", err)
zap.L().Error("failed to read directory", zap.Error(err))
return "failure"
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/k2/language_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func startPythonClient() *exec.Cmd {
"pipenv", "run", "python", "python_language_host.py",
)
cmd.Dir = "pkg/k2/language_host/python"
// spawn the python process as a subprocess of the CLI so it is guaranteed to be killed when the CLI exits
// spawn the python process as a subprocess of the CLI, so it is guaranteed to be killed when the CLI exits
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}

zap.S().Debugf("Executing: %s for %v", cmd.Path, cmd.Args)
Expand Down
142 changes: 36 additions & 106 deletions cmd/k2/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,9 @@ import (
"path/filepath"
"time"

"github.com/klothoplatform/klotho/pkg/engine/constraints"
"github.com/klothoplatform/klotho/pkg/k2/constructs"
"github.com/klothoplatform/klotho/pkg/k2/deployment"
pb "github.com/klothoplatform/klotho/pkg/k2/language_host/go"
"github.com/klothoplatform/klotho/pkg/k2/model"
"github.com/klothoplatform/klotho/pkg/k2/orchestrator"
"github.com/klothoplatform/klotho/pkg/k2/pulumi"
"github.com/klothoplatform/klotho/pkg/k2/orchestration"
"github.com/spf13/cobra"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -48,7 +44,7 @@ func newUpCmd() *cobra.Command {
upConfig.outputPath = filepath.Join(filepath.Dir(absolutePath), ".k2")
}

updCmd(upConfig)
fmt.Println(updCmd(upConfig))
},
}
flags := upCommand.Flags()
Expand All @@ -74,7 +70,14 @@ func updCmd(args struct {
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()

defer func(conn *grpc.ClientConn) {
err := conn.Close()
if err != nil {
log.Fatalf("failed to close connection: %v", err)
}
}(conn)

client := pb.NewKlothoServiceClient(conn)

// Wait for the server to be ready
Expand All @@ -94,127 +97,54 @@ func updCmd(args struct {

ir, err := model.ParseIRFile([]byte(res.GetYamlPayload()))
if err != nil {
return fmt.Sprintf("Error reading IR file: %s", err)
return fmt.Sprintf("error reading IR file: %s", err)
}

// Take the IR -- generate and save a state file and stored in the
// output directory, the path should include the environment name and
// the project URN
statefile := filepath.Join(args.outputPath, fmt.Sprintf("%s-%s-state.yaml", ir.ProjectURN, ir.Environment))

appUrn, err := model.ParseURN(ir.AppURN)
if err != nil {
return fmt.Sprintf("error parsing app URN: %s", err)
}

appUrnPath, err := model.UrnPath(*appUrn)
if err != nil {
return fmt.Sprintf("error getting URN path: %s", err)
}
appDir := filepath.Join(args.outputPath, appUrnPath)

// Create the app state directory
if err := os.MkdirAll(appDir, 0755); err != nil {
return fmt.Sprintf("error creating app directory: %s", err)
}

stateFile := filepath.Join(appDir, "state.yaml")

// Create a new state manager
sm := model.NewStateManager(statefile)
sm := model.NewStateManager(stateFile)

// Initialize the state if it doesn't exist
if !sm.CheckStateFileExists() {
sm.InitState(ir)
// Save the state
if err = sm.SaveState(); err != nil {
return fmt.Sprintf("Error saving state: %s", err)
return fmt.Sprintf("error saving state: %s", err)
}
} else {
// Load the state
if err = sm.LoadState(); err != nil {
return fmt.Sprintf("Error loading state: %s", err)
}
}

o := orchestrator.NewOrchestrator(sm, client)

// Apply constraints
for _, c := range ir.Constructs {
var allConstraints constraints.ConstraintList
var id constructs.ConstructId
err = id.FromURN(c.URN)
if err != nil {
return fmt.Sprintf("Error parsing URN: %s", err)
}
constructOutDir := filepath.Join(args.outputPath, id.InstanceId)
inputs := make(map[string]interface{})
for k, v := range c.Inputs {
if v.Status != "" && v.Status != model.Resolved {
zap.S().Warnf("Input %s is not resolved", k)
continue
}

inputs[k] = v.Value
}
ctx := constructs.NewContext(inputs, id)
ci := ctx.EvaluateConstruct()
if ci == nil {
return fmt.Sprintf("Error evaluating construct: %s", err)
}
marshaller := constructs.ConstructMarshaller{Context: ctx, Construct: ci}
cs, err := marshaller.Marshal()
if err != nil {
return fmt.Sprintf("Error marshalling construct: %s", err)
}
allConstraints = append(allConstraints, cs...)

// Marshal constructs to constraints
marshalledConstraints, err := allConstraints.ToConstraints()
if err != nil {
return fmt.Sprintf("Error marshalling constraints: %s", err)
}

// Read existing state
inputGraph, err := orchestrator.ReadInputGraph(constructOutDir)
if err != nil {
return fmt.Sprintf("Error reading input graph: %s", err)
}

// Run the engine
// TODO the engine currently assumes only 1 run globally, so the debug graphs and other files
// will get overwritten with each run. We should fix this.
engineContext, errs := o.RunEngine(orchestrator.EngineRequest{
Provider: "aws",
InputGraph: inputGraph,
Constraints: marshalledConstraints,
OutputDir: constructOutDir,
GlobalTag: "k2",
})
if errs != nil {
zap.S().Errorf("Engine returned with errors: %s", errs)
return fmt.Sprintf("Engine returned with errors: %s", errs)
}

// GenerateIac
err = o.GenerateIac(orchestrator.IacRequest{
PulumiAppName: id.InstanceId,
Context: engineContext,
OutputDir: constructOutDir,
})
if err != nil {
zap.S().Errorf("Error generating IaC: %s", err)
return fmt.Sprintf("Error generating IaC: %s", err)
return fmt.Sprintf("error loading state: %s", err)
}
}

var refs []pulumi.StackReference
for _, c := range ir.Constructs {
var id constructs.ConstructId
err = id.FromURN(c.URN)
if err != nil {
return fmt.Sprintf("Error parsing URN: %s", err)
}
constructOutDir := filepath.Join(args.outputPath, id.InstanceId)
refs = append(refs, pulumi.StackReference{
ConstructURN: c.URN,
Name: id.InstanceId,
IacDirectory: constructOutDir,
AwsRegion: args.region,
})
}

upRequest := deployment.UpRequest{
StackReferences: refs,
DryRun: commonCfg.dryRun,
}
o := orchestration.NewOrchestrator(sm, client, appDir)

err = o.RunUpCommand(upRequest)
err = o.RunUpCommand(ir, commonCfg.dryRun)
if err != nil {
zap.S().Errorf("Error running up command: %s", err)
return fmt.Sprintf("Error running up command: %s", err)
zap.S().Errorf("error running up command: %s", err)
return fmt.Sprintf("error running up command: %s", err)
}

return "success"
Expand Down
31 changes: 1 addition & 30 deletions pkg/k2/constructs/construct.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,19 @@ package constructs

import (
"fmt"
"strings"

"github.com/klothoplatform/klotho/pkg/construct"
"github.com/klothoplatform/klotho/pkg/k2/model"
)

type (
Construct struct {
Id ConstructId `yaml:"id"`
URN *model.URN `yaml:"id"`
Inputs map[string]any `yaml:"inputs"`
Resources map[string]*Resource `yaml:"resources"`
Edges []*Edge `yaml:"edges"`
Outputs map[string]any `yaml:"outputs"`
}

ConstructId struct {
TemplateId ConstructTemplateId `yaml:"template_id"`
InstanceId string `yaml:"instance_id"`
}

Resource struct {
Id construct.ResourceId `yaml:"id"`
Properties map[string]any `yaml:"properties"`
Expand All @@ -41,25 +34,3 @@ func (e *Edge) PrettyPrint() string {
func (e *Edge) String() string {
return e.PrettyPrint() + " :: " + fmt.Sprintf("%v", e.Data)
}

func (c *ConstructId) FromURN(urn *model.URN) error {
if urn.Type != "construct" {
return fmt.Errorf("invalid URN type: %s", urn.Type)
}

parts := strings.Split(urn.Subtype, ".")
packageName := strings.Join(parts[:len(parts)-1], ".")
constructType := parts[len(parts)-1]

if packageName == "" || constructType == "" {
return fmt.Errorf("invalid URN subtype: %s", urn.Subtype)
}

c.TemplateId = ConstructTemplateId{
Package: packageName,
Name: constructType,
}

c.InstanceId = urn.ResourceID
return nil
}
Loading

0 comments on commit 167e9c3

Please sign in to comment.