Skip to content

Commit

Permalink
Forgot to include stuff to only extract certain fields
Browse files Browse the repository at this point in the history
  • Loading branch information
floptical committed Feb 7, 2024
1 parent a56dc51 commit c373a7d
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 33 deletions.
71 changes: 38 additions & 33 deletions databridge_etl_tools/airtable/airtable.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,34 @@


class Airtable():
def __init__(self, app_id:str, api_key:str, table_name:str, s3_bucket:str, s3_key:str):
def __init__(self, app_id:str, api_key:str, table_name:str, s3_bucket:str, s3_key:str, get_fields:str):
self.app_id = app_id
self.api_key = api_key
self.table_name = table_name
self.s3_bucket = s3_bucket
self.s3_key = s3_key
self.offset = None
self.rows_per_page = 1000
self.get_fields = get_fields.split(',')
self.csv_path = f'/tmp/{self.table_name}.csv'

def get_fieldnames(self):

request_stmt = f'https://api.airtable.com/v0/{self.app_id}/{self.table_name}?maxRecords={self.rows_per_page}'

if self.get_fields:
for field in self.get_fields:
request_stmt = request_stmt + '&fields%5B%5D=' + field

print(f'Airtable endpoint: {request_stmt}')

response = requests.get(
f'https://api.airtable.com/v0/{self.app_id}/{self.table_name}?maxRecords={self.rows_per_page}',
request_stmt,
headers={
f'Authorization': f'Bearer {self.api_key}'
'Authorization': f'Bearer {self.api_key}'
}
)

data = response.json()

fieldnames = []
Expand All @@ -43,25 +52,34 @@ def get_fieldnames(self):

return fieldnames

def get_records(self):
endpoint = f'https://api.airtable.com/v0/{self.app_id}/{self.table_name}?maxRecords={self.rows_per_page}'
print(f'Starting extract from airtable endpoint: {endpoint}')
def get_records(self, offset=None):
#if 'offset' in kwargs.keys():
# offset = kwargs['offset']
#else:
# offset = None

request_stmt = f'https://api.airtable.com/v0/{self.app_id}/{self.table_name}?maxRecords={self.rows_per_page}'

if self.get_fields:
for field in self.get_fields:
request_stmt = request_stmt + '&fields%5B%5D=' + field


response = requests.get(
endpoint,
request_stmt,
headers={
'Authorization': f'Bearer {self.api_key}'
},
params={
'offset': self.offset
'offset': offset
}
)

data = response.json()
yield data['records']

if 'offset' in data:
yield from get_records(self.app_id, self.api_key, self.table_name, offset=data['offset'], rows_per_page=1000)
yield from self.get_records(offset=data['offset'])

def process_row(self, row: Dict) -> Dict:
for key, value in row.items():
Expand All @@ -84,34 +102,21 @@ def extract(self):

fieldnames = self.get_fieldnames()

if (self.s3_bucket and self.s3_key):

with open(self.csv_path, 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)

writer.writeheader()

for records_batch in self.get_records():
for record in records_batch:
row = self.process_row(record['fields'])
writer.writerow(row)

num_lines = sum(1 for _ in open(self.csv_path)) - 1
file_size = size(os.path.getsize(self.csv_path))
print(f'Extraction successful? File size: {file_size}, total lines: {num_lines}')
self.load_to_s3()
self.clean_up()

else:
writer = csv.DictWriter(sys.stdout, fieldnames=fieldnames)
with open(self.csv_path, 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)

writer.writeheader()

for records_batch in self.get_records(self.app_id, self.api_key, self.table_name):
for records_batch in self.get_records():
for record in records_batch:
row = self.process_row(record['fields'])
writer.writerow(row)

sys.stdout.flush()

num_lines = sum(1 for _ in open(self.csv_path)) - 1
file_size = size(os.path.getsize(self.csv_path))
print(f'Extraction successful? File size: {file_size}, total lines: {num_lines}')
self.load_to_s3()
self.clean_up()



1 change: 1 addition & 0 deletions databridge_etl_tools/airtable/airtable_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
@click.option('--table_name', required=True)
@click.option('--s3_bucket', required=True)
@click.option('--s3_key', required=True)
@click.option('--get_fields', required=False, help='Fields you want to extract, comma separated string.')
def airtable(ctx, **kwargs):
ctx.obj = Airtable(**kwargs)

Expand Down

0 comments on commit c373a7d

Please sign in to comment.