Skip to content
This repository has been archived by the owner on Sep 14, 2023. It is now read-only.

Commit

Permalink
feat: update smoldot connection options
Browse files Browse the repository at this point in the history
  • Loading branch information
kratico committed Nov 11, 2022
1 parent 2176929 commit 9b5b629
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 63 deletions.
183 changes: 130 additions & 53 deletions rpc/provider/smoldot.test.ts
Original file line number Diff line number Diff line change
@@ -1,66 +1,143 @@
// TODO
import { deferred } from "../../deps/std/async.ts";
import { deferred, delay } from "../../deps/std/async.ts";
import { assertExists, assertNotInstanceOf } from "../../deps/std/testing/asserts.ts";
import { ProviderListener } from "./base.ts";
import { smoldotProvider } from "./smoldot.ts";

const POLKADOT_CHAIN_SPEC_URL =
"https://raw.githubusercontent.com/paritytech/substrate-connect/main/packages/connect/src/connector/specs/polkadot.json";

Deno.test({
name: "Smoldot Provider",
sanitizeOps: false,
sanitizeResources: false,
async fn() {
const polkadotChainSpec = await (await fetch(POLKADOT_CHAIN_SPEC_URL)).text();
const pendingSubscriptionId = deferred<string>();
const initialized = deferred();
const unsubscribed = deferred();
const checks: ProviderListener<any, any>[] = [
// check for chainHead_unstable_follow subscription
(message) => {
assertNotInstanceOf(message, Error);
assertExists(message.result);
pendingSubscriptionId.resolve(message.result);
},
// check for chainHead_unstable_follow initialized event
(message) => {
assertNotInstanceOf(message, Error);
assertExists(message.params?.result);
if (message.params?.result.event === "initialized") {
initialized.resolve();
}
},
// check for chainHead_unstable_unfollow unsubscribe
(message) => {
assertNotInstanceOf(message, Error);
if (message?.result === null) {
unsubscribed.resolve();
}
async fn(t) {
await t.step({
name: "relay chain connection",
async fn() {
const relayChainSpec = await (
await fetch(
"https://raw.githubusercontent.com/paritytech/substrate-connect/main/packages/connect/src/connector/specs/polkadot.json",
)
)
.text();
const pendingSubscriptionId = deferred<string>();
const initialized = deferred();
const unsubscribed = deferred();
const checks: ProviderListener<any, any>[] = [
// check for chainHead_unstable_follow subscription
(message) => {
assertNotInstanceOf(message, Error);
assertExists(message.result);
pendingSubscriptionId.resolve(message.result);
},
// check for chainHead_unstable_follow initialized event
(message) => {
assertNotInstanceOf(message, Error);
assertExists(message.params?.result);
if (message.params?.result.event === "initialized") {
initialized.resolve();
}
},
// check for chainHead_unstable_unfollow unsubscribe
(message) => {
assertNotInstanceOf(message, Error);
if (message?.result === null) {
unsubscribed.resolve();
}
},
];
const provider = smoldotProvider(relayChainSpec, (message) => {
if (checks.length > 1) {
checks.shift()!(message);
} else {
checks[0]!(message);
}
});
provider.send({
jsonrpc: "2.0",
id: provider.nextId(),
method: "chainHead_unstable_follow",
params: [false],
});
const subscriptionId = await pendingSubscriptionId;
await initialized;
provider.send({
jsonrpc: "2.0",
id: provider.nextId(),
method: "chainHead_unstable_unfollow",
params: [subscriptionId],
});
await unsubscribed;
const providerRelease = await provider.release();
assertNotInstanceOf(providerRelease, Error);
},
];
const provider = smoldotProvider(polkadotChainSpec, (message) => {
if (checks.length > 1) {
checks.shift()!(message);
} else {
checks[0]!(message);
}
});
provider.send({
jsonrpc: "2.0",
id: provider.nextId(),
method: "chainHead_unstable_follow",
params: [true],
});
const subscriptionId = await pendingSubscriptionId;
await initialized;
provider.send({
jsonrpc: "2.0",
id: provider.nextId(),
method: "chainHead_unstable_unfollow",
params: [subscriptionId],
await t.step({
name: "parachain connection",
async fn() {
const relayChainSpec = await (
await fetch(
"https://raw.githubusercontent.com/paritytech/substrate-connect/main/packages/connect/src/connector/specs/westend2.json",
)
)
.text();
const parachainSpec = await (
await fetch(
"https://raw.githubusercontent.com/paritytech/substrate-connect/main/projects/demo/src/assets/westend-westmint.json",
)
)
.text();
const pendingSubscriptionId = deferred<string>();
const initialized = deferred();
const unsubscribed = deferred();
const checks: ProviderListener<any, any>[] = [
// check for chainHead_unstable_follow subscription
(message) => {
assertNotInstanceOf(message, Error);
assertExists(message.result);
pendingSubscriptionId.resolve(message.result);
},
// check for chainHead_unstable_follow initialized event
(message) => {
assertNotInstanceOf(message, Error);
assertExists(message.params?.result);
if (message.params?.result.event === "initialized") {
initialized.resolve();
}
},
// check for chainHead_unstable_unfollow unsubscribe
(message) => {
assertNotInstanceOf(message, Error);
if (message?.result === null) {
unsubscribed.resolve();
}
},
];
const provider = smoldotProvider(
[parachainSpec, relayChainSpec],
(message) => {
if (checks.length > 1) {
checks.shift()!(message);
} else {
checks[0]!(message);
}
},
);
provider.send({
jsonrpc: "2.0",
id: provider.nextId(),
method: "chainHead_unstable_follow",
params: [false],
});
const subscriptionId = await pendingSubscriptionId;
await initialized;
provider.send({
jsonrpc: "2.0",
id: provider.nextId(),
method: "chainHead_unstable_unfollow",
params: [subscriptionId],
});
await unsubscribed;
const providerRelease = await provider.release();
assertNotInstanceOf(providerRelease, Error);
},
});
await unsubscribed;
await provider.release();
},
});
37 changes: 27 additions & 10 deletions rpc/provider/smoldot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
start,
} from "../../deps/smoldot.ts";
import { Chain, Client, ClientOptions } from "../../deps/smoldot/client.d.ts";
import { deferred } from "../../deps/std/async.ts";
import * as msg from "../messages.ts";
import { nextIdFactory, Provider, ProviderConnection, ProviderListener } from "./base.ts";
import { ProviderCloseError, ProviderHandlerError, ProviderSendError } from "./errors.ts";
Expand All @@ -26,15 +27,17 @@ type SmoldotHandlerErrorData =
type SmoldotCloseErrorData = AlreadyDestroyedError | CrashError;

let client: undefined | Client;
const connections = new Map<string, SmoldotProviderConnection>();
const connections = new Map<ChainSpecOptions, SmoldotProviderConnection>();
class SmoldotProviderConnection
extends ProviderConnection<Chain, SmoldotSendErrorData, SmoldotHandlerErrorData>
{}

const nextId = nextIdFactory();

type ChainSpecOptions = string | [parachainSpec: string, relayChainSpec: string];

export const smoldotProvider: Provider<
string,
ChainSpecOptions,
SmoldotSendErrorData,
SmoldotHandlerErrorData,
SmoldotCloseErrorData
Expand Down Expand Up @@ -75,7 +78,7 @@ export const smoldotProvider: Provider<
};

async function connection(
chainSpec: string,
chainSpec: ChainSpecOptions,
listener: ProviderListener<SmoldotSendErrorData, SmoldotHandlerErrorData>,
): Promise<SmoldotProviderConnection> {
if (!client) {
Expand All @@ -88,17 +91,31 @@ async function connection(
}
let conn = connections.get(chainSpec);
if (!conn) {
const inner = await client.addChain({ chainSpec });
conn = new SmoldotProviderConnection(inner, () => {
try {
inner.remove();
} catch (_e) { /* TODO */ }
});
let inner: Chain;
if (typeof chainSpec === "string") {
inner = await client.addChain({ chainSpec });
} else {
const [parachainSpec, relayChainSpec] = chainSpec;
const relayChainConnection = await client.addChain({ chainSpec: relayChainSpec });
inner = await client.addChain({
chainSpec: parachainSpec,
potentialRelayChains: [relayChainConnection],
});
}
const stopListening = deferred<undefined>();
conn = new SmoldotProviderConnection(inner, () => stopListening.resolve());
connections.set(chainSpec, conn);
(async () => {
while (true) {
try {
const message = msg.parse(await inner.nextJsonRpcResponse());
const response = await Promise.race([
stopListening,
inner.nextJsonRpcResponse(),
]);
if (!response) {
break;
}
const message = msg.parse(response);
conn!.forEachListener(message);
} catch (e) {
conn!.forEachListener(new ProviderHandlerError(e as SmoldotHandlerErrorData));
Expand Down

0 comments on commit 9b5b629

Please sign in to comment.