Skip to content

Commit

Permalink
make knack extract actually work, based on danneman's GitHub code in extrac…
Browse files Browse the repository at this point in the history
…t_knack
  • Loading branch information
floptical committed Feb 14, 2024
1 parent 9ca5f34 commit 41df9ba
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 26 deletions.
1 change: 1 addition & 0 deletions databridge_etl_tools/airtable/airtable.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ def extract(self):
writer.writerow(row)

num_lines = sum(1 for _ in open(self.csv_path)) - 1
assert num_lines > 0, 'CSV file contains 0 lines??'
file_size = size(os.path.getsize(self.csv_path))
print(f'Extraction successful? File size: {file_size}, total lines: {num_lines}')
self.load_to_s3()
Expand Down
175 changes: 149 additions & 26 deletions databridge_etl_tools/knack/knack.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@

import requests
import csv
import json
import os,sys
import boto3
import stringcase
from datetime import datetime
from hurry.filesize import size

csv.field_size_limit(sys.maxsize)
Expand All @@ -25,42 +28,162 @@ def __init__(self,
self.s3_key = s3_key
self.csv_path = '/tmp/output.csv'

def get_type(self, knack_type):
    """Map a Knack field type name to the local schema type name.

    Any Knack type not listed below (email, rich text, ...) falls back
    to plain 'string'.
    """
    type_map = {
        'boolean': 'boolean',
        'number': 'number',
        'auto_increment': 'integer',
        'date_time': 'datetime',
        'multiple_choice': 'array',
        'address': 'object',
        'connection': 'array',
    }
    return type_map.get(knack_type, 'string')

def convert_knack_schema(self, knack_fields):
    """Translate Knack field metadata into a tableschema-style descriptor.

    A synthetic required 'id' column is always prepended. Each Knack field
    keeps its original key and type ('knack_key'/'knack_type') alongside the
    normalized local column name and local type.
    """
    fields = [{
        'name': 'id',
        'type': 'string',
        'constraints': {
            'required': True
        }
    }]

    for knack_field in knack_fields:
        # snake_case the human label, then scrub the conversion artifacts
        # (spaces, parentheses, doubled underscores, '_i_d' from 'ID').
        column = stringcase.snakecase(knack_field['label'])
        for old, new in ((' ', ''), ('(', ''), (')', ''), ('__', '_'), ('_i_d', '_id')):
            column = column.replace(old, new)

        entry = {
            'name': column,
            'knack_key': knack_field['key'],
            'knack_type': knack_field['type'],
            'type': self.get_type(knack_field['type'])
        }
        if knack_field['required'] == True:
            entry['constraints'] = { 'required': True }
        fields.append(entry)

    return {
        'primaryKey': ['id'],
        'missingValues': [''],
        'fields': fields
    }

def get_schema(self):
    """Fetch this object's field metadata from the Knack API and return the converted local schema."""
    url = f'https://api.knack.com/v1/objects/{self.knack_objectid}/fields'
    auth_headers = {
        'X-Knack-Application-Id': self.app_id,
        'X-Knack-REST-API-KEY': self.api_key
    }
    payload = requests.get(url, headers=auth_headers).json()
    return self.convert_knack_schema(payload['fields'])

def get_records(self, page=1, rows_per_page=1000):
    """Yield batches (lists) of raw record dicts from the Knack records API.

    Pages through the API starting at *page*, yielding one batch per page
    until 'current_page' reaches 'total_pages'.

    Raises:
        Exception: if a response contains no records. This covers HTTP/auth
            failures as well as a genuinely empty table, consistent with the
            downstream "CSV contains 0 lines" assertion in extract().
    """
    # BUG FIX: the original recursed with
    #   self.get_records(self.app_id, self.api_key, self.knack_objectid, page=...)
    # which bound app_id/api_key to (page, rows_per_page) and passed an
    # extra positional argument -> TypeError on any multi-page object.
    # Iterate instead of recursing.
    while True:
        response = requests.get(
            f'https://api.knack.com/v1/objects/{self.knack_objectid}/records',
            params={
                'rows_per_page': rows_per_page,
                'page': page
            },
            headers={
                'X-Knack-Application-Id': self.app_id,
                'X-Knack-REST-API-KEY': self.api_key
            })

        data = response.json()

        # .get() so an error payload without a 'records' key raises our
        # descriptive Exception instead of a bare KeyError.
        if not data.get('records'):
            raise Exception(f"Failed to fetch data. Status Code: {response.status_code}. Reason: {response.text}")

        yield data['records']

        if int(data['current_page']) >= data['total_pages']:
            break
        page = int(data['current_page']) + 1

def convert_type(self, local_type, knack_type, value):
    """Coerce one raw Knack field value into its CSV-friendly form.

    Args:
        local_type: local schema type ('array', 'object', 'string', ...).
        knack_type: original Knack type ('connection', 'phone', 'date_time', ...).
        value: raw value from the Knack record; may be None or ''.

    Returns:
        None for empty values, a JSON string for arrays/objects, the bare
        number for phone fields, an ISO-8601 'Z'-suffixed string for
        date_times, otherwise the value unchanged.
    """
    # Treat both missing and empty-string values as NULL. (Was '== None';
    # identity comparison is the correct idiom for None.)
    if value is None or value == '':
        return None
    if knack_type == 'connection':
        # Connections arrive as [{'id': ...}, ...]; keep only the ids.
        return json.dumps([linked['id'] for linked in value])
    if knack_type == 'phone':
        return value['full']
    if knack_type == 'date_time':
        # Knack timestamps look like '02/14/2024 03:05 PM'.
        # NOTE(review): the appended 'Z' assumes the timestamp is UTC -- TODO confirm.
        return datetime.strptime(value['timestamp'], '%m/%d/%Y %I:%M %p').isoformat() + 'Z'
    if local_type == 'array':
        # Single selections come back as a bare scalar; normalize to a list.
        if not isinstance(value, list):
            return json.dumps([value])
        return json.dumps(value)
    if local_type == 'object':
        return json.dumps(value)
    return value


def convert_to_csv_row(self, schema, record):
    """Build one CSV row dict, keyed by local field names, from a raw Knack record."""
    row = {}

    for field in schema['fields']:
        # Prefer the '<knack_key>_raw' variant, then the plain knack_key,
        # then fall back to the local field name (e.g. the synthetic 'id').
        if 'knack_key' in field and (field['knack_key'] + '_raw') in record:
            raw_value = record[field['knack_key'] + '_raw']
        elif 'knack_key' in field and field['knack_key'] in record:
            raw_value = record[field['knack_key']]
        else:
            raw_value = record[field['name']]

        # Fields carrying Knack type info get coerced; others pass through.
        if 'knack_type' in field:
            row[field['name']] = self.convert_type(field['type'], field['knack_type'], raw_value)
        else:
            row[field['name']] = raw_value

    return row



def load_to_s3(self):
    """Upload the extracted CSV at self.csv_path to s3://<s3_bucket>/<s3_key>."""
    s3 = boto3.resource('s3')
    # Context manager so the file handle is closed after the upload
    # (the original passed open(...) inline and leaked the handle).
    with open(self.csv_path, 'rb') as f:
        s3.Object(self.s3_bucket, self.s3_key).put(Body=f)

def extract(self):
    """Extract all records for the configured Knack object into a local CSV, then upload to S3.

    Builds the column list from the converted schema, streams record batches
    from get_records(), converts each record with convert_to_csv_row(), and
    sanity-checks the row count before calling load_to_s3().
    """
    # NOTE(review): removed dead remnants of the old single-request
    # implementation -- a headers dict that was immediately clobbered by the
    # fieldname list, a second requests.get(endpoint, headers=<list>) whose
    # response was written with record-dict keys instead of the schema, and
    # a duplicated line-count/upload tail.
    schema = self.get_schema()

    endpoint = f'https://api.knack.com/v1/objects/{self.knack_objectid}/records'
    print(f'Starting extract from Knack endpoint: {endpoint}, app_id: {self.app_id}')

    # CSV columns are the local (snake_cased) field names, in schema order.
    fieldnames = [field['name'] for field in schema['fields']]

    with open(self.csv_path, 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for records_batch in self.get_records():
            for record in records_batch:
                writer.writerow(self.convert_to_csv_row(schema, record))

    # Subtract 1 for the header row; close the counting handle promptly.
    with open(self.csv_path) as f:
        num_lines = sum(1 for _ in f) - 1
    assert num_lines > 0, 'CSV file contains 0 lines??'
    file_size = size(os.path.getsize(self.csv_path))
    print(f'Extraction successful? File size: {file_size}, total lines: {num_lines}')
    self.load_to_s3()


1 change: 1 addition & 0 deletions databridge_etl_tools/knack/knack_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
@click.option('--knack_objectid', required=True, help='Not an objectid in the ESRI sense, refers to a table under an "app" in Knack')
@click.option('--s3_bucket', required=True, help='Bucket to place the extracted csv in.')
@click.option('--s3_key', required=True, help='key under the bucket, example: "staging/dept/table_name.csv')
@click.option('--indent', type=int, default=None, help='???')
def knack(ctx, **kwargs):
    # Click group callback: construct a Knack helper from the CLI options and
    # stash it on the context for subcommands to use.
    # NOTE(review): the --indent option's help text is '???' -- its purpose is
    # not visible from here; confirm what Knack.__init__ does with it.
    ctx.obj = Knack(**kwargs)

Expand Down
1 change: 1 addition & 0 deletions docker-fast-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ six==1.16.0
sniffio==1.3.0
soupsieve==2.4
stack-data==0.6.2
stringcase==1.2.0
terminado==0.17.1
tinycss2==1.2.1
tomli==2.0.1
Expand Down

0 comments on commit 41df9ba

Please sign in to comment.