Skip to content
136 changes: 0 additions & 136 deletions internal/worker/clienterrors/errors.go

This file was deleted.

66 changes: 0 additions & 66 deletions pkg/customizations/remotefile/resolver.go

This file was deleted.

9 changes: 0 additions & 9 deletions pkg/customizations/remotefile/spec.go

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package remotefile

import (
"context"
"fmt"
"io"
"net/http"
Expand All @@ -17,8 +18,8 @@ func NewClient() *Client {
}
}

func (c *Client) makeRequest(u *url.URL) ([]byte, error) {
req, err := http.NewRequest("GET", u.String(), nil)
func (c *Client) makeRequest(ctx context.Context, u *url.URL) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
if err != nil {
return nil, err
}
Expand All @@ -27,8 +28,12 @@ func (c *Client) makeRequest(u *url.URL) ([]byte, error) {
if err != nil {
return nil, err
}

defer resp.Body.Close()

if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("unexpected status %d: %s", resp.StatusCode, u.String())
}

output, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
Expand All @@ -50,11 +55,11 @@ func (c *Client) validateURL(u string) (*url.URL, error) {

// resolve and return the contents of a remote file
// which can be used later, in the pipeline
func (c *Client) Resolve(u string) ([]byte, error) {
func (c *Client) Resolve(ctx context.Context, u string) ([]byte, error) {
parsedURL, err := c.validateURL(u)
if err != nil {
return nil, err
}

return c.makeRequest(parsedURL)
return c.makeRequest(ctx, parsedURL)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package remotefile

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
Expand All @@ -19,6 +20,9 @@ func makeTestServer() *httptest.Server {
if r.URL.Path == "/key2" {
fmt.Fprintln(w, "key2")
}
if r.URL.Path == "/notfound" {
w.WriteHeader(http.StatusNotFound)
}
}))
}

Expand All @@ -29,14 +33,23 @@ func TestClientResolve(t *testing.T) {

client := NewClient()

output, err := client.Resolve(url)
output, err := client.Resolve(context.Background(), url)
assert.NoError(t, err)

expectedOutput := "key1\n"

assert.Equal(t, expectedOutput, string(output))
}

func TestClientResolveNonOKStatus(t *testing.T) {
server := makeTestServer()

client := NewClient()
_, err := client.Resolve(context.Background(), server.URL+"/notfound")
assert.Error(t, err)
assert.Contains(t, err.Error(), "unexpected status 404")
}

func TestInputSpecValidation(t *testing.T) {
server := makeTestServer()

Expand Down
72 changes: 72 additions & 0 deletions pkg/remotefile/resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package remotefile

import (
"context"
"fmt"
"slices"
"sort"
"strings"
"sync"
)

type resolveResult struct {
url string
content []byte
err error
}

// TODO: could make this more generic since this is shared with the container
// resolver
type Resolver struct {
queue chan resolveResult
wg sync.WaitGroup
ctx context.Context
}

func NewResolver(ctx context.Context) *Resolver {
return &Resolver{
queue: make(chan resolveResult),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: why not rename the field to resultsQueue if the name was confusing to you and made the wrong implications at the first sight?

wg: sync.WaitGroup{},
ctx: ctx,
}
}

// Add a URL to the resolver queue. When called after Finish was called,
// it may panic.
func (r *Resolver) Add(url string) {
r.wg.Add(1)
client := NewClient()

go func() {
defer r.wg.Done()

content, err := client.Resolve(r.ctx, url)
r.queue <- resolveResult{url: url, content: content, err: err}
}()
}

// Finish starts collecting of results and returns them. No further calls to Add
// are allowed after this call. It blocks until all results are collected.
func (r *Resolver) Finish() ([]Spec, error) {
go func() {
r.wg.Wait()
close(r.queue)
}()

var resultItems []Spec
var errs []string
for result := range r.queue {
if result.err == nil {
resultItems = append(resultItems, Spec{URL: result.url, Content: result.content})
} else {
errs = append(errs, result.err.Error())
}
}

if len(errs) > 0 {
sort.Strings(errs)
return resultItems, fmt.Errorf("failed to resolve remote files: %s", strings.Join(slices.Compact(errs), "; "))
}

return resultItems, nil
}
Loading
Loading