chore: generateTransformationMessage is its own concurrent step #5449
727f6dc to eb3998c
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #5449      +/-   ##
==========================================
+ Coverage   74.99%   75.01%   +0.02%
==========================================
  Files         460      460
  Lines       63532    63548      +16
==========================================
+ Hits        47644    47673      +29
+ Misses      13227    13218       -9
+ Partials     2661     2657       -4
Changes look good to me.
processor/processor.go
Outdated
	hasMore:       false,
	rsourcesStats: rsourcesStats,
})
transMessage, err = proc.generateTransformationMessage(proc.processJobsForDest(
Not related to this PR, but is there a possibility of duplicate persistence of jobs in the events schema and archival?
It's possible yeah
processor/processor.go
Outdated
@@ -1734,7 +1734,7 @@ func (proc *Handle) processJobsForDest(partition string, subJobs subJob) (*trans
 	for _, batchEvent := range jobList {
 		var eventParams types.EventParams
 		if err := jsonfast.Unmarshal(batchEvent.Parameters, &eventParams); err != nil {
-			return nil, err
+			panic(err)
Let's return the error from here; we can then handle errors in one place only, in worker.go.
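To illustrate the suggestion, here is a minimal sketch of propagating an unmarshal error upward so that a single caller decides how to react. It is not the project's code: `eventParams`, `parseParams`, `processBatch`, and `runWorker` are hypothetical names, and the standard `encoding/json` package stands in for `jsonfast`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// eventParams is a hypothetical stand-in for types.EventParams.
type eventParams struct {
	SourceID string `json:"source_id"`
}

// parseParams returns the unmarshal error to its caller instead of panicking.
func parseParams(raw []byte) (eventParams, error) {
	var p eventParams
	if err := json.Unmarshal(raw, &p); err != nil {
		return eventParams{}, fmt.Errorf("unmarshalling event parameters: %w", err)
	}
	return p, nil
}

// processBatch stops at the first bad event and passes the error up unchanged.
func processBatch(batch [][]byte) ([]eventParams, error) {
	out := make([]eventParams, 0, len(batch))
	for _, raw := range batch {
		p, err := parseParams(raw)
		if err != nil {
			return nil, err
		}
		out = append(out, p)
	}
	return out, nil
}

// runWorker is the single place (hypothetical) that decides what an error
// means: log it, retry, or abort. Here it only reports the outcome.
func runWorker(batch [][]byte) string {
	params, err := processBatch(batch)
	if err != nil {
		return "worker failed: " + err.Error()
	}
	return fmt.Sprintf("worker processed %d events", len(params))
}

func main() {
	fmt.Println(runWorker([][]byte{[]byte(`{"source_id":"a"}`)}))
	fmt.Println(runWorker([][]byte{[]byte(`{bad json`)}))
}
```

With this shape, the lower-level functions stay free of policy, and changing how failures are treated only touches the worker.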
Should we keep this change behind a flag?
It might lead to some complexity, in that it would involve extra handling to choose which channel to put things in based on the flag, and eventually some more at shutdown too. IMO we can go ahead without a flag, because the only concern could be memory usage, but that's blocked at the
LGTM
🤖 I have created a release *beep* *boop*
---
## [1.43.0](v1.42.0...v1.43.0) (2025-02-18)

### Features

* add a new implementation of schema with ttl ([#5462](#5462)) ([c708863](c708863))
* implement snowpipe destination config validation ([#5472](#5472)) ([a3810fc](a3810fc))
* introduce router transformer metrics for cost attribution ([#5511](#5511)) ([a8b3a00](a8b3a00))
* introduce transformer metrics for cost attribution ([#5505](#5505)) ([90d1f24](90d1f24))
* sync latency metrics api ([#5500](#5500)) ([b35a657](b35a657))
* updated validation check for microsoftClickId in bingads_offline_conversions async destination ([#5514](#5514)) ([6468c42](6468c42))

### Bug Fixes

* async framework destination live events ([#5480](#5480)) ([4938a25](4938a25))
* databricks columns to add becomes empty after filtering ([#5494](#5494)) ([4854220](4854220))
* deltalake syncs failing for columns with unhandled data type ([#5467](#5467)) ([28d6072](28d6072))
* security vulnerabilities ([#5481](#5481)) ([d89b8cc](d89b8cc))
* snowpipe streaming error enrichment for failed events ([#5479](#5479)) ([737a42a](737a42a))
* warehouse transformations SourceDefinitionType as empty ([#5510](#5510)) ([15faf6b](15faf6b))

### Miscellaneous

* **deps:** bump google.golang.org/protobuf from 1.36.4 to 1.36.5 in the go-deps group ([#5489](#5489)) ([e0b64f8](e0b64f8))
* **deps:** bump the go-deps group across 1 directory with 14 updates ([#5478](#5478)) ([1ab22b0](1ab22b0))
* fix gateway metrics ([#5483](#5483)) ([4938a25](4938a25))
* generateTransformationMessage is its own concurrent step ([#5449](#5449)) ([734e759](734e759))
* integrate build-scan-push-action to scan docker images ([#5464](#5464)) ([e9579b8](e9579b8))
* make cslb configurable ([#5451](#5451)) ([4938a25](4938a25))
* migration to drop unused wh_schema_versions table ([#5469](#5469)) ([28d6072](28d6072))
* sanitise sample event payload and sample response before inserting in reports table ([#5493](#5493)) ([9ec7552](9ec7552))
* sync release v1.42.1 to main branch ([#5477](#5477)) ([ad74333](ad74333))
* track event name post user transformation for reporting ([#5501](#5501)) ([7fcdc47](7fcdc47))

---
This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
Description
generateTransformationMessage is its own concurrent step.
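As a rough illustration of what "its own concurrent step" can look like in Go, the sketch below runs message generation in a dedicated goroutine connected to its neighbours by bounded channels. This is an assumption-laden toy, not the processor's actual pipeline: `preTransformed`, `transformationMessage`, `messageStage`, `transformStage`, and `runPipeline` are all hypothetical names, and the buffer size of 1 is arbitrary.

```go
package main

import (
	"fmt"
	"strings"
)

// Hypothetical stand-ins for the data passed between pipeline steps.
type preTransformed struct{ events []string }
type transformationMessage struct{ payload string }

// messageStage runs message generation in its own goroutine. The bounded
// output channel means the stage blocks when the next step falls behind.
func messageStage(in <-chan preTransformed, buffer int) <-chan transformationMessage {
	out := make(chan transformationMessage, buffer)
	go func() {
		defer close(out) // closing propagates shutdown to the next step
		for batch := range in {
			out <- transformationMessage{payload: strings.Join(batch.events, ",")}
		}
	}()
	return out
}

// transformStage is the following step; it can work on one message while
// messageStage is already preparing the next.
func transformStage(in <-chan transformationMessage, buffer int) <-chan string {
	out := make(chan string, buffer)
	go func() {
		defer close(out)
		for msg := range in {
			out <- strings.ToUpper(msg.payload)
		}
	}()
	return out
}

// runPipeline feeds the batches through both stages and collects the results.
func runPipeline(batches []preTransformed) []string {
	src := make(chan preTransformed, 1)
	go func() {
		defer close(src)
		for _, b := range batches {
			src <- b
		}
	}()

	var results []string
	for r := range transformStage(messageStage(src, 1), 1) {
		results = append(results, r)
	}
	return results
}

func main() {
	batches := []preTransformed{
		{events: []string{"a", "b"}},
		{events: []string{"c"}},
	}
	for _, r := range runPipeline(batches) {
		fmt.Println(r)
	}
}
```

Because each stage is a single goroutine reading from one channel, batch order is preserved, and closing the source channel drains and shuts down the stages in sequence.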
Includes:
Linear Ticket
Resolves PIPE-1719
Security