Source code for pysubgroup.binary_target

"""
Created on 29.09.2017

@author: lemmerfn
"""
from collections import namedtuple
from functools import total_ordering

import numpy as np

from pysubgroup.measures import (
    AbstractInterestingnessMeasure,
    BoundedInterestingnessMeasure,
    GeneralizationAwareQF_stats,
)

from .subgroup_description import EqualitySelector, get_cover_array_and_size
from .utils import BaseTarget, derive_effective_sample_size


@total_ordering
class BinaryTarget(BaseTarget):
    statistic_types = (
        "size_sg",
        "size_dataset",
        "positives_sg",
        "positives_dataset",
        "size_complement",
        "relative_size_sg",
        "relative_size_complement",
        "coverage_sg",
        "coverage_complement",
        "target_share_sg",
        "target_share_complement",
        "target_share_dataset",
        "lift",
    )

    def __init__(self, target_attribute=None, target_value=None, target_selector=None):
        """
        Creates a new target for the boolean model class (classic subgroup discovery).
        If target_attribute and target_value are given, the target_selector is
        constructed from that attribute/value pair.
        """
        if target_attribute is not None and target_value is not None:
            if target_selector is not None:
                raise ValueError(
                    "BinaryTarget is to be constructed "
                    "EITHER by a selector OR by attribute/value pair"
                )
            target_selector = EqualitySelector(target_attribute, target_value)
        if target_selector is None:
            raise ValueError("No target selector given")
        self.target_selector = target_selector

    def __repr__(self):
        return "T: " + str(self.target_selector)

    def __eq__(self, other):
        return self.__dict__ == other.__dict__

    def __lt__(self, other):
        return str(self) < str(other)
    def covers(self, instance):
        return self.target_selector.covers(instance)

    def get_attributes(self):
        return (self.target_selector.attribute_name,)

    def get_base_statistics(self, subgroup, data):
        cover_arr, size_sg = get_cover_array_and_size(subgroup, len(data), data)
        positives = self.covers(data)
        instances_subgroup = size_sg
        positives_dataset = np.sum(positives)
        instances_dataset = len(data)
        positives_subgroup = np.sum(positives[cover_arr])
        return (
            instances_dataset,
            positives_dataset,
            instances_subgroup,
            positives_subgroup,
        )

    def calculate_statistics(self, subgroup, data, cached_statistics=None):
        if self.all_statistics_present(cached_statistics):
            return cached_statistics

        (
            instances_dataset,
            positives_dataset,
            instances_subgroup,
            positives_subgroup,
        ) = self.get_base_statistics(subgroup, data)
        statistics = {}
        statistics["size_sg"] = instances_subgroup
        statistics["size_dataset"] = instances_dataset
        statistics["positives_sg"] = positives_subgroup
        statistics["positives_dataset"] = positives_dataset
        statistics["size_complement"] = instances_dataset - instances_subgroup
        statistics["relative_size_sg"] = instances_subgroup / instances_dataset
        statistics["relative_size_complement"] = (
            instances_dataset - instances_subgroup
        ) / instances_dataset
        statistics["coverage_sg"] = positives_subgroup / positives_dataset
        statistics["coverage_complement"] = (
            positives_dataset - positives_subgroup
        ) / positives_dataset
        statistics["target_share_sg"] = positives_subgroup / instances_subgroup
        if instances_dataset == instances_subgroup:
            statistics["target_share_complement"] = float("nan")
        else:
            statistics["target_share_complement"] = (
                positives_dataset - positives_subgroup
            ) / (instances_dataset - instances_subgroup)
        statistics["target_share_dataset"] = positives_dataset / instances_dataset
        statistics["lift"] = (
            statistics["target_share_sg"] / statistics["target_share_dataset"]
        )
        return statistics
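
# Editor's sketch (not part of the library): typical construction and use of a
# BinaryTarget on a toy DataFrame. The column names below are made up for
# illustration; a bare EqualitySelector serves as the subgroup description.
def _example_binary_target():
    import pandas as pd  # assumed available; pysubgroup builds on pandas

    data = pd.DataFrame(
        {
            "sex": ["m", "f", "f", "m", "f", "m"],
            "bought": [True, False, True, True, False, True],
        }
    )
    target = BinaryTarget("bought", True)  # same as passing an EqualitySelector
    subgroup = EqualitySelector("sex", "f")
    stats = target.calculate_statistics(subgroup, data)
    print(stats["target_share_sg"], stats["lift"])  # 1/3 and (1/3)/(2/3) = 0.5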
class SimplePositivesQF(
    AbstractInterestingnessMeasure
):  # pylint: disable=abstract-method
    tpl = namedtuple("PositivesQF_parameters", ("size_sg", "positives_count"))

    def __init__(self):
        self.dataset_statistics = None
        self.positives = None
        self.has_constant_statistics = False
        self.required_stat_attrs = ("size_sg", "positives_count")

    def calculate_constant_statistics(self, data, target):
        assert isinstance(target, BinaryTarget)
        self.positives = target.covers(data)
        self.dataset_statistics = SimplePositivesQF.tpl(
            len(data), np.sum(self.positives)
        )
        self.has_constant_statistics = True

    def calculate_statistics(
        self, subgroup, target, data, statistics=None
    ):  # pylint: disable=unused-argument
        cover_arr, size_sg = get_cover_array_and_size(
            subgroup, len(self.positives), data
        )
        return SimplePositivesQF.tpl(
            size_sg, np.count_nonzero(self.positives[cover_arr])
        )

    # <<< GpGrowth >>>
    def gp_get_stats(self, row_index):
        return np.array([1, self.positives[row_index]], dtype=int)

    def gp_get_null_vector(self):
        return np.zeros(2)

    def gp_merge(self, left, right):
        left += right

    def gp_get_params(self, _cover_arr, v):
        return SimplePositivesQF.tpl(v[0], v[1])

    def gp_to_str(self, stats):
        return " ".join(map(str, stats))

    def gp_size_sg(self, stats):
        return stats[0]

    @property
    def gp_requires_cover_arr(self):
        return False
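
# Editor's sketch (not part of the library): how the gp_* hooks above are
# meant to be driven, e.g. by the GpGrowth algorithm. Each row contributes a
# [count, positives] vector; gp_merge sums vectors in place, and gp_get_params
# converts the accumulated vector back into the statistics tuple. The data,
# target, and covered rows below are made up.
def _example_gp_hooks():
    import pandas as pd

    data = pd.DataFrame({"x": [1, 2, 1], "y": [True, True, False]})
    qf = StandardQF(0.5)  # any SimplePositivesQF subclass works the same way
    qf.calculate_constant_statistics(data, BinaryTarget("y", True))
    acc = qf.gp_get_null_vector()  # array([0., 0.])
    for row_index in (0, 1):  # hypothetical rows covered by some pattern
        qf.gp_merge(acc, qf.gp_get_stats(row_index))
    print(qf.gp_get_params(None, acc))  # size_sg=2.0, positives_count=2.0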
# TODO Make ChiSquared useful for real nominal data, not just binary
# Introduce an Enum for direction
# Maybe it is possible to give an optimistic estimate for ChiSquared
class ChiSquaredQF(SimplePositivesQF):  # pragma: no cover
    """
    ChiSquaredQF which tests for statistical independence of a subgroup
    against its complement

    ...
    """
    @staticmethod
    def chi_squared_qf(
        instances_dataset,
        positives_dataset,
        instances_subgroup,
        positives_subgroup,
        min_instances=5,
        bidirect=True,
        direction_positive=True,
        index=0,
    ):
        """
        Performs a chi2 test of statistical independence

        Tests whether a subgroup is statistically independent of its complement
        (see scipy.stats.chi2_contingency).

        Parameters
        ----------
        instances_dataset,
        positives_dataset,
        instances_subgroup,
        positives_subgroup : int
            counts of subgroup and dataset
        min_instances : int, optional
            number of required instances; if less, -inf is returned for that subgroup
        bidirect : bool, optional
            if True, both directions are considered interesting,
            otherwise direction_positive decides which direction is interesting
        direction_positive : bool, optional
            only used if bidirect=False; specifies whether you are interested
            in positive (True) or negative deviations
        index : {0, 1}, optional
            decides whether the test statistic (0) or the p-value (1) should be used
        """
        import scipy.stats  # pylint:disable=import-outside-toplevel

        if (instances_subgroup < min_instances) or (
            (instances_dataset - instances_subgroup) < min_instances
        ):
            return float("-inf")

        negatives_subgroup = instances_subgroup - positives_subgroup
        negatives_dataset = instances_dataset - positives_dataset
        negatives_complement = negatives_dataset - negatives_subgroup
        positives_complement = positives_dataset - positives_subgroup
        val = scipy.stats.chi2_contingency(
            [
                [positives_subgroup, positives_complement],
                [negatives_subgroup, negatives_complement],
            ],
            correction=False,
        )[index]
        if bidirect:
            return val
        p_subgroup = positives_subgroup / instances_subgroup
        p_dataset = positives_dataset / instances_dataset
        if direction_positive and p_subgroup > p_dataset:
            return val
        if not direction_positive and p_subgroup < p_dataset:
            return val
        return -val
    @staticmethod
    def chi_squared_qf_weighted(
        subgroup,
        data,
        weighting_attribute,
        effective_sample_size=0,
        min_instances=5,
    ):
        import scipy.stats  # pylint:disable=import-outside-toplevel

        (
            instancesDataset,
            positivesDataset,
            instancesSubgroup,
            positivesSubgroup,
        ) = subgroup.get_base_statistics(data, weighting_attribute)
        if (instancesSubgroup < min_instances) or (
            (instancesDataset - instancesSubgroup) < min_instances
        ):
            return float("inf")
        if effective_sample_size == 0:
            effective_sample_size = derive_effective_sample_size(
                data[weighting_attribute]
            )
        # p_subgroup = positivesSubgroup / instancesSubgroup
        # p_dataset = positivesDataset / instancesDataset

        negatives_subgroup = instancesSubgroup - positivesSubgroup
        negatives_dataset = instancesDataset - positivesDataset
        positives_complement = positivesDataset - positivesSubgroup
        negatives_complement = negatives_dataset - negatives_subgroup
        val = scipy.stats.chi2_contingency(
            [
                [positivesSubgroup, positives_complement],
                [negatives_subgroup, negatives_complement],
            ],
            correction=True,
        )[0]
        return scipy.stats.chi2.sf(val * effective_sample_size / instancesDataset, 1)
    def __init__(self, direction="both", min_instances=5, stat="chi2"):
        """
        Parameters
        ----------
        direction : {'both', 'positive', 'negative'}
            direction of deviation that is of interest
        min_instances : int, optional
            number of required instances; if less, -inf is returned for that subgroup
        stat : {'chi2', 'p'}
            whether to report the test statistic or the p-value
            (see scipy.stats.chi2_contingency)
        """
        if direction == "both":
            self.bidirect = True
            self.direction_positive = True
        if direction == "positive":
            self.bidirect = False
            self.direction_positive = True
        if direction == "negative":
            self.bidirect = False
            self.direction_positive = False
        self.min_instances = min_instances
        self.index = {"chi2": 0, "p": 1}[stat]
        super().__init__()

    def evaluate(self, subgroup, target, data, statistics=None):
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        dataset = self.dataset_statistics
        return ChiSquaredQF.chi_squared_qf(
            dataset.size_sg,
            dataset.positives_count,
            statistics.size_sg,
            statistics.positives_count,
            self.min_instances,
            self.bidirect,
            self.direction_positive,
            self.index,
        )
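
# Editor's sketch (not part of the library): ChiSquaredQF plugged into a beam
# search via pysubgroup's usual task API. Assumes the top-level package
# re-exports SubgroupDiscoveryTask, create_selectors, and BeamSearch, as
# recent releases do; the DataFrame is made up.
def _example_chi_squared_search():
    import pandas as pd
    import pysubgroup as ps

    data = pd.DataFrame(
        {
            "sex": ["m", "f"] * 20,
            "age": [20, 60] * 20,
            "bought": [True, False] * 20,
        }
    )
    task = ps.SubgroupDiscoveryTask(
        data,
        ps.BinaryTarget("bought", True),
        ps.create_selectors(data, ignore=["bought"]),
        result_set_size=5,
        depth=2,
        qf=ps.ChiSquaredQF(direction="positive"),
    )
    print(ps.BeamSearch().execute(task).to_dataframe())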
class StandardQF(SimplePositivesQF, BoundedInterestingnessMeasure):
    """
    StandardQF which weights the relative size against the difference in averages

    The StandardQF is a general form of quality function which, for different
    values of a, is order equivalent to many popular quality measures.

    Attributes
    ----------
    a : float
        used as an exponent to scale the relative size to the difference in averages
    """

    @staticmethod
    def standard_qf(
        a, instances_dataset, positives_dataset, instances_subgroup, positives_subgroup
    ):
        if not hasattr(instances_subgroup, "__array_interface__") and (
            instances_subgroup == 0
        ):
            return np.nan
        p_subgroup = np.divide(positives_subgroup, instances_subgroup)
        # if instances_subgroup == 0:
        #     return 0
        # p_subgroup = positives_subgroup / instances_subgroup
        p_dataset = positives_dataset / instances_dataset
        return (instances_subgroup / instances_dataset) ** a * (p_subgroup - p_dataset)

    def __init__(self, a):
        """
        Parameters
        ----------
        a : float
            exponent to trade off the relative size with the difference in means
        """
        self.a = a
        super().__init__()

    def evaluate(self, subgroup, target, data, statistics=None):
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        dataset = self.dataset_statistics
        return StandardQF.standard_qf(
            self.a,
            dataset.size_sg,
            dataset.positives_count,
            statistics.size_sg,
            statistics.positives_count,
        )

    def optimistic_estimate(self, subgroup, target, data, statistics=None):
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        dataset = self.dataset_statistics
        return StandardQF.standard_qf(
            self.a,
            dataset.size_sg,
            dataset.positives_count,
            statistics.positives_count,
            statistics.positives_count,
        )

    def optimistic_generalisation(self, subgroup, target, data, statistics=None):
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        dataset = self.dataset_statistics
        pos_remaining = dataset.positives_count - statistics.positives_count
        return StandardQF.standard_qf(
            self.a,
            dataset.size_sg,
            dataset.positives_count,
            statistics.size_sg + pos_remaining,
            dataset.positives_count,
        )
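
# Editor's sketch (not part of the library): the quality above is
# (n_sg / n) ** a * (p_sg - p_0), and the optimistic estimate is the value of
# the best conceivable refinement, a pure subgroup keeping all current
# positives. The counts below are made up.
def _example_standard_qf_values():
    # dataset: 100 instances with 40 positives; subgroup: 20 instances, 16 positives
    print(StandardQF.standard_qf(0.5, 100, 40, 20, 16))  # (0.2 ** 0.5) * 0.4 ~ 0.179
    # optimistic estimate: shrink to the 16 positives only, share becomes 1.0
    print(StandardQF.standard_qf(0.5, 100, 40, 16, 16))  # (0.16 ** 0.5) * 0.6 = 0.24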
class LiftQF(StandardQF):
    """
    Lift Quality Function

    LiftQF is a StandardQF with a=0. Thus it treats the difference in ratios
    as the quality, without caring about the relative size of the subgroup.
    """

    def __init__(self):
        super().__init__(0.0)
# TODO add true binomial quality function as in
# https://opus.bibliothek.uni-wuerzburg.de/opus4-wuerzburg/frontdoor/index/index/docId/1786  # noqa: E501
class SimpleBinomialQF(StandardQF):
    """
    Simple Binomial Quality Function

    SimpleBinomialQF is a StandardQF with a=0.5. It is an order-equivalent
    approximation of the full binomial test if the subgroup size is much
    smaller than the size of the entire dataset.
    """

    def __init__(self):
        super().__init__(0.5)


class WRAccQF(StandardQF):
    """
    Weighted Relative Accuracy Quality Function

    WRAccQF is a StandardQF with a=1. It is order equivalent to the difference
    between the observed and expected number of positive instances.
    """

    def __init__(self):
        super().__init__(1.0)
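
# Editor's sketch (not part of the library): the three convenience classes
# above only fix the exponent a; on the same (made-up) counts they weight
# subgroup size against target-share lift differently.
def _example_standard_qf_family():
    counts = (100, 40, 20, 16)  # dataset size/positives, subgroup size/positives
    for qf in (LiftQF(), SimpleBinomialQF(), WRAccQF()):
        print(type(qf).__name__, StandardQF.standard_qf(qf.a, *counts))
    # LiftQF 0.4, SimpleBinomialQF ~0.179, WRAccQF 0.08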
#####
# GeneralizationAware Interestingness Measures
#####
class GeneralizationAware_StandardQF(GeneralizationAwareQF_stats):
    def __init__(self, a):
        super().__init__(StandardQF(0))
        self.a = a

    def get_max(self, *args):
        max_ratio = 0.0
        max_stats = None
        for stat in args:
            if stat.size_sg > 0:
                ratio = stat.positives_count / stat.size_sg
                if ratio > max_ratio:
                    max_ratio = ratio
                    max_stats = stat
        return max_stats

    def evaluate(self, subgroup, target, data, statistics=None):
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        sg_stats = statistics.subgroup_stats
        general_stats = statistics.generalisation_stats
        if sg_stats.size_sg == 0 or general_stats.size_sg == 0:
            return np.nan
        sg_ratio = sg_stats.positives_count / sg_stats.size_sg
        general_ratio = general_stats.positives_count / general_stats.size_sg
        return (sg_stats.size_sg / self.stats0.size_sg) ** self.a * (
            sg_ratio - general_ratio
        )
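
# Editor's sketch (not part of the library): GeneralizationAware_StandardQF is
# used like StandardQF but scores a subgroup against the best target share
# among its generalizations, i.e. (n_sg / n) ** a * (p_sg - p_generalizations),
# rather than against the overall dataset share. A hypothetical task setup,
# reusing a DataFrame shaped like the one in the ChiSquaredQF sketch above:
def _example_generalization_aware(data):
    import pysubgroup as ps

    task = ps.SubgroupDiscoveryTask(
        data,
        ps.BinaryTarget("bought", True),
        ps.create_selectors(data, ignore=["bought"]),
        qf=ps.GeneralizationAware_StandardQF(0.5),
    )
    print(ps.SimpleDFS().execute(task).to_dataframe())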