diff --git a/event-schema/counter.go b/event-schema/counter.go deleted file mode 100644 index a62e68e328..0000000000 --- a/event-schema/counter.go +++ /dev/null @@ -1,76 +0,0 @@ -package event_schema - -import ( - "github.com/rudderlabs/rudder-go-kit/config" - "github.com/rudderlabs/rudder-server/event-schema/countish" -) - -type CounterTypeT string - -const ( - LossyCount CounterTypeT = "LossyCount" - StickySampler CounterTypeT = "StickySampler" -) - -var ( - defaultCounterType CounterTypeT - counterSupport, counterErrorTolerance, counterFailureProb, counterThreshold float64 - counterTypeStr string -) - -type FrequencyCounter struct { - Name string - CounterType CounterTypeT - Counter countish.LossyCounter -} - -func (fc *FrequencyCounter) getCounter() countish.Counter { - return &fc.Counter -} - -func (fc *FrequencyCounter) setCounter(counterType CounterTypeT, counter *countish.LossyCounter) { - fc.Counter = *counter - fc.CounterType = counterType -} - -func Init() { - config.RegisterStringConfigVariable("LossyCount", &counterTypeStr, false, "EventSchemas.counterType") - // Output every elem has appeared at least (N * support) times - config.RegisterFloat64ConfigVariable(0.01, &counterSupport, false, "EventSchemas.counterSupport") - // We can start with support/10 - config.RegisterFloat64ConfigVariable(0.001, &counterErrorTolerance, false, "EventSchemas.counterErrorTolerance") - // - config.RegisterFloat64ConfigVariable(0.01, &counterFailureProb, false, "EventSchemas.counterFailureProb") - - // Check this? - config.RegisterFloat64ConfigVariable(0.01, &counterThreshold, false, "EventSchemas.counterThreshold") - - if counterTypeStr == string(StickySampler) { - defaultCounterType = StickySampler - } else { - defaultCounterType = LossyCount - } -} - -func NewFrequencyCounter(name string) *FrequencyCounter { - fc := FrequencyCounter{} - fc.Name = name - counter := countish.NewLossyCounter(counterSupport, counterErrorTolerance) - fc.setCounter(defaultCounterType, counter) - return &fc -} - -func NewPeristedFrequencyCounter(persistedFc *FrequencyCounter) *FrequencyCounter { - fc := FrequencyCounter{} - fc.Name = persistedFc.Name - fc.setCounter(LossyCount, &persistedFc.Counter) - return &fc -} - -func (fc *FrequencyCounter) Observe(key *string) { - fc.getCounter().Observe(*key) -} - -func (fc *FrequencyCounter) ItemsAboveThreshold() []countish.Entry { - return fc.getCounter().ItemsAboveThreshold(counterThreshold) -} diff --git a/event-schema/counter_manager.go b/event-schema/counter_manager.go deleted file mode 100644 index 5702557106..0000000000 --- a/event-schema/counter_manager.go +++ /dev/null @@ -1,115 +0,0 @@ -package event_schema - -// schemaHash -> Key -> FrequencyCounter -var countersCache map[string]map[string]*FrequencyCounter - -type CounterItem struct { - Value string - Frequency float64 -} - -func init() { - if countersCache == nil { - countersCache = make(map[string]map[string]*FrequencyCounter) - } -} - -// populateFrequencyCountersBounded is responsible for capturing the frequency counters which -// are available in the db and store them in memory but in a bounded manner. -func populateFrequencyCounters(schemaHash string, frequencyCounters []*FrequencyCounter, bound int) { - frequencyCountersMap := make(map[string]*FrequencyCounter) - for idx, fc := range frequencyCounters { - // If count exceeds for a particular schema hash, break - // the loop - if idx >= bound { - break - } - - frequencyCountersMap[fc.Name] = NewPeristedFrequencyCounter(fc) - } - countersCache[schemaHash] = frequencyCountersMap -} - -func getAllFrequencyCounters(schemaHash string) []*FrequencyCounter { - schemaVersionCounters, ok := countersCache[schemaHash] - if !ok { - return []*FrequencyCounter{} - } - - frequencyCounters := make([]*FrequencyCounter, 0, len(schemaVersionCounters)) - - for _, v := range schemaVersionCounters { - frequencyCounters = append(frequencyCounters, v) - } - return frequencyCounters -} - -// pruneFrequencyCounters brings the frequency counters back to desired bound. -func pruneFrequencyCounters(schemaHash string, bound int) { - countersMap := countersCache[schemaHash] - diff := bound - len(countersMap) - - if diff >= 0 { - return - } - - toDelete := -1 * diff - for k := range countersMap { - if toDelete > 0 { - delete(countersMap, k) - toDelete-- - - continue - } - - break - } -} - -// getFrequencyCounter simply returns frequency counter for flattened -// event key. It creates a new fc in case the key doesn't exist in map. -func getFrequencyCounter(schemaHash, key string, bound int) *FrequencyCounter { - schemaVersionCounters, ok := countersCache[schemaHash] - if !ok { - schemaVersionCounters = make(map[string]*FrequencyCounter) - countersCache[schemaHash] = schemaVersionCounters - } - - // Here we add a new frequency counter for schemaVersionCounter - frequencyCounter, ok := schemaVersionCounters[key] - if !ok { - if len(schemaVersionCounters) >= bound { - return nil - } - - frequencyCounter = NewFrequencyCounter(key) - schemaVersionCounters[key] = frequencyCounter - } - - return frequencyCounter -} - -func getSchemaVersionCounters(schemaHash string) map[string][]*CounterItem { - schemaVersionCounters := countersCache[schemaHash] - counters := make(map[string][]*CounterItem) - - for key, fc := range schemaVersionCounters { - - entries := fc.ItemsAboveThreshold() - counterItems := make([]*CounterItem, 0, len(entries)) - for _, entry := range entries { - - freq := entry.Frequency - // Capping the freq to 1 - if freq > 1 { - freq = 1.0 - } - counterItems = append(counterItems, &CounterItem{Value: entry.Key, Frequency: freq}) - } - - if len(counterItems) > 0 { - counters[key] = counterItems - } - } - return counters -} diff --git a/event-schema/counter_manager_test.go b/event-schema/counter_manager_test.go deleted file mode 100644 index 59dc4cc807..0000000000 --- a/event-schema/counter_manager_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package event_schema - -import ( - "testing" - - "github.com/google/uuid" - "github.com/stretchr/testify/require" -) - -func TestPopulateFrequencyCounterBounded(t *testing.T) { - t.Parallel() - - hash := "my-unique-schema-hash" - counters := []*FrequencyCounter{ - {Name: "prop1"}, - {Name: "prop2"}, - } - populateFrequencyCounters(hash, counters, 1) - - // only one frequency counter should be loaded in the cache. - loadedCounters := countersCache[hash] - require.Equal(t, 1, len(loadedCounters), "loaded frequency counters into memory should be equal to 1") -} - -func TestGetFreqCounterWhenLimitBreached(t *testing.T) { - t.Parallel() - - hash := uuid.New() - countersCache[hash.String()] = map[string]*FrequencyCounter{ - "key1": {Name: "key1"}, - "key2": {Name: "key2"}, - } - - // Updated frequencyCounterLimit bound - frequencyCounterLimit := 2 - // This should ideally result in a new key being added to the countersCache - // but since the bound is already reached, it simply returns nil for the freq counter - fq := getFrequencyCounter(hash.String(), "new-key", frequencyCounterLimit) - require.Nil(t, fq) - - // FrequencyCounter against the original key, - // should still be returned. - fq = getFrequencyCounter(hash.String(), "key1", frequencyCounterLimit) - require.NotNil(t, fq) - require.Equal(t, "key1", fq.Name) - - // Create room for the new key - fq = getFrequencyCounter(hash.String(), "new-key-1", 3) - require.NotNil(t, fq) - require.Equal(t, "new-key-1", fq.Name) -} diff --git a/event-schema/countish/README.org b/event-schema/countish/README.org deleted file mode 100644 index 175f723e0e..0000000000 --- a/event-schema/countish/README.org +++ /dev/null @@ -1,53 +0,0 @@ -* Approximate frequency counts over data streams for Go - -Countish implements two approximate counting algorithms outlined in "Approximate Frequency Counts over Data Streams". - -http://www.vldb.org/conf/2002/S10P03.pdf - - -** Use cases - -Have you ever needed to do something like calculate the top -URLs or top ips from an infinite stream? This package provides probabalistic -frequency counters, with accuracy guarantees and low memory usage. - -countish provides an extremely simple interface consisting of an "Observe" method and -a "ItemsAboveThreshold" method. - - - -Example: - -#+BEGIN_SRC bash :exports both -cat urls.txt | go run ./cmd/countish/main.go -threshold .3 -#+END_SRC - -#+RESULTS: -: 0.428671 / - -3 counting implementations are provided. - -1) Naive: exact counts are held in a map -2) Lossy: corresponding to "lossy counting" -3) Sticky: corresponding to "sticky sampling" - -** Example: - -#+BEGIN_SRC go :imports '("github.com/shanemhansen/countish" "fmt") :exports both - counter := countish.NewLossyCounter(.01, .01) - for i:=0;i<100;i++ { - counter.Observe("value") - } - counter.Observe("another value") - // print all items which *might* occur more than 90% of the time, - // along with their estimated frequency - entries := counter.ItemsAboveThreshold(.9) - fmt.Println(entries) -#+END_SRC - -#+RESULTS: -: [{value 1.00009900990099}] - -** TODO examples showing memory usage comparisons - - diff --git a/event-schema/countish/entry.go b/event-schema/countish/entry.go deleted file mode 100644 index 94ae33350e..0000000000 --- a/event-schema/countish/entry.go +++ /dev/null @@ -1,6 +0,0 @@ -package countish - -type Entry struct { - Key string - Frequency float64 -} diff --git a/event-schema/countish/lossy.go b/event-schema/countish/lossy.go deleted file mode 100644 index 1ed70d6567..0000000000 --- a/event-schema/countish/lossy.go +++ /dev/null @@ -1,68 +0,0 @@ -package countish - -import ( - "math" -) - -type FDeltaPair struct { - F float64 - Delta float64 -} - -type LossyCounter struct { - Support float64 - ErrorTolerance float64 - D map[string]FDeltaPair - N uint64 - BucketWidth uint64 -} - -func NewLossyCounter(support, errorTolerance float64) *LossyCounter { - return &LossyCounter{ - Support: support, - ErrorTolerance: errorTolerance, - D: make(map[string]FDeltaPair), - BucketWidth: uint64(math.Ceil(1 / errorTolerance)), - N: 0, - } -} - -func (lc *LossyCounter) prune(bucket uint64) { - fbucket := float64(bucket) - for key, value := range lc.D { - if value.F+value.Delta <= fbucket { - delete(lc.D, key) - } - } -} - -// ItemsAboveThreshold returns a list of items that occur more than threshold, along -// with their frequencies. threshold is in the range [0,1] -func (lc *LossyCounter) ItemsAboveThreshold(threshold float64) []Entry { - var results []Entry - fN := float64(lc.N) - for key, val := range lc.D { - if val.F >= (threshold-lc.ErrorTolerance)*fN { - results = append(results, Entry{Key: key, Frequency: val.F/fN + lc.Support}) - } - } - return results -} - -// Observe records a new sample -func (lc *LossyCounter) Observe(key string) { - lc.N++ - bucket := lc.N / lc.BucketWidth - val, exists := lc.D[key] - if exists { - val.F++ - } else { - // reuse 0 val from lookup. - val.F = 1 - val.Delta = float64(bucket - 1) // this doesn't make much sense - } - lc.D[key] = val - if lc.N%lc.BucketWidth == 0 { - lc.prune(bucket) - } -} diff --git a/event-schema/countish/naive.go b/event-schema/countish/naive.go deleted file mode 100644 index 8b214b3416..0000000000 --- a/event-schema/countish/naive.go +++ /dev/null @@ -1,33 +0,0 @@ -package countish - -type Counter interface { - Observe(string) - ItemsAboveThreshold(float64) []Entry -} - -type NaiveSampler struct { - vals map[string]uint64 - N uint64 -} - -func NewNaiveSampler() *NaiveSampler { - return &NaiveSampler{ - vals: make(map[string]uint64), - } -} - -func (ns *NaiveSampler) Observe(key string) { - ns.vals[key]++ - ns.N++ -} - -func (ns *NaiveSampler) ItemsAboveThreshold(val float64) []Entry { - count := uint64(val * float64(ns.N)) - var entries []Entry - for key, val := range ns.vals { - if val >= count { - entries = append(entries, Entry{Key: key, Frequency: float64(val) / float64(ns.N)}) - } - } - return entries -} diff --git a/event-schema/countish/sticky.go b/event-schema/countish/sticky.go deleted file mode 100644 index 9e8f1649c1..0000000000 --- a/event-schema/countish/sticky.go +++ /dev/null @@ -1,93 +0,0 @@ -package countish - -import ( - "math" - "math/rand" -) - -var ( - Rand = rand.Float64 - RandCoin = rand.Int31n -) - -type StickySampler struct { - ErrorTolerance float64 - Support float64 - S map[string]float64 - R float64 - FailureProb float64 - N float64 - T float64 - RequiredSamples int -} - -func NewSampler(Support, ErrorTolerance, FailureProb float64) *StickySampler { - twoT := 2 / ErrorTolerance * math.Log(1/(Support*FailureProb)) - return &StickySampler{ - ErrorTolerance: ErrorTolerance, - Support: Support, - R: 1, - FailureProb: FailureProb, - T: twoT, - RequiredSamples: int(twoT), - S: make(map[string]float64), - } -} - -const successful = 0 - -func (s *StickySampler) prune() { - for key, val := range s.S { - // repeatedly toss coin - // until coin toss is successful. - // todo this can probably be derived - // by looking at how close to 0 - // a number in [0, 1) is. - for { - if RandCoin(2) == successful { - break - } - // diminish by one for every - // unsuccessful outcome - val-- - // delete if needed - if val <= 0 { - delete(s.S, key) - } else { - s.S[key] = val - } - - } - } -} - -// ItemsAboveThreshold returns a list of items that occur more than threshold, along -// with their frequencies. threshold is in the range [0,1] -func (s *StickySampler) ItemsAboveThreshold(threshold float64) []Entry { - var results []Entry - for key, f := range s.S { - if f >= (threshold-s.ErrorTolerance)*s.N { - results = append(results, Entry{Key: key, Frequency: f/s.N + s.Support}) - } - } - return results -} - -// Observe records a new sample -func (s *StickySampler) Observe(key string) { - s.N++ - count := s.N - if count > s.T { - s.T *= 2 - s.R *= 2 - s.prune() - } - if _, exists := s.S[key]; !exists { - // determine if value should be sampled - shouldSample := Rand() <= 1/s.R - if !shouldSample { - return - } - } - s.S[key]++ -} diff --git a/event-schema/event_schema.go b/event-schema/event_schema.go deleted file mode 100644 index 646c288edc..0000000000 --- a/event-schema/event_schema.go +++ /dev/null @@ -1,1171 +0,0 @@ -// Event schemas uses countish algorithm by https://github.com/shanemhansen/countish - -/* - * -Table: event_models - -| id | uuid | write_key | event_type | event_model_identifier | created_at | -| --- | ------ | --------- | ---------- | ---------------------- | ----------------- | -| 1 | uuid-1 | ksuid-1 | track | logged_in | 01, Jan 12: 00 PM | -| 2 | uuid-2 | ksuid-1 | track | signed_up | 01, Jan 12: 00 PM | -| 3 | uuid-3 | ksuid-1 | page | Home Page | 01, Jan 12: 00 PM | -| 4 | uuid-4 | ksuid-2 | identify | | 01, Jan 12: 00 PM | - - -Table: schema_versions - -| id | uuid | event_model_id | schema_hash | schema | metadata | first_seen | last_seen | -| --- | ------ | -------------- | ----------- | ------------------------------- | -------- | ----------------- | ------------------ | -| 1 | uuid-9 | uuid-1 | hash-1 | {"a": "string", "b": "float64"} | {} | 01, Jan 12: 00 PM | 01, June 12: 00 PM | -| 2 | uuid-8 | uuid-2 | hash-2 | {"a": "string", "b": "string"} | {} | 01, Jan 12: 00 PM | 01, June 12: 00 PM | -| 3 | uuid-7 | uuid-3 | hash-3 | {"a": "string", "c": "float64"} | {} | 01, Jan 12: 00 PM | 01, June 12: 00 PM | -| 4 | uuid-6 | uuid-2 | hash-1 | {"a": "string", "b": "float64"} | {} | 01, Jan 12: 00 PM | 01, June 12: 00 PM | - -*/ - -package event_schema - -import ( - "context" - "database/sql" - "encoding/json" - "fmt" - "reflect" - "sort" - "strings" - "sync" - "time" - - "github.com/google/uuid" - "github.com/jeremywohl/flatten" - "github.com/lib/pq" - - "github.com/rudderlabs/rudder-go-kit/config" - "github.com/rudderlabs/rudder-go-kit/logger" - "github.com/rudderlabs/rudder-go-kit/stats" - "github.com/rudderlabs/rudder-server/rruntime" - "github.com/rudderlabs/rudder-server/utils/misc" - "github.com/rudderlabs/rudder-server/utils/timeutil" -) - -func init() { - // Following data structures store events and schemas since last flush - updatedEventModels = make(map[string]*EventModelT) - updatedSchemaVersions = make(map[string]*SchemaVersionT) - offloadedEventModels = make(map[string]map[string]*OffloadedModelT) - offloadedSchemaVersions = make(map[string]map[string]*OffloadedSchemaVersionT) - archivedEventModels = make(map[string]map[string]*OffloadedModelT) - archivedSchemaVersions = make(map[string]map[string]*OffloadedSchemaVersionT) -} - -// EventModelT is a struct that represents EVENT_MODELS_TABLE -type EventModelT struct { - ID int - UUID string `json:"EventID"` - WriteKey string `json:"WriteKey"` - EventType string `json:"EventType"` - EventIdentifier string `json:"EventIdentifier"` - CreatedAt time.Time - Schema json.RawMessage - Metadata json.RawMessage `json:"-"` - PrivateData json.RawMessage `json:"-"` - LastSeen time.Time - reservoirSample *ReservoirSample - TotalCount int64 - Archived bool -} - -// SchemaVersionT is a struct that represents SCHEMA_VERSIONS_TABLE -type SchemaVersionT struct { - ID int64 - UUID string `json:"VersionID"` - SchemaHash string `json:"-"` - EventModelID string - Schema json.RawMessage - Metadata json.RawMessage `json:"-"` - PrivateData json.RawMessage `json:"-"` - FirstSeen time.Time - LastSeen time.Time - reservoirSample *ReservoirSample - TotalCount int64 - Archived bool -} - -type MetaDataT struct { - SampledEvents []interface{} - TotalCount int64 - Counters map[string][]*CounterItem `json:"FrequentValues"` -} - -type PrivateDataT struct { - FrequencyCounters []*FrequencyCounter -} - -type ( - WriteKey string - EventType string - EventIdentifier string -) - -// EventModelMapT : to EventModel Mapping -type EventModelMapT map[WriteKey]map[EventType]map[EventIdentifier]*EventModelT - -// SchemaVersionMapT : to SchemaVersion Mapping -type SchemaVersionMapT map[string]map[string]*SchemaVersionT - -// EventSchemaManagerT handles all event-schemas related features -type EventSchemaManagerT struct { - dbHandle *sql.DB - eventModelMap EventModelMapT - schemaVersionMap SchemaVersionMapT - eventModelLock sync.RWMutex - schemaVersionLock sync.RWMutex - disableInMemoryCache bool -} - -type OffloadedModelT struct { - UUID string - LastSeen time.Time - WriteKey string - EventType string - EventIdentifier string -} - -type OffloadedSchemaVersionT struct { - UUID string - EventModelID string - LastSeen time.Time - SchemaHash string -} - -var ( - flushInterval time.Duration - adminUser string - adminPassword string - reservoirSampleSize int - eventSchemaChannel chan *GatewayEventBatchT - updatedEventModels map[string]*EventModelT - updatedSchemaVersions map[string]*SchemaVersionT - offloadedEventModels map[string]map[string]*OffloadedModelT - offloadedSchemaVersions map[string]map[string]*OffloadedSchemaVersionT - archivedEventModels map[string]map[string]*OffloadedModelT - archivedSchemaVersions map[string]map[string]*OffloadedSchemaVersionT - toDeleteEventModelIDs []string - toDeleteSchemaVersionIDs []string - pkgLogger logger.Logger - noOfWorkers int - shouldCaptureNilAsUnknowns bool - eventModelLimit int - frequencyCounterLimit int - schemaVersionPerEventModelLimit int - offloadLoopInterval time.Duration - offloadThreshold time.Duration - areEventSchemasPopulated bool -) - -const ( - EVENT_MODELS_TABLE = "event_models" - SCHEMA_VERSIONS_TABLE = "schema_versions" -) - -// GatewayEventBatchT : Type sent from gateway -type GatewayEventBatchT struct { - writeKey string - eventBatch string -} - -// EventT : Generic type for singular event -type EventT map[string]interface{} - -// EventPayloadT : Generic type for gateway event payload -type EventPayloadT struct { - WriteKey string - ReceivedAt string - Batch []EventT -} - -func loadConfig() { - adminUser = config.GetString("RUDDER_ADMIN_USER", "rudder") - adminPassword = config.GetString("RUDDER_ADMIN_PASSWORD", "rudderstack") - noOfWorkers = config.GetInt("EventSchemas.noOfWorkers", 128) - config.RegisterDurationConfigVariable(240, &flushInterval, true, time.Second, []string{"EventSchemas.syncInterval", "EventSchemas.syncIntervalInS"}...) - - config.RegisterIntConfigVariable(5, &reservoirSampleSize, true, 1, "EventSchemas.sampleEventsSize") - config.RegisterIntConfigVariable(200, &eventModelLimit, true, 1, "EventSchemas.eventModelLimit") - config.RegisterIntConfigVariable(2000, &frequencyCounterLimit, true, 1, "EventSchemas.frequencyCounterLimit") - config.RegisterIntConfigVariable(20, &schemaVersionPerEventModelLimit, true, 1, "EventSchemas.schemaVersionPerEventModelLimit") - config.RegisterBoolConfigVariable(false, &shouldCaptureNilAsUnknowns, true, "EventSchemas.captureUnknowns") - config.RegisterDurationConfigVariable(60, &offloadLoopInterval, true, time.Second, []string{"EventSchemas.offloadLoopInterval"}...) - config.RegisterDurationConfigVariable(1800, &offloadThreshold, true, time.Second, []string{"EventSchemas.offloadThreshold"}...) - - if adminPassword == "rudderstack" { - pkgLogger.Warn("[EventSchemas] You are using default password. Please change it by setting env variable RUDDER_ADMIN_PASSWORD") - } -} - -func Init2() { - pkgLogger = logger.NewLogger().Child("event-schema") - loadConfig() -} - -// RecordEventSchema : Records event schema for every event in the batch -func (*EventSchemaManagerT) RecordEventSchema(writeKey, eventBatch string) bool { - select { - case eventSchemaChannel <- &GatewayEventBatchT{writeKey, eventBatch}: - default: - stats.Default.NewTaggedStat("dropped_events_count", stats.CountType, stats.Tags{"module": "event_schemas", "writeKey": writeKey}).Increment() - } - return true -} - -func (manager *EventSchemaManagerT) updateEventModelCache(eventModel *EventModelT, toCreateOrUpdate bool) { - eventModelID := eventModel.UUID - writeKey := eventModel.WriteKey - eventType := eventModel.EventType - eventIdentifier := eventModel.EventIdentifier - - _, ok := manager.eventModelMap[WriteKey(writeKey)] - if !ok { - manager.eventModelMap[WriteKey(writeKey)] = make(map[EventType]map[EventIdentifier]*EventModelT) - } - _, ok = manager.eventModelMap[WriteKey(writeKey)][EventType(eventType)] - if !ok { - manager.eventModelMap[WriteKey(writeKey)][EventType(eventType)] = make(map[EventIdentifier]*EventModelT) - } - manager.eventModelMap[WriteKey(writeKey)][EventType(eventType)][EventIdentifier(eventIdentifier)] = eventModel - - if toCreateOrUpdate { - updatedEventModels[eventModelID] = eventModel - } -} - -func (manager *EventSchemaManagerT) deleteFromEventModelCache(eventModel *EventModelT) { - writeKey := eventModel.WriteKey - eventType := eventModel.EventType - eventIdentifier := eventModel.EventIdentifier - - delete(updatedEventModels, eventModel.UUID) - delete(offloadedEventModels[eventModel.WriteKey], eventTypeIdentifier(eventType, eventIdentifier)) - delete(manager.eventModelMap[WriteKey(writeKey)][EventType(eventType)], EventIdentifier(eventIdentifier)) - delete(countersCache, eventModel.UUID) -} - -func (manager *EventSchemaManagerT) deleteFromSchemaVersionCache(schemaVersion *SchemaVersionT) { - eventModelID := schemaVersion.EventModelID - schemaHash := schemaVersion.SchemaHash - - delete(updatedSchemaVersions, schemaVersion.UUID) - delete(offloadedSchemaVersions[eventModelID], schemaHash) - delete(manager.schemaVersionMap[eventModelID], schemaHash) - delete(countersCache, schemaHash) -} - -func (manager *EventSchemaManagerT) updateSchemaVersionCache(schemaVersion *SchemaVersionT, toCreateOrUpdate bool) { - eventModelID := schemaVersion.EventModelID - schemaHash := schemaVersion.SchemaHash - - _, ok := manager.schemaVersionMap[eventModelID] - if !ok { - manager.schemaVersionMap[eventModelID] = make(map[string]*SchemaVersionT) - } - manager.schemaVersionMap[eventModelID][schemaHash] = schemaVersion - - if toCreateOrUpdate { - updatedSchemaVersions[schemaVersion.UUID] = schemaVersion - } -} - -/* - * - -| Event Type | event_type | event_model_identfier | -| ---------- | ---------- | --------------------- | -| track | track | event["event"] | -| page | page | event["name"] | -| screen | screen | event["name"] | -| identify | identify | "" | -| alias | alias | "" | -| group | group | "" | -* -* All event types and schema versions are generated by grouping according to the table above. -* Eg: -* will always be of same event_model. Different payloads will result in different schema_versions -* will always be of same event_model. Different payloads will result in different schema_versions -* will always be of same event_model. Different payloads will result in different schema_versions -* There will be only identify event_model per source. Schema versions can change with different traits. -* -* This function is goroutine-safe. We can scale multiple go-routines calling this function, -* but since this method does mostly in-memory operations and has locks, there might not be much performance improvement. -*/ -func (manager *EventSchemaManagerT) handleEvent(writeKey string, event EventT) { - eventType, ok := event["type"].(string) - if !ok { - pkgLogger.Debugf("[EventSchemas] Invalid or no eventType") - return - } - eventIdentifier := "" - if eventType == "track" { - eventIdentifier, ok = event["event"].(string) - } - if !ok { - pkgLogger.Debugf("[EventSchemas] Invalid event idenitfier") - return - } - - processingTimer := stats.Default.NewTaggedStat("archive_event_model", stats.TimerType, stats.Tags{"module": "event_schemas", "writeKey": writeKey, "eventIdentifier": eventIdentifier}) - defer processingTimer.RecordDuration()() - - // TODO: Create locks on every event_model to improve scaling this - manager.eventModelLock.Lock() - manager.schemaVersionLock.Lock() - defer manager.eventModelLock.Unlock() - defer manager.schemaVersionLock.Unlock() - totalEventModels := 0 - for _, v := range manager.eventModelMap[WriteKey(writeKey)] { - totalEventModels += len(v) - } - totalEventModels += len(offloadedEventModels[writeKey]) - eventModel, ok := manager.eventModelMap[WriteKey(writeKey)][EventType(eventType)][EventIdentifier(eventIdentifier)] - if !ok { - // check in offloaded models - var wasOffloaded bool - var offloadedModel *OffloadedModelT - if byEventTypeIdentifier, ok := offloadedEventModels[writeKey]; ok { - offloadedModel, wasOffloaded = byEventTypeIdentifier[eventTypeIdentifier(eventType, eventIdentifier)] - } - - archiveOldestLastSeenModel := func() { - oldestModel := manager.oldestSeenModel(writeKey) - toDeleteEventModelIDs = append(toDeleteEventModelIDs, oldestModel.UUID) - manager.deleteFromEventModelCache(oldestModel) - if _, ok := archivedEventModels[oldestModel.WriteKey]; !ok { - archivedEventModels[oldestModel.WriteKey] = make(map[string]*OffloadedModelT) - } - archivedEventModels[oldestModel.WriteKey][eventTypeIdentifier(oldestModel.EventType, oldestModel.EventIdentifier)] = &OffloadedModelT{UUID: oldestModel.UUID, LastSeen: oldestModel.LastSeen, WriteKey: oldestModel.WriteKey, EventType: oldestModel.EventType, EventIdentifier: oldestModel.EventIdentifier} - stats.Default.NewTaggedStat("archive_event_model", stats.CountType, stats.Tags{"module": "event_schemas", "writeKey": oldestModel.WriteKey, "eventIdentifier": oldestModel.EventIdentifier}).Increment() - } - - // check in archived models - var wasArchived bool - var archivedModel *OffloadedModelT - if byEventTypeIdentifier, ok := archivedEventModels[writeKey]; ok { - archivedModel, wasArchived = byEventTypeIdentifier[eventTypeIdentifier(eventType, eventIdentifier)] - } - - if wasOffloaded { - // TODO: Handling of err needs to be done as this might cause a panic in statement below ! - manager.reloadModel(offloadedModel) - eventModel, ok = manager.eventModelMap[WriteKey(writeKey)][EventType(eventType)][EventIdentifier(eventIdentifier)] - if !ok { - pkgLogger.Errorf(`[EventSchemas] Failed to reload event +%v, writeKey: %s, eventType: %s, eventIdentifier: %s`, offloadedModel.UUID, writeKey, eventType, eventIdentifier) - return - } - stats.Default.NewTaggedStat("reload_offloaded_event_model", stats.CountType, stats.Tags{"module": "event_schemas", "writeKey": eventModel.WriteKey, "eventIdentifier": eventModel.EventIdentifier}).Increment() - } else if wasArchived { - // If we saw event from an archived event model, reload the model into memory - // and archive the oldest model then. TODO: A test case for this ? - if totalEventModels >= eventModelLimit { - archiveOldestLastSeenModel() - } - err := manager.reloadModel(archivedModel) - if err != nil { - eventModel = manager.createModel(writeKey, eventType, eventIdentifier, totalEventModels, archiveOldestLastSeenModel) - } else { - eventModel, ok = manager.eventModelMap[WriteKey(writeKey)][EventType(eventType)][EventIdentifier(eventIdentifier)] - if !ok { - pkgLogger.Errorf(`[EventSchemas] Failed to reload event +%v, writeKey: %s, eventType: %s, eventIdentifier: %s`, archivedModel.UUID, writeKey, eventType, eventIdentifier) - return - } - stats.Default.NewTaggedStat("reload_archived_event_model", stats.CountType, stats.Tags{"module": "event_schemas", "writeKey": eventModel.WriteKey, "eventIdentifier": eventModel.EventIdentifier}).Increment() - } - } else { - eventModel = manager.createModel(writeKey, eventType, eventIdentifier, totalEventModels, archiveOldestLastSeenModel) - } - } - eventModel.LastSeen = timeutil.Now() - - eventMap := map[string]interface{}(event) - flattenedEvent, err := flatten.Flatten(eventMap, "", flatten.DotStyle) - if err != nil { - pkgLogger.Debugf("[EventSchemas] Failed to flatten the event %+v with error: %v", eventMap, err) - return - } - - schema := getSchema(flattenedEvent) - schemaHash := getSchemaHash(schema) - computeFrequencies(flattenedEvent, schemaHash) - computeFrequencies(flattenedEvent, eventModel.UUID) - - // In case we have changed the limit of frequency counter - // mid process, we need to make sure the counters are pruned effectively. - pruneFrequencyCounters(schemaHash, frequencyCounterLimit) - pruneFrequencyCounters(eventModel.UUID, frequencyCounterLimit) - - var schemaVersion *SchemaVersionT - var schemaFoundInCache bool - schemaVersion, schemaFoundInCache = manager.schemaVersionMap[eventModel.UUID][schemaHash] - - if !schemaFoundInCache { - // check in offloaded schema versions - var wasOffloaded bool - var offloadedVersion *OffloadedSchemaVersionT - if bySchemaHash, ok := offloadedSchemaVersions[eventModel.UUID]; ok { - offloadedVersion, wasOffloaded = bySchemaHash[schemaHash] - } - - // check in archived schema versions - var wasArchived bool - var archivedVersion *OffloadedSchemaVersionT - if bySchemaHash, ok := archivedSchemaVersions[eventModel.UUID]; ok { - archivedVersion, wasArchived = bySchemaHash[schemaHash] - } - - archiveOldestLastSeenVersion := func() { - oldestVersion := manager.oldestSeenVersion(eventModel.UUID) - toDeleteSchemaVersionIDs = append(toDeleteSchemaVersionIDs, oldestVersion.UUID) - manager.deleteFromSchemaVersionCache(oldestVersion) - if _, ok := archivedSchemaVersions[oldestVersion.EventModelID]; !ok { - archivedSchemaVersions[oldestVersion.EventModelID] = make(map[string]*OffloadedSchemaVersionT) - } - archivedSchemaVersions[oldestVersion.EventModelID][oldestVersion.SchemaHash] = &OffloadedSchemaVersionT{UUID: oldestVersion.UUID, LastSeen: oldestVersion.LastSeen, EventModelID: oldestVersion.EventModelID, SchemaHash: oldestVersion.SchemaHash} - stats.Default.NewTaggedStat("archive_schema_version", stats.CountType, stats.Tags{"module": "event_schemas", "writeKey": eventModel.WriteKey, "eventIdentifier": eventModel.EventIdentifier}).Increment() - } - - totalSchemaVersions := len(manager.schemaVersionMap[eventModel.UUID]) - totalSchemaVersions += len(offloadedSchemaVersions[eventModel.UUID]) - - if wasOffloaded { - manager.reloadSchemaVersion(offloadedVersion) - schemaVersion, ok = manager.schemaVersionMap[eventModel.UUID][schemaHash] - if !ok { - pkgLogger.Errorf(`[EventSchemas] Failed to reload event +%v, writeKey: %s, eventType: %s, eventIdentifier: %s`, offloadedVersion.UUID, writeKey, eventType, eventIdentifier) - return - } - stats.Default.NewTaggedStat("reload_offloaded_schema_version", stats.CountType, stats.Tags{"module": "event_schemas", "writeKey": eventModel.WriteKey, "eventIdentifier": eventModel.EventIdentifier}).Increment() - } else if wasArchived { - if totalSchemaVersions >= schemaVersionPerEventModelLimit { - archiveOldestLastSeenVersion() - } - err := manager.reloadSchemaVersion(archivedVersion) - if err != nil { - schemaVersion = manager.createSchema(schema, schemaHash, eventModel, totalSchemaVersions, archiveOldestLastSeenVersion) - } else { - schemaVersion, ok = manager.schemaVersionMap[eventModel.UUID][schemaHash] - if !ok { - pkgLogger.Errorf(`[EventSchemas] Failed to reload event +%v, writeKey: %s, eventType: %s, eventIdentifier: %s`, archivedVersion.UUID, writeKey, eventType, eventIdentifier) - return - } - stats.Default.NewTaggedStat("reload_archived_schema_version", stats.CountType, stats.Tags{"module": "event_schemas", "writeKey": eventModel.WriteKey, "eventIdentifier": eventModel.EventIdentifier}).Increment() - } - } else { - schemaVersion = manager.createSchema(schema, schemaHash, eventModel, totalSchemaVersions, archiveOldestLastSeenVersion) - } - } - - schemaVersion.LastSeen = timeutil.Now() - manager.updateSchemaVersionCache(schemaVersion, true) - - eventModel.reservoirSample.add(event, true) - schemaVersion.reservoirSample.add(event, true) - updatedEventModels[eventModel.UUID] = eventModel -} - -func (manager *EventSchemaManagerT) createModel(writeKey, eventType, eventIdentifier string, totalEventModels int, archiveOldestLastSeenModel func()) *EventModelT { - eventModelID := uuid.New().String() - em := &EventModelT{ - UUID: eventModelID, - WriteKey: writeKey, - EventType: eventType, - EventIdentifier: eventIdentifier, - Schema: []byte("{}"), - reservoirSample: NewReservoirSampler(reservoirSampleSize, 0, 0), - } - - if totalEventModels >= eventModelLimit { - archiveOldestLastSeenModel() - } - manager.updateEventModelCache(em, true) - stats.Default.NewTaggedStat("record_new_event_model", stats.CountType, stats.Tags{ - "module": "event_schemas", - "writeKey": em.WriteKey, - "eventIdentifier": em.EventIdentifier, - }).Increment() - return em -} - -func (manager *EventSchemaManagerT) createSchema(schema map[string]string, schemaHash string, eventModel *EventModelT, totalSchemaVersions int, archiveOldestLastSeenVersion func()) *SchemaVersionT { - versionID := uuid.New().String() - schemaVersion := manager.NewSchemaVersion(versionID, schema, schemaHash, eventModel.UUID) - eventModel.mergeSchema(schemaVersion) - - if totalSchemaVersions >= schemaVersionPerEventModelLimit { - archiveOldestLastSeenVersion() - } - stats.Default.NewTaggedStat("record_new_schema_version", stats.CountType, stats.Tags{"module": "event_schemas", "writeKey": eventModel.WriteKey, "eventIdentifier": eventModel.EventIdentifier}).Increment() - return schemaVersion -} - -func (manager *EventSchemaManagerT) oldestSeenModel(writeKey string) *EventModelT { - var oldestSeenModel *EventModelT - var minLastSeen time.Time - for _, eventIdentifierMap := range manager.eventModelMap[WriteKey(writeKey)] { - for _, model := range eventIdentifierMap { - if !model.LastSeen.IsZero() && (model.LastSeen.Sub(minLastSeen).Seconds() <= 0 || minLastSeen.IsZero()) { - oldestSeenModel = model - minLastSeen = model.LastSeen - } - } - } - for _, offloadedModel := range offloadedEventModels[writeKey] { - if !offloadedModel.LastSeen.IsZero() && (offloadedModel.LastSeen.Sub(minLastSeen).Seconds() <= 0 || minLastSeen.IsZero()) { - model := EventModelT{} - model.UUID = offloadedModel.UUID - model.WriteKey = offloadedModel.WriteKey - model.EventType = offloadedModel.EventType - model.EventIdentifier = offloadedModel.EventIdentifier - model.LastSeen = offloadedModel.LastSeen - minLastSeen = offloadedModel.LastSeen - oldestSeenModel = &model - } - } - return oldestSeenModel -} - -func (manager *EventSchemaManagerT) oldestSeenVersion(modelID string) *SchemaVersionT { - var oldestSeenSchemaVersion SchemaVersionT - var minLastSeen time.Time - for _, schemaVersion := range manager.schemaVersionMap[modelID] { - if !schemaVersion.LastSeen.IsZero() && (schemaVersion.LastSeen.Sub(minLastSeen).Seconds() <= 0 || minLastSeen.IsZero()) { - oldestSeenSchemaVersion = *schemaVersion - minLastSeen = schemaVersion.LastSeen - } - } - for _, offloadedVersion := range offloadedSchemaVersions[modelID] { - if !offloadedVersion.LastSeen.IsZero() && (offloadedVersion.LastSeen.Sub(minLastSeen).Seconds() <= 0 || minLastSeen.IsZero()) { - oldestSeenSchemaVersion = SchemaVersionT{} - oldestSeenSchemaVersion.UUID = offloadedVersion.UUID - oldestSeenSchemaVersion.EventModelID = offloadedVersion.EventModelID - oldestSeenSchemaVersion.SchemaHash = offloadedVersion.SchemaHash - oldestSeenSchemaVersion.LastSeen = offloadedVersion.LastSeen - minLastSeen = offloadedVersion.LastSeen - } - } - return &oldestSeenSchemaVersion -} - -func (em *EventModelT) mergeSchema(sv *SchemaVersionT) { - masterSchema := make(map[string]string) - err := json.Unmarshal(em.Schema, &masterSchema) - assertError(err) - - schema := make(map[string]string) - err = json.Unmarshal(sv.Schema, &schema) - assertError(err) - - errors := make([]string, 0) - for k := range schema { - t, ok := masterSchema[k] - if !ok { - masterSchema[k] = schema[k] - continue - } - if !strings.Contains(t, schema[k]) { - masterSchema[k] = fmt.Sprintf("%s,%s", t, schema[k]) - } - } - - if len(errors) > 0 { - pkgLogger.Errorf("EventModel with ID: %s has encountered following disparities:\n%s", em.ID, strings.Join(errors, "\n")) - } - - masterSchemaJSON, err := json.Marshal(masterSchema) - assertError(err) - em.Schema = masterSchemaJSON -} - -// NewSchemaVersion should be used when a schemaVersion is not found in its cache and requires, a schemaVersionID for the newSchema and the eventModelID to which it belongs along with schema and schemaHash -func (*EventSchemaManagerT) NewSchemaVersion(versionID string, schema map[string]string, schemaHash, eventModelID string) *SchemaVersionT { - schemaJSON, err := json.Marshal(schema) - assertError(err) - - schemaVersion := &SchemaVersionT{ - UUID: versionID, - SchemaHash: schemaHash, - EventModelID: eventModelID, - Schema: schemaJSON, - FirstSeen: timeutil.Now(), - LastSeen: timeutil.Now(), - } - schemaVersion.reservoirSample = NewReservoirSampler(reservoirSampleSize, 0, 0) - return schemaVersion -} - -func (manager *EventSchemaManagerT) recordEvents() { - for gatewayEventBatch := range eventSchemaChannel { - if !areEventSchemasPopulated { - continue - } - var eventPayload EventPayloadT - err := json.Unmarshal([]byte(gatewayEventBatch.eventBatch), &eventPayload) - assertError(err) - for _, event := range eventPayload.Batch { - manager.handleEvent(eventPayload.WriteKey, event) - } - } -} - -func getMetadataJSON(reservoirSample *ReservoirSample, schemaHash string) []byte { - metadata := &MetaDataT{ - SampledEvents: reservoirSample.getSamples(), - TotalCount: reservoirSample.getTotalCount(), - } - metadata.Counters = getSchemaVersionCounters(schemaHash) - - metadataJSON, err := json.Marshal(metadata) - pkgLogger.Debugf("[EventSchemas] Metadata JSON: %s", string(metadataJSON)) - assertError(err) - return metadataJSON -} - -func getPrivateDataJSON(schemaHash string) []byte { - privateData := &PrivateDataT{ - FrequencyCounters: getAllFrequencyCounters(schemaHash), - } - - privateDataJSON, err := json.Marshal(privateData) - pkgLogger.Debugf("[EventSchemas] Private Data JSON: %s", string(privateDataJSON)) - assertError(err) - return privateDataJSON -} - -func (manager *EventSchemaManagerT) flushEventSchemas(ctx context.Context) { - // This will run forever. If you want to quit in between, change it to ticker and call stop() - // Otherwise the ticker won't be GC'ed - ticker := time.NewTicker(flushInterval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - pkgLogger.Infof("Received signal to stop flushing event schemas returning") - return - - case <-ticker.C: - pkgLogger.Info("Starting with flushing event schemas") - - if !areEventSchemasPopulated { - pkgLogger.Warn("Event schemas aren't populated, continuing") - continue - } - if err := manager.flushEventSchemasToDB(ctx); err != nil { - pkgLogger.Errorf("Unable to flush event schemas to DB: %v", err) - } - - } - } -} - -// flushEventSchemasToDB is the main function which is responsible -// for pushing the deltas collected in-memory into the database. -func (manager *EventSchemaManagerT) flushEventSchemasToDB(ctx context.Context) error { - // If needed, copy the maps and release the lock immediately - manager.eventModelLock.Lock() - manager.schemaVersionLock.Lock() - - defer manager.eventModelLock.Unlock() - defer manager.schemaVersionLock.Unlock() - - schemaVersionsInCache := make([]*SchemaVersionT, 0) - for _, sv := range updatedSchemaVersions { - schemaVersionsInCache = append(schemaVersionsInCache, sv) - } - - if len(updatedEventModels) == 0 && len(schemaVersionsInCache) == 0 { - return nil - } - - flushDBHandle := createDBConnection() - defer flushDBHandle.Close() - - txn, err := flushDBHandle.Begin() - if err != nil { - return fmt.Errorf("unable to begin txn to flush event schemas, err: %w", err) - } - - if len(updatedEventModels) > 0 { - if err := flushEventModels(ctx, txn, updatedEventModels); err != nil { - txn.Rollback() - return fmt.Errorf("unable to flush event models to db: err: %w", err) - } - } - - if len(schemaVersionsInCache) > 0 { - if err := flushSchemaVersions(ctx, txn, schemaVersionsInCache); err != nil { - txn.Rollback() - return fmt.Errorf("unable to flush schema versions to db, err: %w", err) - } - } - - err = txn.Commit() - if err != nil { - return fmt.Errorf("unable to commit txn to update event models and schema versions, err: %w", err) - } - - updatedEventModels = make(map[string]*EventModelT) - updatedSchemaVersions = make(map[string]*SchemaVersionT) - toDeleteEventModelIDs = []string{} - toDeleteSchemaVersionIDs = []string{} - - return nil -} - -func flushSchemaVersions(ctx context.Context, txn *sql.Tx, schemaVersionsInCache []*SchemaVersionT) error { - versionIDs := make([]string, 0, len(schemaVersionsInCache)) - for uid := range updatedSchemaVersions { - versionIDs = append(versionIDs, uid) - } - - deleteOldVersionsSQL := fmt.Sprintf(`DELETE FROM %s WHERE uuid IN ('%s')`, SCHEMA_VERSIONS_TABLE, strings.Join(versionIDs, "', '")) - _, err := txn.ExecContext(ctx, deleteOldVersionsSQL) - if err != nil { - return fmt.Errorf("unable to delete old schema versions, err: %w", err) - } - - if len(toDeleteSchemaVersionIDs) > 0 { - archiveVersionsSQL := fmt.Sprintf(`UPDATE %s SET archived=%t WHERE uuid IN ('%s')`, SCHEMA_VERSIONS_TABLE, true, strings.Join(toDeleteSchemaVersionIDs, "', '")) - _, err = txn.Exec(archiveVersionsSQL) - if err != nil { - return fmt.Errorf("unable to archive schema versions, err: %w", err) - } - } - - stmt, err := txn.Prepare(pq.CopyIn(SCHEMA_VERSIONS_TABLE, - "uuid", "event_model_id", "schema_hash", "schema", "metadata", "private_data", - "first_seen", "last_seen", "total_count")) - if err != nil { - return fmt.Errorf("unable to create prepare statement for copying schema versions, err: %w ", err) - } - - defer stmt.Close() - - for _, sv := range schemaVersionsInCache { - metadataJSON := getMetadataJSON(sv.reservoirSample, sv.SchemaHash) - privateDataJSON := getPrivateDataJSON(sv.SchemaHash) - sv.TotalCount = sv.reservoirSample.totalCount - - _, err = stmt.ExecContext(ctx, sv.UUID, sv.EventModelID, sv.SchemaHash, string(sv.Schema), - string(metadataJSON), string(privateDataJSON), sv.FirstSeen, sv.LastSeen, sv.TotalCount) - if err != nil { - return fmt.Errorf("unable to update schema version: %d in db, err: %w", sv.ID, err) - } - } - _, err = stmt.ExecContext(ctx) - if err != nil { - return fmt.Errorf("unable to execute schema version statement, err: %w", err) - } - - stats.Default.NewTaggedStat( - "update_schema_version_count", stats.GaugeType, stats.Tags{"module": "event_schemas"}, - ).Gauge(len(versionIDs)) - pkgLogger.Debugf("[EventSchemas][Flush] %d new schema versions", len(schemaVersionsInCache)) - - return nil -} - -func flushEventModels(ctx context.Context, txn *sql.Tx, updatedEventModels map[string]*EventModelT) error { - eventModelIds := make([]string, 0, len(updatedEventModels)) - for _, em := range updatedEventModels { - eventModelIds = append(eventModelIds, em.UUID) - } - - deleteOldEventModelsSQL := fmt.Sprintf(`DELETE FROM %s WHERE uuid IN ('%s')`, EVENT_MODELS_TABLE, strings.Join(eventModelIds, "', '")) - _, err := txn.ExecContext(ctx, deleteOldEventModelsSQL) - if err != nil { - return fmt.Errorf("unable to delete event modelIds, err: %w", err) - } - - stmt, err := txn.Prepare(pq.CopyIn(EVENT_MODELS_TABLE, - "uuid", "write_key", "event_type", "event_model_identifier", "schema", "metadata", - "private_data", "last_seen", "total_count")) - if err != nil { - return fmt.Errorf("unable to copy statement, err: %w", err) - } - - defer stmt.Close() - - for eventModelID, eventModel := range updatedEventModels { - metadataJSON := getMetadataJSON(eventModel.reservoirSample, eventModel.UUID) - privateDataJSON := getPrivateDataJSON(eventModel.UUID) - eventModel.TotalCount = eventModel.reservoirSample.totalCount - - _, err = stmt.ExecContext(ctx, eventModelID, eventModel.WriteKey, eventModel.EventType, - eventModel.EventIdentifier, string(eventModel.Schema), string(metadataJSON), - string(privateDataJSON), eventModel.LastSeen, eventModel.TotalCount) - if err != nil { - return fmt.Errorf("unable to execute statement to update event model, err: %w", err) - } - } - _, err = stmt.ExecContext(ctx) - if err != nil { - return fmt.Errorf("unable to execute the statement") - } - - stats.Default.NewTaggedStat( - "update_event_model_count", stats.GaugeType, stats.Tags{"module": "event_schemas"}, - ).Gauge(len(eventModelIds)) - - pkgLogger.Debugf("[EventSchemas][Flush] %d new event types", len(updatedEventModels)) - return nil -} - -func eventTypeIdentifier(eventType, eventIdentifier string) string { - return fmt.Sprintf(`%s::%s`, eventType, eventIdentifier) -} - -func (manager *EventSchemaManagerT) offloadEventSchemas() { - for { - if !areEventSchemasPopulated { - time.Sleep(time.Second * 10) - continue - } - - time.Sleep(offloadLoopInterval) - manager.eventModelLock.Lock() - manager.schemaVersionLock.Lock() - for _, modelsByWriteKey := range manager.eventModelMap { - for _, modelsByEventType := range modelsByWriteKey { - for _, model := range modelsByEventType { - if timeutil.Now().Sub(model.LastSeen) > offloadThreshold { - pkgLogger.Infof("offloading model: %s-%s UUID:%s", model.EventType, model.EventIdentifier, model.UUID) - if _, ok := offloadedEventModels[model.WriteKey]; !ok { - offloadedEventModels[model.WriteKey] = make(map[string]*OffloadedModelT) - } - manager.deleteFromEventModelCache(model) - offloadedEventModels[model.WriteKey][eventTypeIdentifier(model.EventType, model.EventIdentifier)] = &OffloadedModelT{UUID: model.UUID, LastSeen: model.LastSeen, WriteKey: model.WriteKey, EventType: model.EventType, EventIdentifier: model.EventIdentifier} - stats.Default.NewTaggedStat("offload_event_model", stats.CountType, stats.Tags{"module": "event_schemas", "writeKey": model.WriteKey, "eventIdentifier": model.EventIdentifier}).Increment() - } - } - } - } - for _, modelsByWriteKey := range manager.schemaVersionMap { - for _, version := range modelsByWriteKey { - if timeutil.Now().Sub(version.LastSeen) > offloadThreshold { - if _, ok := offloadedSchemaVersions[version.EventModelID]; !ok { - offloadedSchemaVersions[version.EventModelID] = make(map[string]*OffloadedSchemaVersionT) - } - manager.deleteFromSchemaVersionCache(&SchemaVersionT{EventModelID: version.EventModelID, SchemaHash: version.SchemaHash}) - offloadedSchemaVersions[version.EventModelID][version.SchemaHash] = &OffloadedSchemaVersionT{UUID: version.UUID, LastSeen: version.LastSeen, EventModelID: version.EventModelID, SchemaHash: version.SchemaHash} - stats.Default.NewTaggedStat("offload_schema_version", stats.CountType, stats.Tags{"module": "event_schemas"}).Increment() - } - } - } - manager.schemaVersionLock.Unlock() - manager.eventModelLock.Unlock() - } -} - -func (manager *EventSchemaManagerT) reloadModel(offloadedModel *OffloadedModelT) error { - pkgLogger.Infof("reloading event model from db: %s\n", offloadedModel.UUID) - err := manager.populateEventModels(offloadedModel.UUID) - if err != nil { - return err - } - manager.populateSchemaVersionsMinimal(offloadedModel.UUID) - delete(offloadedEventModels[offloadedModel.WriteKey], eventTypeIdentifier(offloadedModel.EventType, offloadedModel.EventIdentifier)) - delete(archivedEventModels[offloadedModel.WriteKey], eventTypeIdentifier(offloadedModel.EventType, offloadedModel.EventIdentifier)) - return nil -} - -// reloadSchemaVersion fetches the latest state of the schema version from db and add the information -func (manager *EventSchemaManagerT) reloadSchemaVersion(offloadedVersion *OffloadedSchemaVersionT) error { - pkgLogger.Debugf("reloading schema vesion from db: %s\n", offloadedVersion.UUID) - err := manager.populateSchemaVersion(offloadedVersion) - if err != nil { - return err - } - delete(offloadedSchemaVersions[offloadedVersion.EventModelID], offloadedVersion.SchemaHash) - delete(archivedSchemaVersions[offloadedVersion.EventModelID], offloadedVersion.SchemaHash) - return nil -} - -// TODO: Move this into some DB manager -func createDBConnection() *sql.DB { - psqlInfo := misc.GetConnectionString(config.Default) - var err error - dbHandle, err := sql.Open("postgres", psqlInfo) - if err != nil { - panic(err) - } - - err = dbHandle.Ping() - if err != nil { - panic(err) - } - return dbHandle -} - -func assertError(err error) { - if err != nil { - panic(err) - } -} - -func (manager *EventSchemaManagerT) populateEventModels(uuidFilters ...string) error { - var uuidFilter string - if len(uuidFilters) > 0 { - uuidFilter = fmt.Sprintf(`WHERE uuid in ('%s')`, strings.Join(uuidFilters, "', '")) - } - - eventModelsSelectSQL := fmt.Sprintf(`SELECT id, uuid, write_key, event_type, event_model_identifier, created_at, schema, private_data, total_count, last_seen, (metadata->>'TotalCount')::bigint, metadata->'SampledEvents' FROM %s %s`, EVENT_MODELS_TABLE, uuidFilter) - - rows, err := manager.dbHandle.Query(eventModelsSelectSQL) - if err == sql.ErrNoRows { - return err - } else { - assertError(err) - } - defer func() { _ = rows.Close() }() - - // for each event model, we need to fetch the data from db. - for rows.Next() { - var eventModel EventModelT - var privateDataRaw json.RawMessage - var totalCount int64 - var sampleEventsRaw json.RawMessage - err := rows.Scan(&eventModel.ID, &eventModel.UUID, &eventModel.WriteKey, &eventModel.EventType, - &eventModel.EventIdentifier, &eventModel.CreatedAt, &eventModel.Schema, - &privateDataRaw, &eventModel.TotalCount, &eventModel.LastSeen, &totalCount, &sampleEventsRaw) - - assertError(err) - - var privateData PrivateDataT - err = json.Unmarshal(privateDataRaw, &privateData) - assertError(err) - - var sampleEvents []interface{} - err = json.Unmarshal(sampleEventsRaw, &sampleEvents) - assertError(err) - - reservoirSize := len(sampleEvents) - if reservoirSize > reservoirSampleSize { - reservoirSize = reservoirSampleSize - } - eventModel.reservoirSample = NewReservoirSampler(reservoirSampleSize, reservoirSize, totalCount) - for idx, sampledEvent := range sampleEvents { - if idx > reservoirSampleSize-1 { - continue - } - eventModel.reservoirSample.add(sampledEvent, false) - } - manager.updateEventModelCache(&eventModel, false) - populateFrequencyCounters( - eventModel.UUID, - privateData.FrequencyCounters, - frequencyCounterLimit) - } - return nil -} - -func (manager *EventSchemaManagerT) populateEventModelsMinimal() { - eventModelsSelectSQL := fmt.Sprintf(`SELECT uuid, event_type, event_model_identifier, write_key, last_seen, archived FROM %s`, EVENT_MODELS_TABLE) - - rows, err := manager.dbHandle.Query(eventModelsSelectSQL) - assertError(err) - defer func() { _ = rows.Close() }() - - for rows.Next() { - var eventModel EventModelT - err := rows.Scan(&eventModel.UUID, &eventModel.EventType, &eventModel.EventIdentifier, &eventModel.WriteKey, &eventModel.LastSeen, &eventModel.Archived) - - assertError(err) - - if eventModel.Archived { - if _, ok := archivedEventModels[eventModel.WriteKey]; !ok { - archivedEventModels[eventModel.WriteKey] = make(map[string]*OffloadedModelT) - } - - archivedEventModels[eventModel.WriteKey][eventTypeIdentifier(eventModel.EventType, eventModel.EventIdentifier)] = &OffloadedModelT{UUID: eventModel.UUID, LastSeen: eventModel.LastSeen, WriteKey: eventModel.WriteKey, EventType: eventModel.EventType, EventIdentifier: eventModel.EventIdentifier} - } else { - if _, ok := offloadedEventModels[eventModel.WriteKey]; !ok { - offloadedEventModels[eventModel.WriteKey] = make(map[string]*OffloadedModelT) - } - - offloadedEventModels[eventModel.WriteKey][eventTypeIdentifier(eventModel.EventType, eventModel.EventIdentifier)] = &OffloadedModelT{UUID: eventModel.UUID, LastSeen: eventModel.LastSeen, WriteKey: eventModel.WriteKey, EventType: eventModel.EventType, EventIdentifier: eventModel.EventIdentifier} - } - } -} - -func (manager *EventSchemaManagerT) populateSchemaVersionsMinimal(modelIDFilters ...string) { - var modelIDFilter string - if len(modelIDFilters) > 0 { - modelIDFilter = fmt.Sprintf(`WHERE event_model_id in ('%s')`, strings.Join(modelIDFilters, "', '")) - } - - schemaVersionsSelectSQL := fmt.Sprintf(`SELECT uuid, event_model_id, schema_hash, last_seen, archived FROM %s %s`, SCHEMA_VERSIONS_TABLE, modelIDFilter) - - rows, err := manager.dbHandle.Query(schemaVersionsSelectSQL) - assertError(err) - defer func() { _ = rows.Close() }() - - for rows.Next() { - var schemaVersion SchemaVersionT - err := rows.Scan(&schemaVersion.UUID, &schemaVersion.EventModelID, &schemaVersion.SchemaHash, &schemaVersion.LastSeen, &schemaVersion.Archived) - assertError(err) - - if schemaVersion.Archived { - if _, ok := archivedSchemaVersions[schemaVersion.EventModelID]; !ok { - archivedSchemaVersions[schemaVersion.EventModelID] = make(map[string]*OffloadedSchemaVersionT) - } - archivedSchemaVersions[schemaVersion.EventModelID][schemaVersion.SchemaHash] = &OffloadedSchemaVersionT{UUID: schemaVersion.UUID, LastSeen: schemaVersion.LastSeen, EventModelID: schemaVersion.EventModelID, SchemaHash: schemaVersion.SchemaHash} - } else { - if _, ok := offloadedSchemaVersions[schemaVersion.EventModelID]; !ok { - offloadedSchemaVersions[schemaVersion.EventModelID] = make(map[string]*OffloadedSchemaVersionT) - } - offloadedSchemaVersions[schemaVersion.EventModelID][schemaVersion.SchemaHash] = &OffloadedSchemaVersionT{UUID: schemaVersion.UUID, LastSeen: schemaVersion.LastSeen, EventModelID: schemaVersion.EventModelID, SchemaHash: schemaVersion.SchemaHash} - } - - } -} - -func (manager *EventSchemaManagerT) populateSchemaVersion(o *OffloadedSchemaVersionT) error { - schemaVersionsSelectSQL := fmt.Sprintf(`SELECT id, uuid, event_model_id, schema_hash, schema, private_data,first_seen, last_seen, total_count, (metadata->>'TotalCount')::bigint, metadata->'SampledEvents' FROM %s WHERE uuid = '%s'`, SCHEMA_VERSIONS_TABLE, o.UUID) - - var schemaVersion SchemaVersionT - var privateDataRaw json.RawMessage - var totalCount int64 - var sampleEventsRaw json.RawMessage - - err := manager.dbHandle.QueryRow(schemaVersionsSelectSQL).Scan(&schemaVersion.ID, &schemaVersion.UUID, &schemaVersion.EventModelID, &schemaVersion.SchemaHash, &schemaVersion.Schema, &privateDataRaw, &schemaVersion.FirstSeen, &schemaVersion.LastSeen, &schemaVersion.TotalCount, &totalCount, &sampleEventsRaw) - if err == sql.ErrNoRows { - return err - } else { - assertError(err) - } - - var privateData PrivateDataT - err = json.Unmarshal(privateDataRaw, &privateData) - assertError(err) - - var sampleEvents []interface{} - err = json.Unmarshal(sampleEventsRaw, &sampleEvents) - assertError(err) - - reservoirSize := len(sampleEvents) - if reservoirSize > reservoirSampleSize { - reservoirSize = reservoirSampleSize - } - schemaVersion.reservoirSample = NewReservoirSampler(reservoirSampleSize, reservoirSize, totalCount) - for idx, sampledEvent := range sampleEvents { - if idx > reservoirSampleSize-1 { - continue - } - schemaVersion.reservoirSample.add(sampledEvent, false) - } - - manager.updateSchemaVersionCache(&schemaVersion, false) - populateFrequencyCounters( - schemaVersion.SchemaHash, - privateData.FrequencyCounters, - frequencyCounterLimit) - - return nil -} - -// This should be called during the Initialize() to populate existing event Schemas -func (manager *EventSchemaManagerT) populateEventSchemas() { - pkgLogger.Infof(`Populating event models and their schema versions into in-memory`) - manager.populateEventModelsMinimal() - manager.populateSchemaVersionsMinimal() -} - -func setEventSchemasPopulated(status bool) { - areEventSchemasPopulated = status -} - -func getSchema(flattenedEvent map[string]interface{}) map[string]string { - schema := make(map[string]string) - for k, v := range flattenedEvent { - reflectType := reflect.TypeOf(v) - if reflectType != nil { - schema[k] = reflectType.String() - } else if !(v == nil && !shouldCaptureNilAsUnknowns) { - schema[k] = "unknown" - pkgLogger.Errorf("[EventSchemas] Got invalid reflectType %+v", v) - } - } - return schema -} - -func getSchemaHash(schema map[string]string) string { - keys := make([]string, 0, len(schema)) - for k := range schema { - keys = append(keys, k) - } - sort.Strings(keys) - var sb strings.Builder - for _, k := range keys { - sb.WriteString(k) - sb.WriteString(":") - sb.WriteString(schema[k]) - sb.WriteString(",") - } - - schemaHash := misc.GetMD5Hash(sb.String()) - return schemaHash -} - -// computeFrequencies iterates over the keys of input event and augment the count -// for values seen in that key. -func computeFrequencies(flattenedEvent map[string]interface{}, schemaHash string) { - // Once the frequency counters are pruned, we can augment them. - for k, v := range flattenedEvent { - fc := getFrequencyCounter(schemaHash, k, frequencyCounterLimit) - if fc == nil { - continue - } - stringVal := fmt.Sprintf("%v", v) - fc.Observe(&stringVal) - } -} - -// getEventSchemaManager returns the schema manager object which needs to be fully setup -// by calling the Setup function on it. -func getEventSchemaManager(dbHandle *sql.DB, disableInMemoryCache bool) *EventSchemaManagerT { - return &EventSchemaManagerT{ - dbHandle: dbHandle, - disableInMemoryCache: disableInMemoryCache, - eventModelMap: make(EventModelMapT), - schemaVersionMap: make(SchemaVersionMapT), - } -} - -func (manager *EventSchemaManagerT) Setup() { - pkgLogger.Info("[EventSchemas] Received a call to setup ") - if !manager.disableInMemoryCache { - rruntime.GoForWarehouse(func() { - defer setEventSchemasPopulated(true) - - populateESTimer := stats.Default.NewTaggedStat("populate_event_schemas", stats.TimerType, stats.Tags{"module": "event_schemas"}) - defer populateESTimer.RecordDuration()() - - manager.populateEventSchemas() - }) - } - eventSchemaChannel = make(chan *GatewayEventBatchT, 10000) - - for i := 0; i < noOfWorkers; i++ { - rruntime.GoForWarehouse(func() { - manager.recordEvents() - }) - } - - rruntime.GoForWarehouse(func() { - manager.flushEventSchemas(context.TODO()) - }) - - rruntime.GoForWarehouse(func() { - manager.offloadEventSchemas() - }) - - pkgLogger.Info("[EventSchemas] Set up eventSchemas successful.") -} diff --git a/event-schema/event_schema_api.go b/event-schema/event_schema_api.go deleted file mode 100644 index 9d7f7802f3..0000000000 --- a/event-schema/event_schema_api.go +++ /dev/null @@ -1,659 +0,0 @@ -// Package event_schema -// Handling HTTP requests to expose the schemas -package event_schema - -import ( - "encoding/json" - "fmt" - "net/http" - "strconv" - "strings" - - "github.com/go-chi/chi/v5" - "github.com/google/uuid" - - "github.com/rudderlabs/rudder-server/gateway/response" - "github.com/rudderlabs/rudder-server/utils/misc" -) - -func handleBasicAuth(r *http.Request) error { - username, password, ok := r.BasicAuth() - if !ok { - return fmt.Errorf("Basic auth credentials missing") - } - if username != adminUser || password != adminPassword { - return fmt.Errorf("Invalid admin credentials") - } - return nil -} - -func (manager *EventSchemaManagerT) GetEventModels(w http.ResponseWriter, r *http.Request) { - err := handleBasicAuth(r) - if err != nil { - http.Error(w, response.MakeResponse(err.Error()), 400) - return - } - - if r.Method != http.MethodGet { - http.Error(w, response.MakeResponse("Only HTTP GET method is supported"), 400) - return - } - - writeKeys, ok := r.URL.Query()["WriteKey"] - writeKey := "" - if ok && writeKeys[0] != "" { - writeKey = writeKeys[0] - } - - eventTypes := manager.fetchEventModelsByWriteKey(writeKey) - - eventTypesJSON, err := json.Marshal(eventTypes) - if err != nil { - http.Error(w, response.MakeResponse("Internal Error: Failed to Marshal event types"), 500) - return - } - - _, _ = w.Write(eventTypesJSON) -} - -func (manager *EventSchemaManagerT) GetJsonSchemas(w http.ResponseWriter, r *http.Request) { - err := handleBasicAuth(r) - if err != nil { - http.Error(w, response.MakeResponse(err.Error()), 400) - return - } - - if r.Method != http.MethodGet { - http.Error(w, response.MakeResponse("Only HTTP GET method is supported"), 400) - return - } - - writeKeys, ok := r.URL.Query()["WriteKey"] - writeKey := "" - if ok && writeKeys[0] != "" { - writeKey = writeKeys[0] - } - - eventModels := manager.fetchEventModelsByWriteKey(writeKey) - if len(eventModels) == 0 { - http.Error(w, response.MakeResponse("No event models exists to create a tracking plan."), 404) - return - } - - // generating json schema from eventModels - jsonSchemas, err := generateJsonSchFromEM(eventModels) - if err != nil { - http.Error(w, response.MakeResponse("Internal Error: Failed to Marshal event types"), 500) - return - } - - _, _ = w.Write(jsonSchemas) -} - -type JSPropertyTypeT struct { - Type []string `json:"type"` -} - -type JSPropertyT struct { - Property map[string]interface{} `json:"properties"` -} - -type JsonSchemaT struct { - Schema map[string]interface{} `json:"schema"` - SchemaType string `json:"schemaType"` - SchemaTIdentifier string `json:"schemaIdentifier"` -} - -// generateJsonSchFromEM Generates Json schemas from Event Models -func generateJsonSchFromEM(eventModels []*EventModelT) ([]byte, error) { - var jsonSchemas []JsonSchemaT - for _, eventModel := range eventModels { - flattenedSch := make(map[string]interface{}) - err := json.Unmarshal(eventModel.Schema, &flattenedSch) - if err != nil { - pkgLogger.Errorf("Error unmarshalling eventModelSch: %v for ID: %v", err, eventModel.ID) - continue - } - unFlattenedSch, err := unflatten(flattenedSch) - if err != nil { - pkgLogger.Errorf("Error unflattening flattenedSch: %v for ID: %v", err, eventModel.ID) - continue - } - schemaProperties, err := getETSchProp(eventModel.EventType, unFlattenedSch) - if err != nil { - pkgLogger.Errorf("Error while getting schema properties: %v for ID: %v", err, eventModel.ID) - continue - } - - jsonSchema := generateJsonSchFromSchProp(schemaProperties) - jsonSchema["additionalProperties"] = false - jsonSchema["$schema"] = "http://json-schema.org/draft-07/schema#" - - // TODO: validate if the jsonSchema is correct. - jsonSchemas = append(jsonSchemas, JsonSchemaT{ - Schema: jsonSchema, - SchemaType: eventModel.EventType, - SchemaTIdentifier: eventModel.EventIdentifier, - }) - } - eventJsonSchs, err := json.Marshal(jsonSchemas) - if err != nil { - return nil, err - } - return eventJsonSchs, nil -} - -// getETSchProp Get Event Type schema from Event Model Schema -// Empty map schema is allowed for event json schema -func getETSchProp(eventType string, eventModelSch map[string]interface{}) (map[string]interface{}, error) { - switch eventType { - case "track", "screen", "page": - filtered, _ := eventModelSch["properties"].(map[string]interface{}) - return filtered, nil - case "identify", "group": - filtered, _ := eventModelSch["traits"].(map[string]interface{}) - return filtered, nil - } - return nil, fmt.Errorf("invalid eventType") -} - -// generateJsonSchFromSchProp Generated Json schema from unflattened schema properties. -func generateJsonSchFromSchProp(schemaProperties map[string]interface{}) map[string]interface{} { - jsProperties := JSPropertyT{ - Property: make(map[string]interface{}), - } - finalSchema := make(map[string]interface{}) - - for k, v := range schemaProperties { - switch value := v.(type) { - case string: - jsProperties.Property[k] = getPropertyTypesFromSchValue(value) - case map[string]interface{}: - // check if map is an array or map - if checkIfArray(value) { - var vType interface{} - for _, v := range value { - vt, ok := v.(string) - if ok { - vType = getPropertyTypesFromSchValue(vt) - } else { - vType = generateJsonSchFromSchProp(v.(map[string]interface{})) - } - break - } - jsProperties.Property[k] = map[string]interface{}{ - "type": "array", - "items": vType, - } - break - } - jsProperties.Property[k] = generateJsonSchFromSchProp(value) - default: - pkgLogger.Errorf("unknown type found") - } - } - finalSchema["properties"] = jsProperties.Property - finalSchema["type"] = "object" - return finalSchema -} - -func getPropertyTypesFromSchValue(schVal string) *JSPropertyTypeT { - types := strings.Split(schVal, ",") - for i, v := range types { - types[i] = misc.GetJsonSchemaDTFromGoDT(v) - } - return &JSPropertyTypeT{ - Type: types, - } -} - -// prop.myarr.0 -// will not be able to say if above is prop{myarr:[0]} or prop{myarr{"0":0}} -func checkIfArray(value map[string]interface{}) bool { - if len(value) == 0 { - return false - } - - for k := range value { - _, err := strconv.Atoi(k) - if err != nil { - return false - } - // need not check the array continuity - } - return true -} - -// https://play.golang.org/p/4juOff38ea -// or use https://pkg.go.dev/github.com/wolfeidau/unflatten -// or use https://github.com/nqd/flat -func unflatten(flat map[string]interface{}) (map[string]interface{}, error) { - unflat := map[string]interface{}{} - - for key, value := range flat { - keyParts := strings.Split(key, ".") - - // Walk the keys until we get to a leaf node. - m := unflat - for i, k := range keyParts[:len(keyParts)-1] { - v, exists := m[k] - if !exists { - newMap := map[string]interface{}{} - m[k] = newMap - m = newMap - continue - } - - innerMap, ok := v.(map[string]interface{}) - if !ok { - fmt.Printf("key=%v is not an object\n", strings.Join(keyParts[0:i+1], ".")) - newMap := map[string]interface{}{} - m[k] = newMap - m = newMap - continue - } - m = innerMap - } - - leafKey := keyParts[len(keyParts)-1] - if _, exists := m[leafKey]; exists { - fmt.Printf("key=%v already exists", key) - continue - } - m[keyParts[len(keyParts)-1]] = value - } - - return unflat, nil -} - -func (manager *EventSchemaManagerT) GetEventVersions(w http.ResponseWriter, r *http.Request) { - err := handleBasicAuth(r) - if err != nil { - http.Error(w, response.MakeResponse(err.Error()), 400) - return - } - - if r.Method != http.MethodGet { - http.Error(w, response.MakeResponse("Only HTTP GET method is supported"), 400) - return - } - - eventIDs, ok := r.URL.Query()["EventID"] - if !ok { - http.Error(w, response.MakeResponse("Mandatory field: EventID missing"), 400) - return - } - eventID := eventIDs[0] - - schemaVersions := manager.fetchSchemaVersionsByEventID(eventID) - schemaVersionsJSON, err := json.Marshal(schemaVersions) - if err != nil { - http.Error(w, response.MakeResponse("Internal Error: Failed to Marshal event types"), 500) - return - } - - _, _ = w.Write(schemaVersionsJSON) -} - -// TODO: Complete this -func (manager *EventSchemaManagerT) GetKeyCounts(w http.ResponseWriter, r *http.Request) { - err := handleBasicAuth(r) - if err != nil { - http.Error(w, response.MakeResponse(err.Error()), 400) - return - } - - if r.Method != http.MethodGet { - http.Error(w, response.MakeResponse("Only HTTP GET method is supported"), 400) - return - } - eventID := chi.URLParam(r, "EventID") - if eventID == "" { - http.Error(w, response.MakeResponse("Mandatory field: EventID missing"), 400) - return - } - - keyCounts, err := manager.getKeyCounts(eventID) - if err != nil { - logID := uuid.New().String() - pkgLogger.Errorf("logID : %s, err: %s", logID, err.Error()) - http.Error(w, response.MakeResponse(fmt.Sprintf("Internal Error: An error has been logged with logID : %s", logID)), 500) - return - } - keyCountsJSON, err := json.Marshal(keyCounts) - if err != nil { - logID := uuid.New().String() - pkgLogger.Errorf("logID : %s, err: %s", logID, err.Error()) - http.Error(w, response.MakeResponse(fmt.Sprintf("Interna Error: An error has been logged with logID : %s", logID)), 500) - return - } - - _, _ = w.Write(keyCountsJSON) -} - -func (manager *EventSchemaManagerT) getKeyCounts(eventID string) (keyCounts map[string]int64, err error) { - schemaVersions := manager.fetchSchemaVersionsByEventID(eventID) - - keyCounts = make(map[string]int64) - for _, sv := range schemaVersions { - var schema map[string]string - err = json.Unmarshal(sv.Schema, &schema) - if err != nil { - return - } - for k := range schema { - _, ok := keyCounts[k] - if !ok { - keyCounts[k] = 0 - } - keyCounts[k] = keyCounts[k] + sv.TotalCount - } - } - return -} - -func (manager *EventSchemaManagerT) GetEventModelMetadata(w http.ResponseWriter, r *http.Request) { - err := handleBasicAuth(r) - if err != nil { - http.Error(w, response.MakeResponse(err.Error()), 400) - return - } - - if r.Method != http.MethodGet { - http.Error(w, response.MakeResponse("Only HTTP GET method is supported"), 400) - return - } - - eventID := chi.URLParam(r, "EventID") - if eventID == "" { - http.Error(w, response.MakeResponse("Mandatory field: EventID missing"), 400) - return - } - - metadata, err := manager.fetchMetadataByEventModelID(eventID) - if err != nil { - http.Error(w, response.MakeResponse(err.Error()), 400) - return - } - - metadataJSON, err := json.Marshal(metadata) - if err != nil { - http.Error(w, response.MakeResponse("Internal Error: Failed to Marshal metadata"), 500) - return - } - - _, _ = w.Write(metadataJSON) -} - -func (manager *EventSchemaManagerT) GetSchemaVersionMetadata(w http.ResponseWriter, r *http.Request) { - err := handleBasicAuth(r) - if err != nil { - http.Error(w, response.MakeResponse(err.Error()), 400) - return - } - - if r.Method != http.MethodGet { - http.Error(w, response.MakeResponse("Only HTTP GET method is supported"), 400) - return - } - - versionID := chi.URLParam(r, "VersionID") - if versionID == "" { - http.Error(w, response.MakeResponse("Mandatory field: VersionID missing"), 400) - return - } - - metadata, err := manager.fetchMetadataByEventVersionID(versionID) - if err != nil { - http.Error(w, response.MakeResponse(err.Error()), 400) - return - } - - metadataJSON, err := json.Marshal(metadata) - if err != nil { - http.Error(w, response.MakeResponse("Internal Error: Failed to Marshal metadata"), 500) - return - } - - _, _ = w.Write(metadataJSON) -} - -func (manager *EventSchemaManagerT) GetSchemaVersionMissingKeys(w http.ResponseWriter, r *http.Request) { - err := handleBasicAuth(r) - if err != nil { - http.Error(w, response.MakeResponse(err.Error()), 400) - return - } - - if r.Method != http.MethodGet { - http.Error(w, response.MakeResponse("Only HTTP GET method is supported"), 400) - return - } - - versionID := chi.URLParam(r, "VersionID") - if versionID == "" { - http.Error(w, response.MakeResponse("Mandatory field: VersionID missing"), 400) - return - } - - schema, err := manager.fetchSchemaVersionByID(versionID) - if err != nil { - http.Error(w, response.MakeResponse(err.Error()), 500) - return - } - - eventModel, err := manager.fetchEventModelByID(schema.EventModelID) - if err != nil { - _, _ = w.Write([]byte("[]")) - return - } - - schemaMap := make(map[string]string) - masterSchemaMap := make(map[string]string) - - err = json.Unmarshal(schema.Schema, &schemaMap) - if err != nil { - logID := uuid.New().String() - pkgLogger.Errorf("logID : %s, err: %s", logID, err.Error()) - http.Error(w, response.MakeResponse(fmt.Sprintf("Internal Error: An error has been logged with logID : %s", logID)), 500) - return - } - - err = json.Unmarshal(eventModel.Schema, &masterSchemaMap) - if err != nil { - logID := uuid.New().String() - pkgLogger.Errorf("logID : %s, err: %s", logID, err.Error()) - http.Error(w, response.MakeResponse(fmt.Sprintf("Interna Error: An error has been logged with logID : %s", logID)), 500) - return - } - - missingKeys := make([]string, 0) - - for k := range masterSchemaMap { - if _, ok := schemaMap[k]; !ok { - missingKeys = append(missingKeys, k) - } - } - - missingKeyJSON, err := json.Marshal(missingKeys) - if err != nil { - http.Error(w, response.MakeResponse("Internal Error: Failed to Marshal metadata"), 500) - return - } - - _, _ = w.Write(missingKeyJSON) -} - -func (manager *EventSchemaManagerT) fetchEventModelsByWriteKey(writeKey string) []*EventModelT { - var eventModelsSelectSQL string - if writeKey == "" { - eventModelsSelectSQL = fmt.Sprintf(`SELECT id, uuid, write_key, event_type, event_model_identifier, created_at, schema, total_count, last_seen FROM %s`, EVENT_MODELS_TABLE) - } else { - eventModelsSelectSQL = fmt.Sprintf(`SELECT id, uuid, write_key, event_type, event_model_identifier, created_at, schema, total_count, last_seen FROM %s WHERE write_key = '%s'`, EVENT_MODELS_TABLE, writeKey) - } - - rows, err := manager.dbHandle.Query(eventModelsSelectSQL) - assertError(err) - defer func() { _ = rows.Close() }() - - eventModels := make([]*EventModelT, 0) - - for rows.Next() { - var eventModel EventModelT - err := rows.Scan(&eventModel.ID, &eventModel.UUID, &eventModel.WriteKey, &eventModel.EventType, - &eventModel.EventIdentifier, &eventModel.CreatedAt, &eventModel.Schema, &eventModel.TotalCount, &eventModel.LastSeen) - assertError(err) - - eventModels = append(eventModels, &eventModel) - } - - return eventModels -} - -func (manager *EventSchemaManagerT) fetchSchemaVersionsByEventID(eventID string) []*SchemaVersionT { - schemaVersionsSelectSQL := fmt.Sprintf(`SELECT id, uuid, event_model_id, schema, first_seen, last_seen, total_count FROM %s WHERE event_model_id = '%s'`, SCHEMA_VERSIONS_TABLE, eventID) - - rows, err := manager.dbHandle.Query(schemaVersionsSelectSQL) - assertError(err) - defer func() { _ = rows.Close() }() - - schemaVersions := make([]*SchemaVersionT, 0) - - for rows.Next() { - var schemaVersion SchemaVersionT - err := rows.Scan(&schemaVersion.ID, &schemaVersion.UUID, &schemaVersion.EventModelID, - &schemaVersion.Schema, &schemaVersion.FirstSeen, &schemaVersion.LastSeen, &schemaVersion.TotalCount) - assertError(err) - - schemaVersions = append(schemaVersions, &schemaVersion) - } - - return schemaVersions -} - -func (manager *EventSchemaManagerT) fetchEventModelByID(id string) (*EventModelT, error) { - eventModelsSelectSQL := fmt.Sprintf(`SELECT id, uuid, write_key, event_type, event_model_identifier, created_at, schema, total_count, last_seen FROM %s WHERE uuid = '%s'`, EVENT_MODELS_TABLE, id) - - rows, err := manager.dbHandle.Query(eventModelsSelectSQL) - assertError(err) - defer func() { _ = rows.Close() }() - - eventModels := make([]*EventModelT, 0) - - for rows.Next() { - var eventModel EventModelT - err := rows.Scan(&eventModel.ID, &eventModel.UUID, &eventModel.WriteKey, &eventModel.EventType, - &eventModel.EventIdentifier, &eventModel.CreatedAt, &eventModel.Schema, &eventModel.TotalCount, &eventModel.LastSeen) - assertError(err) - - eventModels = append(eventModels, &eventModel) - } - - if len(eventModels) == 0 { - err = fmt.Errorf("No eventModels found for given eventModelID : %s", id) - return nil, err - } - - if len(eventModels) > 1 { - panic(fmt.Sprintf("More than one entry found for eventModelId : %s. Make sure a unique key constraint is present on uuid column", id)) - } - - return eventModels[0], nil -} - -func (manager *EventSchemaManagerT) fetchSchemaVersionByID(id string) (*SchemaVersionT, error) { - schemaVersionsSelectSQL := fmt.Sprintf(`SELECT id, uuid, event_model_id, schema, first_seen, last_seen, total_count FROM %s WHERE uuid = '%s'`, SCHEMA_VERSIONS_TABLE, id) - - rows, err := manager.dbHandle.Query(schemaVersionsSelectSQL) - assertError(err) - defer func() { _ = rows.Close() }() - - schemaVersions := make([]*SchemaVersionT, 0) - - for rows.Next() { - var schemaVersion SchemaVersionT - err := rows.Scan(&schemaVersion.ID, &schemaVersion.UUID, &schemaVersion.EventModelID, &schemaVersion.Schema, &schemaVersion.FirstSeen, &schemaVersion.LastSeen, &schemaVersion.TotalCount) - assertError(err) - - schemaVersions = append(schemaVersions, &schemaVersion) - } - - if len(schemaVersions) == 0 { - err = fmt.Errorf("No SchemaVersion found for given VersionID : %s", id) - return nil, err - } - - if len(schemaVersions) > 1 { - panic(fmt.Sprintf("More than one entry found for eventVersionID : %s. Make sure a unique key constraint is present on uuid column", id)) - } - - return schemaVersions[0], nil -} - -func (manager *EventSchemaManagerT) fetchMetadataByEventVersionID(eventVersionID string) (metadata *MetaDataT, err error) { - metadataSelectSQL := fmt.Sprintf(`SELECT metadata FROM %s WHERE uuid = '%s'`, SCHEMA_VERSIONS_TABLE, eventVersionID) - - rows, err := manager.dbHandle.Query(metadataSelectSQL) - assertError(err) - defer func() { _ = rows.Close() }() - - metadatas := make([]*MetaDataT, 0) - - for rows.Next() { - var metadataRaw []byte - err := rows.Scan(&metadataRaw) - assertError(err) - - var metadata MetaDataT - err = json.Unmarshal(metadataRaw, &metadata) - assertError(err) - metadatas = append(metadatas, &metadata) - } - - if len(metadatas) > 1 { - err = fmt.Errorf("More than one entry found for eventVersionID : %s. Make sure a unique key constraint is present on uuid column", eventVersionID) - assertError(err) - } - - if len(metadatas) == 0 { - err = fmt.Errorf("No Metadata found for given VersionID : %s", eventVersionID) - return nil, err - } - - metadata = metadatas[0] - return -} - -func (manager *EventSchemaManagerT) fetchMetadataByEventModelID(eventModelID string) (metadata *MetaDataT, err error) { - metadataSelectSQL := fmt.Sprintf(`SELECT metadata FROM %s WHERE uuid = '%s'`, EVENT_MODELS_TABLE, eventModelID) - - rows, err := manager.dbHandle.Query(metadataSelectSQL) - assertError(err) - defer func() { _ = rows.Close() }() - - metadatas := make([]*MetaDataT, 0) - - for rows.Next() { - var metadataRaw []byte - err := rows.Scan(&metadataRaw) - assertError(err) - - var metadata MetaDataT - err = json.Unmarshal(metadataRaw, &metadata) - assertError(err) - metadatas = append(metadatas, &metadata) - } - - if len(metadatas) > 1 { - err = fmt.Errorf("More than one entry found for eventVersionID : %s. Make sure a unique key constraint is present on uuid column", eventModelID) - assertError(err) - } - - if len(metadatas) == 0 { - err = fmt.Errorf("No Metadata found for given VersionID : %s", eventModelID) - return nil, err - } - - metadata = metadatas[0] - return -} diff --git a/event-schema/event_schema_suite_test.go b/event-schema/event_schema_suite_test.go deleted file mode 100644 index 6f5ef9aae0..0000000000 --- a/event-schema/event_schema_suite_test.go +++ /dev/null @@ -1,13 +0,0 @@ -package event_schema - -import ( - "testing" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" -) - -func TestEventSchemas(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "Event Schemas Suite") -} diff --git a/event-schema/event_schema_test.go b/event-schema/event_schema_test.go deleted file mode 100644 index da2f0629a8..0000000000 --- a/event-schema/event_schema_test.go +++ /dev/null @@ -1,288 +0,0 @@ -package event_schema - -import ( - "context" - "database/sql" - "encoding/json" - "fmt" - "log" - "os" - "testing" - - "github.com/jeremywohl/flatten" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "github.com/ory/dockertest/v3" - "github.com/stretchr/testify/require" - - "github.com/rudderlabs/rudder-go-kit/config" - "github.com/rudderlabs/rudder-go-kit/logger" - "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource" - "github.com/rudderlabs/rudder-server/admin" - migrator "github.com/rudderlabs/rudder-server/services/sql-migrator" - "github.com/rudderlabs/rudder-server/testhelper" -) - -type envSetter interface { - Setenv(key, value string) -} - -func setupDB(es envSetter, cleanup *testhelper.Cleanup) (*resource.PostgresResource, error) { - log.Println("Initialize the database here with the necessary table structures.") - - pool, err := dockertest.NewPool("") - if err != nil { - return nil, fmt.Errorf("Unable to bring up pool for creating containers, err: %w", err) - } - - pgResource, err := resource.SetupPostgres(pool, cleanup) - if err != nil { - return nil, fmt.Errorf("Unable to setup the postgres container, err: %w", err) - } - - mg := &migrator.Migrator{ - Handle: pgResource.DB, - MigrationsTable: "node_migrations", - ShouldForceSetLowerVersion: config.GetBool("SQLMigrator.forceSetLowerVersion", true), - } - - err = mg.Migrate("node") - if err != nil { - return nil, fmt.Errorf("Unable to run the migrations for the node, err: %w", err) - } - - // Load self configuration. - config.Reset() - - logger.Reset() // Initialize the logger - Init2() - Init() - - // Setup the supporting services like jobsdb - jobsDBInit(es, pgResource) - - return pgResource, nil -} - -func jobsDBInit(es envSetter, pgResource *resource.PostgresResource) { - // Setup env for jobsdb. - es.Setenv("JOBS_DB_HOST", pgResource.Host) - es.Setenv("JOBS_DB_USER", pgResource.User) - es.Setenv("JOBS_DB_PASSWORD", pgResource.Password) - es.Setenv("JOBS_DB_DB_NAME", pgResource.Database) - es.Setenv("JOBS_DB_PORT", pgResource.Port) - - admin.Init() -} - -var _ = Describe("Event Schemas", Ordered, func() { - var pgResource *resource.PostgresResource - var err error - cleanup := &testhelper.Cleanup{} - - BeforeAll(func() { - pgResource, err = setupDB(GinkgoT(), cleanup) - - if err != nil { - Fail(fmt.Sprintf("unable to setup the postgres resource: %s", err.Error())) - } - }) - - Describe("Event Schema Lifecycle", func() { - Context("when new event is seen", func() { - It("generates new event schema in db", func() { - writeKey := "my-write-key" - manager := getEventSchemaManager( - pgResource.DB, false) - eventStr := `{"batch": [{"type": "track", "event": "Demo Track", "sentAt": "2019-08-12T05:08:30.909Z", "channel": "android-sdk", "context": {"app": {"name": "RudderAndroidClient", "build": "1", "version": "1.0", "namespace": "com.rudderlabs.android.sdk"}, "device": {"id": "49e4bdd1c280bc00", "name": "generic_x86", "model": "Android SDK built for x86", "manufacturer": "Google"}, "locale": "en-US", "screen": {"width": 1080, "height": 1794, "density": 420}, "traits": {"anonymousId": "49e4bdd1c280bc00"}, "library": {"name": "com.rudderstack.android.sdk.core"}, "network": {"carrier": "Android"}, "user_agent": "Dalvik/2.1.0 (Linux; U; Android 9; Android SDK built for x86 Build/PSR1.180720.075)"}, "rudderId": "90ca6da0-292e-4e79-9880-f8009e0ae4a3", "messageId": "a82717d0-b939-47bd-9592-59bbbea66c1a", "properties": {"label": "Demo Label", "value": 5, "testMap": {"t1": "a", "t2": 4}, "category": "Demo Category", "floatVal": 4.501, "testArray": [{"id": "elem1", "value": "e1"}, {"id": "elem2", "value": "e2"}]}, "anonymousId": "anon_id", "integrations": {"All": true}, "originalTimestamp": "2019-08-12T05:08:30.909Z"}], "writeKey": "my-write-key", "requestIP": "127.0.0.1", "receivedAt": "2022-06-15T13:51:08.754+05:30"}` - var eventPayload EventPayloadT - err := json.Unmarshal([]byte(eventStr), &eventPayload) - Expect(err).To(BeNil(), "Invalid request payload for unmarshalling") - - // Send event across a new write key - manager.handleEvent(writeKey, eventPayload.Batch[0]) - - // Event Model gets stored in the in-memory map - eventModel := manager.eventModelMap[WriteKey(writeKey)][("track")][("Demo Track")] - Expect(eventModel.UUID).ToNot(BeEmpty()) - Expect(eventModel.EventIdentifier).To(BeEquivalentTo("Demo Track")) - - // Schema Version gets stored in the in-memory map for event model. - eventMap := map[string]interface{}(eventPayload.Batch[0]) - flattenedEvent, _ := flatten.Flatten(eventMap, "", flatten.DotStyle) - hash := getSchemaHash(getSchema(flattenedEvent)) - - schemaVersion := manager.schemaVersionMap[eventModel.UUID][hash] - Expect(schemaVersion).ToNot(BeNil()) - }) - }) - }) - - Describe("Event Schema Frequency Counters", func() { - Context("when frequency counters are limited", func() { - BeforeEach(func() { - frequencyCounterLimit = 200 - }) - - It("trims frequency counters for event model reading from db", func() { - writeKey := "my-write-key" - - manager := getEventSchemaManager(pgResource.DB, false) - eventStr := `{"batch": [{"type": "track", "event": "Demo Track", "properties": {"label": "Demo Label", "value": 5 }}], "writeKey": "my-write-key", "requestIP": "127.0.0.1", "receivedAt": "2022-06-15T13:51:08.754+05:30"}` - var eventPayload EventPayloadT - err := json.Unmarshal([]byte(eventStr), &eventPayload) - Expect(err).To(BeNil()) - - manager.handleEvent(writeKey, eventPayload.Batch[0]) - eventModel := manager.eventModelMap[WriteKey(writeKey)]["track"]["Demo Track"] - - // Push the events to DB - manager.flushEventSchemasToDB(context.TODO()) - - // Bound the frequency counters to 3 - frequencyCounterLimit = 3 - - // reload the models from the database which should now respect - // that frequency counters have now been bounded. - manager.handleEvent(writeKey, eventPayload.Batch[0]) - Expect(len(countersCache[eventModel.UUID])).To(BeEquivalentTo(3)) - - // flush the events back to the database. - err = manager.flushEventSchemasToDB(context.TODO()) - Expect(err).To(BeNil()) - - freqCounters, err := getFrequencyCountersForEventModel(manager.dbHandle, eventModel.UUID) - Expect(err).To(BeNil()) - Expect(len(freqCounters)).To(BeEquivalentTo(3)) - }) - - It("trims frequency counters for event model reading from in-memory", func() { - writeKey := "my-write-key-in-memory" - - manager := getEventSchemaManager(pgResource.DB, false) - eventStr := `{"batch": [{"type": "track", "event": "Demo Track", "properties": {"label": "Demo Label", "value": 5 }}], "writeKey": "my-write-key", "requestIP": "127.0.0.1", "receivedAt": "2022-06-15T13:51:08.754+05:30"}` - var eventPayload EventPayloadT - json.Unmarshal([]byte(eventStr), &eventPayload) - - manager.handleEvent(writeKey, eventPayload.Batch[0]) - eventModel := manager.eventModelMap[WriteKey(writeKey)]["track"]["Demo Track"] - - // Bound the frequency counters to 3 - frequencyCounterLimit = 3 - - // When we receive another event for loaded in memory, we should - // prune the frequency counters to the limit - manager.handleEvent(writeKey, eventPayload.Batch[0]) - Expect(len(countersCache[eventModel.UUID])).To(BeEquivalentTo(frequencyCounterLimit)) - }) - }) - }) - - AfterAll(func() { - // clean up the resources. - cleanup.Run() - }) -}) - -func getFrequencyCountersForEventModel(handle *sql.DB, uuid string) ([]*FrequencyCounter, error) { - query := `SELECT private_data FROM event_models WHERE uuid = $1` - var privateDataRaw json.RawMessage - err := handle.QueryRow(query, uuid).Scan(&privateDataRaw) - if err != nil { - return nil, err - } - - var privateData PrivateDataT - err = json.Unmarshal(privateDataRaw, &privateData) - if err != nil { - return nil, err - } - - return privateData.FrequencyCounters, nil -} - -func TestPruneFrequencyCounters(t *testing.T) { - inputs := []struct { - hash string - counters map[string]*FrequencyCounter - bound int - countersRemaining int - }{ - { - "prune-schema-hash-1", - map[string]*FrequencyCounter{ - "k1": {}, - "k2": {}, - "k3": {}, - }, - 2, 2, // 1 extra element will be removed - }, - { - "prune-schema-hash-2", - map[string]*FrequencyCounter{ - "k1": {}, - "k2": {}, - }, - 0, 0, // all the entries will be removed as bound is 0 - }, - { - "prune-schema-hash-2", - map[string]*FrequencyCounter{ - "k1": {}, - "k2": {}, - }, - 3, 2, // all the entries will be intact as bound > existing entries - }, - } - - for _, input := range inputs { - t.Logf("Pruning for event schema hash: %s", input.hash) - countersCache[input.hash] = input.counters - pruneFrequencyCounters(input.hash, input.bound) - - require.Equal(t, len(countersCache[input.hash]), input.countersRemaining) - } -} - -func BenchmarkEventSchemaHandleEvent(b *testing.B) { - b.Log("Benchmarking the handling event of event schema") - - cleanup := &testhelper.Cleanup{} - pgResource, err := setupDB(b, cleanup) - if err != nil { - b.Errorf(fmt.Sprintf("unable to setup db resource: %s", err.Error())) - return - } - - defer cleanup.Run() - - byt, err := os.ReadFile("testdata/test_input.json") - if err != nil { - b.Errorf("Unable to perform benchmark test as unable to read the input value") - return - } - - var eventPayload EventPayloadT - if err := json.Unmarshal(byt, &eventPayload); err != nil { - b.Errorf("Invalid request payload for unmarshalling: %v", err.Error()) - return - } - - manager := EventSchemaManagerT{ - dbHandle: pgResource.DB, - disableInMemoryCache: false, - eventModelMap: EventModelMapT{}, - schemaVersionMap: SchemaVersionMapT{}, - } - - // frequencyCounterLimit = ? - for i := 0; i < b.N; i++ { - manager.handleEvent("dummy-key", eventPayload.Batch[0]) - } - - // flush the event schemas to the database. - err = manager.flushEventSchemasToDB(context.TODO()) - if err != nil { - b.Errorf("Unable to flush events back to database") - } -} diff --git a/event-schema/sampler.go b/event-schema/sampler.go deleted file mode 100644 index d30c4d07c8..0000000000 --- a/event-schema/sampler.go +++ /dev/null @@ -1,65 +0,0 @@ -package event_schema - -import ( - "math/rand" - "sync" - "time" -) - -type ReservoirSample struct { - reservoirSize int - currSize int - totalCount int64 - rand *rand.Rand - lock sync.RWMutex - sampleEvents []interface{} -} - -func NewReservoirSampler(reservoirSize, currSize int, totalCount int64) (rs *ReservoirSample) { - reservoirSampler := new(ReservoirSample) - reservoirSampler.currSize = currSize - reservoirSampler.reservoirSize = reservoirSize - reservoirSampler.totalCount = totalCount - reservoirSampler.rand = rand.New(rand.NewSource(time.Now().UnixNano())) - reservoirSampler.sampleEvents = make([]interface{}, reservoirSize) - - return reservoirSampler -} - -func (rs *ReservoirSample) add(item interface{}, incTotal bool) { - if item == nil { - pkgLogger.Debug("Debug : Trying to add an empty event in reservoir sample") - return - } - rs.lock.Lock() - defer rs.lock.Unlock() - - if incTotal { - rs.totalCount++ - } - - if rs.currSize < rs.reservoirSize { - rs.sampleEvents[rs.currSize] = item - rs.currSize++ - } else { - if i := rs.rand.Int63n(rs.totalCount); i < int64(rs.reservoirSize) { - rs.sampleEvents[i] = item - } - } -} - -func (rs *ReservoirSample) getSamples() []interface{} { - rs.lock.RLock() - defer rs.lock.RUnlock() - - dst := make([]interface{}, rs.currSize) - _ = copy(dst, rs.sampleEvents[:rs.currSize]) - return dst -} - -func (rs *ReservoirSample) getTotalCount() int64 { - rs.lock.RLock() - defer rs.lock.RUnlock() - - return rs.totalCount -} diff --git a/event-schema/setup.go b/event-schema/setup.go deleted file mode 100644 index 5ab6cca9c1..0000000000 --- a/event-schema/setup.go +++ /dev/null @@ -1,31 +0,0 @@ -package event_schema - -import ( - "strings" - "sync" - - "github.com/rudderlabs/rudder-go-kit/config" - "github.com/rudderlabs/rudder-server/app" - "github.com/rudderlabs/rudder-server/utils/types" -) - -var ( - eventSchemaManager types.EventSchemasI - eventSchemaManagerLock sync.RWMutex -) - -// GetInstance returns an instance of EventSchemaManagerT -func GetInstance() types.EventSchemasI { - pkgLogger.Info("[[ EventSchemas ]] Setting up EventSchemas FeatureValue") - eventSchemaManagerLock.Lock() - defer eventSchemaManagerLock.Unlock() - if eventSchemaManager == nil { - appTypeStr := strings.ToUpper(config.GetString("APP_TYPE", app.EMBEDDED)) - schemaManager := getEventSchemaManager( - createDBConnection(), - appTypeStr == app.GATEWAY) - schemaManager.Setup() // Kickoff the corresponding supporting services. - eventSchemaManager = schemaManager - } - return eventSchemaManager -} diff --git a/event-schema/testdata/test_input.json b/event-schema/testdata/test_input.json deleted file mode 100644 index a1e0418592..0000000000 --- a/event-schema/testdata/test_input.json +++ /dev/null @@ -1 +0,0 @@ -{"batch":[{"0":0,"1":1,"2":2,"3":3,"4":4,"5":5,"6":6,"7":7,"8":8,"9":9,"10":10,"11":11,"12":12,"13":13,"14":14,"15":15,"16":16,"17":17,"18":18,"19":19,"20":20,"21":21,"22":22,"23":23,"24":24,"25":25,"26":26,"27":27,"28":28,"29":29,"30":30,"31":31,"32":32,"33":33,"34":34,"35":35,"36":36,"37":37,"38":38,"39":39,"40":40,"41":41,"42":42,"43":43,"44":44,"45":45,"46":46,"47":47,"48":48,"49":49,"50":50,"51":51,"52":52,"53":53,"54":54,"55":55,"56":56,"57":57,"58":58,"59":59,"60":60,"61":61,"62":62,"63":63,"64":64,"65":65,"66":66,"67":67,"68":68,"69":69,"70":70,"71":71,"72":72,"73":73,"74":74,"75":75,"76":76,"77":77,"78":78,"79":79,"80":80,"81":81,"82":82,"83":83,"84":84,"85":85,"86":86,"87":87,"88":88,"89":89,"90":90,"91":91,"92":92,"93":93,"94":94,"95":95,"96":96,"97":97,"98":98,"99":99,"100":100,"101":101,"102":102,"103":103,"104":104,"105":105,"106":106,"107":107,"108":108,"109":109,"110":110,"111":111,"112":112,"113":113,"114":114,"115":115,"116":116,"117":117,"118":118,"119":119,"120":120,"121":121,"122":122,"123":123,"124":124,"125":125,"126":126,"127":127,"128":128,"129":129,"130":130,"131":131,"132":132,"133":133,"134":134,"135":135,"136":136,"137":137,"138":138,"139":139,"140":140,"141":141,"142":142,"143":143,"144":144,"145":145,"146":146,"147":147,"148":148,"149":149,"150":150,"151":151,"152":152,"153":153,"154":154,"155":155,"156":156,"157":157,"158":158,"159":159,"160":160,"161":161,"162":162,"163":163,"164":164,"165":165,"166":166,"167":167,"168":168,"169":169,"170":170,"171":171,"172":172,"173":173,"174":174,"175":175,"176":176,"177":177,"178":178,"179":179,"180":180,"181":181,"182":182,"183":183,"184":184,"185":185,"186":186,"187":187,"188":188,"189":189,"190":190,"191":191,"192":192,"193":193,"194":194,"195":195,"196":196,"197":197,"198":198,"199":199,"200":200,"201":201,"202":202,"203":203,"204":204,"205":205,"206":206,"207":207,"208":208,"209":209,"210":210,"211":211,"212":212,"213":213,"214":214,"215":215,"216":216,"217":217,"218":218,"219":219,"220":220,"221":221,"222":222,"223":223,"224":224,"225":225,"226":226,"227":227,"228":228,"229":229,"230":230,"231":231,"232":232,"233":233,"234":234,"235":235,"236":236,"237":237,"238":238,"239":239,"240":240,"241":241,"242":242,"243":243,"244":244,"245":245,"246":246,"247":247,"248":248,"249":249,"250":250,"251":251,"252":252,"253":253,"254":254,"255":255,"256":256,"257":257,"258":258,"259":259,"260":260,"261":261,"262":262,"263":263,"264":264,"265":265,"266":266,"267":267,"268":268,"269":269,"270":270,"271":271,"272":272,"273":273,"274":274,"275":275,"276":276,"277":277,"278":278,"279":279,"280":280,"281":281,"282":282,"283":283,"284":284,"285":285,"286":286,"287":287,"288":288,"289":289,"290":290,"291":291,"292":292,"293":293,"294":294,"295":295,"296":296,"297":297,"298":298,"299":299,"300":300,"301":301,"302":302,"303":303,"304":304,"305":305,"306":306,"307":307,"308":308,"309":309,"310":310,"311":311,"312":312,"313":313,"314":314,"315":315,"316":316,"317":317,"318":318,"319":319,"320":320,"321":321,"322":322,"323":323,"324":324,"325":325,"326":326,"327":327,"328":328,"329":329,"330":330,"331":331,"332":332,"333":333,"334":334,"335":335,"336":336,"337":337,"338":338,"339":339,"340":340,"341":341,"342":342,"343":343,"344":344,"345":345,"346":346,"347":347,"348":348,"349":349,"350":350,"351":351,"352":352,"353":353,"354":354,"355":355,"356":356,"357":357,"358":358,"359":359,"360":360,"361":361,"362":362,"363":363,"364":364,"365":365,"366":366,"367":367,"368":368,"369":369,"370":370,"371":371,"372":372,"373":373,"374":374,"375":375,"376":376,"377":377,"378":378,"379":379,"380":380,"381":381,"382":382,"383":383,"384":384,"385":385,"386":386,"387":387,"388":388,"389":389,"390":390,"391":391,"392":392,"393":393,"394":394,"395":395,"396":396,"397":397,"398":398,"399":399,"400":400,"401":401,"402":402,"403":403,"404":404,"405":405,"406":406,"407":407,"408":408,"409":409,"410":410,"411":411,"412":412,"413":413,"414":414,"415":415,"416":416,"417":417,"418":418,"419":419,"420":420,"421":421,"422":422,"423":423,"424":424,"425":425,"426":426,"427":427,"428":428,"429":429,"430":430,"431":431,"432":432,"433":433,"434":434,"435":435,"436":436,"437":437,"438":438,"439":439,"440":440,"441":441,"442":442,"443":443,"444":444,"445":445,"446":446,"447":447,"448":448,"449":449,"450":450,"451":451,"452":452,"453":453,"454":454,"455":455,"456":456,"457":457,"458":458,"459":459,"460":460,"461":461,"462":462,"463":463,"464":464,"465":465,"466":466,"467":467,"468":468,"469":469,"470":470,"471":471,"472":472,"473":473,"474":474,"475":475,"476":476,"477":477,"478":478,"479":479,"480":480,"481":481,"482":482,"483":483,"484":484,"485":485,"486":486,"487":487,"488":488,"489":489,"490":490,"491":491,"492":492,"493":493,"494":494,"495":495,"496":496,"497":497,"498":498,"499":499,"500":500,"501":501,"502":502,"503":503,"504":504,"505":505,"506":506,"507":507,"508":508,"509":509,"510":510,"511":511,"512":512,"513":513,"514":514,"515":515,"516":516,"517":517,"518":518,"519":519,"520":520,"521":521,"522":522,"523":523,"524":524,"525":525,"526":526,"527":527,"528":528,"529":529,"530":530,"531":531,"532":532,"533":533,"534":534,"535":535,"536":536,"537":537,"538":538,"539":539,"540":540,"541":541,"542":542,"543":543,"544":544,"545":545,"546":546,"547":547,"548":548,"549":549,"550":550,"551":551,"552":552,"553":553,"554":554,"555":555,"556":556,"557":557,"558":558,"559":559,"560":560,"561":561,"562":562,"563":563,"564":564,"565":565,"566":566,"567":567,"568":568,"569":569,"570":570,"571":571,"572":572,"573":573,"574":574,"575":575,"576":576,"577":577,"578":578,"579":579,"580":580,"581":581,"582":582,"583":583,"584":584,"585":585,"586":586,"587":587,"588":588,"589":589,"590":590,"591":591,"592":592,"593":593,"594":594,"595":595,"596":596,"597":597,"598":598,"599":599,"600":600,"601":601,"602":602,"603":603,"604":604,"605":605,"606":606,"607":607,"608":608,"609":609,"610":610,"611":611,"612":612,"613":613,"614":614,"615":615,"616":616,"617":617,"618":618,"619":619,"620":620,"621":621,"622":622,"623":623,"624":624,"625":625,"626":626,"627":627,"628":628,"629":629,"630":630,"631":631,"632":632,"633":633,"634":634,"635":635,"636":636,"637":637,"638":638,"639":639,"640":640,"641":641,"642":642,"643":643,"644":644,"645":645,"646":646,"647":647,"648":648,"649":649,"650":650,"651":651,"652":652,"653":653,"654":654,"655":655,"656":656,"657":657,"658":658,"659":659,"660":660,"661":661,"662":662,"663":663,"664":664,"665":665,"666":666,"667":667,"668":668,"669":669,"670":670,"671":671,"672":672,"673":673,"674":674,"675":675,"676":676,"677":677,"678":678,"679":679,"680":680,"681":681,"682":682,"683":683,"684":684,"685":685,"686":686,"687":687,"688":688,"689":689,"690":690,"691":691,"692":692,"693":693,"694":694,"695":695,"696":696,"697":697,"698":698,"699":699,"700":700,"701":701,"702":702,"703":703,"704":704,"705":705,"706":706,"707":707,"708":708,"709":709,"710":710,"711":711,"712":712,"713":713,"714":714,"715":715,"716":716,"717":717,"718":718,"719":719,"720":720,"721":721,"722":722,"723":723,"724":724,"725":725,"726":726,"727":727,"728":728,"729":729,"730":730,"731":731,"732":732,"733":733,"734":734,"735":735,"736":736,"737":737,"738":738,"739":739,"740":740,"741":741,"742":742,"743":743,"744":744,"745":745,"746":746,"747":747,"748":748,"749":749,"750":750,"751":751,"752":752,"753":753,"754":754,"755":755,"756":756,"757":757,"758":758,"759":759,"760":760,"761":761,"762":762,"763":763,"764":764,"765":765,"766":766,"767":767,"768":768,"769":769,"770":770,"771":771,"772":772,"773":773,"774":774,"775":775,"776":776,"777":777,"778":778,"779":779,"780":780,"781":781,"782":782,"783":783,"784":784,"785":785,"786":786,"787":787,"788":788,"789":789,"790":790,"791":791,"792":792,"793":793,"794":794,"795":795,"796":796,"797":797,"798":798,"799":799,"800":800,"801":801,"802":802,"803":803,"804":804,"805":805,"806":806,"807":807,"808":808,"809":809,"810":810,"811":811,"812":812,"813":813,"814":814,"815":815,"816":816,"817":817,"818":818,"819":819,"820":820,"821":821,"822":822,"823":823,"824":824,"825":825,"826":826,"827":827,"828":828,"829":829,"830":830,"831":831,"832":832,"833":833,"834":834,"835":835,"836":836,"837":837,"838":838,"839":839,"840":840,"841":841,"842":842,"843":843,"844":844,"845":845,"846":846,"847":847,"848":848,"849":849,"850":850,"851":851,"852":852,"853":853,"854":854,"855":855,"856":856,"857":857,"858":858,"859":859,"860":860,"861":861,"862":862,"863":863,"864":864,"865":865,"866":866,"867":867,"868":868,"869":869,"870":870,"871":871,"872":872,"873":873,"874":874,"875":875,"876":876,"877":877,"878":878,"879":879,"880":880,"881":881,"882":882,"883":883,"884":884,"885":885,"886":886,"887":887,"888":888,"889":889,"890":890,"891":891,"892":892,"893":893,"894":894,"895":895,"896":896,"897":897,"898":898,"899":899,"900":900,"901":901,"902":902,"903":903,"904":904,"905":905,"906":906,"907":907,"908":908,"909":909,"910":910,"911":911,"912":912,"913":913,"914":914,"915":915,"916":916,"917":917,"918":918,"919":919,"920":920,"921":921,"922":922,"923":923,"924":924,"925":925,"926":926,"927":927,"928":928,"929":929,"930":930,"931":931,"932":932,"933":933,"934":934,"935":935,"936":936,"937":937,"938":938,"939":939,"940":940,"941":941,"942":942,"943":943,"944":944,"945":945,"946":946,"947":947,"948":948,"949":949,"950":950,"951":951,"952":952,"953":953,"954":954,"955":955,"956":956,"957":957,"958":958,"959":959,"960":960,"961":961,"962":962,"963":963,"964":964,"965":965,"966":966,"967":967,"968":968,"969":969,"970":970,"971":971,"972":972,"973":973,"974":974,"975":975,"976":976,"977":977,"978":978,"979":979,"980":980,"981":981,"982":982,"983":983,"984":984,"985":985,"986":986,"987":987,"988":988,"989":989,"990":990,"991":991,"992":992,"993":993,"994":994,"995":995,"996":996,"997":997,"998":998,"999":999,"type":"track","event":"Demo Track","sentAt":"2019-08-12T05:08:30.909Z","channel":"android-sdk","context":{"app":{"name":"RudderAndroidClient","build":"1","version":"1.0","namespace":"com.rudderlabs.android.sdk"},"device":{"id":"49e4bdd1c280bc00","name":"generic_x86","model":"Android SDK built for x86","manufacturer":"Google"},"locale":"en-US","screen":{"width":1080,"height":1794,"density":420},"traits":{"anonymousId":"49e4bdd1c280bc00"},"library":{"name":"com.rudderstack.android.sdk.core"},"network":{"carrier":"Android"},"user_agent":"Dalvik/2.1.0 (Linux; U; Android 9; Android SDK built for x86 Build/PSR1.180720.075)"},"rudderId":"90ca6da0-292e-4e79-9880-f8009e0ae4a3","messageId":"a82717d0-b939-47bd-9592-59bbbea66c1a","properties":{"label":"Demo Label","value":5,"testMap":{"t1":"a","t2":4},"category":"Demo Category","floatVal":4.501,"testArray":[{"id":"elem1","value":"e1"},{"id":"elem2","value":"e2"}]},"anonymousId":"anon_id","integrations":{"All":true},"originalTimestamp":"2019-08-12T05:08:30.909Z"}],"writeKey":"29grCPTQ6XrVOSK76M7vfE5BWjb","requestIP":"127.0.0.1","receivedAt":"2022-06-15T13:51:08.754+05:30"} diff --git a/gateway/handle.go b/gateway/handle.go index 81fc8928c5..9bc30f3cdb 100644 --- a/gateway/handle.go +++ b/gateway/handle.go @@ -73,7 +73,6 @@ type Handle struct { webhook webhook.Webhook whProxy http.Handler suppressUserHandler types.UserSuppression - eventSchemaHandler types.EventSchemasI backgroundCancel context.CancelFunc backgroundWait func() error userWebRequestWorkers []*userWebRequestWorkerT @@ -102,7 +101,6 @@ type Handle struct { maxReqSize misc.ValueLoader[int] enableRateLimit misc.ValueLoader[bool] enableSuppressUserFeature bool - enableEventSchemasFeature bool diagnosisTickerTime time.Duration ReadTimeout time.Duration ReadHeaderTimeout time.Duration diff --git a/gateway/handle_http.go b/gateway/handle_http.go index 467fe8768d..7c7eea0b6c 100644 --- a/gateway/handle_http.go +++ b/gateway/handle_http.go @@ -64,24 +64,6 @@ func (*Handle) robotsHandler(w http.ResponseWriter, _ *http.Request) { _, _ = w.Write([]byte("User-agent: * \nDisallow: / \n")) } -// eventSchemaController middleware checks if the event schemas feature is enabled. If not, it returns a 400 response -func (gw *Handle) eventSchemaController(wrappedFunc func(http.ResponseWriter, *http.Request)) func(http.ResponseWriter, *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - if !gw.conf.enableEventSchemasFeature { - status := http.StatusBadRequest - responseBody := response.MakeResponse("EventSchemas feature is disabled") - gw.logger.Infow("response", - "ip", misc.GetIPFromReq(r), - "path", r.URL.Path, - "status", status, - "body", responseBody) - http.Error(w, responseBody, status) - return - } - wrappedFunc(w, r) - } -} - // webHandler - regular web request handler func (gw *Handle) webHandler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { diff --git a/gateway/handle_lifecycle.go b/gateway/handle_lifecycle.go index 7745b72afb..8a9c123bdd 100644 --- a/gateway/handle_lifecycle.go +++ b/gateway/handle_lifecycle.go @@ -24,7 +24,6 @@ import ( "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-server/app" backendconfig "github.com/rudderlabs/rudder-server/backend-config" - event_schema "github.com/rudderlabs/rudder-server/event-schema" "github.com/rudderlabs/rudder-server/gateway/throttler" "github.com/rudderlabs/rudder-server/gateway/webhook" "github.com/rudderlabs/rudder-server/jobsdb" @@ -89,8 +88,6 @@ func (gw *Handle) Setup( gw.conf.enableRateLimit = config.GetReloadableBoolVar(false, "Gateway.enableRateLimit") // Enable suppress user feature. false by default gw.conf.enableSuppressUserFeature = config.GetBoolVar(true, "Gateway.enableSuppressUserFeature") - // EventSchemas feature. false by default - gw.conf.enableEventSchemasFeature = config.GetBoolVar(false, "EventSchemas.enableEventSchemasFeature") // Time period for diagnosis ticker gw.conf.diagnosisTickerTime = config.GetDurationVar(60, time.Second, "Diagnostics.gatewayTimePeriod", "Diagnostics.gatewayTimePeriodInS") gw.conf.ReadTimeout = config.GetDurationVar(0, time.Second, "ReadTimeout", "ReadTimeOutInSec") @@ -132,9 +129,6 @@ func (gw *Handle) Setup( return fmt.Errorf("could not setup suppress user feature: %w", err) } } - if gw.conf.enableEventSchemasFeature { - gw.eventSchemaHandler = event_schema.GetInstance() - } for _, opt := range opts { opt(gw) @@ -443,18 +437,6 @@ func (gw *Handle) StartWebHandler(ctx context.Context) error { srvMux.Get("/version", withContentType("application/json; charset=utf-8", gw.versionHandler)) srvMux.Get("/robots.txt", gw.robotsHandler) - if gw.conf.enableEventSchemasFeature { - srvMux.Route("/schemas", func(r chi.Router) { - r.Get("/event-models", withContentType("application/json; charset=utf-8", gw.eventSchemaController(gw.eventSchemaHandler.GetEventModels))) - r.Get("/event-versions", withContentType("application/json; charset=utf-8", gw.eventSchemaController(gw.eventSchemaHandler.GetEventVersions))) - r.Get("/event-model/{EventID}/key-counts", withContentType("application/json; charset=utf-8", gw.eventSchemaController(gw.eventSchemaHandler.GetKeyCounts))) - r.Get("/event-model/{EventID}/metadata", withContentType("application/json; charset=utf-8", gw.eventSchemaController(gw.eventSchemaHandler.GetEventModelMetadata))) - r.Get("/event-version/{VersionID}/metadata", withContentType("application/json; charset=utf-8", gw.eventSchemaController(gw.eventSchemaHandler.GetSchemaVersionMetadata))) - r.Get("/event-version/{VersionID}/missing-keys", withContentType("application/json; charset=utf-8", gw.eventSchemaController(gw.eventSchemaHandler.GetSchemaVersionMissingKeys))) - r.Get("/event-models/json-schemas", withContentType("application/json; charset=utf-8", gw.eventSchemaController(gw.eventSchemaHandler.GetJsonSchemas))) - }) - } - c := cors.New(cors.Options{ AllowOriginFunc: func(_ string) bool { return true }, AllowCredentials: true, diff --git a/integration_test/docker_test/docker_test.go b/integration_test/docker_test/docker_test.go index 1660e53be6..cb038b768b 100644 --- a/integration_test/docker_test/docker_test.go +++ b/integration_test/docker_test/docker_test.go @@ -9,7 +9,6 @@ import ( "context" "database/sql" b64 "encoding/base64" - "encoding/json" "fmt" "io" "net/http" @@ -66,12 +65,6 @@ var ( VersionID string ) -type eventSchemasObject struct { - EventID string - EventType string - VersionID string -} - type event struct { anonymousID string userID string @@ -261,105 +254,6 @@ func TestMainFlow(t *testing.T) { t.Log("Processed", msgCount, "messages") }) - t.Run("event-models", func(t *testing.T) { - // GET /schemas/event-models - url := fmt.Sprintf("http://localhost:%s/schemas/event-models", httpPort) - method := "GET" - resBody, _ := getEvent(url, method) - require.Eventually(t, func() bool { - // Similarly, pole until the Event Schema Tables are updated - resBody, _ = getEvent(url, method) - return resBody != "[]" - }, time.Minute, 10*time.Millisecond) - require.NotEqual(t, resBody, "[]") - b := []byte(resBody) - var eventSchemas []eventSchemasObject - - err := json.Unmarshal(b, &eventSchemas) - if err != nil { - t.Log(err) - } - for k := range eventSchemas { - if eventSchemas[k].EventType == "page" { - EventID = eventSchemas[k].EventID - } - } - require.NotEqual(t, EventID, "") - }) - - t.Run("event-versions", func(t *testing.T) { - // GET /schemas/event-versions - url := fmt.Sprintf("http://localhost:%s/schemas/event-versions?EventID=%s", httpPort, EventID) - method := "GET" - resBody, err := getEvent(url, method) - require.NoError(t, err) - require.Contains(t, resBody, EventID) - - b := []byte(resBody) - var eventSchemas []eventSchemasObject - - err = json.Unmarshal(b, &eventSchemas) - require.NoError(t, err) - if err != nil { - t.Log(err) - } - VersionID = eventSchemas[0].VersionID - t.Log("Test Schemas Event ID's VersionID:", VersionID) - }) - - t.Run("event-model-key-counts", func(t *testing.T) { - // GET schemas/event-model/{EventID}/key-counts - url := fmt.Sprintf("http://localhost:%s/schemas/event-model/%s/key-counts", httpPort, EventID) - method := "GET" - resBody, err := getEvent(url, method) - require.NoError(t, err) - require.Contains(t, resBody, "messageId") - }) - - t.Run("event-model-metadata", func(t *testing.T) { - // GET /schemas/event-model/{EventID}/metadata - url := fmt.Sprintf("http://localhost:%s/schemas/event-model/%s/metadata", httpPort, EventID) - method := "GET" - resBody, err := getEvent(url, method) - require.NoError(t, err) - require.Contains(t, resBody, "messageId") - }) - - t.Run("event-version-metadata", func(t *testing.T) { - // GET /schemas/event-version/{VersionID}/metadata - url := fmt.Sprintf("http://localhost:%s/schemas/event-version/%s/metadata", httpPort, VersionID) - method := "GET" - resBody, err := getEvent(url, method) - require.NoError(t, err) - require.Contains(t, resBody, "messageId") - }) - - t.Run("event-version-missing-keys", func(t *testing.T) { - // GET /schemas/event-version/{VersionID}/metadata - url := fmt.Sprintf("http://localhost:%s/schemas/event-version/%s/missing-keys", httpPort, VersionID) - method := "GET" - resBody, err := getEvent(url, method) - require.NoError(t, err) - require.Contains(t, resBody, "originalTimestamp") - require.Contains(t, resBody, "sentAt") - require.Contains(t, resBody, "channel") - require.Contains(t, resBody, "integrations.All") - }) - - t.Run("event-models-json-schemas", func(t *testing.T) { - // GET /schemas/event-models/json-schemas - url := fmt.Sprintf("http://localhost:%s/schemas/event-models/json-schemas", httpPort) - method := "GET" - resBody, err := getEvent(url, method) - require.NoError(t, err) - require.Eventually(t, func() bool { - // Similarly, pole until the Event Schema Tables are updated - resBody, _ = getEvent(url, method) - return resBody != "[]" - }, time.Minute, 10*time.Millisecond) - require.NotEqual(t, resBody, "[]") - }) - t.Run("beacon-batch", func(t *testing.T) { payload := strings.NewReader(`{ "batch":[ diff --git a/processor/processor.go b/processor/processor.go index ae6d8367b0..33ce7a8ea9 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -26,7 +26,6 @@ import ( "github.com/rudderlabs/rudder-go-kit/stats/metric" kitsync "github.com/rudderlabs/rudder-go-kit/sync" backendconfig "github.com/rudderlabs/rudder-server/backend-config" - eventschema "github.com/rudderlabs/rudder-server/event-schema" "github.com/rudderlabs/rudder-server/internal/enricher" "github.com/rudderlabs/rudder-server/jobsdb" "github.com/rudderlabs/rudder-server/processor/delayed" @@ -85,7 +84,6 @@ type Handle struct { eventSchemaDB jobsdb.JobsDB archivalDB jobsdb.JobsDB logger logger.Logger - eventSchemaHandler types.EventSchemasI enrichers []enricher.PipelineEnricher dedup dedup.Dedup reporting types.Reporting @@ -133,8 +131,6 @@ type Handle struct { destGenericConsentManagementMap map[string]map[string]GenericConsentManagementProviderData batchDestinations []string configSubscriberLock sync.RWMutex - enableEventSchemasFeature bool - enableEventSchemasAPIOnly misc.ValueLoader[bool] enableDedup bool enableEventCount misc.ValueLoader[bool] transformTimesPQLength int @@ -166,7 +162,6 @@ type processorStats struct { statDBR func(partition string) stats.Measurement statDBW func(partition string) stats.Measurement statLoopTime func(partition string) stats.Measurement - eventSchemasTime func(partition string) stats.Measurement validateEventsTime func(partition string) stats.Measurement processJobsTime func(partition string) stats.Measurement statSessionTransform func(partition string) stats.Measurement @@ -444,11 +439,6 @@ func (proc *Handle) Setup( "partition": partition, }) } - proc.stats.eventSchemasTime = func(partition string) stats.Measurement { - return proc.statsFactory.NewTaggedStat("processor_event_schemas_time", stats.TimerType, stats.Tags{ - "partition": partition, - }) - } proc.stats.validateEventsTime = func(partition string) stats.Measurement { return proc.statsFactory.NewTaggedStat("processor_validate_events_time", stats.TimerType, stats.Tags{ "partition": partition, @@ -590,9 +580,6 @@ func (proc *Handle) Setup( "partition": partition, }) } - if proc.config.enableEventSchemasFeature { - proc.eventSchemaHandler = eventschema.GetInstance() - } if proc.config.enableDedup { proc.dedup = dedup.New(dedup.DefaultPath()) } @@ -766,8 +753,6 @@ func (proc *Handle) loadConfig() { proc.config.subJobSize = config.GetIntVar(defaultSubJobSize, 1, "Processor.subJobSize") // Enable dedup of incoming events by default proc.config.enableDedup = config.GetBoolVar(false, "Dedup.enableDedup") - // EventSchemas feature. false by default - proc.config.enableEventSchemasFeature = config.GetBoolVar(false, "EventSchemas.enableEventSchemasFeature") proc.config.eventSchemaV2Enabled = config.GetBoolVar(false, "EventSchemas2.enabled") proc.config.batchDestinations = misc.BatchDestinations() proc.config.transformTimesPQLength = config.GetIntVar(5, 1, "Processor.transformTimesPQLength") @@ -788,7 +773,6 @@ func (proc *Handle) loadReloadableConfig(defaultPayloadLimit int64, defaultMaxEv proc.config.transformBatchSize = config.GetReloadableIntVar(100, 1, "Processor.transformBatchSize") proc.config.userTransformBatchSize = config.GetReloadableIntVar(200, 1, "Processor.userTransformBatchSize") proc.config.enableEventCount = config.GetReloadableBoolVar(true, "Processor.enableEventCount") - proc.config.enableEventSchemasAPIOnly = config.GetReloadableBoolVar(false, "EventSchemas.enableEventSchemasAPIOnly") proc.config.maxEventsToProcess = config.GetReloadableIntVar(defaultMaxEventsToProcess, 1, "Processor.maxLoopProcessEvents") proc.config.archivalEnabled = config.GetReloadableBoolVar(true, "archival.Enabled") // Capture event name as a tag in event level stats @@ -2860,16 +2844,6 @@ func (proc *Handle) getJobs(partition string) jobsdb.JobsResult { return unprocessedList } - eventSchemasStart := time.Now() - if proc.config.enableEventSchemasFeature && !proc.config.enableEventSchemasAPIOnly.Load() { - for _, unprocessedJob := range unprocessedList.Jobs { - writeKey := gjson.GetBytes(unprocessedJob.EventPayload, "writeKey").Str - proc.eventSchemaHandler.RecordEventSchema(writeKey, string(unprocessedJob.EventPayload)) - } - } - eventSchemasTime := time.Since(eventSchemasStart) - defer proc.stats.eventSchemasTime(partition).SendTiming(eventSchemasTime) - proc.logger.Debugf("Processor DB Read Complete. unprocessedList: %v total_events: %d", len(unprocessedList.Jobs), unprocessedList.EventsCount) proc.stats.statGatewayDBR(partition).Count(len(unprocessedList.Jobs)) diff --git a/processor/processor_test.go b/processor/processor_test.go index 6b6a027126..39dda7970c 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -143,13 +143,6 @@ var ( emptyJobsList []*jobsdb.JobT ) -// setEnableEventSchemasFeature overrides enableEventSchemasFeature configuration and returns previous value -func setEnableEventSchemasFeature(proc *Handle, b bool) bool { - prev := proc.config.enableEventSchemasFeature - proc.config.enableEventSchemasFeature = b - return prev -} - // SetDisableDedupFeature overrides SetDisableDedupFeature configuration and returns previous value func setDisableDedupFeature(proc *Handle, b bool) bool { prev := proc.config.enableDedup @@ -1404,7 +1397,6 @@ var _ = Describe("Processor", Ordered, func() { prepareHandle := func(proc *Handle) *Handle { proc.config.transformerURL = transformerServer.URL - setEnableEventSchemasFeature(proc, false) isolationStrategy, err := isolation.GetStrategy(isolation.ModeNone) Expect(err).To(BeNil()) proc.isolationStrategy = isolationStrategy diff --git a/runner/runner.go b/runner/runner.go index 31c2d2c36a..bf7594af9c 100644 --- a/runner/runner.go +++ b/runner/runner.go @@ -27,7 +27,6 @@ import ( "github.com/rudderlabs/rudder-server/app" "github.com/rudderlabs/rudder-server/app/apphandlers" backendconfig "github.com/rudderlabs/rudder-server/backend-config" - eventschema "github.com/rudderlabs/rudder-server/event-schema" "github.com/rudderlabs/rudder-server/info" "github.com/rudderlabs/rudder-server/processor/transformer" "github.com/rudderlabs/rudder-server/router/customdestinationmanager" @@ -337,8 +336,6 @@ func runAllInit() { backendconfig.Init() warehouseutils.Init() validations.Init() - eventschema.Init() - eventschema.Init2() kafka.Init() customdestinationmanager.Init() alert.Init() diff --git a/sql/migrations/node/000013_event_schemas_drop.up.sql b/sql/migrations/node/000013_event_schemas_drop.up.sql new file mode 100644 index 0000000000..042828b693 --- /dev/null +++ b/sql/migrations/node/000013_event_schemas_drop.up.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS schema_versions, event_models CASCADE; \ No newline at end of file diff --git a/utils/types/types.go b/utils/types/types.go index 82ccaee103..12f1006205 100644 --- a/utils/types/types.go +++ b/utils/types/types.go @@ -3,7 +3,6 @@ package types import ( - "net/http" "time" "github.com/rudderlabs/rudder-server/enterprise/suppress-user/model" @@ -42,18 +41,6 @@ type UserSuppression interface { GetSuppressedUser(workspaceID, userID, sourceID string) *model.Metadata } -// EventSchemasI is interface to access EventSchemas feature -type EventSchemasI interface { - RecordEventSchema(writeKey, eventBatch string) bool - GetEventModels(w http.ResponseWriter, r *http.Request) - GetEventVersions(w http.ResponseWriter, r *http.Request) - GetSchemaVersionMetadata(w http.ResponseWriter, r *http.Request) - GetSchemaVersionMissingKeys(w http.ResponseWriter, r *http.Request) - GetKeyCounts(w http.ResponseWriter, r *http.Request) - GetEventModelMetadata(w http.ResponseWriter, r *http.Request) - GetJsonSchemas(w http.ResponseWriter, r *http.Request) -} - // ConfigEnvI is interface to inject env variables into config type ConfigEnvI interface { ReplaceConfigWithEnvVariables(workspaceConfig []byte) (updatedConfig []byte)