Skip to content

Commit

Permalink
add offset type to NewOffset func (#962)
Browse files Browse the repository at this point in the history
* add tests for explicit offset type

* add Offset type case and return

Co-authored-by: Milind L <[email protected]>

---------

Co-authored-by: Jack Dockerty <[email protected]>
Co-authored-by: Confluent Jenkins Bot <[email protected]>
Co-authored-by: Milind L <[email protected]>
  • Loading branch information
4 people authored Mar 20, 2023
1 parent 6c1ca1c commit 3d9047a
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ This is a feature release:
* Added `SetRoundtripDuration` to the mock broker for setting RTT delay for
a given mock broker (@kkoehler, #892).
* Built-in support for Linux/ arm64. (#933).
* Support for Offset types on `Offset.Set()`

### Fixes

Expand Down
16 changes: 16 additions & 0 deletions kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,22 @@ func TestOffsetAPIs(t *testing.T) {
t.Errorf("Failed to change offset. Expect (%v), got (%v)\n", 10, offset)
}

// test passing explicit 'Offset' type
err = offset.Set(OffsetBeginning)
if err != nil {
t.Errorf("Cannot set offset to (%v). Error: %s \n", OffsetBeginning, err)
} else if offset != OffsetBeginning {
t.Errorf("Failed to change offset. Expect (%v), got %v\n", OffsetBeginning, offset)
}

// test passing explicit 'Offset' type
err = offset.Set(OffsetEnd)
if err != nil {
t.Errorf("Cannot set offset to (%v). Error: %s \n", OffsetEnd, err)
} else if offset != OffsetEnd {
t.Errorf("Failed to change offset. Expect (%v), got %v\n", OffsetEnd, offset)
}

// test OffsetTail()
tail := OffsetTail(offset)
t.Logf("offset tail %v\n", tail)
Expand Down
6 changes: 4 additions & 2 deletions kafka/offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func (o *Offset) Set(offset interface{}) error {
return err
}

// NewOffset creates a new Offset using the provided logical string, or an
// absolute int64 offset value.
// NewOffset creates a new Offset using the provided logical string, an
// absolute int64 offset value, or a concrete Offset type.
// Logical offsets: "beginning", "earliest", "end", "latest", "unset", "invalid", "stored"
func NewOffset(offset interface{}) (Offset, error) {

Expand Down Expand Up @@ -107,6 +107,8 @@ func NewOffset(offset interface{}) (Offset, error) {
return Offset((int64)(v)), nil
case int64:
return Offset(v), nil
case Offset:
return Offset(v), nil
default:
return OffsetInvalid, newErrorFromString(ErrInvalidArg,
fmt.Sprintf("Invalid offset type: %t", v))
Expand Down

0 comments on commit 3d9047a

Please sign in to comment.