Source code for deirokay.statements.builtin.column_expression

"""
Statement to evaluate mathematical expressions against a set of
columns from a given scope.
"""
import re
from decimal import Decimal
from functools import reduce
from typing import List, Tuple

import dask  # lazy module
import dask.dataframe  # lazy module
import numpy  # lazy module
import pandas  # lazy module

from deirokay.enums import Backend

from ..multibackend import report
from .base_statement import BaseStatement


class ColumnExpression(BaseStatement):
    """
    Evaluates an expression (or a list of expressions) involving the
    scope columns, using `pandas.DataFrame.eval()` (and
    `numpy.isclose()` for the `=~` operator).
    The statement passes only if, for every expression, the percentage
    of rows evaluating to `True` lies within the `at_least_%` and
    `at_most_%` bounds.

    The columns in the scope must be ideally of the same dtype.
    This statement supports the following dtypes: `string`, `integer`,
    `float` and `decimal`.

    The available parameters for this statement are:

    * `expressions` (required): an expression (or a `list` of
      expressions) to be evaluated. The valid operators are: `==`,
      `!=`, `=~`, `>=`, `<=`, `>` and `<`.
    * `at_least_%`: the minimum percentage of valid rows.
      Default: 100.
    * `at_most_%`: the maximum percentage of valid rows.
      Default: 100.
    * `rtol`: the relative tolerance for float evaluations (when using
      the `=~` operator). Default: 1e-5.
    * `atol`: the absolute tolerance for float evaluations (when using
      the `=~` operator). Default: 1e-8.

    Examples
    --------
    In the example below, in JSON format, we test whether or not the
    values of the `a` column are equal to the values of the `b` column.
    Similarly, we test whether or not the values of the `a` column are
    less than the values of the `c` column:

    .. code-block:: json

        {
            "scope": ["a", "b", "c"],
            "statements": [
                {
                    "type": "column_expression",
                    "expressions": ["a == b", "a < c"],
                    "at_least_%": 50.0
                }
            ]
        }

    For float comparisons, you may prefer using the `rtol` or `atol`
    parameters, in addition to the `=~` operator.
    For example, if you want to test whether or not the values of the
    `a` column are equal to the values of the `b` column with a
    relative tolerance of 1e-3, you can use the following JSON:

    .. code-block:: json

        {
            "scope": ["a", "b"],
            "statements": [
                {
                    "type": "column_expression",
                    "expressions": "a =~ b",
                    "rtol": 1e-3
                }
            ]
        }

    """

    name = 'column_expression'
    expected_parameters = [
        'expressions',
        'at_least_%',
        'at_most_%',
        'rtol',
        'atol'
    ]
    supported_backends: List[Backend] = [Backend.PANDAS, Backend.DASK]

    # Regex alternation of every accepted comparison operator. Two-character
    # operators come first so that `>=` is never matched as `>`.
    VALID_OPERATORS = '|'.join(['==', '!=', '=~', '>=', '<=', '>', '<'])

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

        self.expressions = self.options['expressions']
        self.at_least_perc = self.options.get('at_least_%', 100.0)
        self.at_most_perc = self.options.get('at_most_%', 100.0)
        self.rtol = self.options.get('rtol', 1e-5)
        self.atol = self.options.get('atol', 1e-8)

        # A single expression is accepted as a convenience; normalize to list.
        if not isinstance(self.expressions, list):
            self.expressions = [self.expressions]

    def _convert_df_dtypes(self,
                           df: 'pandas.DataFrame') -> 'pandas.DataFrame':
        """
        Fixes DataFrame dtypes. If Int64Dtype() or Float64Dtype(),
        converts to traditional int64 and float64 dtypes.
        If object dtype, we apply a more accurate check on the column
        type, verifying the dtype of all its cells.
        When a pandas version corrects this bug, we can delete
        this method.

        Raises
        ------
        Exception
            If an object column holds cells of more than one Python
            type.
        """
        pandas_dtypes_int = [pandas.Int64Dtype(), pandas.Int32Dtype()]
        pandas_dtypes_float = [pandas.Float64Dtype(), pandas.Float32Dtype()]
        decimal_types = (Decimal,)

        def _fix_column(column: 'pandas.Series') -> 'pandas.Series':
            if column.dtype in pandas_dtypes_int:
                column = column.astype(int)
            elif column.dtype in pandas_dtypes_float:
                column = column.astype(float)
            elif column.dtype == object:
                if len(column.map(type).drop_duplicates()) > 1:
                    raise Exception('Mixed types')
                # Positional access: `column[0]` is a *label* lookup and
                # fails on empty columns or indexes that do not contain 0.
                if not column.empty and isinstance(column.iloc[0],
                                                   decimal_types):
                    column = column.astype(float)
            return column

        return df.apply(_fix_column)

    def _eval(self, df: 'pandas.DataFrame', expr: str) -> 'pandas.Series':
        """
        Evaluates `expr` against `df`.

        Expressions without the `=~` operator are delegated to
        `DataFrame.eval`. Otherwise the expression is split on the
        comparison operators and every adjacent pair of operands is
        evaluated separately: `=~` pairs through `numpy.isclose` (using
        `self.atol` and `self.rtol`), the remaining pairs through
        `DataFrame.eval`. The partial results are combined with `&`.

        Raises
        ------
        SyntaxError
            If an operand is missing on either side of an operator.
        """
        if '=~' not in expr:
            return df.eval(expr)

        expr_terms = re.split(ColumnExpression.VALID_OPERATORS, expr)
        expr_operators = re.findall(ColumnExpression.VALID_OPERATORS, expr)

        # `re.split` always yields exactly one more piece than `re.findall`
        # yields matches, so comparing the two lengths can never detect a
        # malformed expression. A missing operand shows up as an empty
        # (or blank) term instead.
        if any(not term.strip() for term in expr_terms):
            raise SyntaxError(
                'Invalid expression. Incoherent number of expressions and'
                ' comparison operators'
            )

        def _eval_part(term_1, operator, term_2):
            if operator == '=~':
                # Reuse the DataFrame index: with a default RangeIndex the
                # `&` below would align by label against the `df.eval`
                # results and produce wrong rows for non-default indexes.
                return pandas.Series(
                    numpy.isclose(
                        df.eval(term_1), df.eval(term_2),
                        atol=self.atol, rtol=self.rtol
                    ),
                    index=df.index
                )
            return df.eval(term_1 + operator + term_2)

        return reduce(
            lambda x, y: x & y,
            (
                _eval_part(term_1, operator, term_2)
                for term_1, operator, term_2
                in zip(expr_terms, expr_operators, expr_terms[1:])
            )
        )

    def _generate_report(self,
                         summary: List[Tuple[int, int]],
                         nrows: int) -> dict:
        """
        Builds the statement report from the `(valid, invalid)` row
        counts of each expression, in the same order as
        `self.expressions`.
        """
        # NOTE(review): `nrows == 0` makes the percentage computations below
        # divide by zero; the intended result for empty data is not defined
        # here, so it is left untouched.
        expressions_report = [
            {
                'expression': expr,
                'valid_rows': valid,
                'valid_rows_%': 100.0*valid/nrows,
                'invalid_rows': invalid,
                'invalid_rows_%': 100.0*invalid/nrows
            }
            for expr, (valid, invalid) in zip(self.expressions, summary)
        ]
        return {
            'column_expressions': expressions_report
        }

    @report(Backend.PANDAS)
    def _report_pandas(self, df: 'pandas.DataFrame') -> dict:
        df = self._convert_df_dtypes(df)
        results = (
            self._eval(df, expr)
            for expr in self.expressions
        )
        # Vectorized counts; `int(...)` keeps plain Python integers in the
        # report instead of numpy scalars.
        summary = [
            (int(result.sum()), int((~result).sum()))
            for result in results
        ]
        return self._generate_report(summary, len(df))

    @report(Backend.DASK)
    def _report_dask(self, df: 'dask.dataframe.DataFrame') -> dict:
        # `Series.items()` replaces `Series.iteritems()`, which was removed
        # in pandas 2.0.
        # NOTE(review): `meta` carries the dtypes *before* conversion, while
        # `_convert_df_dtypes` may change them -- confirm this is intended.
        df = df.map_partitions(self._convert_df_dtypes,
                               meta=dict(df.dtypes.items()))
        results = (
            dask.dataframe.from_delayed(
                dask.delayed(self._eval)(partition, expr)
                for partition in df.to_delayed()
            )
            for expr in self.expressions
        )
        # NOTE(review): the builtin `sum` iterates the dask Series in Python;
        # `result.sum().compute()` may be preferable -- confirm against the
        # supported dask versions before changing.
        summary = [(sum(result), sum(~result)) for result in results]
        return self._generate_report(summary, len(df))

    # docstr-coverage:inherited
    def result(self, report: dict) -> bool:
        # Passes only when every expression has its percentage of valid rows
        # inside the inclusive [at_least_%, at_most_%] interval.
        return all(
            self.at_least_perc <= item['valid_rows_%'] <= self.at_most_perc
            for item in report['column_expressions']
        )