Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async nginx config reload #70

Merged
merged 20 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 87 additions & 63 deletions src/plugins/nginx.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ var (

// Nginx is the metadata of our nginx binary
type Nginx struct {
messagePipeline core.MessagePipeInterface
nginxBinary core.NginxBinary
processes []core.Process
env core.Environment
cmdr client.Commander
config *config.Config
isNAPEnabled bool
isConfUploadEnabled bool
messagePipeline core.MessagePipeInterface
nginxBinary core.NginxBinary
processes []core.Process
env core.Environment
cmdr client.Commander
config *config.Config
isNAPEnabled bool
isConfUploadEnabled bool
configApplyStatusChannel chan *proto.Command_NginxConfigResponse
}

type ConfigRollbackResponse struct {
Expand All @@ -59,6 +60,7 @@ type NginxConfigValidationResponse struct {
nginxDetails *proto.NginxDetails
config *proto.NginxConfig
configApply *sdk.ConfigApply
elapsedTime time.Duration
}

func NewNginx(cmdr client.Commander, nginxBinary core.NginxBinary, env core.Environment, loadedConfig *config.Config) *Nginx {
Expand All @@ -69,7 +71,16 @@ func NewNginx(cmdr client.Commander, nginxBinary core.NginxBinary, env core.Envi

isConfUploadEnabled := isConfUploadEnabled(loadedConfig)

return &Nginx{nginxBinary: nginxBinary, processes: env.Processes(), env: env, cmdr: cmdr, config: loadedConfig, isNAPEnabled: isNAPEnabled, isConfUploadEnabled: isConfUploadEnabled}
return &Nginx{
nginxBinary: nginxBinary,
processes: env.Processes(),
env: env,
cmdr: cmdr,
config: loadedConfig,
isNAPEnabled: isNAPEnabled,
isConfUploadEnabled: isConfUploadEnabled,
configApplyStatusChannel: make(chan *proto.Command_NginxConfigResponse, 1),
}
}

// Init initializes the plugin
Expand Down Expand Up @@ -112,12 +123,25 @@ func (n *Nginx) Process(message *core.Message) {
case core.NginxConfigValidationSucceeded:
switch response := message.Data().(type) {
case *NginxConfigValidationResponse:
n.completeConfigApply(response)
status := n.completeConfigApply(response)
if response.elapsedTime < validationTimeout {
n.configApplyStatusChannel <- status
}
}
case core.NginxConfigValidationFailed:
switch response := message.Data().(type) {
case *NginxConfigValidationResponse:
n.rollbackConfigApply(response)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same potential race between rollback message handling, and inline.

status := &proto.Command_NginxConfigResponse{
NginxConfigResponse: &proto.NginxConfigResponse{
Status: newErrStatus(fmt.Sprintf("Config apply failed (write): " + response.err.Error())).CmdStatus,
Action: proto.NginxConfigAction_APPLY,
ConfigData: response.config.ConfigData,
},
}
if response.elapsedTime < validationTimeout {
n.configApplyStatusChannel <- status
}
}
case core.EnableExtension:
switch data := message.Data().(type) {
Expand Down Expand Up @@ -293,7 +317,7 @@ func (n *Nginx) applyConfig(cmd *proto.Command, cfg *proto.Command_NginxConfig)

message := fmt.Sprintf("Config apply failed (write): " + err.Error())

n.messagePipeline.Process(core.NewMessage(core.NginxConfigValidationPending, &proto.AgentActivityStatus{
n.messagePipeline.Process(core.NewMessage(core.NginxConfigApplyFailed, &proto.AgentActivityStatus{
Status: &proto.AgentActivityStatus_NginxConfigStatus{
NginxConfigStatus: &proto.NginxConfigStatus{
CorrelationId: cmd.Meta.MessageId,
Expand All @@ -306,70 +330,55 @@ func (n *Nginx) applyConfig(cmd *proto.Command, cfg *proto.Command_NginxConfig)
return n.handleErrorStatus(status, message)
}

validationResponse := n.validateConfig(nginx, cmd.Meta.MessageId, config, configApply)
if validationResponse != nil {
if validationResponse.err == nil {
agentActivityStatus := n.completeConfigApply(validationResponse)
if agentActivityStatus.GetNginxConfigStatus().GetStatus() == proto.NginxConfigStatus_ERROR {
status.NginxConfigResponse.Status = newErrStatus(agentActivityStatus.GetNginxConfigStatus().GetMessage()).CmdStatus
} else {
status.NginxConfigResponse.Status = newOKStatus(agentActivityStatus.GetNginxConfigStatus().GetMessage()).CmdStatus
}
go n.validateConfig(nginx, cmd.Meta.MessageId, config, configApply)

select {
case result := <-n.configApplyStatusChannel:
return result
case <-time.After(validationTimeout):
log.Debug("Validation of nginx config in progress")
return status
} else {
n.rollbackConfigApply(validationResponse)
message := fmt.Sprintf("Config apply failed (write): " + validationResponse.err.Error())
return n.handleErrorStatus(status, message)
}
} else {
log.Debug("Validation of nginx config in progress")
return status
}
}

// This function will run a nginx config validation in a separate go routine. If the validation takes less than 15 seconds then the result is returned straight away,
// otherwise nil is returned and the validation continues on in the background until it is complete. The result is always added to the message pipeline for other plugins
// to use.
func (n *Nginx) validateConfig(nginx *proto.NginxDetails, correlationId string, config *proto.NginxConfig, configApply *sdk.ConfigApply) *NginxConfigValidationResponse {
validationChannel := make(chan *NginxConfigValidationResponse, 1)
func (n *Nginx) validateConfig(nginx *proto.NginxDetails, correlationId string, config *proto.NginxConfig, configApply *sdk.ConfigApply) {
start := time.Now()

err := n.nginxBinary.ValidateConfig(nginx.NginxId, nginx.ProcessPath, nginx.ConfPath, config, configApply)
if err == nil {
_, err = n.nginxBinary.ReadConfig(nginx.GetConfPath(), config.GetConfigData().GetNginxId(), n.env.GetSystemUUID())
}

go func() {
err := n.nginxBinary.ValidateConfig(nginx.NginxId, nginx.ProcessPath, nginx.ConfPath, config, configApply)
if err == nil {
_, err = n.nginxBinary.ReadConfig(nginx.GetConfPath(), config.GetConfigData().GetNginxId(), n.env.GetSystemUUID())
elapsedTime := time.Since(start)
log.Tracef("nginx config validation took %s to complete", elapsedTime)

if err != nil {
response := &NginxConfigValidationResponse{
err: fmt.Errorf("error running nginx -t -c %s:\n %v", nginx.ConfPath, err),
correlationId: correlationId,
nginxDetails: nginx,
config: config,
configApply: configApply,
elapsedTime: elapsedTime,
}
if err != nil {
response := &NginxConfigValidationResponse{
err: fmt.Errorf("error running nginx -t -c %s:\n %v", nginx.ConfPath, err),
correlationId: correlationId,
nginxDetails: nginx,
config: config,
configApply: configApply,
}
n.messagePipeline.Process(core.NewMessage(core.NginxConfigValidationFailed, response))
validationChannel <- response
} else {
response := &NginxConfigValidationResponse{
err: nil,
correlationId: correlationId,
nginxDetails: nginx,
config: config,
configApply: configApply,
}
n.messagePipeline.Process(core.NewMessage(core.NginxConfigValidationSucceeded, response))
validationChannel <- response
n.messagePipeline.Process(core.NewMessage(core.NginxConfigValidationFailed, response))
} else {
response := &NginxConfigValidationResponse{
err: nil,
correlationId: correlationId,
nginxDetails: nginx,
config: config,
configApply: configApply,
elapsedTime: elapsedTime,
}
}()

select {
case result := <-validationChannel:
return result
case <-time.After(validationTimeout):
return nil
n.messagePipeline.Process(core.NewMessage(core.NginxConfigValidationSucceeded, response))
}
}

func (n *Nginx) completeConfigApply(response *NginxConfigValidationResponse) *proto.AgentActivityStatus {
func (n *Nginx) completeConfigApply(response *NginxConfigValidationResponse) *proto.Command_NginxConfigResponse {
nginxConfigStatusMessage := "Config applied successfully"
if response.configApply != nil {
if err := response.configApply.Complete(); err != nil {
Expand Down Expand Up @@ -433,8 +442,23 @@ func (n *Nginx) completeConfigApply(response *NginxConfigValidationResponse) *pr

n.messagePipeline.Process(core.NewMessage(core.NginxConfigApplySucceeded, agentActivityStatus))

status := &proto.Command_NginxConfigResponse{
NginxConfigResponse: &proto.NginxConfigResponse{
Status: newOKStatus("config apply request successfully processed").CmdStatus,
Action: proto.NginxConfigAction_APPLY,
ConfigData: response.config.ConfigData,
},
}

if agentActivityStatus.GetNginxConfigStatus().GetStatus() == proto.NginxConfigStatus_ERROR {
status.NginxConfigResponse.Status = newErrStatus(agentActivityStatus.GetNginxConfigStatus().GetMessage()).CmdStatus
} else {
status.NginxConfigResponse.Status = newOKStatus(agentActivityStatus.GetNginxConfigStatus().GetMessage()).CmdStatus
}

log.Debug("Config Apply Complete")
return agentActivityStatus

return status
}

func (n *Nginx) rollbackConfigApply(response *NginxConfigValidationResponse) {
Expand Down
59 changes: 57 additions & 2 deletions src/plugins/nginx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,13 @@ func TestNginxConfigApply(t *testing.T) {
assert.Eventually(
tt,
func() bool { return len(messagePipe.GetProcessedMessages()) != len(test.msgTopics) },
time.Duration(2*time.Second),
time.Duration(5*time.Second),
3*time.Millisecond,
)
assert.Eventually(
tt,
func() bool { return binary.AssertExpectations(tt) },
time.Duration(2*time.Second),
time.Duration(5*time.Second),
3*time.Millisecond,
)
env.AssertExpectations(tt)
Expand Down Expand Up @@ -439,6 +439,61 @@ func TestNginx_Info(t *testing.T) {
assert.Equal(t, "NginxBinary", pluginUnderTest.Info().Name())
}

func TestNginx_validateConfig(t *testing.T) {
tests := []struct {
name string
validationResult error
expectedTopic string
expectedError error
}{
{
name: "successful validation",
validationResult: nil,
expectedTopic: core.NginxConfigValidationSucceeded,
},
{
name: "failed validation",
validationResult: errors.New("failure"),
expectedTopic: core.NginxConfigValidationFailed,
},
}

for _, test := range tests {
t.Run(test.name, func(tt *testing.T) {

env := tutils.GetMockEnvWithProcess()
binary := tutils.NewMockNginxBinary()
binary.On("ValidateConfig", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.validationResult)
binary.On("ReadConfig", mock.Anything, mock.Anything, mock.Anything).Return(&proto.NginxConfig{}, nil)
binary.On("GetNginxDetailsMapFromProcesses", env.Processes()).Return((tutils.GetDetailsMap()))
binary.On("UpdateNginxDetailsFromProcesses", env.Processes())

pluginUnderTest := NewNginx(&tutils.MockCommandClient{}, binary, env, &loadedConfig.Config{Features: []string{loadedConfig.FeatureNginxConfig}})

messagePipe := core.SetupMockMessagePipe(t, context.TODO(), pluginUnderTest)
messagePipe.Run()

pluginUnderTest.validateConfig(&proto.NginxDetails{}, "123", &proto.NginxConfig{}, &sdk.ConfigApply{})

assert.Eventually(
t,
func() bool { return len(messagePipe.GetMessages()) == 1 },
time.Duration(2*time.Second),
3*time.Millisecond,
)

assert.Equal(t, test.expectedTopic, messagePipe.GetMessages()[0].Topic())
assert.Equal(t, "123", messagePipe.GetMessages()[0].Data().(*NginxConfigValidationResponse).correlationId)
if test.validationResult == nil {
assert.Nil(t, messagePipe.GetMessages()[0].Data().(*NginxConfigValidationResponse).err)
} else {
assert.NotNil(t, messagePipe.GetMessages()[0].Data().(*NginxConfigValidationResponse).err)
}
assert.Greater(t, messagePipe.GetMessages()[0].Data().(*NginxConfigValidationResponse).elapsedTime, 0*time.Second)
})
}
}

func TestNginx_completeConfigApply(t *testing.T) {
expectedTopics := []string{
core.NginxConfigValidationSucceeded,
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading