Skip to content

Commit

Permalink
Created a concurrent safe Gaia logger for plugins. (#213)
Browse files Browse the repository at this point in the history
* Created a concurrent safe gaia logger for plugins.

* Linter check for exported type.

* Frigging mod file is getting on my nerves.

* Raise the timeout since tests are now longer larger better harder stronger....

* I have to figure out what is doing that.
  • Loading branch information
Skarlso authored Sep 10, 2019
1 parent c68f7c1 commit 4c6678f
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 16 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
echo "" > coverage.txt
for d in $(go list ./... | grep -v vendor | grep -v /testacc); do
go test -v -timeout 20s -race -coverprofile=profile.out -covermode=atomic $d
go test -v -timeout 50s -race -coverprofile=profile.out -covermode=atomic $d
if [ -f profile.out ]; then
cat profile.out >> coverage.txt
rm profile.out
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ RELEASE_NAME=${NAME}
HELM_DIR=$(shell pwd)/helm
TEST=$$(go list ./... | grep -v /vendor/ | grep /testacc)
TEST_TIMEOUT_ACC?=20m
TEST_TIMEOUT?=20s
TEST_TIMEOUT?=50s

default: dev

Expand Down
55 changes: 43 additions & 12 deletions plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"os"
"os/exec"
"sync"
"time"

"github.com/gaia-pipeline/gaia"
Expand Down Expand Up @@ -39,6 +40,34 @@ var pluginMap = map[string]plugin.Plugin{
// timeFormat is the logging time format.
const timeFormat = "2006/01/02 15:04:05"

// GaiaLogWriter represents a concurrent safe log writer which can be shared with go-plugin.
type GaiaLogWriter struct {
mu sync.RWMutex
buffer *bytes.Buffer
writer *bufio.Writer
}

// Write locks and writes to the underlying writer.
func (g *GaiaLogWriter) Write(p []byte) (n int, err error) {
g.mu.Lock()
defer g.mu.Unlock()
return g.writer.Write(p)
}

// Flush locks and flushes the underlying writer.
func (g *GaiaLogWriter) Flush() error {
g.mu.Lock()
defer g.mu.Unlock()
return g.writer.Flush()
}

// WriteString locks and passes on the string to write to the underlying writer.
func (g *GaiaLogWriter) WriteString(s string) (int, error) {
g.mu.Lock()
defer g.mu.Unlock()
return g.writer.WriteString(s)
}

// GoPlugin represents a single plugin instance which uses gRPC
// to connect to exactly one plugin.
type GoPlugin struct {
Expand All @@ -55,8 +84,7 @@ type GoPlugin struct {
logFile *os.File

// Writer used to write logs from execution to file or buffer
writer *bufio.Writer
buffer *bytes.Buffer
logger GaiaLogWriter

// CA instance used to handle certificates
ca security.CAAPI
Expand Down Expand Up @@ -108,6 +136,9 @@ func (p *GoPlugin) NewPlugin(ca security.CAAPI) Plugin {
// It's up to the caller to call plugin.Close to shutdown the plugin
// and close the gRPC connection.
func (p *GoPlugin) Init(command *exec.Cmd, logPath *string) error {
// Initialise the logger
p.logger = GaiaLogWriter{}

// Create log file and open it.
// We will close this file in the close method.
if logPath != nil {
Expand All @@ -122,11 +153,11 @@ func (p *GoPlugin) Init(command *exec.Cmd, logPath *string) error {
}

// Create new writer
p.writer = bufio.NewWriter(p.logFile)
p.logger.writer = bufio.NewWriter(p.logFile)
} else {
// If no path is provided, write output to buffer
p.buffer = new(bytes.Buffer)
p.writer = bufio.NewWriter(p.buffer)
p.logger.buffer = new(bytes.Buffer)
p.logger.writer = bufio.NewWriter(p.logger.buffer)
}

// Create and sign a new pair of certificates for the server
Expand Down Expand Up @@ -161,15 +192,15 @@ func (p *GoPlugin) Init(command *exec.Cmd, logPath *string) error {
Plugins: pluginMap,
Cmd: command,
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
Stderr: p.writer,
Stderr: &p.logger,
TLSConfig: tlsConfig,
})

// Connect via gRPC
p.clientProtocol, err = p.client.Client()
if err != nil {
_ = p.writer.Flush()
return fmt.Errorf("%s\n\n--- output ---\n%s", err.Error(), p.buffer.String())
_ = p.logger.Flush()
return fmt.Errorf("%s\n\n--- output ---\n%s", err.Error(), p.logger.buffer.String())
}

return nil
Expand Down Expand Up @@ -227,7 +258,7 @@ func (p *GoPlugin) Execute(j *gaia.Job) error {

// Generate error message and attach it to logs.
timeString := time.Now().Format(timeFormat)
_, _ = p.writer.WriteString(fmt.Sprintf("%s Job '%s' threw an error: %s\n", timeString, j.Title, resultObj.Message))
_, _ = p.logger.WriteString(fmt.Sprintf("%s Job '%s' threw an error: %s\n", timeString, j.Title, resultObj.Message))
} else if err != nil {
// An error occurred during the send or somewhere else.
// The job itself usually does not return an error here.
Expand All @@ -236,7 +267,7 @@ func (p *GoPlugin) Execute(j *gaia.Job) error {

// Generate error message and attach it to logs.
timeString := time.Now().Format(timeFormat)
_, _ = p.writer.WriteString(fmt.Sprintf("%s Job '%s' threw an error: %s\n", timeString, j.Title, err.Error()))
_, _ = p.logger.WriteString(fmt.Sprintf("%s Job '%s' threw an error: %s\n", timeString, j.Title, err.Error()))
} else {
j.Status = gaia.JobSuccess
}
Expand Down Expand Up @@ -319,7 +350,7 @@ func (p *GoPlugin) GetJobs() ([]*gaia.Job, error) {

// FlushLogs flushes the logs.
func (p *GoPlugin) FlushLogs() error {
return p.writer.Flush()
return p.logger.Flush()
}

// Close shutdown the plugin and kills the gRPC connection.
Expand All @@ -332,7 +363,7 @@ func (p *GoPlugin) Close() {
p.client.Kill()

// Flush the writer
_ = p.writer.Flush()
_ = p.logger.Flush()

// Close log file
_ = p.logFile.Close()
Expand Down
6 changes: 4 additions & 2 deletions plugin/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ func TestExecute(t *testing.T) {
})
p := &GoPlugin{pluginConn: new(fakeGaiaPlugin)}
buf := new(bytes.Buffer)
p.writer = bufio.NewWriter(buf)
p.logger = GaiaLogWriter{}
p.logger.writer = bufio.NewWriter(buf)
j := &gaia.Job{
Args: []*gaia.Argument{
{
Expand All @@ -148,7 +149,8 @@ func TestGetJobs(t *testing.T) {
})
p := &GoPlugin{pluginConn: new(fakeGaiaPlugin)}
buf := new(bytes.Buffer)
p.writer = bufio.NewWriter(buf)
p.logger = GaiaLogWriter{}
p.logger.writer = bufio.NewWriter(buf)
_, err := p.GetJobs()
if err != nil {
t.Fatal(err)
Expand Down

0 comments on commit 4c6678f

Please sign in to comment.