Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
f784121
[WIP] Getting started on Pub/Sub.
May 25, 2017
c8a9fd6
stuff
May 25, 2017
9b09b8f
WIP
May 25, 2017
8f0832e
wip
lukesneeringer May 25, 2017
02d7658
WIP
May 30, 2017
3a3ebe9
Merge branch 'pubsub' of github.com:GoogleCloudPlatform/google-cloud-…
May 30, 2017
503d11f
WIP, fixing bugs.
May 30, 2017
3b21b93
WIP
May 30, 2017
1e879a1
wip
May 31, 2017
4bf0552
wip
May 31, 2017
c6dc098
wip
May 31, 2017
13821a7
wip
May 31, 2017
c2d7af8
wip
Jun 1, 2017
4c0bf84
Merge branch 'public-master' into pubsub
Jun 1, 2017
783e9dd
Merge branch 'pubsub' into pubsub-publisher
Jun 2, 2017
7dd719f
Clean up a couple small things.
Jun 2, 2017
c1042ac
A couple more small fixes.
Jun 2, 2017
ccaa865
WIP
Jun 2, 2017
f2ee4d4
Rework based on @jonparrott concurrency ideas.
Jun 2, 2017
12d5546
Refactor the batching implementation.
Jun 2, 2017
e99d959
Remove unrelated files.
Jun 2, 2017
4774f2a
wip
lukesneeringer Jun 3, 2017
1a53c37
Honor size and message count limits.
lukesneeringer Jun 15, 2017
9a6b7cb
Update publisher to be thread-based.
Jul 20, 2017
b0f03a8
Merge branch 'public-master' into pubsub-publisher
Aug 21, 2017
dfe52ef
Merge branch 'pubsub' into pubsub-publisher
Aug 21, 2017
f553fd6
Add better Batch docstring.
Aug 21, 2017
356749a
Improve the max latency thread comments.
Aug 21, 2017
8242c9d
Collapse property docstrings.
Aug 21, 2017
db87dab
More @jonparrott feedback.
Aug 21, 2017
58072b8
Remove the client as a public item in the base batch.
Aug 21, 2017
8f67488
Remove the rejection batch.
Aug 21, 2017
a86d9b7
Lock batch acquisition.
Aug 21, 2017
df18615
Alter exception superclass.
Aug 21, 2017
5f0549b
Inherit from google.api.core.future.Future.
Aug 21, 2017
101d9ca
Move to @jonparrott's Future interface.
Aug 21, 2017
e6e58bb
Move Future off into its own module.
Aug 21, 2017
9fd490c
Add is not None.
Aug 21, 2017
469400e
Feedback.
Aug 22, 2017
31c0c81
Use concurrent.futures.TimeoutError where possible.
Aug 22, 2017
04ac278
Add a lock on commit to ensure no duplication.
Aug 22, 2017
37ee908
Move to an Event.
Aug 22, 2017
12d44be
Make _names private.
Aug 22, 2017
6fe1309
Pubsub subscriber (#3637)
lukesneeringer Aug 24, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
datastore/usage
dns/usage
language/usage
pubsub/usage
pubsub/index
resource-manager/api
runtimeconfig/usage
spanner/usage
Expand Down
6 changes: 0 additions & 6 deletions docs/pubsub/client.rst

This file was deleted.

7 changes: 0 additions & 7 deletions docs/pubsub/iam.rst

This file was deleted.

117 changes: 117 additions & 0 deletions docs/pubsub/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#######
Pub/Sub
#######

`Google Cloud Pub/Sub`_ is a fully-managed real-time messaging service that
allows you to send and receive messages between independent applications. You
can leverage Cloud Pub/Sub’s flexibility to decouple systems and components
hosted on Google Cloud Platform or elsewhere on the Internet. By building on
the same technology Google uses, Cloud Pub/Sub is designed to provide “at
least once” delivery at low latency with on-demand scalability to 1 million
messages per second (and beyond).

.. _Google Cloud Pub/Sub: https://cloud.google.com/pubsub/

********************************
Authentication and Configuration
********************************

- For an overview of authentication in ``google-cloud-python``,
see :doc:`/core/auth`.

- In addition to any authentication configuration, you should also set the
:envvar:`GOOGLE_CLOUD_PROJECT` environment variable for the project you'd
like to interact with. If the :envvar:`GOOGLE_CLOUD_PROJECT` environment
variable is not present, the project ID from JSON file credentials is used.

If you are using Google App Engine or Google Compute Engine
this will be detected automatically.

- After configuring your environment, create a
:class:`~google.cloud.pubsub_v1.PublisherClient` or
:class:`~google.cloud.pubsub_v1.SubscriberClient`.

.. code-block:: python

>>> from google.cloud import pubsub
>>> publisher = pubsub.PublisherClient()
>>> subscriber = pubsub.SubscriberClient()

or pass in ``credentials`` explicitly.

.. code-block:: python

>>> from google.cloud import pubsub
>>> client = pubsub.PublisherClient(
... credentials=creds,
... )

**********
Publishing
**********

To publish data to Cloud Pub/Sub you must create a topic, and then publish
messages to it

.. code-block:: python

>>> import os
>>> from google.cloud import pubsub
>>>
>>> publisher = pubsub.PublisherClient()
>>> topic = 'projects/{project_id}/topics/{topic}'.format(
... project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
... topic='MY_TOPIC_NAME', # Set this to something appropriate.
... )
>>> publisher.create_topic()
>>> publisher.publish(topic, b'My first message!', spam='eggs')

To learn more, consult the :doc:`publishing documentation <publisher/index>`.


***********
Subscribing
***********

To subscribe to data in Cloud Pub/Sub, you create a subscription based on
the topic, and subscribe to that.

.. code-block:: python

>>> import os
>>> from google.cloud import pubsub
>>>
>>> subscriber = pubsub.SubscriberClient()
>>> topic = 'projects/{project_id}/topics/{topic}'.format(
... project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
... topic='MY_TOPIC_NAME', # Set this to something appropriate.
... )
>>> subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
... project_id=os.getenv('GOOGLE_CLOUD_PROJECT'),
... sub='MY_SUBSCRIPTION_NAME', # Set this to something appropriate.
... )
>>> subscription = subscriber.create_subscription(topic, subscription)

The subscription is opened asychronously, and messages are processed by
use of a callback.

.. code-block:: python

>>> def callback(message):
... print(message.data)
... message.ack()
>>> subscription.open(callback)

To learn more, consult the :doc:`subscriber documentation <subscriber/index>`.


**********
Learn More
**********

.. toctree::
:maxdepth: 3

publisher/index
subscriber/index
types
6 changes: 0 additions & 6 deletions docs/pubsub/message.rst

This file was deleted.

8 changes: 8 additions & 0 deletions docs/pubsub/publisher/api/batch.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
:orphan:

Batch API
=========

.. automodule:: google.cloud.pubsub_v1.publisher.batch.thread
:members:
:inherited-members:
6 changes: 6 additions & 0 deletions docs/pubsub/publisher/api/client.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Publisher Client API
====================

.. automodule:: google.cloud.pubsub_v1.publisher.client
:members:
:inherited-members:
126 changes: 126 additions & 0 deletions docs/pubsub/publisher/index.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
Publishing Messages
===================

Publishing messages is handled through the
:class:`~.pubsub_v1.publisher.client.Client` class (aliased as
``google.cloud.pubsub.PublisherClient``). This class provides methods to
create topics, and (most importantly) a
:meth:`~.pubsub_v1.publisher.client.Client.publish` method that publishes
messages to Pub/Sub.

Instantiating a publishing client is straightforward:

.. code-block:: python

from google.cloud import pubsub
publish_client = pubsub.PublisherClient()


Publish a Message
-----------------

To publish a message, use the
:meth:`~.pubsub_v1.publisher.client.Client.publish` method. This method accepts
two positional arguments: the topic to publish to, and the body of the message.
It also accepts arbitrary keyword arguments, which are passed along as
attributes of the message.

The topic is passed along as a string; all topics have the canonical form of
``projects/{project_name}/topics/{topic_name}``.

Therefore, a very basic publishing call looks like:

.. code-block:: python

topic = 'projects/{project}/topics/{topic}'
publish_client.publish(topic, b'This is my message.')

.. note::

The message data in Pub/Sub is an opaque blob of bytes, and as such, you
*must* send a ``bytes`` object in Python 3 (``str`` object in Python 2).
If you send a text string (``str`` in Python 3, ``unicode`` in Python 2),
the method will raise :exc:`TypeError`.

The reason it works this way is because there is no reasonable guarantee
that the same language or environment is being used by the subscriber,
and so it is the responsibility of the publisher to properly encode
the payload.

If you want to include attributes, simply add keyword arguments:

.. code-block:: python

topic = 'projects/{project}/topics/{topic}'
publish_client.publish(topic, b'This is my message.', foo='bar')


Batching
--------

Whenever you publish a message, a
:class:`~.pubsub_v1.publisher.batch.thread.Batch` is automatically created.
This way, if you publish a large volume of messages, it reduces the number of
requests made to the server.

The way that this works is that on the first message that you send, a new
:class:`~.pubsub_v1.publisher.batch.thread.Batch` is created automatically.
For every subsequent message, if there is already a valid batch that is still
accepting messages, then that batch is used. When the batch is created, it
begins a countdown that publishes the batch once sufficient time has
elapsed (by default, this is 0.05 seconds).

If you need different batching settings, simply provide a
:class:`~.pubsub_v1.types.BatchSettings` object when you instantiate the
:class:`~.pubsub_v1.publisher.client.Client`:

.. code-block:: python

from google.cloud import pubsub
from google.cloud.pubsub import types

client = pubsub.PublisherClient(
batch_settings=BatchSettings(max_messages=500),
)

Pub/Sub accepts a maximum of 1,000 messages in a batch, and the size of a
batch can not exceed 10 megabytes.


Futures
-------

Every call to :meth:`~.pubsub_v1.publisher.client.Client.publish` will return
a class that conforms to the :class:`~concurrent.futures.Future` interface.
You can use this to ensure that the publish succeeded:

.. code-block:: python

# The .result() method will block until the future is complete.
# If there is an error, it will raise an exception.
future = client.publish(topic, b'My awesome message.')
message_id = future.result()

You can also attach a callback to the future:

.. code-block:: python

# Callbacks receive the future as their only argument, as defined in
# the Future interface.
def callback(future):
message_id = future.result()
do_something_with(message_id)

# The callback is added once you get the future. If you add a callback
# and the future is already done, it will simply be executed immediately.
future = client.publish(topic, b'My awesome message.')
future.add_done_callback(callback)


API Reference
-------------

.. toctree::
:maxdepth: 2

api/client
Loading