Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expanded exporter APIs with explicitTimeout on force flush and shutdown #460

Merged
merged 3 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Sources/Exporters/DatadogExporter/DatadogExporter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public class DatadogExporter: SpanExporter, MetricExporter {
metricsExporter = try MetricsExporter(config: configuration)
}

public func export(spans: [SpanData]) -> SpanExporterResultCode {
public func export(spans: [SpanData], explicitTimeout: TimeInterval?) -> SpanExporterResultCode {
spans.forEach {
if $0.traceFlags.sampled || configuration.exportUnsampledSpans {
spansExporter?.exportSpan(span: $0)
Expand All @@ -38,7 +38,7 @@ public class DatadogExporter: SpanExporter, MetricExporter {
return .success
}

public func flush() -> SpanExporterResultCode {
public func flush(explicitTimeout: TimeInterval?) -> SpanExporterResultCode {
spansExporter?.tracesStorage.writer.queue.sync {}
logsExporter?.logsStorage.writer.queue.sync {}
metricsExporter?.metricsStorage.writer.queue.sync {}
Expand All @@ -49,7 +49,7 @@ public class DatadogExporter: SpanExporter, MetricExporter {
return .success
}

public func shutdown() {
public func shutdown(explicitTimeout: TimeInterval?) {
_ = self.flush()
}

Expand Down
64 changes: 32 additions & 32 deletions Sources/Exporters/InMemory/InMemoryExporter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -7,38 +7,38 @@ import Foundation
import OpenTelemetrySdk

public class InMemoryExporter: SpanExporter {
private var finishedSpanItems: [SpanData] = []
private var isRunning: Bool = true

public init() {}

public func getFinishedSpanItems() -> [SpanData] {
return finishedSpanItems
}

public func export(spans: [SpanData]) -> SpanExporterResultCode {
guard isRunning else {
return .failure
}

finishedSpanItems.append(contentsOf: spans)
return .success
private var finishedSpanItems: [SpanData] = []
private var isRunning: Bool = true

public init() {}

public func getFinishedSpanItems() -> [SpanData] {
return finishedSpanItems
}

public func export(spans: [SpanData], explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
guard isRunning else {
return .failure
}

public func flush() -> SpanExporterResultCode {
guard isRunning else {
return .failure
}

return .success
}

public func reset() {
finishedSpanItems.removeAll()
}

public func shutdown() {
finishedSpanItems.removeAll()
isRunning = false

finishedSpanItems.append(contentsOf: spans)
return .success
}

public func flush(explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
guard isRunning else {
return .failure
}

return .success
}

public func reset() {
finishedSpanItems.removeAll()
}

public func shutdown(explicitTimeout: TimeInterval? = nil) {
finishedSpanItems.removeAll()
isRunning = false
}
}
46 changes: 23 additions & 23 deletions Sources/Exporters/Jaeger/JaegerSpanExporter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,29 @@
import Thrift

public class JaegerSpanExporter: SpanExporter {
let collectorAddress: String
let process: Process

public init(serviceName: String, collectorAddress: String) {
process = Process(serviceName: serviceName, tags: TList<Tag>())
self.collectorAddress = collectorAddress
}

public func export(spans: [SpanData]) -> SpanExporterResultCode {
var spanList = TList<Span>()
spanList.append(contentsOf: Adapter.toJaeger(spans: spans))
let batch = Batch(process: process, spans: spanList)
let sender = Sender(host: collectorAddress)
let success = sender.sendBatch(batch: batch)
return success ? SpanExporterResultCode.success : SpanExporterResultCode.failure
}

public func flush() -> SpanExporterResultCode {
return .success
}

public func shutdown() {
}
let collectorAddress: String
let process: Process
public init(serviceName: String, collectorAddress: String) {
process = Process(serviceName: serviceName, tags: TList<Tag>())
self.collectorAddress = collectorAddress
}

Check warning on line 19 in Sources/Exporters/Jaeger/JaegerSpanExporter.swift

View check run for this annotation

Codecov / codecov/patch

Sources/Exporters/Jaeger/JaegerSpanExporter.swift#L16-L19

Added lines #L16 - L19 were not covered by tests
public func export(spans: [SpanData], explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
var spanList = TList<Span>()
spanList.append(contentsOf: Adapter.toJaeger(spans: spans))
let batch = Batch(process: process, spans: spanList)
let sender = Sender(host: collectorAddress)
let success = sender.sendBatch(batch: batch)
return success ? SpanExporterResultCode.success : SpanExporterResultCode.failure
}

Check warning on line 28 in Sources/Exporters/Jaeger/JaegerSpanExporter.swift

View check run for this annotation

Codecov / codecov/patch

Sources/Exporters/Jaeger/JaegerSpanExporter.swift#L21-L28

Added lines #L21 - L28 were not covered by tests
public func flush(explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
return .success
}

Check warning on line 32 in Sources/Exporters/Jaeger/JaegerSpanExporter.swift

View check run for this annotation

Codecov / codecov/patch

Sources/Exporters/Jaeger/JaegerSpanExporter.swift#L30-L32

Added lines #L30 - L32 were not covered by tests
public func shutdown(explicitTimeout: TimeInterval? = nil) {
}

Check warning on line 35 in Sources/Exporters/Jaeger/JaegerSpanExporter.swift

View check run for this annotation

Codecov / codecov/patch

Sources/Exporters/Jaeger/JaegerSpanExporter.swift#L34-L35

Added lines #L34 - L35 were not covered by tests
}

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -13,59 +13,59 @@
import OpenTelemetryProtocolExporterCommon

public class OtlpLogExporter : LogRecordExporter {
let channel : GRPCChannel
var logClient : Opentelemetry_Proto_Collector_Logs_V1_LogsServiceNIOClient
let config : OtlpConfiguration
var callOptions : CallOptions

public init(channel: GRPCChannel,
config: OtlpConfiguration = OtlpConfiguration(),
logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }),
envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes){
self.channel = channel
logClient = Opentelemetry_Proto_Collector_Logs_V1_LogsServiceNIOClient(channel: channel)
self.config = config
let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader())
if let headers = envVarHeaders {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else if let headers = config.headers {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
}
else {
var headers = [(String, String)]()
headers.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger)
}
let channel : GRPCChannel
var logClient : Opentelemetry_Proto_Collector_Logs_V1_LogsServiceNIOClient
let config : OtlpConfiguration
var callOptions : CallOptions

public init(channel: GRPCChannel,
config: OtlpConfiguration = OtlpConfiguration(),
logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }),
envVarHeaders: [(String, String)]? = EnvVarHeaders.attributes){
self.channel = channel
logClient = Opentelemetry_Proto_Collector_Logs_V1_LogsServiceNIOClient(channel: channel)
self.config = config
let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader())
if let headers = envVarHeaders {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else if let headers = config.headers {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
}

public func export(logRecords: [ReadableLogRecord]) -> ExportResult {
let logRequest = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in
request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: logRecords)
}

if config.timeout > 0 {
callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(config.timeout.toNanoseconds)))
}


let export = logClient.export(logRequest, callOptions: callOptions)
do {
_ = try export.response.wait()
return .success
} catch {
return .failure
}
else {
var headers = [(String, String)]()
headers.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger)
}

public func shutdown() {
_ = channel.close()
}

public func export(logRecords: [ReadableLogRecord], explicitTimeout: TimeInterval? = nil) -> ExportResult {
let logRequest = Opentelemetry_Proto_Collector_Logs_V1_ExportLogsServiceRequest.with { request in
request.resourceLogs = LogRecordAdapter.toProtoResourceRecordLog(logRecordList: logRecords)
}

public func forceFlush() -> ExportResult {
.success
let timeout = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, config.timeout)
if timeout > 0 {
callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(timeout.toNanoseconds)))
}


let export = logClient.export(logRequest, callOptions: callOptions)
do {
_ = try export.response.wait()
return .success
} catch {
return .failure
}
}

public func shutdown(explicitTimeout: TimeInterval? = nil) {
_ = channel.close()
}

public func forceFlush(explicitTimeout: TimeInterval? = nil) -> ExportResult {
.success
}

Check warning on line 70 in Sources/Exporters/OpenTelemetryProtocolGrpc/logs/OtlpLogExporter.swift

View check run for this annotation

Codecov / codecov/patch

Sources/Exporters/OpenTelemetryProtocolGrpc/logs/OtlpLogExporter.swift#L68-L70

Added lines #L68 - L70 were not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,56 +13,58 @@
import OpenTelemetryProtocolExporterCommon

public class OtlpTraceExporter: SpanExporter {
let channel: GRPCChannel
var traceClient: Opentelemetry_Proto_Collector_Trace_V1_TraceServiceNIOClient
let config : OtlpConfiguration
var callOptions : CallOptions

public init(channel: GRPCChannel, config: OtlpConfiguration = OtlpConfiguration(), logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) {
self.channel = channel
traceClient = Opentelemetry_Proto_Collector_Trace_V1_TraceServiceNIOClient(channel: channel)
self.config = config
let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader())
if let headers = envVarHeaders {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else if let headers = config.headers {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else {
var headers = [(String, String)]()
headers.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger)
}

let channel: GRPCChannel
var traceClient: Opentelemetry_Proto_Collector_Trace_V1_TraceServiceNIOClient
let config : OtlpConfiguration
var callOptions : CallOptions

public init(channel: GRPCChannel, config: OtlpConfiguration = OtlpConfiguration(), logger: Logging.Logger = Logging.Logger(label: "io.grpc", factory: { _ in SwiftLogNoOpLogHandler() }), envVarHeaders: [(String,String)]? = EnvVarHeaders.attributes) {
self.channel = channel
traceClient = Opentelemetry_Proto_Collector_Trace_V1_TraceServiceNIOClient(channel: channel)
self.config = config
let userAgentHeader = (Constants.HTTP.userAgent, Headers.getUserAgentHeader())
if let headers = envVarHeaders {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else if let headers = config.headers {
var updatedHeaders = headers
updatedHeaders.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(updatedHeaders), logger: logger)
} else {
var headers = [(String, String)]()
headers.append(userAgentHeader)
callOptions = CallOptions(customMetadata: HPACKHeaders(headers), logger: logger)
}

public func export(spans: [SpanData]) -> SpanExporterResultCode {
let exportRequest = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with {
$0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: spans)
}

if config.timeout > 0 {
callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(config.timeout.toNanoseconds)))
}

let export = traceClient.export(exportRequest, callOptions: callOptions)

do {
// wait() on the response to stop the program from exiting before the response is received.
_ = try export.response.wait()
return .success
} catch {
return .failure
}
}


public func export(spans: [SpanData], explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
let exportRequest = Opentelemetry_Proto_Collector_Trace_V1_ExportTraceServiceRequest.with {
$0.resourceSpans = SpanAdapter.toProtoResourceSpans(spanDataList: spans)
}

public func flush() -> SpanExporterResultCode {
return .success
let timeout = min(explicitTimeout ?? TimeInterval.greatestFiniteMagnitude, config.timeout)
if timeout > 0 {
callOptions.timeLimit = TimeLimit.timeout(TimeAmount.nanoseconds(Int64(timeout.toNanoseconds)))
}

public func shutdown() {
_ = channel.close()

let export = traceClient.export(exportRequest, callOptions: callOptions)

do {
// wait() on the response to stop the program from exiting before the response is received.
_ = try export.response.wait()
return .success
} catch {
return .failure
}
}

public func flush(explicitTimeout: TimeInterval? = nil) -> SpanExporterResultCode {
return .success
}

Check warning on line 65 in Sources/Exporters/OpenTelemetryProtocolGrpc/trace/OtlpTraceExporter.swift

View check run for this annotation

Codecov / codecov/patch

Sources/Exporters/OpenTelemetryProtocolGrpc/trace/OtlpTraceExporter.swift#L63-L65

Added lines #L63 - L65 were not covered by tests

public func shutdown(explicitTimeout: TimeInterval? = nil) {
_ = channel.close()
}
}
Loading
Loading