Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ require (
github.com/jackc/pgx/v4 v4.18.3
github.com/jhump/protoreflect v1.15.6
github.com/jmespath/go-jmespath v0.4.0
github.com/klauspost/compress v1.18.1
github.com/klauspost/compress v1.18.2
github.com/klauspost/pgzip v1.2.6
github.com/knights-analytics/hugot v0.4.3
github.com/lib/pq v1.10.9
Expand Down Expand Up @@ -232,9 +232,8 @@ require (

require (
github.com/orcaman/concurrent-map/v2 v2.0.1 // indirect
github.com/s2-streamstore/optr v1.1.0 // indirect
github.com/s2-streamstore/s2-sdk-go v0.6.0
github.com/tidwall/btree v1.7.0 // indirect
github.com/s2-streamstore/s2-sdk-go v0.11.6
github.com/tidwall/btree v1.8.1 // indirect
)

require (
Expand Down
14 changes: 6 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1464,8 +1464,8 @@ github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co=
github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0=
github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk=
github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU=
github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
Expand Down Expand Up @@ -1749,10 +1749,8 @@ github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w=
github.com/ruudk/golang-pdf417 v0.0.0-20201230142125-a7e3863a1245/go.mod h1:pQAZKsJ8yyVxGRWYNEm9oFB8ieLgKFnamEyDmSA0BRk=
github.com/s2-streamstore/optr v1.1.0 h1:ExwYuwxb4Z1mVI+717xKUEDZ1y0ikFAV55jtk8XCYQk=
github.com/s2-streamstore/optr v1.1.0/go.mod h1:ujwXWMeanje1NE1aaaylBwmBkMalZ+wxFxSFOSHmJis=
github.com/s2-streamstore/s2-sdk-go v0.6.0 h1:QtCUX0EABn7i81+i7SkVfW/wU8QvLvCJFb3fpIlcBM0=
github.com/s2-streamstore/s2-sdk-go v0.6.0/go.mod h1:VAK2MYAXfT2eMq4IpaWHRuL5ZBbf5mLyJ4X8Ee+9qdY=
github.com/s2-streamstore/s2-sdk-go v0.11.6 h1:CavBN8rtr9QFhT060wdEIQ71WmRzDIJt7MAuacqhXx0=
github.com/s2-streamstore/s2-sdk-go v0.11.6/go.mod h1:1a+v2sGqU+s5neI8XwqRJz78ktStkR+mZH/JEi9HNSo=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/schollz/progressbar/v2 v2.15.0 h1:dVzHQ8fHRmtPjD3K10jT3Qgn/+H+92jhPrhmxIJfDz8=
github.com/schollz/progressbar/v2 v2.15.0/go.mod h1:UdPq3prGkfQ7MOzZKlDRpYKcFqEMczbD7YmbPgpzKMI=
Expand Down Expand Up @@ -1832,8 +1830,8 @@ github.com/tetratelabs/wazero v1.6.0 h1:z0H1iikCdP8t+q341xqepY4EWvHEw8Es7tlqiVzl
github.com/tetratelabs/wazero v1.6.0/go.mod h1:0U0G41+ochRKoPKCJlh0jMg1CHkyfK8kDqiirMmKY8A=
github.com/theparanoids/crypki v1.20.9 h1:t2ePWWNWAf0lhql4g//AIodZBgZTwjelvaq6wXzAVzU=
github.com/theparanoids/crypki v1.20.9/go.mod h1:Ucf6AB4UP33wqNe8UiJX/vg7j9tXNlNna2tFouExgT0=
github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/tidwall/btree v1.8.1 h1:27ehoXvm5AG/g+1VxLS1SD3vRhp/H7LuEfwNvddEdmA=
github.com/tidwall/btree v1.8.1/go.mod h1:jBbTdUWhSZClZWoDg54VnvV7/54modSOzDN7VXftj1A=
github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY=
github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
Expand Down
10 changes: 5 additions & 5 deletions internal/impl/s2/bento.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
)

const (
basinField = "basin"
authTokenField = "auth_token"
basinField = "basin"
accessTokenField = "access_token"
)

func newConfig(conf *service.ParsedConfig) (*s2bentobox.Config, error) {
Expand All @@ -16,14 +16,14 @@ func newConfig(conf *service.ParsedConfig) (*s2bentobox.Config, error) {
return nil, err
}

authToken, err := conf.FieldString(authTokenField)
accessToken, err := conf.FieldString(accessTokenField)
if err != nil {
return nil, err
}

return &s2bentobox.Config{
Basin: basin,
AuthToken: authToken,
Basin: basin,
AccessToken: accessToken,
}, nil
}

Expand Down
13 changes: 6 additions & 7 deletions internal/impl/s2/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,12 @@ const (

func newInputConfigSpec() *service.ConfigSpec {
return service.NewConfigSpec().
Beta().
Version("1.5.0").
Categories("Services").
Fields(
service.NewStringField(basinField).Description("Basin name"),
service.NewStringField(authTokenField).
Description("Authentication token for S2 account").
service.NewStringField(accessTokenField).
Description("Access token for S2 account").
Secret(),
service.NewAnyField(streamsField).
Description("Streams prefix or list of streams to subscribe to"),
Expand Down Expand Up @@ -117,7 +116,7 @@ input:
s2:
basin: my-favorite-basin
streams: my-favorite-prefix/
auth_token: "${S2_AUTH_TOKEN}"
access_token: "${S2_ACCESS_TOKEN}"
cache: s2_seq_num

output:
Expand Down Expand Up @@ -234,7 +233,7 @@ func (i *Input) Connect(ctx context.Context) error {
func (i *Input) ReadBatch(ctx context.Context) (service.MessageBatch, service.AckFunc, error) {
i.logger.Debug("Reading batch from S2")

batch, aFn, stream, err := i.inner.ReadBatch(ctx)
records, aFn, stream, err := i.inner.ReadBatch(ctx)
if err != nil {
if errors.Is(err, s2bentobox.ErrInputClosed) {
return nil, nil, service.ErrNotConnected
Expand All @@ -243,9 +242,9 @@ func (i *Input) ReadBatch(ctx context.Context) (service.MessageBatch, service.Ac
return nil, nil, err
}

messages := make([]*service.Message, 0, len(batch.Records))
messages := make([]*service.Message, 0, len(records))

for _, record := range batch.Records {
for _, record := range records {
msg := service.NewMessage(record.Body)

if len(record.Headers) == 1 && len(record.Headers[0].Name) == 0 {
Expand Down
33 changes: 15 additions & 18 deletions internal/impl/s2/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
// Setting the batch byte size max a bit conservatively since Bento does not
// take metadata size into account. Moreover, the S2 metered size of a record
// will be > bento message size.
const maxBatchBytes = 256 * 1024
const maxBatchBytesConservative = 256 * 1024

var (
errInvalidBatchingByteSize = errors.New("invalid batch policy byte size")
Expand Down Expand Up @@ -59,13 +59,12 @@ const (

func newOutputConfigSpec() *service.ConfigSpec {
return service.NewConfigSpec().
Beta().
Version("1.5.0").
Categories("Services").
Fields(
service.NewStringField(basinField).Description("Basin name"),
service.NewStringField(authTokenField).
Description("Authentication token for S2 account").
service.NewStringField(accessTokenField).
Description("Access token for S2 account").
Secret(),
service.NewStringField(streamField).Description("Stream name"),
service.NewStringField(fencingTokenField).
Expand All @@ -84,7 +83,7 @@ root = if this.byte_size > %d {
}
`,
s2.MaxBatchRecords,
maxBatchBytes,
maxBatchBytesConservative,
),
),
service.NewOutputMaxInFlightField().Advanced(),
Expand Down Expand Up @@ -130,7 +129,7 @@ output:
s2:
basin: my-favorite-basin
stream: starwars
auth_token: "${S2_AUTH_TOKEN}"
access_token: "${S2_ACCESS_TOKEN}"
`,
)
}
Expand All @@ -144,7 +143,7 @@ func parseBatchPolicy(conf *service.ParsedConfig) (service.BatchPolicy, error) {
// Set required defaults

if policy.ByteSize <= 0 {
policy.ByteSize = maxBatchBytes
policy.ByteSize = maxBatchBytesConservative
}

if policy.Count <= 0 {
Expand All @@ -159,8 +158,8 @@ func parseBatchPolicy(conf *service.ParsedConfig) (service.BatchPolicy, error) {

// Validate limits

if policy.ByteSize > maxBatchBytes {
return policy, fmt.Errorf("%w: must not exceed %d", errInvalidBatchingByteSize, s2.MaxBatchBytes)
if policy.ByteSize > maxBatchBytesConservative {
return policy, fmt.Errorf("%w: must not exceed %d", errInvalidBatchingByteSize, s2.MaxBatchMeteredBytes)
}

if policy.Count > s2.MaxBatchRecords {
Expand All @@ -186,25 +185,28 @@ func newOutputConfig(conf *service.ParsedConfig) (*s2bentobox.OutputConfig, erro
return nil, err
}

var fencingToken []byte
var fencingToken *string

if conf.Contains(fencingTokenField) {
field, err := conf.FieldString(fencingTokenField)
if err != nil {
return nil, err
}

fencingToken, err = base64.StdEncoding.DecodeString(field)
// Decode base64 to get the raw token string
decoded, err := base64.StdEncoding.DecodeString(field)
if err != nil {
return nil, err
}
decodedStr := string(decoded)
fencingToken = &decodedStr
}

return &s2bentobox.OutputConfig{
Config: config,
Stream: stream,
MaxInFlight: maxInFlight,
FencingToken: fencingToken,
MaxInFlight: maxInFlight,
}, nil
}

Expand Down Expand Up @@ -261,12 +263,7 @@ func (o *Output) WriteBatch(ctx context.Context, batch service.MessageBatch) err
return err
}

recordBatch, leftOver := s2.NewAppendRecordBatch(records...)
if len(leftOver) > 0 {
return s2bentobox.ErrAppendRecordBatchFull
}

if err := o.inner.WriteBatch(ctx, recordBatch); err != nil {
if err := o.inner.WriteBatch(ctx, records); err != nil {
if errors.Is(err, s2bentobox.ErrOutputClosed) {
return service.ErrNotConnected
}
Expand Down
16 changes: 8 additions & 8 deletions website/docs/components/inputs/s2.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
title: s2
slug: s2
type: input
status: beta
status: experimental
categories: ["Services"]
---

Expand All @@ -15,8 +15,8 @@ categories: ["Services"]
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

:::caution BETA
This component is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with the component is found.
:::caution EXPERIMENTAL
This component is experimental and therefore subject to change or removal outside of major version releases.
:::
Consumes records from S2 streams

Expand All @@ -36,7 +36,7 @@ input:
label: ""
s2:
basin: "" # No default (required)
auth_token: "" # No default (required)
access_token: "" # No default (required)
streams: null # No default (required)
cache: "" # No default (required)
```
Expand All @@ -50,7 +50,7 @@ input:
label: ""
s2:
basin: "" # No default (required)
auth_token: "" # No default (required)
access_token: "" # No default (required)
streams: null # No default (required)
cache: "" # No default (required)
max_in_flight: 0
Expand Down Expand Up @@ -109,7 +109,7 @@ input:
s2:
basin: my-favorite-basin
streams: my-favorite-prefix/
auth_token: "${S2_AUTH_TOKEN}"
access_token: "${S2_ACCESS_TOKEN}"
cache: s2_seq_num

output:
Expand All @@ -130,9 +130,9 @@ Basin name

Type: `string`

### `auth_token`
### `access_token`

Authentication token for S2 account
Access token for S2 account
:::warning Secret
This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets).
:::
Expand Down
16 changes: 8 additions & 8 deletions website/docs/components/outputs/s2.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
title: s2
slug: s2
type: output
status: beta
status: experimental
categories: ["Services"]
---

Expand All @@ -15,8 +15,8 @@ categories: ["Services"]
import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

:::caution BETA
This component is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with the component is found.
:::caution EXPERIMENTAL
This component is experimental and therefore subject to change or removal outside of major version releases.
:::
Sends messages to an S2 stream.

Expand All @@ -36,7 +36,7 @@ output:
label: ""
s2:
basin: "" # No default (required)
auth_token: "" # No default (required)
access_token: "" # No default (required)
stream: "" # No default (required)
fencing_token: aGVsbG8gczI= # No default (optional)
```
Expand All @@ -50,7 +50,7 @@ output:
label: ""
s2:
basin: "" # No default (required)
auth_token: "" # No default (required)
access_token: "" # No default (required)
stream: "" # No default (required)
fencing_token: aGVsbG8gczI= # No default (optional)
batching:
Expand Down Expand Up @@ -112,7 +112,7 @@ output:
s2:
basin: my-favorite-basin
stream: starwars
auth_token: "${S2_AUTH_TOKEN}"
access_token: "${S2_ACCESS_TOKEN}"
```

</TabItem>
Expand All @@ -127,9 +127,9 @@ Basin name

Type: `string`

### `auth_token`
### `access_token`

Authentication token for S2 account
Access token for S2 account
:::warning Secret
This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets).
:::
Expand Down