Commit e4e54b6

added static typing to data_utils.py (#662)
* added static typing to data_utils.py

* changed re.Pattern type annotation to typing.Pattern

* removed logging import

* changed castings to if statement

* fixed formatting with black

* changed isinstance to cast

* updated read_csv_df() type signature to include None's

* changed isinstance to cast in read_json_df

* removed is_buf_wrapped in favor of isinstance

* added deleted comment

* added is_buf_wrapped back

* added JSONType

* fixed formatting

* added JSONType to read_json()

* updated unicode_to_str docstring

* moved JSONType to _typing.py

* changed JSONType to be nonrecursive and removed cast on key

* fix docstring

Co-authored-by: Michael Davis <[email protected]>
Co-authored-by: Taylor Turner <[email protected]>
3 people authored Oct 18, 2022
1 parent 44a3256 commit e4e54b6
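
Note: several of the bullets above revisit the same decision, whether to narrow a Union with an isinstance() check or to assert the narrower type with typing.cast(). The two idioms differ at runtime: isinstance() actually branches, while cast() is purely an instruction to the type checker. A minimal sketch of the trade-off (illustrative only, not code from this commit):

    from io import TextIOWrapper
    from typing import Union, cast

    def close_via_isinstance(buf: Union[TextIOWrapper, str]) -> None:
        # runtime check; mypy narrows buf inside the branch
        if isinstance(buf, TextIOWrapper):
            buf.detach()

    def close_via_cast(buf: Union[TextIOWrapper, str], wrapped: bool) -> None:
        # no runtime check; the `wrapped` flag is trusted, and mypy is told so
        if wrapped:
            cast(TextIOWrapper, buf).detach()

The commit ultimately uses cast() where a separate flag (is_buf_wrapped) already guarantees the type, and isinstance() where the runtime check itself is needed.
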
Showing 2 changed files with 80 additions and 33 deletions.
3 changes: 2 additions & 1 deletion dataprofiler/_typing.py
@@ -1,7 +1,8 @@
 """Contains typing aliases."""
-from typing import Union
+from typing import Dict, List, Union
 
 import numpy as np
 import pandas as pd
 
 DataArray = Union[pd.DataFrame, pd.Series, np.ndarray]
+JSONType = Union[str, int, float, bool, None, List, Dict]
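
Note: JSONType is deliberately nonrecursive (see the commit message): List and Dict are left unparameterized, so the alias types one level of structure and nested values surface as untyped objects, presumably sidestepping recursive-alias limitations in the mypy versions of the time. A small usage sketch under that assumption (illustrative, not from the diff):

    import json
    from typing import Dict, List, Union

    JSONType = Union[str, int, float, bool, None, List, Dict]

    def parse_line(raw: str) -> JSONType:
        data: JSONType = json.loads(raw)  # json.loads returns some alias member
        return data

    parsed = parse_line('{"a": [1, 2]}')
    if isinstance(parsed, dict):  # narrow before using mapping methods
        print(list(parsed.keys()))
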
110 changes: 78 additions & 32 deletions dataprofiler/data_readers/data_utils.py
@@ -4,7 +4,19 @@
 import urllib
 from builtins import next
 from collections import OrderedDict
-from io import BytesIO, TextIOWrapper
+from io import BytesIO, StringIO, TextIOWrapper
+from typing import (
+    Any,
+    Dict,
+    Generator,
+    Iterator,
+    List,
+    Optional,
+    Pattern,
+    Tuple,
+    Union,
+    cast,
+)
 
 import dateutil
 import pandas as pd
@@ -13,12 +25,13 @@
 from chardet.universaldetector import UniversalDetector
 
 from .. import dp_logging
+from .._typing import JSONType
 from .filepath_or_buffer import FileOrBufferHandler, is_stream_buffer  # NOQA
 
 logger = dp_logging.get_child_logger(__name__)
 
 
-def data_generator(data_list):
+def data_generator(data_list: List[str]) -> Generator[str, None, None]:
     """
     Take a list and return a generator on the list.
@@ -31,7 +44,9 @@ def data_generator(data_list):
         yield item
 
 
-def generator_on_file(file_object):
+def generator_on_file(
+    file_object: Union[StringIO, BytesIO]
+) -> Generator[Union[str, bytes], None, None]:
     """
     Take a file and return a generator that returns lines.
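
Note: Generator[str, None, None] pins all three type parameters: what the generator yields, what it accepts via send(), and what it returns. For simple iteration like these helpers, the last two are None. A quick sketch mirroring the signature above:

    from typing import Generator, List

    def data_generator(data_list: List[str]) -> Generator[str, None, None]:
        # yields str; no send() values; no return value
        for item in data_list:
            yield item

    print(list(data_generator(["a", "b"])))
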
@@ -49,7 +64,7 @@ def generator_on_file(file_object):
     file_object.close()
 
 
-def convert_int_to_string(x):
+def convert_int_to_string(x: int) -> str:
     """
     Convert the given input to string.
@@ -69,12 +84,12 @@ def convert_int_to_string(x):
     return str(x)
 
 
-def unicode_to_str(data, ignore_dicts=False):
+def unicode_to_str(data: JSONType, ignore_dicts: bool = False) -> JSONType:
     """
     Convert data to string representation if it is a unicode string.
 
     :param data: input data
-    :type data: str
+    :type data: JSONType
     :param ignore_dicts: if set, ignore the dictionary type processing
     :type ignore_dicts: boolean
     :return: string representation of data
@@ -99,12 +114,16 @@ def unicode_to_str(data, ignore_dicts=False):
     return data
 
 
-def json_to_dataframe(json_lines, selected_columns=None, read_in_string=False):
+def json_to_dataframe(
+    json_lines: List[JSONType],
+    selected_columns: Optional[List[str]] = None,
+    read_in_string: bool = False,
+) -> Tuple[pd.DataFrame, pd.Series]:
     """
     Take list of json objects and return dataframe representing json list.
 
     :param json_lines: list of json objects
-    :type json_lines: list(dict)
+    :type json_lines: list(JSONType)
     :param selected_columns: a list of keys to be processed
     :type selected_columns: list(str)
     :param read_in_string: if True, all the values in dataframe will be
@@ -137,7 +156,11 @@ def json_to_dataframe(json_lines, selected_columns=None, read_in_string=False):
     return df, original_df_dtypes
 
 
-def read_json_df(data_generator, selected_columns=None, read_in_string=False):
+def read_json_df(
+    data_generator: Generator,
+    selected_columns: Optional[List[str]] = None,
+    read_in_string: bool = False,
+) -> Tuple[Iterator[pd.DataFrame], pd.Series]:
     """
     Return an iterator that returns a chunk of data as dataframe in each call.
@@ -163,7 +186,7 @@ def read_json_df(data_generator, selected_columns=None, read_in_string=False):
         each call as well as original dtypes of the dataframe columns.
     :rtype: typle(Iterator(pd.DataFrame), pd.Series(dtypes)
     """
-    lines = list()
+    lines: List[JSONType] = list()
     k = 0
     while True:
         try:
@@ -190,7 +213,11 @@ def read_json_df(data_generator, selected_columns=None, read_in_string=False):
     return json_to_dataframe(lines, selected_columns, read_in_string)
 
 
-def read_json(data_generator, selected_columns=None, read_in_string=False):
+def read_json(
+    data_generator: Generator,
+    selected_columns: Optional[List[str]] = None,
+    read_in_string: bool = False,
+) -> List[JSONType]:
     """
     Return the lines of a json.
@@ -215,7 +242,7 @@ def read_json(data_generator, selected_columns=None, read_in_string=False):
     :return: returns the lines of a json file
     :rtype: list(dict)
     """
-    lines = list()
+    lines: List[JSONType] = list()
     k = 0
     while True:
         try:
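
Note: annotating the empty accumulator (lines: List[JSONType] = list()) is what lets mypy accept the mixed JSON values appended later; for a bare empty list it would typically report "Need type annotation" instead. Illustrative:

    from typing import Dict, List, Union

    JSONType = Union[str, int, float, bool, None, List, Dict]

    lines: List[JSONType] = list()  # bare `lines = list()` lacks an element type
    lines.append({"a": 1})
    lines.append("raw line")
    print(lines)
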
@@ -243,13 +270,13 @@
 
 
 def read_csv_df(
-    file_path,
-    delimiter,
-    header,
-    selected_columns=[],
-    read_in_string=False,
-    encoding="utf-8",
-):
+    file_path: Union[str, BytesIO, TextIOWrapper],
+    delimiter: Optional[str],
+    header: Optional[int],
+    selected_columns: List[str] = [],
+    read_in_string: bool = False,
+    encoding: Optional[str] = "utf-8",
+) -> pd.DataFrame:
     """
     Read a CSV file in chunks and return dataframe in form of iterator.
@@ -267,7 +294,7 @@ def read_csv_df(
     :return: Iterator
     :rtype: pd.DataFrame
     """
-    args = {
+    args: Dict[str, Any] = {
         "delimiter": delimiter,
         "header": header,
         "iterator": True,
@@ -299,13 +326,18 @@ def read_csv_df(
 
     # if the buffer was wrapped, detach it before returning
     if is_buf_wrapped:
+        file_path = cast(TextIOWrapper, file_path)
         file_path.detach()
     fo.close()
 
     return data
 
 
-def read_parquet_df(file_path, selected_columns=None, read_in_string=False):
+def read_parquet_df(
+    file_path: str,
+    selected_columns: Optional[List[str]] = None,
+    read_in_string: bool = False,
+) -> Tuple[pd.DataFrame, pd.Series]:
     """
     Return an iterator that returns one row group each time.
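
Note: the added cast() above exists because file_path is declared Union[str, BytesIO, TextIOWrapper], and only TextIOWrapper has detach(); the is_buf_wrapped flag already guarantees the wrapped case, so the cast merely records that guarantee for mypy. Roughly (an illustrative reduction, not the committed code):

    from io import BytesIO, TextIOWrapper
    from typing import Union, cast

    def detach_if_wrapped(
        file_path: Union[str, BytesIO, TextIOWrapper], is_buf_wrapped: bool
    ) -> None:
        if is_buf_wrapped:
            # the flag guarantees a TextIOWrapper here; cast() documents it
            cast(TextIOWrapper, file_path).detach()
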
@@ -349,7 +381,9 @@ def read_parquet_df(file_path, selected_columns=None, read_in_string=False):
     return data, original_df_dtypes
 
 
-def read_text_as_list_of_strs(file_path, encoding=None):
+def read_text_as_list_of_strs(
+    file_path: str, encoding: Optional[str] = None
+) -> List[str]:
     """
     Return list of strings relative to the chunk size.
@@ -367,7 +401,9 @@ def read_text_as_list_of_strs(file_path, encoding=None):
     return data
 
 
-def detect_file_encoding(file_path, buffer_size=1024, max_lines=20):
+def detect_file_encoding(
+    file_path: str, buffer_size: int = 1024, max_lines: int = 20
+) -> str:
     """
     Determine encoding of files within initial `max_lines` of length `buffer_size`.
@@ -456,7 +492,7 @@ def _decode_is_valid(encoding):
     return encoding.lower()
 
 
-def detect_cell_type(cell):
+def detect_cell_type(cell: str) -> str:
     """
     Detect the cell type (int, float, etc).
@@ -488,7 +524,7 @@ def detect_cell_type(cell):
     return cell_type
 
 
-def get_delimiter_regex(delimiter=",", quotechar=","):
+def get_delimiter_regex(delimiter: str = ",", quotechar: str = ",") -> Pattern[str]:
     """
     Build regex for delimiter checks.
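
Note: the Pattern[str] return type comes from typing rather than re, per the second bullet in the commit message: typing.Pattern is subscriptable on all the interpreters this project supported, whereas re.Pattern only became subscriptable with PEP 585 (Python 3.9); typing.Pattern was in turn removed in Python 3.12, so this is a compatibility trade-off. Sketch:

    import re
    from typing import Pattern

    def compile_delimiter(delimiter: str = ",") -> Pattern[str]:
        # typing.Pattern[str] works on Python < 3.9; re.Pattern[str] does not
        return re.compile(re.escape(delimiter))

    print(compile_delimiter("|").pattern)
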
Expand Down Expand Up @@ -518,7 +554,12 @@ def get_delimiter_regex(delimiter=",", quotechar=","):
return re.compile(delimiter_regex + quotechar_regex)


def find_nth_loc(string=None, search_query=None, n=0, ignore_consecutive=True):
def find_nth_loc(
string: Optional[str] = None,
search_query: Optional[str] = None,
n: int = 0,
ignore_consecutive: bool = True,
) -> Tuple[int, int]:
"""
Search string via search_query and return nth index in which query occurs.
@@ -565,8 +606,12 @@ def find_nth_loc(string=None, search_query=None, n=0, ignore_consecutive=True):
 
 
 def load_as_str_from_file(
-    file_path, file_encoding=None, max_lines=10, max_bytes=65536, chunk_size_bytes=1024
-):
+    file_path: str,
+    file_encoding: Optional[str] = None,
+    max_lines: int = 10,
+    max_bytes: int = 65536,
+    chunk_size_bytes: int = 1024,
+) -> str:
     """
     Load data from a csv file up to a specific line OR byte_size.
@@ -602,7 +647,7 @@ def load_as_str_from_file(
 
         # Return either the last index of sample_lines OR
        # the index of the newline char that matches remaining_lines
-        search_query_value = "\n"
+        search_query_value: Union[str, bytes] = "\n"
         if isinstance(sample_lines, bytes):
             search_query_value = b"\n"

Expand All @@ -611,7 +656,8 @@ def load_as_str_from_file(
while start_loc < len_sample_lines - 1 and total_occurrences < max_lines:
loc, occurrence = find_nth_loc(
sample_lines[start_loc:],
search_query=search_query_value,
search_query=cast(str, search_query_value),
# TODO: make sure find_nth_loc() works with search_query as bytes
n=remaining_lines,
)
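
Note: the TODO above is worth keeping because cast() performs no conversion at runtime: when sample_lines is bytes, search_query_value is still b"\n" after the cast, and find_nth_loc() receives bytes despite its str annotation. A quick demonstration of that no-op behavior:

    from typing import cast

    value: object = b"\n"
    text = cast(str, value)  # no error and no conversion at runtime
    print(type(text))        # <class 'bytes'>; cast only informs the checker
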

@@ -629,7 +675,7 @@ def load_as_str_from_file(
     return data_as_str
 
 
-def is_valid_url(url_as_string):
+def is_valid_url(url_as_string: Any) -> bool:
     """
     Determine whether a given string is a valid URL.
@@ -646,7 +692,7 @@ def is_valid_url(url_as_string):
     return all([result.scheme, result.netloc])
 
 
-def url_to_bytes(url_as_string, options):
+def url_to_bytes(url_as_string: str, options: Dict) -> BytesIO:
     """
     Read in URL and converts it to a byte stream.
