From abf909aab6ca8a9c5e543c1050414370688018ba Mon Sep 17 00:00:00 2001 From: Mat Date: Sat, 7 Jan 2023 14:17:28 +0800 Subject: [PATCH 1/5] Switchmap operator Minor docs amendments --- docs/operators.rst | 22 +-- docs/testing.rst | 85 +++++----- reactivex/operators/__init__.py | 39 +++++ tests/test_observable/test_switchmap.py | 196 ++++++++++++++++++++++++ 4 files changed, 293 insertions(+), 49 deletions(-) create mode 100644 tests/test_observable/test_switchmap.py diff --git a/docs/operators.rst b/docs/operators.rst index 63001ebf..d15eb3c0 100644 --- a/docs/operators.rst +++ b/docs/operators.rst @@ -26,16 +26,18 @@ Operator Description Transforming Observables ------------------------ -================================================ ================================================ -Operator Description -================================================ ================================================ -:func:`buffer ` Periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time. -:func:`flat_map ` Transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable. -:func:`group_by ` Divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key. -:func:`map ` Transform the items emitted by an Observable by applying a function to each item. -:func:`scan ` Apply a function to each item emitted by an Observable, sequentially, and emit each successive value. -:func:`window ` Periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time. -================================================ ================================================ +================================================ ================================================ +Operator Description +================================================ ================================================ +:func:`buffer ` Periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a time. +:func:`flat_map ` Transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable. +:func:`concat_map ` Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next. +:func:`switch_map ` Projects each source value to an Observable which is merged in the output Observable, emitting values only from the most recently projected Observable. +:func:`group_by ` Divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by key. +:func:`map ` Transform the items emitted by an Observable by applying a function to each item. +:func:`scan ` Apply a function to each item emitted by an Observable, sequentially, and emit each successive value. +:func:`window ` Periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time. +================================================ ================================================ Filtering Observables ---------------------- diff --git a/docs/testing.rst b/docs/testing.rst index a4c27e89..f51bb72a 100644 --- a/docs/testing.rst +++ b/docs/testing.rst @@ -16,18 +16,13 @@ Basic example from reactivex.testing import ReactiveTest, TestScheduler from reactivex import operators - # setting up aliases for more concise code - on_next = ReactiveTest.on_next - on_error = ReactiveTest.on_error - on_completed = ReactiveTest.on_completed - def test_double(): # Create a scheduler scheduler = TestScheduler() # Define one or more source source = scheduler.create_hot_observable( - on_next(250, 3), - on_next(350, 5), + ReactiveTest.on_next(250, 3), + ReactiveTest.on_next(350, 5), ) # Define how the observable/operator is used on the source @@ -39,8 +34,8 @@ Basic example # check the messages and potentially subscriptions assert results.messages == [ - on_next(250, 6), - on_next(350, 10), + ReactiveTest.on_next(250, 6), + ReactiveTest.on_next(350, 10), ] @@ -53,6 +48,10 @@ or with full control, you can easily test various situations and combinations .. _in_sequence_or_throw: .. code:: python + # setting up aliases for more concise code + on_next = ReactiveTest.on_next + on_error = ReactiveTest.on_error + on_completed = ReactiveTest.on_completed def test_operator(): # Code to test; takes a sequence of integers and passes through, @@ -72,9 +71,8 @@ or with full control, you can easily test various situations and combinations source = scheduler.create_cold_observable( on_next(300, 1), on_next(400, 2), on_next(500, 3), on_completed(600) ) - # Here is another way to create the same observable, - # as long as we set the correct scheduler - source = reactivex.from_marbles('------1-2-3-|', timespan=50, scheduler=scheduler) + # Here is another way to create the same observable + source = reactivex.from_marbles('------1-2-3-|', timespan=50) # You can shorten the "create" function from the basic example to a lambda with no arguments result = scheduler.start(lambda: source.pipe( in_sequence_or_throw(), @@ -91,20 +89,20 @@ Timeline When ``scheduler.start`` is called, the test scheduler starts moving its virtual clock forward. Some important timestamps are however hidden as defaults, as listed below. -These values can be modified using kwargs in the ``scheduler.start(...)`` call: +These values can be modified using `kwargs` in the ``scheduler.start(...)`` call: 1. ``created`` [100]: When is the observable created. - That is when the ``create`` function seen in the basic example. + That is when the ``create`` function seen in the basic example is called. 2. ``subscribed`` [200]: When does the subscription occur. This explains the above emission timestamps: consider the first emission @500; given that we are using a cold observable, - and subscribe to it at 200, the "source"'s timeline starts at 200 and only 300 ticks later, it emits. + and subscribe to it at 200, the `source`'s timeline starts at 200 and only 300 ticks later, it emits. 3. ``disposed`` [1000]: When the subscription is disposed -Keep the following in mind when modifying these values: +Gotchas when modifying these values: -1. Do not use `0` as values since the code ignores that -2. If you change ``subscribed`` to be lower than 100, you need to change ``created`` as well +1. Do not use `0` as values for created/subscribed since the code would ignore it. +2. If you change ``subscribed`` to be lower than 100, you need to change ``created`` as well, otherwise nothing will happen. An alternative using marbles @@ -134,13 +132,17 @@ There is a simplified flow available in `reactivex.testing.marbles` and here's a assert results == outcome This method makes for very quick to write, and easy to read, tests. +At this moment however, it does not allow for testing subscriptions. Testing an observable factory ............................. -An observable created from `Observable(subscribe)` can be just as easily tested. -Let's use this example to additionally test a Disposable case. +An observable created directly from :class:`Observable ` +can be just as easily tested. + +In this example, we will additionally test a case where a +:class:`Disposable ` is used. .. code:: python @@ -163,7 +165,7 @@ Let's use this example to additionally test a Disposable case. on_next(220, 0), on_completed(220) ] - assert a == 43 + assert a == 43 # shows that our Disposable's action was as expected Testing errors @@ -188,20 +190,20 @@ Let's remedy that below. # At times it's better not to test the exact exception, # maybe its message changes with time or other reasons # We can test a specific notification's details as follows: - message, err = result.messages - assert message.time == 130 - assert err.time == 230 - assert message.value.kind == 'N' # Notification - assert err.value.kind == 'E' # E for errors - assert message.value.value == 1 - assert type(err.value.exception) == ValueError # look at .exception for errors + first_notification, error_notification = result.messages + assert first_notification.time == 130 + assert error_notification.time == 230 + assert first_notification.value.kind == 'N' # Notification + assert error_notification.value.kind == 'E' # E for errors + assert first_notification.value.value == 1 + assert type(error_notification.value.exception) == ValueError # look at .exception for errors Testing subscriptions, multiple observables, hot observables ............................................................ ``scheduler.start`` only allows for a single subscription. -Some cases like e.g. `operators.partition` require more. +Some cases like e.g. ``operators.partition`` require more. The examples below showcase some less commonly needed testing tools. .. code:: python @@ -218,7 +220,9 @@ The examples below showcase some less commonly needed testing tools. even.subscribe(steven) odd.subscribe(todd) - # Note! Since it's not "start" which creates the subscription, they actually occur at t=0 + # Note! Since the subscription is not created within + # `scheduler.start` below, the usual `subscribed` delay of t=200 + # is not in effect. The subscriptions therefore occur at t=0 scheduler.start() assert steven.messages == [ @@ -242,20 +246,23 @@ The examples below showcase some less commonly needed testing tools. shared = source.pipe( operators.share() ) - """first sub""" + # Creating our story: + # first sub is set to occur at t=200; this creates a sub on source scheduler.schedule_relative(200, lambda *_: subs.append(shared.subscribe(scheduler=scheduler))) - # second sub, should not sub to source itself + # second sub does not create a new sub on source, due to the `share` operator scheduler.schedule_relative(300, lambda *_: subs.append(shared.subscribe(scheduler=scheduler))) + # second sub ends scheduler.schedule_relative(500, lambda *_: subs[1].dispose()) + # first sub ends… and since there is no sub remaining, the only sub on source should be disposed too scheduler.schedule_relative(600, lambda *_: subs[0].dispose()) - """end first sub""" - # no existing sub should sub again onto source - we never dispose of it + # no existing sub on source, therefore this will create a new one + # we never dispose of it; we will test that infinite sub in the assertions scheduler.schedule_relative(900, lambda *_: subs.append(shared.subscribe(scheduler=scheduler))) scheduler.start() # Check that the submissions on the source are as expected assert source.subscriptions == [ - Subscription(200, 600), + Subscription(200, 600), # only one sub from 200 to 600 Subscription(900), # represents an infinite subscription ] @@ -279,9 +286,9 @@ The examples below showcase some less commonly needed testing tools. # the subscription starts at 200; # since `source` is a hot observable, the notification @190 will not be caught # the next notification is at 300 ticks, - # which, on our subscription, will show at 100 ticks (300-200 from subscribed) - # or 5 "-" each representing 20 ticks (timespan=20 in to_marbles) - # then the 42 is received - # and then nothing for another 200 ticks, so 10 "-" before complete + # which, on our subscription, will show at 100 ticks (300-200 from subscription delay) + # or 5 "-" each representing 20 ticks (timespan=20 in `to_marbles`). + # Then the "42" notification is received + # and then nothing for another 200 ticks, which is equal to 10 "-", before complete assert message.value.value == '-----(42)----------|' diff --git a/reactivex/operators/__init__.py b/reactivex/operators/__init__.py index cbf9b551..bc8a2346 100644 --- a/reactivex/operators/__init__.py +++ b/reactivex/operators/__init__.py @@ -3362,6 +3362,45 @@ def switch_latest() -> Callable[ return switch_latest_() +def switch_map( + project: Callable[[_T1, int], Observable[_T2]] +) -> Callable[[Observable[_T1]], Observable[_T2]]: + """Projects each source value to an Observable which is merged in + the output Observable, emitting values only from the most recently + projected Observable. + + + .. marble:: + :alt: switch_map + + ---a----------b-------c---------------| + [ switch_map(x,i: x*i---x*i---x*i|) ] + ---a---a---a--bb---bb-ccc---ccc---ccc-| + + Examples: + >>> op = switch_map(lambda x, i: reactivex.timer(1.0).pipe(map(x*i))) + + Args: + project: Projecting function which takes the outer observable value + and the emission index and emits the inner observable + + Returns: + An operator function that maps each value to the inner observable + and emits its values in order, emitting values only from the + most recently projected Observable. + + + If an inner observable complete, the resulting sequence does *not* + complete. + If an inner observable errors, the resulting sequence errors as well. + If the outer observable completes/errors, the resulting sequence + completes/errors. + + """ + + return compose(map_indexed(project), switch_latest()) + + def take(count: int) -> Callable[[Observable[_T]], Observable[_T]]: """Returns a specified number of contiguous elements from the start of an observable sequence. diff --git a/tests/test_observable/test_switchmap.py b/tests/test_observable/test_switchmap.py new file mode 100644 index 00000000..6cf93f8d --- /dev/null +++ b/tests/test_observable/test_switchmap.py @@ -0,0 +1,196 @@ +import unittest + +from reactivex import operators as ops, interval +from reactivex.testing import ReactiveTest, TestScheduler +from reactivex.testing.subscription import Subscription + +on_next = ReactiveTest.on_next +on_completed = ReactiveTest.on_completed +on_error = ReactiveTest.on_error +subscribe = ReactiveTest.subscribe +subscribed = ReactiveTest.subscribed +disposed = ReactiveTest.disposed +created = ReactiveTest.created + + +class TestSwitchMap(unittest.TestCase): + def test_switch_map_uses_index(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + on_next( + 300, + 'a' + ), + on_next( + 400, + 'b' + ), + on_next( + 500, + 'c' + ), + ) + + def create_inner(x: str, i: int): + def create_changing(j: int): + return (i, j, x) + return interval(20).pipe(ops.map(create_changing)) + + def create(): + return xs.pipe(ops.switch_map( + project=create_inner + )) + + results = scheduler.start(create, disposed=580) + # (i, j, x): i is the index of the outer emit; + # j is the value of the inner interval; + # x is the value of the outer emission + assert results.messages == [ + on_next(320, (0, 0, 'a')), + on_next(340, (0, 1, 'a')), + on_next(360, (0, 2, 'a')), + on_next(380, (0, 3, 'a')), + on_next(420, (1, 0, 'b')), + on_next(440, (1, 1, 'b')), + on_next(460, (1, 2, 'b')), + on_next(480, (1, 3, 'b')), + on_next(520, (2, 0, 'c')), + on_next(540, (2, 1, 'c')), + on_next(560, (2, 2, 'c')), + ] + assert xs.subscriptions == [ + Subscription(200, 580) + ] + + def test_switch_map_inner_throws(self): + """Inner throwing causes outer to throw""" + ex = "ex" + scheduler = TestScheduler() + sources = [ + scheduler.create_cold_observable(on_next(100, 'a'), on_next(300, 'aa')), + scheduler.create_cold_observable(on_next(50, 'b'), on_error(120, ex)), + scheduler.create_cold_observable(on_next(50, 'wont happen'), on_error(120, "no")), + ] + xs = scheduler.create_hot_observable( + on_next( + 250, + 0, + ), + on_next( + 400, + 1 + ), + on_next( + 550, + 2, + ), + ) + + def create_inner(x: int, _i: int): + return sources[x] + + def create(): + return xs.pipe(ops.switch_map(create_inner)) + + results = scheduler.start(create) + assert results.messages == [ + on_next(350, 'a'), + on_next(450, 'b'), + on_error(520, ex), + ] + assert sources[0].subscriptions == [ + Subscription(250, 400) + ] + assert sources[1].subscriptions == [ + Subscription(400, 520) + ] + assert sources[2].subscriptions == [] + + def test_switch_map_outer_throws(self): + """Outer throwing unsubscribes from all""" + ex = "ABC" + scheduler = TestScheduler() + sources = [ + scheduler.create_cold_observable(on_next(100, 'a'), on_next(300, 'aa')), + scheduler.create_cold_observable(on_next(50, 'b'), on_error(120, ex)), + scheduler.create_cold_observable(on_next(50, 'wont happen'), on_error(120, "no")), + ] + xs = scheduler.create_hot_observable( + on_next( + 250, + 0, + ), + on_next( + 400, + 1 + ), + on_error(430, ex), + ) + + def create_inner(x: int, _i: int): + return sources[x] + + def create(): + return xs.pipe(ops.switch_map(create_inner)) + + results = scheduler.start(create) + assert results.messages == [ + on_next(350, 'a'), + on_error(430, ex), + ] + assert sources[0].subscriptions == [ + Subscription(250, 400) + ] + assert sources[1].subscriptions == [ + Subscription(400, 430) + ] + assert sources[2].subscriptions == [] + + def test_switch_map_no_inner(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable(on_completed(500)) + # Fake inner which should never be subscribed to + sources = [ + scheduler.create_cold_observable(on_next(20, 2)) + ] + + def create_inner(_x: int, i: int): + return sources[i] + + def create(): + return xs.pipe(ops.switch_map(create_inner)) + + results = scheduler.start(create) + assert results.messages == [on_completed(500)] + assert xs.subscriptions == [Subscription(200, 500)] + assert sources[0].subscriptions == [] + + def test_switch_map_inner_completes(self): + """Inner completions do not affect outer""" + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + on_next( + 300, + 'd' + ), + on_next( + 330, + 'f' + ), + on_completed(540), + ) + + def create_inner(x: str, i: int): + """An observable which will complete after 40 ticks""" + return interval(20).pipe(ops.map(lambda j: (i, j, x)), ops.take(2)) + + def create(): + return xs.pipe(ops.switch_map(create_inner)) + + results = scheduler.start(create) + assert results.messages == [ + on_next(320, (0,0,'d')), + on_next(350, (1,0,'f')), + on_next(370, (1,1,'f')), # here the current inner is unsubscribed but not the outer + on_completed(540), # only outer completion affects + ] From 390cb50081cbf37ea8e79eff000c3129676b8fe0 Mon Sep 17 00:00:00 2001 From: Mat Date: Sat, 7 Jan 2023 14:23:59 +0800 Subject: [PATCH 2/5] Black --- reactivex/operators/__init__.py | 4 +- tests/test_observable/test_switchmap.py | 125 ++++++++++-------------- 2 files changed, 51 insertions(+), 78 deletions(-) diff --git a/reactivex/operators/__init__.py b/reactivex/operators/__init__.py index bc8a2346..02d678c7 100644 --- a/reactivex/operators/__init__.py +++ b/reactivex/operators/__init__.py @@ -3389,11 +3389,11 @@ def switch_map( and emits its values in order, emitting values only from the most recently projected Observable. - + If an inner observable complete, the resulting sequence does *not* complete. If an inner observable errors, the resulting sequence errors as well. - If the outer observable completes/errors, the resulting sequence + If the outer observable completes/errors, the resulting sequence completes/errors. """ diff --git a/tests/test_observable/test_switchmap.py b/tests/test_observable/test_switchmap.py index 6cf93f8d..ba33eba1 100644 --- a/tests/test_observable/test_switchmap.py +++ b/tests/test_observable/test_switchmap.py @@ -1,6 +1,7 @@ import unittest -from reactivex import operators as ops, interval +from reactivex import interval +from reactivex import operators as ops from reactivex.testing import ReactiveTest, TestScheduler from reactivex.testing.subscription import Subscription @@ -17,69 +18,56 @@ class TestSwitchMap(unittest.TestCase): def test_switch_map_uses_index(self): scheduler = TestScheduler() xs = scheduler.create_hot_observable( - on_next( - 300, - 'a' - ), - on_next( - 400, - 'b' - ), - on_next( - 500, - 'c' - ), + on_next(300, "a"), + on_next(400, "b"), + on_next(500, "c"), ) def create_inner(x: str, i: int): def create_changing(j: int): return (i, j, x) + return interval(20).pipe(ops.map(create_changing)) def create(): - return xs.pipe(ops.switch_map( - project=create_inner - )) + return xs.pipe(ops.switch_map(project=create_inner)) results = scheduler.start(create, disposed=580) - # (i, j, x): i is the index of the outer emit; - # j is the value of the inner interval; + # (i, j, x): i is the index of the outer emit; + # j is the value of the inner interval; # x is the value of the outer emission assert results.messages == [ - on_next(320, (0, 0, 'a')), - on_next(340, (0, 1, 'a')), - on_next(360, (0, 2, 'a')), - on_next(380, (0, 3, 'a')), - on_next(420, (1, 0, 'b')), - on_next(440, (1, 1, 'b')), - on_next(460, (1, 2, 'b')), - on_next(480, (1, 3, 'b')), - on_next(520, (2, 0, 'c')), - on_next(540, (2, 1, 'c')), - on_next(560, (2, 2, 'c')), - ] - assert xs.subscriptions == [ - Subscription(200, 580) + on_next(320, (0, 0, "a")), + on_next(340, (0, 1, "a")), + on_next(360, (0, 2, "a")), + on_next(380, (0, 3, "a")), + on_next(420, (1, 0, "b")), + on_next(440, (1, 1, "b")), + on_next(460, (1, 2, "b")), + on_next(480, (1, 3, "b")), + on_next(520, (2, 0, "c")), + on_next(540, (2, 1, "c")), + on_next(560, (2, 2, "c")), ] + assert xs.subscriptions == [Subscription(200, 580)] def test_switch_map_inner_throws(self): """Inner throwing causes outer to throw""" ex = "ex" scheduler = TestScheduler() sources = [ - scheduler.create_cold_observable(on_next(100, 'a'), on_next(300, 'aa')), - scheduler.create_cold_observable(on_next(50, 'b'), on_error(120, ex)), - scheduler.create_cold_observable(on_next(50, 'wont happen'), on_error(120, "no")), + scheduler.create_cold_observable(on_next(100, "a"), on_next(300, "aa")), + scheduler.create_cold_observable(on_next(50, "b"), on_error(120, ex)), + scheduler.create_cold_observable( + on_next(50, "wont happen"), on_error(120, "no") + ), ] xs = scheduler.create_hot_observable( on_next( 250, 0, ), - on_next( - 400, - 1 - ), + on_next(400, 1), on_next( 550, 2, @@ -94,36 +82,31 @@ def create(): results = scheduler.start(create) assert results.messages == [ - on_next(350, 'a'), - on_next(450, 'b'), + on_next(350, "a"), + on_next(450, "b"), on_error(520, ex), ] - assert sources[0].subscriptions == [ - Subscription(250, 400) - ] - assert sources[1].subscriptions == [ - Subscription(400, 520) - ] + assert sources[0].subscriptions == [Subscription(250, 400)] + assert sources[1].subscriptions == [Subscription(400, 520)] assert sources[2].subscriptions == [] - + def test_switch_map_outer_throws(self): """Outer throwing unsubscribes from all""" ex = "ABC" scheduler = TestScheduler() sources = [ - scheduler.create_cold_observable(on_next(100, 'a'), on_next(300, 'aa')), - scheduler.create_cold_observable(on_next(50, 'b'), on_error(120, ex)), - scheduler.create_cold_observable(on_next(50, 'wont happen'), on_error(120, "no")), + scheduler.create_cold_observable(on_next(100, "a"), on_next(300, "aa")), + scheduler.create_cold_observable(on_next(50, "b"), on_error(120, ex)), + scheduler.create_cold_observable( + on_next(50, "wont happen"), on_error(120, "no") + ), ] xs = scheduler.create_hot_observable( on_next( 250, 0, ), - on_next( - 400, - 1 - ), + on_next(400, 1), on_error(430, ex), ) @@ -135,24 +118,18 @@ def create(): results = scheduler.start(create) assert results.messages == [ - on_next(350, 'a'), + on_next(350, "a"), on_error(430, ex), ] - assert sources[0].subscriptions == [ - Subscription(250, 400) - ] - assert sources[1].subscriptions == [ - Subscription(400, 430) - ] + assert sources[0].subscriptions == [Subscription(250, 400)] + assert sources[1].subscriptions == [Subscription(400, 430)] assert sources[2].subscriptions == [] def test_switch_map_no_inner(self): scheduler = TestScheduler() xs = scheduler.create_hot_observable(on_completed(500)) # Fake inner which should never be subscribed to - sources = [ - scheduler.create_cold_observable(on_next(20, 2)) - ] + sources = [scheduler.create_cold_observable(on_next(20, 2))] def create_inner(_x: int, i: int): return sources[i] @@ -169,14 +146,8 @@ def test_switch_map_inner_completes(self): """Inner completions do not affect outer""" scheduler = TestScheduler() xs = scheduler.create_hot_observable( - on_next( - 300, - 'd' - ), - on_next( - 330, - 'f' - ), + on_next(300, "d"), + on_next(330, "f"), on_completed(540), ) @@ -189,8 +160,10 @@ def create(): results = scheduler.start(create) assert results.messages == [ - on_next(320, (0,0,'d')), - on_next(350, (1,0,'f')), - on_next(370, (1,1,'f')), # here the current inner is unsubscribed but not the outer + on_next(320, (0, 0, "d")), + on_next(350, (1, 0, "f")), + on_next( + 370, (1, 1, "f") + ), # here the current inner is unsubscribed but not the outer on_completed(540), # only outer completion affects ] From 85c7a0be7e928937ca727d3081276f164551421c Mon Sep 17 00:00:00 2001 From: matiboy Date: Sun, 8 Jan 2023 07:45:14 +0800 Subject: [PATCH 3/5] Apply suggestions from code review Co-authored-by: Dag Brattli --- reactivex/operators/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/reactivex/operators/__init__.py b/reactivex/operators/__init__.py index 02d678c7..7c2cdeb3 100644 --- a/reactivex/operators/__init__.py +++ b/reactivex/operators/__init__.py @@ -3362,8 +3362,8 @@ def switch_latest() -> Callable[ return switch_latest_() -def switch_map( - project: Callable[[_T1, int], Observable[_T2]] +def switch_map_indexed( + project: Optional[MapperIndexed[_T1, Observable[_T2]]] ) -> Callable[[Observable[_T1]], Observable[_T2]]: """Projects each source value to an Observable which is merged in the output Observable, emitting values only from the most recently From 4e5681957e9c22ae2d2c8dc2674b9983c36ffe84 Mon Sep 17 00:00:00 2001 From: mat Date: Sun, 8 Jan 2023 17:55:04 +0800 Subject: [PATCH 4/5] Change to switch_map and switch_map_index Add test for default projector --- reactivex/operators/__init__.py | 51 ++++- tests/test_observable/test_switchmap.py | 62 +++--- .../test_observable/test_switchmapindexed.py | 185 ++++++++++++++++++ 3 files changed, 270 insertions(+), 28 deletions(-) create mode 100644 tests/test_observable/test_switchmapindexed.py diff --git a/reactivex/operators/__init__.py b/reactivex/operators/__init__.py index 7c2cdeb3..7a1a5b45 100644 --- a/reactivex/operators/__init__.py +++ b/reactivex/operators/__init__.py @@ -26,6 +26,7 @@ Observable, abc, compose, + just, typing, ) from reactivex.internal.basic import identity @@ -3362,8 +3363,48 @@ def switch_latest() -> Callable[ return switch_latest_() +def switch_map( + project: Optional[Mapper[_T1, Observable[_T2]]] = None +) -> Callable[[Observable[_T1]], Observable[_T2]]: + """Projects each source value to an Observable which is merged in + the output Observable, emitting values only from the most recently + projected Observable. + + + .. marble:: + :alt: switch_map + + ---a----------b-------c---------| + [ switch_map(x: x---x---x|) ] + ---a---a---a--b---b---c---c---c-| + + Examples: + >>> op = switch_map(lambda x: reactivex.timer(1.0).pipe(map(lambda x: x))) + >>> op = switch_map() + + Args: + project: Projecting function which takes the outer observable value + and the emission index and emits the inner observable; defaults to `identity` + + Returns: + An operator function that maps each value to the inner observable + and emits its values in order, emitting values only from the + most recently projected Observable. + + + If an inner observable complete, the resulting sequence does *not* + complete. + If an inner observable errors, the resulting sequence errors as well. + If the outer observable completes/errors, the resulting sequence + completes/errors. + + """ + + return compose(map(project), switch_latest()) + + def switch_map_indexed( - project: Optional[MapperIndexed[_T1, Observable[_T2]]] + project: Optional[MapperIndexed[_T1, Observable[_T2]]] = None ) -> Callable[[Observable[_T1]], Observable[_T2]]: """Projects each source value to an Observable which is merged in the output Observable, emitting values only from the most recently @@ -3373,12 +3414,12 @@ def switch_map_indexed( .. marble:: :alt: switch_map - ---a----------b-------c---------------| - [ switch_map(x,i: x*i---x*i---x*i|) ] - ---a---a---a--bb---bb-ccc---ccc---ccc-| + ---a----------b-------c---------------------| + [ switch_map_indexed(x,i: x*i---x*i---x*i|) ] + ---a---a---a--bb---bb-ccc---ccc---ccc-------| Examples: - >>> op = switch_map(lambda x, i: reactivex.timer(1.0).pipe(map(x*i))) + >>> op = switch_map_indexed(lambda x, i: reactivex.timer(1.0).pipe(map(x*i))) Args: project: Projecting function which takes the outer observable value diff --git a/tests/test_observable/test_switchmap.py b/tests/test_observable/test_switchmap.py index ba33eba1..44a1433e 100644 --- a/tests/test_observable/test_switchmap.py +++ b/tests/test_observable/test_switchmap.py @@ -3,6 +3,7 @@ from reactivex import interval from reactivex import operators as ops from reactivex.testing import ReactiveTest, TestScheduler +from reactivex.testing.marbles import marbles_testing from reactivex.testing.subscription import Subscription on_next = ReactiveTest.on_next @@ -15,7 +16,7 @@ class TestSwitchMap(unittest.TestCase): - def test_switch_map_uses_index(self): + def test_switch_map(self): scheduler = TestScheduler() xs = scheduler.create_hot_observable( on_next(300, "a"), @@ -23,9 +24,9 @@ def test_switch_map_uses_index(self): on_next(500, "c"), ) - def create_inner(x: str, i: int): + def create_inner(x: str): def create_changing(j: int): - return (i, j, x) + return (j, x) return interval(20).pipe(ops.map(create_changing)) @@ -37,17 +38,17 @@ def create(): # j is the value of the inner interval; # x is the value of the outer emission assert results.messages == [ - on_next(320, (0, 0, "a")), - on_next(340, (0, 1, "a")), - on_next(360, (0, 2, "a")), - on_next(380, (0, 3, "a")), - on_next(420, (1, 0, "b")), - on_next(440, (1, 1, "b")), - on_next(460, (1, 2, "b")), - on_next(480, (1, 3, "b")), - on_next(520, (2, 0, "c")), - on_next(540, (2, 1, "c")), - on_next(560, (2, 2, "c")), + on_next(320, (0, "a")), + on_next(340, (1, "a")), + on_next(360, (2, "a")), + on_next(380, (3, "a")), + on_next(420, (0, "b")), + on_next(440, (1, "b")), + on_next(460, (2, "b")), + on_next(480, (3, "b")), + on_next(520, (0, "c")), + on_next(540, (1, "c")), + on_next(560, (2, "c")), ] assert xs.subscriptions == [Subscription(200, 580)] @@ -74,7 +75,7 @@ def test_switch_map_inner_throws(self): ), ) - def create_inner(x: int, _i: int): + def create_inner(x: int): return sources[x] def create(): @@ -110,7 +111,7 @@ def test_switch_map_outer_throws(self): on_error(430, ex), ) - def create_inner(x: int, _i: int): + def create_inner(x: int): return sources[x] def create(): @@ -131,8 +132,8 @@ def test_switch_map_no_inner(self): # Fake inner which should never be subscribed to sources = [scheduler.create_cold_observable(on_next(20, 2))] - def create_inner(_x: int, i: int): - return sources[i] + def create_inner(_x: int): + return sources.pop(0) def create(): return xs.pipe(ops.switch_map(create_inner)) @@ -151,19 +152,34 @@ def test_switch_map_inner_completes(self): on_completed(540), ) - def create_inner(x: str, i: int): + def create_inner(x: str): """An observable which will complete after 40 ticks""" - return interval(20).pipe(ops.map(lambda j: (i, j, x)), ops.take(2)) + return interval(20).pipe(ops.map(lambda j: (j, x)), ops.take(2)) def create(): return xs.pipe(ops.switch_map(create_inner)) results = scheduler.start(create) assert results.messages == [ - on_next(320, (0, 0, "d")), - on_next(350, (1, 0, "f")), + on_next(320, (0, "d")), + on_next(350, (0, "f")), on_next( - 370, (1, 1, "f") + 370, (1, "f") ), # here the current inner is unsubscribed but not the outer on_completed(540), # only outer completion affects ] + + def test_switch_map_default_mapper(self): + with marbles_testing(timespan=10) as (start, cold, hot, exp): + xs = hot( + " ---a---b------c-----", + { + "a": cold(" --1--2"), + "b": cold(" --1-2-3-4-5|"), + "c": cold(" --1--2"), + }, + None, + ) + expected = exp(" -----1---1-2-3--1--2", None, None) + result = start(xs.pipe(ops.switch_map())) + assert result == expected diff --git a/tests/test_observable/test_switchmapindexed.py b/tests/test_observable/test_switchmapindexed.py new file mode 100644 index 00000000..09736d5f --- /dev/null +++ b/tests/test_observable/test_switchmapindexed.py @@ -0,0 +1,185 @@ +import unittest + +from reactivex import interval +from reactivex import operators as ops +from reactivex.testing import ReactiveTest, TestScheduler +from reactivex.testing.subscription import Subscription +from reactivex.testing.marbles import marbles_testing + +on_next = ReactiveTest.on_next +on_completed = ReactiveTest.on_completed +on_error = ReactiveTest.on_error +subscribe = ReactiveTest.subscribe +subscribed = ReactiveTest.subscribed +disposed = ReactiveTest.disposed +created = ReactiveTest.created + + +class TestSwitchMapIndex(unittest.TestCase): + def test_switch_map_indexed_uses_index(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + on_next(300, "a"), + on_next(400, "b"), + on_next(500, "c"), + ) + + def create_inner(x: str, i: int): + def create_changing(j: int): + return (i, j, x) + + return interval(20).pipe(ops.map(create_changing)) + + def create(): + return xs.pipe(ops.switch_map_indexed(project=create_inner)) + + results = scheduler.start(create, disposed=580) + # (i, j, x): i is the index of the outer emit; + # j is the value of the inner interval; + # x is the value of the outer emission + assert results.messages == [ + on_next(320, (0, 0, "a")), + on_next(340, (0, 1, "a")), + on_next(360, (0, 2, "a")), + on_next(380, (0, 3, "a")), + on_next(420, (1, 0, "b")), + on_next(440, (1, 1, "b")), + on_next(460, (1, 2, "b")), + on_next(480, (1, 3, "b")), + on_next(520, (2, 0, "c")), + on_next(540, (2, 1, "c")), + on_next(560, (2, 2, "c")), + ] + assert xs.subscriptions == [Subscription(200, 580)] + + def test_switch_map_indexed_inner_throws(self): + """Inner throwing causes outer to throw""" + ex = "ex" + scheduler = TestScheduler() + sources = [ + scheduler.create_cold_observable(on_next(100, "a"), on_next(300, "aa")), + scheduler.create_cold_observable(on_next(50, "b"), on_error(120, ex)), + scheduler.create_cold_observable( + on_next(50, "wont happen"), on_error(120, "no") + ), + ] + xs = scheduler.create_hot_observable( + on_next( + 250, + 0, + ), + on_next(400, 1), + on_next( + 550, + 2, + ), + ) + + def create_inner(x: int, _i: int): + return sources[x] + + def create(): + return xs.pipe(ops.switch_map_indexed(create_inner)) + + results = scheduler.start(create) + assert results.messages == [ + on_next(350, "a"), + on_next(450, "b"), + on_error(520, ex), + ] + assert sources[0].subscriptions == [Subscription(250, 400)] + assert sources[1].subscriptions == [Subscription(400, 520)] + assert sources[2].subscriptions == [] + + def test_switch_map_indexed_outer_throws(self): + """Outer throwing unsubscribes from all""" + ex = "ABC" + scheduler = TestScheduler() + sources = [ + scheduler.create_cold_observable(on_next(100, "a"), on_next(300, "aa")), + scheduler.create_cold_observable(on_next(50, "b"), on_error(120, ex)), + scheduler.create_cold_observable( + on_next(50, "wont happen"), on_error(120, "no") + ), + ] + xs = scheduler.create_hot_observable( + on_next( + 250, + 0, + ), + on_next(400, 1), + on_error(430, ex), + ) + + def create_inner(x: int, _i: int): + return sources[x] + + def create(): + return xs.pipe(ops.switch_map_indexed(create_inner)) + + results = scheduler.start(create) + assert results.messages == [ + on_next(350, "a"), + on_error(430, ex), + ] + assert sources[0].subscriptions == [Subscription(250, 400)] + assert sources[1].subscriptions == [Subscription(400, 430)] + assert sources[2].subscriptions == [] + + def test_switch_map_indexed_no_inner(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable(on_completed(500)) + # Fake inner which should never be subscribed to + sources = [scheduler.create_cold_observable(on_next(20, 2))] + + def create_inner(_x: int, i: int): + return sources[i] + + def create(): + return xs.pipe(ops.switch_map_indexed(create_inner)) + + results = scheduler.start(create) + assert results.messages == [on_completed(500)] + assert xs.subscriptions == [Subscription(200, 500)] + assert sources[0].subscriptions == [] + + def test_switch_map_indexed_inner_completes(self): + """Inner completions do not affect outer""" + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + on_next(300, "d"), + on_next(330, "f"), + on_completed(540), + ) + + def create_inner(x: str, i: int): + """An observable which will complete after 40 ticks""" + return interval(20).pipe(ops.map(lambda j: (i, j, x)), ops.take(2)) + + def create(): + return xs.pipe(ops.switch_map_indexed(create_inner)) + + results = scheduler.start(create) + assert results.messages == [ + on_next(320, (0, 0, "d")), + on_next(350, (1, 0, "f")), + on_next( + 370, (1, 1, "f") + ), # here the current inner is unsubscribed but not the outer + on_completed(540), # only outer completion affects + ] + + def test_switch_map_default_mapper(self): + with marbles_testing(timespan=10) as (start, cold, hot, exp): + xs = hot( + " ---a---b------c-----", + { + "a": cold(" --1--2"), + "b": cold(" --1-2-3-4-5|"), + "c": cold(" --1--2"), + }, + None, + ) + expected = exp(" -----1---1-2-3--1--2", None, None) + result = start(xs.pipe(ops.switch_map_indexed())) + assert result == expected From 2a7d7212c02ed55f445a68f65e27136256105060 Mon Sep 17 00:00:00 2001 From: mat Date: Sun, 8 Jan 2023 18:02:08 +0800 Subject: [PATCH 5/5] Black fixes --- reactivex/operators/__init__.py | 1 - tests/test_observable/test_switchmap.py | 6 +++--- tests/test_observable/test_switchmapindexed.py | 8 ++++---- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/reactivex/operators/__init__.py b/reactivex/operators/__init__.py index 7a1a5b45..76bd13c0 100644 --- a/reactivex/operators/__init__.py +++ b/reactivex/operators/__init__.py @@ -26,7 +26,6 @@ Observable, abc, compose, - just, typing, ) from reactivex.internal.basic import identity diff --git a/tests/test_observable/test_switchmap.py b/tests/test_observable/test_switchmap.py index 44a1433e..363b818a 100644 --- a/tests/test_observable/test_switchmap.py +++ b/tests/test_observable/test_switchmap.py @@ -174,9 +174,9 @@ def test_switch_map_default_mapper(self): xs = hot( " ---a---b------c-----", { - "a": cold(" --1--2"), - "b": cold(" --1-2-3-4-5|"), - "c": cold(" --1--2"), + "a": cold(" --1--2", None, None), + "b": cold(" --1-2-3-4-5|", None, None), + "c": cold(" --1--2", None, None), }, None, ) diff --git a/tests/test_observable/test_switchmapindexed.py b/tests/test_observable/test_switchmapindexed.py index 09736d5f..5d5b3d15 100644 --- a/tests/test_observable/test_switchmapindexed.py +++ b/tests/test_observable/test_switchmapindexed.py @@ -3,8 +3,8 @@ from reactivex import interval from reactivex import operators as ops from reactivex.testing import ReactiveTest, TestScheduler -from reactivex.testing.subscription import Subscription from reactivex.testing.marbles import marbles_testing +from reactivex.testing.subscription import Subscription on_next = ReactiveTest.on_next on_completed = ReactiveTest.on_completed @@ -174,9 +174,9 @@ def test_switch_map_default_mapper(self): xs = hot( " ---a---b------c-----", { - "a": cold(" --1--2"), - "b": cold(" --1-2-3-4-5|"), - "c": cold(" --1--2"), + "a": cold(" --1--2", None, None), + "b": cold(" --1-2-3-4-5|", None, None), + "c": cold(" --1--2", None, None), }, None, )