Skip to content
This repository has been archived by the owner on Sep 14, 2023. It is now read-only.

feat: rpc subscription refactor + returning results #413

Merged
merged 14 commits into from
Nov 23, 2022
2 changes: 1 addition & 1 deletion codegen/typeVisitor.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { posix as pathPosix } from "../deps/std/path.ts"
import * as M from "../frame_metadata/mod.ts"
import { normalizeCase } from "../util/case.ts"
import { getOrInit } from "../util/map.ts"
import { getOrInit } from "../util/mod.ts"
import { Files } from "./Files.ts"
import { CodegenProps } from "./mod.ts"
import { makeDocComment, S } from "./utils.ts"
Expand Down
37 changes: 19 additions & 18 deletions effects/blockWatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,30 @@ import { chain } from "./rpc_known_methods.ts"

const k0_ = Symbol()

// TODO: replace this once contramap util implemented
export function blockWatch<Client extends Z.$<rpc.Client>>(client: Client) {
return <
Listener extends Z.$<U.Listener<rpc.known.SignedBlock, BlockWatchListenerContext>>,
>(listener: Listener) => {
const listenerMapped = Z
.ls(listener, Z.env)
.next(([listener, env]) => {
return async function(this: rpc.ClientSubscribeContext, header: rpc.known.Header) {
const blockHash = chain.getBlockHash(client)(header.number)
const block = await chain.getBlock(client)(blockHash).bind(env)()
// TODO: return error with `this.stop` once implemented
if (block instanceof Error) throw block
listener.apply({ ...this, env }, [block])
}
}, k0_)
const subscriptionId = chain.subscribeNewHeads(client)([], listenerMapped)
return chain
.unsubscribeNewHeads(client)(subscriptionId)
.zoned("BlockWatch")
CreateListener extends Z.$<U.CreateListener<BlockWatchListenerContext, rpc.known.SignedBlock>>,
>(createListener: CreateListener) => {
const createListenerMapped = Z
.ls(createListener, Z.env)
.next(([createListener, env]) =>
(ctx: rpc.ClientSubscriptionContext) => {
const inner = createListener({ ...ctx, env })
return async (header: rpc.known.Header) => {
const blockHash = chain.getBlockHash(client)(header.number)
const block = await chain.getBlock(client)(blockHash).bind(env)()
if (block instanceof Error) {
return ctx.end(block)
}
return inner(block) as U.InnerEnd<Z.T<CreateListener>>
}
}, k0_)
return chain.subscribeNewHeads(client)([], createListenerMapped)
}
}

// TODO: generalize creating watch effects + accessing context + halting with a value
export interface BlockWatchListenerContext extends rpc.ClientSubscribeContext {
export interface BlockWatchListenerContext extends rpc.ClientSubscriptionContext {
env: Z.Env
}
2 changes: 1 addition & 1 deletion effects/const.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export function const_<Client extends Z.$<rpc.Client>>(client: Client) {
const entryValueTypeI = constMetadata_.access("ty").access("id")
const constValue = constMetadata_.access("value")
const $const = scale.codec(deriveCodec_, entryValueTypeI)
return scale.scaleDecoded($const, constValue, "value").zoned("Const")
return scale.scaleDecoded($const, constValue, "value")
}
}
Object.defineProperty(const_, "name", {
Expand Down
2 changes: 1 addition & 1 deletion effects/entryRead.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ export function entryRead<Client extends Z.$<rpc.Client>>(client: Client) {
const storageBytes = storageBytesHex.next(U.hex.decode)
const entryValueTypeI = entryMetadata_.access("value")
const $entry = scale.codec(deriveCodec_, entryValueTypeI)
return scale.scaleDecoded($entry, storageBytes, "value").zoned("EntryRead")
return scale.scaleDecoded($entry, storageBytes, "value")
}
}
37 changes: 17 additions & 20 deletions effects/entryWatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export function entryWatch<Client extends Z.$<rpc.Client>>(client: Client) {
palletName: PalletName,
entryName: EntryName,
keys: Keys,
listener: U.Listener<WatchEntryEvent[], rpc.ClientSubscribeContext>,
createListener: U.CreateListener<rpc.ClientSubscriptionContext, WatchEntryEvent[]>,
) => {
const metadata_ = metadata(client)()
const deriveCodec_ = scale.deriveCodec(metadata_)
Expand All @@ -31,25 +31,22 @@ export function entryWatch<Client extends Z.$<rpc.Client>>(client: Client) {
.scaleEncoded($storageKey_, keys.length ? [keys] : [])
.next(U.hex.encode)
.next(U.tuple)
const listenerMapped = Z.ls($entry, listener).next(([$entry, listener]) => {
return function listenerMapped(
this: rpc.ClientSubscribeContext,
changeset: rpc.known.StorageChangeSet,
) {
// TODO: in some cases there might be keys to decode
// key ? $storageKey.decode(U.hex.decode(key)) : undefined
const getKey = (key: rpc.known.Hex) => {
return key
const createListenerMapped = Z
.ls($entry, createListener)
.next(([$entry, createListener]) => {
return (ctx: rpc.ClientSubscriptionContext) => {
const inner = createListener(ctx)
return (changeset: rpc.known.StorageChangeSet) => {
// TODO: in some cases there might be keys to decode
// key ? $storageKey.decode(U.hex.decode(key)) : undefined
const getKey = (key: rpc.known.Hex) => key
const changes: WatchEntryEvent[] = changeset.changes.map(
([key, val]) => [getKey(key), val ? $entry.decode(U.hex.decode(val)) : undefined],
)
return inner(changes)
}
}
const changes: WatchEntryEvent[] = changeset.changes.map(([key, val]) => {
return [getKey(key), val ? $entry.decode(U.hex.decode(val)) : undefined]
})
listener.apply(this, [changes])
}
}, k0_)
const subscriptionId = state.subscribeStorage(client)([storageKeys], listenerMapped)
return state
.unsubscribeStorage(client)(subscriptionId)
.zoned("EntryWatch")
}, k0_)
return state.subscribeStorage(client)([storageKeys], createListenerMapped)
}
}
30 changes: 11 additions & 19 deletions effects/extrinsic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,14 @@ export class SignedExtrinsic<
this.extrinsicDecoded = scale.scaleDecoded($extrinsic_, this.extrinsicBytes, "extrinsic")
}

watch<Listener extends Z.$<U.Listener<rpc.known.TransactionStatus, rpc.ClientSubscribeContext>>>(
watch<
Listener extends Z.$<
U.CreateListener<rpc.ClientSubscriptionContext, rpc.known.TransactionStatus>
>,
>(
listener: Listener,
) {
const subscriptionId = author.submitAndWatchExtrinsic(this.client)(
[this.extrinsicHex],
listener,
)
return author.unwatchExtrinsic(this.client)(subscriptionId)
.zoned("ExtrinsicWatch")
return author.submitAndWatchExtrinsic(this.client)([this.extrinsicHex], listener)
}

get sent() {
Expand All @@ -137,18 +136,11 @@ export class SignedExtrinsic<
}

// TODO: attach to extrinsics sent.finalized result once zones-level method addition implemented
export function extrinsicsDecoded<
Client extends Z.$<rpc.Client>,
Hexes extends Z.$<rpc.known.Hex[]>,
>(
client: Client,
hexes: Hexes,
) {
return Z
.ls($extrinsic(client), hexes)
.next(([$extrinsic, hexes]) => {
return hexes.map((hex) => $extrinsic.decode(U.hex.decode(hex)))
})
export function extrinsicsDecoded<Client extends Z.$<rpc.Client>>(client: Client) {
return <Hexes extends Z.$<rpc.known.Hex[]>>(hexes: Hexes) =>
Z.ls($extrinsic(client), hexes).next(([$extrinsic, hexes]) =>
hexes.map((hex) => $extrinsic.decode(U.hex.decode(hex)))
)
}

function $extrinsic<
Expand Down
1 change: 0 additions & 1 deletion effects/keyPageRead.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,5 @@ export function keyPageRead<Client extends Z.$<rpc.Client>>(client: Client) {
return $key.decode(U.hex.decode(keyEncoded))
})
}, k0_)
.zoned("KeyPageRead")
}
}
5 changes: 0 additions & 5 deletions effects/metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ export function metadata<Client extends Z.$<rpc.Client>>(client: Client) {
return e as $.ScaleError
}
}, k0_)
.zoned("Metadata")
}
}

Expand All @@ -36,7 +35,6 @@ export function palletMetadata<Metadata extends Z.$<M.Metadata>, PalletName exte
.next(([metadata, palletName]) => {
return M.getPallet(metadata, palletName)
}, k1_)
.zoned("PalletMetadata")
}

export function entryMetadata<PalletMetadata extends Z.$<M.Pallet>, EntryName extends Z.$<string>>(
Expand All @@ -48,7 +46,6 @@ export function entryMetadata<PalletMetadata extends Z.$<M.Pallet>, EntryName ex
.next(([palletMetadata, entryName]) => {
return M.getEntry(palletMetadata, entryName)
}, k2_)
.zoned("EntryMetadata")
}

export function constMetadata<
Expand All @@ -63,7 +60,6 @@ export function constMetadata<
.next(([palletMetadata, constName]) => {
return M.getConst(palletMetadata, constName)
}, k3_)
.zoned("ConstMetadata")
}

export function mapMetadata<PalletMetadata extends Z.$<M.Pallet>, EntryName extends Z.$<string>>(
Expand All @@ -80,7 +76,6 @@ export function mapMetadata<PalletMetadata extends Z.$<M.Pallet>, EntryName exte
}
return entryMetadata
}, k4_)
.zoned("MapMetadata")
}

export class ExpectedMapError extends Error {
Expand Down
71 changes: 31 additions & 40 deletions effects/rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ export function rpcClient<
.next(([_, discoveryValue]) => {
return new rpc.Client(provider, discoveryValue as DiscoveryValue)
}, k0_)
.zoned("RpcClient")
}

export function rpcCall<Params extends unknown[], Result>(method: string, nonIdempotent?: boolean) {
Expand All @@ -34,12 +33,7 @@ export function rpcCall<Params extends unknown[], Result>(method: string, nonIde
// TODO: why do we need to explicitly type this / why is this not being inferred?
const id = client.providerRef.nextId()
const result: rpc.ClientCallEvent<ClientE["send"], ClientE["handler"], Result> =
await client.call<Result>({
jsonrpc: "2.0",
id,
method,
params,
})
await client.call<Result>(id, method, params)
const discardCheckResult = await discardCheck<ClientE["close"]>(client, counter)
if (discardCheckResult) return discardCheckResult
if (result instanceof Error) {
Expand All @@ -49,51 +43,48 @@ export function rpcCall<Params extends unknown[], Result>(method: string, nonIde
}
return result.result
}, k1_)
.zoned("RpcCall")
}
}
}

// TODO: why are leading type params unknown when `extends Z.$Client<any, SendErrorData, HandlerErrorData, CloseErrorData>`?
export function rpcSubscription<Params extends unknown[], Result>() {
return <Method extends string>(method: Method) => {
export function rpcSubscription<Params extends unknown[], Event>() {
return <SubscribeMethod extends string>(
subscribeMethod: SubscribeMethod,
unsubscribeMethod: string,
) => {
return <Client_ extends Z.$<rpc.Client>>(client: Client_) => {
return <
Params_ extends Z.Ls$<Params>,
Listener extends Z.$<U.Listener<Result, rpc.ClientSubscribeContext>>,
>(params: [...Params_], listener: Listener) => {
CreateListener extends Z.$<U.CreateListener<rpc.ClientSubscriptionContext, Event>>,
>(params: [...Params_], createListener: CreateListener) => {
type ClientE = Z.T<Client_>[rpc.ClientE_]
return Z
.rc(client, listener, ...params)
.next(async ([[client, listener, ...params], counter]) => {
type ClientE = typeof client[rpc.ClientE_]
const id = client.providerRef.nextId()
let error:
| undefined
| RpcServerError
| rpc.ProviderSendError<ClientE["send"]>
| rpc.ProviderHandlerError<ClientE["handler"]>
const subscriptionId = await client.subscribe<Method, Result>({
jsonrpc: "2.0",
id,
method,
params,
}, function(e) {
if (e instanceof Error) {
error = e
this.stop()
} else if (e.error) {
error = new RpcServerError(e)
console.log(e)
this.stop()
} else {
// TODO: halt if returns `Error` | `Promise<Error>`?
listener.apply(this, [e.params.result])
.rc(client, createListener, ...params)
.next(async ([[client, createListener, ...params], counter]) => {
const result = await client.subscriptionFactory<
Params,
Event
>()(subscribeMethod, unsubscribeMethod, params as [...Params], (ctx) => {
const inner = createListener(ctx)
return (e_) => {
const e = e_ as rpc.ClientSubscriptionEvent<
Z.T<SubscribeMethod>,
Event,
ClientE["send"],
ClientE["handler"]
>
if (e instanceof Error) {
return ctx.end(e)
} else if (e.error) {
return ctx.end(new RpcServerError(e))
}
return inner(e.params.result) as U.InnerEnd<Z.T<CreateListener>>
}
})
const discardCheckResult = await discardCheck<ClientE["close"]>(client, counter)
return discardCheckResult || error || subscriptionId!
return discardCheckResult || result
}, k2_)
.zoned("RpcSubscription")
}
}
}
Expand All @@ -111,7 +102,7 @@ async function discardCheck<CloseErrorData>(
}

export class RpcServerError extends Error {
override readonly name = "RpcServer"
override readonly name = "RpcServerError"

constructor(readonly inner: rpc.msg.ErrorMessage) {
super()
Expand Down
10 changes: 4 additions & 6 deletions effects/rpc_known_methods.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ export namespace state {
known.StorageChangeSet
>()(
"state_subscribeStorage",
)
export const unsubscribeStorage = rpcCall<[subscriptionId: string], true>(
"state_unsubscribeStorage",
)
export const getKeysPaged = rpcCall<
Expand All @@ -24,8 +22,8 @@ export namespace state {
>("state_getKeysPaged")
}
export namespace chain {
export const subscribeNewHeads = rpcSubscription<[], known.Header>()("chain_subscribeNewHeads")
export const unsubscribeNewHeads = rpcCall<[subscriptionId: string], true>(
export const subscribeNewHeads = rpcSubscription<[], known.Header>()(
"chain_subscribeNewHeads",
"chain_unsubscribeNewHeads",
)
export const getBlock = rpcCall<[hash?: U.HexHash], known.SignedBlock>("chain_getBlock")
Expand All @@ -42,8 +40,8 @@ export namespace author {
export const submitAndWatchExtrinsic = rpcSubscription<
[extrinsic: U.Hex],
known.TransactionStatus
>()("author_submitAndWatchExtrinsic")
export const unwatchExtrinsic = rpcCall<[subscriptionId: string], true>(
>()(
"author_submitAndWatchExtrinsic",
"author_unwatchExtrinsic",
)
export const submitExtrinsic = rpcCall<[extrinsic: U.Hex], U.Hash>("author_submitExtrinsic")
Expand Down
1 change: 0 additions & 1 deletion effects/scale.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ export function scaleDecoded<
.next(([codec, encoded, key]): Record<Z.T<Key>, any> => {
return { [key]: codec.decode(encoded) } as any
}, k2_)
.zoned("ScaleDecoded")
}

// TODO: eventually, utilize `V` to toggle runtime validation
Expand Down
13 changes: 8 additions & 5 deletions examples/batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ const tx = extrinsic({
}),
})
.signed(T.alice.sign)
.watch(function(status) {
console.log(status)
if (C.rpc.known.TransactionStatus.isTerminal(status)) {
this.stop()
.watch((ctx) =>
(status) => {
console.log(status)
if (C.rpc.known.TransactionStatus.isTerminal(status)) {
return ctx.end()
}
return
}
})
)

U.throwIfError(await tx.run())
Loading