Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions agent/rpc/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,6 @@ func NewGrpcClient(ctx context.Context, conn *grpc.ClientConn) rpc.Peer {
return client
}

func (c *client) Close() error {
close(c.logs)
return c.conn.Close()
}

func (c *client) IsConnected() bool {
state := c.conn.GetState()
connected := state == connectivity.Ready || state == connectivity.Idle
Expand Down Expand Up @@ -506,9 +501,10 @@ func (c *client) processLogs(ctx context.Context) {
bytes = 0
}

// ctx.Done() is covered by the log channel being closed
for {
select {
case <-ctx.Done():
Comment thread
6543 marked this conversation as resolved.
return
case entry, ok := <-c.logs:
if !ok {
log.Info().Msg("log drain: channel closed")
Expand Down
8 changes: 4 additions & 4 deletions e2e/scenarios/agent_routing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ func TestAgentLabelRouting(t *testing.T) {
})

// Plain agent: wildcard repo label only — cannot satisfy gpu=true.
plainAgent := setup.StartAgent(t.Context(), t, env.GRPCAddr,
plainAgent := setup.StartAgent(t, env.GRPCAddr,
setup.WithHostname("plain-agent"),
)

// GPU agent: carries gpu=true — the only agent that can accept the task.
gpuAgent := setup.StartAgent(t.Context(), t, env.GRPCAddr,
gpuAgent := setup.StartAgent(t, env.GRPCAddr,
setup.WithHostname("gpu-agent"),
setup.WithCustomLabels(map[string]string{"gpu": "true"}),
)
Expand Down Expand Up @@ -108,12 +108,12 @@ Func TestOrgAgentPreferredOverGlobal(t *testing.T) {
})

// Global agent: matches org-id=* (score 1).
globalAgent := setup.StartAgent(t.Context(), t, env.GRPCAddr,
globalAgent := setup.StartAgent(t, env.GRPCAddr,
setup.WithHostname("global-agent"),
)

// Org agent: will be patched with the repo's OrgID (score 10).
orgAgent := setup.StartAgent(t.Context(), t, env.GRPCAddr,
orgAgent := setup.StartAgent(t, env.GRPCAddr,
setup.WithHostname("org-agent"),
setup.WithOrgID(env.Fixtures.Repo.OrgID),
)
Expand Down
2 changes: 1 addition & 1 deletion e2e/scenarios/cancel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestCancelRunningPipeline(t *testing.T) {
env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{
{Name: ".woodpecker.yaml", Data: cancelPipelineYAML},
})
agent := setup.StartAgent(t.Context(), t, env.GRPCAddr)
agent := setup.StartAgent(t, env.GRPCAddr)
setup.WaitForAgentRegistered(t, env.Store, agent)

created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{
Expand Down
2 changes: 1 addition & 1 deletion e2e/scenarios/infra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestInfraSmoke(t *testing.T) {
env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{
{Name: ".woodpecker.yaml", Data: simpleSuccessYAML},
})
agent := setup.StartAgent(t.Context(), t, env.GRPCAddr)
agent := setup.StartAgent(t, env.GRPCAddr)
setup.WaitForAgentRegistered(t, env.Store, agent)

draftPipeline := &model.Pipeline{
Expand Down
8 changes: 4 additions & 4 deletions e2e/scenarios/matrix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestMatrixPipeline(t *testing.T) {
env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{
{Name: ".woodpecker.yaml", Data: matrixPipelineYAML},
})
agent := setup.StartAgent(t.Context(), t, env.GRPCAddr)
agent := setup.StartAgent(t, env.GRPCAddr)
setup.WaitForAgentRegistered(t, env.Store, agent)

created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestMatrixIncludePipeline(t *testing.T) {
env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{
{Name: ".woodpecker.yaml", Data: matrixIncludePipelineYAML},
})
agent := setup.StartAgent(t.Context(), t, env.GRPCAddr)
agent := setup.StartAgent(t, env.GRPCAddr)
setup.WaitForAgentRegistered(t, env.Store, agent)

created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{
Expand Down Expand Up @@ -211,7 +211,7 @@ steps:
env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{
{Name: ".woodpecker.yaml", Data: yaml},
})
agent := setup.StartAgent(t.Context(), t, env.GRPCAddr)
agent := setup.StartAgent(t, env.GRPCAddr)
setup.WaitForAgentRegistered(t, env.Store, agent)

created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{
Expand Down Expand Up @@ -259,7 +259,7 @@ steps:
env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{
{Name: ".woodpecker.yaml", Data: yaml},
})
agent := setup.StartAgent(t.Context(), t, env.GRPCAddr)
agent := setup.StartAgent(t, env.GRPCAddr)
setup.WaitForAgentRegistered(t, env.Store, agent)

created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{
Expand Down
2 changes: 1 addition & 1 deletion e2e/scenarios/restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestRestartPipeline(t *testing.T) {
env := setup.StartServer(t.Context(), t, []*forge_types.FileMeta{
{Name: ".woodpecker.yaml", Data: simpleSuccessYAML},
})
agent := setup.StartAgent(t.Context(), t, env.GRPCAddr)
agent := setup.StartAgent(t, env.GRPCAddr)
setup.WaitForAgentRegistered(t, env.Store, agent)

// First run.
Expand Down
2 changes: 1 addition & 1 deletion e2e/scenarios/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func runScenario(t *testing.T, sc Scenario) {
t.Helper()

env := setup.StartServer(t.Context(), t, sc.Files)
agent := setup.StartAgent(t.Context(), t, env.GRPCAddr)
agent := setup.StartAgent(t, env.GRPCAddr)
setup.WaitForAgentRegistered(t, env.Store, agent)

created, err := pipeline.Create(t.Context(), env.Store, env.Fixtures.Repo, &model.Pipeline{
Expand Down
26 changes: 13 additions & 13 deletions e2e/setup/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func WithOrgID(id int64) AgentOption {
// server at grpcAddr and returns an *AgentEnv whose AgentID is populated once
// the agent has registered. Pass AgentOption values to configure labels, hostname,
// or org-scoping; multiple agents can be started in the same test.
func StartAgent(ctx context.Context, t *testing.T, grpcAddr string, opts ...AgentOption) *AgentEnv { //nolint:contextcheck
func StartAgent(t *testing.T, grpcAddr string, opts ...AgentOption) *AgentEnv {
t.Helper()

cfg := &agentConfig{
Expand All @@ -128,8 +128,8 @@ func StartAgent(ctx context.Context, t *testing.T, grpcAddr string, opts ...Agen
Timeout: shortTimeout,
})

authCtx, authCancel := context.WithCancelCause(context.Background())
t.Cleanup(func() { authCancel(nil) })
agentCtx, agentCancel := context.WithCancelCause(t.Context())
t.Cleanup(func() { agentCancel(nil) })

authConn, err := grpc.NewClient(grpcAddr, transport, keepaliveOpts)
if err != nil {
Expand All @@ -138,7 +138,7 @@ func StartAgent(ctx context.Context, t *testing.T, grpcAddr string, opts ...Agen
t.Cleanup(func() { authConn.Close() })

authClient := agent_rpc.NewAuthGrpcClient(authConn, TestAgentToken, -1)
authInterceptor, err := agent_rpc.NewAuthInterceptor(authCtx, authClient, agentAuthRefreshEvery) //nolint:contextcheck
authInterceptor, err := agent_rpc.NewAuthInterceptor(agentCtx, authClient, agentAuthRefreshEvery)
if err != nil {
t.Fatalf("StartAgent(%s): authenticate with server: %v", cfg.hostname, err)
}
Expand All @@ -155,20 +155,20 @@ func StartAgent(ctx context.Context, t *testing.T, grpcAddr string, opts ...Agen
}
t.Cleanup(func() { conn.Close() })

client := agent_rpc.NewGrpcClient(ctx, conn)
client := agent_rpc.NewGrpcClient(agentCtx, conn)

grpcCtx := metadata.NewOutgoingContext(authCtx, metadata.Pairs("hostname", cfg.hostname))
grpcCtx := metadata.NewOutgoingContext(agentCtx, metadata.Pairs("hostname", cfg.hostname))

backend := dummy.New()
if !backend.IsAvailable(ctx) {
if !backend.IsAvailable(agentCtx) {
t.Fatalf("StartAgent(%s): dummy backend is not available", cfg.hostname)
}
engInfo, err := backend.Load(ctx)
engInfo, err := backend.Load(agentCtx)
if err != nil {
t.Fatalf("StartAgent(%s): load dummy backend: %v", cfg.hostname, err)
}

env.AgentID, err = client.RegisterAgent(grpcCtx, rpc.AgentInfo{ //nolint:contextcheck
env.AgentID, err = client.RegisterAgent(grpcCtx, rpc.AgentInfo{
Version: version.String(),
Backend: backend.Name(),
Platform: engInfo.Platform,
Expand Down Expand Up @@ -218,16 +218,16 @@ func StartAgent(ctx context.Context, t *testing.T, grpcAddr string, opts ...Agen
runner := agent.NewRunner(client, filter, cfg.hostname, counter, backend)
log.Debug().Int("slot", slot).Str("hostname", cfg.hostname).Msg("test agent: runner started")
for {
if ctx.Err() != nil {
if agentCtx.Err() != nil {
return
}
if err := runner.Run(ctx); err != nil {
if ctx.Err() != nil {
if err := runner.Run(agentCtx); err != nil {
if agentCtx.Err() != nil {
return
}
log.Error().Err(err).Int("slot", slot).Str("hostname", cfg.hostname).Msg("test agent: runner error, retrying")
select {
case <-ctx.Done():
case <-agentCtx.Done():
return
case <-time.After(500 * time.Millisecond):
}
Expand Down
7 changes: 6 additions & 1 deletion e2e/setup/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,17 +193,22 @@ func startGRPCServer(ctx context.Context, t *testing.T, s store.Store) string {
s,
))

stopped := make(chan struct{})
grpcCtx, grpcCancel := context.WithCancelCause(ctx)
go func() {
<-grpcCtx.Done()
grpcServer.GracefulStop()
close(stopped)
}()
go func() {
if err := grpcServer.Serve(lis); err != nil {
grpcCancel(err)
}
}()

t.Cleanup(func() { grpcCancel(nil) })
t.Cleanup(func() {
grpcCancel(nil)
<-stopped
})
return addr
}