Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adapting to helpers in go-kit #4560

Merged
merged 1 commit into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"

backendconfig "github.com/rudderlabs/rudder-server/backend-config"
configenv "github.com/rudderlabs/rudder-server/enterprise/config-env"
"github.com/rudderlabs/rudder-server/enterprise/replay"
Expand Down
8 changes: 4 additions & 4 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ type embeddedApp struct {
log logger.Logger
config struct {
enableReplay bool
processorDSLimit misc.ValueLoader[int]
routerDSLimit misc.ValueLoader[int]
batchRouterDSLimit misc.ValueLoader[int]
gatewayDSLimit misc.ValueLoader[int]
processorDSLimit config.ValueLoader[int]
routerDSLimit config.ValueLoader[int]
batchRouterDSLimit config.ValueLoader[int]
gatewayDSLimit config.ValueLoader[int]
}
}

Expand Down
3 changes: 2 additions & 1 deletion app/apphandlers/gatewayAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/app"
"github.com/rudderlabs/rudder-server/app/cluster"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
Expand All @@ -33,7 +34,7 @@ type gatewayApp struct {
versionHandler func(w http.ResponseWriter, r *http.Request)
log logger.Logger
config struct {
gatewayDSLimit misc.ValueLoader[int]
gatewayDSLimit config.ValueLoader[int]
}
}

Expand Down
8 changes: 4 additions & 4 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ type processorApp struct {
versionHandler func(w http.ResponseWriter, r *http.Request)
log logger.Logger
config struct {
processorDSLimit misc.ValueLoader[int]
routerDSLimit misc.ValueLoader[int]
batchRouterDSLimit misc.ValueLoader[int]
gatewayDSLimit misc.ValueLoader[int]
processorDSLimit config.ValueLoader[int]
routerDSLimit config.ValueLoader[int]
batchRouterDSLimit config.ValueLoader[int]
gatewayDSLimit config.ValueLoader[int]
http struct {
ReadTimeout time.Duration
ReadHeaderTimeout time.Duration
Expand Down
1 change: 1 addition & 0 deletions app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/admin"
"github.com/rudderlabs/rudder-server/app/cluster"
arc "github.com/rudderlabs/rudder-server/archiver"
Expand Down
4 changes: 2 additions & 2 deletions backend-config/backend-config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

"github.com/rudderlabs/rudder-server/backend-config/internal/cache"
"github.com/rudderlabs/rudder-server/rruntime"
"github.com/rudderlabs/rudder-server/services/controlplane/identity"
"github.com/rudderlabs/rudder-server/services/diagnostics"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/pubsub"
"github.com/rudderlabs/rudder-server/utils/sysUtils"
"github.com/rudderlabs/rudder-server/utils/types"
Expand All @@ -34,7 +34,7 @@ var (
// environment variables
configBackendURL string
cpRouterURL string
pollInterval misc.ValueLoader[time.Duration]
pollInterval config.ValueLoader[time.Duration]
configJSONPath string
configFromFile bool
configEnvReplacementEnabled bool
Expand Down
4 changes: 2 additions & 2 deletions backend-config/backend_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

adminpkg "github.com/rudderlabs/rudder-server/admin"
"github.com/rudderlabs/rudder-server/backend-config/internal/cache"
"github.com/rudderlabs/rudder-server/services/diagnostics"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/pubsub"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
)
Expand Down Expand Up @@ -331,7 +331,7 @@ func TestWaitForConfig(t *testing.T) {
defer ctrl.Finish()

pkgLogger = logger.NOP
pollInterval = misc.SingleValueLoader(time.Millisecond)
pollInterval = config.SingleValueLoader(time.Millisecond)
bc := &backendConfigImpl{initialized: false}

var done int32
Expand Down
4 changes: 3 additions & 1 deletion cmd/backupfilemigrator/file_migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strings"
"time"

"github.com/rudderlabs/rudder-go-kit/stringify"
obskit "github.com/rudderlabs/rudder-observability-kit/go/labels"

kitconfig "github.com/rudderlabs/rudder-go-kit/config"
Expand All @@ -31,6 +32,7 @@ import (

"github.com/rudderlabs/rudder-go-kit/filemanager"
kitlogger "github.com/rudderlabs/rudder-go-kit/logger"

"github.com/rudderlabs/rudder-server/enterprise/replay/utils"
)

Expand Down Expand Up @@ -116,7 +118,7 @@ func (m *fileMigrator) convertToNewFormat(lineBytes []byte, createdAt time.Time)
j.UserID = userID
j.EventPayload = payloadBytes
j.CreatedAt = createdAt
j.MessageID = misc.GetStringifiedData(singleEvent["messageId"])
j.MessageID = stringify.Data(singleEvent["messageId"])
listOfNewEvents = append(listOfNewEvents, j)
}
return listOfNewEvents, nil
Expand Down
3 changes: 1 addition & 2 deletions enterprise/reporting/error_extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
)

const (
Expand Down Expand Up @@ -69,7 +68,7 @@ var lowercasedDeprecationKeywords = lo.MapKeys(deprecationKeywords, func(_ int,
type ExtractorHandle struct {
log logger.Logger
ErrorMessageKeys []string // the keys where in we may have error message
versionDeprecationThresholdScore misc.ValueLoader[int]
versionDeprecationThresholdScore config.ValueLoader[int]
}

func NewErrorDetailExtractor(log logger.Logger) *ExtractorHandle {
Expand Down
4 changes: 2 additions & 2 deletions enterprise/reporting/error_index/error_index_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
kitsync "github.com/rudderlabs/rudder-go-kit/sync"

"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/utils/misc"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
"github.com/rudderlabs/rudder-server/utils/workerpool"
Expand All @@ -43,7 +43,7 @@ type ErrorIndexReporter struct {
update kitsync.Limiter
}

concurrency misc.ValueLoader[int]
concurrency config.ValueLoader[int]

statsFactory stats.Stats
stats struct {
Expand Down
6 changes: 3 additions & 3 deletions enterprise/reporting/error_index/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ type worker struct {
lastUploadTime time.Time

config struct {
parquetParallelWriters, parquetRowGroupSize, parquetPageSize misc.ValueLoader[int64]
parquetParallelWriters, parquetRowGroupSize, parquetPageSize config.ValueLoader[int64]
bucketName, instanceID string
payloadLimit, eventsLimit misc.ValueLoader[int64]
payloadLimit, eventsLimit config.ValueLoader[int64]
minWorkerSleep, uploadFrequency, jobsDBCommandTimeout time.Duration
jobsDBMaxRetries misc.ValueLoader[int]
jobsDBMaxRetries config.ValueLoader[int]
}
}

Expand Down
20 changes: 10 additions & 10 deletions enterprise/reporting/error_index/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ import (
kitsync "github.com/rudderlabs/rudder-go-kit/sync"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/minio"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"

"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/utils/misc"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

Expand Down Expand Up @@ -84,9 +84,9 @@ func TestWorkerWriter(t *testing.T) {
buf := bytes.NewBuffer(make([]byte, 0, 1024))

w := worker{}
w.config.parquetRowGroupSize = misc.SingleValueLoader(512 * bytesize.MB)
w.config.parquetPageSize = misc.SingleValueLoader(8 * bytesize.KB)
w.config.parquetParallelWriters = misc.SingleValueLoader(int64(8))
w.config.parquetRowGroupSize = config.SingleValueLoader(512 * bytesize.MB)
w.config.parquetPageSize = config.SingleValueLoader(8 * bytesize.KB)
w.config.parquetParallelWriters = config.SingleValueLoader(int64(8))

require.NoError(t, w.encodeToParquet(buf, toEncode))

Expand Down Expand Up @@ -125,9 +125,9 @@ func TestWorkerWriter(t *testing.T) {
require.NoError(t, err)

w := worker{}
w.config.parquetRowGroupSize = misc.SingleValueLoader(512 * bytesize.MB)
w.config.parquetPageSize = misc.SingleValueLoader(8 * bytesize.KB)
w.config.parquetParallelWriters = misc.SingleValueLoader(int64(8))
w.config.parquetRowGroupSize = config.SingleValueLoader(512 * bytesize.MB)
w.config.parquetPageSize = config.SingleValueLoader(8 * bytesize.KB)
w.config.parquetParallelWriters = config.SingleValueLoader(int64(8))

require.NoError(t, w.encodeToParquet(fw, toEncode))

Expand Down Expand Up @@ -713,9 +713,9 @@ func BenchmarkFileFormat(b *testing.B) {
}

w := worker{}
w.config.parquetRowGroupSize = misc.SingleValueLoader(512 * bytesize.MB)
w.config.parquetPageSize = misc.SingleValueLoader(8 * bytesize.KB)
w.config.parquetParallelWriters = misc.SingleValueLoader(int64(8))
w.config.parquetRowGroupSize = config.SingleValueLoader(512 * bytesize.MB)
w.config.parquetPageSize = config.SingleValueLoader(8 * bytesize.KB)
w.config.parquetParallelWriters = config.SingleValueLoader(int64(8))

buf := bytes.NewBuffer(make([]byte, 0, 1024))

Expand Down
8 changes: 4 additions & 4 deletions enterprise/reporting/error_reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/misc"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
)
Expand Down Expand Up @@ -65,9 +65,9 @@ type ErrorDetailReporter struct {

instanceID string
region string
sleepInterval misc.ValueLoader[time.Duration]
mainLoopSleepInterval misc.ValueLoader[time.Duration]
maxConcurrentRequests misc.ValueLoader[int]
sleepInterval config.ValueLoader[time.Duration]
mainLoopSleepInterval config.ValueLoader[time.Duration]
maxConcurrentRequests config.ValueLoader[int]
maxOpenConnections int

httpClient *http.Client
Expand Down
8 changes: 4 additions & 4 deletions enterprise/reporting/reporting.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"

migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
"github.com/rudderlabs/rudder-server/utils/httputil"
"github.com/rudderlabs/rudder-server/utils/misc"
. "github.com/rudderlabs/rudder-server/utils/tx" //nolint:staticcheck
"github.com/rudderlabs/rudder-server/utils/types"
)
Expand Down Expand Up @@ -61,12 +61,12 @@ type DefaultReporter struct {
instanceID string
whActionsOnly bool
region string
sleepInterval misc.ValueLoader[time.Duration]
mainLoopSleepInterval misc.ValueLoader[time.Duration]
sleepInterval config.ValueLoader[time.Duration]
mainLoopSleepInterval config.ValueLoader[time.Duration]
dbQueryTimeout *config.Reloadable[time.Duration]
sourcesWithEventNameTrackingDisabled []string
maxOpenConnections int
maxConcurrentRequests misc.ValueLoader[int]
maxConcurrentRequests config.ValueLoader[int]

getMinReportedAtQueryTime stats.Measurement
getReportsQueryTime stats.Measurement
Expand Down
2 changes: 1 addition & 1 deletion enterprise/suppress-user/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (m *Factory) retryIndefinitely(ctx context.Context, f func() error, wait ti
}
}

func (m *Factory) newSyncerWithBadgerRepo(repoPath string, seederSource func() (io.ReadCloser, error), maxSeedWaitTime time.Duration, identity identity.Identifier, pollInterval misc.ValueLoader[time.Duration]) (*Syncer, Repository, error) {
func (m *Factory) newSyncerWithBadgerRepo(repoPath string, seederSource func() (io.ReadCloser, error), maxSeedWaitTime time.Duration, identity identity.Identifier, pollInterval config.ValueLoader[time.Duration]) (*Syncer, Repository, error) {
repo, err := NewBadgerRepository(
repoPath,
m.Log,
Expand Down
4 changes: 3 additions & 1 deletion gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

kithelper "github.com/rudderlabs/rudder-go-kit/testhelper"
kituuid "github.com/rudderlabs/rudder-go-kit/uuid"

"github.com/golang/mock/gomock"
"github.com/google/uuid"
Expand All @@ -30,6 +31,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/memstats"

"github.com/rudderlabs/rudder-server/admin"
"github.com/rudderlabs/rudder-server/app"
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
Expand Down Expand Up @@ -219,7 +221,7 @@ var _ = Describe("Reconstructing JSON for ServerSide SDK", func() {
{"anonymousId":"anon_id_1","event":"event_1_3"}
]}`
response, payloadError := getUsersPayload([]byte(testValidBody))
key, err := misc.GetMD5UUID(inputKey)
key, err := kituuid.GetMD5UUID(inputKey)
Expect(string(response[key.String()])).To(Equal(value))
Expect(err).To(BeNil())
Expect(payloadError).To(BeNil())
Expand Down
22 changes: 13 additions & 9 deletions gateway/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"

"github.com/rudderlabs/rudder-go-kit/sanitize"
"github.com/rudderlabs/rudder-go-kit/stringify"
kituuid "github.com/rudderlabs/rudder-go-kit/uuid"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
Expand Down Expand Up @@ -97,18 +101,18 @@ type Handle struct {
conf struct { // configuration parameters
webPort, maxUserWebRequestWorkerProcess, maxDBWriterProcess int
maxUserWebRequestBatchSize, maxDBBatchSize, maxHeaderBytes, maxConcurrentRequests int
userWebRequestBatchTimeout, dbBatchWriteTimeout misc.ValueLoader[time.Duration]
userWebRequestBatchTimeout, dbBatchWriteTimeout config.ValueLoader[time.Duration]

maxReqSize misc.ValueLoader[int]
enableRateLimit misc.ValueLoader[bool]
maxReqSize config.ValueLoader[int]
enableRateLimit config.ValueLoader[bool]
enableSuppressUserFeature bool
diagnosisTickerTime time.Duration
ReadTimeout time.Duration
ReadHeaderTimeout time.Duration
WriteTimeout time.Duration
IdleTimeout time.Duration
allowReqsWithoutUserIDAndAnonymousID misc.ValueLoader[bool]
gwAllowPartialWriteWithErrors misc.ValueLoader[bool]
allowReqsWithoutUserIDAndAnonymousID config.ValueLoader[bool]
gwAllowPartialWriteWithErrors config.ValueLoader[bool]
}

// additional internal http handlers
Expand Down Expand Up @@ -280,7 +284,7 @@ func (gw *Handle) getJobDataFromRequest(req *webRequestT) (jobData *jobFromReq,

fillMessageID := func(event map[string]interface{}) {
messageID, _ := event["messageId"].(string)
messageID = strings.TrimSpace(misc.SanitizeUnicode(messageID))
messageID = strings.TrimSpace(sanitize.Unicode(messageID))
if messageID == "" {
event["messageId"] = uuid.New().String()
} else {
Expand Down Expand Up @@ -330,8 +334,8 @@ func (gw *Handle) getJobDataFromRequest(req *webRequestT) (jobData *jobFromReq,
return
}

anonIDFromReq := strings.TrimSpace(misc.SanitizeUnicode(misc.GetStringifiedData(toSet["anonymousId"])))
userIDFromReq := strings.TrimSpace(misc.SanitizeUnicode(misc.GetStringifiedData(toSet["userId"])))
anonIDFromReq := strings.TrimSpace(sanitize.Unicode(stringify.Data(toSet["anonymousId"])))
userIDFromReq := strings.TrimSpace(sanitize.Unicode(stringify.Data(toSet["userId"])))
eventTypeFromReq, _ := misc.MapLookup(
toSet,
"type",
Expand Down Expand Up @@ -388,7 +392,7 @@ func (gw *Handle) getJobDataFromRequest(req *webRequestT) (jobData *jobFromReq,

// hashing combination of userIDFromReq + anonIDFromReq, using colon as a delimiter
var rudderId uuid.UUID
rudderId, err = misc.GetMD5UUID(userIDFromReq + ":" + anonIDFromReq)
rudderId, err = kituuid.GetMD5UUID(userIDFromReq + ":" + anonIDFromReq)
if err != nil {
err = errors.New(response.NonIdentifiableRequest)
return
Expand Down
Loading
Loading