Skip to content

Commit

Permalink
Add concurrencyThread setting for SQS ingestion. (#100)
Browse files Browse the repository at this point in the history
  • Loading branch information
atlas-comstock authored Jun 16, 2022
1 parent c3e8f5a commit dccb5b7
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 4 deletions.
1 change: 1 addition & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type S3SQS struct {
WaitTimeout int64 `json:"waitTimeout,omitempty" yaml:"waitTimeout" env:"WAITTIMEOUT"` // in seconds
VisibilityTimeout int64 `json:"visibilityTimeout,omitempty" yaml:"visibilityTimeout" env:"VISIBILITYTIMEOUT"` // in seconds
Retries int `json:"retries" yaml:"retries" env:"RETRIES"`
ConcurrencyThread int `json:"concurrencyThread" yaml:"concurrencyThread" env:"concurrencyThread"`
}

// Presto represents the Presto configuration
Expand Down
6 changes: 5 additions & 1 deletion internal/ingress/s3sqs/s3sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net/url"
"runtime"
"time"
Expand Down Expand Up @@ -55,7 +56,10 @@ func New(conf *config.S3SQS, region string, monitor monitor.Monitor) (*Ingress,
if err != nil {
return nil, err
}

if conf.ConcurrencyThread != 0 {
concurrency = int64(runtime.NumCPU() * conf.ConcurrencyThread)
log.Printf("new sqs: use config ConcurrencyThread(%d) to set the threads per CPU, total threads are %d", conf.ConcurrencyThread, concurrency)
}
return NewWith(reader, loader, monitor), nil
}

Expand Down
6 changes: 3 additions & 3 deletions internal/storage/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (s *Storage) Open(dir string, options config.Badger) error {
case config.BadgerIngestion:
opts = opts.
WithBaseLevelSize(204800000).
WithMaxLevels(3).
WithMaxLevels(7).
WithSyncWrites(false)
}

Expand All @@ -96,15 +96,15 @@ func (s *Storage) Open(dir string, options config.Badger) error {
}

// max size of lsm tree in bytes after which data is propagated to disk.
// The default is 64 MB.
// The default is 2 MB.
if options.MaxTableSize != nil {
opts = opts.WithBaseTableSize(*options.MaxTableSize)
}

// ValueLogMaxEntries sets the maximum number of entries a value log file can hold approximately.
// A actual size limit of a value log file is the minimum of ValueLogFileSize and
// ValueLogMaxEntries.
// The default value of ValueLogMaxEntries is 1 Million
// The default value of ValueLogMaxEntries is one million (1000000)
if options.ValueLogMaxEntries != nil {
opts = opts.WithValueLogMaxEntries(*options.ValueLogMaxEntries)
}
Expand Down

0 comments on commit dccb5b7

Please sign in to comment.