Skip to content

Commit

Permalink
docs(samples): Add Dataflow to Pub/Sub snippet (#11104)
Browse files Browse the repository at this point in the history
* docs(samples): Add Dataflow to Pub/Sub snippet

Adds a snippet that shows how to write messages to Pub/Sub from
Dataflow.

* Fix region tag

* Skip Python 3.12 test

see apache/beam#29149

* Fix copyright date

* Improve IT by reading messages from Pub/Sub

* Incorporate review feedback

* Fix testfor 3.8

* Fix linter error
  • Loading branch information
VeronicaWasson authored Jan 14, 2024
1 parent da40d4e commit 93429e0
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 4 deletions.
9 changes: 7 additions & 2 deletions dataflow/snippets/batch_write_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,22 @@
# limitations under the License.

# [START dataflow_batch_write_to_storage]
import argparse
from typing import List

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def write_to_cloud_storage(argv=None):
def write_to_cloud_storage(argv : List[str] = None) -> None:
# Parse the pipeline options passed into the application.
class MyOptions(PipelineOptions):
@classmethod
# Define a custom pipeline option that specfies the Cloud Storage bucket.
def _add_argparse_args(cls, parser):
def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
parser.add_argument("--output", required=True)

wordsList = ["1", "2", "3", "4"]
Expand Down
42 changes: 42 additions & 0 deletions dataflow/snippets/noxfile_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Default TEST_CONFIG_OVERRIDE for python repos.

# You can copy this file into your directory, then it will be imported from
# the noxfile.py.

# The source of truth:
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/noxfile_config.py

TEST_CONFIG_OVERRIDE = {
# You can opt out from the test for specific Python versions.
"ignored_versions": ["2.7", "3.7", "3.9", "3.10", "3.12"],
# Old samples are opted out of enforcing Python type hints
# All new samples should feature them
"enforce_type_hints": True,
# An envvar key for determining the project id to use. Change it
# to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a
# build specific Cloud project. You can also use your own string
# to use your own Cloud project.
"gcloud_project_env": "GOOGLE_CLOUD_PROJECT",
# 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT',
# If you need to use a specific version of pip,
# change pip_version_override to the string representation
# of the version number, for example, "20.2.4"
"pip_version_override": None,
# A dictionary you want to inject into your test. Don't put any
# secrets here. These values will override predefined values.
"envs": {},
}
4 changes: 2 additions & 2 deletions dataflow/snippets/tests/test_batch_write_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@


@pytest.fixture(scope="function")
def setup_and_teardown():
def setup_and_teardown() -> None:
try:
bucket = storage_client.create_bucket(bucket_name)
yield
finally:
bucket.delete(force=True)


def test_write_to_cloud_storage(setup_and_teardown):
def test_write_to_cloud_storage(setup_and_teardown: None) -> None:
sys.argv = ['', f'--output=gs://{bucket_name}/output/out-']
write_to_cloud_storage()

Expand Down
96 changes: 96 additions & 0 deletions dataflow/snippets/tests/test_write_pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# !/usr/bin/env python
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import time
from unittest.mock import patch
import uuid

from google.cloud import pubsub_v1

import pytest

from ..write_pubsub import write_to_pubsub


topic_id = f'test-topic-{uuid.uuid4()}'
subscription_id = f'{topic_id}-sub'
project_id = os.environ["GOOGLE_CLOUD_PROJECT"]

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

NUM_MESSAGES = 4
TIMEOUT = 60 * 5


@pytest.fixture(scope="function")
def setup_and_teardown() -> None:
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

try:
publisher.create_topic(request={"name": topic_path})
subscriber.create_subscription(
request={"name": subscription_path, "topic": topic_path}
)
yield
finally:
subscriber.delete_subscription(
request={"subscription": subscription_path})
publisher.delete_topic(request={"topic": topic_path})


def read_messages() -> None:
received_messages = []
ack_ids = []

# Read messages from Pub/Sub. It might be necessary to read multiple
# batches, Use a timeout value to avoid potentially looping forever.
start_time = time.time()
while time.time() - start_time <= TIMEOUT:
# Pull messages from Pub/Sub.
subscription_path = subscriber.subscription_path(project_id, subscription_id)
response = subscriber.pull(
request={"subscription": subscription_path, "max_messages": NUM_MESSAGES}
)
received_messages.append(response.received_messages)

for received_message in response.received_messages:
ack_ids.append(received_message.ack_id)

# Acknowledge the received messages so they will not be sent again.
subscriber.acknowledge(
request={"subscription": subscription_path, "ack_ids": ack_ids}
)

if (len(received_messages) >= NUM_MESSAGES):
break

time.sleep(5)

return received_messages


def test_write_to_pubsub(setup_and_teardown: None) -> None:
with patch("sys.argv", [
"", '--streaming', f'--project={project_id}', f'--topic={topic_id}'
]):
write_to_pubsub()

# Read from Pub/Sub to verify the pipeline successfully wrote messages.
# Duplicate reads are possible.
messages = read_messages()
assert (len(messages) >= NUM_MESSAGES)
75 changes: 75 additions & 0 deletions dataflow/snippets/write_pubsub.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#!/usr/bin/env python
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# [START dataflow_pubsub_write_with_attributes]i
import argparse
from typing import Any, Dict, List

import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.io import WriteToPubSub
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
attributes = {
'buyer': item['name'],
'timestamp': str(item['ts'])
}
data = bytes(item['product'], 'utf-8')

return PubsubMessage(data=data, attributes=attributes)


def write_to_pubsub(argv: List[str] = None) -> None:

# Parse the pipeline options passed into the application. Example:
# --project=$PROJECT_ID --topic=$TOPIC_NAME --streaming
# For more information, see
# https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
class MyOptions(PipelineOptions):
@classmethod
# Define custom pipeline options that specify the project ID and Pub/Sub
# topic.
def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
parser.add_argument("--project", required=True)
parser.add_argument("--topic", required=True)

example_data = [
{'name': 'Robert', 'product': 'TV', 'ts': 1613141590000},
{'name': 'Maria', 'product': 'Phone', 'ts': 1612718280000},
{'name': 'Juan', 'product': 'Laptop', 'ts': 1611618000000},
{'name': 'Rebeca', 'product': 'Video game', 'ts': 1610000000000}
]
options = MyOptions()

with beam.Pipeline(options=options) as pipeline:
(
pipeline
| "Create elements" >> beam.Create(example_data)
| "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
| WriteToPubSub(
topic=f'projects/{options.project}/topics/{options.topic}',
with_attributes=True)
)

print('Pipeline ran successfully.')
# [END dataflow_pubsub_write_with_attributes]


if __name__ == "__main__":
write_to_pubsub()

0 comments on commit 93429e0

Please sign in to comment.