|
| 1 | +// Copyright 2020 Google LLC |
| 2 | +// |
| 3 | +// Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +// you may not use this file except in compliance with the License. |
| 5 | +// You may obtain a copy of the License at |
| 6 | +// |
| 7 | +// https://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +// |
| 9 | +// Unless required by applicable law or agreed to in writing, software |
| 10 | +// distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +// See the License for the specific language governing permissions and |
| 13 | + |
| 14 | +package wire |
| 15 | + |
| 16 | +import ( |
| 17 | + "container/list" |
| 18 | + "fmt" |
| 19 | + "sync" |
| 20 | +) |
| 21 | + |
| 22 | +// AckConsumer is the interface exported from this package for acking messages. |
| 23 | +type AckConsumer interface { |
| 24 | + Ack() |
| 25 | +} |
| 26 | + |
| 27 | +// ackedFunc is invoked when a message has been acked by the user. Note: if the |
| 28 | +// ackedFunc implementation calls any ackConsumer methods, it needs to run in a |
| 29 | +// goroutine to avoid a deadlock. |
| 30 | +type ackedFunc func(*ackConsumer) |
| 31 | + |
| 32 | +// ackConsumer is used for handling message acks. It is attached to a Message |
| 33 | +// and also stored within the ackTracker until the message has been acked by the |
| 34 | +// user. |
| 35 | +type ackConsumer struct { |
| 36 | + // The message offset. |
| 37 | + Offset int64 |
| 38 | + // Bytes released to the flow controller once the message has been acked. |
| 39 | + MsgBytes int64 |
| 40 | + |
| 41 | + // Guards access to fields below. |
| 42 | + mu sync.Mutex |
| 43 | + acked bool |
| 44 | + onAck ackedFunc |
| 45 | +} |
| 46 | + |
| 47 | +func newAckConsumer(offset, msgBytes int64, onAck ackedFunc) *ackConsumer { |
| 48 | + return &ackConsumer{Offset: offset, MsgBytes: msgBytes, onAck: onAck} |
| 49 | +} |
| 50 | + |
| 51 | +func (ac *ackConsumer) Ack() { |
| 52 | + ac.mu.Lock() |
| 53 | + defer ac.mu.Unlock() |
| 54 | + |
| 55 | + if ac.acked { |
| 56 | + return |
| 57 | + } |
| 58 | + ac.acked = true |
| 59 | + if ac.onAck != nil { |
| 60 | + // Not invoked in a goroutine here for ease of testing. |
| 61 | + ac.onAck(ac) |
| 62 | + } |
| 63 | +} |
| 64 | + |
| 65 | +func (ac *ackConsumer) IsAcked() bool { |
| 66 | + ac.mu.Lock() |
| 67 | + defer ac.mu.Unlock() |
| 68 | + return ac.acked |
| 69 | +} |
| 70 | + |
| 71 | +// Clear onAck when the ack can no longer be processed. The user's ack would be |
| 72 | +// ignored. |
| 73 | +func (ac *ackConsumer) Clear() { |
| 74 | + ac.mu.Lock() |
| 75 | + defer ac.mu.Unlock() |
| 76 | + ac.onAck = nil |
| 77 | +} |
| 78 | + |
| 79 | +// Represents an uninitialized cursor offset. A sentinel value is used instead |
| 80 | +// if an optional to simplify cursor comparisons (i.e. -1 works without the need |
| 81 | +// to check for nil and then convert to int64). |
| 82 | +const nilCursorOffset int64 = -1 |
| 83 | + |
| 84 | +// ackTracker manages outstanding message acks, i.e. messages that have been |
| 85 | +// delivered to the user, but not yet acked. It is used by the committer and |
| 86 | +// wireSubscriber, so requires its own mutex. |
| 87 | +type ackTracker struct { |
| 88 | + // Guards access to fields below. |
| 89 | + mu sync.Mutex |
| 90 | + // All offsets before and including this prefix have been acked by the user. |
| 91 | + ackedPrefixOffset int64 |
| 92 | + // Outstanding message acks, strictly ordered by increasing message offsets. |
| 93 | + outstandingAcks *list.List // Value = *ackConsumer |
| 94 | +} |
| 95 | + |
| 96 | +func newAckTracker() *ackTracker { |
| 97 | + return &ackTracker{ |
| 98 | + ackedPrefixOffset: nilCursorOffset, |
| 99 | + outstandingAcks: list.New(), |
| 100 | + } |
| 101 | +} |
| 102 | + |
| 103 | +// Push adds an outstanding ack to the tracker. |
| 104 | +func (at *ackTracker) Push(ack *ackConsumer) error { |
| 105 | + at.mu.Lock() |
| 106 | + defer at.mu.Unlock() |
| 107 | + |
| 108 | + // These errors should not occur unless there is a bug in the client library |
| 109 | + // as message ordering should have been validated by subscriberOffsetTracker. |
| 110 | + if ack.Offset <= at.ackedPrefixOffset { |
| 111 | + return errOutOfOrderMessages |
| 112 | + } |
| 113 | + if elem := at.outstandingAcks.Back(); elem != nil { |
| 114 | + lastOutstandingAck, _ := elem.Value.(*ackConsumer) |
| 115 | + if ack.Offset <= lastOutstandingAck.Offset { |
| 116 | + return errOutOfOrderMessages |
| 117 | + } |
| 118 | + } |
| 119 | + |
| 120 | + at.outstandingAcks.PushBack(ack) |
| 121 | + return nil |
| 122 | +} |
| 123 | + |
| 124 | +// CommitOffset returns the cursor offset that should be committed. May return |
| 125 | +// nilCursorOffset if no messages have been acked thus far. |
| 126 | +func (at *ackTracker) CommitOffset() int64 { |
| 127 | + at.mu.Lock() |
| 128 | + defer at.mu.Unlock() |
| 129 | + |
| 130 | + // Process outstanding acks and update `ackedPrefixOffset` until an unacked |
| 131 | + // message is found. |
| 132 | + for { |
| 133 | + elem := at.outstandingAcks.Front() |
| 134 | + if elem == nil { |
| 135 | + break |
| 136 | + } |
| 137 | + ack, _ := elem.Value.(*ackConsumer) |
| 138 | + if !ack.IsAcked() { |
| 139 | + break |
| 140 | + } |
| 141 | + at.ackedPrefixOffset = ack.Offset |
| 142 | + at.outstandingAcks.Remove(elem) |
| 143 | + ack.Clear() |
| 144 | + } |
| 145 | + |
| 146 | + if at.ackedPrefixOffset == nilCursorOffset { |
| 147 | + return nilCursorOffset |
| 148 | + } |
| 149 | + // Convert from last acked to first unacked, which is the commit offset. |
| 150 | + return at.ackedPrefixOffset + 1 |
| 151 | +} |
| 152 | + |
| 153 | +// Release clears and invalidates any outstanding acks. This should be called |
| 154 | +// when the subscriber terminates. |
| 155 | +func (at *ackTracker) Release() { |
| 156 | + at.mu.Lock() |
| 157 | + defer at.mu.Unlock() |
| 158 | + |
| 159 | + for elem := at.outstandingAcks.Front(); elem != nil; elem = elem.Next() { |
| 160 | + ack, _ := elem.Value.(*ackConsumer) |
| 161 | + ack.Clear() |
| 162 | + } |
| 163 | + at.outstandingAcks.Init() |
| 164 | +} |
| 165 | + |
| 166 | +// commitCursorTracker tracks pending and last successful committed offsets. |
| 167 | +// It is only accessed by the committer. |
| 168 | +type commitCursorTracker struct { |
| 169 | + // Used to obtain the desired commit offset based on messages acked by the |
| 170 | + // user. |
| 171 | + acks *ackTracker |
| 172 | + // Last offset for which the server confirmed (acknowledged) the commit. |
| 173 | + lastConfirmedOffset int64 |
| 174 | + // Queue of committed offsets awaiting confirmation from the server. |
| 175 | + pendingOffsets *list.List // Value = int64 |
| 176 | +} |
| 177 | + |
| 178 | +func newCommitCursorTracker(acks *ackTracker) *commitCursorTracker { |
| 179 | + return &commitCursorTracker{ |
| 180 | + acks: acks, |
| 181 | + lastConfirmedOffset: nilCursorOffset, |
| 182 | + pendingOffsets: list.New(), |
| 183 | + } |
| 184 | +} |
| 185 | + |
| 186 | +func extractOffsetFromElem(elem *list.Element) int64 { |
| 187 | + if elem == nil { |
| 188 | + return nilCursorOffset |
| 189 | + } |
| 190 | + offset, _ := elem.Value.(int64) |
| 191 | + return offset |
| 192 | +} |
| 193 | + |
| 194 | +// NextOffset is the commit offset to be sent to the stream. Returns |
| 195 | +// nilCursorOffset if the commit offset does not need to be updated. |
| 196 | +func (ct *commitCursorTracker) NextOffset() int64 { |
| 197 | + desiredCommitOffset := ct.acks.CommitOffset() |
| 198 | + if desiredCommitOffset <= ct.lastConfirmedOffset { |
| 199 | + // The server has already confirmed the commit offset. |
| 200 | + return nilCursorOffset |
| 201 | + } |
| 202 | + if desiredCommitOffset <= extractOffsetFromElem(ct.pendingOffsets.Back()) { |
| 203 | + // The commit offset has already been sent to the commit stream and is |
| 204 | + // awaiting confirmation. |
| 205 | + return nilCursorOffset |
| 206 | + } |
| 207 | + return desiredCommitOffset |
| 208 | +} |
| 209 | + |
| 210 | +// AddPending adds a sent, but not yet confirmed, committed offset. |
| 211 | +func (ct *commitCursorTracker) AddPending(offset int64) { |
| 212 | + ct.pendingOffsets.PushBack(offset) |
| 213 | +} |
| 214 | + |
| 215 | +// ClearPending discards old pending offsets. Should be called when the commit |
| 216 | +// stream reconnects, as the server acknowledgments for these would not be |
| 217 | +// received. |
| 218 | +func (ct *commitCursorTracker) ClearPending() { |
| 219 | + ct.pendingOffsets.Init() |
| 220 | +} |
| 221 | + |
| 222 | +// ConfirmOffsets processes the server's acknowledgment of the first |
| 223 | +// `numConfirmed` pending offsets. |
| 224 | +func (ct *commitCursorTracker) ConfirmOffsets(numConfirmed int64) error { |
| 225 | + if numPending := int64(ct.pendingOffsets.Len()); numPending < numConfirmed { |
| 226 | + return fmt.Errorf("pubsublite: server acknowledged %d cursor commits, but only %d were sent", numConfirmed, numPending) |
| 227 | + } |
| 228 | + |
| 229 | + for i := int64(0); i < numConfirmed; i++ { |
| 230 | + front := ct.pendingOffsets.Front() |
| 231 | + ct.lastConfirmedOffset = extractOffsetFromElem(front) |
| 232 | + ct.pendingOffsets.Remove(front) |
| 233 | + } |
| 234 | + return nil |
| 235 | +} |
| 236 | + |
| 237 | +// Done when the server has confirmed the desired commit offset. |
| 238 | +func (ct *commitCursorTracker) Done() bool { |
| 239 | + return ct.acks.CommitOffset() <= ct.lastConfirmedOffset |
| 240 | +} |
0 commit comments