diff --git a/src/index.ts b/src/index.ts index a68a415..4eac7f9 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,6 +1,7 @@ import {DevToolEnabledSource} from '@cycle/run'; import xs, {Stream, MemoryStream} from 'xstream'; import dropRepeats from 'xstream/extra/dropRepeats'; +import { adapt } from '@cycle/run/lib/adapt'; export type MainFn = (sources: So) => Si; export type Reducer = (state: T | undefined) => T | undefined; @@ -16,21 +17,23 @@ export type Scope = string | number | Lens; export function pick(selector: Selector | string) { if (typeof selector === 'string') { - return function pickWithString(sinksArray$: Stream>): Stream> { - return sinksArray$.map(sinksArray => sinksArray.map(sinks => sinks[selector])); + return function pickWithString(sinksArray$: any): Stream> { + return adapt((xs.fromObservable(sinksArray$) as Stream>) + .map(sinksArray => sinksArray.map(sinks => sinks[selector]))); }; } else { - return function pickWithFunction(sinksArray$: Stream>): Stream> { - return sinksArray$.map(sinksArray => sinksArray.map(selector)); + return function pickWithFunction(sinksArray$: any): Stream> { + return adapt((xs.fromObservable(sinksArray$) as Stream>) + .map(sinksArray => sinksArray.map(selector))); }; } } export function mix(aggregator: Aggregator) { - return function mixOperator(streamArray$: Stream>>): Stream { - return streamArray$ + return function mixOperator(streamArray$: any): Stream>> { + return adapt((xs.fromObservable(streamArray$) as Stream>>) .map(streamArray => aggregator(...streamArray)) - .flatten(); + .flatten()); } } @@ -105,7 +108,7 @@ export class StateSource { constructor(stream: Stream, name: string | null) { this._name = name; - this.state$ = stream.compose(dropRepeats()).remember(); + this.state$ = adapt(stream.compose(dropRepeats()).remember()); if (!name) { return; } @@ -134,7 +137,10 @@ export default function onionify( .drop(1); sources[name] = new StateSource(state$, name) as any; const sinks = main(sources as So); - reducerMimic$.imitate(sinks[name]); + if (sinks[name]) { + const stream$ = xs.fromObservable>(sinks[name]); + reducerMimic$.imitate(stream$); + } return sinks; } }