Skip to content

Commit 38c0072

Browse files
authored
Merge branch 'main' into remove-unnecessary-base64-encoding
2 parents c381dcb + 3e59291 commit 38c0072

File tree

11 files changed

+319
-37
lines changed

11 files changed

+319
-37
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ For experimental package changes, see the [experimental CHANGELOG](experimental/
1212
### :rocket: (Enhancement)
1313

1414
* perf(otlp-transformer): skip unnecessary base64 encode of span contexts [#4343](https://github.com/open-telemetry/opentelemetry-js/pull/4343) @seemk
15+
* feat(sdk-trace-base): improve log messages when dropping span events [#4223](https://github.com/open-telemetry/opentelemetry-js/pull/4223) @mkubliniak
1516

1617
### :bug: (Bug Fix)
1718

experimental/CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ All notable changes to experimental packages in this project will be documented
3131

3232
### :bug: (Bug Fix)
3333

34+
* fix(sdk-logs): await async resources in log processors
3435
* fix(sdk-logs): avoid map attribute set when count limit exceeded
3536
* fix(instrumentation-fetch): only access navigator if it is defined [#4063](https://github.com/open-telemetry/opentelemetry-js/pull/4063)
3637
* allows for experimental usage of this instrumentation with non-browser runtimes

experimental/packages/sdk-logs/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
"@babel/core": "7.23.6",
7676
"@opentelemetry/api": ">=1.4.0 <1.8.0",
7777
"@opentelemetry/api-logs": "0.46.0",
78+
"@opentelemetry/resources_1.9.0": "npm:@opentelemetry/[email protected]",
7879
"@types/mocha": "10.0.6",
7980
"@types/node": "18.6.5",
8081
"@types/sinon": "10.0.20",

experimental/packages/sdk-logs/src/export/BatchLogRecordProcessorBase.ts

+30-16
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ import {
2121
getEnv,
2222
globalErrorHandler,
2323
unrefTimer,
24-
callWithTimeout,
2524
BindOnceFuture,
25+
internal,
26+
callWithTimeout,
2627
} from '@opentelemetry/core';
2728

2829
import type { BufferConfig } from '../types';
@@ -163,21 +164,34 @@ export abstract class BatchLogRecordProcessorBase<T extends BufferConfig>
163164
}
164165
}
165166

166-
private _export(logRecords: LogRecord[]): Promise<ExportResult> {
167-
return new Promise((resolve, reject) => {
168-
this._exporter.export(logRecords, (res: ExportResult) => {
169-
if (res.code !== ExportResultCode.SUCCESS) {
170-
reject(
171-
res.error ??
172-
new Error(
173-
`BatchLogRecordProcessorBase: log record export failed (status ${res})`
174-
)
175-
);
176-
return;
177-
}
178-
resolve(res);
179-
});
180-
});
167+
private _export(logRecords: LogRecord[]): Promise<void> {
168+
const doExport = () =>
169+
internal
170+
._export(this._exporter, logRecords)
171+
.then((result: ExportResult) => {
172+
if (result.code !== ExportResultCode.SUCCESS) {
173+
globalErrorHandler(
174+
result.error ??
175+
new Error(
176+
`BatchLogRecordProcessor: log record export failed (status ${result})`
177+
)
178+
);
179+
}
180+
})
181+
.catch(globalErrorHandler);
182+
183+
const pendingResources = logRecords
184+
.map(logRecord => logRecord.resource)
185+
.filter(resource => resource.asyncAttributesPending);
186+
187+
// Avoid scheduling a promise to make the behavior more predictable and easier to test
188+
if (pendingResources.length === 0) {
189+
return doExport();
190+
} else {
191+
return Promise.all(
192+
pendingResources.map(resource => resource.waitForAsyncAttributes?.())
193+
).then(doExport, globalErrorHandler);
194+
}
181195
}
182196

183197
protected abstract onShutdown(): void;

experimental/packages/sdk-logs/src/export/SimpleLogRecordProcessor.ts

+39-14
Original file line numberDiff line numberDiff line change
@@ -19,40 +19,65 @@ import {
1919
BindOnceFuture,
2020
ExportResultCode,
2121
globalErrorHandler,
22+
internal,
2223
} from '@opentelemetry/core';
23-
2424
import type { LogRecordExporter } from './LogRecordExporter';
2525
import type { LogRecordProcessor } from '../LogRecordProcessor';
2626
import type { LogRecord } from './../LogRecord';
2727

2828
export class SimpleLogRecordProcessor implements LogRecordProcessor {
2929
private _shutdownOnce: BindOnceFuture<void>;
30+
private _unresolvedExports: Set<Promise<void>>;
3031

3132
constructor(private readonly _exporter: LogRecordExporter) {
3233
this._shutdownOnce = new BindOnceFuture(this._shutdown, this);
34+
this._unresolvedExports = new Set<Promise<void>>();
3335
}
3436

3537
public onEmit(logRecord: LogRecord): void {
3638
if (this._shutdownOnce.isCalled) {
3739
return;
3840
}
3941

40-
this._exporter.export([logRecord], (res: ExportResult) => {
41-
if (res.code !== ExportResultCode.SUCCESS) {
42-
globalErrorHandler(
43-
res.error ??
44-
new Error(
45-
`SimpleLogRecordProcessor: log record export failed (status ${res})`
46-
)
47-
);
48-
return;
42+
const doExport = () =>
43+
internal
44+
._export(this._exporter, [logRecord])
45+
.then((result: ExportResult) => {
46+
if (result.code !== ExportResultCode.SUCCESS) {
47+
globalErrorHandler(
48+
result.error ??
49+
new Error(
50+
`SimpleLogRecordProcessor: log record export failed (status ${result})`
51+
)
52+
);
53+
}
54+
})
55+
.catch(globalErrorHandler);
56+
57+
// Avoid scheduling a promise to make the behavior more predictable and easier to test
58+
if (logRecord.resource.asyncAttributesPending) {
59+
const exportPromise = logRecord.resource
60+
.waitForAsyncAttributes?.()
61+
.then(() => {
62+
// Using TS Non-null assertion operator because exportPromise could not be null in here
63+
// if waitForAsyncAttributes is not present this code will never be reached
64+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
65+
this._unresolvedExports.delete(exportPromise!);
66+
return doExport();
67+
}, globalErrorHandler);
68+
69+
// store the unresolved exports
70+
if (exportPromise != null) {
71+
this._unresolvedExports.add(exportPromise);
4972
}
50-
});
73+
} else {
74+
void doExport();
75+
}
5176
}
5277

53-
public forceFlush(): Promise<void> {
54-
// do nothing as all log records are being exported without waiting
55-
return Promise.resolve();
78+
public async forceFlush(): Promise<void> {
79+
// await unresolved resources before resolving
80+
await Promise.all(Array.from(this._unresolvedExports));
5681
}
5782

5883
public shutdown(): Promise<void> {

experimental/packages/sdk-logs/test/common/export/BatchLogRecordProcessor.test.ts

+25-3
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,19 @@ import {
3232
import { BatchLogRecordProcessorBase } from '../../../src/export/BatchLogRecordProcessorBase';
3333
import { reconfigureLimits } from '../../../src/config';
3434
import { LoggerProviderSharedState } from '../../../src/internal/LoggerProviderSharedState';
35-
import { Resource } from '@opentelemetry/resources';
35+
import { Resource, ResourceAttributes } from '@opentelemetry/resources';
3636

3737
class BatchLogRecordProcessor extends BatchLogRecordProcessorBase<BufferConfig> {
3838
onInit() {}
3939
onShutdown() {}
4040
}
4141

42-
const createLogRecord = (limits?: LogRecordLimits): LogRecord => {
42+
const createLogRecord = (
43+
limits?: LogRecordLimits,
44+
resource?: Resource
45+
): LogRecord => {
4346
const sharedState = new LoggerProviderSharedState(
44-
Resource.default(),
47+
resource || Resource.default(),
4548
Infinity,
4649
reconfigureLimits(limits ?? {})
4750
);
@@ -308,6 +311,25 @@ describe('BatchLogRecordProcessorBase', () => {
308311
await processor.forceFlush();
309312
assert.strictEqual(exporter.getFinishedLogRecords().length, 1);
310313
});
314+
315+
it('should wait for pending resource on flush', async () => {
316+
const processor = new BatchLogRecordProcessor(exporter);
317+
const asyncResource = new Resource(
318+
{},
319+
new Promise<ResourceAttributes>(resolve => {
320+
setTimeout(() => resolve({ async: 'fromasync' }), 1);
321+
})
322+
);
323+
const logRecord = createLogRecord(undefined, asyncResource);
324+
processor.onEmit(logRecord);
325+
await processor.forceFlush();
326+
const exportedLogs = exporter.getFinishedLogRecords();
327+
assert.strictEqual(exportedLogs.length, 1);
328+
assert.strictEqual(
329+
exportedLogs[0].resource.attributes['async'],
330+
'fromasync'
331+
);
332+
});
311333
});
312334

313335
describe('shutdown', () => {

experimental/packages/sdk-logs/test/common/export/SimpleLogRecordProcessor.test.ts

+66-3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ import {
2121
loggingErrorHandler,
2222
setGlobalErrorHandler,
2323
} from '@opentelemetry/core';
24+
import { Resource, ResourceAttributes } from '@opentelemetry/resources';
25+
import { Resource as Resource190 } from '@opentelemetry/resources_1.9.0';
2426

2527
import {
2628
InMemoryLogRecordExporter,
@@ -29,12 +31,12 @@ import {
2931
LogRecord,
3032
} from './../../../src';
3133
import { LoggerProviderSharedState } from '../../../src/internal/LoggerProviderSharedState';
32-
import { Resource } from '@opentelemetry/resources';
3334
import { reconfigureLimits } from '../../../src/config';
35+
import { TestExporterWithDelay } from './TestExporterWithDelay';
3436

35-
const setup = (exporter: LogRecordExporter) => {
37+
const setup = (exporter: LogRecordExporter, resource?: Resource) => {
3638
const sharedState = new LoggerProviderSharedState(
37-
Resource.default(),
39+
resource || Resource.default(),
3840
Infinity,
3941
reconfigureLimits({})
4042
);
@@ -113,4 +115,65 @@ describe('SimpleLogRecordProcessor', () => {
113115
assert.ok(shutdownSpy.callCount === 1);
114116
});
115117
});
118+
119+
describe('force flush', () => {
120+
it('should await unresolved resources', async () => {
121+
const exporter = new InMemoryLogRecordExporter();
122+
const asyncResource = new Resource(
123+
{},
124+
new Promise<ResourceAttributes>(resolve => {
125+
setTimeout(() => resolve({ async: 'fromasync' }), 1);
126+
})
127+
);
128+
const { processor, logRecord } = setup(exporter, asyncResource);
129+
assert.strictEqual(exporter.getFinishedLogRecords().length, 0);
130+
processor.onEmit(logRecord);
131+
132+
await processor.forceFlush();
133+
134+
const exportedLogs = exporter.getFinishedLogRecords();
135+
assert.strictEqual(exportedLogs.length, 1);
136+
assert.strictEqual(
137+
exportedLogs[0].resource.attributes['async'],
138+
'fromasync'
139+
);
140+
});
141+
142+
it('should await doExport() and delete from _unresolvedExports', async () => {
143+
const testExporterWithDelay = new TestExporterWithDelay();
144+
const asyncResource = new Resource(
145+
{},
146+
new Promise<ResourceAttributes>(resolve => {
147+
setTimeout(() => resolve({ async: 'fromasync' }), 1);
148+
})
149+
);
150+
const processor = new SimpleLogRecordProcessor(testExporterWithDelay);
151+
const { logRecord } = setup(testExporterWithDelay, asyncResource);
152+
153+
processor.onEmit(logRecord);
154+
assert.strictEqual(processor['_unresolvedExports'].size, 1);
155+
await processor.forceFlush();
156+
assert.strictEqual(processor['_unresolvedExports'].size, 0);
157+
const exportedLogRecords = testExporterWithDelay.getFinishedLogRecords();
158+
assert.strictEqual(exportedLogRecords.length, 1);
159+
});
160+
});
161+
162+
describe('compatibility', () => {
163+
it('should export when using old resource implementation', async () => {
164+
const exporter = new InMemoryLogRecordExporter();
165+
const { processor, logRecord } = setup(
166+
exporter,
167+
new Resource190({ fromold: 'fromold' })
168+
);
169+
assert.strictEqual(exporter.getFinishedLogRecords().length, 0);
170+
processor.onEmit(logRecord);
171+
const exportedLogs = exporter.getFinishedLogRecords();
172+
assert.strictEqual(exportedLogs.length, 1);
173+
assert.strictEqual(
174+
exportedLogs[0].resource.attributes['fromold'],
175+
'fromold'
176+
);
177+
});
178+
});
116179
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { ExportResult } from '@opentelemetry/core';
18+
import { InMemoryLogRecordExporter, ReadableLogRecord } from '../../../src';
19+
20+
/**
21+
* A test-only exporter that delays during export to mimic a real exporter.
22+
*/
23+
export class TestExporterWithDelay extends InMemoryLogRecordExporter {
24+
private _exporterCreatedLogRecords: ReadableLogRecord[] = [];
25+
26+
constructor() {
27+
super();
28+
}
29+
30+
override export(
31+
logRecords: ReadableLogRecord[],
32+
resultCallback: (result: ExportResult) => void
33+
): void {
34+
super.export(logRecords, () => setTimeout(resultCallback, 1));
35+
}
36+
37+
override shutdown(): Promise<void> {
38+
return super.shutdown().then(() => {
39+
this._exporterCreatedLogRecords = [];
40+
});
41+
}
42+
43+
override reset() {
44+
super.reset();
45+
this._exporterCreatedLogRecords = [];
46+
}
47+
48+
getExporterCreatedLogRecords(): ReadableLogRecord[] {
49+
return this._exporterCreatedLogRecords;
50+
}
51+
}

0 commit comments

Comments
 (0)