-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Pubsub subscriber #3637
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Pubsub subscriber #3637
Changes from 21 commits
Commits
Show all changes
74 commits
Select commit
Hold shift + click to select a range
0b55ff5
wip
lukesneeringer 210ef3b
Wrote some docs; not much else.
lukesneeringer 7cd6156
subscriber wip
lukesneeringer fc4ead3
WIP
890de3a
wip
12ace0e
wip
7305000
wip
lukesneeringer e5a27ae
Fix a couple minor lint issues.
d50a22e
Adapting a subscriber that will work.
14f200a
WIP
6a7e846
Implement lease management.
3bb130b
WIP
f97dc23
WIP
303436c
WIP
933d2f3
WIP
1df0ccf
WIP
acb4534
WIP
2fb2785
Update subscriber client config to be sane.
ef178e9
Start adding unit tests.
147ad18
Beginning work on unit tests.
c96367a
Merge branch 'pubsub-publisher' into pubsub-subscriber
9c701e3
Publisher tests complete.
de38b83
subscriber/client.py tests
faeaa8e
Consumer tests
d467719
Fix minor linting error.
c821d33
Histogram tests
ed750b2
Minor fix based on Max feedback.
216310c
starting on helper thread tests
a1fd287
Add tests for helper_threads.
32701e1
Almost done with unit tests.
34272ad
Full coverage.
lukesneeringer e1c7c84
Do not send policy across the concurrency boundary.
2b21f48
Shift flow control to the policy class.
7f4b91c
Move the request queue to using keyword arguments.
3852805
Can has flow control.
b697be2
Merge branch 'public-master' into pubsub-subscriber
81b37f4
Subscription fixes.
5784d4d
Change batch time, add gRPC time logging.
97d8431
Unit test fix.
cb7dc05
Minor RST fixes (thanks @jonparrott).
6994465
Remove the ignore in .flake8.
eae7e14
Set gRPC limit to 20MB + 1
6afcd2a
Suppress not-working grpc options.
lukesneeringer e8c0a78
Merge branch 'public-master' into pubsub-subscriber
41cfc08
Merge branch 'public-master' into pubsub-subscriber
dd096e1
Merge branch 'pubsub-publisher' into pubsub-subscriber
b76d363
Merge branch 'pubsub-publisher' into pubsub-subscriber
760bef6
Merge branch 'pubsub-publisher' into pubsub-subscriber
f196b5e
Fix some tests to match new futures.
5dbfd0a
Merge branch 'pubsub-publisher' into pubsub-subscriber
ee144aa
Move the future tests to match the code.
40ea1e6
Merge branch 'pubsub-publisher' into pubsub-subscriber
8cb8f98
Fix a publish failure test.
47678c3
Fix final test.
90ef40f
Sane max_workers default for 2.7 and 3.4
34c8273
Mock credentials appropriately.
831fe75
Remove fail_under from .coveragerc.
02fa81f
Make histogram and helper_threads private.
2458b55
Add a publishing system test.
aa56a9b
Merge branch 'pubsub-subscriber' of github.com:GoogleCloudPlatform/go…
17b6544
Subscription system test.
f469381
Merge branch 'pubsub-publisher' into pubsub-subscriber
a24c0a7
Update tests.
a054324
Make the wait test work on 2.7
3735da5
Discarding unused mocks.
ac9f182
Make _consumer a private module.
f272eca
Switch from recursion to while for maintain_leases.
e6bcbe7
Add exception logging in the callback.
b9115ca
Fix a long line. Whups.
6ae46cb
Accept an executor.
852438e
Fix a minor flake8 complaint.
13c2205
No longer need to use inf for exception timeout.
6a03f48
Fixes discussed in chat with @jonparrott.
f1dde8f
Pub/Sub Docs (#3849)
lukesneeringer File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| # Copyright 2017, Google Inc. All rights reserved. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| from __future__ import absolute_import | ||
|
|
||
| from google.cloud.pubsub_v1.subscriber.client import Client | ||
|
|
||
|
|
||
| __all__ = ( | ||
| 'Client', | ||
| ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,100 @@ | ||
| # Copyright 2017, Google Inc. All rights reserved. | ||
| # | ||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||
| # you may not use this file except in compliance with the License. | ||
| # You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| from __future__ import absolute_import | ||
|
|
||
| import pkg_resources | ||
|
|
||
| from google.cloud.gapic.pubsub.v1 import subscriber_client | ||
|
|
||
| from google.cloud.pubsub_v1 import _gapic | ||
| from google.cloud.pubsub_v1.subscriber.policy import thread | ||
|
|
||
|
|
||
| __VERSION__ = pkg_resources.get_distribution('google-cloud-pubsub').version | ||
|
|
||
|
|
||
| @_gapic.add_methods(subscriber_client.SubscriberClient, | ||
| blacklist=('pull', 'streaming_pull')) | ||
| class Client(object): | ||
| """A subscriber client for Google Cloud Pub/Sub. | ||
| This creates an object that is capable of subscribing to messages. | ||
| Generally, you can instantiate this client with no arguments, and you | ||
| get sensible defaults. | ||
| Args: | ||
| flow_control (~.pubsub_v1.types.FlowControl): The flow control | ||
| settings. Use this to prevent situations where you are | ||
| inundated with too many messages at once. | ||
| policy_class (class): A class that describes how to handle | ||
This comment was marked as spam.
Sorry, something went wrong. |
||
| subscriptions. You may subclass the | ||
| :class:`.pubsub_v1.subscriber.policy.base.BasePolicy` | ||
| class in order to define your own consumer. This is primarily | ||
| provided to allow use of different concurrency models; the default | ||
| is based on :class:`threading.Thread`. | ||
| kwargs (dict): Any additional arguments provided are sent as keyword | ||
| keyword arguments to the underlying | ||
| :class:`~.gapic.pubsub.v1.subscriber_client.SubscriberClient`. | ||
| Generally, you should not need to set additional keyword | ||
| arguments. | ||
| """ | ||
| def __init__(self, flow_control=(), policy_class=thread.Policy, | ||
| **kwargs): | ||
| # Add the metrics headers, and instantiate the underlying GAPIC | ||
| # client. | ||
| kwargs['lib_name'] = 'gccl' | ||
| kwargs['lib_version'] = __VERSION__ | ||
| self.api = subscriber_client.SubscriberClient(**kwargs) | ||
|
|
||
| # The subcription class is responsible to retrieving and dispatching | ||
| # messages. | ||
| self._policy_class = policy_class | ||
|
|
||
| def subscribe(self, subscription, callback=None): | ||
| """Return a representation of an individual subscription. | ||
| This method creates and returns a ``Consumer`` object (that is, a | ||
| :class:`~.pubsub_v1.subscriber.consumer.base.BaseConsumer`) | ||
| subclass) bound to the topic. It does `not` create the subcription | ||
| on the backend (or do any API call at all); it simply returns an | ||
| object capable of doing these things. | ||
| If the ``callback`` argument is provided, then the :meth:`open` method | ||
| is automatically called on the returned object. If ``callback`` is | ||
| not provided, the subscription is returned unopened. | ||
| .. note:: | ||
| It only makes sense to provide ``callback`` here if you have | ||
| already created the subscription manually in the API. | ||
| Args: | ||
| subscription (str): The name of the subscription. The | ||
| subscription should have already been created (for example, | ||
| by using :meth:`create_subscription`). | ||
| callback (function): The callback function. This function receives | ||
| the :class:`~.pubsub_v1.types.PubsubMessage` as its only | ||
| argument. | ||
| flow_control (~.pubsub_v1.types.FlowControl): The flow control | ||
| settings. Use this to prevent situations where you are | ||
| inundated with too many messages at once. | ||
| Returns: | ||
| ~.pubsub_v1.subscriber.consumer.base.BaseConsumer: An instance | ||
| of the defined ``consumer_class`` on the client. | ||
| """ | ||
| subscr = self._policy_class(self, subscription) | ||
|
||
| if callable(callback): | ||
| subscr.open(callback) | ||
| return subscr | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This comment was marked as spam.
Sorry, something went wrong.
Uh oh!
There was an error while loading. Please reload this page.
This comment was marked as spam.
Sorry, something went wrong.
Uh oh!
There was an error while loading. Please reload this page.
This comment was marked as spam.
Sorry, something went wrong.
Uh oh!
There was an error while loading. Please reload this page.
This comment was marked as spam.
Sorry, something went wrong.
Uh oh!
There was an error while loading. Please reload this page.
This comment was marked as spam.
Sorry, something went wrong.
Uh oh!
There was an error while loading. Please reload this page.
This comment was marked as spam.
Sorry, something went wrong.
Uh oh!
There was an error while loading. Please reload this page.