
Added static typing to data_readers/base_data.py and data_readers/json_data.py #666

Merged: 12 commits, Oct 7, 2022
14 changes: 10 additions & 4 deletions dataprofiler/data_readers/avro_data.py
@@ -1,6 +1,6 @@
"""Contains class for saving and loading spreadsheet data."""
from io import BytesIO, StringIO
-from typing import Any, Dict, List, Optional, Union
+from typing import Any, Dict, List, Optional, Union, cast

import fastavro

@@ -56,7 +56,7 @@ def file_encoding(self) -> Optional[str]:
def file_encoding(self, value: Any) -> None:
"""Do nothing.

-    Required by mypy because the inherited self.file_encoding is read-write).
+    Required by mypy because the inherited self.file_encoding is read-write.
"""
pass

@@ -91,13 +91,19 @@ def is_match(
options = dict()

# get current position of stream
-if data_utils.is_stream_buffer(file_path) and not isinstance(file_path, str):
+if data_utils.is_stream_buffer(file_path):
+file_path = cast(
+    Union[StringIO, BytesIO], file_path
+)  # guaranteed by is_stream_buffer
starting_location = file_path.tell()

is_valid_avro = fastavro.is_avro(file_path)

# return to original position in stream
-if data_utils.is_stream_buffer(file_path) and not isinstance(file_path, str):
+if data_utils.is_stream_buffer(file_path):
+file_path = cast(
+    Union[StringIO, BytesIO], file_path
+)  # guaranteed by is_stream_buffer
file_path.seek(starting_location, 0)

return is_valid_avro
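The `cast` calls above follow a common mypy pattern: a plain boolean helper such as `data_utils.is_stream_buffer` cannot narrow a union type on its own, so the code asserts the narrowing explicitly. A minimal, self-contained sketch of the pattern (the `is_stream_buffer` stub below is illustrative, not the library's implementation):

```python
from io import BytesIO, StringIO
from typing import Any, Union, cast


def is_stream_buffer(file_path: Any) -> bool:
    """Returns a plain bool; mypy does not narrow the caller's type from it."""
    return isinstance(file_path, (StringIO, BytesIO))


def stream_position(file_path: Union[str, StringIO, BytesIO]) -> int:
    if is_stream_buffer(file_path):
        # cast() is a no-op at runtime; it only records for the type
        # checker what the runtime check already guarantees.
        stream = cast(Union[StringIO, BytesIO], file_path)
        return stream.tell()
    return 0
```

On Python 3.10+, annotating the helper's return type with `typing.TypeGuard` would let mypy perform this narrowing automatically, at the cost of changing the helper's signature.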
61 changes: 34 additions & 27 deletions dataprofiler/data_readers/base_data.py
@@ -3,6 +3,7 @@
import sys
from collections import OrderedDict
from io import StringIO
+from typing import Any, Dict, Generator, List, Optional, Union

import numpy as np
import pandas as pd
@@ -16,10 +17,12 @@
class BaseData(object):
"""Abstract class for data loading and saving."""

-data_type = None
-info = None
+data_type: Optional[str] = None
+info: Optional[str] = None
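Typing the class attributes as `Optional[str]` keeps the `None` defaults valid on the abstract base while letting concrete readers pin real values, as `json_data.py` does below. A minimal sketch of the intent (class names here are illustrative):

```python
from typing import Optional


class Base:
    data_type: Optional[str] = None  # unset on the abstract base


class Concrete(Base):
    data_type: Optional[str] = "json"  # concrete readers override it
```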

-def __init__(self, input_file_path, data, options):
+def __init__(
+    self, input_file_path: Optional[str], data: Any, options: Dict
+) -> None:
"""
Initialize Base class for loading a dataset.

@@ -39,7 +42,7 @@ def __init__(self, input_file_path, data, options):

# Public properties
self.input_file_path = input_file_path
-self.options = options
+self.options: Optional[Dict] = options

# 'Private' properties
# _data_formats: dict containing data_formats (key) and function
@@ -53,12 +56,12 @@ def __init__(self, input_file_path, data, options):
# constant across function calls.
# _tmp_file_name: randomly set variables for file name usable by system
# _file_encoding: contains the suggested file encoding for reading data
-self._data_formats = OrderedDict()
-self._selected_data_format = None
-self._data = data
-self._batch_info = dict(perm=list(), iter=0)
-self._tmp_file_name = None
-self._file_encoding = options.get("encoding", None)
+self._data_formats: Dict[str, Any] = OrderedDict()
+self._selected_data_format: Optional[str] = None
+self._data: Optional[Any] = data
+self._batch_info: Dict = dict(perm=list(), iter=0)
+self._tmp_file_name: Optional[str] = None
+self._file_encoding: Optional[str] = options.get("encoding", None)

@property
def data(self):
@@ -79,17 +82,12 @@ def data(self):
)

@property
-def data_format(self):
+def data_format(self) -> Optional[str]:
"""Return data format."""
return self._selected_data_format

-@property
-def is_structured(self):
-    """Determine compatibility with StructuredProfiler."""
-    raise NotImplementedError

@data_format.setter
-def data_format(self, value):
+def data_format(self, value: str):
allowed_data_formats = list(self._data_formats.keys())
if value.lower() not in allowed_data_formats:
raise ValueError(
@@ -100,7 +98,12 @@ def data_format(self, value):
self._selected_data_format = value.lower()
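The setter lower-cases the value and validates it against the registered formats, so callers get a `ValueError` for anything unknown. A hedged usage sketch (`"flattened_dataframe"` is registered by `JSONData` below; the file name is illustrative):

```python
data = JSONData("sample.json")
data.data_format = "FLATTENED_DATAFRAME"  # accepted and stored lowercased
data.data_format = "xml"  # raises ValueError listing the allowed formats
```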

@property
-def file_encoding(self):
+def is_structured(self) -> bool:
+    """Determine compatibility with StructuredProfiler."""
+    raise NotImplementedError
+
+@property
+def file_encoding(self) -> Optional[str]:
"""Return file encoding."""
if not self._file_encoding:
# get system default, but if set to ascii, just update to utf-8
@@ -122,7 +125,7 @@ def file_encoding(self):
return self._file_encoding

@file_encoding.setter
-def file_encoding(self, value):
+def file_encoding(self, value: str) -> None:
"""Set file encoding."""
valid_user_set_encodings = ["ascii", "utf-8", "utf-16", "utf-32"]
if not value or value.lower() not in valid_user_set_encodings:
@@ -134,19 +137,21 @@ def file_encoding(self, value):
self._file_encoding = value

@staticmethod
-def _check_and_return_options(options):
+def _check_and_return_options(options: Optional[Dict]) -> Dict:
"""Return options or raise error."""
if not options:
options = dict()
elif not isinstance(options, dict):
raise ValueError("Options must be a dictionary.")
return options

-def _load_data(self, data=None):
+def _load_data(self, data: Optional[Any] = None) -> None:
"""Load data."""
raise NotImplementedError()

-def get_batch_generator(self, batch_size):
+def get_batch_generator(
+    self, batch_size: int
+) -> Generator[Union[pd.DataFrame, List], None, None]:
"""Get batch generator."""
data_length = len(self.data)
indices = np.random.permutation(data_length)
@@ -157,11 +162,13 @@ def get_batch_generator(self, batch_size):
yield list(self.data[k] for k in indices[i : i + batch_size])
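Per the new `Generator` annotation, each yielded batch is either a `pd.DataFrame` slice or a plain list, drawn from a fresh random permutation of the dataset. A usage sketch (file name is illustrative):

```python
data = JSONData("sample.json")
for batch in data.get_batch_generator(batch_size=100):
    print(len(batch))  # at most 100 rows or records per batch
```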

@classmethod
-def is_match(cls, input_file_path, options):
+def is_match(cls, input_file_path: str, options: Optional[Dict]) -> bool:
"""Return true if match, false otherwise."""
raise NotImplementedError()

-def reload(self, input_file_path, data, options):
+def reload(
+    self, input_file_path: Optional[str], data: Any, options: Optional[Dict]
+) -> None:
"""
Reload the data class with a new dataset.

@@ -185,7 +192,7 @@ def reload(self, input_file_path, data, options):
self.options = None
self._batch_info = dict(perm=list(), iter=0)

-def __len__(self):
+def __len__(self) -> int:
"""
Return the length of the dataset which is loaded.

@@ -194,15 +201,15 @@ def __len__(self):
return len(self.data)

@property
-def length(self):
+def length(self) -> int:
"""
Return the length of the dataset which is loaded.

:return: length of the dataset
"""
return len(self)

-def __getattribute__(self, name):
+def __getattribute__(self, name: Any) -> Any:
"""
Override getattr for the class.

75 changes: 46 additions & 29 deletions dataprofiler/data_readers/json_data.py
@@ -2,6 +2,7 @@
import json
import warnings
from collections import OrderedDict
+from typing import Dict, List, Optional, Union

import numpy as np
import pandas as pd
@@ -16,9 +17,14 @@
class JSONData(SpreadSheetDataMixin, BaseData):
"""SpreadsheetData class to save and load spreadsheet data."""

data_type = "json"
data_type: Optional[str] = "json"

-def __init__(self, input_file_path=None, data=None, options=None):
+def __init__(
+    self,
+    input_file_path: Optional[str] = None,
+    data: Optional[Union[str, pd.DataFrame]] = None,
+    options: Optional[Dict] = None,
+):
"""
Initialize Data class for loading datasets of type JSON.

@@ -66,30 +72,32 @@ def __init__(self, input_file_path=None, data=None, options=None):
self._data_formats[
"flattened_dataframe"
] = self._get_data_as_flattened_dataframe
self._selected_data_format = options.get("data_format", "flattened_dataframe")
self._payload_keys = options.get("payload_keys", ["data", "payload"])
self._selected_data_format: str = options.get(
"data_format", "flattened_dataframe"
)
self._payload_keys: List[str] = options.get("payload_keys", ["data", "payload"])
if not isinstance(self._payload_keys, list):
self._payload_keys = [self._payload_keys]
-self._key_separator = options.get("key_separator", ".")
-self._selected_keys = options.get("selected_keys", list())
-self._metadata = None
+self._key_separator: str = options.get("key_separator", ".")
+self._selected_keys: Optional[List[str]] = options.get("selected_keys", list())
+self._metadata: Optional[pd.DataFrame] = None
if data is not None:
self._load_data(data)
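Every option consumed above arrives through the single `options` dict. A hedged example of wiring them together (all values are illustrative, and the empty `selected_keys` is assumed to mean no key filtering):

```python
opts = {
    "data_format": "flattened_dataframe",  # must be a registered format
    "payload_keys": ["data"],              # where to look for the payload
    "key_separator": "/",                  # separator used when flattening keys
    "selected_keys": [],                   # assumed default: no key filtering
}
data = JSONData("events.json", options=opts)
```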

@property
-def selected_keys(self):
+def selected_keys(self) -> Optional[List[str]]:
"""Return selected keys."""
return self._selected_keys

@property
-def metadata(self):
+def metadata(self) -> Optional[pd.DataFrame]:
"""Return a data frame that contains the metadata."""
if self._metadata is None or self._metadata.empty:
warnings.warn("No metadata was detected.")
return self._metadata

@property
-def data_and_metadata(self):
+def data_and_metadata(self) -> Optional[pd.DataFrame]:
"""Return a data frame that joins the data and the metadata."""
data = self.data
if self._metadata is not None and not self._metadata.empty:
@@ -227,13 +235,13 @@ def _get_data_as_flattened_dataframe(self, json_lines):

return data

-def _load_data_from_str(self, data_as_str):
+def _load_data_from_str(self, data_as_str: str) -> List:
"""
Load the data from a string.

:param data_as_str: data in string format.
:type data_as_str: str
-:return:
+:return: list
"""
try:
data = json.loads(data_as_str)
@@ -246,7 +254,7 @@ def _load_data_from_file(self, input_file_path):
)
return data

-def _load_data_from_file(self, input_file_path):
+def _load_data_from_file(self, input_file_path: str) -> List:
"""
Load the data from a file.

@@ -268,36 +276,38 @@ def _load_data_from_file(self, input_file_path):
)
return data

-def _get_data_as_records(self, data):
+def _get_data_as_records(self, data: List) -> List[str]:
"""
Extract the data as a record format.

:param data: raw data
:type data: list
:return: dataframe in record format
"""
-data = self._get_data_as_df(data)
-data = data.to_dict(orient="records", into=OrderedDict)
-for i, sample in enumerate(data):
-    data[i] = json.dumps(
+_data: Union[pd.DataFrame, List]
+_data = self._get_data_as_df(data)
+_data = _data.to_dict(orient="records", into=OrderedDict)
+for i, sample in enumerate(_data):
+    _data[i] = json.dumps(
self._convert_flat_to_nested_cols(sample), ensure_ascii=False
)
-return super(JSONData, self)._get_data_as_records(data)
+return super(JSONData, self)._get_data_as_records(_data)
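Introducing `_data` with an up-front `Union` annotation is the standard workaround when a local variable legitimately changes type mid-function, which mypy otherwise flags as an incompatible re-binding. A minimal sketch of the pattern (function name and types are illustrative):

```python
from typing import List, Union

import pandas as pd


def rows_as_json(rows: List[dict]) -> str:
    # Without the Union annotation, mypy rejects re-binding a
    # DataFrame-typed local to the str returned by to_json().
    _data: Union[pd.DataFrame, str]
    _data = pd.DataFrame(rows)
    _data = _data.to_json(orient="records")
    return _data
```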

-def _get_data_as_json(self, data):
+def _get_data_as_json(self, data: List) -> List[str]:
"""
Extract the data as a json format.

:param data: raw data
:type data: list
:return: dataframe in json format
"""
-data = self._get_data_as_df(data)
-data = data.to_json(orient="records")
-char_per_line = min(len(data), self.SAMPLES_PER_LINE_DEFAULT)
-return list(map("".join, zip(*[iter(data)] * char_per_line)))
+_data: Union[pd.DataFrame, List]
+_data = self._get_data_as_df(data)
+_data = _data.to_json(orient="records")
+char_per_line = min(len(_data), self.SAMPLES_PER_LINE_DEFAULT)
+return list(map("".join, zip(*[iter(_data)] * char_per_line)))
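The return line uses the `zip(*[iter(s)] * n)` idiom: the same iterator object is handed to `zip` n times, so each output tuple consumes n consecutive characters. A standalone illustration:

```python
text = "abcdefghij"
n = 4
chunks = list(map("".join, zip(*[iter(text)] * n)))
print(chunks)  # ['abcd', 'efgh'] -- zip stops before the short tail "ij"
```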

-def _get_data_as_df(self, data):
+def _get_data_as_df(self, data: Union[pd.DataFrame, Dict, List]) -> pd.DataFrame:
"""
Extract the data as pandas formats it.

@@ -316,7 +326,7 @@ def _get_data_as_df(self, data):
return data

@classmethod
-def _convert_flat_to_nested_cols(cls, dic, separator="."):
+def _convert_flat_to_nested_cols(cls, dic: Dict, separator: str = ".") -> Dict:
"""
Convert a flat dict to nested dict.

@@ -350,7 +360,9 @@ def _convert_flat_to_nested_cols(cls, dic, separator="."):
return dic
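Only the signature changed here, but the behavior is worth a quick illustration. Per the docstring, keys are split on the separator and rebuilt as nested dicts (expected output inferred from the docstring, since the method body is collapsed above):

```python
flat = {"user.name": "ada", "user.id": 1, "active": True}
print(JSONData._convert_flat_to_nested_cols(flat))
# expected: {'user': {'name': 'ada', 'id': 1}, 'active': True}
```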

@classmethod
-def is_match(cls, file_path, options=None):
+def is_match(
+    cls, file_path: Union[str, StringIO], options: Optional[Dict] = None
+) -> bool:
"""
Test whether first 1000 lines of file has valid JSON format or not.

@@ -402,7 +414,12 @@ def is_match(cls, file_path, options=None):
else:
return False
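A typical call site for the newly typed signature, hedged since only the tail of the method is visible above (the path is illustrative):

```python
from dataprofiler.data_readers.json_data import JSONData

if JSONData.is_match("records.json"):
    data = JSONData("records.json")
```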

-def reload(self, input_file_path=None, data=None, options=None):
+def reload(
+    self,
+    input_file_path: Optional[str] = None,
+    data: Optional[Union[str, pd.DataFrame]] = None,
+    options: Optional[Dict] = None,
+) -> None:
"""
Reload the data class with a new dataset.

@@ -419,4 +436,4 @@
"""
self._selected_keys = None
super(JSONData, self).reload(input_file_path, data, options)
-self.__init__(self.input_file_path, data, options)
+self.__init__(self.input_file_path, data, options)  # type: ignore