Skip to content

Commit

Permalink
expanded exporter APIs with explicitTimeout on force flush and shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
bryce-b committed Sep 21, 2023
1 parent bf5f4ec commit c0cd7d3
Show file tree
Hide file tree
Showing 45 changed files with 261 additions and 214 deletions.
6 changes: 3 additions & 3 deletions Sources/Exporters/InMemory/InMemoryExporter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public class InMemoryExporter: SpanExporter {
return finishedSpanItems
}

public func export(spans: [SpanData]) -> SpanExporterResultCode {
public func export(spans: [SpanData], explicitTimeout: TimeInterval?) -> SpanExporterResultCode {
guard isRunning else {
return .failure
}
Expand All @@ -25,7 +25,7 @@ public class InMemoryExporter: SpanExporter {
return .success
}

public func flush() -> SpanExporterResultCode {
public func flush(explicitTimeout: TimeInterval?) -> SpanExporterResultCode {
guard isRunning else {
return .failure
}
Expand All @@ -37,7 +37,7 @@ public class InMemoryExporter: SpanExporter {
finishedSpanItems.removeAll()
}

public func shutdown() {
public func shutdown(explicitTimeout: TimeInterval?) {
finishedSpanItems.removeAll()
isRunning = false
}
Expand Down
8 changes: 4 additions & 4 deletions Sources/Exporters/Jaeger/JaegerSpanExporter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@ public class JaegerSpanExporter: SpanExporter {
self.collectorAddress = collectorAddress
}

public func export(spans: [SpanData]) -> SpanExporterResultCode {
public func export(spans: [SpanData], explicitTimeout: TimeInterval?) -> SpanExporterResultCode {
var spanList = TList<Span>()
spanList.append(contentsOf: Adapter.toJaeger(spans: spans))
let batch = Batch(process: process, spans: spanList)
let sender = Sender(host: collectorAddress)
let sender = Sender(host: collectorAddress)
let success = sender.sendBatch(batch: batch)
return success ? SpanExporterResultCode.success : SpanExporterResultCode.failure
}

public func flush() -> SpanExporterResultCode {
public func flush(explicitTimeout: TimeInterval?) -> SpanExporterResultCode {
return .success
}

public func shutdown() {
public func shutdown(explicitTimeout: TimeInterval?) {
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ public class OtlpLogExporter : LogRecordExporter {
}
}

public func export(logRecords: [ReadableLogRecord]) -> ExportResult {
public func export(logRecords: [ReadableLogRecord], explicitTimeout: TimeInterval?) -> ExportResult {
let logRequest = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in
request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: logRecords)
}

if config.timeout > 0 {
callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(config.timeout.toNanoseconds)))
let timeout = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, config.timeout)
if timeout > 0 {
callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(timeout.toNanoseconds)))
}


Expand All @@ -61,11 +61,11 @@ public class OtlpLogExporter : LogRecordExporter {
}
}

public func shutdown() {
public func shutdown(explicitTimeout: TimeInterval?) {
_ = channel.close()
}

public func forceFlush() -> ExportResult {
public func forceFlush(explicitTimeout: TimeInterval?) -> ExportResult {
.success
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import OpenTelemetrySdk
import OpenTelemetryProtocolExporterCommon

public class OtlpTraceExporter: SpanExporter {

let channel: GRPCChannel
var traceClient: Opentelemetry_Proto_Collector_Trace_V1_TraceServiceNIOClient
let config : OtlpConfiguration
Expand All @@ -38,31 +39,32 @@ public class OtlpTraceExporter: SpanExporter {
}
}

public func export(spans: [SpanData]) -> SpanExporterResultCode {
let exportRequest = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with {
$0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: spans)
}

if config.timeout > 0 {
callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(config.timeout.toNanoseconds)))
}
public func export(spans: [SpanData], explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
let exportRequest = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with {
$0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: spans)
}
let timeout = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, config.timeout)
if timeout > 0 {
callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(timeout.toNanoseconds)))
}

let export = traceClient.export(exportRequest, callOptions: callOptions)
let export = traceClient.export(exportRequest, callOptions: callOptions)

do {
// wait() on the response to stop the program from exiting before the response is received.
_ = try export.response.wait()
return .success
} catch {
return .failure
}
}
do {
// wait() on the response to stop the program from exiting before the response is received.
_ = try export.response.wait()
return .success
} catch {
return .failure
}
}

public func flush() -> SpanExporterResultCode {
public func flush(explicitTimeout: TimeInterval?) -> SpanExporterResultCode {
return .success
}

public func shutdown() {
public func shutdown(explicitTimeout: TimeInterval?) {
_ = channel.close()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import OpenTelemetrySdk
import OpenTelemetryProtocolExporterCommon

public class OtlpTraceJsonExporter: SpanExporter {


// MARK: - Variables declaration
private var exportedSpans = [OtlpSpan]()
Expand All @@ -18,7 +19,7 @@ public class OtlpTraceJsonExporter: SpanExporter {
exportedSpans
}

public func export(spans: [SpanData]) -> SpanExporterResultCode {
public func export(spans: [SpanData], explicitTimeout: TimeInterval?) -> SpanExporterResultCode {
guard isRunning else { return .failure }

let exportRequest = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with {
Expand All @@ -39,7 +40,7 @@ public class OtlpTraceJsonExporter: SpanExporter {
}
}

public func flush() -> SpanExporterResultCode {
public func flush(explicitTimeout: TimeInterval?) -> SpanExporterResultCode {
guard isRunning else { return .failure }
return .success
}
Expand All @@ -48,7 +49,7 @@ public class OtlpTraceJsonExporter: SpanExporter {
exportedSpans.removeAll()
}

public func shutdown() {
public func shutdown(explicitTimeout: TimeInterval?) {
exportedSpans.removeAll()
isRunning = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@ import OpenTelemetryProtocolExporterCommon
public class OtlpHttpExporterBase {
let endpoint: URL
let httpClient: HTTPClient

public init(endpoint: URL, useSession: URLSession? = nil) {
self.endpoint = endpoint
let envVarHeaders : [(String,String)]?

let config : OtlpConfiguration
public init(endpoint: URL, config: OtlpConfiguration = OtlpConfiguration(), useSession: URLSession? = nil, envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) {
self.envVarHeaders = envVarHeaders

self.endpoint = endpoint
self.config = config
if let providedSession = useSession {
self.httpClient = HTTPClient(session: providedSession)
} else {
Expand All @@ -35,6 +40,6 @@ public class OtlpHttpExporterBase {
return request
}

public func shutdown() {
public func shutdown(explicitTimeout: TimeInterval?) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@ public func defaultOltpHttpLoggingEndpoint() -> URL {
}

public class OtlpHttpLogExporter : OtlpHttpExporterBase, LogRecordExporter {

var pendingLogRecords: [ReadableLogRecord] = []

override
public init(endpoint: URL = defaultOltpHttpLoggingEndpoint(), useSession: URLSession? = nil) {
super.init(endpoint: endpoint, useSession: useSession)
override public init(endpoint: URL = defaultOltpHttpLoggingEndpoint(),
config: OtlpConfiguration = OtlpConfiguration(),
useSession: URLSession? = nil,
envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes){
super.init(endpoint: endpoint, config: config, useSession: useSession, envVarHeaders: envVarHeaders)
}

public func export(logRecords: [OpenTelemetrySdk.ReadableLogRecord]) -> OpenTelemetrySdk.ExportResult {
public func export(logRecords: [OpenTelemetrySdk.ReadableLogRecord], explicitTimeout: TimeInterval? = nil) -> OpenTelemetrySdk.ExportResult {
pendingLogRecords.append(contentsOf: logRecords)
let sendingLogRecords = pendingLogRecords
pendingLogRecords = []
Expand All @@ -28,7 +31,8 @@ public class OtlpHttpLogExporter : OtlpHttpExporterBase, LogRecordExporter {
request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: sendingLogRecords)
}

let request = createRequest(body: body, endpoint: endpoint)
var request = createRequest(body: body, endpoint: endpoint)
request.timeoutInterval = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude , config.timeout)
httpClient.send(request: request) { [weak self] result in
switch result {
case .success(_):
Expand All @@ -42,20 +46,21 @@ public class OtlpHttpLogExporter : OtlpHttpExporterBase, LogRecordExporter {
return .success
}

public func forceFlush() -> OpenTelemetrySdk.ExportResult {
self.flush()
public func forceFlush(explicitTimeout: TimeInterval? = nil) -> ExportResult {
self.flush(explicitTimeout: explicitTimeout)
}

public func flush() -> ExportResult {
public func flush(explicitTimeout: TimeInterval? = nil) -> ExportResult {
var exporterResult: ExportResult = .success

if !pendingLogRecords.isEmpty {
let body = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in
request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: pendingLogRecords)
}
let semaphore = DispatchSemaphore(value: 0)
let request = createRequest(body: body, endpoint: endpoint)

var request = createRequest(body: body, endpoint: endpoint)
request.timeoutInterval = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude , config.timeout)

httpClient.send(request: request) { result in
switch result {
case .success(_):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ public class OtlpHttpMetricExporter: OtlpHttpExporterBase, MetricExporter {
var pendingMetrics: [Metric] = []

override
public init(endpoint: URL = defaultOltpHTTPMetricsEndpoint(), useSession: URLSession? = nil) {
super.init(endpoint: endpoint, useSession: useSession)
public init(endpoint: URL = defaultOltpHTTPMetricsEndpoint(), config : OtlpConfiguration = OtlpConfiguration(), useSession: URLSession? = nil, envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) {
super.init(endpoint: endpoint, config: config, useSession: useSession, envVarHeaders: envVarHeaders)
}

public func export(metrics: [Metric], shouldCancel: (() -> Bool)?) -> MetricExporterResultCode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,34 @@ public func defaultOltpHttpTracesEndpoint() -> URL {
}

public class OtlpHttpTraceExporter: OtlpHttpExporterBase, SpanExporter {
var pendingSpans: [SpanData] = []


var pendingSpans: [SpanData] = []
override
public init(endpoint: URL = defaultOltpHttpTracesEndpoint(), useSession: URLSession? = nil) {
super.init(endpoint: endpoint, useSession: useSession)
public init(endpoint: URL = defaultOltpHttpTracesEndpoint(), config: OtlpConfiguration = OtlpConfiguration(),
useSession: URLSession? = nil, envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) {
super.init(endpoint: endpoint, config: config, useSession: useSession)
}

public func export(spans: [SpanData]) -> SpanExporterResultCode {
public func export(spans: [SpanData], explicitTimeout: TimeInterval?) -> SpanExporterResultCode {
pendingSpans.append(contentsOf: spans)
let sendingSpans = pendingSpans
pendingSpans = []

let body = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with {
$0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: spans)
}
let request = createRequest(body: body, endpoint: endpoint)
var request = createRequest(body: body, endpoint: endpoint)
if let headers = envVarHeaders {
headers.forEach { (key, value) in
request.addValue(value, forHTTPHeaderField: key)
}

} else if let headers = config.headers {
headers.forEach { (key, value) in
request.addValue(value, forHTTPHeaderField: key)
}
}
httpClient.send(request: request) { [weak self] result in
switch result {
case .success:
Expand All @@ -40,7 +52,7 @@ public class OtlpHttpTraceExporter: OtlpHttpExporterBase, SpanExporter {
return .success
}

public func flush() -> SpanExporterResultCode {
public func flush(explicitTimeout: TimeInterval?) -> SpanExporterResultCode {
var resultValue: SpanExporterResultCode = .success
if !pendingSpans.isEmpty {
let body = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with {
Expand Down
6 changes: 3 additions & 3 deletions Sources/Exporters/Persistence/Export/DataExportWorker.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import Foundation
// a protocol for an exporter of `Data` to which a `DataExportWorker` can delegate persisted
// data export
internal protocol DataExporter {
func export(data: Data) -> DataExportStatus
func export(data: Data, explicitTimeout: TimeInterval?) -> DataExportStatus
}

// a protocol needed for mocking `DataExportWorker`
Expand Down Expand Up @@ -52,7 +52,7 @@ internal class DataExportWorker: DataExportWorkerProtocol {
let nextBatch = isSystemReady ? self.fileReader.readNextBatch() : nil
if let batch = nextBatch {
// Export batch
let exportStatus = self.dataExporter.export(data: batch.data)
let exportStatus = self.dataExporter.export(data: batch.data, explicitTimeout: nil)

// Delete or keep batch depending on the export status
if exportStatus.needsRetry {
Expand Down Expand Up @@ -86,7 +86,7 @@ internal class DataExportWorker: DataExportWorkerProtocol {
internal func flush() -> Bool {
let success = queue.sync {
self.fileReader.onRemainingBatches {
let exportStatus = self.dataExporter.export(data: $0.data)
let exportStatus = self.dataExporter.export(data: $0.data, explicitTimeout: nil)
if !exportStatus.needsRetry {
self.fileReader.markBatchAsRead($0)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import OpenTelemetrySdk
protocol DecoratedExporter {
associatedtype SignalType

func export(values: [SignalType]) -> DataExportStatus
func export(values: [SignalType], explicitTimeout: TimeInterval?) -> DataExportStatus
}

// a generic decorator of `DecoratedExporter` adding filesystem persistence of batches of `[T.SignalType]`.
Expand All @@ -25,7 +25,7 @@ internal class PersistenceExporterDecorator<T> where T: DecoratedExporter, T.Sig
self.decoratedExporter = decoratedExporter
}

func export(data: Data) -> DataExportStatus {
func export(data: Data, explicitTimeout: TimeInterval?) -> DataExportStatus {
// decode batches of `[T.SignalType]` from the raw data.
// the data is made of batches of comma-suffixed JSON arrays, so in order to utilize
// `JSONDecoder`, add a "[" prefix and "null]" suffix making the data a valid
Expand All @@ -41,7 +41,7 @@ internal class PersistenceExporterDecorator<T> where T: DecoratedExporter, T.Sig
from: arrayData
).compactMap { $0 }.flatMap { $0 }

return decoratedExporter.export(values: exportables)
return decoratedExporter.export(values: exportables, explicitTimeout: explicitTimeout)
} catch {
return DataExportStatus(needsRetry: false)
}
Expand Down
Loading

0 comments on commit c0cd7d3

Please sign in to comment.