Commits (24)
39d6a18
initial add az_read_file feat.
dmaresma Feb 25, 2025
f1e1418
initial add az_read_file feat.
dmaresma Feb 26, 2025
96a0ef4
Merge branch 'feature/azure_json_check' of https://github.com/dmaresm…
dmaresma Feb 26, 2025
1ff252f
add sql ddl for view, implement lineage tables
dmaresma Mar 24, 2025
7f97dbe
Merge branch 'main' into feature/azure_json_check
dmaresma Mar 24, 2025
d8c31c7
bring back simple-ddl-parser
dmaresma Mar 25, 2025
101a88f
improve csv test with duckdb 1.2.1 (force upgrade) to support utf-8 load
dmaresma Mar 31, 2025
7bcf6d6
fix emoji encoding issue on windows when exporting html format and catalog
dmaresma Mar 31, 2025
fd19ceb
improve json validation
dmaresma Apr 1, 2025
1c419cb
add mermaid entity relation diagram rendering in catalog
dmaresma Apr 3, 2025
366aa5c
improve odcs support with nested objects and arrays field
dmaresma Apr 4, 2025
f52e6f2
fix array importer
dmaresma Apr 4, 2025
d24ce43
fix
dmaresma Apr 4, 2025
6da434f
reorder code
dmaresma Apr 4, 2025
f1ea8a7
improve
dmaresma Apr 4, 2025
42e2f06
fix regression on required or not
dmaresma Apr 4, 2025
c890dec
add enum and reverse required; add lineage to field
dmaresma Apr 7, 2025
92010c5
fix Typo on logicalType
dmaresma Apr 8, 2025
0940eec
fix server.roles mapping
dmaresma Apr 8, 2025
79af4ff
space clean
dmaresma Apr 8, 2025
cefcc82
refactor
dmaresma Apr 8, 2025
cee3c5b
Added relations between tables in erDiagram
ezhao-mck Apr 9, 2025
2c49c1a
Merge pull request #4 from EmmaZhao12/feature/azure_json_check
dmaresma Apr 9, 2025
da29a06
add db2 support test only
dmaresma Apr 17, 2025
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -9,7 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

### Changed
- Azure Storage Account JSON check
- Azure token `{year}, {month}, {day}, {date}, {day-1}, {quarter}`

### Fixed

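The new placeholder tokens are resolved against the current date when the server `location` is expanded (see `process_azure_file` in `check_jsonschema.py` below). A minimal sketch of that substitution, using a hypothetical `location` pattern:

```python
from datetime import datetime

# Hypothetical location pattern; real patterns come from the contract's server config.
location = "container/raw/{model}/{year}/{month}/{day}/*.json"

today = datetime.today()
resolved = location.format(
    model="orders",
    year=today.strftime("%Y"),
    month=today.strftime("%m"),
    day=today.strftime("%d"),
    date=today.strftime("%Y-%m-%d"),
    quarter=f"Q{(today.month - 1) // 3 + 1}",  # same result as the month-to-quarter table below
)
print(resolved)  # e.g. container/raw/orders/2025/04/17/*.json
```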
4 changes: 2 additions & 2 deletions datacontract/catalog/catalog.py
@@ -19,7 +19,7 @@ def create_data_contract_html(contracts, file: Path, path: Path, schema: str):
file_without_suffix = file.with_suffix(".html")
html_filepath = path / file_without_suffix
html_filepath.parent.mkdir(parents=True, exist_ok=True)
with open(html_filepath, "w") as f:
with open(html_filepath, "w", encoding='utf-8') as f:
f.write(html)
contracts.append(
DataContractView(
@@ -42,7 +42,7 @@ class DataContractView:

def create_index_html(contracts, path):
index_filepath = path / "index.html"
with open(index_filepath, "w") as f:
with open(index_filepath, "w", encoding='utf-8') as f:
# Load templates from templates folder
package_loader = PackageLoader("datacontract", "templates")
env = Environment(
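The `encoding='utf-8'` pins the output codec: without it, `open(..., "w")` falls back to the platform's preferred encoding (commonly cp1252 on Windows), and writing emoji from the rendered catalog HTML raises `UnicodeEncodeError`. A small sketch of the failure mode, assuming a cp1252 locale:

```python
html = "Catalog 📊"  # rendered catalog output containing an emoji

try:
    html.encode("cp1252")  # what an un-pinned open() would attempt on a cp1252 Windows locale
except UnicodeEncodeError as err:
    print(f"write would fail: {err}")

html.encode("utf-8")  # succeeds once the encoding is pinned explicitly
```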
6 changes: 3 additions & 3 deletions datacontract/engines/data_contract_test.py
@@ -43,9 +43,9 @@ def execute_data_contract_test(
run.checks.extend(create_checks(data_contract_specification, server))

# TODO check server is supported type for nicer error messages
# TODO check server credentials are complete for nicer error messages
if server.format == "json" and server.type != "kafka":
check_jsonschema(run, data_contract_specification, server)
#if server.format == "json" and server.type in ("azure", "s3"):
# check_jsonschema(run, data_contract_specification, server)
# with soda
check_soda_execute(run, data_contract_specification, server, spark, duckdb_connection)


68 changes: 68 additions & 0 deletions datacontract/engines/fastjsonschema/az/az_read_files.py
@@ -0,0 +1,68 @@
import os

from datacontract.model.exceptions import DataContractException
from datacontract.model.run import Run, ResultEnum


def yield_az_files(run: Run, az_storageAccount, az_location):
fs = az_fs(az_storageAccount)
files = fs.glob(az_location)
for file in files:
with fs.open(file) as f:
run.log_info(f"Downloading file {file}")
yield f.read()


def az_fs(az_storageAccount):
try:
import adlfs
except ImportError as e:
raise DataContractException(
type="schema",
result=ResultEnum.failed,
name="az extra missing",
reason="Install the extra datacontract-cli\\[azure] to use az",
engine="datacontract",
original_exception=e,
)

az_client_id = os.getenv("DATACONTRACT_AZURE_CLIENT_ID")
if az_client_id is None:
raise DataContractException(
type="schema",
result=ResultEnum.failed,
name="az env. variable DATACONTRACT_AZURE_CLIENT_ID missing",
reason="configure export DATACONTRACT_AZURE_CLIENT_ID=*** ",
engine="datacontract",
original_exception=e,
)

az_client_secret = os.getenv("DATACONTRACT_AZURE_CLIENT_SECRET")
if az_client_secret is None:
raise DataContractException(
type="schema",
result=ResultEnum.failed,
name="az env. variable DATACONTRACT_AZURE_CLIENT_SECRET missing",
reason="configure export DATACONTRACT_AZURE_CLIENT_SECRET=*** ",
engine="datacontract",
original_exception=e,
)

az_tenant_id = os.getenv("DATACONTRACT_AZURE_TENANT_ID")
if az_tenant_id is None:
raise DataContractException(
type="schema",
result=ResultEnum.failed,
name="az env. variable DATACONTRACT_AZURE_TENANT_ID missing",
reason="configure export DATACONTRACT_AZURE_TENANT_ID=*** ",
engine="datacontract",
original_exception=e,
)

return adlfs.AzureBlobFileSystem(
account_name=az_storageAccount,
client_id=az_client_id,
client_secret=az_client_secret,
tenant_id=az_tenant_id,
anon=az_client_id is None,
)
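A hedged usage sketch for the new helper: the storage account, container path, and model name are hypothetical, and `run` stands for the `Run` object the jsonschema engine already carries. The three `DATACONTRACT_AZURE_*` variables checked above must be exported beforehand.

```python
from datacontract.engines.fastjsonschema.az.az_read_files import yield_az_files
from datacontract.model.run import Run


def download_model_files(run: Run, storage_account: str, location: str) -> int:
    """Download every blob matching the glob pattern and return the total byte count."""
    total = 0
    # yield_az_files glob-matches the location via adlfs and yields each file's raw bytes.
    for raw_bytes in yield_az_files(run, storage_account, location):
        total += len(raw_bytes)
    return total


# Hypothetical invocation:
# download_model_files(run, "mystorageaccount", "container/orders/2025/04/*.json")
```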
80 changes: 62 additions & 18 deletions datacontract/engines/fastjsonschema/check_jsonschema.py
@@ -2,12 +2,14 @@
import logging
import os
import threading
from datetime import datetime
from typing import List, Optional

import fastjsonschema
from fastjsonschema import JsonSchemaValueException

from datacontract.engines.fastjsonschema.s3.s3_read_files import yield_s3_files
from datacontract.engines.fastjsonschema.az.az_read_files import yield_az_files
from datacontract.export.jsonschema_converter import to_jsonschema
from datacontract.model.data_contract_specification import DataContractSpecification, Server
from datacontract.model.exceptions import DataContractException
@@ -85,15 +87,15 @@ def process_exceptions(run, exceptions: List[DataContractException]):


def validate_json_stream(
schema: dict, model_name: str, validate: callable, json_stream: list[dict]
run: Run, schema: dict, model_name: str, validate: callable, json_stream: list[dict]
) -> List[DataContractException]:
logging.info(f"Validating JSON stream for model: '{model_name}'.")
run.log_info(f"Validating JSON stream for model: '{model_name}'.")
exceptions: List[DataContractException] = []
for json_obj in json_stream:
try:
validate(json_obj)
except JsonSchemaValueException as e:
logging.warning(f"Validation failed for JSON object with type: '{model_name}'.")
run.log_warn(f"Validation failed for JSON object with type: '{model_name}'.")
primary_key_value = get_primary_key_value(schema, model_name, json_obj)
exceptions.append(
DataContractException(
Expand All @@ -107,7 +109,7 @@ def validate_json_stream(
)
)
if not exceptions:
logging.info(f"All JSON objects in the stream passed validation for model: '{model_name}'.")
run.log_info(f"All JSON objects in the stream passed validation for model: '{model_name}'.")
return exceptions


@@ -151,7 +153,7 @@ def process_json_file(run, schema, model_name, validate, file, delimiter):
json_stream = read_json_file(file)

# Validate the JSON stream and collect exceptions.
exceptions = validate_json_stream(schema, model_name, validate, json_stream)
exceptions = validate_json_stream(run, schema, model_name, validate, json_stream)

# Handle all errors from schema validation.
process_exceptions(run, exceptions)
@@ -165,7 +167,7 @@ def process_local_file(run, server, schema, model_name, validate):
if os.path.isdir(path):
return process_directory(run, path, server, model_name, validate)
else:
logging.info(f"Processing file {path}")
run.log_info(f"Processing file {path}")
with open(path, "r") as file:
process_json_file(run, schema, model_name, validate, file, server.delimiter)

@@ -189,7 +191,7 @@ def process_s3_file(run, server, schema, model_name, validate):
s3_location = s3_location.format(model=model_name)
json_stream = None

for file_content in yield_s3_files(s3_endpoint_url, s3_location):
for file_content in yield_s3_files(run, s3_endpoint_url, s3_location):
if server.delimiter == "new_line":
json_stream = read_json_lines_content(file_content)
elif server.delimiter == "array":
@@ -207,11 +209,62 @@
)

# Validate the JSON stream and collect exceptions.
exceptions = validate_json_stream(schema, model_name, validate, json_stream)
exceptions = validate_json_stream(run, schema, model_name, validate, json_stream)

# Handle all errors from schema validation.
process_exceptions(run, exceptions)

def process_azure_file(run, server, schema, model_name, validate):

if server.storageAccount is None:
raise DataContractException(
type="schema",
name="Check that JSON has valid schema",
result="warning",
reason=f"Cannot retrieve storageAccount in Server config",
engine="datacontract",
)

az_storageAccount = server.storageAccount
az_location = server.location
date = datetime.today()

if "{model}" in az_location:
month_to_quarter = {1: "Q1", 2: "Q1", 3: "Q1", 4: "Q2", 5: "Q2", 6: "Q2",
                    7: "Q3", 8: "Q3", 9: "Q3", 10: "Q4", 11: "Q4", 12: "Q4"}

az_location = az_location.format(model=model_name,
year=date.strftime('%Y'),
month=date.strftime('%m'),
day=date.strftime('%d'),
date=date.strftime('%Y-%m-%d'),
quarter=month_to_quarter.get(date.month))

json_stream = None

for file_content in yield_az_files(run, az_storageAccount, az_location):
if server.delimiter == "new_line":
json_stream = read_json_lines_content(file_content)
elif server.delimiter == "array":
json_stream = read_json_array_content(file_content)
else:
json_stream = read_json_file_content(file_content)

if json_stream is None:
raise DataContractException(
type="schema",
name="Check that JSON has valid schema",
result="warning",
reason=f"Cannot find any file in {az_location}",
engine="datacontract",
)

# Validate the JSON stream and collect exceptions.
exceptions = validate_json_stream(run, schema, model_name, validate, json_stream)

# Handle all errors from schema validation.
process_exceptions(run, exceptions)

def check_jsonschema(run: Run, data_contract: DataContractSpecification, server: Server):
run.log_info("Running engine jsonschema")
@@ -262,16 +315,7 @@ def check_jsonschema(run: Run, data_contract: DataContractSpecification, server:
)
)
elif server.type == "azure":
run.checks.append(
Check(
type="schema",
name="Check that JSON has valid schema",
model=model_name,
result=ResultEnum.info,
reason="JSON Schema check skipped for azure, as azure is currently not supported",
engine="jsonschema",
)
)
process_azure_file(run, server, schema, model_name, validate)
else:
run.checks.append(
Check(
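As with the S3 path, `server.delimiter` decides how each downloaded Azure blob is turned into a stream of JSON objects. A sketch of the three payload shapes the modes are meant to accept (the parsing below is illustrative, not the exact `read_json_*_content` implementations):

```python
import json

# delimiter == "new_line": one JSON object per line (NDJSON)
ndjson = b'{"id": 1}\n{"id": 2}\n'
new_line_stream = [json.loads(line) for line in ndjson.splitlines() if line.strip()]

# delimiter == "array": a single top-level JSON array of objects
array_payload = b'[{"id": 1}, {"id": 2}]'
array_stream = json.loads(array_payload)

# no delimiter: the whole file is one JSON object
single_payload = b'{"id": 1}'
single_stream = [json.loads(single_payload)]

print(new_line_stream, array_stream, single_stream)
```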
37 changes: 33 additions & 4 deletions datacontract/engines/fastjsonschema/s3/s3_read_files.py
@@ -1,16 +1,15 @@
import logging
import os

from datacontract.model.exceptions import DataContractException
from datacontract.model.run import ResultEnum
from datacontract.model.run import Run, ResultEnum


def yield_s3_files(s3_endpoint_url, s3_location):
def yield_s3_files(run: Run, s3_endpoint_url, s3_location):
fs = s3_fs(s3_endpoint_url)
files = fs.glob(s3_location)
for file in files:
with fs.open(file) as f:
logging.info(f"Downloading file {file}")
run.log_info(f"Downloading file {file}")
yield f.read()


@@ -28,8 +27,38 @@ def s3_fs(s3_endpoint_url):
)

aws_access_key_id = os.getenv("DATACONTRACT_S3_ACCESS_KEY_ID")
if aws_access_key_id is None:
raise DataContractException(
type="schema",
result=ResultEnum.failed,
name="s3 env. variable DATACONTRACT_S3_ACCESS_KEY_ID missing",
reason="configure export DATACONTRACT_S3_ACCESS_KEY_ID=*** ",
engine="datacontract",
original_exception=e,
)

aws_secret_access_key = os.getenv("DATACONTRACT_S3_SECRET_ACCESS_KEY")
if aws_secret_access_key is None:
raise DataContractException(
type="schema",
result=ResultEnum.failed,
name="s3 env. variable DATACONTRACT_S3_SECRET_ACCESS_KEY missing",
reason="configure export DATACONTRACT_S3_SECRET_ACCESS_KEY=*** ",
engine="datacontract",
original_exception=e,
)

aws_session_token = os.getenv("DATACONTRACT_S3_SESSION_TOKEN")
if aws_session_token is None:
raise DataContractException(
type="schema",
result=ResultEnum.failed,
name="s3 env. variable DATACONTRACT_S3_SESSION_TOKEN missing",
reason="configure export DATACONTRACT_S3_SESSION_TOKEN=*** ",
engine="datacontract",
original_exception=e,
)

return s3fs.S3FileSystem(
key=aws_access_key_id,
secret=aws_secret_access_key,
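With this change the S3 check fails fast unless all three credential variables are set. A minimal sketch of the required environment (values are placeholders); note that long-lived access keys have no session token, so `DATACONTRACT_S3_SESSION_TOKEN` would still need some value under this check:

```python
import os

# Placeholder credentials; real values come from your AWS account or STS session.
os.environ["DATACONTRACT_S3_ACCESS_KEY_ID"] = "<access-key-id>"
os.environ["DATACONTRACT_S3_SECRET_ACCESS_KEY"] = "<secret-access-key>"
os.environ["DATACONTRACT_S3_SESSION_TOKEN"] = "<session-token>"
```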
5 changes: 5 additions & 0 deletions datacontract/engines/soda/check_soda_execute.py
@@ -14,6 +14,7 @@
from datacontract.engines.soda.connections.postgres import to_postgres_soda_configuration
from datacontract.engines.soda.connections.snowflake import to_snowflake_soda_configuration
from datacontract.engines.soda.connections.sqlserver import to_sqlserver_soda_configuration
from datacontract.engines.soda.connections.db2 import to_db2_soda_configuration
from datacontract.engines.soda.connections.trino import to_trino_soda_configuration
from datacontract.export.sodacl_converter import to_sodacl_yaml
from datacontract.model.data_contract_specification import DataContractSpecification, Server
@@ -92,6 +93,10 @@ def check_soda_execute(
logging.info("Use Spark to connect to data source")
scan.add_spark_session(spark, data_source_name="datacontract-cli")
scan.set_data_source_name("datacontract-cli")
elif server.type == "db2":
soda_configuration_str = to_db2_soda_configuration(server)
scan.add_configuration_yaml_str(soda_configuration_str)
scan.set_data_source_name(server.type)
elif server.type == "kafka":
if spark is None:
spark = create_spark_session()
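A hedged sketch of a server definition that would take the new `db2` branch; the field values are hypothetical, and `schema_` is the Python-side name for the contract's `schema` key:

```python
from datacontract.model.data_contract_specification import Server

# Hypothetical DB2 server entry from a data contract.
db2_server = Server(
    type="db2",
    host="db2.example.com",
    port=50000,
    database="SAMPLE",
    schema_="SALES",
)
```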
35 changes: 35 additions & 0 deletions datacontract/engines/soda/connections/db2.py
@@ -0,0 +1,35 @@
import os

import yaml

from datacontract.model.data_contract_specification import Server


def to_db2_soda_configuration(server: Server) -> str:
"""Serialize server config to soda configuration.


### Example:
type: DB2
host: 127.0.0.1
port: '50000'
username: simple
password: simple_pass
database: database
schema: public
"""
# Credentials are read from environment variables; they are not stored in the data contract.
soda_configuration = {
f"data_source {server.type}": {
"type": "db2",
"host": server.host,
"port": str(server.port),
"username": os.getenv("DATACONTRACT_DB2_USERNAME", ""),
"password": os.getenv("DATACONTRACT_DB2_PASSWORD", ""),
"database": server.database,
"schema": server.schema_,
}
}

soda_configuration_str = yaml.dump(soda_configuration)
return soda_configuration_str
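A usage sketch for the new converter, assuming a hypothetical server and credentials; the YAML key order comes from `yaml.dump`'s default sorting:

```python
import os

from datacontract.engines.soda.connections.db2 import to_db2_soda_configuration
from datacontract.model.data_contract_specification import Server

# Hypothetical credentials; the converter reads them from the environment at call time.
os.environ["DATACONTRACT_DB2_USERNAME"] = "db2inst1"
os.environ["DATACONTRACT_DB2_PASSWORD"] = "secret"

server = Server(type="db2", host="db2.example.com", port=50000, database="SAMPLE", schema_="SALES")
print(to_db2_soda_configuration(server))
# data_source db2:
#   database: SAMPLE
#   host: db2.example.com
#   ...
```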