Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
0b55ff5
wip
lukesneeringer Jun 3, 2017
210ef3b
Wrote some docs; not much else.
lukesneeringer Jun 3, 2017
7cd6156
subscriber wip
lukesneeringer Jun 4, 2017
fc4ead3
WIP
Jun 5, 2017
890de3a
wip
Jun 5, 2017
12ace0e
wip
Jun 7, 2017
7305000
wip
lukesneeringer Jun 14, 2017
e5a27ae
Fix a couple minor lint issues.
Jun 27, 2017
d50a22e
Adapting a subscriber that will work.
Jun 27, 2017
14f200a
WIP
Jun 27, 2017
6a7e846
Implement lease management.
Jun 28, 2017
3bb130b
WIP
Jun 29, 2017
f97dc23
WIP
Jun 29, 2017
303436c
WIP
Jul 5, 2017
933d2f3
WIP
Jul 13, 2017
1df0ccf
WIP
Jul 13, 2017
acb4534
WIP
Jul 13, 2017
2fb2785
Update subscriber client config to be sane.
Jul 13, 2017
ef178e9
Start adding unit tests.
Jul 18, 2017
147ad18
Beginning work on unit tests.
Jul 19, 2017
c96367a
Merge branch 'pubsub-publisher' into pubsub-subscriber
Jul 20, 2017
9c701e3
Publisher tests complete.
Jul 25, 2017
de38b83
subscriber/client.py tests
Jul 26, 2017
faeaa8e
Consumer tests
Jul 26, 2017
d467719
Fix minor linting error.
Jul 26, 2017
c821d33
Histogram tests
Jul 26, 2017
ed750b2
Minor fix based on Max feedback.
Jul 26, 2017
216310c
starting on helper thread tests
Jul 26, 2017
a1fd287
Add tests for helper_threads.
Jul 27, 2017
32701e1
Almost done with unit tests.
Jul 27, 2017
34272ad
Full coverage.
lukesneeringer Jul 27, 2017
e1c7c84
Do not send policy across the concurrency boundary.
Jul 31, 2017
2b21f48
Shift flow control to the policy class.
Jul 31, 2017
7f4b91c
Move the request queue to using keyword arguments.
Jul 31, 2017
3852805
Can has flow control.
Aug 1, 2017
b697be2
Merge branch 'public-master' into pubsub-subscriber
Aug 1, 2017
81b37f4
Subscription fixes.
Aug 3, 2017
5784d4d
Change batch time, add gRPC time logging.
Aug 3, 2017
97d8431
Unit test fix.
Aug 3, 2017
cb7dc05
Minor RST fixes (thanks @jonparrott).
Aug 4, 2017
6994465
Remove the ignore in .flake8.
Aug 4, 2017
eae7e14
Set gRPC limit to 20MB + 1
Aug 9, 2017
6afcd2a
Suppress not-working grpc options.
lukesneeringer Aug 15, 2017
e8c0a78
Merge branch 'public-master' into pubsub-subscriber
Aug 18, 2017
41cfc08
Merge branch 'public-master' into pubsub-subscriber
Aug 21, 2017
dd096e1
Merge branch 'pubsub-publisher' into pubsub-subscriber
Aug 21, 2017
b76d363
Merge branch 'pubsub-publisher' into pubsub-subscriber
Aug 21, 2017
760bef6
Merge branch 'pubsub-publisher' into pubsub-subscriber
Aug 21, 2017
f196b5e
Fix some tests to match new futures.
Aug 21, 2017
5dbfd0a
Merge branch 'pubsub-publisher' into pubsub-subscriber
Aug 21, 2017
ee144aa
Move the future tests to match the code.
Aug 21, 2017
40ea1e6
Merge branch 'pubsub-publisher' into pubsub-subscriber
Aug 21, 2017
8cb8f98
Fix a publish failure test.
Aug 21, 2017
47678c3
Fix final test.
Aug 21, 2017
90ef40f
Sane max_workers default for 2.7 and 3.4
Aug 21, 2017
34c8273
Mock credentials appropriately.
Aug 21, 2017
831fe75
Remove fail_under from .coveragerc.
Aug 22, 2017
02fa81f
Make histogram and helper_threads private.
Aug 22, 2017
2458b55
Add a publishing system test.
Aug 22, 2017
aa56a9b
Merge branch 'pubsub-subscriber' of github.com:GoogleCloudPlatform/go…
Aug 22, 2017
17b6544
Subscription system test.
Aug 22, 2017
f469381
Merge branch 'pubsub-publisher' into pubsub-subscriber
Aug 22, 2017
a24c0a7
Update tests.
Aug 22, 2017
a054324
Make the wait test work on 2.7
Aug 22, 2017
3735da5
Discarding unused mocks.
Aug 22, 2017
ac9f182
Make _consumer a private module.
Aug 22, 2017
f272eca
Switch from recursion to while for maintain_leases.
Aug 22, 2017
e6bcbe7
Add exception logging in the callback.
Aug 22, 2017
b9115ca
Fix a long line. Whups.
Aug 23, 2017
6ae46cb
Accept an executor.
Aug 23, 2017
852438e
Fix a minor flake8 complaint.
Aug 23, 2017
13c2205
No longer need to use inf for exception timeout.
Aug 23, 2017
6a03f48
Fixes discussed in chat with @jonparrott.
Aug 23, 2017
f1dde8f
Pub/Sub Docs (#3849)
lukesneeringer Aug 23, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
datastore/usage
dns/usage
language/usage
pubsub/usage
pubsub/index
resource-manager/api
runtimeconfig/usage
spanner/usage
Expand Down
6 changes: 0 additions & 6 deletions docs/pubsub/client.rst

This file was deleted.

7 changes: 0 additions & 7 deletions docs/pubsub/iam.rst

This file was deleted.

117 changes: 117 additions & 0 deletions docs/pubsub/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#######
Pub/Sub
#######

`Google Cloud Pub/Sub`_ is a fully-managed real-time messaging service that
allows you to send and receive messages between independent applications. You
can leverage Cloud Pub/Sub’s flexibility to decouple systems and components
hosted on Google Cloud Platform or elsewhere on the Internet. By building on
the same technology Google uses, Cloud Pub/Sub is designed to provide “at
least once” delivery at low latency with on-demand scalability to 1 million
messages per second (and beyond).

.. _Google Cloud Pub/Sub: https://cloud.google.com/pubsub/

********************************
Authentication and Configuration
********************************

- For an overview of authentication in ``google-cloud-python``,
see :doc:`/core/auth`.

- In addition to any authentication configuration, you should also set the
:envvar:`GOOGLE_CLOUD_PROJECT` environment variable for the project you'd
like to interact with. If the :envvar:`GOOGLE_CLOUD_PROJECT` environment
variable is not present, the project ID from JSON file credentials is used.

If you are using Google App Engine or Google Compute Engine
this will be detected automatically.

- After configuring your environment, create a
:class:`~google.cloud.pubsub_v1.PublisherClient` or
:class:`~google.cloud.pubsub_v1.SubscriberClient`.

.. code-block:: python

>>> from google.cloud import pubsub
>>> publisher = pubsub.PublisherClient()
>>> subscriber = pubsub.SubscriberClient()

or pass in ``credentials`` explicitly.

.. code-block:: python

>>> from google.cloud import pubsub
>>> client = pubsub.PublisherClient(
... credentials=creds,
... )

**********
Publishing
**********

To publish data to Cloud Pub/Sub you must create a topic, and then publish
messages to it

.. code-block:: python

>>> import os
>>> from google.cloud import pubsub
>>>
>>> publisher = pubsub.PublisherClient()
>>> topic = 'projects/{project_id}/topics/{topic}'.format(
... project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
... topic='MY_TOPIC_NAME', # Set this to something appropriate.
... )
>>> publisher.create_topic()
>>> publisher.publish(topic, b'My first message!', spam='eggs')

To learn more, consult the :doc:`publishing documentation <publisher/index>`.


***********
Subscribing
***********

To subscribe to data in Cloud Pub/Sub, you create a subscription based on
the topic, and subscribe to that.

.. code-block:: python

>>> import os
>>> from google.cloud import pubsub
>>>
>>> subscriber = pubsub.SubscriberClient()
>>> topic = 'projects/{project_id}/topics/{topic}'.format(
... project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
... topic='MY_TOPIC_NAME', # Set this to something appropriate.
... )
>>> subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
... project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
... sub='MY_SUBSCRIPTION_NAME', # Set this to something appropriate.
... )
>>> subscription = subscriber.create_subscription(topic, subscription)

The subscription is opened asychronously, and messages are processed by
use of a callback.

.. code-block:: python

>>> def callback(message):
... print(message.data)
... message.ack()
>>> subscription.open(callback)

To learn more, consult the :doc:`subscriber documentation <subscriber/index>`.


**********
Learn More
**********

.. toctree::
:maxdepth: 3

publisher/index
subscriber/index
types
6 changes: 0 additions & 6 deletions docs/pubsub/message.rst

This file was deleted.

8 changes: 8 additions & 0 deletions docs/pubsub/publisher/api/batch.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
:orphan:

Batch API
=========

.. automodule:: google.cloud.pubsub_v1.publisher.batch.thread
:members:
:inherited-members:
6 changes: 6 additions & 0 deletions docs/pubsub/publisher/api/client.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Publisher Client API
====================

.. automodule:: google.cloud.pubsub_v1.publisher.client
:members:
:inherited-members:
126 changes: 126 additions & 0 deletions docs/pubsub/publisher/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
Publishing Messages
===================

Publishing messages is handled through the
:class:`~.pubsub_v1.publisher.client.Client` class (aliased as
``google.cloud.pubsub.PublisherClient``). This class provides methods to
create topics, and (most importantly) a
:meth:`~.pubsub_v1.publisher.client.Client.publish` method that publishes
messages to Pub/Sub.

Instantiating a publishing client is straightforward:

.. code-block:: python

from google.cloud import pubsub
publish_client = pubsub.PublisherClient()


Publish a Message
-----------------

To publish a message, use the
:meth:`~.pubsub_v1.publisher.client.Client.publish` method. This method accepts
two positional arguments: the topic to publish to, and the body of the message.
It also accepts arbitrary keyword arguments, which are passed along as
attributes of the message.

The topic is passed along as a string; all topics have the canonical form of
``projects/{project_name}/topics/{topic_name}``.

Therefore, a very basic publishing call looks like:

.. code-block:: python

topic = 'projects/{project}/topics/{topic}'
publish_client.publish(topic, b'This is my message.')

.. note::

The message data in Pub/Sub is an opaque blob of bytes, and as such, you
*must* send a ``bytes`` object in Python 3 (``str`` object in Python 2).
If you send a text string (``str`` in Python 3, ``unicode`` in Python 2),
the method will raise :exc:`TypeError`.

The reason it works this way is because there is no reasonable guarantee
that the same language or environment is being used by the subscriber,
and so it is the responsibility of the publisher to properly encode
the payload.

If you want to include attributes, simply add keyword arguments:

.. code-block:: python

topic = 'projects/{project}/topics/{topic}'
publish_client.publish(topic, b'This is my message.', foo='bar')


Batching
--------

Whenever you publish a message, a
:class:`~.pubsub_v1.publisher.batch.thread.Batch` is automatically created.
This way, if you publish a large volume of messages, it reduces the number of
requests made to the server.

The way that this works is that on the first message that you send, a new
:class:`~.pubsub_v1.publisher.batch.thread.Batch` is created automatically.
For every subsequent message, if there is already a valid batch that is still
accepting messages, then that batch is used. When the batch is created, it
begins a countdown that publishes the batch once sufficient time has
elapsed (by default, this is 0.05 seconds).

If you need different batching settings, simply provide a
:class:`~.pubsub_v1.types.BatchSettings` object when you instantiate the
:class:`~.pubsub_v1.publisher.client.Client`:

.. code-block:: python

from google.cloud import pubsub
from google.cloud.pubsub import types

client = pubsub.PublisherClient(
batch_settings=BatchSettings(max_messages=500),
)

Pub/Sub accepts a maximum of 1,000 messages in a batch, and the size of a
batch can not exceed 10 megabytes.


Futures
-------

Every call to :meth:`~.pubsub_v1.publisher.client.Client.publish` will return
a class that conforms to the :class:`~concurrent.futures.Future` interface.
You can use this to ensure that the publish succeeded:

.. code-block:: python

# The .result() method will block until the future is complete.
# If there is an error, it will raise an exception.
future = client.publish(topic, b'My awesome message.')
message_id = future.result()

You can also attach a callback to the future:

.. code-block:: python

# Callbacks receive the future as their only argument, as defined in
# the Future interface.
def callback(future):
message_id = future.result()
do_something_with(message_id)

# The callback is added once you get the future. If you add a callback
# and the future is already done, it will simply be executed immediately.
future = client.publish(topic, b'My awesome message.')
future.add_done_callback(callback)


API Reference
-------------

.. toctree::
:maxdepth: 2

api/client
Loading