Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-13016] Remove avro-python3 dependency from Beam #15900

Merged
merged 12 commits into from
Nov 11, 2021
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
* Upgraded the GCP Libraries BOM version to 24.0.0 and associated dependencies ([BEAM-11205](
https://issues.apache.org/jira/browse/BEAM-11205)). For Google Cloud client library versions set by this BOM,
see [this table](https://storage.googleapis.com/cloud-opensource-java-dashboard/com.google.cloud/libraries-bom/24.0.0/artifact_details.html).
* Removed avro-python3 dependency in AvroIO. Fastavro has already been our Avro library of choice on Python 3. Boolean use_fastavro is left for api compatibility, but will have no effect.([BEAM-13016](https://github.com/apache/beam/pull/15900)).

## Breaking Changes

Expand Down
36 changes: 19 additions & 17 deletions sdks/python/apache_beam/examples/avro_bitcoin.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import argparse
import logging

from avro.schema import Parse
from fastavro.schema import parse_schema

import apache_beam as beam
from apache_beam.io.avroio import ReadFromAvro
Expand Down Expand Up @@ -84,23 +84,26 @@ def process(self, elem):
}]


SCHEMA = Parse(
'''
{
SCHEMA = parse_schema({
"namespace": "example.avro",
"type": "record",
"name": "Transaction",
"fields": [
{"name": "transaction_id", "type": "string"},
{"name": "timestamp", "type": "long"},
{"name": "block_id", "type": "string"},
{"name": "previous_block", "type": "string"},
{"name": "num_inputs", "type": "int"},
{"name": "num_outputs", "type": "int"},
{"name": "sum_output", "type": "long"}
]
}
''')
"fields": [{
"name": "transaction_id", "type": "string"
}, {
"name": "timestamp", "type": "long"
}, {
"name": "block_id", "type": "string"
}, {
"name": "previous_block", "type": "string"
}, {
"name": "num_inputs", "type": "int"
}, {
"name": "num_outputs", "type": "int"
}, {
"name": "sum_output", "type": "long"
}]
})


def run(argv=None):
Expand Down Expand Up @@ -140,7 +143,7 @@ def run(argv=None):

# Read the avro file[pattern] into a PCollection.
records = \
p | 'read' >> ReadFromAvro(opts.input, use_fastavro=opts.use_fastavro)
p | 'read' >> ReadFromAvro(opts.input)

measured = records | 'scan' >> beam.ParDo(BitcoinTxnCountDoFn())

Expand All @@ -150,7 +153,6 @@ def run(argv=None):
opts.output,
schema=SCHEMA,
codec=('deflate' if opts.compress else 'null'),
use_fastavro=opts.use_fastavro
)

result = p.run()
Expand Down
97 changes: 45 additions & 52 deletions sdks/python/apache_beam/examples/fastavro_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

"""End-to-end test for Avro IO's fastavro support.

Writes a configurable number of records to a temporary location with each of
{avro,fastavro}, then reads them back in, joins the two read datasets, and
verifies they have the same elements.
Writes a configurable number of records to a temporary location with fastavro,
then reads them back in from source, joins the generated records and records
that are read from the source, and verifies they have the same elements.



Usage:

Expand Down Expand Up @@ -51,7 +53,6 @@
import uuid

import pytest
from avro.schema import Parse
from fastavro import parse_schema

from apache_beam.io.avroio import ReadAllFromAvro
Expand All @@ -78,6 +79,18 @@ def record(i):
}


def assertEqual(l, r):
if l != r:
raise BeamAssertException('Assertion failed: %s == %s' % (l, r))


def check(element):
assert element['color'] in COLORS
assert element['label'] in LABELS
assertEqual(
sorted(element.keys()), ['color', 'label', 'number', 'number_str'])


class FastavroIT(unittest.TestCase):

SCHEMA_STRING = '''
Expand All @@ -102,6 +115,7 @@ def setUp(self):
def test_avro_it(self):
num_records = self.test_pipeline.get_option('records')
num_records = int(num_records) if num_records else 1000000
fastavro_output = '/'.join([self.output, 'fastavro'])

# Seed a `PCollection` with indices that will each be FlatMap'd into
# `batch_size` records, to avoid having a too-large list in memory at
Expand All @@ -123,65 +137,44 @@ def batch_indices(start):
| 'expand-batches' >> FlatMap(batch_indices) \
| 'create-records' >> Map(record)

fastavro_output = '/'.join([self.output, 'fastavro'])
avro_output = '/'.join([self.output, 'avro'])

# pylint: disable=expression-not-assigned
records_pcoll \
| 'write_fastavro' >> WriteToAvro(
fastavro_output,
parse_schema(json.loads(self.SCHEMA_STRING)),
use_fastavro=True
)
result = self.test_pipeline.run()
result.wait_until_finish()
fastavro_pcoll = self.test_pipeline \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For future changes, prefer not to use \ for concatenating strings, since it's error prone. Using brackets is preferable.
From PEP8: (https://www.python.org/dev/peps/pep-0008/)

The preferred way of wrapping long lines is by using Python's implied line continuation inside parentheses, brackets and braces. Long lines can be broken over multiple lines by wrapping expressions in parentheses. These should be used in preference to using a backslash for line continuation.

(this was a preexisting issue in this file).

| 'create-fastavro' >> Create(['%s*' % fastavro_output]) \
| 'read-fastavro' >> ReadAllFromAvro()

# pylint: disable=expression-not-assigned
records_pcoll \
| 'write_avro' >> WriteToAvro(
avro_output,
Parse(self.SCHEMA_STRING),
use_fastavro=False
)
mapped_fastavro_pcoll = fastavro_pcoll | "map_fastavro" >> Map(
lambda x: (x['number'], x))
mapped_record_pcoll = records_pcoll | "map_record" >> Map(
lambda x: (x['number'], x))

def validate_record(elem):
v = elem[1]

def assertEqual(l, r):
if l != r:
raise BeamAssertException('Assertion failed: %s == %s' % (l, r))

assertEqual(sorted(v.keys()), ['fastavro', 'record_pcoll'])
record_pcoll_values = v['record_pcoll']
fastavro_values = v['fastavro']
assertEqual(record_pcoll_values, fastavro_values)
assertEqual(len(record_pcoll_values), 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What guarantees that we have unique keys for each record (so that after GBK we only have 1 value per key)?

Copy link
Contributor Author

@AnandInguva AnandInguva Nov 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will have unique keys for each record as the keys are integers in ascending order. This is an example of the output after CoGroupByKey. Here the number of records are 20 and keys are the integer(number attribute of the pcollection)

(0, {'record_pcoll': [{'label': 'abc', 'number': 0, 'number_str': '0', 'color': 'RED'}], 'fastavro': [{'label': 'abc', 'number': 0, 'number_str': '0', 'color': 'RED'}]}) (1, {'record_pcoll': [{'label': 'def', 'number': 1, 'number_str': '1', 'color': 'ORANGE'}], 'fastavro': [{'label': 'def', 'number': 1, 'number_str': '1', 'color': 'ORANGE'}]}) (2, {'record_pcoll': [{'label': 'ghi', 'number': 2, 'number_str': '2', 'color': 'YELLOW'}], 'fastavro': [{'label': 'ghi', 'number': 2, 'number_str': '2', 'color': 'YELLOW'}]}) (3, {'record_pcoll': [{'label': 'jkl', 'number': 3, 'number_str': '3', 'color': 'GREEN'}], 'fastavro': [{'label': 'jkl', 'number': 3, 'number_str': '3', 'color': 'GREEN'}]}) (4, {'record_pcoll': [{'label': 'mno', 'number': 4, 'number_str': '4', 'color': 'BLUE'}], 'fastavro': [{'label': 'mno', 'number': 4, 'number_str': '4', 'color': 'BLUE'}]}) (5, {'record_pcoll': [{'label': 'pqr', 'number': 5, 'number_str': '5', 'color': 'PURPLE'}], 'fastavro': [{'label': 'pqr', 'number': 5, 'number_str': '5', 'color': 'PURPLE'}]}) (6, {'record_pcoll': [{'label': 'stu', 'number': 6, 'number_str': '6', 'color': None}], 'fastavro': [{'label': 'stu', 'number': 6, 'number_str': '6', 'color': None}]}) (7, {'record_pcoll': [{'label': 'vwx', 'number': 7, 'number_str': '7', 'color': 'RED'}], 'fastavro': [{'label': 'vwx', 'number': 7, 'number_str': '7', 'color': 'RED'}]}) (8, {'record_pcoll': [{'label': 'abc', 'number': 8, 'number_str': '8', 'color': 'ORANGE'}], 'fastavro': [{'label': 'abc', 'number': 8, 'number_str': '8', 'color': 'ORANGE'}]}) (9, {'record_pcoll': [{'label': 'def', 'number': 9, 'number_str': '9', 'color': 'YELLOW'}], 'fastavro': [{'label': 'def', 'number': 9, 'number_str': '9', 'color': 'YELLOW'}]}) (10, {'record_pcoll': [{'label': 'ghi', 'number': 10, 'number_str': '10', 'color': 'GREEN'}], 'fastavro': [{'label': 'ghi', 'number': 10, 'number_str': '10', 'color': 'GREEN'}]}) (11, {'record_pcoll': [{'label': 'jkl', 'number': 11, 'number_str': '11', 'color': 'BLUE'}], 'fastavro': [{'label': 'jkl', 'number': 11, 'number_str': '11', 'color': 'BLUE'}]}) (12, {'record_pcoll': [{'label': 'mno', 'number': 12, 'number_str': '12', 'color': 'PURPLE'}], 'fastavro': [{'label': 'mno', 'number': 12, 'number_str': '12', 'color': 'PURPLE'}]}) (13, {'record_pcoll': [{'label': 'pqr', 'number': 13, 'number_str': '13', 'color': None}], 'fastavro': [{'label': 'pqr', 'number': 13, 'number_str': '13', 'color': None}]}) (14, {'record_pcoll': [{'label': 'stu', 'number': 14, 'number_str': '14', 'color': 'RED'}], 'fastavro': [{'label': 'stu', 'number': 14, 'number_str': '14', 'color': 'RED'}]}) (15, {'record_pcoll': [{'label': 'vwx', 'number': 15, 'number_str': '15', 'color': 'ORANGE'}], 'fastavro': [{'label': 'vwx', 'number': 15, 'number_str': '15', 'color': 'ORANGE'}]}) (16, {'record_pcoll': [{'label': 'abc', 'number': 16, 'number_str': '16', 'color': 'YELLOW'}], 'fastavro': [{'label': 'abc', 'number': 16, 'number_str': '16', 'color': 'YELLOW'}]}) (17, {'record_pcoll': [{'label': 'def', 'number': 17, 'number_str': '17', 'color': 'GREEN'}], 'fastavro': [{'label': 'def', 'number': 17, 'number_str': '17', 'color': 'GREEN'}]}) (18, {'record_pcoll': [{'label': 'ghi', 'number': 18, 'number_str': '18', 'color': 'BLUE'}], 'fastavro': [{'label': 'ghi', 'number': 18, 'number_str': '18', 'color': 'BLUE'}]}) (19, {'record_pcoll': [{'label': 'jkl', 'number': 19, 'number_str': '19', 'color': 'PURPLE'}], 'fastavro': [{'label': 'jkl', 'number': 19, 'number_str': '19', 'color': 'PURPLE'}]})


{
"record_pcoll": mapped_record_pcoll, "fastavro": mapped_fastavro_pcoll
} | CoGroupByKey() | Map(validate_record)

result = self.test_pipeline.run()
result.wait_until_finish()
assert result.state == PipelineState.DONE

with TestPipeline(is_integration_test=True) as fastavro_read_pipeline:

fastavro_records = \
fastavro_read_pipeline \
| 'create-fastavro' >> Create(['%s*' % fastavro_output]) \
| 'read-fastavro' >> ReadAllFromAvro(use_fastavro=True) \
| Map(lambda rec: (rec['number'], rec))

avro_records = \
fastavro_read_pipeline \
| 'create-avro' >> Create(['%s*' % avro_output]) \
| 'read-avro' >> ReadAllFromAvro(use_fastavro=False) \
| Map(lambda rec: (rec['number'], rec))

def check(elem):
v = elem[1]

def assertEqual(l, r):
if l != r:
raise BeamAssertException('Assertion failed: %s == %s' % (l, r))

assertEqual(sorted(v.keys()), ['avro', 'fastavro'])
avro_values = v['avro']
fastavro_values = v['fastavro']
assertEqual(avro_values, fastavro_values)
assertEqual(len(avro_values), 1)

# pylint: disable=expression-not-assigned
{
'avro': avro_records,
'fastavro': fastavro_records
} \
| CoGroupByKey() \
| Map(check)

self.addCleanup(delete_files, [self.output])
self.addCleanup(delete_files, [self.output])
assert result.state == PipelineState.DONE


Expand Down
Loading