Skip to content

Commit 7932c20

Browse files
committed
Add missing functionality.
1 parent cc21aa6 commit 7932c20

24 files changed

+1891
-60
lines changed

packages/a2a_dart/example/bin/server.dart

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@
55
import 'dart:async';
66

77
import 'package:a2a_dart/a2a_dart.dart';
8+
import 'package:a2a_dart/src/core/push_notification.dart';
9+
import 'package:a2a_dart/src/server/delete_push_config_handler.dart';
10+
import 'package:a2a_dart/src/server/get_push_config_handler.dart';
11+
import 'package:a2a_dart/src/server/list_push_configs_handler.dart';
12+
import 'package:a2a_dart/src/server/set_push_config_handler.dart';
813
import 'package:logging/logging.dart';
914
import 'package:uuid/uuid.dart';
1015

@@ -29,7 +34,12 @@ void main() async {
2934
ListTasksHandler(taskManager),
3035
CancelTaskHandler(taskManager),
3136
ResubscribeHandler(taskManager),
37+
SetPushConfigHandler(taskManager),
38+
GetPushConfigHandler(taskManager),
39+
ListPushConfigsHandler(taskManager),
40+
DeletePushConfigHandler(taskManager),
3241
],
42+
taskManager,
3343
port: 8080,
3444
agentCard: agentCard,
3545
);
@@ -45,6 +55,7 @@ class CountdownTaskManager implements TaskManager {
4555
final Map<String, StreamController<Map<String, Object?>>> _controllers =
4656
<String, StreamController<Map<String, Object?>>>{};
4757
final Set<String> _pausedTasks = {};
58+
final _pushConfigs = <String, Map<String, PushNotificationConfig>>{};
4859

4960
Stream<Map<String, Object?>> startCountdown(Task task, int countdownStart) {
5061
final controller = StreamController<Map<String, Object?>>.broadcast();
@@ -155,6 +166,50 @@ class CountdownTaskManager implements TaskManager {
155166
}
156167

157168
bool isPaused(String taskId) => _pausedTasks.contains(taskId);
169+
170+
@override
171+
Future<void> setPushNotificationConfig(
172+
String taskId,
173+
PushNotificationConfig config,
174+
) async {
175+
if (!_tasks.containsKey(taskId)) {
176+
throw A2AServerException('Task not found', -32001);
177+
}
178+
_pushConfigs.putIfAbsent(taskId, () => {});
179+
_pushConfigs[taskId]![config.id!] = config;
180+
}
181+
182+
@override
183+
Future<PushNotificationConfig?> getPushNotificationConfig(
184+
String taskId,
185+
String configId,
186+
) async {
187+
if (!_tasks.containsKey(taskId)) {
188+
throw A2AServerException('Task not found', -32001);
189+
}
190+
return _pushConfigs[taskId]?[configId];
191+
}
192+
193+
@override
194+
Future<List<PushNotificationConfig>> listPushNotificationConfigs(
195+
String taskId,
196+
) async {
197+
if (!_tasks.containsKey(taskId)) {
198+
throw A2AServerException('Task not found', -32001);
199+
}
200+
return _pushConfigs[taskId]?.values.toList() ?? [];
201+
}
202+
203+
@override
204+
Future<void> deletePushNotificationConfig(
205+
String taskId,
206+
String configId,
207+
) async {
208+
if (!_tasks.containsKey(taskId)) {
209+
throw A2AServerException('Task not found', -32001);
210+
}
211+
_pushConfigs[taskId]?.remove(configId);
212+
}
158213
}
159214

160215
class MessageSendHandler extends RequestHandler {

packages/a2a_dart/lib/src/client/a2a_client.dart

Lines changed: 113 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import '../core/events.dart';
1111
import '../core/list_tasks_params.dart';
1212
import '../core/list_tasks_result.dart';
1313
import '../core/message.dart';
14+
import '../core/push_notification.dart';
1415
import '../core/task.dart';
1516
import 'a2a_exception.dart';
1617
import 'http_transport.dart';
@@ -120,7 +121,10 @@ class A2AClient {
120121
),
121122
);
122123
} else {
123-
sink.add(Event.fromJson(data));
124+
if (data['kind'] != null) {
125+
// Skip keepalive events
126+
sink.add(Event.fromJson(data));
127+
}
124128
}
125129
},
126130
),
@@ -187,7 +191,6 @@ class A2AClient {
187191
'method': 'tasks/resubscribe',
188192
'params': {'id': taskId},
189193
})
190-
.where((data) => data['kind'] != null)
191194
.map((data) {
192195
_log?.fine('Received event from stream: $data');
193196
return Event.fromJson(data);
@@ -198,4 +201,112 @@ class A2AClient {
198201
void close() {
199202
_transport.close();
200203
}
204+
205+
/// Sets or updates the push notification configuration for a task.
206+
Future<TaskPushNotificationConfig> setPushNotificationConfig(
207+
TaskPushNotificationConfig params,
208+
) async {
209+
_log?.info('Setting push notification config for task: ${params.taskId}');
210+
final response = await _transport.send({
211+
'jsonrpc': '2.0',
212+
'method': 'tasks/pushNotificationConfig/set',
213+
'params': params.toJson(),
214+
'id': 0,
215+
});
216+
_log?.fine(
217+
'Received response from tasks/pushNotificationConfig/set: $response',
218+
);
219+
if (response.containsKey('error')) {
220+
final error = response['error'] as Map<String, Object?>;
221+
throw A2AException.jsonRpc(
222+
code: error['code'] as int,
223+
message: error['message'] as String,
224+
);
225+
}
226+
return TaskPushNotificationConfig.fromJson(
227+
response['result'] as Map<String, Object?>,
228+
);
229+
}
230+
231+
/// Retrieves a specific push notification configuration for a task.
232+
Future<TaskPushNotificationConfig> getPushNotificationConfig(
233+
String taskId,
234+
String configId,
235+
) async {
236+
_log?.info('Getting push notification config $configId for task: $taskId');
237+
final response = await _transport.send({
238+
'jsonrpc': '2.0',
239+
'method': 'tasks/pushNotificationConfig/get',
240+
'params': {'id': taskId, 'pushNotificationConfigId': configId},
241+
'id': 0,
242+
});
243+
_log?.fine(
244+
'Received response from tasks/pushNotificationConfig/get: $response',
245+
);
246+
if (response.containsKey('error')) {
247+
final error = response['error'] as Map<String, Object?>;
248+
throw A2AException.jsonRpc(
249+
code: error['code'] as int,
250+
message: error['message'] as String,
251+
);
252+
}
253+
return TaskPushNotificationConfig.fromJson(
254+
response['result'] as Map<String, Object?>,
255+
);
256+
}
257+
258+
/// Lists all push notification configurations for a task.
259+
Future<List<PushNotificationConfig>> listPushNotificationConfigs(
260+
String taskId,
261+
) async {
262+
_log?.info('Listing push notification configs for task: $taskId');
263+
final response = await _transport.send({
264+
'jsonrpc': '2.0',
265+
'method': 'tasks/pushNotificationConfig/list',
266+
'params': {'id': taskId},
267+
'id': 0,
268+
});
269+
_log?.fine(
270+
'Received response from tasks/pushNotificationConfig/list: $response',
271+
);
272+
if (response.containsKey('error')) {
273+
final error = response['error'] as Map<String, Object?>;
274+
throw A2AException.jsonRpc(
275+
code: error['code'] as int,
276+
message: error['message'] as String,
277+
);
278+
}
279+
final result = response['result'] as Map<String, Object?>;
280+
final configs = result['configs'] as List<Object?>;
281+
return configs
282+
.map(
283+
(item) =>
284+
PushNotificationConfig.fromJson(item as Map<String, Object?>),
285+
)
286+
.toList();
287+
}
288+
289+
/// Deletes a specific push notification configuration for a task.
290+
Future<void> deletePushNotificationConfig(
291+
String taskId,
292+
String configId,
293+
) async {
294+
_log?.info('Deleting push notification config $configId for task: $taskId');
295+
final response = await _transport.send({
296+
'jsonrpc': '2.0',
297+
'method': 'tasks/pushNotificationConfig/delete',
298+
'params': {'id': taskId, 'pushNotificationConfigId': configId},
299+
'id': 0,
300+
});
301+
_log?.fine(
302+
'Received response from tasks/pushNotificationConfig/delete: $response',
303+
);
304+
if (response.containsKey('error')) {
305+
final error = response['error'] as Map<String, Object?>;
306+
throw A2AException.jsonRpc(
307+
code: error['code'] as int,
308+
message: error['message'] as String,
309+
);
310+
}
311+
}
201312
}

packages/a2a_dart/lib/src/client/sse_parser.dart

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -28,37 +28,42 @@ class SseParser {
2828
try {
2929
await for (final line in lines) {
3030
log?.finer('Received SSE line: $line');
31-
if (line.isEmpty) {
32-
if (data.isNotEmpty && data.first.isNotEmpty) {
33-
// SSE keep-alive
31+
if (line.startsWith('data:')) {
32+
data.add(line.substring(5).trim());
33+
} else if (line.startsWith(':')) {
34+
// Ignore comments (used for keepalives)
35+
log?.finest('Ignoring SSE comment: $line');
36+
} else if (line.isEmpty) {
37+
// Event boundary
38+
if (data.isNotEmpty) {
3439
final dataString = data.join('\n');
35-
data = [];
36-
try {
37-
final jsonData = jsonDecode(dataString) as Map<String, Object?>;
38-
log?.finer('Parsed JSON: $jsonData');
39-
if (jsonData.containsKey('result')) {
40-
final result = jsonData['result'];
41-
if (result != null) {
42-
yield result as Map<String, Object?>;
43-
} else {
44-
log?.warning('Received a null result in the SSE stream.');
40+
data = []; // Clear for next event
41+
if (dataString.isNotEmpty) {
42+
try {
43+
final jsonData = jsonDecode(dataString) as Map<String, Object?>;
44+
log?.finer('Parsed JSON: $jsonData');
45+
if (jsonData.containsKey('result')) {
46+
final result = jsonData['result'];
47+
if (result != null) {
48+
yield result as Map<String, Object?>;
49+
} else {
50+
log?.warning('Received a null result in the SSE stream.');
51+
}
52+
} else if (jsonData.containsKey('error')) {
53+
final error = jsonData['error'] as Map<String, Object?>;
54+
throw A2AException.jsonRpc(
55+
code: error['code'] as int,
56+
message: error['message'] as String,
57+
data: error['data'] as Map<String, Object?>?,
58+
);
4559
}
46-
} else if (jsonData.containsKey('error')) {
47-
final error = jsonData['error'] as Map<String, Object?>;
48-
throw A2AException.jsonRpc(
49-
code: error['code'] as int,
50-
message: error['message'] as String,
51-
data: error['data'] as Map<String, Object?>?,
52-
);
60+
} catch (e) {
61+
throw A2AException.parsing(message: e.toString());
5362
}
54-
} catch (e) {
55-
throw A2AException.parsing(message: e.toString());
5663
}
5764
}
58-
} else if (line.startsWith('data:')) {
59-
data.add(line.substring(5).trim());
60-
} else if (line.startsWith(':')) {
61-
// Ignore comments.
65+
} else {
66+
log?.warning('Ignoring unexpected SSE line: $line');
6267
}
6368
}
6469
// ignore: avoid_catching_errors

packages/a2a_dart/lib/src/core/events.dart

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import 'package:freezed_annotation/freezed_annotation.dart';
66

7-
import 'part.dart';
87
import 'task.dart';
98

109
part 'events.freezed.dart';
@@ -106,21 +105,5 @@ sealed class Event with _$Event {
106105
}) = TaskArtifactUpdate;
107106

108107
/// Creates an [Event] from a JSON object.
109-
factory Event.fromJson(Map<String, Object?> json) {
110-
if (json['kind'] == null) {
111-
// This can happen with SSE keep-alive messages.
112-
// We'll just return an empty text event.
113-
return const Event.taskArtifactUpdate(
114-
taskId: '',
115-
contextId: '',
116-
artifact: Artifact(
117-
artifactId: '',
118-
parts: [Part.text(text: '')],
119-
),
120-
append: false,
121-
lastChunk: false,
122-
);
123-
}
124-
return _$EventFromJson(json);
125-
}
108+
factory Event.fromJson(Map<String, Object?> json) => _$EventFromJson(json);
126109
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright 2025 The Flutter Authors.
2+
// Use of this source code is governed by a BSD-style license that can be
3+
// found in the LICENSE file.
4+
5+
import 'package:freezed_annotation/freezed_annotation.dart';
6+
7+
part 'push_notification.freezed.dart';
8+
part 'push_notification.g.dart';
9+
10+
/// Defines the configuration for setting up push notifications for task
11+
/// updates.
12+
@freezed
13+
abstract class PushNotificationConfig with _$PushNotificationConfig {
14+
/// Creates a [PushNotificationConfig].
15+
const factory PushNotificationConfig({
16+
/// A unique identifier (e.g. UUID) for the push notification configuration,
17+
/// set by the client to support multiple notification callbacks.
18+
String? id,
19+
20+
/// The callback URL where the agent should send push notifications.
21+
required String url,
22+
23+
/// A unique token for this task or session to validate incoming push
24+
/// notifications.
25+
String? token,
26+
27+
/// Optional authentication details for the agent to use when calling the
28+
/// notification URL.
29+
PushNotificationAuthenticationInfo? authentication,
30+
}) = _PushNotificationConfig;
31+
32+
/// Creates a [PushNotificationConfig] from a JSON object.
33+
factory PushNotificationConfig.fromJson(Map<String, Object?> json) =>
34+
_$PushNotificationConfigFromJson(json);
35+
}
36+
37+
/// Defines authentication details for a push notification endpoint.
38+
@freezed
39+
abstract class PushNotificationAuthenticationInfo
40+
with _$PushNotificationAuthenticationInfo {
41+
/// Creates a [PushNotificationAuthenticationInfo].
42+
const factory PushNotificationAuthenticationInfo({
43+
/// A list of supported authentication schemes (e.g., 'Basic', 'Bearer').
44+
required List<String> schemes,
45+
46+
/// Optional credentials required by the push notification endpoint.
47+
String? credentials,
48+
}) = _PushNotificationAuthenticationInfo;
49+
50+
/// Creates a [PushNotificationAuthenticationInfo] from a JSON object.
51+
factory PushNotificationAuthenticationInfo.fromJson(
52+
Map<String, Object?> json,
53+
) => _$PushNotificationAuthenticationInfoFromJson(json);
54+
}
55+
56+
/// A container associating a push notification configuration with a specific
57+
/// task.
58+
@freezed
59+
abstract class TaskPushNotificationConfig with _$TaskPushNotificationConfig {
60+
/// Creates a [TaskPushNotificationConfig].
61+
const factory TaskPushNotificationConfig({
62+
/// The unique identifier (e.g. UUID) of the task.
63+
required String taskId,
64+
65+
/// The push notification configuration for this task.
66+
required PushNotificationConfig pushNotificationConfig,
67+
}) = _TaskPushNotificationConfig;
68+
69+
/// Creates a [TaskPushNotificationConfig] from a JSON object.
70+
factory TaskPushNotificationConfig.fromJson(Map<String, Object?> json) =>
71+
_$TaskPushNotificationConfigFromJson(json);
72+
}

0 commit comments

Comments
 (0)