Skip to content

Commit

Permalink
feat: api methods for service logs
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Sukhin <[email protected]>
  • Loading branch information
vsukhin committed Nov 25, 2024
1 parent 8266e56 commit bd39964
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 28 deletions.
2 changes: 2 additions & 0 deletions internal/app/api/v1/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ func (s *TestkubeAPI) Init(server server.HTTPServer) {
testWorkflowExecutions.Post("/", s.ExecuteTestWorkflowHandler())
testWorkflowExecutions.Get("/:executionID", s.GetTestWorkflowExecutionHandler())
testWorkflowExecutions.Get("/:executionID/notifications", s.StreamTestWorkflowExecutionNotificationsHandler())
testWorkflowExecutions.Get("/:executionID/notifications/:serviceName/:serviceIndex", s.StreamTestWorkflowExecutionServiceNotificationsHandler())
testWorkflowExecutions.Get("/:executionID/notifications/stream", s.StreamTestWorkflowExecutionNotificationsWebSocketHandler())
testWorkflowExecutions.Get("/:executionID/notifications/stream:serviceName/:serviceIndex", s.StreamTestWorkflowExecutionServiceNotificationsWebSocketHandler())
testWorkflowExecutions.Post("/:executionID/abort", s.AbortTestWorkflowExecutionHandler())
testWorkflowExecutions.Post("/:executionID/pause", s.PauseTestWorkflowExecutionHandler())
testWorkflowExecutions.Post("/:executionID/resume", s.ResumeTestWorkflowExecutionHandler())
Expand Down
133 changes: 105 additions & 28 deletions internal/app/api/v1/testworkflowexecutions.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/gofiber/fiber/v2"
"github.com/gofiber/websocket/v2"
"github.com/pkg/errors"
"github.com/valyala/fasthttp"

"github.com/kubeshop/testkube/internal/app/api/apiutils"
"github.com/kubeshop/testkube/internal/common"
Expand All @@ -23,6 +24,41 @@ import (
"github.com/kubeshop/testkube/pkg/testworkflows/executionworker/executionworkertypes"
)

func (s *TestkubeAPI) streamNotifications(ctx *fasthttp.RequestCtx, id string, notifications executionworkertypes.NotificationsWatcher) {
// Initiate processing event stream
ctx.SetContentType("text/event-stream")
ctx.Response.Header.Set("Cache-Control", "no-cache")
ctx.Response.Header.Set("Connection", "keep-alive")
ctx.Response.Header.Set("Transfer-Encoding", "chunked")

// Stream the notifications
ctx.SetBodyStreamWriter(func(w *bufio.Writer) {
err := w.Flush()
if err != nil {
s.Log.Errorw("could not flush stream body", "error", err, "id", id)
}

enc := json.NewEncoder(w)

for n := range notifications.Channel() {
err := enc.Encode(n)
if err != nil {
s.Log.Errorw("could not encode value", "error", err, "id", id)
}

_, err = fmt.Fprintf(w, "\n")
if err != nil {
s.Log.Errorw("could not print new line", "error", err, "id", id)
}

err = w.Flush()
if err != nil {
s.Log.Errorw("could not flush stream body", "error", err, "id", id)
}
}
})
}

func (s *TestkubeAPI) StreamTestWorkflowExecutionNotificationsHandler() fiber.Handler {
return func(c *fiber.Ctx) error {
ctx := c.Context()
Expand All @@ -47,39 +83,40 @@ func (s *TestkubeAPI) StreamTestWorkflowExecutionNotificationsHandler() fiber.Ha
return s.BadRequest(c, errPrefix, "fetching notifications", notifications.Err())
}

// Initiate processing event stream
ctx.SetContentType("text/event-stream")
ctx.Response.Header.Set("Cache-Control", "no-cache")
ctx.Response.Header.Set("Connection", "keep-alive")
ctx.Response.Header.Set("Transfer-Encoding", "chunked")

// Stream the notifications
ctx.SetBodyStreamWriter(func(w *bufio.Writer) {
err := w.Flush()
if err != nil {
s.Log.Errorw("could not flush stream body", "error", err, "id", id)
}

enc := json.NewEncoder(w)
s.streamNotifications(ctx, id, notifications)
return nil
}
}

for n := range notifications.Channel() {
err := enc.Encode(n)
if err != nil {
s.Log.Errorw("could not encode value", "error", err, "id", id)
}
func (s *TestkubeAPI) StreamTestWorkflowExecutionServiceNotificationsHandler() fiber.Handler {
return func(c *fiber.Ctx) error {
ctx := c.Context()
executionID := c.Params("executionID")
serviceName := c.Params("serviceName")
serviceIndex := c.Params("serviceIndex")
errPrefix := fmt.Sprintf("failed to stream test workflow execution service '%s' instance '%s' notifications '%s'",
serviceName, serviceIndex, executionID)

_, err = fmt.Fprintf(w, "\n")
if err != nil {
s.Log.Errorw("could not print new line", "error", err, "id", id)
}
// Fetch execution from database
execution, err := s.TestWorkflowResults.Get(ctx, executionID)
if err != nil {
return s.ClientError(c, errPrefix, err)
}

err = w.Flush()
if err != nil {
s.Log.Errorw("could not flush stream body", "error", err, "id", id)
}
}
// Check for the logs
id := fmt.Sprintf("%s-%s-%s", execution.Id, serviceName, serviceIndex)
notifications := s.ExecutionWorkerClient.Notifications(ctx, id, executionworkertypes.NotificationsOptions{
Hints: executionworkertypes.Hints{
Namespace: execution.Namespace,
ScheduledAt: common.Ptr(execution.ScheduledAt),
Signature: execution.Signature,
},
})
if notifications.Err() != nil {
return s.BadRequest(c, errPrefix, "fetching notifications", notifications.Err())
}

s.streamNotifications(ctx, id, notifications)
return nil
}
}
Expand Down Expand Up @@ -121,6 +158,46 @@ func (s *TestkubeAPI) StreamTestWorkflowExecutionNotificationsWebSocketHandler()
})
}

func (s *TestkubeAPI) StreamTestWorkflowExecutionServiceNotificationsWebSocketHandler() fiber.Handler {
return websocket.New(func(c *websocket.Conn) {
ctx, ctxCancel := context.WithCancel(context.Background())
executionID := c.Params("executionID")
serviceName := c.Params("serviceName")
serviceIndex := c.Params("serviceIndex")

// Stop reading when the WebSocket connection is already closed
originalClose := c.CloseHandler()
c.SetCloseHandler(func(code int, text string) error {
ctxCancel()
return originalClose(code, text)
})
defer c.Conn.Close()

// Fetch execution from database
execution, err := s.TestWorkflowResults.Get(ctx, executionID)
if err != nil {
return
}

// Check for the logs
id := fmt.Sprintf("%s-%s-%s", execution.Id, serviceName, serviceIndex)
notifications := s.ExecutionWorkerClient.Notifications(ctx, id, executionworkertypes.NotificationsOptions{
Hints: executionworkertypes.Hints{
Namespace: execution.Namespace,
Signature: execution.Signature,
ScheduledAt: common.Ptr(execution.ScheduledAt),
},
})
if notifications.Err() != nil {
return
}

for n := range notifications.Channel() {
_ = c.WriteJSON(n)
}
})
}

func (s *TestkubeAPI) ListTestWorkflowExecutionsHandler() fiber.Handler {
return func(c *fiber.Ctx) error {
errPrefix := "failed to list test workflow executions"
Expand Down

0 comments on commit bd39964

Please sign in to comment.