Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#4847](https://github.com/open-telemetry/opentelemetry-python/pull/4847))
- Prevent possible endless recursion from happening in `SimpleLogRecordProcessor.on_emit`,
([#4799](https://github.com/open-telemetry/opentelemetry-python/pull/4799)).
- Make ConcurrentMultiSpanProcessor fork safe
([#4862](https://github.com/open-telemetry/opentelemetry-python/pull/4862))

## Version 1.39.0/0.60b0 (2025-12-03)

Expand Down
12 changes: 12 additions & 0 deletions opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import concurrent.futures
import json
import logging
import os
import threading
import traceback
import typing
import weakref
from os import environ
from time import time_ns
from types import MappingProxyType, TracebackType
Expand Down Expand Up @@ -238,6 +240,16 @@ def __init__(self, num_threads: int = 2):
# iterating through it on "on_start" and "on_end".
self._span_processors = () # type: Tuple[SpanProcessor, ...]
self._lock = threading.Lock()
self._init_executor(num_threads)
if hasattr(os, "register_at_fork"):
# Only the main thread is kept in forked processed, the executor
# needs to be re-instantiated to get a fresh pool of threads:
weak_reinit = weakref.WeakMethod(self._init_executor)
os.register_at_fork(
after_in_child=lambda: weak_reinit()(num_threads)
)

def _init_executor(self, num_threads: int) -> None:
self._executor = concurrent.futures.ThreadPoolExecutor(
max_workers=num_threads
)
Expand Down
71 changes: 71 additions & 0 deletions opentelemetry-sdk/tests/trace/test_span_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@
# limitations under the License.

import abc
import gc
import multiprocessing
import os
import time
import typing
import unittest
import weakref
from platform import python_implementation, system
from threading import Event
from typing import Optional
Expand All @@ -26,6 +30,10 @@
from opentelemetry import trace as trace_api
from opentelemetry.context import Context
from opentelemetry.sdk import trace
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
InMemorySpanExporter,
)


def span_event_start_fmt(span_processor_name, span_name):
Expand Down Expand Up @@ -486,3 +494,66 @@ def test_force_flush_late_by_span_processor(self):
for mock_processor in mocks:
self.assertEqual(1, mock_processor.force_flush.call_count)
multi_processor.shutdown()

def test_processor_gc(self):
multi_processor = trace.ConcurrentMultiSpanProcessor(5)
weak_ref = weakref.ref(multi_processor)
multi_processor.shutdown()

# When the processor is garbage collected
del multi_processor
gc.collect()

# Then the reference to the processor should no longer exist
self.assertIsNone(
weak_ref(),
"The ConcurrentMultiSpanProcessor object created by this test wasn't garbage collected",
)

@unittest.skipUnless(hasattr(os, "fork"), "needs *nix")
def test_batch_span_processor_fork(self):
multiprocessing_context = multiprocessing.get_context("fork")
tracer_provider = trace.TracerProvider()
tracer = tracer_provider.get_tracer(__name__)
exporter = InMemorySpanExporter()
multi_processor = trace.ConcurrentMultiSpanProcessor(2)
multi_processor.add_span_processor(SimpleSpanProcessor(exporter))
tracer_provider.add_span_processor(multi_processor)

# Use the ConcurrentMultiSpanProcessor in the main process.
# This is necessary in this test to start using the underlying ThreadPoolExecutor and avoid false positive:
with tracer.start_as_current_span("main process before fork span"):
pass
assert (
exporter.get_finished_spans()[-1].name
== "main process before fork span"
)

# The forked ConcurrentMultiSpanProcessor is usable in the child process:
def child(conn):
with tracer.start_as_current_span("child process span"):
pass
conn.send(exporter.get_finished_spans()[-1].name)
conn.close()

parent_conn, child_conn = multiprocessing_context.Pipe()
process = multiprocessing_context.Process(
target=child, args=(child_conn,)
)
process.start()
has_response = parent_conn.poll(timeout=5)
if not has_response:
process.kill()
self.fail(
"The child process did not send any message after 5 seconds, it's very probably locked"
)
process.join(timeout=5)
assert parent_conn.recv() == "child process span"

# The ConcurrentMultiSpanProcessor is still usable in the main process after the child process termination:
with tracer.start_as_current_span("main process after fork span"):
pass
assert (
exporter.get_finished_spans()[-1].name
== "main process after fork span"
)
Loading