import copy
import weakref
from abc import ABC, abstractmethod
from functools import total_ordering
from itertools import chain

import numpy as np

import pysubgroup as ps

[docs]@total_ordering class SelectorBase(ABC): # selector cache __refs__ = weakref.WeakSet() def __new__(cls, *args, **kwargs): """Ensures that each selector only exists once.""" # create temporary selector tmp = super().__new__(cls) tmp.set_descriptions(*args, **kwargs) # save original arguments # NOTE: this is a fix for pickle # so we can call `__getnewargs_ex__` with the right arguments # TODO: this may have unintended side effects if args, # kwargs are large or volatile (I don't think we have that yet though) tmp.__new_args__ = args, kwargs # check if selector is already in cache (__refs__) # if so, return cached instance if tmp in SelectorBase.__refs__: for ref in SelectorBase.__refs__: if ref == tmp: return ref # if not return return tmp def __getnewargs_ex__(self): # pylint: disable=invalid-getnewargs-ex-returned tmp_args = self.__new_args__ del self.__new_args__ return tmp_args def __init__(self): # add selector to cache # TODO: why not do this in `__new__`, # then it would be all together in one function? SelectorBase.__refs__.add(self) def __eq__(self, other): if other is None: return False return repr(self) == repr(other) def __lt__(self, other): return repr(self) < repr(other) def __hash__(self): return self._hash # pylint: disable=no-member
[docs] @abstractmethod def set_descriptions(self, *args, **kwargs): pass # pragma: no-cover
[docs]def get_cover_array_and_size(subgroup, data_len=None, data=None): if hasattr(subgroup, "representation"): cover_arr = subgroup size = subgroup.size_sg elif isinstance(subgroup, slice): cover_arr = subgroup if data_len is None: if type(data).__name__ == "DataFrame": data_len = len(data) else: raise ValueError( "if you pass a slice, you need to pass either data_len or data" ) # size = len(range(*subgroup.indices(data_len))) elif hasattr(subgroup, "__array_interface__"): cover_arr = subgroup type_char = subgroup.__array_interface__["typestr"][1] if type_char == "b": # boolean indexing is used size = np.count_nonzero(cover_arr) elif type_char in ("u", "i"): # integer indexing size = subgroup.__array_interface__["shape"][0] else: raise NotImplementedError( f"Currently a typechar of {type_char} is not supported." ) else: assert type(data).__name__ == "DataFrame" cover_arr = subgroup.covers(data) size = np.count_nonzero(cover_arr) return cover_arr, size
[docs]def get_size(subgroup, data_len=None, data=None): if hasattr(subgroup, "representation"): size = subgroup.size_sg elif isinstance(subgroup, slice): if data_len is None: if type(data).__name__ == "DataFrame": data_len = len(data) else: raise ValueError( "if you pass a slice, you need to pass either data_len or data" ) # size = len(range(*subgroup.indices(data_len))) elif hasattr(subgroup, "__array_interface__"): type_char = subgroup.__array_interface__["typestr"][1] if type_char == "b": # boolean indexing is used size = np.count_nonzero(subgroup) elif type_char == "u" or type_char == "i": # integer indexing size = subgroup.__array_interface__["shape"][0] else: raise NotImplementedError( f"Currently a typechar of {type_char} is not supported." ) else: assert type(data).__name__ == "DataFrame" size = np.count_nonzero(subgroup.covers(data)) return size
[docs]class EqualitySelector(SelectorBase): def __init__(self, attribute_name, attribute_value, selector_name=None): if attribute_name is None: raise TypeError() if attribute_value is None: raise TypeError() # TODO: this is redundant due to `__new__` and `set_descriptions` self._attribute_name = attribute_name self._attribute_value = attribute_value self._selector_name = selector_name self.set_descriptions( self._attribute_name, self._attribute_value, self._selector_name ) super().__init__() @property def attribute_name(self): return self._attribute_name @property def attribute_value(self): return self._attribute_value
[docs] def set_descriptions( self, attribute_name, attribute_value, selector_name=None ): # pylint: disable=arguments-differ self._hash, self._query, self._string = EqualitySelector.compute_descriptions( attribute_name, attribute_value, selector_name=selector_name )
[docs] @classmethod def compute_descriptions(cls, attribute_name, attribute_value, selector_name): if isinstance(attribute_value, (str, bytes)): query = str(attribute_name) + "==" + "'" + str(attribute_value) + "'" elif attribute_value is None: query = str(attribute_name) + " is None" elif np.isnan(attribute_value): query = attribute_name + ".isnull()" else: query = str(attribute_name) + "==" + str(attribute_value) if selector_name is not None: string_ = selector_name else: string_ = query hash_value = hash(query) return (hash_value, query, string_)
def __repr__(self): return self._query
[docs] def covers(self, data): import pandas as pd # pylint: disable=import-outside-toplevel row = data[self.attribute_name].to_numpy() if pd.isnull(self.attribute_value): return pd.isnull(row) return row == self.attribute_value
def __str__(self, open_brackets="", closing_brackets=""): return open_brackets + self._string + closing_brackets @property def selectors(self): return (self,)
[docs] @staticmethod def from_str(s): s = s.strip() attribute_name, attribute_value = s.split("==") if attribute_value[0] == "'" and attribute_value[-1] == "'": if attribute_value.startswith("'b'") and attribute_value.endswith("''"): attribute_value = str.encode(attribute_value[3:-2]) else: attribute_value = attribute_value[1:-1] try: attribute_value = int(attribute_value) except ValueError: try: attribute_value = float(attribute_value) except ValueError: pass return ps.EqualitySelector(attribute_name, attribute_value)
[docs]class NegatedSelector(SelectorBase): def __init__(self, selector): # TODO: this is redundant due to `__new__` and `set_descriptions` self._selector = selector self.set_descriptions(selector) super().__init__()
[docs] def covers(self, data_instance): return np.logical_not(self._selector.covers(data_instance))
def __repr__(self): return self._query def __str__(self, open_brackets="", closing_brackets=""): return "NOT " + self._selector.__str__(open_brackets, closing_brackets)
[docs] def set_descriptions(self, selector): # pylint: disable=arguments-differ self._query = "(not " + repr(selector) + ")" self._hash = hash(repr(self))
@property def attribute_name(self): return self._selector.attribute_name @property def selectors(self): return (self,)
# Including the lower bound, excluding the upper_bound
[docs]class IntervalSelector(SelectorBase): def __init__(self, attribute_name, lower_bound, upper_bound, selector_name=None): assert lower_bound < upper_bound # TODO: this is redundant due to `__new__` and `set_descriptions` self._attribute_name = attribute_name self._lower_bound = lower_bound self._upper_bound = upper_bound self.selector_name = selector_name self.set_descriptions(attribute_name, lower_bound, upper_bound, selector_name) super().__init__() @property def attribute_name(self): return self._attribute_name @property def lower_bound(self): return self._lower_bound @property def upper_bound(self): return self._upper_bound
[docs] def covers(self, data_instance): val = data_instance[self.attribute_name].to_numpy() return np.logical_and((val >= self.lower_bound), (val < self.upper_bound))
def __repr__(self): return self._query def __hash__(self): return self._hash def __str__(self): return self._string
[docs] @classmethod def compute_descriptions( cls, attribute_name, lower_bound, upper_bound, selector_name=None ): if selector_name is None: _string = cls.compute_string( attribute_name, lower_bound, upper_bound, rounding_digits=2 ) else: _string = selector_name _query = cls.compute_string( attribute_name, lower_bound, upper_bound, rounding_digits=None ) _hash = hash(_query) return (_hash, _query, _string)
[docs] def set_descriptions( self, attribute_name, lower_bound, upper_bound, selector_name=None ): # pylint: disable=arguments-differ self._hash, self._query, self._string = IntervalSelector.compute_descriptions( attribute_name, lower_bound, upper_bound, selector_name=selector_name )
[docs] @classmethod def compute_string(cls, attribute_name, lower_bound, upper_bound, rounding_digits): if rounding_digits is None: formatter = "{}" else: formatter = "{0:." + str(rounding_digits) + "f}" ub = upper_bound lb = lower_bound if ub % 1: ub = formatter.format(ub) if lb % 1: lb = formatter.format(lb) if lower_bound == float("-inf") and upper_bound == float("inf"): repre = attribute_name + " = anything" elif lower_bound == float("-inf"): repre = attribute_name + "<" + str(ub) elif upper_bound == float("inf"): repre = attribute_name + ">=" + str(lb) else: repre = attribute_name + ": [" + str(lb) + ":" + str(ub) + "[" return repre
[docs] @staticmethod def from_str(s): s = s.strip() if s.endswith(" = anything"): return IntervalSelector( s[: -len(" = anything")], float("-inf"), float("+inf") ) if "<" in s: attribute_name, ub = s.split("<") try: return IntervalSelector(attribute_name.strip(), float("-inf"), int(ub)) except ValueError: return IntervalSelector( attribute_name.strip(), float("-inf"), float(ub) ) if ">=" in s: attribute_name, lb = s.split(">=") try: return IntervalSelector(attribute_name.strip(), int(lb), float("inf")) except ValueError: return IntervalSelector(attribute_name.strip(), float(lb), float("inf")) if s.count(":") == 2: attribute_name, lb, ub = s.split(":") lb = lb.strip()[1:] ub = ub.strip()[:-1] try: return IntervalSelector(attribute_name.strip(), int(lb), int(ub)) except ValueError: return IntervalSelector(attribute_name.strip(), float(lb), float(ub)) else: raise ValueError(f"string {s} could not be converted to IntervalSelector")
@property def selectors(self): return (self,)
[docs]def create_selectors(data, nbins=5, intervals_only=True, ignore=None): if ignore is None: ignore = [] sels = create_nominal_selectors(data, ignore) sels.extend(create_numeric_selectors(data, nbins, intervals_only, ignore=ignore)) return sels
[docs]def create_nominal_selectors(data, ignore=None): if ignore is None: ignore = [] nominal_selectors = [] # for attr_name in [ # x for x in data.select_dtypes(exclude=['number']).columns.values # if x not in ignore]: # nominal_selectors.extend( # create_nominal_selectors_for_attribute(data, attr_name)) nominal_dtypes = data.select_dtypes(exclude=["number"]) dtypes = data.dtypes # print(dtypes) for attr_name in [x for x in nominal_dtypes.columns.values if x not in ignore]: nominal_selectors.extend( create_nominal_selectors_for_attribute(data, attr_name, dtypes) ) return nominal_selectors
[docs]def create_nominal_selectors_for_attribute(data, attribute_name, dtypes=None): import pandas as pd # pylint: disable=import-outside-toplevel nominal_selectors = [] for val in pd.unique(data[attribute_name]): nominal_selectors.append(EqualitySelector(attribute_name, val)) # setting the is_bool flag for selector if dtypes is None: dtypes = data.dtypes if dtypes[attribute_name] == "bool": for s in nominal_selectors: s.is_bool = True return nominal_selectors
[docs]def create_numeric_selectors( data, nbins=5, intervals_only=True, weighting_attribute=None, ignore=None ): if ignore is None: ignore = [] # pragma: no cover numeric_selectors = [] for attr_name in [ x for x in data.select_dtypes(include=["number"]).columns.values if x not in ignore ]: numeric_selectors.extend( create_numeric_selectors_for_attribute( data, attr_name, nbins, intervals_only, weighting_attribute ) ) return numeric_selectors
[docs]def create_numeric_selectors_for_attribute( data, attr_name, nbins=5, intervals_only=True, weighting_attribute=None ): numeric_selectors = [] data_not_null = data[data[attr_name].notnull()] uniqueValues = np.unique(data_not_null[attr_name]) if len(data_not_null.index) < len(data.index): numeric_selectors.append(EqualitySelector(attr_name, np.nan)) if len(uniqueValues) <= nbins: for val in uniqueValues: numeric_selectors.append(EqualitySelector(attr_name, val)) else: cutpoints = ps.equal_frequency_discretization( data, attr_name, nbins, weighting_attribute ) if intervals_only: old_cutpoint = float("-inf") for c in cutpoints: numeric_selectors.append(IntervalSelector(attr_name, old_cutpoint, c)) old_cutpoint = c numeric_selectors.append( IntervalSelector(attr_name, old_cutpoint, float("inf")) ) else: for c in cutpoints: numeric_selectors.append(IntervalSelector(attr_name, c, float("inf"))) numeric_selectors.append(IntervalSelector(attr_name, float("-inf"), c)) return numeric_selectors
[docs]def remove_target_attributes(selectors, target): return [ sel for sel in selectors if sel.attribute_name not in target.get_attributes() ]
############## # Boolean expressions ##############
[docs]class BooleanExpressionBase(ABC): def __or__(self, other): tmp = copy.copy(self) tmp.append_or(other) return tmp def __and__(self, other): tmp = copy.copy(self) tmp.append_and(other) return tmp
[docs] @abstractmethod def append_and(self, to_append): pass
[docs] @abstractmethod def append_or(self, to_append): pass
@abstractmethod def __copy__(self): pass
[docs]@total_ordering class Conjunction(BooleanExpressionBase): def __init__(self, selectors): self._repr = None self._hash = None try: it = iter(selectors) self._selectors = list(it) except TypeError: self._selectors = [selectors]
[docs] def covers(self, instance): # empty description ==> return a list of all '1's if not self._selectors: return np.full(len(instance), True, dtype=bool) # non-empty description return np.all([sel.covers(instance) for sel in self._selectors], axis=0)
def __len__(self): return len(self._selectors) def __str__(self, open_brackets="", closing_brackets="", and_term=" AND "): if not self._selectors: return "Dataset" attrs = sorted(str(sel) for sel in self._selectors) return "".join((open_brackets, and_term.join(attrs), closing_brackets)) def __repr__(self): if self._repr is not None: return self._repr else: self._repr = self._compute_repr() return self._repr def __eq__(self, other): return repr(self) == repr(other) def __lt__(self, other): return repr(self) < repr(other) def __hash__(self): if self._hash is not None: return self._hash else: self._hash = self._compute_hash() return self._hash def _compute_repr(self): if not self._selectors: return "True" reprs = sorted(repr(sel) for sel in self._selectors) return "".join(("(", " and ".join(reprs), ")")) def _compute_hash(self): return hash(repr(self)) def _invalidate_representations(self): self._repr = None self._hash = None
[docs] def append_and(self, to_append): if isinstance(to_append, SelectorBase): self._selectors.append(to_append) elif isinstance(to_append, Conjunction): self._selectors.extend(to_append.selectors) else: self._selectors.extend(to_append) self._invalidate_representations()
[docs] def append_or(self, to_append): raise RuntimeError( "Or operations are not supported by a pure Conjunction. Consider using DNF." )
[docs] def pop_and(self): return self._selectors.pop()
[docs] def pop_or(self): raise RuntimeError( "Or operations are not supported by a pure Conjunction. Consider using DNF." )
def __copy__(self): cls = self.__class__ result = cls.__new__(cls) result.__dict__.update(self.__dict__) result._selectors = list(self._selectors) return result @property def depth(self): return len(self._selectors) @property def selectors(self): return tuple(chain.from_iterable(sel.selectors for sel in self._selectors))
[docs] @staticmethod def from_str(s): if s.strip() == "Dataset": return Conjunction([]) selector_strings = s.split(" AND ") selectors = [] for selector_string in selector_strings: selector_string = selector_string.strip() if "==" in selector_string: selectors.append(EqualitySelector.from_str(selector_string)) else: selectors.append(IntervalSelector.from_str(selector_string)) return Conjunction(selectors)
[docs]@total_ordering class Disjunction(BooleanExpressionBase): def __init__(self, selectors=None): if isinstance(selectors, (list, tuple)): self._selectors = selectors elif selectors is None: self._selectors = [] else: self._selectors = [selectors]
[docs] def covers(self, instance): # empty description ==> return a list of all '1's if not self._selectors: return np.full(len(instance), False, dtype=bool) # non-empty description return np.any([sel.covers(instance) for sel in self._selectors], axis=0)
def __len__(self): return len(self._selectors) def __str__(self, open_brackets="", closing_brackets="", or_term=" OR "): if not self._selectors: return "Empty" # pragma: no cover attrs = sorted(str(sel) for sel in self._selectors) return "".join((open_brackets, or_term.join(attrs), closing_brackets)) def __repr__(self): if not self._selectors: return "True" reprs = sorted(repr(sel) for sel in self._selectors) return "".join(("(", " or ".join(reprs), ")")) def __eq__(self, other): return repr(self) == repr(other) def __lt__(self, other): return repr(self) < repr(other) def __hash__(self): return hash(repr(self))
[docs] def append_and(self, to_append): raise RuntimeError( "And operations are not supported by a pure Conjunction. " "Consider using DNF." )
[docs] def append_or(self, to_append): if isinstance(to_append, Disjunction): self._selectors.extend(to_append.selectors) return try: self._selectors.extend(to_append) except TypeError: self._selectors.append(to_append)
def __copy__(self): cls = self.__class__ result = cls.__new__(cls) result.__dict__.update(self.__dict__) result._selectors = copy.copy(self._selectors) return result @property def selectors(self): return tuple(chain.from_iterable(sel.selectors for sel in self._selectors))
[docs]class DNF(Disjunction): def __init__(self, selectors=None): if selectors is None: selectors = [] super().__init__([]) self.append_or(selectors) @staticmethod def _ensure_pure_conjunction(to_append): if isinstance(to_append, Conjunction): return to_append elif isinstance(to_append, SelectorBase): return Conjunction(to_append) else: it = iter(to_append) if all(isinstance(sel, SelectorBase) for sel in to_append): return Conjunction(it) else: raise ValueError( "DNFs only accept an iterable of Selectors" ) # pragma: no cover
[docs] def append_or(self, to_append): if isinstance(to_append, ps.Disjunction): to_append = to_append.selectors try: it = iter(to_append) conjunctions = [DNF._ensure_pure_conjunction(part) for part in it] except TypeError: conjunctions = DNF._ensure_pure_conjunction(to_append) super().append_or(conjunctions)
[docs] def append_and(self, to_append): if isinstance(to_append, Disjunction): raise NotImplementedError( "Appeding a disjunction to DNF is not implemented" ) conj = DNF._ensure_pure_conjunction(to_append) if len(self._selectors) > 0: for conjunction in self._selectors: conjunction.append_and(conj) else: self._selectors.append(conj)
[docs] def pop_and(self): out_list = [s.pop_and() for s in self._selectors] return_val = out_list[0] if all(x == return_val for x in out_list): return return_val else: for to_append, conj in zip(out_list, self._selectors): conj.append_and(to_append) raise RuntimeError("pop_and failed as the result was inconsistent")