Skip to content

Commit

Permalink
Merge pull request #25916 Use built-in csv and json readers.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb authored Mar 24, 2023
2 parents 5be2697 + c637363 commit d80f1ff
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 16 deletions.
16 changes: 0 additions & 16 deletions sdks/python/apache_beam/yaml/yaml_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,22 +314,6 @@ def expand(self, pcolls):
for key in dir(apache_beam.io)
if key.startswith('ReadFrom') or key.startswith('WriteTo')
}
# Register pandas-backed CSV readers/writers under the standard IO names.
# NOTE(review): removed by this commit in favor of Beam's built-in
# csv/json transforms picked up by the dir(apache_beam.io) scan above.
ios['ReadFromCsv'] = lambda **kwargs: apache_beam.dataframe.io.ReadViaPandas(
'csv', **kwargs)
ios['WriteToCsv'] = lambda **kwargs: apache_beam.dataframe.io.WriteViaPandas(
'csv', **kwargs)
# JSON variants default to newline-delimited records (orient='records',
# lines=True), matching the common "JSON lines" file layout.
ios['ReadFromJson'] = (
lambda *,
orient='records',
lines=True,
**kwargs: apache_beam.dataframe.io.ReadViaPandas(
'json', orient=orient, lines=lines, **kwargs))
ios['WriteToJson'] = (
lambda *,
orient='records',
lines=True,
**kwargs: apache_beam.dataframe.io.WriteViaPandas(
'json', orient=orient, lines=lines, **kwargs))

return InlineProvider(
dict({
Expand Down
43 changes: 43 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_transform_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
# limitations under the License.
#

import glob
import logging
import os
import tempfile
import unittest

import apache_beam as beam
Expand Down Expand Up @@ -102,6 +105,46 @@ def test_chain_with_root(self):
''')
assert_that(result, equal_to([41, 43, 47, 53, 61, 71, 83, 97, 113, 131]))

def test_csv_to_json(self):
  """Round-trips a small frame through a CSV->JSON yaml chain pipeline.

  Writes a CSV input file, runs a YamlTransform chain of ReadFromCsv ->
  WriteToJson, then reads the single output shard back with pandas and
  checks it matches the original data.
  """
  try:
    import pandas as pd
  except ImportError:
    raise unittest.SkipTest('Pandas not available.')

  with tempfile.TemporaryDirectory() as tmpdir:
    data = pd.DataFrame([
        {
            'label': '11a', 'rank': 0
        },
        {
            'label': '37a', 'rank': 1
        },
        {
            'label': '389a', 'rank': 2
        },
    ])
    # Avoid shadowing the builtin `input`.
    input_path = os.path.join(tmpdir, 'input.csv')
    output_path = os.path.join(tmpdir, 'output.json')
    data.to_csv(input_path, index=False)

    with beam.Pipeline() as p:
      result = p | YamlTransform(
          '''
          type: chain
          transforms:
            - type: ReadFromCsv
              path: %s
            - type: WriteToJson
              path: %s
              num_shards: 1
          ''' % (repr(input_path), repr(output_path)))

    # num_shards: 1 above guarantees exactly one output shard.
    shards = glob.glob(output_path + "*")
    self.assertEqual(len(shards), 1)
    # Beam provides no ordering guarantee, so sort by rank and reset the
    # index; reindex() with no args would keep the permuted row index and
    # make assert_frame_equal fail on out-of-order output.
    result = pd.read_json(
        shards[0], orient='records',
        lines=True).sort_values('rank').reset_index(drop=True)
    pd.testing.assert_frame_equal(data, result)


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down

0 comments on commit d80f1ff

Please sign in to comment.