diff --git a/cmd/relui/main.go b/cmd/relui/main.go index 4ba50cbfd0..420b9f38f6 100644 --- a/cmd/relui/main.go +++ b/cmd/relui/main.go @@ -6,6 +6,7 @@ package main import ( + "context" "flag" "io/ioutil" "log" @@ -13,12 +14,17 @@ import ( "os" "path/filepath" + "cloud.google.com/go/pubsub" "github.com/golang/protobuf/proto" reluipb "golang.org/x/build/cmd/relui/protos" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) var ( devDataDir = flag.String("dev-data-directory", defaultDevDataDir(), "Development-only directory to use for storage of application state.") + projectID = flag.String("project-id", os.Getenv("PUBSUB_PROJECT_ID"), "Pubsub project ID for communicating with workers. Uses PUBSUB_PROJECT_ID if unset.") + topicID = flag.String("topic-id", "relui-development", "Pubsub topic ID for communicating with workers.") ) func main() { @@ -27,7 +33,12 @@ func main() { if err := fs.load(); err != nil { log.Fatalf("Error loading state from %q: %v", *devDataDir, err) } - s := &server{store: fs, configs: loadWorkflowConfig("./workflows")} + ctx := context.Background() + s := &server{ + configs: loadWorkflowConfig("./workflows"), + store: fs, + topic: getTopic(ctx), + } http.Handle("/workflows/create", http.HandlerFunc(s.createWorkflowHandler)) http.Handle("/workflows/new", http.HandlerFunc(s.newWorkflowHandler)) http.Handle("/", fileServerHandler(relativeFile("./static"), http.HandlerFunc(s.homeHandler))) @@ -39,6 +50,22 @@ func main() { log.Fatal(http.ListenAndServe(":"+port, http.DefaultServeMux)) } +// getTopic creates and returns a pubsub topic from the project specified in projectId, which is to be used for +// communicating with relui workers. +// +// It is safe to call if a topic already exists. A reference to the topic will be returned. +func getTopic(ctx context.Context) *pubsub.Topic { + client, err := pubsub.NewClient(ctx, *projectID) + if err != nil { + log.Fatalf("pubsub.NewClient(_, %q) = %v, wanted no error", *projectID, err) + } + _, err = client.CreateTopic(ctx, *topicID) + if err != nil && status.Code(err) != codes.AlreadyExists { + log.Fatalf("client.CreateTopic(_, %q) = %v, wanted no error", *topicID, err) + } + return client.Topic(*topicID) +} + // loadWorkflowConfig loads Workflow configuration files from dir. It expects all files to be in textproto format. func loadWorkflowConfig(dir string) []*reluipb.Workflow { fs, err := filepath.Glob(filepath.Join(relativeFile(dir), "*.textpb")) diff --git a/cmd/relui/web.go b/cmd/relui/web.go index a20d2fdefa..31cba38eb5 100644 --- a/cmd/relui/web.go +++ b/cmd/relui/web.go @@ -15,6 +15,7 @@ import ( "path" "path/filepath" + "cloud.google.com/go/pubsub" "github.com/golang/protobuf/proto" reluipb "golang.org/x/build/cmd/relui/protos" ) @@ -55,6 +56,9 @@ type server struct { // store is for persisting application state. store store + + // topic is for communicating with relui workers. + topic *pubsub.Topic } type homeResponse struct { diff --git a/go.mod b/go.mod index 22f943e274..8674d2c8ac 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( cloud.google.com/go v0.54.0 cloud.google.com/go/bigquery v1.4.0 cloud.google.com/go/datastore v1.1.0 + cloud.google.com/go/pubsub v1.2.0 cloud.google.com/go/storage v1.6.0 github.com/NYTimes/gziphandler v1.1.1 github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 // indirect