Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions go/test/endtoend/messaging/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ func TestMessage(t *testing.T) {

exec(t, conn, "insert into vitess_message(id, message) values(1, 'hello world')")

// account for jitter in timings, maxJitter uses the current hardcoded value for jitter in message_manager.go
jitter := int64(0)
maxJitter := int64(1.4 * 1e9)

// Consume first message.
start := time.Now().UnixNano()
got, err := streamConn.FetchNext()
Expand All @@ -110,15 +114,16 @@ func TestMessage(t *testing.T) {

qr := exec(t, conn, "select time_next, epoch from vitess_message where id = 1")
next, epoch := getTimeEpoch(qr)
jitter += epoch * maxJitter
// epoch could be 0 or 1, depending on how fast the row is updated
switch epoch {
case 0:
if !(start-1e9 < next && next < start) {
t.Errorf("next: %d. must be within 1s of start: %d", next/1e9, start/1e9)
if !(start-1e9 < next && next < (start+jitter)) {
t.Errorf("next: %d. must be within 1s of start: %d", next/1e9, (start+jitter)/1e9)
}
case 1:
if !(start < next && next < start+3e9) {
t.Errorf("next: %d. must be about 1s after start: %d", next/1e9, start/1e9)
if !(start < next && next < (start+jitter)+3e9) {
t.Errorf("next: %d. must be about 1s after start: %d", next/1e9, (start+jitter)/1e9)
}
default:
t.Errorf("epoch: %d, must be 0 or 1", epoch)
Expand All @@ -129,15 +134,16 @@ func TestMessage(t *testing.T) {
require.NoError(t, err)
qr = exec(t, conn, "select time_next, epoch from vitess_message where id = 1")
next, epoch = getTimeEpoch(qr)
jitter += epoch * maxJitter
// epoch could be 1 or 2, depending on how fast the row is updated
switch epoch {
case 1:
if !(start < next && next < start+3e9) {
t.Errorf("next: %d. must be about 1s after start: %d", next/1e9, start/1e9)
if !(start < next && next < (start+jitter)+3e9) {
t.Errorf("next: %d. must be about 1s after start: %d", next/1e9, (start+jitter)/1e9)
}
case 2:
if !(start+2e9 < next && next < start+6e9) {
t.Errorf("next: %d. must be about 3s after start: %d", next/1e9, start/1e9)
if !(start+2e9 < next && next < (start+jitter)+6e9) {
t.Errorf("next: %d. must be about 3s after start: %d", next/1e9, (start+jitter)/1e9)
Comment on lines +145 to +146
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jitter would have happened twice here, so maxJitter would be epoch * 1.333

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, made the change to use epoch*maxJitter. There will no jitter in Epoch 0: is that correct?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// have backoff be +/- 33%, whenever this is injected, append (:min_backoff, :jitter)
jitteredBackoff := "FLOOR((%a<<ifnull(epoch, 0)) * %a)"

Correct, no jitter. By default, the jitter is also windowed around the min/max backoff. If there are no settings, minBackoff is the same as ackWaitTimeout and no limit on maxBackoff.

}
default:
t.Errorf("epoch: %d, must be 1 or 2", epoch)
Expand Down