Source code for deirokay.statements.builtin.contain

"""
Statement to check the presence (or absence) of values in a scope.
"""
import warnings
from typing import List

import dask.dataframe  # lazy module
import numpy  # lazy module
import pandas  # lazy module

from deirokay._typing import DeirokayStatement
from deirokay._utils import noneor
from deirokay.enums import Backend
from deirokay.parser import get_dtype_treater, get_treater_instance
from deirokay.parser.loader import data_treater

from ..multibackend import profile, report
from .base_statement import BaseStatement

NODEFAULT = object()


class Contain(BaseStatement):
    """
    Checks if the given scope contains specific values.
    You may also check the number of their occurrences by specifying a
    minimum and maximum value of frequency.

    The available parameters for this statement are:

    * `rule` (required): One of `all`, `only` or `all_and_only`.
    * `values` (required): A list of values to which the rule applies.
    * `multicolumn`: A boolean indicating whether the statement should
      consider each of the `values` as a tuple of multiple columns or
      a single value.
      When set to False and evaluated over a scope containing more than
      one column, the rule will be applied over all the values from the
      original columns as in a single column.
      Default: False.
    * `parser`: The parser (or a list) to be used to parse the `values`.
      Corresponds to the `parser` parameter of the `treater` function
      (see `deirokay.data_reader` method).
      Either `parser` or `parsers` must be declared.
    * `parsers`: An alias for `parser`, recommended when `multicolumn`
      is set to True.
      Either `parser` or `parsers` must be declared.
    * `min_occurrences`: a global minimum number of occurrences for
      each of the `values`.
      Default: 1 for `all` and `all_and_only` rules, 0 for `only`.
    * `max_occurrences`: a global maximum number of occurrences for
      each of the `values`. Default: `inf` (unbounded).
    * `occurrences_per_value`: a list of dictionaries overriding the
      global boundaries.
      Each dictionary may have the following keys:

      * `values` (required): a list of values to which the occurrence
        bounds below apply.
      * `min_occurrences`: a minimum number of occurrences for these
        values. Default: global `min_occurrences` parameter.
      * `max_occurrences`: a maximum number of occurrences for these
        values. Default: global `max_occurrences` parameter.

      Global parameters apply to all values not present in any of the
      dictionaries in `occurrences_per_value` (but yet present on the
      main `values` list).
    * `report_limit`: if set to a positive integer, limit the number of
      items generated in the statement report.
      Default: 32.

    The `all` rule checks if all the `values` declared are present in
    the scope (possibly tolerating other values not declared in
    `values`).
    Use it when you want to be sure that your data contains at least
    all the values you declare, also setting `min_occurrences` and
    `max_occurrences` when necessary.
    You may also check for "zero" occurrences of a set of values by
    setting `max_occurrences` to 0.

    The `only` rule ensures that the `values` are the only possible
    values in the scope (possibly not containing them at all).
    Use it when you want to enumerate the admitted values for the
    scope, as in an enumeration.

    The `all_and_only` rule checks both if all the `values` declared
    are present in the scope and if only they are present
    (not tolerating values not declared).
    Use it when you know all the possible values for the scope and you
    are sure that they will be always present.

    The `min_occurrences` and `max_occurrences` parameters are applied
    to all the `values` declared, and only these.
    It means you cannot (yet) specify boundaries for values you didn't
    declare.

    Null values are considered valid for the purpose of the statement
    evaluation and must be explicitly passed in `values` if you wish
    to allow them (or not).

    You may also notice that, by tweaking the expected number of
    occurrences, you may end up having the very same behaviour
    regardless of the `rule` you choose.
    In this case, you should go for the rule that best matches your
    intent semantically, so that your final validation document is
    more readable and easier to understand.

    Examples
    --------

    * You have a table of users containing a column `handedness` only
      admitting the values: `right-handed`, `left-handed` and
      `ambidextrous`.
      You know that some of these values may not appear in the data,
      but you don't want other values to be present.

    .. code-block:: json

        {
            "scope": "handedness",
            "statements": [
                {
                    "type": "contain",
                    "rule": "only",
                    "values": ["right-handed", "left-handed",
                               "ambidextrous"],
                    "parser": {"dtype": "string"}
                }
            ]
        }

    * You have a table of servers containing a column `role` which may
      contain the values `master` and `slave`.
      You want to be sure that there is always one and only one master
      server in the data.

    .. code-block:: json

        {
            "scope": "role",
            "statements": [
                {
                    "type": "contain",
                    "rule": "all",
                    "values": ["master"],
                    "parser": {"dtype": "string"},
                    "min_occurrences": 1,
                    "max_occurrences": 1
                }
            ]
        }

    You may also extend the previous example by making some adjustments
    to ensure that there is no other value than `master` and `slave`
    in the data.
    Note that although the `rule` below is changed to `only`, the
    statement above is still contemplated by the
    `occurrences_per_value` parameter in the following validation item:

    .. code-block:: json

        {
            "scope": "role",
            "statements": [
                {
                    "type": "contain",
                    "rule": "only",
                    "values": ["master", "slave"],
                    "parser": {"dtype": "string"},
                    "occurrences_per_value": [
                        {
                            "values": ["master"],
                            "min_occurrences": 1,
                            "max_occurrences": 1
                        }
                    ]
                }
            ]
        }

    * You have a table of transactions containing details about
      transactions in all the branches of a company.
      You expect that there should always be at least one transaction
      per branch.

    .. code-block:: json

        {
            "scope": ["branch_name"],
            "statements": [
                {
                    "type": "contain",
                    "rule": "all_and_only",
                    "values": [
                        "Albany", "Utica", "Scranton", "Akron",
                        "Nashua", "Buffalo", "Rochester"
                    ],
                    "parser": {"dtype": "string"}
                }
            ]
        }

    * You have a table for the logs of user accesses to a website which
      contains an `IP` column.
      You want to be sure that blacklisted IPs are not present in
      the data.
      The following validation item in YAML format checks for the
      absence of blacklisted IPs:

    .. code-block:: yaml

        scope: IP
        statements:
        - type: contain
          rule: all
          max_occurrences: 0
          values:
          # blacklisted IPs
          - 3.48.48.135
          - 3.48.48.136
          parser: {dtype: string}

    * You want the pair 'San Diego' and '2022' to appear at most twice
      in your dataset (for any reason).

    .. code-block:: yaml

        scope: [city, year]
        statements:
        - type: contain
          rule: all
          min_occurrences: 0
          max_occurrences: 2
          values:
          - ['San Diego', 2022]
          parsers:
          - {dtype: string}
          - {dtype: integer}

    """

    name = 'contain'
    expected_parameters = [
        'rule',
        'values',
        'multicolumn',
        'parser',
        'parsers',
        'min_occurrences',
        'max_occurrences',
        'occurrences_per_value',
        'report_limit',
    ]
    supported_backends: List[Backend] = [Backend.PANDAS, Backend.DASK]

    # Each tuple holds the default bound for (declared, undeclared) values
    DEFAULT_MIN_OCCURRENCES = {
        'all': (1, 0),
        'only': (0, 0),
        'all_and_only': (1, 0)
    }
    DEFAULT_MAX_OCCURRENCES = {
        'all': (numpy.inf, numpy.inf),
        'only': (numpy.inf, 0),
        'all_and_only': (numpy.inf, 0)
    }
    DEFAULT_REPORT_LIMIT = 32

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

        self.rule = self.options['rule']
        assert self.rule in ('all', 'only', 'all_and_only')
        self.multicolumn = self.options.get('multicolumn', False)

        _parsers = self.options.get('parser') or self.options['parsers']
        if self.multicolumn:
            self.parsers = _parsers
        else:
            self.parsers = [_parsers]
        self.treaters = [
            get_treater_instance(parser, backend=self.get_backend())
            for parser in self.parsers
        ]

        self.min_occurrences = self.options.get('min_occurrences', None)
        self.max_occurrences = self.options.get('max_occurrences', None)
        self.occurrences_per_value = self.options.get(
            'occurrences_per_value', []
        )
        self.report_limit = self.options.get('report_limit', NODEFAULT)

        self._set_default_minmax_occurrences()

    def _set_default_minmax_occurrences(self) -> None:
        final_min = noneor(self.min_occurrences,
                           Contain.DEFAULT_MIN_OCCURRENCES[self.rule][0])
        final_max = noneor(self.max_occurrences,
                           Contain.DEFAULT_MAX_OCCURRENCES[self.rule][0])

        # An explicit bound on one side overrides a conflicting bound
        # on the other side
        if (
            self.max_occurrences is not None and
            self.max_occurrences < final_min
        ):
            final_min = self.max_occurrences
        if (
            self.min_occurrences is not None and
            self.min_occurrences > final_max
        ):
            final_max = self.min_occurrences

        assert final_min >= 0
        assert final_max >= 0

        self.min_occurrences = final_min
        self.max_occurrences = final_max

    def _generate_analysis(self, value_counts):
        if self.multicolumn:
            def _unpack_row(row, *args):
                return (*row, *args)
        else:
            def _unpack_row(row, *args):
                return (row, *args)

        # Declared values with the global bounds
        listed_values = [
            _unpack_row(
                row, self.min_occurrences, self.max_occurrences
            )
            for row in self.options['values']
        ]
        # Declared values with overridden bounds
        occurrences_per_value = [
            _unpack_row(
                row,
                noneor(item.get('min_occurrences'), self.min_occurrences),
                noneor(item.get('max_occurrences'), self.max_occurrences),
            )
            for item in self.occurrences_per_value
            for row in item['values']
        ]

        occurrence_limits = pandas.DataFrame(
            occurrences_per_value + listed_values,
            columns=value_counts.index.names + ['min', 'max']
        )
        options = {
            col: parser
            for col, parser in zip(value_counts.index.names, self.parsers)
        }
        data_treater(occurrence_limits, options, backend=Backend.PANDAS)

        # Overridden bounds come first, so they win over global ones
        occurrence_limits.drop_duplicates(
            subset=value_counts.index.names, keep='first', inplace=True
        )
        occurrence_limits.set_index(value_counts.index.names, inplace=True)

        analysis = value_counts.to_frame().reset_index().merge(
            occurrence_limits.reset_index(), how='outer'
        )
        # Assign back instead of using chained in-place `fillna`, which
        # newer pandas versions deprecate
        analysis['count'] = analysis['count'].fillna(0)
        analysis['min'] = analysis['min'].fillna(
            Contain.DEFAULT_MIN_OCCURRENCES[self.rule][1]
        )
        analysis['max'] = analysis['max'].fillna(
            Contain.DEFAULT_MAX_OCCURRENCES[self.rule][1]
        )

        analysis['result'] = (
            analysis['count'].ge(analysis['min']) &
            analysis['count'].le(analysis['max'])
        )
        return analysis

    def _generate_report(self, analysis):
        # The last four columns are `count`, `min`, `max` and `result`
        columns = [analysis[col] for col in analysis.columns[:-4]]
        serialized = (
            treater.serialize(column)
            for treater, column in zip(self.treaters, columns)
        )
        rows = zip(*(s['values'] for s in serialized))

        # Failing values (`result` is False) are sorted first
        values_report = sorted([
            {
                'value': value_row,
                'count': analysis_row.count,
                'result': analysis_row.result,
            }
            for value_row, analysis_row in zip(rows, analysis.itertuples())
        ], key=lambda x: x['result'])

        if (
            self.report_limit is NODEFAULT and
            len(values_report) > Contain.DEFAULT_REPORT_LIMIT
        ):
            self.report_limit = Contain.DEFAULT_REPORT_LIMIT
            warnings.warn(
                "The 'contain' statement's report size was automatically"
                f' truncated to {Contain.DEFAULT_REPORT_LIMIT} items to'
                ' prevent unexpectedly long logs.\n'
                'If you wish to set a different'
                ' size limit or even not set a limit at all (None),'
                ' please declare the `report_limit` parameter explicitly.',
                Warning
            )

        return {
            'values': (
                values_report if self.report_limit is NODEFAULT else
                values_report if self.report_limit is None else
                values_report[:self.report_limit]
            )
        }

    @report(Backend.PANDAS)
    def _report_pandas(self, df: 'pandas.DataFrame') -> dict:
        # Concat all columns
        _cols = df.columns.tolist()
        if not self.multicolumn:
            # Columns are assumed to be of same Dtype
            df = pandas.concat([df[col] for col in _cols]).to_frame()

        value_counts = (
            df.groupby(_cols, dropna=False)[_cols[0]].size()
            .rename('count')
        )
        analysis = self._generate_analysis(value_counts)
        return self._generate_report(analysis)

    @report(Backend.DASK)
    def _report_dask(self, df: 'dask.dataframe.DataFrame') -> dict:
        # Concat all columns
        _cols = df.columns.tolist()
        if not self.multicolumn:
            # Columns are assumed to be of same Dtype
            df = dask.dataframe.concat([df[col] for col in _cols]).to_frame()

        value_counts = (
            df.groupby(_cols, dropna=False)[_cols[0]].size()
            .rename('count')
        )
        analysis = self._generate_analysis(value_counts.compute())
        return self._generate_report(analysis)

    # docstr-coverage:inherited
    def result(self, report: dict) -> bool:
        return all(
            item['result'] for item in report['values']
        )

    @profile(Backend.PANDAS)
    @staticmethod
    def _profile_pandas(df: 'pandas.DataFrame') -> DeirokayStatement:
        # Comparing each dtype against the whole `df.dtypes` Series is
        # ambiguous for more than one column; compare distinct dtypes
        if len(set(df.dtypes)) > 1:
            raise NotImplementedError(
                "Refusing to mix up different types of columns"
            )

        series = pandas.concat([df[col] for col in df.columns])

        unique_series = series.drop_duplicates().dropna()
        if len(unique_series) > 20:
            raise NotImplementedError("Won't generate too long statements!")

        value_frequency = series.value_counts()
        min_occurrences = int(value_frequency.min())

        statement_template = {
            'type': 'contain',
            'rule': 'all'
        }  # type: DeirokayStatement
        # Get most common type to infer treater
        try:
            statement_template.update(
                get_dtype_treater(unique_series.map(type).mode()[0])
                .attach_backend(Backend.PANDAS)
                .serialize(unique_series)  # type: ignore
            )
        except TypeError:
            raise NotImplementedError("Can't handle mixed types")
        # Sort allowing `None` values, which will appear last
        statement_template['values'].sort(key=lambda x: (x is None, x))

        if min_occurrences != 1:
            statement_template['min_occurrences'] = min_occurrences

        return statement_template

    @profile(Backend.DASK)
    @staticmethod
    def _profile_dask(df: 'dask.dataframe.DataFrame') -> DeirokayStatement:
        # Same dtype check as in the pandas profile above
        if len(set(df.dtypes)) > 1:
            raise NotImplementedError(
                "Refusing to mix up different types of columns"
            )

        series = dask.dataframe.concat([df[col] for col in df.columns])

        unique_series = series.drop_duplicates().dropna()
        if len(unique_series) > 20:
            raise NotImplementedError("Won't generate too long statements!")

        value_frequency = series.value_counts()
        min_occurrences = int(value_frequency.min().compute())

        statement_template = {
            'type': 'contain',
            'rule': 'all'
        }  # type: DeirokayStatement
        # Get most common type to infer treater
        try:
            statement_template.update(
                get_dtype_treater(unique_series.map(type).mode().compute()[0])
                .attach_backend(Backend.DASK)
                .serialize(unique_series)  # type: ignore
            )
        except TypeError:
            raise NotImplementedError("Can't handle mixed types")
        # Sort allowing `None` values, which will appear last
        statement_template['values'].sort(key=lambda x: (x is None, x))

        if min_occurrences != 1:
            statement_template['min_occurrences'] = min_occurrences

        return statement_template
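
The way the three rules interact with the occurrence bounds can be easier to follow in a dependency-free form. The block below is only a rough sketch of that logic for a single column, not the library's implementation: `check_contain` is a hypothetical helper, it uses `collections.Counter` in place of the pandas/dask group-by and merge, and it skips parsers and `occurrences_per_value` entirely. The default tables copy the (declared, undeclared) pairs from `DEFAULT_MIN_OCCURRENCES` and `DEFAULT_MAX_OCCURRENCES`.

```python
import math
from collections import Counter

# (declared, undeclared) default bounds, copied from the Contain class
DEFAULT_MIN = {'all': (1, 0), 'only': (0, 0), 'all_and_only': (1, 0)}
DEFAULT_MAX = {
    'all': (math.inf, math.inf),
    'only': (math.inf, 0),
    'all_and_only': (math.inf, 0),
}


def check_contain(data, rule, values,
                  min_occurrences=None, max_occurrences=None):
    """Hypothetical helper: return {value: (count, passed)}."""
    lo = DEFAULT_MIN[rule][0] if min_occurrences is None else min_occurrences
    hi = DEFAULT_MAX[rule][0] if max_occurrences is None else max_occurrences
    # An explicit bound overrides a conflicting bound on the other side,
    # e.g. `max_occurrences: 0` under `all` also lowers the minimum to 0
    if max_occurrences is not None and max_occurrences < lo:
        lo = max_occurrences
    if min_occurrences is not None and min_occurrences > hi:
        hi = min_occurrences

    counts = Counter(data)
    outcome = {}
    # Union of declared and observed values, like the outer merge
    for value in set(values) | set(counts):
        if value in values:
            low, high = lo, hi
        else:
            low, high = DEFAULT_MIN[rule][1], DEFAULT_MAX[rule][1]
        count = counts.get(value, 0)
        outcome[value] = (count, low <= count <= high)
    return outcome


roles = ['master', 'slave', 'slave', 'backup']
only = check_contain(roles, 'only', ['master', 'slave'])
all_ = check_contain(roles, 'all', ['master', 'slave'])
blacklist = check_contain(roles, 'all', ['backup'], max_occurrences=0)

print(only['backup'])       # undeclared value under `only`: (1, False)
print(blacklist['backup'])  # blacklisted value is present: (1, False)
```

Under `only`, the undeclared `backup` value fails because undeclared values get a maximum of 0, while under `all` the same data passes since undeclared values are unbounded. The statement as a whole passes only when every entry passes, which is what `result` checks over the report.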