
make remove_fields work plus some helpful prints
floptical committed Feb 6, 2024
1 parent: 478b221 · commit: b21c6b7
Showing 1 changed file with 23 additions and 6 deletions.
databridge_etl_tools/ais_geocoder/ais_geocoder.py (23 additions & 6 deletions)
@@ -4,8 +4,11 @@
import codecs

import click
-from smart_open import open
+from smart_open import open as smopen
import boto3
+import botocore
+from sys import getsizeof
+from hurry.filesize import size

from .ais_request import ais_request

@@ -36,6 +39,7 @@ def __init__(self,
        self.s3_output_key = s3_output_key
        self.query_fields = query_fields.split(',')
        self.ais_fields = ais_fields.split(',')
+        self.remove_fields = remove_fields.split(',') if remove_fields else None
        self.csv_path = '/tmp/output.csv'


@@ -53,12 +57,13 @@ def ais_inner_geocode(self):
        input_file = f's3://{self.s3_bucket}/{self.s3_input_key}'
        output_file = f's3://{self.s3_bucket}/{self.s3_output_key}'

+        errors = 0

        # Use smart_open (imported as smopen) rather than the built-in open; it
        # transparently uses boto3 to stream our data out of and back into S3.
        # https://github.com/piskvorky/smart_open
-        with open(input_file, 'r', transport_params={'client': session.client('s3')}) as input_stream:
-            with open(output_file, 'w', transport_params={'client': session.client('s3')}) as output_stream:
+        with smopen(input_file, 'r', transport_params={'client': session.client('s3')}) as input_stream:
+            with smopen(output_file, 'w', transport_params={'client': session.client('s3')}) as output_stream:

                #rows = csv.DictReader(codecs.iterdecode(input_stream, 'utf-8'))
                rows = csv.DictReader(input_stream)

                for row in rows:
@@ -86,11 +91,23 @@ def ais_inner_geocode(self):
                            row[ais_field] = feature['properties'][ais_field]
                    else:
                        print('Could not geocode "{}"'.format(query_elements))
+                        errors += 1

                    if out_rows is None:
                        headers = rows.fieldnames + self.ais_fields
+                        print(f'Headers!: {headers}')
+                        if self.remove_fields:
+                            headers = [x for x in headers if x not in self.remove_fields]
                        out_rows = csv.DictWriter(output_stream, headers)
                        out_rows.writeheader()

-                    out_rows.writerow(row)
+                    if self.remove_fields:
+                        filtered_row = {field: row[field] for field in row if field not in self.remove_fields}
+                        out_rows.writerow(filtered_row)
+                    else:
+                        out_rows.writerow(row)

+        s3 = boto3.resource('s3')
+        s3_object = s3.Object(self.s3_bucket, self.s3_output_key)
+        filesize = size(s3_object.content_length)

+        print(f'Saved to s3://{self.s3_bucket}/{self.s3_output_key} with {errors} geocoding errors. Output size: {filesize}')
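
For reference, a minimal sketch of the smart_open pattern this commit relies on: streaming a CSV out of and back into S3 without touching local disk. The bucket and key names are hypothetical, and default AWS credentials are assumed.

import boto3
from smart_open import open as smopen

session = boto3.Session()
params = {'client': session.client('s3')}

# smart_open streams both objects, so neither file has to fit on local disk.
with smopen('s3://example-bucket/input.csv', 'r', transport_params=params) as fin:
    with smopen('s3://example-bucket/output.csv', 'w', transport_params=params) as fout:
        for line in fin:
            fout.write(line)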
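
The remove_fields fix matters because csv.DictWriter raises a ValueError by default when a row dict contains keys that are not in the header list, so dropping a field from the headers alone is not enough: each row has to be filtered to match. A small sketch of the pattern, with hypothetical field names:

import csv
import io

remove_fields = ['shape']  # hypothetical field to drop from the output
rows = [
    {'address': '1234 Market St', 'zip': '19107', 'shape': 'POINT(0 0)'},
    {'address': '990 Spring Garden St', 'zip': '19123', 'shape': 'POINT(1 1)'},
]

headers = [h for h in rows[0] if h not in remove_fields]
out = io.StringIO()
writer = csv.DictWriter(out, headers)
writer.writeheader()
for row in rows:
    # Filter each row down to the surviving headers; an unfiltered row would
    # raise ValueError because DictWriter defaults to extrasaction='raise'.
    writer.writerow({k: v for k, v in row.items() if k not in remove_fields})

print(out.getvalue())
# address,zip
# 1234 Market St,19107
# 990 Spring Garden St,19123

An alternative would be csv.DictWriter(output_stream, headers, extrasaction='ignore'), which silently drops unknown keys and avoids building a filtered copy of every row.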

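The size report at the end combines boto3's content_length attribute (the object's size in bytes) with hurry.filesize's size(), which renders a byte count as a short human-readable string. A quick illustration, with a hypothetical bucket and key:

import boto3
from hurry.filesize import size

s3 = boto3.resource('s3')
obj = s3.Object('example-bucket', 'output.csv')  # hypothetical bucket/key

# content_length is the object's size in bytes; size() renders it tersely,
# e.g. size(4096) == '4K' and size(3_000_000) == '2M' (truncating division).
print(f'Output size: {size(obj.content_length)}')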