"""
Module for FileSystem abstractions and utilities for multi-purpose
paths.
"""
import contextlib
import importlib
import itertools
import json
import os
import re
import sys
from collections import deque
from os.path import splitext
from pathlib import Path
from tempfile import NamedTemporaryFile
from types import ModuleType
from typing import IO, Generator, Iterable, Optional, Sequence
import yaml
from typing_extensions import Literal
# `boto3` is an optional dependency: when the import fails, the module
# still loads, `boto3` is set to None and the original ImportError is
# kept so it can be reported later by code that needs the S3 backend.
boto3_import_error = None
try:
    import boto3
except ImportError as e:
    boto3 = None
    boto3_import_error = e

# Pattern for `s3://<bucket>/<key>` paths. Group 1 (bucket) accepts word
# characters and hyphens; group 2 (key) accepts word characters,
# hyphens, slashes and dots, and may be empty.
BUCKET_KEY_REGEX = re.compile(r's3:\/\/([\w\-]+)\/([\w\-\/.]*)')
def split_s3_path(s3_path: str) -> tuple:
    """Split a full s3 path into `bucket` and `key` parts.

    The path is split on the first `/` that follows the `s3://` scheme,
    so the key is returned verbatim, whatever characters it contains
    (`=`, spaces, `+`, ...), and bucket names containing dots are
    accepted.

    Parameters
    ----------
    s3_path : str
        Full S3 path, such as `s3://my-bucket/my-prefix/` or
        `s3://my-bucket/my-prefix/my-file.txt`.

    Returns
    -------
    tuple
        `(bucket, key)` tuple. `key` is an empty string when the path
        names only a bucket (`s3://my-bucket` or `s3://my-bucket/`).

    Raises
    ------
    ValueError
        The path does not start with `s3://` or has no bucket name.
    """
    scheme = 's3://'
    if not s3_path.startswith(scheme):
        raise ValueError(
            f"Not a valid S3 path (expected 's3://bucket/key'): '{s3_path}'"
        )

    # Only the first slash separates bucket from key; every later slash
    # belongs to the key itself.
    bucket, _, key = s3_path[len(scheme):].partition('/')
    if not bucket:
        raise ValueError(f"Missing bucket name in S3 path: '{s3_path}'")
    return bucket, key
def _import_file_as_python_module(path_to_file: str) -> ModuleType:
    """Import a .py file as a Python module.

    The file's directory is temporarily placed at the front of
    `sys.path` and the module is imported by its base name (file name
    without extension). `sys.path` is restored even when the import
    itself raises.

    Parameters
    ----------
    path_to_file : str
        Path to .py file.

    Returns
    -------
    ModuleType
        Imported module.

    Raises
    ------
    ValueError
        Not a valid Python file.
    """
    module_path, extension = os.path.splitext(path_to_file)
    # NOTE(review): '.o' is accepted exactly as in the original check;
    # it may have been meant as a compiled-Python suffix - confirm.
    if extension not in ('.py', '.o'):
        raise ValueError('You should pass a valid Python file')

    module_name = os.path.basename(module_path)
    module_dir = os.path.dirname(path_to_file)

    sys.path.insert(0, module_dir)
    try:
        # The file may have been created after the import system cached
        # the directory listing (e.g. a freshly written temporary
        # file), so make sure the finders look at the disk again.
        importlib.invalidate_caches()
        return importlib.import_module(module_name)
    finally:
        # Remove by value rather than `pop(0)`: the imported module may
        # itself have changed `sys.path`.
        with contextlib.suppress(ValueError):
            sys.path.remove(module_dir)
class FileSystem():
    """Abstract file system operations over folders and files in
    any place, such as local or in S3 buckets.

    Subclasses are expected to implement `open` (used by every
    `read*`/`write*` helper) and, where it makes sense, `ls`, `isdir`,
    `mkdir` and `import_as_python_module`.

    Parameters
    ----------
    path : str
        Path to file or folder.
    """

    def __init__(self, path: str):
        self.path = path

    def ls(self, recursive: bool = False, files_only: bool = False,
           reverse: bool = False, limit: Optional[int] = None
           ) -> Sequence['FileSystem']:
        """List files in a prefix or folder.

        Parameters
        ----------
        recursive : bool, optional
            Whether or not to list subfolders recursively,
            by default False
        files_only : bool, optional
            List only files, ignore folders/prefixes,
            by default False.
        reverse : bool, optional
            List items in reverse order, by default False.
        limit : Optional[int], optional
            Maximum number of items to return, by default None
            (no limit).

        Raises
        ------
        NotImplementedError
            Operation not valid or not implemented.
        """
        raise NotImplementedError

    def read_dict(self, *args, **kwargs) -> dict:
        """Read and parse a `dict`-like file from either YAML or JSON
        format.

        The parser is chosen from the (case-insensitive) file
        extension. Arguments are forwarded to `read_json` or
        `read_yaml`.

        Returns
        -------
        dict
            Python dictionary with the file content.

        Raises
        ------
        NotImplementedError
            The file extension is not `.json`, `.yaml` or `.yml`.
        """
        extension = splitext(self.path)[1].lower()
        if extension == '.json':
            return self.read_json(*args, **kwargs)
        elif extension in ('.yaml', '.yml'):
            return self.read_yaml(*args, **kwargs)
        raise NotImplementedError(f'No parser for file type: {extension}')

    def write_dict(self, *args, **kwargs) -> None:
        """Serialize and write a Python `dict` to either YAML or JSON
        file.

        The serializer is chosen from the (case-insensitive) file
        extension. Arguments are forwarded to `write_json` or
        `write_yaml`.

        Raises
        ------
        NotImplementedError
            The file extension is not `.json`, `.yaml` or `.yml`.
        """
        extension = splitext(self.path)[1].lower()
        if extension == '.json':
            return self.write_json(*args, **kwargs)
        elif extension in ('.yaml', '.yml'):
            return self.write_yaml(*args, **kwargs)
        raise NotImplementedError(f'No serializer for file type: {extension}')

    def read_yaml(self) -> dict:
        """Read and parse a YAML file as a Python `dict`.

        Returns
        -------
        dict
            Python dictionary with the file content.
        """
        with self.open('r') as fp:
            return yaml.safe_load(fp)

    def write_yaml(self, doc: dict, **kwargs) -> None:
        """Serialize and write a Python `dict` to a YAML file.

        Keys are written in insertion order (`sort_keys=False`); extra
        keyword arguments are passed to `yaml.dump`.

        Parameters
        ----------
        doc : dict
            The Python `dict`.
        """
        with self.open('w') as fp:
            yaml.dump(doc, fp, sort_keys=False, **kwargs)

    def read_json(self) -> dict:
        """Read and parse a JSON file as a Python `dict`.

        Returns
        -------
        dict
            Python dictionary with the file content.
        """
        with self.open('r') as fp:
            return json.load(fp)

    def write_json(self, doc: dict, **kwargs) -> None:
        """Serialize and write a Python `dict` to a JSON file.

        Extra keyword arguments are passed to `json.dump`.

        Parameters
        ----------
        doc : dict
            The Python `dict`.
        """
        with self.open('w') as fp:
            json.dump(doc, fp, **kwargs)

    def isdir(self) -> bool:
        """Return True if the path is a directory.

        Returns
        -------
        bool
            Whether or not the path is a directory

        Raises
        ------
        NotImplementedError
            Operation not valid or not implemented.
        """
        raise NotImplementedError

    def mkdir(self, *args, **kwargs):
        """Create directory using `Path.mkdir` method. Arguments are
        passed directly to this method.

        Raises
        ------
        NotImplementedError
            Operator not valid or not implemented.
        """
        raise NotImplementedError

    def import_as_python_module(self) -> ModuleType:
        """Import file as a Python module.

        Raises
        ------
        NotImplementedError
            Operation not valid or not implemented.
        """
        raise NotImplementedError

    def open(self, mode: Literal['r', 'w'], *args, **kwargs) -> IO:
        """Open file.

        Parameters
        ----------
        mode : Literal['r', 'w']
            `'r'` for reading or `'w'` for writing.

        Returns
        -------
        IO
            Object usable as a context manager that yields the file
            handle.

        Raises
        ------
        NotImplementedError
            Operation not valid or not implemented.
        """
        raise NotImplementedError

    def read(self, *args, **kwargs) -> str:
        """Read a file as text.

        Extra arguments are forwarded to `open` after the mode.

        Returns
        -------
        str
            Whatever the opened handle's `read()` returns.
        """
        # The mode goes first positionally: passing it as a keyword
        # after `*args` would clash with the first extra positional
        # argument ("multiple values for argument 'mode'").
        with self.open('r', *args, **kwargs) as fp:
            return fp.read()

    def __truediv__(self, rest: str) -> 'FileSystem':
        """Create another FileSystem object by '/'-joining a FileSystem
        object with a string.

        Parameters
        ----------
        rest : str
            The rest of the file path.

        Returns
        -------
        FileSystem
            The same FileSystem subclass as the original object.

        Raises
        ------
        TypeError
            `rest` should be a `str`.
        """
        if isinstance(rest, str):
            cls = type(self)
            return cls(os.path.join(self.path, rest))
        raise TypeError(
            f'`rest` should be a `str`, not `{type(rest).__name__}`'
        )

    def __lt__(self, other: 'FileSystem') -> bool:
        """Order FileSystem objects by their `path` strings.

        Only defined when `self` is an instance of `other`'s type;
        otherwise the comparison is left to Python, which raises
        `TypeError`.
        """
        # An explicit check instead of `assert`, which is stripped when
        # Python runs with `-O`.
        if not isinstance(self, type(other)):
            return NotImplemented
        return self.path < other.path

    def __str__(self) -> str:
        """Return the path this object points to."""
        return self.path
class LocalFileSystem(FileSystem):
    """FileSystem wrapper for local files and folders."""

    # docstr-coverage:inherited
    def ls(self, recursive: bool = False, files_only: bool = False,
           reverse: bool = False, limit: Optional[int] = None
           ) -> Sequence['LocalFileSystem']:
        # Only the recursive, files-only listing is implemented.
        if recursive is False:
            raise NotImplementedError
        if files_only is False:
            raise NotImplementedError

        def _recursive_list():
            # Directories are visited in (optionally reversed) order of
            # their path, and files are ordered by name within each
            # directory. The key picks the directory path explicitly:
            # `sorted` hands the whole `(parent, dirs, files)` tuple to
            # the key function as a single argument.
            walked = sorted(os.walk(self.path),
                            key=lambda entry: entry[0],
                            reverse=reverse)
            for parent, _, files in walked:
                for file in sorted(files, reverse=reverse):
                    yield os.path.join(parent, file)

        # `islice` with `limit=None` yields every item.
        limited_files = itertools.islice(_recursive_list(), limit)
        return [LocalFileSystem(path) for path in limited_files]

    # docstr-coverage:inherited
    def import_as_python_module(self) -> ModuleType:
        return _import_file_as_python_module(self.path)

    # docstr-coverage:inherited
    def isdir(self) -> bool:
        return os.path.isdir(self.path)

    # docstr-coverage:inherited
    def mkdir(self, *args, **kwargs) -> None:
        return Path(self.path).mkdir(*args, **kwargs)

    # docstr-coverage:inherited
    def open(self, mode: Literal['r', 'w'], *args, **kwargs) -> IO:
        return open(self.path, mode, *args, **kwargs)
class S3FileSystem(FileSystem):
    """FileSystem wrapper for objects stored in AWS S3.

    Parameters
    ----------
    path : Optional[str], optional
        Full S3 path (`s3://bucket/prefix-or-key`). Mutually exclusive
        with `bucket` and `prefix_or_key`.
    bucket : Optional[str], optional
        Bucket name. Must be passed together with `prefix_or_key`.
    prefix_or_key : Optional[str], optional
        Prefix or object key inside the bucket.
    client : Optional[boto3.client], optional
        S3 client to use. When omitted, `boto3.client('s3')` is
        created.

    Raises
    ------
    ImportError
        `boto3` is not installed.
    ValueError
        Neither or both of `path` and `(bucket, prefix_or_key)` were
        passed.
    """

    # Page size requested from the `list_objects_v2` paginator.
    LIST_OBJECTS_MAX_KEYS = 1000

    def __init__(self, path: Optional[str] = None,
                 bucket: Optional[str] = None,
                 prefix_or_key: Optional[str] = None,
                 client: Optional['boto3.client'] = None):
        if boto3 is None:
            raise ImportError('S3-backend requires `boto3` module to be'
                              f' installed ({boto3_import_error})')

        # Valid combinations: `path` alone, or `bucket` and
        # `prefix_or_key` together (all judged by truthiness).
        has_path = bool(path)
        if has_path == bool(bucket) or has_path == bool(prefix_or_key):
            raise ValueError(
                'Either `path` or `(bucket, prefix_or_key)` should be'
                ' passed (but not both).'
            )

        if has_path:
            final_bucket, final_prefix_or_key = split_s3_path(path)
            final_path = path
        else:
            final_bucket, final_prefix_or_key = bucket, prefix_or_key
            # Keep `self.path` a real S3 path (scheme included, always
            # '/'-separated) so that `str(fs)` and `fs / 'x'` work the
            # same way no matter how the object was constructed.
            final_path = f's3://{bucket}/{prefix_or_key}'

        super().__init__(final_path)
        self.client = client or boto3.client('s3')
        self.bucket = final_bucket
        self.prefix_or_key = final_prefix_or_key

    def __truediv__(self, rest: str) -> 'S3FileSystem':
        """Create another S3FileSystem by '/'-joining this path with a
        string.

        The join always uses '/' (never the OS separator) and the new
        object reuses this object's S3 client.

        Parameters
        ----------
        rest : str
            The rest of the file path.

        Returns
        -------
        S3FileSystem
            The same class as the original object.

        Raises
        ------
        TypeError
            `rest` should be a `str`.
        """
        import posixpath

        if not isinstance(rest, str):
            raise TypeError(
                f'`rest` should be a `str`, not `{type(rest).__name__}`'
            )
        return type(self)(path=posixpath.join(self.path, rest),
                          client=self.client)

    # docstr-coverage:inherited
    def ls(self, recursive: bool = False, files_only: bool = False,
           reverse: bool = False, limit: Optional[int] = None
           ) -> Sequence['S3FileSystem']:
        # Only the recursive, files-only listing is implemented.
        if recursive is False:
            raise NotImplementedError
        if files_only is False:
            raise NotImplementedError

        # To handle requests containing over 1000 items, we need to
        # paginate through the results.
        max_keys = self.LIST_OBJECTS_MAX_KEYS
        page_iterable = self.client.get_paginator('list_objects_v2').paginate(
            Bucket=self.bucket,
            Prefix=self.prefix_or_key,
            PaginationConfig={'PageSize': max_keys}
        )

        chained_items: Iterable
        if not reverse:
            chained_items = (
                item['Key']
                for page in page_iterable
                for item in page.get('Contents', [])
            )
        else:
            # Use deque with maxlen to exhaust the paginator and keep
            # only the last pages (those necessary to satisfy the
            # limit).
            # Worst case scenario: the last page contains only one
            # item. Hence, from 2 to 1001, we need at least 2 pages,
            # and so on. Thus, the expression `(limit+998)//1000 + 1`
            # (written below in terms of `max_keys`).
            pages_to_keep = (
                None if limit is None else (limit+max_keys-2)//max_keys+1
            )
            last_pages = deque(iter(page_iterable), maxlen=pages_to_keep)
            chained_items = (
                item['Key']
                for page in reversed(last_pages)
                for item in reversed(page.get('Contents', []))
            )

        # `islice` with `limit=None` yields every item.
        chained_items = itertools.islice(chained_items, limit)
        return [
            S3FileSystem(bucket=self.bucket,
                         prefix_or_key=key,
                         client=self.client)
            for key in chained_items
        ]

    # docstr-coverage:inherited
    def import_as_python_module(self) -> ModuleType:
        # Download the object to a temporary file carrying the same
        # extension and import it while that file still exists.
        _, extension = os.path.splitext(self.path)
        with NamedTemporaryFile(suffix=extension) as tmp_fp:
            self.client.download_file(self.bucket,
                                      self.prefix_or_key,
                                      tmp_fp.name)
            module = _import_file_as_python_module(tmp_fp.name)
        return module

    # docstr-coverage:inherited
    def open(self, mode: Literal['r', 'w'], *args, **kwargs) -> IO:
        if mode == 'r':
            # The response body is wrapped so it can be used in a
            # `with` statement and is closed on exit.
            # NOTE(review): the body presumably yields bytes rather
            # than text - confirm against callers expecting `str`.
            body = self.client.get_object(Bucket=self.bucket,
                                          Key=self.prefix_or_key)['Body']
            return contextlib.closing(body)  # type: ignore
        elif mode == 'w':
            def _writable() -> Generator[IO, None, None]:
                # Writes go to a local temporary file, which is
                # uploaded once the caller's `with` block finishes
                # without raising.
                with NamedTemporaryFile(mode, *args, **kwargs) as tmp_fp:
                    yield tmp_fp
                    tmp_fp.flush()
                    self.client.upload_file(
                        tmp_fp.name,
                        self.bucket,
                        self.prefix_or_key
                    )
            return contextlib.contextmanager(_writable)()  # type: ignore
        raise NotImplementedError(f"Mode '{mode}' not supported.")
def fs_factory(path: str) -> FileSystem:
    """Build the FileSystem object that matches a path.

    Parameters
    ----------
    path : str
        Local or S3 path. Paths starting with `s3://` are treated as
        S3 paths; anything else is treated as a local path.

    Returns
    -------
    FileSystem
        `S3FileSystem` for S3 paths, `LocalFileSystem` otherwise.
    """
    # Pick the backend from the scheme prefix, then instantiate it.
    backend = S3FileSystem if path.startswith('s3://') else LocalFileSystem
    return backend(path)