Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Infer 'REQUIRED' mode with a flag for consistently filled in values #28

Merged
merged 6 commits into from
Mar 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion bigquery_schema_generator/generate_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,13 @@ class SchemaGenerator:

def __init__(self,
input_format='json',
infer_mode=False,
keep_nulls=False,
quoted_values_are_strings=False,
debugging_interval=1000,
debugging_map=False):
self.input_format = input_format
self.infer_mode = infer_mode
self.keep_nulls = keep_nulls
self.quoted_values_are_strings = quoted_values_are_strings
self.debugging_interval = debugging_interval
Expand Down Expand Up @@ -198,6 +200,14 @@ def merge_schema_entry(self, old_schema_entry, new_schema_entry):
old_status = old_schema_entry['status']
new_status = new_schema_entry['status']

if old_status == 'soft' or new_status == 'soft':
update = {'is_always_filled_in': 'no'}
old_schema_entry.update(update)
new_schema_entry.update(update)

if old_schema_entry.get('is_always_filled_in') == 'undetermined' and new_status == 'hard':
new_schema_entry.update({'is_always_filled_in': 'yes'})

# new 'soft' does not clobber old 'hard'
if old_status == 'hard' and new_status == 'soft':
return old_schema_entry
Expand Down Expand Up @@ -258,12 +268,16 @@ def merge_schema_entry(self, old_schema_entry, new_schema_entry):
# might seem reasonable to allow a NULLABLE {primitive_type} to be
# upgraded to a REPEATED {primitive_type}, but currently 'bq load' does
# not support that so we must also follow that rule.
if old_mode != new_mode:
if old_mode != new_mode and not self.infer_mode:
raise Exception(('Mismatched mode for non-RECORD: '
'old=(%s,%s,%s,%s); new=(%s,%s,%s,%s)') %
(old_status, old_name, old_mode, old_type,
new_status, new_name, new_mode, new_type))

if new_schema_entry.get('info').get('mode') == 'NULLABLE':
if new_schema_entry.get('is_always_filled_in') == 'yes' and self.infer_mode:
new_schema_entry.get('info').update({'mode': 'REQUIRED'})

candidate_type = convert_type(old_type, new_type)
if not candidate_type:
raise Exception(
Expand Down Expand Up @@ -327,6 +341,7 @@ def get_schema_entry(self, key, value):
else:
status = 'hard'
schema_entry = OrderedDict([('status', status),
('is_always_filled_in', 'undetermined'),
('info', OrderedDict([
('mode', value_mode),
('name', key),
Expand Down Expand Up @@ -638,6 +653,11 @@ def main():
'--quoted_values_are_strings',
help='Quoted values should be interpreted as strings',
action="store_true")
# Opt-in switch: args.infer_mode is False when the flag is absent and True
# when it is given, matching SchemaGenerator's infer_mode=False default.
# ('store_false' would invert this: inference on by default, and passing
# --infer_mode would switch it off.)
parser.add_argument(
    '--infer_mode',
    help="If set to 'true', keys consistently having non-null values will gain 'REQUIRED' mode in the schema.",
    action='store_true'
)
parser.add_argument(
'--debugging_interval',
help='Number of lines between heartbeat debugging messages',
Expand All @@ -655,6 +675,7 @@ def main():

generator = SchemaGenerator(
input_format=args.input_format,
infer_mode=args.infer_mode,
keep_nulls=args.keep_nulls,
quoted_values_are_strings=args.quoted_values_are_strings,
debugging_interval=args.debugging_interval,
Expand Down
2 changes: 2 additions & 0 deletions tests/test_generate_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ def verify_data_chunk(self, chunk_count, chunk):
data_flags = chunk['data_flags']
input_format = 'csv' if ('csv' in data_flags) else 'json'
keep_nulls = ('keep_nulls' in data_flags)
infer_mode = ('infer_mode' in data_flags)
quoted_values_are_strings = ('quoted_values_are_strings' in data_flags)
records = chunk['records']
expected_errors = chunk['errors']
Expand All @@ -422,6 +423,7 @@ def verify_data_chunk(self, chunk_count, chunk):
# Generate schema.
generator = SchemaGenerator(
input_format=input_format,
infer_mode=infer_mode,
keep_nulls=keep_nulls,
quoted_values_are_strings=quoted_values_are_strings)
schema_map, error_logs = generator.deduce_schema(records)
Expand Down
63 changes: 63 additions & 0 deletions tests/testdata.txt
Original file line number Diff line number Diff line change
Expand Up @@ -831,3 +831,66 @@ SCHEMA
}
]
END

# Infer 'REQUIRED' mode for a consistently filled in value - simple
DATA csv infer_mode
a,b,c,d,e
,ho,hi,true
3,hu,he,
SCHEMA
[
{
"mode": "NULLABLE",
"name": "a",
"type": "INTEGER"
},
{
"mode": "REQUIRED",
"name": "b",
"type": "STRING"
},
{
"mode": "REQUIRED",
"name": "c",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "d",
"type": "BOOLEAN"
},
{
"mode": "NULLABLE",
"name": "e",
"type": "STRING"
}
]
END

# Infer 'REQUIRED' mode for a consistently filled in value - complex
DATA csv infer_mode
name,surname,age
John
Michael,,
Maria,Smith,30
Joanna,Anders,21
SCHEMA
[
{
"mode": "REQUIRED",
"name": "name",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "surname",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "age",
"type": "INTEGER"
}
]
END