Skip to content

Commit 7661566

Browse files
feat: stream manager debugging support (#35)
Co-authored-by: Michael Dombrowski <[email protected]>
1 parent 02ff551 commit 7661566

25 files changed

+5041
-24
lines changed

pom.xml

+23-3
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,31 @@
8080
<version>5.5.2</version>
8181
<scope>test</scope>
8282
</dependency>
83+
<dependency>
84+
<groupId>com.amazonaws.greengrass</groupId>
85+
<artifactId>streammanager</artifactId>
86+
<version>1.1.0</version>
87+
<exclusions>
88+
<exclusion>
89+
<groupId>org.slf4j</groupId>
90+
<artifactId>slf4j-api</artifactId>
91+
</exclusion>
92+
</exclusions>
93+
</dependency>
8394
<dependency>
8495
<groupId>com.fasterxml.jackson.dataformat</groupId>
85-
<artifactId>jackson-dataformat-yaml</artifactId>
86-
<version>2.10.1</version>
87-
<scope>provided</scope>
96+
<artifactId>jackson-dataformat-cbor</artifactId>
97+
<version>2.13.3</version>
98+
<exclusions>
99+
<exclusion>
100+
<groupId>com.fasterxml.jackson.core</groupId>
101+
<artifactId>jackson-core</artifactId>
102+
</exclusion>
103+
<exclusion>
104+
<groupId>com.fasterxml.jackson.core</groupId>
105+
<artifactId>jackson-databind</artifactId>
106+
</exclusion>
107+
</exclusions>
88108
</dependency>
89109
<dependency>
90110
<groupId>org.mockito</groupId>

src/main/java/com/aws/greengrass/localdebugconsole/APICalls.java

+35
Original file line numberDiff line numberDiff line change
@@ -96,4 +96,39 @@ public enum APICalls {
9696
* Utility called by the client to unsubscribe to a local IPC topic.
9797
*/
9898
unsubscribeToPubSubTopic,
99+
100+
/**
101+
* Returns the current Stream Manager streams list
102+
*/
103+
streamManagerListStreams,
104+
105+
/**
106+
* Describes a message stream to get metadata including the stream’s definition, size, and exporter statuses.
107+
*/
108+
streamManagerDescribeStream,
109+
110+
/**
111+
* Deletes a message stream based on its name.
112+
*/
113+
streamManagerDeleteMessageStream,
114+
115+
/**
116+
* Read message(s) from a chosen stream with options. If no options are specified it will try to read 1 message from the stream.
117+
*/
118+
streamManagerReadMessages,
119+
120+
/**
121+
* Append a message into the specified message stream.
122+
*/
123+
streamManagerAppendMessage,
124+
125+
/**
126+
* Create a message stream with a given definition.
127+
*/
128+
streamManagerCreateMessageStream,
129+
130+
/**
131+
* Update a message stream with a given definition.
132+
*/
133+
streamManagerUpdateMessageStream,
99134
}

src/main/java/com/aws/greengrass/localdebugconsole/DashboardServer.java

+149-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package com.aws.greengrass.localdebugconsole;
77

8+
import com.amazonaws.greengrass.streammanager.model.MessageStreamDefinition;
89
import com.aws.greengrass.builtin.services.pubsub.PubSubIPCEventStreamAgent;
910
import com.aws.greengrass.builtin.services.pubsub.PublishEvent;
1011
import com.aws.greengrass.builtin.services.pubsub.SubscribeRequest;
@@ -16,6 +17,7 @@
1617
import com.aws.greengrass.localdebugconsole.messageutils.MessageType;
1718
import com.aws.greengrass.localdebugconsole.messageutils.PackedRequest;
1819
import com.aws.greengrass.localdebugconsole.messageutils.Request;
20+
import com.aws.greengrass.localdebugconsole.messageutils.StreamManagerResponseMessage;
1921
import com.aws.greengrass.logging.api.Logger;
2022
import com.aws.greengrass.mqttclient.MqttClient;
2123
import com.aws.greengrass.mqttclient.MqttRequestException;
@@ -24,6 +26,7 @@
2426
import com.aws.greengrass.mqttclient.v5.Unsubscribe;
2527
import com.aws.greengrass.util.DefaultConcurrentHashMap;
2628
import com.aws.greengrass.util.Pair;
29+
import com.aws.greengrass.util.Utils;
2730
import com.fasterxml.jackson.core.JsonProcessingException;
2831
import com.fasterxml.jackson.databind.JsonNode;
2932
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -71,20 +74,23 @@ public class DashboardServer extends WebSocketServer implements KernelMessagePus
7174
private final Authenticator authenticator;
7275
private final MqttClient mqttClient;
7376

77+
private final StreamManagerHelper streamManagerHelper;
78+
7479
PubSubIPCEventStreamAgent pubSubIPCAgent;
7580
private final String SERVICE_NAME = "LocalDebugConsole";
7681

7782
public DashboardServer(InetSocketAddress address, Logger logger, Kernel root, DeviceConfiguration deviceConfig,
78-
Authenticator authenticator, Provider<SSLEngine> engineProvider) {
83+
Authenticator authenticator, Provider<SSLEngine> engineProvider, String streamManagerAuthToken) {
7984
this(address, logger, new KernelCommunicator(root, logger, deviceConfig), authenticator, engineProvider,
8085
root.getContext().get(PubSubIPCEventStreamAgent.class),
81-
root.getContext().get(MqttClient.class));
86+
root.getContext().get(MqttClient.class),
87+
new StreamManagerHelper(root, streamManagerAuthToken));
8288
}
8389

8490
// constructor for unit testing
8591
DashboardServer(InetSocketAddress address, Logger logger, DashboardAPI dashboardAPI, Authenticator authenticator,
8692
Provider<SSLEngine> engineProvider, PubSubIPCEventStreamAgent pubSubIPCAgent,
87-
MqttClient mqttClient) {
93+
MqttClient mqttClient, StreamManagerHelper streamManagerHelper) {
8894
super(address);
8995
setReuseAddr(true);
9096
setTcpNoDelay(true);
@@ -97,6 +103,7 @@ public DashboardServer(InetSocketAddress address, Logger logger, Kernel root, De
97103
this.logger.atInfo().log("Starting dashboard server on address: {}", address);
98104
this.pubSubIPCAgent = pubSubIPCAgent;
99105
this.mqttClient = mqttClient;
106+
this.streamManagerHelper = streamManagerHelper;
100107
}
101108

102109
// links the API impl and starts the socket server
@@ -251,6 +258,40 @@ public void onMessage(WebSocket conn, String msg) {
251258
unsubscribeFromPubSubTopic(conn, packedRequest, req);
252259
break;
253260
}
261+
case streamManagerListStreams: {
262+
streamManagerListStreams(conn, packedRequest);
263+
break;
264+
}
265+
case streamManagerDescribeStream: {
266+
streamManagerDescribeStream(conn, packedRequest, req);
267+
break;
268+
}
269+
270+
case streamManagerDeleteMessageStream: {
271+
streamManagerDeleteMessageStream(conn, packedRequest, req);
272+
break;
273+
}
274+
275+
case streamManagerReadMessages: {
276+
streamManagerReadMessages(conn, packedRequest, req);
277+
break;
278+
}
279+
280+
case streamManagerAppendMessage:{
281+
streamManagerAppendMessage(conn, packedRequest, req);
282+
break;
283+
}
284+
285+
case streamManagerCreateMessageStream:{
286+
streamManagerCreateMessageStream(conn, packedRequest, req);
287+
break;
288+
}
289+
290+
case streamManagerUpdateMessageStream:{
291+
streamManagerUpdateMessageStream(conn, packedRequest, req);
292+
break;
293+
}
294+
254295
default: { // echo
255296
sendIfOpen(conn, new Message(MessageType.RESPONSE, packedRequest.requestID, req.call));
256297
break;
@@ -355,6 +396,111 @@ private void publishToPubSubTopic(WebSocket conn, PackedRequest packedRequest, R
355396
}
356397
}
357398

399+
private void streamManagerListStreams(WebSocket conn, PackedRequest packedRequest) {
400+
StreamManagerResponseMessage responseMessage = new StreamManagerResponseMessage();
401+
try {
402+
responseMessage.streamsList = this.streamManagerHelper.listStreams();
403+
responseMessage.successful = true;
404+
}
405+
catch (Exception e){
406+
logger.error("Error while listing streams:", e);
407+
responseMessage.errorMsg = Utils.generateFailureMessage(e);
408+
}
409+
sendIfOpen(conn, new Message(MessageType.RESPONSE, packedRequest.requestID, responseMessage));
410+
}
411+
412+
private void streamManagerDescribeStream(WebSocket conn, PackedRequest packedRequest, Request req) {
413+
StreamManagerResponseMessage responseMessage = new StreamManagerResponseMessage();
414+
try {
415+
responseMessage.messageStreamInfo = this.streamManagerHelper.describeStream(req.args[0]);
416+
responseMessage.successful = true;
417+
}
418+
catch (Exception e){
419+
logger.error("Error while describing stream:",e);
420+
responseMessage.errorMsg = Utils.generateFailureMessage(e);
421+
}
422+
sendIfOpen(conn, new Message(MessageType.RESPONSE, packedRequest.requestID, responseMessage));
423+
}
424+
425+
private void streamManagerDeleteMessageStream(WebSocket conn, PackedRequest packedRequest, Request req) {
426+
StreamManagerResponseMessage responseMessage = new StreamManagerResponseMessage();
427+
try {
428+
this.streamManagerHelper.deleteMessageStream(req.args[0]);
429+
responseMessage.successful = true;
430+
}
431+
catch (Exception e){
432+
logger.error("Error while deleting stream:", e);
433+
responseMessage.errorMsg = Utils.generateFailureMessage(e);
434+
}
435+
sendIfOpen(conn, new Message(MessageType.RESPONSE, packedRequest.requestID, responseMessage));
436+
}
437+
438+
private void streamManagerReadMessages(WebSocket conn, PackedRequest packedRequest, Request req) {
439+
StreamManagerResponseMessage responseMessage = new StreamManagerResponseMessage();
440+
try {
441+
if (req.args.length == 5) {
442+
responseMessage.messagesList = this.streamManagerHelper.readMessages(
443+
req.args[0],
444+
Long.parseLong(req.args[1]),
445+
Long.parseLong(req.args[2]),
446+
Long.parseLong(req.args[3]),
447+
Long.parseLong(req.args[4]));
448+
responseMessage.successful = true;
449+
}
450+
else{
451+
logger.atError().log("StreamManagerReadMessages requires 5 arguments");
452+
responseMessage.errorMsg = "StreamManagerReadMessages requires 5 arguments";
453+
}
454+
}
455+
catch (Exception e){
456+
logger.error("Error while reading messages:", e);
457+
responseMessage.errorMsg = Utils.generateFailureMessage(e);
458+
}
459+
sendIfOpen(conn, new Message(MessageType.RESPONSE, packedRequest.requestID, responseMessage));
460+
}
461+
462+
private void streamManagerAppendMessage(WebSocket conn, PackedRequest packedRequest, Request req) {
463+
StreamManagerResponseMessage responseMessage = new StreamManagerResponseMessage();
464+
try {
465+
this.streamManagerHelper.appendMessage(req.args[0], req.args[1].getBytes());
466+
responseMessage.successful = true;
467+
}
468+
catch (Exception e){
469+
logger.error("Error while appending message to the stream:", e);
470+
responseMessage.errorMsg = Utils.generateFailureMessage(e);
471+
}
472+
sendIfOpen(conn, new Message(MessageType.RESPONSE, packedRequest.requestID, responseMessage));
473+
}
474+
475+
private void streamManagerCreateMessageStream(WebSocket conn, PackedRequest packedRequest, Request req) {
476+
StreamManagerResponseMessage responseMessage = new StreamManagerResponseMessage();
477+
try {
478+
MessageStreamDefinition messageStreamDefinition = jsonMapper.readValue(req.args[0], MessageStreamDefinition.class);
479+
this.streamManagerHelper.createMessageStream(messageStreamDefinition);
480+
responseMessage.successful = true;
481+
}
482+
catch (Exception e){
483+
logger.error("Error while appending message to the stream:", e);
484+
responseMessage.errorMsg = Utils.generateFailureMessage(e);
485+
}
486+
sendIfOpen(conn, new Message(MessageType.RESPONSE, packedRequest.requestID, responseMessage));
487+
}
488+
489+
private void streamManagerUpdateMessageStream(WebSocket conn, PackedRequest packedRequest, Request req) {
490+
StreamManagerResponseMessage responseMessage = new StreamManagerResponseMessage();
491+
try {
492+
MessageStreamDefinition messageStreamDefinition = jsonMapper.readValue(req.args[0], MessageStreamDefinition.class);
493+
this.streamManagerHelper.updateMessageStream(messageStreamDefinition);
494+
responseMessage.successful = true;
495+
}
496+
catch (Exception e){
497+
logger.error("Error while appending message to the stream:", e);
498+
responseMessage.errorMsg = Utils.generateFailureMessage(e);
499+
}
500+
sendIfOpen(conn, new Message(MessageType.RESPONSE, packedRequest.requestID, responseMessage));
501+
}
502+
503+
358504
@Override
359505
public void onClose(WebSocket conn, int code, String reason, boolean remote) {
360506
connections.remove(conn);

src/main/java/com/aws/greengrass/localdebugconsole/SimpleHttpServer.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.aws.greengrass.dependency.ImplementsService;
1313
import com.aws.greengrass.dependency.State;
1414
import com.aws.greengrass.deployment.DeviceConfiguration;
15+
import com.aws.greengrass.ipc.AuthenticationHandler;
1516
import com.aws.greengrass.lifecyclemanager.Kernel;
1617
import com.aws.greengrass.lifecyclemanager.PluginService;
1718
import com.aws.greengrass.logging.api.Logger;
@@ -96,6 +97,7 @@
9697
import javax.net.ssl.SSLException;
9798

9899
import static com.aws.greengrass.componentmanager.KernelConfigResolver.CONFIGURATION_CONFIG_KEY;
100+
import static com.aws.greengrass.ipc.AuthenticationHandler.SERVICE_UNIQUE_ID_KEY;
99101
import static com.aws.greengrass.util.Utils.isEmpty;
100102
import static io.netty.buffer.Unpooled.copiedBuffer;
101103

@@ -131,6 +133,7 @@ public class SimpleHttpServer extends PluginService implements Authenticator {
131133
private boolean httpsEnabled = DEFAULT_HTTPS_ENABLED;
132134
private SslContext context;
133135
private Provider<SSLEngine> engineProvider;
136+
private String streamManagerAuthToken;
134137

135138
@Inject
136139
public SimpleHttpServer(Topics t, Kernel kernel, DeviceConfiguration deviceConfiguration) {
@@ -142,6 +145,10 @@ public SimpleHttpServer(Topics t, Kernel kernel, DeviceConfiguration deviceConfi
142145
@Override
143146
public void postInject() {
144147
super.postInject();
148+
// Does not happen for built-in/plugin services so doing explicitly
149+
AuthenticationHandler.registerAuthenticationToken(this);
150+
streamManagerAuthToken = Coerce.toString(this.getPrivateConfig().findLeafChild(SERVICE_UNIQUE_ID_KEY));
151+
145152
config.lookup(CONFIGURATION_CONFIG_KEY, "port").dflt(port).subscribe((w, n) -> {
146153
int oldPort = port;
147154
port = Coerce.toInt(n);
@@ -195,7 +202,7 @@ public void startup() throws InterruptedException {
195202

196203
logger.atInfo().log("Starting local dashboard server");
197204
dashboardServer = new DashboardServer(new InetSocketAddress(bindHostname, websocketPort), logger,
198-
kernel, deviceConfig, this, engineProvider);
205+
kernel, deviceConfig, this, engineProvider, streamManagerAuthToken);
199206
dashboardServer.startup();
200207
try {
201208
// We need to wait for the server to startup before grabbing the port because it starts in a separate thread
@@ -531,6 +538,10 @@ private boolean authenticated(String authHeader) {
531538

532539
@Override
533540
public boolean isUsernameAndPasswordValid(Pair<String, String> usernameAndPassword) {
541+
if (consoleAuthDisabled()) {
542+
return true;
543+
}
544+
534545
Topics passwordTopics = config.getRoot().findTopics(DEBUG_PASSWORD_NAMESPACE);
535546
if (passwordTopics == null || usernameAndPassword == null) {
536547
return false;
@@ -563,7 +574,15 @@ public boolean isUsernameAndPasswordValid(Pair<String, String> usernameAndPasswo
563574
return Instant.now().isBefore(Instant.ofEpochMilli(Coerce.toLong(expirationTopic)));
564575
}
565576

577+
private static boolean consoleAuthDisabled() {
578+
// Set to true in development so you don't need to authenticate all the time.
579+
return false;
580+
}
581+
566582
private Pair<String, String> getUsernameAndPassword(String authHeader) {
583+
if (consoleAuthDisabled()) {
584+
return new Pair<>("", "");
585+
}
567586
if (Utils.isEmpty(authHeader)) {
568587
return null;
569588
}

0 commit comments

Comments
 (0)