Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Created a concurrent safe Gaia logger for plugins. #213

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
echo "" > coverage.txt

for d in $(go list ./... | grep -v vendor | grep -v /testacc); do
go test -v -timeout 20s -race -coverprofile=profile.out -covermode=atomic $d
go test -v -timeout 50s -race -coverprofile=profile.out -covermode=atomic $d
if [ -f profile.out ]; then
cat profile.out >> coverage.txt
rm profile.out
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ RELEASE_NAME=${NAME}
HELM_DIR=$(shell pwd)/helm
TEST=$$(go list ./... | grep -v /vendor/ | grep /testacc)
TEST_TIMEOUT_ACC?=20m
TEST_TIMEOUT?=20s
TEST_TIMEOUT?=50s

default: dev

Expand Down
55 changes: 43 additions & 12 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"os"
"os/exec"
"sync"
"time"

"github.com/gaia-pipeline/gaia"
Expand Down Expand Up @@ -39,6 +40,34 @@ var pluginMap = map[string]plugin.Plugin{
// timeFormat is the logging time format.
const timeFormat = "2006/01/02 15:04:05"

// GaiaLogWriter represents a concurrent safe log writer which can be shared with go-plugin.
type GaiaLogWriter struct {
mu sync.RWMutex
buffer *bytes.Buffer
writer *bufio.Writer
}

// Write locks and writes to the underlying writer.
func (g *GaiaLogWriter) Write(p []byte) (n int, err error) {
g.mu.Lock()
defer g.mu.Unlock()
return g.writer.Write(p)
}

// Flush locks and flushes the underlying writer.
func (g *GaiaLogWriter) Flush() error {
g.mu.Lock()
defer g.mu.Unlock()
return g.writer.Flush()
}

// WriteString locks and passes on the string to write to the underlying writer.
func (g *GaiaLogWriter) WriteString(s string) (int, error) {
g.mu.Lock()
defer g.mu.Unlock()
return g.writer.WriteString(s)
}

// GoPlugin represents a single plugin instance which uses gRPC
// to connect to exactly one plugin.
type GoPlugin struct {
Expand All @@ -55,8 +84,7 @@ type GoPlugin struct {
logFile *os.File

// Writer used to write logs from execution to file or buffer
writer *bufio.Writer
buffer *bytes.Buffer
logger GaiaLogWriter

// CA instance used to handle certificates
ca security.CAAPI
Expand Down Expand Up @@ -108,6 +136,9 @@ func (p *GoPlugin) NewPlugin(ca security.CAAPI) Plugin {
// It's up to the caller to call plugin.Close to shutdown the plugin
// and close the gRPC connection.
func (p *GoPlugin) Init(command *exec.Cmd, logPath *string) error {
// Initialise the logger
p.logger = GaiaLogWriter{}

// Create log file and open it.
// We will close this file in the close method.
if logPath != nil {
Expand All @@ -122,11 +153,11 @@ func (p *GoPlugin) Init(command *exec.Cmd, logPath *string) error {
}

// Create new writer
p.writer = bufio.NewWriter(p.logFile)
p.logger.writer = bufio.NewWriter(p.logFile)
} else {
// If no path is provided, write output to buffer
p.buffer = new(bytes.Buffer)
p.writer = bufio.NewWriter(p.buffer)
p.logger.buffer = new(bytes.Buffer)
p.logger.writer = bufio.NewWriter(p.logger.buffer)
}

// Create and sign a new pair of certificates for the server
Expand Down Expand Up @@ -161,15 +192,15 @@ func (p *GoPlugin) Init(command *exec.Cmd, logPath *string) error {
Plugins: pluginMap,
Cmd: command,
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
Stderr: p.writer,
Stderr: &p.logger,
TLSConfig: tlsConfig,
})

// Connect via gRPC
p.clientProtocol, err = p.client.Client()
if err != nil {
_ = p.writer.Flush()
return fmt.Errorf("%s\n\n--- output ---\n%s", err.Error(), p.buffer.String())
_ = p.logger.Flush()
return fmt.Errorf("%s\n\n--- output ---\n%s", err.Error(), p.logger.buffer.String())
}

return nil
Expand Down Expand Up @@ -227,7 +258,7 @@ func (p *GoPlugin) Execute(j *gaia.Job) error {

// Generate error message and attach it to logs.
timeString := time.Now().Format(timeFormat)
_, _ = p.writer.WriteString(fmt.Sprintf("%s Job '%s' threw an error: %s\n", timeString, j.Title, resultObj.Message))
_, _ = p.logger.WriteString(fmt.Sprintf("%s Job '%s' threw an error: %s\n", timeString, j.Title, resultObj.Message))
} else if err != nil {
// An error occurred during the send or somewhere else.
// The job itself usually does not return an error here.
Expand All @@ -236,7 +267,7 @@ func (p *GoPlugin) Execute(j *gaia.Job) error {

// Generate error message and attach it to logs.
timeString := time.Now().Format(timeFormat)
_, _ = p.writer.WriteString(fmt.Sprintf("%s Job '%s' threw an error: %s\n", timeString, j.Title, err.Error()))
_, _ = p.logger.WriteString(fmt.Sprintf("%s Job '%s' threw an error: %s\n", timeString, j.Title, err.Error()))
} else {
j.Status = gaia.JobSuccess
}
Expand Down Expand Up @@ -319,7 +350,7 @@ func (p *GoPlugin) GetJobs() ([]*gaia.Job, error) {

// FlushLogs flushes the logs.
func (p *GoPlugin) FlushLogs() error {
return p.writer.Flush()
return p.logger.Flush()
}

// Close shutdown the plugin and kills the gRPC connection.
Expand All @@ -332,7 +363,7 @@ func (p *GoPlugin) Close() {
p.client.Kill()

// Flush the writer
_ = p.writer.Flush()
_ = p.logger.Flush()

// Close log file
_ = p.logFile.Close()
Expand Down
6 changes: 4 additions & 2 deletions plugin/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ func TestExecute(t *testing.T) {
})
p := &GoPlugin{pluginConn: new(fakeGaiaPlugin)}
buf := new(bytes.Buffer)
p.writer = bufio.NewWriter(buf)
p.logger = GaiaLogWriter{}
p.logger.writer = bufio.NewWriter(buf)
j := &gaia.Job{
Args: []*gaia.Argument{
{
Expand All @@ -148,7 +149,8 @@ func TestGetJobs(t *testing.T) {
})
p := &GoPlugin{pluginConn: new(fakeGaiaPlugin)}
buf := new(bytes.Buffer)
p.writer = bufio.NewWriter(buf)
p.logger = GaiaLogWriter{}
p.logger.writer = bufio.NewWriter(buf)
_, err := p.GetJobs()
if err != nil {
t.Fatal(err)
Expand Down