diff --git a/go.mod b/go.mod index 73c79f479a..edad864be6 100644 --- a/go.mod +++ b/go.mod @@ -89,7 +89,7 @@ require ( github.com/jackc/pgx/v4 v4.18.3 github.com/jhump/protoreflect v1.15.6 github.com/jmespath/go-jmespath v0.4.0 - github.com/klauspost/compress v1.18.1 + github.com/klauspost/compress v1.18.2 github.com/klauspost/pgzip v1.2.6 github.com/knights-analytics/hugot v0.4.3 github.com/lib/pq v1.10.9 @@ -232,9 +232,8 @@ require ( require ( github.com/orcaman/concurrent-map/v2 v2.0.1 // indirect - github.com/s2-streamstore/optr v1.1.0 // indirect - github.com/s2-streamstore/s2-sdk-go v0.6.0 - github.com/tidwall/btree v1.7.0 // indirect + github.com/s2-streamstore/s2-sdk-go v0.11.6 + github.com/tidwall/btree v1.8.1 // indirect ) require ( diff --git a/go.sum b/go.sum index 00675c78d5..d737ab8144 100644 --- a/go.sum +++ b/go.sum @@ -1464,8 +1464,8 @@ github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.14.4/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.18.1 h1:bcSGx7UbpBqMChDtsF28Lw6v/G94LPrrbMbdC3JH2co= -github.com/klauspost/compress v1.18.1/go.mod h1:ZQFFVG+MdnR0P+l6wpXgIL4NTtwiKIdBnrBd8Nrxr+0= +github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= +github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.11 h1:0OwqZRYI2rFrjS4kvkDnqJkKHdHaRnCm68/DY4OxRzU= github.com/klauspost/cpuid/v2 v2.2.11/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= @@ -1749,10 +1749,8 @@ github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ruudk/golang-pdf417 v0.0.0-20181029194003-1af4ab5afa58/go.mod h1:6lfFZQK844Gfx8o5WFuvpxWRwnSoipWe/p622j1v06w= github.com/ruudk/golang-pdf417 v0.0.0-20201230142125-a7e3863a1245/go.mod h1:pQAZKsJ8yyVxGRWYNEm9oFB8ieLgKFnamEyDmSA0BRk= -github.com/s2-streamstore/optr v1.1.0 h1:ExwYuwxb4Z1mVI+717xKUEDZ1y0ikFAV55jtk8XCYQk= -github.com/s2-streamstore/optr v1.1.0/go.mod h1:ujwXWMeanje1NE1aaaylBwmBkMalZ+wxFxSFOSHmJis= -github.com/s2-streamstore/s2-sdk-go v0.6.0 h1:QtCUX0EABn7i81+i7SkVfW/wU8QvLvCJFb3fpIlcBM0= -github.com/s2-streamstore/s2-sdk-go v0.6.0/go.mod h1:VAK2MYAXfT2eMq4IpaWHRuL5ZBbf5mLyJ4X8Ee+9qdY= +github.com/s2-streamstore/s2-sdk-go v0.11.6 h1:CavBN8rtr9QFhT060wdEIQ71WmRzDIJt7MAuacqhXx0= +github.com/s2-streamstore/s2-sdk-go v0.11.6/go.mod h1:1a+v2sGqU+s5neI8XwqRJz78ktStkR+mZH/JEi9HNSo= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/schollz/progressbar/v2 v2.15.0 h1:dVzHQ8fHRmtPjD3K10jT3Qgn/+H+92jhPrhmxIJfDz8= github.com/schollz/progressbar/v2 v2.15.0/go.mod h1:UdPq3prGkfQ7MOzZKlDRpYKcFqEMczbD7YmbPgpzKMI= @@ -1832,8 +1830,8 @@ github.com/tetratelabs/wazero v1.6.0 h1:z0H1iikCdP8t+q341xqepY4EWvHEw8Es7tlqiVzl github.com/tetratelabs/wazero v1.6.0/go.mod h1:0U0G41+ochRKoPKCJlh0jMg1CHkyfK8kDqiirMmKY8A= github.com/theparanoids/crypki v1.20.9 h1:t2ePWWNWAf0lhql4g//AIodZBgZTwjelvaq6wXzAVzU= github.com/theparanoids/crypki v1.20.9/go.mod h1:Ucf6AB4UP33wqNe8UiJX/vg7j9tXNlNna2tFouExgT0= -github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI= -github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY= +github.com/tidwall/btree v1.8.1 h1:27ehoXvm5AG/g+1VxLS1SD3vRhp/H7LuEfwNvddEdmA= +github.com/tidwall/btree v1.8.1/go.mod h1:jBbTdUWhSZClZWoDg54VnvV7/54modSOzDN7VXftj1A= github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= diff --git a/internal/impl/s2/bento.go b/internal/impl/s2/bento.go index ad5698537a..14b04531da 100644 --- a/internal/impl/s2/bento.go +++ b/internal/impl/s2/bento.go @@ -6,8 +6,8 @@ import ( ) const ( - basinField = "basin" - authTokenField = "auth_token" + basinField = "basin" + accessTokenField = "access_token" ) func newConfig(conf *service.ParsedConfig) (*s2bentobox.Config, error) { @@ -16,14 +16,14 @@ func newConfig(conf *service.ParsedConfig) (*s2bentobox.Config, error) { return nil, err } - authToken, err := conf.FieldString(authTokenField) + accessToken, err := conf.FieldString(accessTokenField) if err != nil { return nil, err } return &s2bentobox.Config{ - Basin: basin, - AuthToken: authToken, + Basin: basin, + AccessToken: accessToken, }, nil } diff --git a/internal/impl/s2/input.go b/internal/impl/s2/input.go index ec240539f0..7b08068488 100644 --- a/internal/impl/s2/input.go +++ b/internal/impl/s2/input.go @@ -47,13 +47,12 @@ const ( func newInputConfigSpec() *service.ConfigSpec { return service.NewConfigSpec(). - Beta(). Version("1.5.0"). Categories("Services"). Fields( service.NewStringField(basinField).Description("Basin name"), - service.NewStringField(authTokenField). - Description("Authentication token for S2 account"). + service.NewStringField(accessTokenField). + Description("Access token for S2 account"). Secret(), service.NewAnyField(streamsField). Description("Streams prefix or list of streams to subscribe to"), @@ -117,7 +116,7 @@ input: s2: basin: my-favorite-basin streams: my-favorite-prefix/ - auth_token: "${S2_AUTH_TOKEN}" + access_token: "${S2_ACCESS_TOKEN}" cache: s2_seq_num output: @@ -234,7 +233,7 @@ func (i *Input) Connect(ctx context.Context) error { func (i *Input) ReadBatch(ctx context.Context) (service.MessageBatch, service.AckFunc, error) { i.logger.Debug("Reading batch from S2") - batch, aFn, stream, err := i.inner.ReadBatch(ctx) + records, aFn, stream, err := i.inner.ReadBatch(ctx) if err != nil { if errors.Is(err, s2bentobox.ErrInputClosed) { return nil, nil, service.ErrNotConnected @@ -243,9 +242,9 @@ func (i *Input) ReadBatch(ctx context.Context) (service.MessageBatch, service.Ac return nil, nil, err } - messages := make([]*service.Message, 0, len(batch.Records)) + messages := make([]*service.Message, 0, len(records)) - for _, record := range batch.Records { + for _, record := range records { msg := service.NewMessage(record.Body) if len(record.Headers) == 1 && len(record.Headers[0].Name) == 0 { diff --git a/internal/impl/s2/output.go b/internal/impl/s2/output.go index 0fa99c016f..2125d497b4 100644 --- a/internal/impl/s2/output.go +++ b/internal/impl/s2/output.go @@ -15,7 +15,7 @@ import ( // Setting the batch byte size max a bit conservatively since Bento does not // take metadata size into account. Moreover, the S2 metered size of a record // will be > bento message size. -const maxBatchBytes = 256 * 1024 +const maxBatchBytesConservative = 256 * 1024 var ( errInvalidBatchingByteSize = errors.New("invalid batch policy byte size") @@ -59,13 +59,12 @@ const ( func newOutputConfigSpec() *service.ConfigSpec { return service.NewConfigSpec(). - Beta(). Version("1.5.0"). Categories("Services"). Fields( service.NewStringField(basinField).Description("Basin name"), - service.NewStringField(authTokenField). - Description("Authentication token for S2 account"). + service.NewStringField(accessTokenField). + Description("Access token for S2 account"). Secret(), service.NewStringField(streamField).Description("Stream name"), service.NewStringField(fencingTokenField). @@ -84,7 +83,7 @@ root = if this.byte_size > %d { } `, s2.MaxBatchRecords, - maxBatchBytes, + maxBatchBytesConservative, ), ), service.NewOutputMaxInFlightField().Advanced(), @@ -130,7 +129,7 @@ output: s2: basin: my-favorite-basin stream: starwars - auth_token: "${S2_AUTH_TOKEN}" + access_token: "${S2_ACCESS_TOKEN}" `, ) } @@ -144,7 +143,7 @@ func parseBatchPolicy(conf *service.ParsedConfig) (service.BatchPolicy, error) { // Set required defaults if policy.ByteSize <= 0 { - policy.ByteSize = maxBatchBytes + policy.ByteSize = maxBatchBytesConservative } if policy.Count <= 0 { @@ -159,8 +158,8 @@ func parseBatchPolicy(conf *service.ParsedConfig) (service.BatchPolicy, error) { // Validate limits - if policy.ByteSize > maxBatchBytes { - return policy, fmt.Errorf("%w: must not exceed %d", errInvalidBatchingByteSize, s2.MaxBatchBytes) + if policy.ByteSize > maxBatchBytesConservative { + return policy, fmt.Errorf("%w: must not exceed %d", errInvalidBatchingByteSize, s2.MaxBatchMeteredBytes) } if policy.Count > s2.MaxBatchRecords { @@ -186,7 +185,7 @@ func newOutputConfig(conf *service.ParsedConfig) (*s2bentobox.OutputConfig, erro return nil, err } - var fencingToken []byte + var fencingToken *string if conf.Contains(fencingTokenField) { field, err := conf.FieldString(fencingTokenField) @@ -194,17 +193,20 @@ func newOutputConfig(conf *service.ParsedConfig) (*s2bentobox.OutputConfig, erro return nil, err } - fencingToken, err = base64.StdEncoding.DecodeString(field) + // Decode base64 to get the raw token string + decoded, err := base64.StdEncoding.DecodeString(field) if err != nil { return nil, err } + decodedStr := string(decoded) + fencingToken = &decodedStr } return &s2bentobox.OutputConfig{ Config: config, Stream: stream, - MaxInFlight: maxInFlight, FencingToken: fencingToken, + MaxInFlight: maxInFlight, }, nil } @@ -261,12 +263,7 @@ func (o *Output) WriteBatch(ctx context.Context, batch service.MessageBatch) err return err } - recordBatch, leftOver := s2.NewAppendRecordBatch(records...) - if len(leftOver) > 0 { - return s2bentobox.ErrAppendRecordBatchFull - } - - if err := o.inner.WriteBatch(ctx, recordBatch); err != nil { + if err := o.inner.WriteBatch(ctx, records); err != nil { if errors.Is(err, s2bentobox.ErrOutputClosed) { return service.ErrNotConnected } diff --git a/website/docs/components/inputs/s2.md b/website/docs/components/inputs/s2.md index b9215a4a7a..9236c924bd 100644 --- a/website/docs/components/inputs/s2.md +++ b/website/docs/components/inputs/s2.md @@ -2,7 +2,7 @@ title: s2 slug: s2 type: input -status: beta +status: experimental categories: ["Services"] --- @@ -15,8 +15,8 @@ categories: ["Services"] import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; -:::caution BETA -This component is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with the component is found. +:::caution EXPERIMENTAL +This component is experimental and therefore subject to change or removal outside of major version releases. ::: Consumes records from S2 streams @@ -36,7 +36,7 @@ input: label: "" s2: basin: "" # No default (required) - auth_token: "" # No default (required) + access_token: "" # No default (required) streams: null # No default (required) cache: "" # No default (required) ``` @@ -50,7 +50,7 @@ input: label: "" s2: basin: "" # No default (required) - auth_token: "" # No default (required) + access_token: "" # No default (required) streams: null # No default (required) cache: "" # No default (required) max_in_flight: 0 @@ -109,7 +109,7 @@ input: s2: basin: my-favorite-basin streams: my-favorite-prefix/ - auth_token: "${S2_AUTH_TOKEN}" + access_token: "${S2_ACCESS_TOKEN}" cache: s2_seq_num output: @@ -130,9 +130,9 @@ Basin name Type: `string` -### `auth_token` +### `access_token` -Authentication token for S2 account +Access token for S2 account :::warning Secret This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). ::: diff --git a/website/docs/components/outputs/s2.md b/website/docs/components/outputs/s2.md index 3c863586e2..2d14934bd4 100644 --- a/website/docs/components/outputs/s2.md +++ b/website/docs/components/outputs/s2.md @@ -2,7 +2,7 @@ title: s2 slug: s2 type: output -status: beta +status: experimental categories: ["Services"] --- @@ -15,8 +15,8 @@ categories: ["Services"] import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; -:::caution BETA -This component is mostly stable but breaking changes could still be made outside of major version releases if a fundamental problem with the component is found. +:::caution EXPERIMENTAL +This component is experimental and therefore subject to change or removal outside of major version releases. ::: Sends messages to an S2 stream. @@ -36,7 +36,7 @@ output: label: "" s2: basin: "" # No default (required) - auth_token: "" # No default (required) + access_token: "" # No default (required) stream: "" # No default (required) fencing_token: aGVsbG8gczI= # No default (optional) ``` @@ -50,7 +50,7 @@ output: label: "" s2: basin: "" # No default (required) - auth_token: "" # No default (required) + access_token: "" # No default (required) stream: "" # No default (required) fencing_token: aGVsbG8gczI= # No default (optional) batching: @@ -112,7 +112,7 @@ output: s2: basin: my-favorite-basin stream: starwars - auth_token: "${S2_AUTH_TOKEN}" + access_token: "${S2_ACCESS_TOKEN}" ``` @@ -127,9 +127,9 @@ Basin name Type: `string` -### `auth_token` +### `access_token` -Authentication token for S2 account +Access token for S2 account :::warning Secret This field contains sensitive information that usually shouldn't be added to a config directly, read our [secrets page for more info](/docs/configuration/secrets). :::