-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathNetworking.swift
110 lines (87 loc) · 3.18 KB
/
Networking.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import Combine
import CoreMedia
import Foundation
import MultipeerConnectivity
import RealtimeStreaming
import Transcoding
// MARK: - Networking
/// Connects the local video encode/decode pipeline to a `RealtimeStreaming` session.
///
/// - Outgoing: pixel buffers passed to `send(_:)` are handed to `videoEncoder`; the Annex B data
///   emitted by `videoEncoderAnnexBAdaptor` is sent over `realtimeStreaming` as `.hevcData` messages.
/// - Incoming: every received message payload is fed to `videoDecoderAnnexBAdaptor`; each decoded
///   sample buffer that carries an image buffer is fanned out to all active `pixelBuffers` streams.
@Observable final class Networking: NSObject {
    // MARK: Lifecycle

    override init() {
        super.init()

        // Resolve the lazily-created pipeline objects here, on the initializing thread. `lazy`
        // initialization is not thread-safe, and the tasks below run concurrently with callers of
        // `send(_:)` / `setBitrate(_:)`, so first-touching them inside the tasks could race.
        // Capturing these locals (instead of `self`) also keeps the long-lived tasks from
        // retaining this instance forever.
        let streaming = realtimeStreaming
        let encodedChunks = videoEncoderAnnexBAdaptor.annexBData
        let decodedSampleBuffers = videoDecoder.decodedSampleBuffers
        let decoderAdaptor = videoDecoderAnnexBAdaptor
        let receivedMessages = streaming.receivedMessages

        // Encoder output -> network.
        videoEncoderTask = Task {
            for await chunk in encodedChunks {
                // Best effort: a failed send discards this chunk and the loop keeps going.
                try? await streaming.send(data: chunk, messageType: .hevcData)
            }
        }

        // Decoder output -> every subscriber of `pixelBuffers`.
        videoDecoderTask = Task { [weak self] in
            for await sampleBuffer in decodedSampleBuffers {
                // Stop as soon as the owner is gone; `self` is only held strongly per iteration.
                guard let self else { return }
                guard let pixelBuffer = sampleBuffer.imageBuffer else { continue }
                self.broadcast(pixelBuffer)
            }
        }

        // Network -> decoder. The second tuple element of each message is ignored.
        receivedMessageTask = Task {
            for await (data, _) in receivedMessages {
                decoderAdaptor.decode(data)
            }
        }
    }

    deinit {
        videoEncoderTask?.cancel()
        videoDecoderTask?.cancel()
        receivedMessageTask?.cancel()

        // End outstanding `pixelBuffers` streams so their consumers do not wait forever.
        // The `onTermination` handlers hold `self` weakly, so they are no-ops at this point.
        let continuations = Array(pixelBufferContinuations.values)
        for continuation in continuations {
            continuation.finish()
        }
    }

    // MARK: Internal

    /// Whether the underlying `RealtimeStreaming` session reports itself as connected.
    // NOTE(review): observers are only notified of changes if `RealtimeStreaming.isConnected`
    // is itself observation-tracked — confirm against that type.
    var isConnected: Bool { realtimeStreaming.isConnected }

    /// A new stream of decoded pixel buffers.
    ///
    /// Every access creates an independent stream that is registered under a fresh `UUID` and
    /// unregistered again when the stream terminates.
    @ObservationIgnored var pixelBuffers: AsyncStream<CVPixelBuffer> {
        .init { continuation in
            let id = UUID()
            continuationsLock.withLock { pixelBufferContinuations[id] = continuation }
            continuation.onTermination = { [weak self] _ in
                self?.removeContinuation(for: id)
            }
        }
    }

    /// Forwards to `RealtimeStreaming.attemptToReconnect()`.
    func attemptToReconnect() {
        realtimeStreaming.attemptToReconnect()
    }

    /// Forwards to `RealtimeStreaming.disconnect()`.
    func disconnect() {
        realtimeStreaming.disconnect()
    }

    /// Sets `averageBitRate` on the video encoder's configuration.
    /// - Parameter bitrate: The new value, passed through unchanged.
    func setBitrate(_ bitrate: Int) {
        videoEncoder.config.averageBitRate = bitrate
    }

    /// Sets `expectedFrameRate` on the video encoder's configuration.
    /// - Parameter frameRate: The new value, passed through unchanged.
    func setExpectedFrameRate(_ frameRate: Int) {
        videoEncoder.config.expectedFrameRate = frameRate
    }

    /// Hands a pixel buffer to the video encoder; does nothing while `isConnected` is `false`.
    /// - Parameter pixelBuffer: The frame to encode.
    func send(_ pixelBuffer: CVPixelBuffer) {
        guard isConnected else { return }
        videoEncoder.encode(pixelBuffer)
    }

    /// Starts browsing for `Constants.bonjourServiceType`.
    func startBrowsing() {
        realtimeStreaming.startBrowsing(for: Constants.bonjourServiceType)
    }

    /// Starts listening for `Constants.bonjourServiceType`.
    func startAdvertising() {
        realtimeStreaming.startListening(for: Constants.bonjourServiceType)
    }

    /// Stops listening on the streaming session.
    func stopAdvertising() {
        realtimeStreaming.stopListening()
    }

    /// Calls `invalidate()` on the video encoder.
    // NOTE(review): presumably the encoder rebuilds its session on the next `encode` — confirm.
    func resetEncoder() {
        videoEncoder.invalidate()
    }

    // MARK: Private

    private let realtimeStreaming = RealtimeStreaming()

    /// Guards `pixelBufferContinuations`, which is touched from the `pixelBuffers` getter,
    /// from stream termination handlers, and from the decoder task.
    private let continuationsLock = NSLock()

    // Task handles are bookkeeping, not observable state.
    @ObservationIgnored private var videoEncoderTask: Task<Void, Never>?
    @ObservationIgnored private var videoDecoderTask: Task<Void, Never>?
    @ObservationIgnored private var receivedMessageTask: Task<Void, Never>?

    @ObservationIgnored private lazy var videoEncoder = VideoEncoder(config: .ultraLowLatency)
    @ObservationIgnored private lazy var videoEncoderAnnexBAdaptor = VideoEncoderAnnexBAdaptor(
        videoEncoder: videoEncoder
    )
    @ObservationIgnored private lazy var videoDecoder = VideoDecoder(config: .init())
    @ObservationIgnored private lazy var videoDecoderAnnexBAdaptor = VideoDecoderAnnexBAdaptor(
        videoDecoder: videoDecoder,
        codec: .hevc
    )
    @ObservationIgnored private var pixelBufferContinuations: [UUID: AsyncStream<CVPixelBuffer>.Continuation] = [:]
    // Not referenced anywhere in this class at present.
    @ObservationIgnored private var cancellables = Set<AnyCancellable>()

    /// Unregisters the continuation stored under `id`, if any.
    private func removeContinuation(for id: UUID) {
        continuationsLock.withLock { pixelBufferContinuations[id] = nil }
    }

    /// Yields `pixelBuffer` to every currently registered `pixelBuffers` stream.
    private func broadcast(_ pixelBuffer: CVPixelBuffer) {
        // Snapshot under the lock, yield outside it, so the lock is never held while
        // calling into stream machinery.
        let continuations = continuationsLock.withLock { Array(pixelBufferContinuations.values) }
        for continuation in continuations {
            continuation.yield(pixelBuffer)
        }
    }
}