Source code for deirokay.parser.reader.dask_reader

from posixpath import splitext
from typing import Any, Dict, List, Union

import dask.dataframe  # lazy module


[docs]def read(data: Union[str, 'dask.dataframe.DataFrame'], columns: List[str], sql: bool = False, **kwargs) -> 'dask.dataframe.DataFrame': """Infer the file type by its extension and call the proper `dask.dataframe` method to parse it. Parameters ---------- data : Union[str, dask.dataframe.DataFrame] Path to file or SQL query, or DataFrame object columns : List[str] List of columns to be parsed. sql : bool, optional Whether or not `data` should be interpreted as a path to a file or a SQL query. **kwargs : dict Arguments to be passed to `dask.dataframe` methods when reading. Returns ------- dask.dataframe.DataFrame The dask DataFrame. """ default_kwargs: Dict[str, Any] = {} if isinstance(data, dask.dataframe.DataFrame): return data[columns].copy() if not isinstance(data, str): raise TypeError(f'Unexpected type for `data` ({data.__class__})') if sql: read_ = dask.dataframe.read_sql else: file_extension = splitext(data)[1].lstrip('.').lower() if file_extension == 'csv': default_kwargs.update({ 'dtype': str, 'skipinitialspace': True, }) read_ = getattr(dask.dataframe, f'read_{file_extension}', None) if read_ is None: raise TypeError(f'File type "{file_extension}" not supported') default_kwargs.update(kwargs) # try `columns` argument try: read_data = read_(data, columns=columns, **default_kwargs) except TypeError as e: if 'columns' not in str(e): raise e # try `usecols` argument try: read_data = read_(data, usecols=columns, **default_kwargs) except TypeError as e: if 'usecols' not in str(e): raise e # give up, read everything, filter columns later read_data = read_(data, **default_kwargs)[columns] if sql: read_data = read_data.reset_index() return read_data