diff --git a/rx.lua b/rx.lua index bef15ac..ecaf2db 100644 --- a/rx.lua +++ b/rx.lua @@ -112,16 +112,21 @@ function Observable.create(subscribe) return setmetatable(self, Observable) end ---- Shorthand for creating an Observer and passing it to this Observable's subscription function. --- @arg {function} onNext - Called when the Observable produces a value. +--- Attaches an Observer to this Observable and returns a corresponding Subscription. +-- @arg {Observer|function} onNext - An Observer, or a function which is called when the Observable +-- produces a value. If a function, then the three-argument semantics are used, which are a +-- shorthand for creating an Observer and passing it to this same method. -- @arg {function} onError - Called when the Observable terminates due to an error. -- @arg {function} onCompleted - Called when the Observable completes normally. +-- @returns {Subscription} function Observable:subscribe(onNext, onError, onCompleted) + local s if type(onNext) == 'table' then - return self._subscribe(onNext) + s = self._subscribe(onNext) else - return self._subscribe(Observer.create(onNext, onError, onCompleted)) + s = self._subscribe(Observer.create(onNext, onError, onCompleted)) end + return s or Subscription.create(util.noop) end --- Returns an Observable that immediately completes without producing a value. @@ -2076,6 +2081,67 @@ end Subject.__call = Subject.onNext +--- @class AnonymousSubject +-- @description Like Subject, AnonymousSubjects function both as an Observer and as an Observable. +-- The functionality of both sides of an AnonymousSubject is provided by an external observer +-- and observable. Usually, the entity that creates the AnonymousSubject will subscribe to +-- the observable so that pushing a value to the AnonymousSubject results in an effect on the +-- observer. + +local AnonymousSubject = setmetatable({}, Observable) +AnonymousSubject.__index = AnonymousSubject +AnonymousSubject.__tostring = util.constant('AnonymousSubject') + +--- Creates a new AnonymousSubject +-- @arg{Observer} destination - the observer (input) side of the AnonymousSubject +-- @arg{Observable} source - the observable (output) side of the AnonymousSubject +-- @returns {AnonymousSubject} +function AnonymousSubject.create(_destination, _source) + local self = { + destination = _destination, + source = _source + } + + return setmetatable(self, AnonymousSubject) +end + +--- Attaches an Observer to this AnonymousSubject's observable. +-- @arg {Observer|function} onNext - An Observer, or a function which is called when the Observable +-- produces a value. If a function, then the three-argument semantics are used, which are a +-- shorthand for creating an Observer and passing it to this same method. +-- @arg {function} onError - Called when the Observable terminates due to an error. +-- @arg {function} onCompleted - Called when the Observable completes normally. +function AnonymousSubject:subscribe(onNext, onError, onCompleted) + if self.source then + return self.source:subscribe(onNext, onError, onCompleted) + else + return Subscription.create(util.noop) + end +end + +--- Pushes zero or more values to the AnonymousSubject. +-- @arg {*...} values +function AnonymousSubject:onNext(...) + if self.destination then + self.destination:onNext(...) + end +end + +--- Signal to the AnonymousSubject that an error has occurred. +-- @arg {string=} message - A string describing what went wrong. +function AnonymousSubject:onError(message) + if self.destination then + self.destination:onError(message) + end +end + +--- Signal to the AnonymousSubject that its observer will not be fed any more values. +function AnonymousSubject:onCompleted() + if self.destination then + self.destination:onCompleted() + end +end + --- @class AsyncSubject -- @description AsyncSubjects are subjects that produce either no values or a single value. If -- multiple values are produced via onNext, only the last one is used. If onError is called, then @@ -2307,6 +2373,7 @@ return { CooperativeScheduler = CooperativeScheduler, TimeoutScheduler = TimeoutScheduler, Subject = Subject, + AnonymousSubject = AnonymousSubject, AsyncSubject = AsyncSubject, BehaviorSubject = BehaviorSubject, ReplaySubject = ReplaySubject diff --git a/src/observable.lua b/src/observable.lua index ec792c5..347ed49 100644 --- a/src/observable.lua +++ b/src/observable.lua @@ -17,16 +17,21 @@ function Observable.create(subscribe) return setmetatable(self, Observable) end ---- Shorthand for creating an Observer and passing it to this Observable's subscription function. --- @arg {function} onNext - Called when the Observable produces a value. +--- Attaches an Observer to this Observable and returns a corresponding Subscription. +-- @arg {Observer|function} onNext - An Observer, or a function which is called when the Observable +-- produces a value. If a function, then the three-argument semantics are used, which are a +-- shorthand for creating an Observer and passing it to this same method. -- @arg {function} onError - Called when the Observable terminates due to an error. -- @arg {function} onCompleted - Called when the Observable completes normally. +-- @returns {Subscription} function Observable:subscribe(onNext, onError, onCompleted) + local s if type(onNext) == 'table' then - return self._subscribe(onNext) + s = self._subscribe(onNext) else - return self._subscribe(Observer.create(onNext, onError, onCompleted)) + s = self._subscribe(Observer.create(onNext, onError, onCompleted)) end + return s or Subscription.create(util.noop) end --- Returns an Observable that immediately completes without producing a value. diff --git a/tests/anonymoussubject.lua b/tests/anonymoussubject.lua new file mode 100644 index 0000000..9012905 --- /dev/null +++ b/tests/anonymoussubject.lua @@ -0,0 +1,53 @@ +describe('AnonymousSubject', function() + describe('create', function() + it('returns an AnonymousSubject', function() + local observable = Rx.Observable.of(42) + local observer = Rx.Observer.create(onNext, nil, onCompleted) + local subject = Rx.AnonymousSubject.create(observer, observable) + expect(subject).to.be.an(Rx.AnonymousSubject) + end) + it('returns an Observer', function() + local onNext, onCompleted = spy(), spy() + local observable = Rx.Observable.of(42) + local observer = Rx.Observer.create(onNext, nil, onCompleted) + local subject = Rx.AnonymousSubject.create(observer, observable) + subject:onNext(1) + subject:onCompleted() + expect(onNext).to.equal({{1}}) + expect(#onCompleted).to.equal(1) + end) + it('returns an Observable', function() + local observable = Rx.Observable.of(42) + local observer = Rx.Observer.create() + local subject = Rx.AnonymousSubject.create(observer, observable) + local onNext, onError, onCompleted = observableSpy(subject) + expect(onNext).to.equal({{42}}) + end) + it('supports operators', function() + local observable = Rx.Observable.of(42) + local observer = Rx.Observer.create() + local subject = Rx.AnonymousSubject.create(observer, observable) + local onNext, onError, onCompleted = observableSpy(subject:map(tostring)) + expect(onNext).to.equal({{'42'}}) + end) + end) + + describe('subscribe', function() + it('returns a Subscription', function() + local observable = Rx.Observable.of(42) + local observer = Rx.Observer.create() + local subject = Rx.AnonymousSubject.create(observer, observable) + expect(subject:subscribe(Rx.Observer.create())).to.be.an(Rx.Subscription) + end) + + it('accepts 3 functions as arguments', function() + local onNext, onCompleted = spy(), spy() + local observable = Rx.Observable.of(42) + local observer = Rx.Observer.create(onNext, nil, onCompleted) + local subject = Rx.AnonymousSubject.create(observer, observable) + subject:subscribe(observer) + expect(onNext).to.equal({{42}}) + expect(#onCompleted).to.equal(1) + end) + end) +end) diff --git a/tests/runner.lua b/tests/runner.lua index 7d05749..721c657 100644 --- a/tests/runner.lua +++ b/tests/runner.lua @@ -90,6 +90,7 @@ else 'observable', 'subscription', 'subject', + 'anonymoussubject', 'asyncsubject', 'behaviorsubject', 'replaysubject' diff --git a/tools/build.lua b/tools/build.lua index 6cac333..c32b63d 100644 --- a/tools/build.lua +++ b/tools/build.lua @@ -64,6 +64,7 @@ local files = { 'src/schedulers/cooperativescheduler.lua', 'src/schedulers/timeoutscheduler.lua', 'src/subjects/subject.lua', + 'src/subjects/anonymoussubject.lua', 'src/subjects/asyncsubject.lua', 'src/subjects/behaviorsubject.lua', 'src/subjects/replaysubject.lua', @@ -96,6 +97,7 @@ local footer = [[return { CooperativeScheduler = CooperativeScheduler, TimeoutScheduler = TimeoutScheduler, Subject = Subject, + AnonymousSubject = AnonymousSubject, AsyncSubject = AsyncSubject, BehaviorSubject = BehaviorSubject, ReplaySubject = ReplaySubject