Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions datacontract/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,13 +482,14 @@ def _get_uvicorn_arguments(port: int, host: str, context: typer.Context) -> dict
}

# Create a list of the extra arguments, remove the leading -- from the cli arguments
trimmed_keys = list(map(lambda x : str(x).replace("--", ""),context.args[::2]))
trimmed_keys = list(map(lambda x: str(x).replace("--", ""), context.args[::2]))
# Merge the two dicts and return them as one dict
return default_args | dict(zip(trimmed_keys, context.args[1::2]))


@app.command(context_settings={"allow_extra_args": True, "ignore_unknown_options": True})
def api(
ctx: Annotated[typer.Context, typer.Option(help="Extra arguments to pass to uvicorn.run().")],
ctx: Annotated[typer.Context, typer.Option(help="Extra arguments to pass to uvicorn.run().")],
port: Annotated[int, typer.Option(help="Bind socket to this port.")] = 4242,
host: Annotated[
str, typer.Option(help="Bind socket to this host. Hint: For running in docker, set it to 0.0.0.0")
Expand Down
105 changes: 13 additions & 92 deletions datacontract/export/odcs_v3_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,21 +146,14 @@ def to_odcs_schema(model_key, model_value: Model) -> SchemaObject:
if model_value.description is not None:
schema_obj.description = model_value.description

properties = to_properties(model_value.fields)
if properties:
schema_obj.properties = properties
# Export fields with custom properties
schema_obj.properties = to_properties(model_value.fields)

model_quality = to_odcs_quality_list(model_value.quality)
if len(model_quality) > 0:
schema_obj.quality = model_quality

custom_properties = []
if model_value.model_extra is not None:
for key, value in model_value.model_extra.items():
custom_properties.append(CustomProperty(property=key, value=value))

if len(custom_properties) > 0:
schema_obj.customProperties = custom_properties
# Export custom properties for the model
if model_value.get_all_custom_properties():
schema_obj.customProperties = [
CustomProperty(property=key, value=value) for key, value in model_value.get_all_custom_properties().items()
]

return schema_obj

Expand Down Expand Up @@ -230,89 +223,17 @@ def to_physical_type(config: Dict[str, Any]) -> str | None:
def to_property(field_name: str, field: Field) -> SchemaProperty:
property = SchemaProperty(name=field_name)

if field.fields:
properties = []
for field_name_, field_ in field.fields.items():
property_ = to_property(field_name_, field_)
properties.append(property_)
property.properties = properties

if field.items:
items = to_property(field_name, field.items)
items.name = None # Clear the name for items
property.items = items

if field.title is not None:
property.businessName = field.title

if field.type is not None:
property.logicalType = to_logical_type(field.type)
property.physicalType = to_physical_type(field.config) or field.type
property.physicalType = field.type

if field.description is not None:
property.description = field.description

if field.required is not None:
property.required = field.required

if field.unique is not None:
property.unique = field.unique

if field.classification is not None:
property.classification = field.classification

if field.examples is not None:
property.examples = field.examples.copy()

if field.example is not None:
property.examples = [field.example]

if field.primaryKey is not None and field.primaryKey:
property.primaryKey = field.primaryKey
property.primaryKeyPosition = 1

if field.primary is not None and field.primary:
property.primaryKey = field.primary
property.primaryKeyPosition = 1

custom_properties = []
if field.model_extra is not None:
for key, value in field.model_extra.items():
custom_properties.append(CustomProperty(property=key, value=value))

if field.pii is not None:
custom_properties.append(CustomProperty(property="pii", value=field.pii))

if len(custom_properties) > 0:
property.customProperties = custom_properties

if field.tags is not None and len(field.tags) > 0:
property.tags = field.tags

logical_type_options = {}
if field.minLength is not None:
logical_type_options["minLength"] = field.minLength
if field.maxLength is not None:
logical_type_options["maxLength"] = field.maxLength
if field.pattern is not None:
logical_type_options["pattern"] = field.pattern
if field.minimum is not None:
logical_type_options["minimum"] = field.minimum
if field.maximum is not None:
logical_type_options["maximum"] = field.maximum
if field.exclusiveMinimum is not None:
logical_type_options["exclusiveMinimum"] = field.exclusiveMinimum
if field.exclusiveMaximum is not None:
logical_type_options["exclusiveMaximum"] = field.exclusiveMaximum

if logical_type_options:
property.logicalTypeOptions = logical_type_options

if field.quality is not None:
quality_list = field.quality
quality_property = to_odcs_quality_list(quality_list)
if len(quality_property) > 0:
property.quality = quality_property
# Export custom properties for the field
if field.get_all_custom_properties():
property.customProperties = [
CustomProperty(property=key, value=value) for key, value in field.get_all_custom_properties().items()
]

return property

Expand Down
81 changes: 25 additions & 56 deletions datacontract/imports/odcs_v3_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,17 +199,21 @@ def import_models(odcs: Any) -> Dict[str, Model]:
result = {}

for odcs_schema in odcs_schemas:
schema_name = odcs_schema.name
schema_physical_name = odcs_schema.physicalName
schema_description = odcs_schema.description if odcs_schema.description is not None else ""
model_name = schema_physical_name if schema_physical_name is not None else schema_name
model = Model(description=" ".join(schema_description.splitlines()) if schema_description else "", type="table")
model.fields = import_fields(odcs_schema.properties, custom_type_mappings, server_type=get_server_type(odcs))
if odcs_schema.quality is not None:
model.quality = convert_quality_list(odcs_schema.quality)
model.title = schema_name
if odcs_schema.dataGranularityDescription is not None:
model.config = {"dataGranularityDescription": odcs_schema.dataGranularityDescription}
model_name = odcs_schema.name
model = Model(
description=odcs_schema.description,
type=odcs_schema.physicalType,
title=odcs_schema.name,
)

# Import fields with custom properties
model.fields = import_fields(odcs_schema.properties, custom_type_mappings, server_type=None)

# Import custom properties for the model
if odcs_schema.customProperties is not None:
for custom_property in odcs_schema.customProperties:
model.add_custom_property(custom_property.property, custom_property.value)

result[model_name] = model

return result
Expand Down Expand Up @@ -319,59 +323,24 @@ def has_composite_primary_key(odcs_properties: List[SchemaProperty]) -> bool:
def import_fields(
odcs_properties: List[SchemaProperty], custom_type_mappings: Dict[str, str], server_type
) -> Dict[str, Field]:
logger = logging.getLogger(__name__)
result = {}

if odcs_properties is None:
return result

for odcs_property in odcs_properties:
mapped_type = map_type(odcs_property.logicalType, custom_type_mappings)
if mapped_type is not None:
property_name = odcs_property.name
description = odcs_property.description if odcs_property.description is not None else None
field = Field(
description=" ".join(description.splitlines()) if description is not None else None,
type=mapped_type,
title=odcs_property.businessName,
required=odcs_property.required if odcs_property.required is not None else None,
primaryKey=odcs_property.primaryKey
if not has_composite_primary_key(odcs_properties) and odcs_property.primaryKey is not None
else False,
unique=odcs_property.unique if odcs_property.unique else None,
examples=odcs_property.examples if odcs_property.examples is not None else None,
classification=odcs_property.classification if odcs_property.classification is not None else None,
tags=odcs_property.tags if odcs_property.tags is not None else None,
quality=convert_quality_list(odcs_property.quality),
fields=import_fields(odcs_property.properties, custom_type_mappings, server_type)
if odcs_property.properties is not None
else {},
config=import_field_config(odcs_property, server_type),
format=getattr(odcs_property, "format", None),
)
# mapped_type is array
if field.type == "array" and odcs_property.items is not None:
# nested array object
if odcs_property.items.logicalType == "object":
field.items = Field(
type="object",
fields=import_fields(odcs_property.items.properties, custom_type_mappings, server_type),
)
# array of simple type
elif odcs_property.items.logicalType is not None:
field.items = Field(type=odcs_property.items.logicalType)
field = Field(
name=odcs_property.name,
type=odcs_property.physicalType,
description=odcs_property.description,
)

# enum from quality validValues as enum
if field.type == "string":
for q in field.quality:
if hasattr(q, "validValues"):
field.enum = q.validValues
# Import custom properties for the field
if odcs_property.customProperties is not None:
for custom_property in odcs_property.customProperties:
field.add_custom_property(custom_property.property, custom_property.value)

result[property_name] = field
else:
logger.info(
f"Can't map {odcs_property.name} to the Datacontract Mapping types, as there is no equivalent or special mapping. Consider introducing a customProperty 'dc_mapping_{odcs_property.logicalType}' that defines your expected type as the 'value'"
)
result[odcs_property.name] = field

return result

Expand Down