Skip to content

Commit a60c36a

Browse files
authored
Basic conduit init and conduit pipelines init commands (#1927)
1 parent 7028b67 commit a60c36a

File tree

12 files changed

+780
-23
lines changed

12 files changed

+780
-23
lines changed

Diff for: .gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -97,3 +97,5 @@ pkg/plugin/processor/standalone/test/wasm_processors/*/processor.wasm
9797

9898
# this one is needed for integration tests
9999
!pkg/provisioning/test/source-file.txt
100+
101+
golangci-report.xml

Diff for: cmd/cli/cli.go

+147
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
// Copyright © 2024 Meroxa, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package cli
16+
17+
import (
18+
"fmt"
19+
"os"
20+
21+
"github.com/conduitio/conduit/pkg/conduit"
22+
"github.com/spf13/cobra"
23+
)
24+
25+
var (
26+
initArgs InitArgs
27+
pipelinesInitArgs PipelinesInitArgs
28+
)
29+
30+
type Instance struct {
31+
rootCmd *cobra.Command
32+
}
33+
34+
// New creates a new CLI Instance.
35+
func New() *Instance {
36+
return &Instance{
37+
rootCmd: buildRootCmd(),
38+
}
39+
}
40+
41+
func (i *Instance) Run() {
42+
if err := i.rootCmd.Execute(); err != nil {
43+
_, _ = fmt.Fprintf(os.Stderr, "%v\n", err)
44+
os.Exit(1)
45+
}
46+
}
47+
48+
func buildRootCmd() *cobra.Command {
49+
cfg := conduit.DefaultConfig()
50+
51+
cmd := &cobra.Command{
52+
Use: "conduit",
53+
Short: "Conduit CLI",
54+
Long: "Conduit CLI is a command-line that helps you interact with and manage Conduit.",
55+
Version: conduit.Version(true),
56+
Run: func(cmd *cobra.Command, args []string) {
57+
e := &conduit.Entrypoint{}
58+
e.Serve(cfg)
59+
},
60+
}
61+
cmd.CompletionOptions.DisableDefaultCmd = true
62+
conduit.Flags(&cfg).VisitAll(cmd.Flags().AddGoFlag)
63+
64+
// init
65+
cmd.AddCommand(buildInitCmd())
66+
67+
// pipelines
68+
cmd.AddGroup(&cobra.Group{
69+
ID: "pipelines",
70+
Title: "Pipelines",
71+
})
72+
cmd.AddCommand(buildPipelinesCmd())
73+
74+
return cmd
75+
}
76+
77+
func buildInitCmd() *cobra.Command {
78+
initCmd := &cobra.Command{
79+
Use: "init",
80+
Short: "Initialize Conduit with a configuration file and directories.",
81+
Args: cobra.NoArgs,
82+
RunE: func(cmd *cobra.Command, args []string) error {
83+
return NewConduitInit(initArgs).Run()
84+
},
85+
}
86+
initCmd.Flags().StringVar(
87+
&initArgs.Path,
88+
"config.path",
89+
"",
90+
"path where Conduit will be initialized",
91+
)
92+
93+
return initCmd
94+
}
95+
96+
func buildPipelinesCmd() *cobra.Command {
97+
pipelinesCmd := &cobra.Command{
98+
Use: "pipelines",
99+
Short: "Initialize and manage pipelines",
100+
Args: cobra.NoArgs,
101+
GroupID: "pipelines",
102+
}
103+
104+
pipelinesCmd.AddCommand(buildPipelinesInitCmd())
105+
106+
return pipelinesCmd
107+
}
108+
109+
func buildPipelinesInitCmd() *cobra.Command {
110+
pipelinesInitCmd := &cobra.Command{
111+
Use: "init [pipeline-name]",
112+
Short: "Initialize an example pipeline.",
113+
Long: `Initialize a pipeline configuration file, with all of parameters for source and destination connectors
114+
initialized and described. The source and destination connector can be chosen via flags. If no connectors are chosen, then
115+
a simple and runnable generator-to-log pipeline is configured.`,
116+
Args: cobra.MaximumNArgs(1),
117+
Example: " conduit pipelines init awesome-pipeline-name --source postgres --destination kafka --path pipelines/pg-to-kafka.yaml",
118+
RunE: func(cmd *cobra.Command, args []string) error {
119+
if len(args) > 0 {
120+
pipelinesInitArgs.Name = args[0]
121+
}
122+
return NewPipelinesInit(pipelinesInitArgs).Run()
123+
},
124+
}
125+
126+
// Add flags to pipelines init command
127+
pipelinesInitCmd.Flags().StringVar(
128+
&pipelinesInitArgs.Source,
129+
"source",
130+
"",
131+
"Source connector (any of the built-in connectors).",
132+
)
133+
pipelinesInitCmd.Flags().StringVar(
134+
&pipelinesInitArgs.Destination,
135+
"destination",
136+
"",
137+
"Destination connector (any of the built-in connectors).",
138+
)
139+
pipelinesInitCmd.Flags().StringVar(
140+
&pipelinesInitArgs.Path,
141+
"pipelines.path",
142+
"./pipelines",
143+
"Path where the pipeline will be saved.",
144+
)
145+
146+
return pipelinesInitCmd
147+
}

Diff for: cmd/cli/conduit_init.go

+129
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
// Copyright © 2024 Meroxa, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package cli
16+
17+
import (
18+
"flag"
19+
"fmt"
20+
"os"
21+
"path/filepath"
22+
"strings"
23+
24+
"github.com/conduitio/conduit/cmd/cli/internal"
25+
"github.com/conduitio/conduit/pkg/conduit"
26+
"github.com/conduitio/conduit/pkg/foundation/cerrors"
27+
"github.com/conduitio/yaml/v3"
28+
)
29+
30+
type InitArgs struct {
31+
Path string
32+
}
33+
34+
type ConduitInit struct {
35+
args InitArgs
36+
}
37+
38+
func NewConduitInit(args InitArgs) *ConduitInit {
39+
return &ConduitInit{args: args}
40+
}
41+
42+
func (i *ConduitInit) Run() error {
43+
err := i.createDirs()
44+
if err != nil {
45+
return err
46+
}
47+
48+
err = i.createConfigYAML()
49+
if err != nil {
50+
return fmt.Errorf("failed to create config YAML: %w", err)
51+
}
52+
53+
fmt.Println(`
54+
Conduit has been initialized!
55+
56+
To quickly create an example pipeline, run 'conduit pipelines init'.
57+
To see how you can customize your first pipeline, run 'conduit pipelines init --help'.`)
58+
59+
return nil
60+
}
61+
62+
func (i *ConduitInit) createConfigYAML() error {
63+
cfgYAML := internal.NewYAMLTree()
64+
i.conduitCfgFlags().VisitAll(func(f *flag.Flag) {
65+
if i.isHiddenFlag(f.Name) {
66+
return // hide flag from output
67+
}
68+
cfgYAML.Insert(f.Name, f.DefValue, f.Usage)
69+
})
70+
71+
yamlData, err := yaml.Marshal(cfgYAML.Root)
72+
if err != nil {
73+
return cerrors.Errorf("error marshaling YAML: %w\n", err)
74+
}
75+
76+
path := filepath.Join(i.path(), "conduit.yaml")
77+
err = os.WriteFile(path, yamlData, 0o600)
78+
if err != nil {
79+
return cerrors.Errorf("error writing conduit.yaml: %w", err)
80+
}
81+
fmt.Printf("Configuration file written to %v\n", path)
82+
83+
return nil
84+
}
85+
86+
func (i *ConduitInit) createDirs() error {
87+
dirs := []string{"processors", "connectors", "pipelines"}
88+
89+
for _, dir := range dirs {
90+
path := filepath.Join(i.path(), dir)
91+
92+
// Attempt to create the directory, skipping if it already exists
93+
if err := os.Mkdir(path, os.ModePerm); err != nil {
94+
if os.IsExist(err) {
95+
fmt.Printf("Directory '%s' already exists, skipping...\n", path)
96+
continue
97+
}
98+
return fmt.Errorf("failed to create directory '%s': %w", path, err)
99+
}
100+
101+
fmt.Printf("Created directory: %s\n", path)
102+
}
103+
104+
return nil
105+
}
106+
107+
func (i *ConduitInit) isHiddenFlag(name string) bool {
108+
return name == "dev" ||
109+
strings.HasPrefix(name, "dev.") ||
110+
conduit.DeprecatedFlags[name]
111+
}
112+
113+
func (i *ConduitInit) conduitCfgFlags() *flag.FlagSet {
114+
cfg := conduit.DefaultConfigWithBasePath(i.path())
115+
return conduit.Flags(&cfg)
116+
}
117+
118+
func (i *ConduitInit) path() string {
119+
if i.args.Path != "" {
120+
return i.args.Path
121+
}
122+
123+
path, err := os.Getwd()
124+
if err != nil {
125+
panic(cerrors.Errorf("failed to get current working directory: %w", err))
126+
}
127+
128+
return path
129+
}

Diff for: cmd/cli/internal/yaml.go

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
// Copyright © 2024 Meroxa, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package internal
16+
17+
import (
18+
"strings"
19+
20+
"github.com/conduitio/yaml/v3"
21+
)
22+
23+
// YAMLTree represents a YAML document.
24+
// It makes it possible to insert value nodes with comments.
25+
type YAMLTree struct {
26+
Root *yaml.Node
27+
}
28+
29+
func NewYAMLTree() *YAMLTree {
30+
return &YAMLTree{
31+
Root: &yaml.Node{
32+
Kind: yaml.MappingNode,
33+
},
34+
}
35+
}
36+
37+
// Insert adds a path with a value to the tree
38+
func (t *YAMLTree) Insert(path, value, comment string) {
39+
parts := strings.Split(path, ".")
40+
current := t.Root
41+
42+
// For each part of the path
43+
for i, part := range parts {
44+
// Create key node
45+
keyNode := &yaml.Node{
46+
Kind: yaml.ScalarNode,
47+
Value: part,
48+
}
49+
50+
// Find or create value node
51+
var valueNode *yaml.Node
52+
found := false
53+
54+
// Look for existing key in current mapping
55+
for i := 0; i < len(current.Content); i += 2 {
56+
if current.Content[i].Value == part {
57+
valueNode = current.Content[i+1]
58+
found = true
59+
break
60+
}
61+
}
62+
63+
// If not found, create new node
64+
if !found {
65+
// If this is the last part, create scalar value node
66+
if i == len(parts)-1 {
67+
valueNode = &yaml.Node{
68+
Kind: yaml.ScalarNode,
69+
Value: value,
70+
}
71+
keyNode.HeadComment = comment
72+
} else {
73+
// Otherwise create mapping node for nesting
74+
valueNode = &yaml.Node{
75+
Kind: yaml.MappingNode,
76+
}
77+
}
78+
// Add key-value pair to current node's content
79+
current.Content = append(current.Content, keyNode, valueNode)
80+
}
81+
82+
// Move to next level
83+
current = valueNode
84+
}
85+
}

0 commit comments

Comments
 (0)