import logging
import warnings
from datetime import datetime
from typing import Optional, Union
from airflow.exceptions import AirflowSkipException
from airflow.models.baseoperator import BaseOperator
import deirokay
from deirokay._typing import (DeirokayDataSource, DeirokayOptionsDocument,
DeirokayValidationDocument)
from deirokay.enums import Backend, SeverityLevel
from deirokay.exceptions import ValidationError
from deirokay.validator import raise_validation
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class DeirokayOperator(BaseOperator):
    """Parse a DataFrame from a file using Deirokay options and
    validate against a Deirokay Validation Document.

    You may choose different severity levels to mark a task as
    "soft failure" (`skipped` state) or "normal failure" (`failed`
    state).

    Parameters
    ----------
    data : Optional[Union[str, DeirokayDataSource]]
        File (or other Deirokay data source) to be parsed into
        Deirokay.
    path_to_file : Optional[Union[str, DeirokayDataSource]]
        (Deprecated) File to be parsed into Deirokay.
        Use `data` instead.
    options : Union[dict, str]
        A dict or a local/S3 path to a YAML/JSON options file.
    against : Union[dict, str]
        A dict or a local/S3 path to a YAML/JSON validation
        document file.
    backend : Optional[Backend]
        Backend to use when processing data.
    template : Optional[dict]
        Map of templates to be passed to Deirokay validation.
    save_to : Optional[str], optional
        Where validation logs will be saved to.
        If None, no log is saved. By default None.
    soft_fail_level : Optional[Union[SeverityLevel, int]]
        Minimum Deirokay severity level to trigger a
        "soft failure".
        Any failure with lower severity level will only log a
        warning. Set to `None` to never trigger.
        By default SeverityLevel.MINIMAL (1).
    hard_fail_level : Optional[Union[SeverityLevel, int]]
        Minimum Deirokay severity level to trigger a task failure.
        Set to `None` to never trigger.
        By default SeverityLevel.CRITICAL (5).
    reader_kwargs : Optional[dict]
        Additional keyword arguments for `Deirokay.data_reader` method.
    validator_kwargs : Optional[dict]
        Additional keyword arguments for `Deirokay.validate` method.
    **kwargs : Optional[dict]
        Additional keyword arguments for `BaseOperator`.

    Raises
    ------
    AssertionError
        If both or neither of `data` and `path_to_file` are given,
        or if `options` or `against` is empty.
    """

    template_fields = [
        'data',
        'options',
        'against',
        'template',
        'save_to',
        'reader_kwargs',
        'validator_kwargs'
    ]
    template_fields_renderers = {'options': 'json', 'against': 'json'}
    ui_color = '#59f75e'

    def __init__(
        self,
        *,
        data: Optional[Union[str, DeirokayDataSource]] = None,
        # `path_to_file` is Deprecated
        path_to_file: Optional[Union[str, DeirokayDataSource]] = None,
        options: Union[str, DeirokayOptionsDocument],
        against: Union[str, DeirokayValidationDocument],
        backend: Optional[Backend] = None,
        template: Optional[dict] = None,
        save_to: Optional[str] = None,
        soft_fail_level: Optional[
            Union[SeverityLevel, int]
        ] = SeverityLevel.MINIMAL,
        hard_fail_level: Optional[
            Union[SeverityLevel, int]
        ] = SeverityLevel.CRITICAL,
        reader_kwargs: Optional[dict] = None,
        validator_kwargs: Optional[dict] = None,
        **kwargs
    ):
        super().__init__(**kwargs)

        # The argument checks below are raised explicitly instead of
        # using `assert`, so they are not stripped when Python runs
        # with `-O`. `AssertionError` is kept as the exception type so
        # existing callers observe the same behaviour.
        # NOTE(review): truthiness is used to detect which argument
        # was given; if a `DeirokayDataSource` can have an ambiguous
        # truth value (e.g. a DataFrame), `bool()` would raise here -
        # confirm against the type definition.
        if bool(data) is bool(path_to_file):
            raise AssertionError(
                'Declare either `data` or `path_to_file`, but not both.'
            )
        if path_to_file:
            warnings.warn(
                'The argument `path_to_file` is deprecated and will be'
                ' removed in next major release. Use `data` instead.',
                DeprecationWarning
            )
        if not options:
            raise AssertionError('You should provide an `options` attribute.')
        if not against:
            raise AssertionError('You should provide an `against` attribute.')

        self.data = data or path_to_file
        self.options = options
        self.against = against
        self.backend = backend
        self.template = template
        self.save_to = save_to
        self.soft_fail_level = soft_fail_level
        self.hard_fail_level = hard_fail_level
        self.reader_kwargs = reader_kwargs or {}
        self.validator_kwargs = validator_kwargs or {}

    # docstr-coverage:inherited
    def execute(self, context: dict):
        """Read the data, validate it and map failures to task states.

        The data is read with `deirokay.data_reader` and validated
        with `deirokay.validate`, using the timestamp parsed from
        `context['ts_nodash']` as `current_date`.

        Parameters
        ----------
        context : dict
            Airflow task context. Only the `ts_nodash` key is read
            here.

        Raises
        ------
        ValidationError
            If the validation error level is at or above
            `hard_fail_level`.
        AirflowSkipException
            If the validation error level is at or above
            `soft_fail_level` (and below `hard_fail_level`).
        """
        current_date = datetime.strptime(context['ts_nodash'], '%Y%m%dT%H%M%S')

        df = deirokay.data_reader(
            self.data,
            options=self.options,
            backend=self.backend,
            **self.reader_kwargs
        )

        # Exceptions are disabled during validation so that the
        # severity thresholds of this operator can be applied below.
        validation_document = deirokay.validate(
            df,
            against=self.against,
            template=self.template,
            save_to=self.save_to,
            current_date=current_date,
            raise_exception=False,
            **self.validator_kwargs
        )

        try:
            raise_validation(validation_document, SeverityLevel.MINIMAL)
        except ValidationError as e:
            if (
                self.hard_fail_level is not None and
                e.level >= self.hard_fail_level
            ):
                # Bare `raise` keeps the original traceback intact.
                raise
            if (
                self.soft_fail_level is not None and
                e.level >= self.soft_fail_level
            ):
                raise AirflowSkipException from e
            # Below both thresholds: the task succeeds, but the
            # failure is surfaced as a warning (as documented) instead
            # of being silently dropped.
            logger.warning(
                'Deirokay validation failed with severity level %s,'
                ' which is below the configured failure levels: %s',
                e.level, e
            )