Skip to content

Commit

Permalink
[GEN-2302]: watch for modified events, do not toast them (#2370)
Browse files Browse the repository at this point in the history
This pull request introduces several changes to the Kubernetes
instrumentation configuration watcher and various frontend components to
handle modified events and improve notification handling. The most
important changes include adding support for modified events in the
watcher, updating notification messages, and enhancing the CRUD
operations for destinations and sources.

### Kubernetes Instrumentation Configuration Watcher:

* Added a new event batcher for handling modified instrumentation
configurations
(`frontend/kube/watchers/instrumentation_config_watcher.go`).
[[1]](diffhunk://#diff-4601618d422a275e070235864f974e3bbe9420efc0e1d06456ace8dc6f1bbf2aR18)
[[2]](diffhunk://#diff-4601618d422a275e070235864f974e3bbe9420efc0e1d06456ace8dc6f1bbf2aR37-R51)
* Implemented a function to handle modified instrumentation
configurations and integrated it into the event handling logic
(`frontend/kube/watchers/instrumentation_config_watcher.go`).
[[1]](diffhunk://#diff-4601618d422a275e070235864f974e3bbe9420efc0e1d06456ace8dc6f1bbf2aR93-R94)
[[2]](diffhunk://#diff-4601618d422a275e070235864f974e3bbe9420efc0e1d06456ace8dc6f1bbf2aR115-R127)

### Notification Messages:

* Updated notification messages for source and destination updates to
include the name of the updated entity
(`frontend/webapp/cypress/constants/index.ts`).
* Adjusted test cases to reflect the updated notification messages
(`frontend/webapp/cypress/e2e/03-sources.cy.ts`,
`frontend/webapp/cypress/e2e/04-destinations.cy.ts`).
[[1]](diffhunk://#diff-981488421b684f7ca98e7e8b147b14c23fe2c57e876d88ca981c563c8ba003cdL53-R53)
[[2]](diffhunk://#diff-6f34bd7eb1cce2b4683938973757c4b0e80921234633a259cd17674c0805ca1bL48-R48)

### CRUD Operations Enhancements:

* Added `removePendingItems` to the `useDestinationCRUD` hook and
updated the `onCompleted` handler to refetch data and notify the user
upon successful updates
(`frontend/webapp/hooks/destinations/useDestinationCRUD.ts`).
[[1]](diffhunk://#diff-c7f19ca063b62568e37726473e1b8c265d3309def0d157772951023c66e055b8L19-R19)
[[2]](diffhunk://#diff-c7f19ca063b62568e37726473e1b8c265d3309def0d157772951023c66e055b8L86-R99)
[[3]](diffhunk://#diff-c7f19ca063b62568e37726473e1b8c265d3309def0d157772951023c66e055b8L104-R141)
* Updated the `useSourceCRUD` hook to include the name of the updated
source in the success notification
(`frontend/webapp/hooks/sources/useSourceCRUD.ts`).

### SSE Handling:

* Modified the SSE handling logic to exclude "modified" and "connected"
events from toast notifications and updated constants for SSE event
types (`frontend/webapp/hooks/notification/useSSE.ts`,
`frontend/webapp/utils/constants/string.tsx`).
[[1]](diffhunk://#diff-db9ebe0ce8cdabc0ede2f45de12661d55fb7aa50a5ecfc0787f5898c55b044d6L5-R5)
[[2]](diffhunk://#diff-db9ebe0ce8cdabc0ede2f45de12661d55fb7aa50a5ecfc0787f5898c55b044d6L24-R46)
[[3]](diffhunk://#diff-6b7ecf63c9f57c564b2f2f5d7998fd2cc45aee9e4dfc71aeeeb8361a39ba66cbL94-R105)
  • Loading branch information
BenElferink authored Feb 3, 2025
1 parent f64539d commit 1f010bb
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 30 deletions.
32 changes: 32 additions & 0 deletions frontend/kube/watchers/instrumentation_config_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
)

var instrumentationConfigAddedEventBatcher *EventBatcher
var instrumentationConfigModifiedEventBatcher *EventBatcher
var instrumentationConfigDeletedEventBatcher *EventBatcher

func StartInstrumentationConfigWatcher(ctx context.Context, namespace string) error {
Expand All @@ -33,6 +34,21 @@ func StartInstrumentationConfigWatcher(ctx context.Context, namespace string) er
},
)

instrumentationConfigModifiedEventBatcher = NewEventBatcher(
EventBatcherConfig{
MinBatchSize: 1,
Duration: 5 * time.Second,
Event: sse.MessageEventModified,
CRDType: consts.InstrumentationConfig,
SuccessBatchMessageFunc: func(count int, crdType string) string {
return fmt.Sprintf("Successfully updated %d sources", count)
},
FailureBatchMessageFunc: func(count int, crdType string) string {
return fmt.Sprintf("Failed to update %d sources", count)
},
},
)

instrumentationConfigDeletedEventBatcher = NewEventBatcher(
EventBatcherConfig{
MinBatchSize: 1,
Expand Down Expand Up @@ -60,6 +76,7 @@ func StartInstrumentationConfigWatcher(ctx context.Context, namespace string) er
func handleInstrumentationConfigWatchEvents(ctx context.Context, watcher watch.Interface) {
ch := watcher.ResultChan()
defer instrumentationConfigAddedEventBatcher.Cancel()
defer instrumentationConfigModifiedEventBatcher.Cancel()
defer instrumentationConfigDeletedEventBatcher.Cancel()
for {
select {
Expand All @@ -73,6 +90,8 @@ func handleInstrumentationConfigWatchEvents(ctx context.Context, watcher watch.I
switch event.Type {
case watch.Added:
handleAddedInstrumentationConfig(event.Object.(*v1alpha1.InstrumentationConfig))
case watch.Modified:
handleModifiedInstrumentationConfig(event.Object.(*v1alpha1.InstrumentationConfig))
case watch.Deleted:
handleDeletedInstrumentationConfig(event.Object.(*v1alpha1.InstrumentationConfig))
}
Expand All @@ -93,6 +112,19 @@ func handleAddedInstrumentationConfig(instruConfig *v1alpha1.InstrumentationConf
instrumentationConfigAddedEventBatcher.AddEvent(sse.MessageTypeSuccess, data, target)
}

func handleModifiedInstrumentationConfig(instruConfig *v1alpha1.InstrumentationConfig) {
namespace := instruConfig.Namespace
name, kind, err := commonutils.ExtractWorkloadInfoFromRuntimeObjectName(instruConfig.Name)
if err != nil {
genericErrorMessage(sse.MessageEventModified, consts.InstrumentationConfig, err.Error())
return
}

target := fmt.Sprintf("namespace=%s&name=%s&kind=%s", namespace, name, kind)
data := fmt.Sprintf(`Source "%s" updated`, name)
instrumentationConfigModifiedEventBatcher.AddEvent(sse.MessageTypeSuccess, data, target)
}

func handleDeletedInstrumentationConfig(instruConfig *v1alpha1.InstrumentationConfig) {
namespace := instruConfig.Namespace
name, kind, err := commonutils.ExtractWorkloadInfoFromRuntimeObjectName(instruConfig.Name)
Expand Down
5 changes: 2 additions & 3 deletions frontend/webapp/cypress/constants/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,11 @@ export const TEXTS = {
INSTRUMENTATION_RULE_WARN_MODAL_TITLE: `Delete rule (${CYPRESS_TEST})`,

NOTIF_SOURCES_CREATED: (amount: number) => `Successfully created ${amount} sources`,
NOTIF_SOURCES_UPDATED: (amount: number) => `Successfully updated ${amount} source`,
NOTIF_SOURCES_UPDATED: (name: string) => `Successfully updated "${name}" source`,
NOTIF_SOURCES_DELETED: (amount: number) => `Successfully deleted ${amount} sources`,

NOTIF_DESTINATIONS_CREATED: (amount: number) => `Successfully created ${amount} destinations`,
// TODO: this message isn't right, fix in backend
NOTIF_DESTINATIONS_UPDATED: (amount: number) => `Successfully transformed ${amount + 1} destinations to otelcol configuration`,
NOTIF_DESTINATIONS_UPDATED: (name: string) => `Successfully updated "${name}" destination`,
NOTIF_DESTINATIONS_DELETED: (amount: number) => `Successfully deleted ${amount} destinations`,

NOTIF_ACTION_CREATED: (crdId: string) => `Action "${crdId}" created`,
Expand Down
2 changes: 1 addition & 1 deletion frontend/webapp/cypress/e2e/03-sources.cy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ describe('Sources CRUD', () => {
getCrdIds({ namespace, crdName, expectedError: '', expectedLength: 5 }, (crdIds) => {
const crdId = CRD_IDS.SOURCE;
expect(crdIds).includes(crdId);
awaitToast({ withSSE: false, message: TEXTS.NOTIF_SOURCES_UPDATED(1) }, () => {
awaitToast({ withSSE: false, message: TEXTS.NOTIF_SOURCES_UPDATED(SELECTED_ENTITIES.SOURCE) }, () => {
getCrdById({ namespace, crdName, crdId, expectedError: '', expectedKey: 'serviceName', expectedValue: TEXTS.UPDATED_NAME });
});
});
Expand Down
3 changes: 1 addition & 2 deletions frontend/webapp/cypress/e2e/04-destinations.cy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ describe('Destinations CRUD', () => {
cy.wait('@gql').then(() => {
getCrdIds({ namespace, crdName, expectedError: '', expectedLength: 1 }, (crdIds) => {
const crdId = crdIds[0];

awaitToast({ withSSE: true, message: TEXTS.NOTIF_DESTINATIONS_UPDATED(1) }, () => {
awaitToast({ withSSE: false, message: TEXTS.NOTIF_DESTINATIONS_UPDATED(SELECTED_ENTITIES.DESTINATION_TYPE) }, () => {
getCrdById({ namespace, crdName, crdId, expectedError: '', expectedKey: 'destinationName', expectedValue: TEXTS.UPDATED_NAME });
});
});
Expand Down
29 changes: 21 additions & 8 deletions frontend/webapp/hooks/destinations/useDestinationCRUD.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ interface Params {
export const useDestinationCRUD = (params?: Params) => {
const filters = useFilterStore();
const { data: config } = useConfig();
const { addPendingItems } = usePendingStore();
const { addPendingItems, removePendingItems } = usePendingStore();
const { addNotification, removeNotifications } = useNotificationStore();

const notifyUser = (type: NOTIFICATION_TYPE, title: string, message: string, id?: string, hideFromHistory?: boolean) => {
Expand Down Expand Up @@ -83,7 +83,20 @@ export const useDestinationCRUD = (params?: Params) => {

const [updateDestination, uState] = useMutation<{ updateDestination: { id: string } }>(UPDATE_DESTINATION, {
onError: (error) => handleError(ACTION.UPDATE, error.message),
onCompleted: () => handleComplete(ACTION.UPDATE),
onCompleted: (res, req) => {
handleComplete(ACTION.UPDATE);

// This is instead of toasting a k8s modified-event watcher...
// If we do toast with a watcher, we can't guarantee an SSE will be sent for this update alone. It will definitely include SSE for all updates, even those unexpected.
// Not that there's anything about a watcher that would break the UI, it's just that we would receive unexpected events with ridiculous amounts.
setTimeout(() => {
const { id, destination } = req?.variables || {};

refetch();
notifyUser(NOTIFICATION_TYPE.SUCCESS, ACTION.UPDATE, `Successfully updated "${destination.type}" destination`, id);
removePendingItems([{ entityType: ENTITY_TYPES.DESTINATION, entityId: id }]);
}, 2000);
},
});

const [deleteDestination, dState] = useMutation<{ deleteDestination: boolean }>(DELETE_DESTINATION, {
Expand All @@ -101,31 +114,31 @@ export const useDestinationCRUD = (params?: Params) => {
filteredDestinations: filtered,
refetchDestinations: refetch,

createDestination: (destination: DestinationInput) => {
createDestination: async (destination: DestinationInput) => {
if (config?.readonly) {
notifyUser(NOTIFICATION_TYPE.WARNING, DISPLAY_TITLES.READONLY, FORM_ALERTS.READONLY_WARNING, undefined, true);
} else {
notifyUser(NOTIFICATION_TYPE.INFO, 'Pending', 'Creating destination...', undefined, true);
addPendingItems([{ entityType: ENTITY_TYPES.DESTINATION, entityId: undefined }]);
createDestination({ variables: { destination: { ...destination, fields: destination.fields.filter(({ value }) => value !== undefined) } } });
await createDestination({ variables: { destination: { ...destination, fields: destination.fields.filter(({ value }) => value !== undefined) } } });
}
},
updateDestination: (id: string, destination: DestinationInput) => {
updateDestination: async (id: string, destination: DestinationInput) => {
if (config?.readonly) {
notifyUser(NOTIFICATION_TYPE.WARNING, DISPLAY_TITLES.READONLY, FORM_ALERTS.READONLY_WARNING, undefined, true);
} else {
notifyUser(NOTIFICATION_TYPE.INFO, 'Pending', 'Updating destination...', undefined, true);
addPendingItems([{ entityType: ENTITY_TYPES.DESTINATION, entityId: id }]);
updateDestination({ variables: { id, destination: { ...destination, fields: destination.fields.filter(({ value }) => value !== undefined) } } });
await updateDestination({ variables: { id, destination: { ...destination, fields: destination.fields.filter(({ value }) => value !== undefined) } } });
}
},
deleteDestination: (id: string) => {
deleteDestination: async (id: string) => {
if (config?.readonly) {
notifyUser(NOTIFICATION_TYPE.WARNING, DISPLAY_TITLES.READONLY, FORM_ALERTS.READONLY_WARNING, undefined, true);
} else {
notifyUser(NOTIFICATION_TYPE.INFO, 'Pending', 'Deleting destination...', undefined, true);
addPendingItems([{ entityType: ENTITY_TYPES.DESTINATION, entityId: id }]);
deleteDestination({ variables: { id } });
await deleteDestination({ variables: { id } });
}
},
};
Expand Down
19 changes: 10 additions & 9 deletions frontend/webapp/hooks/notification/useSSE.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { useEffect, useRef } from 'react';
import { NOTIFICATION_TYPE } from '@odigos/ui-utils';
import { useDestinationCRUD } from '../destinations';
import { usePaginatedSources } from '../compute-platform';
import { API, DISPLAY_TITLES, NOTIF_CRD_TYPES } from '@/utils';
import { API, DISPLAY_TITLES, SSE_CRD_TYPES, SSE_EVENT_TYPES } from '@/utils';
import { type NotifyPayload, useNotificationStore, usePendingStore, useStatusStore } from '@/store';

export const useSSE = () => {
Expand All @@ -21,28 +21,29 @@ export const useSSE = () => {

es.onmessage = (event) => {
const data = JSON.parse(event.data);
const crdType = data.crdType || '';
const notification: NotifyPayload = {
type: data.type,
title: data.event || '',
message: data.data || '',
crdType,
crdType: data.crdType || '',
target: data.target,
};

// SSE toast notification
if (crdType !== NOTIF_CRD_TYPES.CONNECTED) addNotification(notification);
if (notification.title !== SSE_EVENT_TYPES.MODIFIED && notification.crdType !== SSE_CRD_TYPES.CONNECTED) {
// SSE toast notification (for all events except "modified" and "connected")
addNotification(notification);
}

// Handle specific CRD types
if ([NOTIF_CRD_TYPES.CONNECTED].includes(crdType)) {
if ([SSE_CRD_TYPES.CONNECTED].includes(notification.crdType as string)) {
if (title !== DISPLAY_TITLES.API_TOKEN) {
setStatusStore({ status: NOTIFICATION_TYPE.SUCCESS, title: notification.title as string, message: notification.message as string });
}
} else if ([NOTIF_CRD_TYPES.INSTRUMENTATION_CONFIG, NOTIF_CRD_TYPES.INSTRUMENTATION_INSTANCE].includes(crdType)) {
} else if ([SSE_CRD_TYPES.INSTRUMENTATION_CONFIG, SSE_CRD_TYPES.INSTRUMENTATION_INSTANCE].includes(notification.crdType as string)) {
fetchSources();
} else if ([NOTIF_CRD_TYPES.DESTINATION].includes(crdType)) {
} else if ([SSE_CRD_TYPES.DESTINATION].includes(notification.crdType as string)) {
refetchDestinations();
} else console.warn('Unhandled SSE for CRD type:', crdType);
} else console.warn('Unhandled SSE for CRD type:', notification.crdType);

// This works for now,
// but in the future we might have to change this to "removePendingItems",
Expand Down
10 changes: 4 additions & 6 deletions frontend/webapp/hooks/sources/useSourceCRUD.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,16 +80,14 @@ export const useSourceCRUD = (params?: Params) => {
onCompleted: (res, req) => {
handleComplete(ACTION.UPDATE);

// This is instead of using a k8s modified-event watcher...
// If we do use a watcher, we can't guarantee an SSE will be sent for this update alone.
// It will definitely include SSE for all updates, that can be instrument/uninstrument, conditions changed etc.
// Not that there's anything about a watcher that would break the UI, it's just that we would receive unexpected events with ridiculous amounts,
// (example: instrument 5 apps, update the name of 2, then uninstrument the other 3, we would get an SSE with minimum 10 updated sources, when we expect it to show only 2 due to name change).
// This is instead of toasting a k8s modified-event watcher...
// If we do toast with a watcher, we can't guarantee an SSE will be sent for this update alone. It will definitely include SSE for all updates, even those unexpected.
// Not that there's anything about a watcher that would break the UI, it's just that we would receive unexpected events with ridiculous amounts.
setTimeout(() => {
const { sourceId, patchSourceRequest } = req?.variables || {};

updateSource(sourceId, patchSourceRequest);
notifyUser(NOTIFICATION_TYPE.SUCCESS, ACTION.UPDATE, 'Successfully updated 1 source', sourceId);
notifyUser(NOTIFICATION_TYPE.SUCCESS, ACTION.UPDATE, `Successfully updated "${sourceId.name}" source`, sourceId);
removePendingItems([{ entityType: ENTITY_TYPES.SOURCE, entityId: sourceId }]);
}, 2000);
},
Expand Down
8 changes: 7 additions & 1 deletion frontend/webapp/utils/constants/string.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,15 @@ export const DISPLAY_TITLES = {
READONLY: 'Readonly',
};

export const NOTIF_CRD_TYPES = {
export const SSE_CRD_TYPES = {
CONNECTED: 'CONNECTED',
INSTRUMENTATION_CONFIG: 'InstrumentationConfig',
INSTRUMENTATION_INSTANCE: 'InstrumentationInstance',
DESTINATION: 'Destination',
};

export const SSE_EVENT_TYPES = {
ADDED: 'Added',
MODIFIED: 'Modified',
DELETED: 'Deleted',
};

0 comments on commit 1f010bb

Please sign in to comment.