Skip to content

Commit

Permalink
Fix https_batch deadlock due to golang timer changes
Browse files Browse the repository at this point in the history
It makes tests hopefully more robus

It also replaces most sleeps with Consistently and Eventually.
It makes the timings more forgiving.

This should make it reliable on weak hardware.

Add functional options pattern to allow test configuration
  • Loading branch information
nicklas-dohrn committed Feb 24, 2025
1 parent 58e63a4 commit e7e9fba
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 51 deletions.
105 changes: 70 additions & 35 deletions src/pkg/egress/syslog/https_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,35 @@ import (
"bytes"
"crypto/tls"
"log"
"sync"
"time"

"code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2"
metrics "code.cloudfoundry.org/go-metric-registry"
"code.cloudfoundry.org/loggregator-agent-release/src/pkg/egress"
)

const BATCHSIZE = 256 * 1024

type HTTPSBatchWriter struct {
HTTPSWriter
msgs chan []byte
batchSize int
sendInterval time.Duration
egrMsgCount float64
msgChan chan []byte
quit chan struct{}
wg sync.WaitGroup
}

type Option func(*HTTPSBatchWriter)

func WithBatchSize(size int) Option {
return func(w *HTTPSBatchWriter) {
w.batchSize = size
}
}

func WithSendInterval(interval time.Duration) Option {
return func(w *HTTPSBatchWriter) {
w.sendInterval = interval
}
}

func NewHTTPSBatchWriter(
Expand All @@ -27,10 +41,12 @@ func NewHTTPSBatchWriter(
tlsConf *tls.Config,
egressMetric metrics.Counter,
c *Converter,
options ...Option,
) egress.WriteCloser {
client := httpClient(netConf, tlsConf)
binding.URL.Scheme = "https" // reset the scheme for usage to a valid http scheme
BatchWriter := &HTTPSBatchWriter{
binding.URL.Scheme = "https"

writer := &HTTPSBatchWriter{
HTTPSWriter: HTTPSWriter{
url: binding.URL,
appID: binding.AppID,
Expand All @@ -39,60 +55,79 @@ func NewHTTPSBatchWriter(
egressMetric: egressMetric,
syslogConverter: c,
},
batchSize: BATCHSIZE,
sendInterval: 1 * time.Second,
egrMsgCount: 0,
msgs: make(chan []byte),
batchSize: 256 * 1024, // Default value
sendInterval: 1 * time.Second, // Default value
msgChan: make(chan []byte), // Buffered channel for messages
quit: make(chan struct{}),
}

for _, opt := range options {
opt(writer)
}
go BatchWriter.startSender()
return BatchWriter

writer.wg.Add(1)
go writer.startSender()

return writer
}

// Modified Write function
func (w *HTTPSBatchWriter) Write(env *loggregator_v2.Envelope) error {
msgs, err := w.syslogConverter.ToRFC5424(env, w.hostname)
if err != nil {
log.Printf("Failed to parse syslog, dropping faulty message, err: %s", err)
log.Printf("Failed to parse syslog, dropping message, err: %s", err)
return nil
}

for _, msg := range msgs {
//There is no correct way of implementing error based retries in the current architecture.
//Retries for https-batching will be implemented at a later point in time.
w.msgs <- msg
w.msgChan <- msg
}

return nil
}

func (w *HTTPSBatchWriter) startSender() {
t := time.NewTimer(w.sendInterval)
defer w.wg.Done()

ticker := time.NewTicker(w.sendInterval)
defer ticker.Stop()

var msgBatch bytes.Buffer
var msgCount float64
reset := func() {
msgBatch.Reset()
msgCount = 0
t.Reset(w.sendInterval)

sendBatch := func() {
if msgBatch.Len() > 0 {
w.sendHttpRequest(msgBatch.Bytes(), msgCount) // nolint:errcheck
msgBatch.Reset()
msgCount = 0
}
}

for {
select {
case msg := <-w.msgs:
length, buffer_err := msgBatch.Write(msg)
if buffer_err != nil {
log.Printf("Failed to write to buffer, dropping buffer of size %d , err: %s", length, buffer_err)
reset()
case msg := <-w.msgChan:
_, err := msgBatch.Write(msg)
if err != nil {
log.Printf("Failed to write to buffer, dropping buffer of size %d , err: %s", msgBatch.Len(), err)
msgBatch.Reset()
msgCount = 0
} else {
msgCount++
if length >= w.batchSize {
w.sendHttpRequest(msgBatch.Bytes(), msgCount) //nolint:errcheck
reset()
if msgBatch.Len() >= w.batchSize {
sendBatch()
}
}
case <-t.C:
if msgBatch.Len() > 0 {
w.sendHttpRequest(msgBatch.Bytes(), msgCount) //nolint:errcheck
reset()
}
case <-ticker.C:
sendBatch()
case <-w.quit:
sendBatch()
return
}
}
}

func (w *HTTPSBatchWriter) Close() error {
close(w.quit)
w.wg.Wait() // Ensure sender finishes processing before closing
close(w.msgChan)
return nil
}
51 changes: 35 additions & 16 deletions src/pkg/egress/syslog/https_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@ import (
. "github.com/onsi/gomega"
)

var string_to_1024_chars = "saljdflajsdssdfsdfljkfkajafjajlköflkjöjaklgljksdjlakljkflkjweljklkwjejlkfekljwlkjefjklwjklsdajkljklwerlkaskldgjksakjekjwrjkljasdjkgfkljwejklrkjlklasdkjlsadjlfjlkadfljkajklsdfjklslkdfjkllkjasdjkflsdlakfjklasldfkjlasdjfkjlsadlfjklaljsafjlslkjawjklerkjljklasjkdfjklwerjljalsdjkflwerjlkwejlkarjklalkklfsdjlfhkjsdfkhsewhkjjasdjfkhwkejrkjahjefkhkasdjhfkashfkjwehfkksadfjaskfkhjdshjfhewkjhasdfjdajskfjwehkfajkankaskjdfasdjhfkkjhjjkasdfjhkjahksdf"
var stringTo256Chars string

func init() {
//With the rest of the syslog, this results in a syslogenvelope of the size 400
for i := 0; i < 256; i++ {
stringTo256Chars += "a"
}
}

var _ = Describe("HTTPS_batch", func() {
var (
Expand All @@ -30,10 +37,10 @@ var _ = Describe("HTTPS_batch", func() {
b *syslog.URLBinding
writer egress.WriteCloser
)
string_to_1024_chars += string_to_1024_chars

BeforeEach(func() {
drain = newBatchMockDrain(200)
drain.Reset()
b = buildURLBinding(
drain.URL,
"test-app-id",
Expand All @@ -45,6 +52,8 @@ var _ = Describe("HTTPS_batch", func() {
skipSSLTLSConfig,
&metricsHelpers.SpyMetric{},
c,
syslog.WithBatchSize(5000),
syslog.WithSendInterval(100*time.Millisecond),
)
})

Expand All @@ -53,7 +62,7 @@ var _ = Describe("HTTPS_batch", func() {
Expect(writer.Write(env1)).To(Succeed())
env2 := buildLogEnvelope("APP", "2", "message 2", loggregator_v2.Log_OUT)
Expect(writer.Write(env2)).To(Succeed())
time.Sleep(1050 * time.Millisecond)
time.Sleep(150 * time.Millisecond)

Expect(drain.getMessagesSize()).Should(Equal(2))
expected := &rfc5424.Message{
Expand Down Expand Up @@ -83,24 +92,36 @@ var _ = Describe("HTTPS_batch", func() {
})

It("test batch dispatching with all logs in a given timeframe", func() {
env1 := buildLogEnvelope("APP", "1", "string to get log to 1024 characters:"+string_to_1024_chars, loggregator_v2.Log_OUT)
env1 := buildLogEnvelope("APP", "1", "string to get log to 400 characters:"+stringTo256Chars, loggregator_v2.Log_OUT)
for i := 0; i < 10; i++ {
Expect(writer.Write(env1)).To(Succeed())
time.Sleep(99 * time.Millisecond)
}
Expect(drain.getMessagesSize()).Should(Equal(0))
time.Sleep(100 * time.Millisecond)
Expect(drain.getMessagesSize()).Should(Equal(10))
Expect(drain.getMessagesSize()).To(Equal(0))
Eventually(drain.getMessagesSize, 180*time.Millisecond).Should(Equal(10))
})

It("probabilistic test for race condition", func() {
env1 := buildLogEnvelope("APP", "1", "string to get log to 1024 characters:"+string_to_1024_chars, loggregator_v2.Log_OUT)
for i := 0; i < 10; i++ {
It("test dispatching for batches before timewindow is finished", func() {
// One envelope has the size of 400byte
env1 := buildLogEnvelope("APP", "1", "string to get log to 400 characters:"+stringTo256Chars, loggregator_v2.Log_OUT)

for i := 0; i < 20; i++ {
Expect(writer.Write(env1)).To(Succeed())
time.Sleep(99 * time.Millisecond)
}
time.Sleep(100 * time.Millisecond)
Expect(drain.getMessagesSize()).Should(Equal(10))
// DefaultBatchSize = 5000byte, 12 * 400byte = 4800byte, 13 * 400byte = 5200byte
// -> The batch will trigger after 13 messages, and this is not a direct hit to prevent inconsistencies.
Eventually(drain.getMessagesSize, 50*time.Millisecond).Should(Equal(13))
Eventually(drain.getMessagesSize, 120*time.Millisecond).Should(Equal(20))
})

It("test for hanging after some ticks", func() {
// This test will not succeed on the timer based implementation,
// it works fine with a ticker based implementation.
env1 := buildLogEnvelope("APP", "1", "only a short test message", loggregator_v2.Log_OUT)
for i := 0; i < 5; i++ {
Expect(writer.Write(env1)).To(Succeed())
time.Sleep(220 * time.Millisecond) // this sleeps at least 2 ticks, to trigger once without events
}
Eventually(drain.getMessagesSize, 120*time.Millisecond).Should(Equal(5))
})
})

Expand All @@ -112,8 +133,6 @@ func newBatchMockDrain(status int) *SpyDrain {
Expect(err).ToNot(HaveOccurred())
defer r.Body.Close()

println(body)

message := &rfc5424.Message{}

messages := bytes.SplitAfter(body, []byte("\n"))
Expand Down
5 changes: 5 additions & 0 deletions src/pkg/egress/syslog/https_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ type SpyDrain struct {
headers []http.Header
}

func (d *SpyDrain) Reset() {
d.messages = nil
d.headers = nil
}

func (d *SpyDrain) appendMessage(message *rfc5424.Message) {
d.mu.Lock()
defer d.mu.Unlock()
Expand Down

0 comments on commit e7e9fba

Please sign in to comment.