-
Notifications
You must be signed in to change notification settings - Fork 8.6k
Reset the metrics after each emission #59551
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| /* | ||
| * Licensed to Elasticsearch B.V. under one or more contributor | ||
| * license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright | ||
| * ownership. Elasticsearch B.V. licenses this file to you under | ||
| * the Apache License, Version 2.0 (the "License"); you may | ||
| * not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| import { MetricsCollector } from './types'; | ||
|
|
||
| const createMock = () => { | ||
| const mocked: jest.Mocked<MetricsCollector<any>> = { | ||
| collect: jest.fn(), | ||
| reset: jest.fn(), | ||
| }; | ||
|
|
||
| mocked.collect.mockResolvedValue({}); | ||
|
|
||
| return mocked; | ||
| }; | ||
|
|
||
| export const collectorMock = { | ||
| create: createMock, | ||
| }; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,37 +57,50 @@ describe('MetricsService', () => { | |
| expect(setInterval).toHaveBeenCalledWith(expect.any(Function), testInterval); | ||
| }); | ||
|
|
||
| it('emits the metrics at start', async () => { | ||
| it('collects the metrics at every interval', async () => { | ||
| mockOpsCollector.collect.mockResolvedValue(dummyMetrics); | ||
|
|
||
| const { getOpsMetrics$ } = await metricsService.setup({ | ||
| http: httpMock, | ||
| }); | ||
|
|
||
| await metricsService.setup({ http: httpMock }); | ||
| await metricsService.start(); | ||
|
|
||
| expect(mockOpsCollector.collect).toHaveBeenCalledTimes(1); | ||
| expect( | ||
| await getOpsMetrics$() | ||
| .pipe(take(1)) | ||
| .toPromise() | ||
| ).toEqual(dummyMetrics); | ||
|
|
||
| jest.advanceTimersByTime(testInterval); | ||
| expect(mockOpsCollector.collect).toHaveBeenCalledTimes(2); | ||
|
|
||
| jest.advanceTimersByTime(testInterval); | ||
| expect(mockOpsCollector.collect).toHaveBeenCalledTimes(3); | ||
| }); | ||
|
|
||
| it('collects the metrics at every interval', async () => { | ||
| it('resets the collector after each collection', async () => { | ||
| mockOpsCollector.collect.mockResolvedValue(dummyMetrics); | ||
|
|
||
| await metricsService.setup({ http: httpMock }); | ||
|
|
||
| const { getOpsMetrics$ } = await metricsService.setup({ http: httpMock }); | ||
| await metricsService.start(); | ||
|
|
||
| // `advanceTimersByTime` only ensure the interval handler is executed | ||
| // however the `reset` call is executed after the async call to `collect` | ||
| // meaning that we are going to miss the call if we don't wait for the | ||
| // actual observable emission that is performed after | ||
| const waitForNextEmission = () => | ||
| getOpsMetrics$() | ||
| .pipe(take(1)) | ||
| .toPromise(); | ||
|
|
||
| expect(mockOpsCollector.collect).toHaveBeenCalledTimes(1); | ||
| expect(mockOpsCollector.reset).toHaveBeenCalledTimes(1); | ||
|
|
||
| let nextEmission = waitForNextEmission(); | ||
| jest.advanceTimersByTime(testInterval); | ||
| await nextEmission; | ||
|
Comment on lines
+85
to
+95
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. private async refreshMetrics() {
this.logger.debug('Refreshing metrics');
const metrics = await this.metricsCollector!.collect();
this.metricsCollector!.reset();
this.metrics$.next(metrics);
}
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. does it make sense to add a comment in test?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably. Will do |
||
| expect(mockOpsCollector.collect).toHaveBeenCalledTimes(2); | ||
| expect(mockOpsCollector.reset).toHaveBeenCalledTimes(2); | ||
|
|
||
| nextEmission = waitForNextEmission(); | ||
| jest.advanceTimersByTime(testInterval); | ||
| await nextEmission; | ||
| expect(mockOpsCollector.collect).toHaveBeenCalledTimes(3); | ||
| expect(mockOpsCollector.reset).toHaveBeenCalledTimes(3); | ||
| }); | ||
|
|
||
| it('throws when called before setup', async () => { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,8 +17,8 @@ | |
| * under the License. | ||
| */ | ||
|
|
||
| import { ReplaySubject } from 'rxjs'; | ||
| import { first, shareReplay } from 'rxjs/operators'; | ||
| import { Subject } from 'rxjs'; | ||
| import { first } from 'rxjs/operators'; | ||
| import { CoreService } from '../../types'; | ||
| import { CoreContext } from '../core_context'; | ||
| import { Logger } from '../logging'; | ||
|
|
@@ -37,7 +37,7 @@ export class MetricsService | |
| private readonly logger: Logger; | ||
| private metricsCollector?: OpsMetricsCollector; | ||
| private collectInterval?: NodeJS.Timeout; | ||
| private metrics$ = new ReplaySubject<OpsMetrics>(1); | ||
| private metrics$ = new Subject<OpsMetrics>(); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With the metrics being reset after every emission, we probably don't want a replay effect anymore, so I switched back to a plain Subject. This also go closer to the legacy/oppsy implementation that was using events (therefor have no replay capabilities)
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It means that a subscriber doesn't receive the last emitted value. But okay, if it works in this way in LP |
||
|
|
||
| constructor(private readonly coreContext: CoreContext) { | ||
| this.logger = coreContext.logger.get('metrics'); | ||
|
|
@@ -46,7 +46,7 @@ export class MetricsService | |
| public async setup({ http }: MetricsServiceSetupDeps): Promise<InternalMetricsServiceSetup> { | ||
| this.metricsCollector = new OpsMetricsCollector(http.server); | ||
|
|
||
| const metricsObservable = this.metrics$.pipe(shareReplay(1)); | ||
| const metricsObservable = this.metrics$.asObservable(); | ||
|
|
||
| return { | ||
| getOpsMetrics$: () => metricsObservable, | ||
|
|
@@ -74,6 +74,7 @@ export class MetricsService | |
| private async refreshMetrics() { | ||
| this.logger.debug('Refreshing metrics'); | ||
| const metrics = await this.metricsCollector!.collect(); | ||
| this.metricsCollector!.reset(); | ||
| this.metrics$.next(metrics); | ||
| } | ||
|
|
||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dont want optional properties, and as calling
resetin the constructor is not sufficient to have TS consider the variable as initialized, I'm forced to duplicate that in theresetfunction.