Skip to content

Commit

Permalink
Multiple log processors (#967)
Browse files Browse the repository at this point in the history
adding support for multiple log processors
  • Loading branch information
brettmc authored Apr 14, 2023
1 parent c687df1 commit a06e5e4
Show file tree
Hide file tree
Showing 11 changed files with 132 additions and 28 deletions.
1 change: 0 additions & 1 deletion examples/logs/exporters/otlp_grpc.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@

$transport = (new GrpcTransportFactory())->create('http://collector:4317' . OtlpUtil::method(Signals::LOGS));
$exporter = new LogsExporter($transport);

$loggerProvider = new LoggerProvider(
new BatchLogsProcessor(
$exporter,
Expand Down
9 changes: 4 additions & 5 deletions examples/logs/getting_started.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,20 @@

use OpenTelemetry\API\Logs\EventLogger;
use OpenTelemetry\API\Logs\LogRecord;
use OpenTelemetry\SDK\Common\Export\Stream\StreamTransportFactory;
use OpenTelemetry\SDK\Common\Attribute\Attributes;
use OpenTelemetry\SDK\Common\Instrumentation\InstrumentationScopeFactory;
use OpenTelemetry\SDK\Logs\Exporter\ConsoleExporter;
use OpenTelemetry\SDK\Logs\Exporter\ConsoleExporterFactory;
use OpenTelemetry\SDK\Logs\LoggerProvider;
use OpenTelemetry\SDK\Logs\LogRecordLimitsBuilder;
use OpenTelemetry\SDK\Logs\Processor\SimpleLogsProcessor;
use OpenTelemetry\SDK\Trace\TracerProvider;

require __DIR__ . '/../../vendor/autoload.php';

$loggerProvider = new LoggerProvider(
new SimpleLogsProcessor(
new ConsoleExporter((new StreamTransportFactory())->create(STDOUT, 'text'))
(new ConsoleExporterFactory())->create()
),
new InstrumentationScopeFactory((new LogRecordLimitsBuilder())->build()->getAttributeFactory())
new InstrumentationScopeFactory(Attributes::factory())
);
$tracerProvider = new TracerProvider();
$tracer = $tracerProvider->getTracer('demo-tracer');
Expand Down
2 changes: 1 addition & 1 deletion examples/sdk_builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
->build();

$loggerProvider = LoggerProvider::builder()
->setLogRecordProcessor(
->addLogRecordProcessor(
new SimpleLogsProcessor($logRecordExporter)
)
->build();
Expand Down
20 changes: 19 additions & 1 deletion src/SDK/Logs/LogRecordProcessorFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,32 @@
use OpenTelemetry\SDK\Common\Configuration\Variables;
use OpenTelemetry\SDK\Common\Time\ClockFactory;
use OpenTelemetry\SDK\Logs\Processor\BatchLogsProcessor;
use OpenTelemetry\SDK\Logs\Processor\MultiLogsProcessor;
use OpenTelemetry\SDK\Logs\Processor\NoopLogsProcessor;
use OpenTelemetry\SDK\Logs\Processor\SimpleLogsProcessor;

class LogRecordProcessorFactory
{
public function create(LogRecordExporterInterface $exporter, ?MeterProviderInterface $meterProvider = null): LogRecordProcessorInterface
{
$name = Configuration::getEnum(Variables::OTEL_PHP_LOGS_PROCESSOR);
$processors = [];
$list = Configuration::getList(Variables::OTEL_PHP_LOGS_PROCESSOR);
foreach ($list as $name) {
$processors[] = $this->createProcessor($name, $exporter, $meterProvider);
}

switch (count($processors)) {
case 0:
return NoopLogsProcessor::getInstance();
case 1:
return $processors[0];
default:
return new MultiLogsProcessor($processors);
}
}

private function createProcessor($name, LogRecordExporterInterface $exporter, ?MeterProviderInterface $meterProvider = null): LogRecordProcessorInterface
{
switch ($name) {
case KnownValues::VALUE_BATCH:
return new BatchLogsProcessor(
Expand Down
2 changes: 1 addition & 1 deletion src/SDK/Logs/LoggerProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public function shutdown(CancellationInterface $cancellation = null): bool

public function forceFlush(CancellationInterface $cancellation = null): bool
{
return $this->loggerSharedState->getProcessor()->forceFlush($cancellation);
return $this->loggerSharedState->forceFlush($cancellation);
}

public static function builder(): LoggerProviderBuilder
Expand Down
29 changes: 20 additions & 9 deletions src/SDK/Logs/LoggerProviderBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,21 @@

use OpenTelemetry\SDK\Common\Attribute\Attributes;
use OpenTelemetry\SDK\Common\Instrumentation\InstrumentationScopeFactory;
use OpenTelemetry\SDK\Logs\Processor\MultiLogsProcessor;
use OpenTelemetry\SDK\Logs\Processor\NoopLogsProcessor;
use OpenTelemetry\SDK\Resource\ResourceInfo;

class LoggerProviderBuilder
{
private LogRecordProcessorInterface $processor;
/**
* @var array<LogRecordProcessorInterface>
*/
private array $processors = [];
private ?ResourceInfo $resource = null;

public function __construct()
public function addLogRecordProcessor(LogRecordProcessorInterface $processor): self
{
$this->processor = new NoopLogsProcessor();
}

public function setLogRecordProcessor(LogRecordProcessorInterface $processor): self
{
$this->processor = $processor;
$this->processors[] = $processor;

return $this;
}
Expand All @@ -36,9 +35,21 @@ public function setResource(ResourceInfo $resource): self
public function build(): LoggerProviderInterface
{
return new LoggerProvider(
$this->processor,
$this->buildProcessor(),
new InstrumentationScopeFactory(Attributes::factory()),
$this->resource
);
}

private function buildProcessor(): LogRecordProcessorInterface
{
switch (count($this->processors)) {
case 0:
return NoopLogsProcessor::getInstance();
case 1:
return $this->processors[0];
default:
return new MultiLogsProcessor($this->processors);
}
}
}
15 changes: 11 additions & 4 deletions src/SDK/Logs/LoggerSharedState.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,18 @@ public function getLogRecordLimits(): LogRecordLimits
return $this->limits;
}

/**
* Returns `false` if the provider is already shutdown, otherwise `true`.
*/
public function shutdown(?CancellationInterface $cancellation = null): bool
{
return $this->shutdownResult ?? ($this->shutdownResult = $this->processor->shutdown($cancellation));
if ($this->shutdownResult !== null) {
return $this->shutdownResult;
}
$this->shutdownResult = $this->processor->shutdown($cancellation);

return $this->shutdownResult;
}

public function forceFlush(?CancellationInterface $cancellation = null): bool
{
return $this->processor->forceFlush($cancellation);
}
}
62 changes: 62 additions & 0 deletions src/SDK/Logs/Processor/MultiLogsProcessor.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php

declare(strict_types=1);

namespace OpenTelemetry\SDK\Logs\Processor;

use OpenTelemetry\Context\ContextInterface;
use OpenTelemetry\SDK\Common\Future\CancellationInterface;
use OpenTelemetry\SDK\Logs\LogRecordProcessorInterface;
use OpenTelemetry\SDK\Logs\ReadWriteLogRecord;

class MultiLogsProcessor implements LogRecordProcessorInterface
{
// @var LogRecordProcessorInterface[]
private array $processors = [];

public function __construct(array $processors)
{
foreach ($processors as $processor) {
assert($processor instanceof LogRecordProcessorInterface);
$this->processors[] = $processor;
}
}

public function onEmit(ReadWriteLogRecord $record, ?ContextInterface $context = null): void
{
foreach ($this->processors as $processor) {
$processor->onEmit($record, $context);
}
}

/**
* Returns `true` if all processors shut down successfully, else `false`
* Subsequent calls to `shutdown` are a no-op.
*/
public function shutdown(?CancellationInterface $cancellation = null): bool
{
$result = true;
foreach ($this->processors as $processor) {
if (!$processor->shutdown($cancellation)) {
$result = false;
}
}

return $result;
}

/**
* Returns `true` if all processors flush successfully, else `false`.
*/
public function forceFlush(?CancellationInterface $cancellation = null): bool
{
$result = true;
foreach ($this->processors as $processor) {
if (!$processor->forceFlush($cancellation)) {
$result = false;
}
}

return $result;
}
}
9 changes: 5 additions & 4 deletions tests/Unit/SDK/Logs/LogRecordProcessorFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use OpenTelemetry\SDK\Logs\LogRecordExporterInterface;
use OpenTelemetry\SDK\Logs\LogRecordProcessorFactory;
use OpenTelemetry\SDK\Logs\Processor\BatchLogsProcessor;
use OpenTelemetry\SDK\Logs\Processor\MultiLogsProcessor;
use OpenTelemetry\SDK\Logs\Processor\NoopLogsProcessor;
use OpenTelemetry\SDK\Logs\Processor\SimpleLogsProcessor;
use PHPUnit\Framework\TestCase;
Expand Down Expand Up @@ -54,11 +55,11 @@ public function test_create_invalid(): void
(new LogRecordProcessorFactory())->create($this->createMock(LogRecordExporterInterface::class));
}

public function test_rejects_multiple(): void
public function test_create_multiple(): void
{
$this->setEnvironmentVariable('OTEL_PHP_LOGS_PROCESSOR', 'one,two');
$this->expectException(\InvalidArgumentException::class);
$this->setEnvironmentVariable('OTEL_PHP_LOGS_PROCESSOR', 'batch,simple');
$processor = (new LogRecordProcessorFactory())->create($this->createMock(LogRecordExporterInterface::class));

(new LogRecordProcessorFactory())->create($this->createMock(LogRecordExporterInterface::class));
$this->assertInstanceOf(MultiLogsProcessor::class, $processor);
}
}
2 changes: 1 addition & 1 deletion tests/Unit/SDK/Logs/LoggerProviderBuilderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public function test_builder(): void
$processor = $this->createMock(LogRecordProcessorInterface::class);
$resource = $this->createMock(ResourceInfo::class);
$provider = LoggerProvider::builder()
->setLogRecordProcessor($processor)
->addLogRecordProcessor($processor)
->setResource($resource)
->build();
$this->assertInstanceOf(LoggerProviderInterface::class, $provider);
Expand Down
9 changes: 8 additions & 1 deletion tests/Unit/SDK/Logs/LoggerSharedStateTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,14 @@ public function test_shutdown(): void
$this->processor->expects($this->once())->method('shutdown')->willReturn(true);

$this->assertFalse($this->loggerSharedState->hasShutdown());
$this->loggerSharedState->shutdown();
$this->assertTrue($this->loggerSharedState->shutdown());
$this->assertTrue($this->loggerSharedState->hasShutdown());
}

public function test_force_flush(): void
{
$this->processor->expects($this->once())->method('forceFlush')->willReturn(true);

$this->assertTrue($this->loggerSharedState->forceFlush());
}
}

0 comments on commit a06e5e4

Please sign in to comment.