Allow input file read ahead
The input file now pre-reads N requests, sorts them by timestamp, and emits them on demand.
You can control the read depth with --input-file-read-depth, which defaults to 100.

This makes the implementation faster, and it fixes various issues where requests get added out of order due to concurrency or other causes.
buger committed Jul 8, 2021
1 parent 889c1e6 commit 625ed54
Showing 8 changed files with 131 additions and 38 deletions.
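For context, a usage sketch (assuming the flag names defined in the settings.go hunk below, and reusing the example target from the existing --input-file help text):

gor --input-file ./requests.gor --input-file-read-depth 500 --output-http staging.com

A deeper buffer sorts across a wider window of out-of-order requests, at the cost of holding more records in memory.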
2 changes: 1 addition & 1 deletion emitter.go
@@ -152,7 +152,7 @@ func CopyMulty(src PluginReader, writers ...PluginWriter) error {
}
} else {
for _, dst := range writers {
if _, err := dst.PluginWrite(msg); err != nil {
if _, err := dst.PluginWrite(msg); err != nil && err != io.ErrClosedPipe {
return err
}
}
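This emitter change makes fan-out tolerant of a single closed writer: io.ErrClosedPipe is skipped rather than aborting the whole copy loop. A minimal, self-contained sketch of the same pattern (fanOut is an illustrative name, not from this repository):

package main

import (
	"fmt"
	"io"
	"os"
)

// fanOut broadcasts msg to every writer, skipping writers that have
// already shut down (io.ErrClosedPipe) instead of failing the batch.
func fanOut(msg []byte, writers ...io.Writer) error {
	for _, dst := range writers {
		if _, err := dst.Write(msg); err != nil && err != io.ErrClosedPipe {
			return err
		}
	}
	return nil
}

func main() {
	_, pw := io.Pipe()
	pw.Close() // writing to pw now returns io.ErrClosedPipe

	// The closed pipe is skipped; stdout still receives the message.
	if err := fanOut([]byte("hello\n"), pw, os.Stdout); err != nil {
		fmt.Println("unexpected error:", err)
	}
}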
137 changes: 114 additions & 23 deletions input_file.go
@@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"compress/gzip"
"container/heap"
"errors"
"fmt"
"io"
@@ -20,49 +21,123 @@ import (
"github.com/aws/aws-sdk-go/service/s3"
)

type filePayload struct {
data []byte
timestamp int64
}

// payloadQueue is a min-heap of file payloads, ordered by timestamp.
type payloadQueue struct {
sync.RWMutex
s []*filePayload
}

func (h payloadQueue) Len() int { return len(h.s) }
func (h payloadQueue) Less(i, j int) bool { return h.s[i].timestamp < h.s[j].timestamp }
func (h payloadQueue) Swap(i, j int) { h.s[i], h.s[j] = h.s[j], h.s[i] }

func (h *payloadQueue) Push(x interface{}) {
// Push and Pop use pointer receivers because they modify the slice's length,
// not just its contents.
h.s = append(h.s, x.(*filePayload))
}

func (h *payloadQueue) Pop() interface{} {
old := h.s
n := len(old)
x := old[n-1]
h.s = old[0 : n-1]
return x
}

func (h payloadQueue) Idx(i int) *filePayload {
h.RLock()
defer h.RUnlock()

return h.s[i]
}

type fileInputReader struct {
reader *bufio.Reader
data []byte
file io.ReadCloser
timestamp int64
closed int32 // Value of 0 indicates that the file is still open.
s3 bool
queue payloadQueue
readDepth int
}

func (f *fileInputReader) parseNext() error {
func (f *fileInputReader) parse(init chan struct{}) error {
payloadSeparatorAsBytes := []byte(payloadSeparator)
var buffer bytes.Buffer
var initialized bool

for {
line, err := f.reader.ReadBytes('\n')

if err != nil {
if err != io.EOF {
Debug(1, err)
} else {
f.Close()
}

f.Close()

if !initialized {
close(init)
initialized = true
}

return err
}

if bytes.Equal(payloadSeparatorAsBytes[1:], line) {
asBytes := buffer.Bytes()
meta := payloadMeta(asBytes)

f.timestamp, _ = strconv.ParseInt(string(meta[2]), 10, 64)
f.data = asBytes[:len(asBytes)-1]
timestamp, _ := strconv.ParseInt(string(meta[2]), 10, 64)
data := asBytes[:len(asBytes)-1]

return nil
f.queue.Lock()
heap.Push(&f.queue, &filePayload{
timestamp: timestamp,
data: data,
})
f.queue.Unlock()

for {
if f.queue.Len() < f.readDepth {
break
}

if !initialized {
close(init)
initialized = true
}

time.Sleep(100 * time.Millisecond)
}

buffer = bytes.Buffer{}
continue
}

buffer.Write(line)
}

}
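The init channel used above is the standard close-once handshake: newFileInputReader blocks until the background parser has buffered its first payload (or hit EOF), so readers never race an empty queue at startup. A minimal sketch of the pattern, with illustrative names:

package main

import (
	"fmt"
	"time"
)

// startParser launches a background producer and returns a channel that
// is closed once the producer has finished its initial fill.
func startParser() <-chan struct{} {
	ready := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond) // stand-in for reading the first record
		close(ready)                      // closing signals every waiter, exactly once
		// ... the goroutine keeps producing after the handshake ...
	}()
	return ready
}

func main() {
	ready := startParser()
	<-ready // blocks until the parser reports it is initialized
	fmt.Println("parser initialized, safe to read")
}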

func (f *fileInputReader) ReadPayload() []byte {
defer f.parseNext()
func (f *fileInputReader) wait() {
for {
if atomic.LoadInt32(&f.closed) == 1 {
return
}

if f.queue.Len() > 0 {
return
}

time.Sleep(100 * time.Millisecond)
}

return f.data
return
}
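wait() is a simple polling loop: return as soon as the queue has data or the file has been closed, sleeping between checks. The same shape in isolation (hypothetical names):

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// waitUntil polls until ready reports true or the closed flag is set,
// sleeping between checks; the same shape as fileInputReader.wait().
func waitUntil(closed *int32, ready func() bool) {
	for {
		if atomic.LoadInt32(closed) == 1 {
			return
		}
		if ready() {
			return
		}
		time.Sleep(100 * time.Millisecond)
	}
}

func main() {
	var closed int32
	go func() {
		time.Sleep(300 * time.Millisecond)
		atomic.StoreInt32(&closed, 1) // simulate the reader being closed
	}()
	waitUntil(&closed, func() bool { return false })
	fmt.Println("reader closed, wait returned")
}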

// Close closes this plugin
@@ -75,7 +150,7 @@ func (f *fileInputReader) Close() error {
return nil
}

func newFileInputReader(path string) *fileInputReader {
func newFileInputReader(path string, readDepth int) *fileInputReader {
var file io.ReadCloser
var err error

@@ -90,7 +165,7 @@ func newFileInputReader(path string) *fileInputReader {
return nil
}

r := &fileInputReader{file: file, closed: 0}
r := &fileInputReader{file: file, closed: 0, readDepth: readDepth}
if strings.HasSuffix(path, ".gz") {
gzReader, err := gzip.NewReader(file)
if err != nil {
@@ -102,7 +177,11 @@ func newFileInputReader(path string) *fileInputReader {
r.reader = bufio.NewReader(file)
}

r.parseNext()
heap.Init(&r.queue)

init := make(chan struct{})
go r.parse(init)
<-init

return r
}
@@ -116,16 +195,18 @@ type FileInput struct {
readers []*fileInputReader
speedFactor float64
loop bool
readDepth int
}

// NewFileInput constructor for FileInput. Accepts a file path, a loop flag, and the read depth as arguments.
func NewFileInput(path string, loop bool) (i *FileInput) {
func NewFileInput(path string, loop bool, readDepth int) (i *FileInput) {
i = new(FileInput)
i.data = make(chan []byte, 1000)
i.exit = make(chan bool)
i.path = path
i.speedFactor = 1
i.loop = loop
i.readDepth = readDepth

if err := i.init(); err != nil {
return
@@ -176,7 +257,7 @@ func (i *FileInput) init() (err error) {
i.readers = make([]*fileInputReader, len(matches))

for idx, p := range matches {
i.readers[idx] = newFileInputReader(p)
i.readers[idx] = newFileInputReader(p, i.readDepth)
}

return nil
@@ -201,11 +282,17 @@ func (i *FileInput) String() string {
// Find the reader with the smallest timestamp, i.e. the next payload in line
func (i *FileInput) nextReader() (next *fileInputReader) {
for _, r := range i.readers {
if r == nil || atomic.LoadInt32(&r.closed) != 0 {
if r == nil {
continue
}

if next == nil || r.timestamp < next.timestamp {
r.wait()

if r.queue.Len() == 0 {
continue
}

if next == nil || r.queue.Idx(0).timestamp < next.queue.Idx(0).timestamp {
next = r
continue
}
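nextReader is effectively one step of a k-way merge: among all open files, pick the reader whose head-of-queue timestamp is smallest. The same selection in isolation, over plain timestamp slices (hypothetical, for illustration only):

package main

import "fmt"

// pick returns the index of the queue whose head timestamp is smallest,
// skipping drained queues; -1 means every queue is empty.
func pick(queues [][]int64) int {
	best := -1
	for i, q := range queues {
		if len(q) == 0 {
			continue
		}
		if best == -1 || q[0] < queues[best][0] {
			best = i
		}
	}
	return best
}

func main() {
	queues := [][]int64{{15, 40}, {10, 20}, {}}
	fmt.Println(pick(queues)) // 1, since head timestamp 10 is the smallest
}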
@@ -236,27 +323,31 @@ func (i *FileInput) emit() {
}
}

reader.queue.Lock()
payload := heap.Pop(&reader.queue).(*filePayload)
reader.queue.Unlock()

if lastTime != -1 {
diff := reader.timestamp - lastTime
diff := payload.timestamp - lastTime

if i.speedFactor != 1 {
diff = int64(float64(diff) / i.speedFactor)
}

if diff >= 0 {
lastTime = reader.timestamp
lastTime = payload.timestamp
time.Sleep(time.Duration(diff))
}
} else {
lastTime = reader.timestamp
lastTime = payload.timestamp
}

// Recheck if we have exited since last check.
select {
case <-i.exit:
return
default:
i.data <- reader.ReadPayload()
i.data <- payload.data
}
}
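The pacing logic keeps its original shape, except the timestamp now comes from the popped payload: sleep for the inter-request delta divided by the speed factor (timestamps are in nanoseconds). For example, a 100 ms recorded gap replayed at a speed factor of 2 sleeps 50 ms. A compact sketch:

package main

import (
	"fmt"
	"time"
)

// pace returns how long to sleep before emitting a payload recorded at ts,
// given the previous payload's timestamp and the replay speed factor.
func pace(prevTS, ts int64, speedFactor float64) time.Duration {
	diff := ts - prevTS // nanoseconds, like the capture timestamps
	if speedFactor != 1 {
		diff = int64(float64(diff) / speedFactor)
	}
	if diff < 0 {
		return 0 // out-of-order leftovers are emitted immediately
	}
	return time.Duration(diff)
}

func main() {
	gap := int64(100 * time.Millisecond)
	fmt.Println(pace(0, gap, 2)) // 50ms
}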

12 changes: 6 additions & 6 deletions input_file_test.go
@@ -104,7 +104,7 @@ func TestInputFileMultipleFilesWithRequestsOnly(t *testing.T) {
file2.Write([]byte(payloadSeparator))
file2.Close()

input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false)
input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100)

for i := '1'; i <= '4'; i++ {
msg, _ := input.PluginRead()
@@ -130,7 +130,7 @@ func TestInputFileRequestsWithLatency(t *testing.T) {
file.Write([]byte("1 3 250000000\nrequest3"))
file.Write([]byte(payloadSeparator))

input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), false)
input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), false, 100)

start := time.Now().UnixNano()
for i := 0; i < 3; i++ {
@@ -170,7 +170,7 @@ func TestInputFileMultipleFilesWithRequestsAndResponses(t *testing.T) {
file2.Write([]byte(payloadSeparator))
file2.Close()

input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false)
input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100)

for i := '1'; i <= '4'; i++ {
msg, _ := input.PluginRead()
@@ -198,7 +198,7 @@ func TestInputFileLoop(t *testing.T) {
file.Write([]byte(payloadSeparator))
file.Close()

input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), true)
input := NewFileInput(fmt.Sprintf("/tmp/%d", rnd), true, 100)

// Even if we have just 2 requests in the file, it should loop indefinitely
for i := 0; i < 1000; i++ {
@@ -226,7 +226,7 @@ func TestInputFileCompressed(t *testing.T) {
name2 := output2.file.Name()
output2.Close()

input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false)
input := NewFileInput(fmt.Sprintf("/tmp/%d*", rnd), false, 100)
for i := 0; i < 2000; i++ {
input.PluginRead()
}
@@ -326,7 +326,7 @@ func CreateCaptureFile(requestGenerator *RequestGenerator) *CaptureFile {
func ReadFromCaptureFile(captureFile *os.File, count int, callback writeCallback) (err error) {
wg := new(sync.WaitGroup)

input := NewFileInput(captureFile.Name(), false)
input := NewFileInput(captureFile.Name(), false, 100)
output := NewTestOutput(func(msg *Message) {
callback(msg)
wg.Done()
1 change: 1 addition & 0 deletions output_dummy.go
@@ -24,6 +24,7 @@ func (i *DummyOutput) PluginWrite(msg *Message) (int, error) {
n += nn
nn, err = os.Stdout.Write(payloadSeparatorAsBytes)
n += nn

return n, err
}

2 changes: 1 addition & 1 deletion output_file_test.go
@@ -39,7 +39,7 @@ func TestFileOutput(t *testing.T) {
emitter.Close()

var counter int64
input2 := NewFileInput("/tmp/test_requests.gor", false)
input2 := NewFileInput("/tmp/test_requests.gor", false, 100)
output2 := NewTestOutput(func(*Message) {
atomic.AddInt64(&counter, 1)
wg.Done()
3 changes: 1 addition & 2 deletions plugins.go
@@ -83,7 +83,6 @@ func (plugins *InOutPlugins) registerPlugin(constructor interface{}, options ...
plugins.Outputs = append(plugins.Outputs, w)
}
plugins.All = append(plugins.All, plugin)

}

// NewPlugins specify and initialize all available plugins
@@ -119,7 +118,7 @@ func NewPlugins() *InOutPlugins {
}

for _, options := range Settings.InputFile {
plugins.registerPlugin(NewFileInput, options, Settings.InputFileLoop)
plugins.registerPlugin(NewFileInput, options, Settings.InputFileLoop, Settings.InputFileReadDepth)
}

for _, path := range Settings.OutputFile {
2 changes: 1 addition & 1 deletion s3_test.go
@@ -127,7 +127,7 @@ func TestInputFileFromS3(t *testing.T) {
<-output.closeCh
}

input := NewFileInput(fmt.Sprintf("s3://test-gor-eu/%d", rnd), false)
input := NewFileInput(fmt.Sprintf("s3://test-gor-eu/%d", rnd, 100), false)

buf := make([]byte, 1000)
for i := 0; i <= 19999; i++ {
10 changes: 6 additions & 4 deletions settings.go
@@ -45,10 +45,11 @@ type AppSettings struct {
OutputTCPConfig TCPOutputConfig
OutputTCPStats bool `json:"output-tcp-stats"`

InputFile MultiOption `json:"input-file"`
InputFileLoop bool `json:"input-file-loop"`
OutputFile MultiOption `json:"output-file"`
OutputFileConfig FileOutputConfig
InputFile MultiOption `json:"input-file"`
InputFileLoop bool `json:"input-file-loop"`
InputFileReadDepth int `json:"input-file-read-depth"`
OutputFile MultiOption `json:"output-file"`
OutputFileConfig FileOutputConfig

InputRAW MultiOption `json:"input_raw"`
RAWInputConfig
@@ -113,6 +114,7 @@ func init() {

flag.Var(&Settings.InputFile, "input-file", "Read requests from file: \n\tgor --input-file ./requests.gor --output-http staging.com")
flag.BoolVar(&Settings.InputFileLoop, "input-file-loop", false, "Loop input files, useful for performance testing.")
flag.IntVar(&Settings.InputFileReadDepth, "input-file-read-depth", 100, "GoReplay tries to read and cache multiple records, in advance. In parallel it also perform sorting of requests, if they came out of order. Since it needs hold this buffer in memory, bigger values can cause worse performance")

flag.Var(&Settings.OutputFile, "output-file", "Write incoming requests to file: \n\tgor --input-raw :80 --output-file ./requests.gor")
flag.DurationVar(&Settings.OutputFileConfig.FlushInterval, "output-file-flush-interval", time.Second, "Interval for forcing buffer flush to the file, default: 1s.")
