Skip to content

Commit d3e165e

Browse files
committed
Merge main into hiroshi/fix-stream-progress
Resolve conflicts by adopting enum-based TextStreamOperationType approach: - Use TextStreamOperationType.fromPBType() for incoming streams - Keep enum type for TextStreamInfo.operationType - Update protocol conversion to use .toPBType() - Maintain consistency with main branch enum approach
1 parent a4c0e45 commit d3e165e

File tree

4 files changed

+43
-36
lines changed

4 files changed

+43
-36
lines changed

lib/src/core/room.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1340,7 +1340,7 @@ extension DataStreamRoomMethods on Room {
13401340
? streamHeader.textHeader.generated
13411341
: false,
13421342
operationType: streamHeader.textHeader.hasOperationType()
1343-
? TextStreamOperationType.fromString(streamHeader.textHeader.operationType.name)
1343+
? TextStreamOperationType.fromPBType(streamHeader.textHeader.operationType)
13441344
: null,
13451345
);
13461346

lib/src/participant/local.dart

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1152,22 +1152,6 @@ extension RPCMethods on LocalParticipant {
11521152
}
11531153
}
11541154

1155-
/// Helper function to convert string operation type to enum
1156-
lk_models.DataStream_OperationType _stringToOperationType(String? type) {
1157-
switch (type?.toLowerCase()) {
1158-
case 'create':
1159-
return lk_models.DataStream_OperationType.CREATE;
1160-
case 'update':
1161-
return lk_models.DataStream_OperationType.UPDATE;
1162-
case 'delete':
1163-
return lk_models.DataStream_OperationType.DELETE;
1164-
case 'reaction':
1165-
return lk_models.DataStream_OperationType.REACTION;
1166-
default:
1167-
return lk_models.DataStream_OperationType.CREATE;
1168-
}
1169-
}
1170-
11711155
extension DataStreamParticipantMethods on LocalParticipant {
11721156
Future<TextStreamInfo> sendText(String text,
11731157
{SendTextOptions? options}) async {
@@ -1231,40 +1215,43 @@ extension DataStreamParticipantMethods on LocalParticipant {
12311215

12321216
Future<TextStreamWriter> streamText(StreamTextOptions? options) async {
12331217
final streamId = options?.streamId ?? Uuid().v4();
1218+
final timestamp = DateTime.timestamp().millisecondsSinceEpoch;
12341219

12351220
final info = TextStreamInfo(
12361221
id: streamId,
12371222
mimeType: 'text/plain',
1238-
timestamp: DateTime.timestamp().millisecondsSinceEpoch,
1223+
timestamp: timestamp,
12391224
topic: options?.topic ?? '',
12401225
size: options?.totalSize ?? 0,
12411226
replyToStreamId: options?.replyToStreamId,
12421227
attachedStreamIds: options?.attachedStreamIds ?? [],
12431228
version: options?.version,
12441229
generated: options?.generated ?? false,
1245-
operationType: TextStreamOperationType.fromString(options?.type),
1230+
operationType: options?.type,
12461231
);
12471232

12481233
final header = lk_models.DataStream_Header(
12491234
streamId: streamId,
12501235
mimeType: info.mimeType,
12511236
topic: info.topic,
1252-
timestamp: Int64(info.timestamp),
1237+
timestamp: Int64(timestamp),
12531238
totalLength: Int64(options?.totalSize ?? 0),
12541239
attributes: options?.attributes.entries,
12551240
textHeader: lk_models.DataStream_TextHeader(
12561241
version: options?.version,
12571242
attachedStreamIds: options?.attachedStreamIds,
12581243
replyToStreamId: options?.replyToStreamId,
12591244
generated: options?.generated ?? false,
1260-
operationType: _stringToOperationType(options?.type),
1245+
operationType: options?.type?.toPBType(),
12611246
),
12621247
);
1248+
12631249
final destinationIdentities = options?.destinationIdentities;
12641250
final packet = lk_models.DataPacket(
12651251
destinationIdentities: destinationIdentities,
12661252
streamHeader: header,
12671253
);
1254+
12681255
await room.engine.sendDataPacket(packet, reliability: true);
12691256

12701257
final writableStream = WritableStream<String>(
@@ -1332,12 +1319,13 @@ extension DataStreamParticipantMethods on LocalParticipant {
13321319

13331320
Future<ByteStreamWriter> streamBytes(StreamBytesOptions? options) async {
13341321
final streamId = options?.streamId ?? Uuid().v4();
1322+
final timestamp = DateTime.timestamp().millisecondsSinceEpoch;
13351323

13361324
final info = ByteStreamInfo(
13371325
name: options?.name ?? 'unknown',
13381326
id: streamId,
13391327
mimeType: options?.mimeType ?? 'application/octet-stream',
1340-
timestamp: DateTime.timestamp().millisecondsSinceEpoch,
1328+
timestamp: timestamp,
13411329
topic: options?.topic ?? '',
13421330
size: options?.totalSize ?? 0,
13431331
attributes: options?.attributes ?? {},
@@ -1349,7 +1337,7 @@ extension DataStreamParticipantMethods on LocalParticipant {
13491337
streamId: streamId,
13501338
topic: options?.topic,
13511339
encryptionType: options?.encryptionType,
1352-
timestamp: Int64(info.timestamp),
1340+
timestamp: Int64(timestamp),
13531341
byteHeader: lk_models.DataStream_ByteHeader(
13541342
name: info.name,
13551343
),

lib/src/types/data_stream.dart

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import 'dart:async';
22
import 'dart:io' show File;
33

44
import '../data_stream/stream_reader.dart';
5+
import '../proto/livekit_models.pb.dart' as lk_models;
56
import '../proto/livekit_models.pb.dart' show Encryption_Type, DataStream_Chunk;
67

78
const kStreamChunkSize = 15_000;
@@ -53,7 +54,7 @@ class StreamTextOptions {
5354
int? totalSize;
5455

5556
/// 'create' | 'update' | 'delete' | 'reaction'
56-
String? type;
57+
TextStreamOperationType? type;
5758

5859
/// true if the text has been generated by an agent from a participant's audio transcription
5960
bool generated;
@@ -172,21 +173,34 @@ enum TextStreamOperationType {
172173
delete,
173174
reaction;
174175

175-
static TextStreamOperationType? fromString(String? stringValue) {
176-
if (stringValue == null) return null;
177-
switch (stringValue.toUpperCase()) {
178-
case 'CREATE':
176+
static TextStreamOperationType? fromPBType(lk_models.DataStream_OperationType? type) {
177+
if (type == null) return TextStreamOperationType.create;
178+
switch (type) {
179+
case lk_models.DataStream_OperationType.CREATE:
179180
return TextStreamOperationType.create;
180-
case 'UPDATE':
181+
case lk_models.DataStream_OperationType.UPDATE:
181182
return TextStreamOperationType.update;
182-
case 'DELETE':
183+
case lk_models.DataStream_OperationType.DELETE:
183184
return TextStreamOperationType.delete;
184-
case 'REACTION':
185+
case lk_models.DataStream_OperationType.REACTION:
185186
return TextStreamOperationType.reaction;
186187
default:
187188
return null;
188189
}
189190
}
191+
192+
lk_models.DataStream_OperationType toPBType() {
193+
switch (this) {
194+
case TextStreamOperationType.create:
195+
return lk_models.DataStream_OperationType.CREATE;
196+
case TextStreamOperationType.update:
197+
return lk_models.DataStream_OperationType.UPDATE;
198+
case TextStreamOperationType.delete:
199+
return lk_models.DataStream_OperationType.DELETE;
200+
case TextStreamOperationType.reaction:
201+
return lk_models.DataStream_OperationType.REACTION;
202+
}
203+
}
190204
}
191205

192206
class TextStreamInfo extends BaseStreamInfo {

test/core/data_stream_test.dart

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,12 @@ void main() {
157157
});
158158

159159
test('Text Stream With Operation Types', () async {
160-
final operationTypes = ['create', 'update', 'delete', 'reaction'];
160+
final operationTypes = [
161+
TextStreamOperationType.create,
162+
TextStreamOperationType.update,
163+
TextStreamOperationType.delete,
164+
TextStreamOperationType.reaction,
165+
];
161166
final receivedMessages = <String>[];
162167

163168
for (var operationType in operationTypes) {
@@ -176,7 +181,7 @@ void main() {
176181
final stream = await room.localParticipant?.streamText(StreamTextOptions(
177182
topic: 'chat-operations',
178183
type: operationType,
179-
version: operationType == 'update' ? 2 : null,
184+
version: operationType == TextStreamOperationType.update ? 2 : null,
180185
));
181186
await stream?.write('Streamed ${operationType}');
182187
await stream?.close();
@@ -229,7 +234,7 @@ void main() {
229234
// Send a reply to an existing stream
230235
final stream = await room.localParticipant?.streamText(StreamTextOptions(
231236
topic: 'chat-replies',
232-
type: 'create',
237+
type: TextStreamOperationType.create,
233238
streamId: replyStreamId,
234239
replyToStreamId: originalStreamId,
235240
version: 1,
@@ -388,7 +393,7 @@ void main() {
388393
final stream = await room.localParticipant?.streamText(StreamTextOptions(
389394
topic: 'concurrent-streams',
390395
streamId: 'stream-${i}',
391-
type: 'create',
396+
type: TextStreamOperationType.create,
392397
));
393398
await stream?.write('Concurrent message ${i}');
394399
await stream?.close();
@@ -454,7 +459,7 @@ void main() {
454459
// Send a message with comprehensive options
455460
final stream = await room.localParticipant?.streamText(StreamTextOptions(
456461
topic: 'header-validation',
457-
type: 'create',
462+
type: TextStreamOperationType.create,
458463
version: 1,
459464
generated: false,
460465
attributes: {

0 commit comments

Comments
 (0)