Skip to content

Commit

Permalink
Include offset in consumer poll response
Browse files Browse the repository at this point in the history
Resolves #40.
  • Loading branch information
jorgebay committed Oct 7, 2022
1 parent a832944 commit 11d3f91
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 30 deletions.
48 changes: 24 additions & 24 deletions docs/developer/NETWORK_FORMATS.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,28 @@ All network encoding is big-endian.
## Consumer poll response

```
+------------------------------------------------------------------------------+
| items length (uint16) |
+------------------------------------------------------------------------------+
| response items |
| +--------------------------------------------------------------------------+ |
| | item | |
| | +----------------------+---------------------+------------------------+ | |
| | | token (int64) | range index (uint8) | gen version (uint32) | | |
| | +----------------------+---------------------+------------------------+ | |
| | | topic length (uint8) | topic name (bytes) | payload length (int32) | | |
| | +----------------------+---------------------+------------------------+ | |
| | | compressed payload (zstd crc) (bytes) | | |
| | +---------------------------------------------------------------------+ | |
| +--------------------------------------------------------------------------+ |
| |
| +--------------------------------------------------------------------------+ |
| | item | |
| +--------------------------------------------------------------------------+ |
| |
| . |
| . |
| . |
| |
+------------------------------------------------------------------------------+
+--------------------------------------------------------------------------------+
| items length (uint16) |
+--------------------------------------------------------------------------------+
| response items |
| +----------------------------------------------------------------------------+ |
| | item | |
| | +------------------------+---------------------+------------------------+ | |
| | | token (int64) | range index (uint8) | gen version (uint32) | | |
| | +------------------------+---------------------+------------------------+ | |
| | | topic length (uint8) | topic name (bytes) | offset (int64) | | |
| | +------------------------+---------------------+------------------------+ | |
| | | payload length (int32) | compressed payload (zstd crc) (bytes) | | |
| | +------------------------+----------------------------------------------+ | |
| +----------------------------------------------------------------------------+ |
| |
| +----------------------------------------------------------------------------+ |
| | item | |
| +----------------------------------------------------------------------------+ |
| |
| . |
| . |
| . |
| |
+--------------------------------------------------------------------------------+
```
3 changes: 3 additions & 0 deletions internal/consuming/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ func (i *consumerResponseItem) Marshal(w io.Writer) error {
if _, err := w.Write([]byte(i.topic.Name)); err != nil {
return err
}
if err := binary.Write(w, conf.Endianness, i.chunk.StartOffset()); err != nil {
return err
}
payload := i.chunk.DataBlock()
if err := binary.Write(w, conf.Endianness, int32(len(payload))); err != nil {
return err
Expand Down
20 changes: 14 additions & 6 deletions internal/test/integration/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,15 +233,19 @@ var _ = Describe("A 3 node cluster", func() {
time.Sleep(ConsumerAddDelay)

records := make([]string, 0, totalMessages)
expectedOffset := int64(0)
for {
resp := client.ConsumerPoll(0);
if resp.StatusCode == http.StatusOK {
messages := readConsumerResponse(resp)
for _, m := range messages {
for _, r := range m.records {
consumerResponses := readConsumerResponse(resp)
for _, c := range consumerResponses {
Expect(c.startOffset).To(Equal(expectedOffset))
for _, r := range c.records {
// Store only the last characters to avoid blowing up memory in the test
records = append(records, r.body[len(r.body) - 10:])
}
// Increase the next one
expectedOffset += int64(len(c.records))
}
}
if resp.StatusCode == http.StatusNoContent {
Expand Down Expand Up @@ -351,8 +355,9 @@ func expectOkOrMessage(resp *http.Response, message string, description... inter
}

type consumerResponseItem struct {
topic *TopicDataId
records []record
topic *TopicDataId
startOffset int64
records []record
}

type record struct {
Expand All @@ -378,8 +383,11 @@ func generateString(length int) string {
func unmarshalConsumerResponseItem(r io.Reader) consumerResponseItem {
item := consumerResponseItem{}
item.topic = unmarshalTopicId(r)
err := binary.Read(r, conf.Endianness, &item.startOffset)
Expect(err).NotTo(HaveOccurred())
payloadLength := int32(0)
binary.Read(r, conf.Endianness, &payloadLength)
err = binary.Read(r, conf.Endianness, &payloadLength)
Expect(err).NotTo(HaveOccurred())
payload := make([]byte, payloadLength)
n, err := io.ReadFull(r, payload)
Expect(n).To(Equal(int(payloadLength)))
Expand Down

0 comments on commit 11d3f91

Please sign in to comment.