Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 71 additions & 4 deletions rx.lua
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,21 @@ function Observable.create(subscribe)
return setmetatable(self, Observable)
end

--- Shorthand for creating an Observer and passing it to this Observable's subscription function.
-- @arg {function} onNext - Called when the Observable produces a value.
--- Attaches an Observer to this Observable and returns a corresponding Subscription.
-- @arg {Observer|function} onNext - An Observer, or a function which is called when the Observable
-- produces a value. If a function, then the three-argument semantics are used, which are a
-- shorthand for creating an Observer and passing it to this same method.
-- @arg {function} onError - Called when the Observable terminates due to an error.
-- @arg {function} onCompleted - Called when the Observable completes normally.
-- @returns {Subscription}
function Observable:subscribe(onNext, onError, onCompleted)
local s
if type(onNext) == 'table' then
return self._subscribe(onNext)
s = self._subscribe(onNext)
else
return self._subscribe(Observer.create(onNext, onError, onCompleted))
s = self._subscribe(Observer.create(onNext, onError, onCompleted))
end
return s or Subscription.create(util.noop)
end

--- Returns an Observable that immediately completes without producing a value.
Expand Down Expand Up @@ -2076,6 +2081,67 @@ end

Subject.__call = Subject.onNext

--- @class AnonymousSubject
-- @description Like Subject, AnonymousSubjects function both as an Observer and as an Observable.
-- The functionality of both sides of an AnonymousSubject is provided by an external observer
-- and observable. Usually, the entity that creates the AnonymousSubject will subscribe to
-- the observable so that pushing a value to the AnonymousSubject results in an effect on the
-- observer.

local AnonymousSubject = setmetatable({}, Observable)
AnonymousSubject.__index = AnonymousSubject
AnonymousSubject.__tostring = util.constant('AnonymousSubject')

--- Creates a new AnonymousSubject
-- @arg{Observer} destination - the observer (input) side of the AnonymousSubject
-- @arg{Observable} source - the observable (output) side of the AnonymousSubject
-- @returns {AnonymousSubject}
function AnonymousSubject.create(_destination, _source)
local self = {
destination = _destination,
source = _source
}

return setmetatable(self, AnonymousSubject)
end

--- Attaches an Observer to this AnonymousSubject's observable.
-- @arg {Observer|function} onNext - An Observer, or a function which is called when the Observable
-- produces a value. If a function, then the three-argument semantics are used, which are a
-- shorthand for creating an Observer and passing it to this same method.
-- @arg {function} onError - Called when the Observable terminates due to an error.
-- @arg {function} onCompleted - Called when the Observable completes normally.
function AnonymousSubject:subscribe(onNext, onError, onCompleted)
if self.source then
return self.source:subscribe(onNext, onError, onCompleted)
else
return Subscription.create(util.noop)
end
end

--- Pushes zero or more values to the AnonymousSubject.
-- @arg {*...} values
function AnonymousSubject:onNext(...)
if self.destination then
self.destination:onNext(...)
end
end

--- Signal to the AnonymousSubject that an error has occurred.
-- @arg {string=} message - A string describing what went wrong.
function AnonymousSubject:onError(message)
if self.destination then
self.destination:onError(message)
end
end

--- Signal to the AnonymousSubject that its observer will not be fed any more values.
function AnonymousSubject:onCompleted()
if self.destination then
self.destination:onCompleted()
end
end

--- @class AsyncSubject
-- @description AsyncSubjects are subjects that produce either no values or a single value. If
-- multiple values are produced via onNext, only the last one is used. If onError is called, then
Expand Down Expand Up @@ -2307,6 +2373,7 @@ return {
CooperativeScheduler = CooperativeScheduler,
TimeoutScheduler = TimeoutScheduler,
Subject = Subject,
AnonymousSubject = AnonymousSubject,
AsyncSubject = AsyncSubject,
BehaviorSubject = BehaviorSubject,
ReplaySubject = ReplaySubject
Expand Down
13 changes: 9 additions & 4 deletions src/observable.lua
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@ function Observable.create(subscribe)
return setmetatable(self, Observable)
end

--- Shorthand for creating an Observer and passing it to this Observable's subscription function.
-- @arg {function} onNext - Called when the Observable produces a value.
--- Attaches an Observer to this Observable and returns a corresponding Subscription.
-- @arg {Observer|function} onNext - An Observer, or a function which is called when the Observable
-- produces a value. If a function, then the three-argument semantics are used, which are a
-- shorthand for creating an Observer and passing it to this same method.
-- @arg {function} onError - Called when the Observable terminates due to an error.
-- @arg {function} onCompleted - Called when the Observable completes normally.
-- @returns {Subscription}
function Observable:subscribe(onNext, onError, onCompleted)
local s
if type(onNext) == 'table' then
return self._subscribe(onNext)
s = self._subscribe(onNext)
else
return self._subscribe(Observer.create(onNext, onError, onCompleted))
s = self._subscribe(Observer.create(onNext, onError, onCompleted))
end
return s or Subscription.create(util.noop)
end

--- Returns an Observable that immediately completes without producing a value.
Expand Down
53 changes: 53 additions & 0 deletions tests/anonymoussubject.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
describe('AnonymousSubject', function()
describe('create', function()
it('returns an AnonymousSubject', function()
local observable = Rx.Observable.of(42)
local observer = Rx.Observer.create(onNext, nil, onCompleted)
local subject = Rx.AnonymousSubject.create(observer, observable)
expect(subject).to.be.an(Rx.AnonymousSubject)
end)
it('returns an Observer', function()
local onNext, onCompleted = spy(), spy()
local observable = Rx.Observable.of(42)
local observer = Rx.Observer.create(onNext, nil, onCompleted)
local subject = Rx.AnonymousSubject.create(observer, observable)
subject:onNext(1)
subject:onCompleted()
expect(onNext).to.equal({{1}})
expect(#onCompleted).to.equal(1)
end)
it('returns an Observable', function()
local observable = Rx.Observable.of(42)
local observer = Rx.Observer.create()
local subject = Rx.AnonymousSubject.create(observer, observable)
local onNext, onError, onCompleted = observableSpy(subject)
expect(onNext).to.equal({{42}})
end)
it('supports operators', function()
local observable = Rx.Observable.of(42)
local observer = Rx.Observer.create()
local subject = Rx.AnonymousSubject.create(observer, observable)
local onNext, onError, onCompleted = observableSpy(subject:map(tostring))
expect(onNext).to.equal({{'42'}})
end)
end)

describe('subscribe', function()
it('returns a Subscription', function()
local observable = Rx.Observable.of(42)
local observer = Rx.Observer.create()
local subject = Rx.AnonymousSubject.create(observer, observable)
expect(subject:subscribe(Rx.Observer.create())).to.be.an(Rx.Subscription)
end)

it('accepts 3 functions as arguments', function()
local onNext, onCompleted = spy(), spy()
local observable = Rx.Observable.of(42)
local observer = Rx.Observer.create(onNext, nil, onCompleted)
local subject = Rx.AnonymousSubject.create(observer, observable)
subject:subscribe(observer)
expect(onNext).to.equal({{42}})
expect(#onCompleted).to.equal(1)
end)
end)
end)
1 change: 1 addition & 0 deletions tests/runner.lua
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ else
'observable',
'subscription',
'subject',
'anonymoussubject',
'asyncsubject',
'behaviorsubject',
'replaysubject'
Expand Down
2 changes: 2 additions & 0 deletions tools/build.lua
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ local files = {
'src/schedulers/cooperativescheduler.lua',
'src/schedulers/timeoutscheduler.lua',
'src/subjects/subject.lua',
'src/subjects/anonymoussubject.lua',
'src/subjects/asyncsubject.lua',
'src/subjects/behaviorsubject.lua',
'src/subjects/replaysubject.lua',
Expand Down Expand Up @@ -96,6 +97,7 @@ local footer = [[return {
CooperativeScheduler = CooperativeScheduler,
TimeoutScheduler = TimeoutScheduler,
Subject = Subject,
AnonymousSubject = AnonymousSubject,
AsyncSubject = AsyncSubject,
BehaviorSubject = BehaviorSubject,
ReplaySubject = ReplaySubject
Expand Down