Skip to content

Commit 6084043

Browse files
rsercanosangaman
authored andcommitted
feat(cli): improved streamorders (#780)
This commit changes the behavior of the `streamorders` cli call: 1. Allow it to start before xud. `streamorders` will wait for xud to start and not issue an error. 2. Print a line upon connection. 3. Upon termination of xud or any other RPC error - reconnect to xud and wait if xud is not available. 4. Upon connection to xud - print existing orders. Closes #687.
1 parent 8b914ac commit 6084043

File tree

1 file changed

+39
-0
lines changed

1 file changed

+39
-0
lines changed

lib/cli/commands/subscribeorders.ts

+39
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { loadXudClient } from '../command';
22
import { Arguments } from 'yargs';
33
import * as xudrpc from '../../proto/xudrpc_pb';
4+
import { XudClient } from '../../proto/xudrpc_grpc_pb';
45

56
export const command = 'streamorders [existing]';
67

@@ -15,20 +16,58 @@ export const builder = {
1516
};
1617

1718
export const handler = (argv: Arguments) => {
19+
ensureConnection(argv, true);
20+
};
21+
22+
let xud: XudClient;
23+
24+
const ensureConnection = (argv: Arguments, printError?: boolean) => {
25+
if (!xud) xud = loadXudClient(argv);
26+
xud.waitForReady(Number.POSITIVE_INFINITY, (error: Error | null) => {
27+
if (error) {
28+
if (printError) console.error(`${error.name}: ${error.message}`);
29+
setTimeout(ensureConnection.bind(undefined, argv), 3000);
30+
} else {
31+
console.log('Successfully connected, subscribing for orders');
32+
subscribeOrders(argv);
33+
}
34+
});
35+
};
36+
37+
const subscribeOrders = (argv: Arguments) => {
1838
const addedOrdersRequest = new xudrpc.SubscribeAddedOrdersRequest();
1939
addedOrdersRequest.setExisting(argv.existing);
2040
const addedOrdersSubscription = loadXudClient(argv).subscribeAddedOrders(addedOrdersRequest);
2141
addedOrdersSubscription.on('data', (order: xudrpc.Order) => {
2242
console.log(`Order added: ${JSON.stringify(order.toObject())}`);
2343
});
2444

45+
// adding end, close, error events only once,
46+
// since they'll be thrown for three of subscriptions in the corresponding cases, catching once is enough.
47+
addedOrdersSubscription.on('end', reconnect.bind(undefined, argv));
48+
addedOrdersSubscription.on('error', (err: Error) => {
49+
console.log(`Unexpected error occured: ${JSON.stringify(err)}, trying to reconnect`);
50+
ensureConnection(argv);
51+
});
52+
2553
const removedOrdersSubscription = loadXudClient(argv).subscribeRemovedOrders(new xudrpc.SubscribeRemovedOrdersRequest());
2654
removedOrdersSubscription.on('data', (orderRemoval: xudrpc.OrderRemoval) => {
2755
console.log(`Order removed: ${JSON.stringify(orderRemoval.toObject())}`);
2856
});
2957

58+
// prevent exiting and do nothing, it's already caught above.
59+
removedOrdersSubscription.on('error', () => {});
60+
3061
const swapsSubscription = loadXudClient(argv).subscribeSwaps(new xudrpc.SubscribeSwapsRequest());
3162
swapsSubscription.on('data', (swapResult: xudrpc.SwapResult) => {
3263
console.log(`Order swapped: ${JSON.stringify(swapResult.toObject())}`);
3364
});
65+
66+
// prevent exiting and do nothing, it's already caught above.
67+
swapsSubscription.on('error', () => {});
68+
};
69+
70+
const reconnect = (argv: Arguments) => {
71+
console.log('Stream has closed, trying to reconnect');
72+
ensureConnection(argv, false);
3473
};

0 commit comments

Comments
 (0)