diff --git a/rx.lua b/rx.lua index fcb5cf2..38fbb89 100644 --- a/rx.lua +++ b/rx.lua @@ -35,7 +35,6 @@ Subscription.__tostring = util.constant('Subscription') function Subscription.create(action) local self = { action = action or util.noop, - unsubscribed = false } return setmetatable(self, Subscription) @@ -43,9 +42,9 @@ end --- Unsubscribes the subscription, performing any necessary cleanup work. function Subscription:unsubscribe() - if self.unsubscribed then return end - self.action(self) - self.unsubscribed = true + action = self.action + self.action = util.noop + action(self) end --- @class Observer @@ -298,10 +297,12 @@ function Observable:all(predicate) predicate = predicate or util.identity return Observable.create(function(observer) + local subscription local function onNext(...) util.tryWithObserver(observer, function(...) if not predicate(...) then observer:onNext(false) + if subscription then subscription:unsubscribe() end observer:onCompleted() end end, ...) @@ -316,7 +317,8 @@ function Observable:all(predicate) return observer:onCompleted() end - return self:subscribe(onNext, onError, onCompleted) + subscription = self:subscribe(onNext, onError, onCompleted) + return subscription end) end @@ -455,6 +457,7 @@ function Observable:catch(handler) local function onError(e) if not handler then + if subscription then subscription:unsubscribe() end return observer:onCompleted() end @@ -877,10 +880,13 @@ function Observable:find(predicate) predicate = predicate or util.identity return Observable.create(function(observer) + local subscription + local function onNext(...) util.tryWithObserver(observer, function(...) if predicate(...) then observer:onNext(...) + if subscription then subscription:unsubscribe() end return observer:onCompleted() end end, ...) @@ -894,7 +900,8 @@ function Observable:find(predicate) return observer:onCompleted() end - return self:subscribe(onNext, onError, onCompleted) + subscription = self:subscribe(onNext, onError, onCompleted) + return subscription end) end @@ -1537,6 +1544,7 @@ function Observable:take(n) n = n or 1 return Observable.create(function(observer) + local subscription if n <= 0 then observer:onCompleted() return @@ -1550,6 +1558,7 @@ function Observable:take(n) i = i + 1 if i > n then + if subscription then subscription:unsubscribe() end observer:onCompleted() end end @@ -1562,7 +1571,8 @@ function Observable:take(n) return observer:onCompleted() end - return self:subscribe(onNext, onError, onCompleted) + subscription = self:subscribe(onNext, onError, onCompleted) + return subscription end) end @@ -1605,6 +1615,7 @@ end -- @returns {Observable} function Observable:takeUntil(other) return Observable.create(function(observer) + local subscription local function onNext(...) return observer:onNext(...) end @@ -1614,12 +1625,14 @@ function Observable:takeUntil(other) end local function onCompleted() + if subscription then subscription:unsubscribe() end return observer:onCompleted() end other:subscribe(onCompleted, onCompleted, onCompleted) - return self:subscribe(onNext, onError, onCompleted) + subscription = self:subscribe(onNext, onError, onCompleted) + return subscription end) end @@ -1631,6 +1644,7 @@ function Observable:takeWhile(predicate) return Observable.create(function(observer) local taking = true + local subscription local function onNext(...) if taking then @@ -1641,6 +1655,7 @@ function Observable:takeWhile(predicate) if taking then return observer:onNext(...) else + if subscription then subscription:unsubscribe() end return observer:onCompleted() end end @@ -1654,7 +1669,8 @@ function Observable:takeWhile(predicate) return observer:onCompleted() end - return self:subscribe(onNext, onError, onCompleted) + subscription = self:subscribe(onNext, onError, onCompleted) + return subscription end) end diff --git a/src/operators/all.lua b/src/operators/all.lua index 9431711..7acd67c 100644 --- a/src/operators/all.lua +++ b/src/operators/all.lua @@ -7,10 +7,12 @@ function Observable:all(predicate) predicate = predicate or util.identity return Observable.create(function(observer) + local subscription local function onNext(...) util.tryWithObserver(observer, function(...) if not predicate(...) then observer:onNext(false) + if subscription then subscription:unsubscribe() end observer:onCompleted() end end, ...) @@ -25,6 +27,7 @@ function Observable:all(predicate) return observer:onCompleted() end - return self:subscribe(onNext, onError, onCompleted) + subscription = self:subscribe(onNext, onError, onCompleted) + return subscription end) end diff --git a/src/operators/catch.lua b/src/operators/catch.lua index 0e81581..272eb6f 100644 --- a/src/operators/catch.lua +++ b/src/operators/catch.lua @@ -18,6 +18,7 @@ function Observable:catch(handler) local function onError(e) if not handler then + if subscription then subscription:unsubscribe() end return observer:onCompleted() end diff --git a/src/operators/find.lua b/src/operators/find.lua index 623a798..6bd3203 100644 --- a/src/operators/find.lua +++ b/src/operators/find.lua @@ -8,10 +8,13 @@ function Observable:find(predicate) predicate = predicate or util.identity return Observable.create(function(observer) + local subscription + local function onNext(...) util.tryWithObserver(observer, function(...) if predicate(...) then observer:onNext(...) + if subscription then subscription:unsubscribe() end return observer:onCompleted() end end, ...) @@ -25,6 +28,7 @@ function Observable:find(predicate) return observer:onCompleted() end - return self:subscribe(onNext, onError, onCompleted) + subscription = self:subscribe(onNext, onError, onCompleted) + return subscription end) end diff --git a/src/operators/take.lua b/src/operators/take.lua index cadeb2c..2507514 100644 --- a/src/operators/take.lua +++ b/src/operators/take.lua @@ -7,6 +7,7 @@ function Observable:take(n) n = n or 1 return Observable.create(function(observer) + local subscription if n <= 0 then observer:onCompleted() return @@ -20,6 +21,7 @@ function Observable:take(n) i = i + 1 if i > n then + if subscription then subscription:unsubscribe() end observer:onCompleted() end end @@ -32,6 +34,7 @@ function Observable:take(n) return observer:onCompleted() end - return self:subscribe(onNext, onError, onCompleted) + subscription = self:subscribe(onNext, onError, onCompleted) + return subscription end) end diff --git a/src/operators/takeUntil.lua b/src/operators/takeUntil.lua index d431969..f0b3bd2 100644 --- a/src/operators/takeUntil.lua +++ b/src/operators/takeUntil.lua @@ -5,6 +5,7 @@ local Observable = require 'observable' -- @returns {Observable} function Observable:takeUntil(other) return Observable.create(function(observer) + local subscription local function onNext(...) return observer:onNext(...) end @@ -14,11 +15,13 @@ function Observable:takeUntil(other) end local function onCompleted() + if subscription then subscription:unsubscribe() end return observer:onCompleted() end other:subscribe(onCompleted, onCompleted, onCompleted) - return self:subscribe(onNext, onError, onCompleted) + subscription = self:subscribe(onNext, onError, onCompleted) + return subscription end) end diff --git a/src/operators/takeWhile.lua b/src/operators/takeWhile.lua index 70a56d1..e31b3a3 100644 --- a/src/operators/takeWhile.lua +++ b/src/operators/takeWhile.lua @@ -9,6 +9,7 @@ function Observable:takeWhile(predicate) return Observable.create(function(observer) local taking = true + local subscription local function onNext(...) if taking then @@ -19,6 +20,7 @@ function Observable:takeWhile(predicate) if taking then return observer:onNext(...) else + if subscription then subscription:unsubscribe() end return observer:onCompleted() end end @@ -32,6 +34,7 @@ function Observable:takeWhile(predicate) return observer:onCompleted() end - return self:subscribe(onNext, onError, onCompleted) + subscription = self:subscribe(onNext, onError, onCompleted) + return subscription end) end diff --git a/src/subscription.lua b/src/subscription.lua index e78b364..2221390 100644 --- a/src/subscription.lua +++ b/src/subscription.lua @@ -14,7 +14,6 @@ Subscription.__tostring = util.constant('Subscription') function Subscription.create(action) local self = { action = action or util.noop, - unsubscribed = false } return setmetatable(self, Subscription) @@ -22,9 +21,9 @@ end --- Unsubscribes the subscription, performing any necessary cleanup work. function Subscription:unsubscribe() - if self.unsubscribed then return end - self.action(self) - self.unsubscribed = true + action = self.action + self.action = util.noop + action(self) end return Subscription