Source code for pysubgroup.binary_target

"""
Created on 29.09.2017

@author: lemmerfn
"""
from collections import namedtuple
from functools import total_ordering

import numpy as np

from pysubgroup.measures import (
    AbstractInterestingnessMeasure,
    BoundedInterestingnessMeasure,
    GeneralizationAwareQF_stats,
)

from .subgroup_description import EqualitySelector, get_cover_array_and_size
from .utils import BaseTarget, derive_effective_sample_size


@total_ordering
class BinaryTarget(BaseTarget):
    """Target concept for the boolean model class (classic subgroup discovery).

    The target is described by ``target_selector``. The statistics computed by
    this class count how many instances that selector covers in the whole
    dataset and inside a subgroup.
    """

    # Keys of the dictionary returned by ``calculate_statistics``.
    statistic_types = (
        "size_sg",
        "size_dataset",
        "positives_sg",
        "positives_dataset",
        "size_complement",
        "relative_size_sg",
        "relative_size_complement",
        "coverage_sg",
        "coverage_complement",
        "target_share_sg",
        "target_share_complement",
        "target_share_dataset",
        "lift",
    )

    def __init__(self, target_attribute=None, target_value=None, target_selector=None):
        """
        Creates a new target for the boolean model class
        (classic subgroup discovery).

        If target_attribute and target_value are given, the target_selector
        is computed using attribute and value.

        Parameters
        ----------
        target_attribute, target_value : optional
            when both are given, an ``EqualitySelector`` is built from them
        target_selector : optional
            selector to use directly as the target

        Raises
        ------
        ValueError
            if an attribute/value pair AND a selector are given, or if no
            selector is available after processing the arguments
        """
        if target_attribute is not None and target_value is not None:
            if target_selector is not None:
                raise ValueError(
                    "BinaryTarget is to be constructed "
                    "EITHER by a selector OR by attribute/value pair"
                )
            target_selector = EqualitySelector(target_attribute, target_value)
        if target_selector is None:
            raise ValueError("No target selector given")
        self.target_selector = target_selector

    def __repr__(self):
        return "T: " + str(self.target_selector)

    def __eq__(self, other):
        # Operands without a ``__dict__`` (e.g. ``None``) can never be equal to
        # a target; hand the decision back to Python instead of raising.
        try:
            other_attributes = other.__dict__
        except AttributeError:
            return NotImplemented
        return self.__dict__ == other_attributes

    def __lt__(self, other):
        # Ordering is defined through the string representations.
        return str(self) < str(other)

    def covers(self, instance):
        """Delegate to ``target_selector.covers`` and return its result."""
        return self.target_selector.covers(instance)

    def get_attributes(self):
        """Return a 1-tuple holding the attribute name of the target selector."""
        return (self.target_selector.attribute_name,)

    def get_base_statistics(self, subgroup, data):
        """
        Count instances and positives for the dataset and for ``subgroup``.

        Returns
        -------
        tuple
            ``(instances_dataset, positives_dataset,
            instances_subgroup, positives_subgroup)``
        """
        cover_arr, size_sg = get_cover_array_and_size(subgroup, len(data), data)
        # presumably a boolean array with one entry per row of data -- it is
        # summed and indexed with cover_arr below
        positives = self.covers(data)
        instances_subgroup = size_sg
        positives_dataset = np.sum(positives)
        instances_dataset = len(data)
        positives_subgroup = np.sum(positives[cover_arr])
        return (
            instances_dataset,
            positives_dataset,
            instances_subgroup,
            positives_subgroup,
        )

    def calculate_statistics(self, subgroup, data, cached_statistics=None):
        """
        Compute the statistics listed in ``statistic_types`` for ``subgroup``.

        ``cached_statistics`` is returned unchanged when
        ``all_statistics_present`` accepts it; otherwise a new dictionary is
        built from the base counts.

        Returns
        -------
        dict
            maps every name in ``statistic_types`` to its value;
            ``target_share_complement`` is NaN when the subgroup covers the
            whole dataset (the complement is empty)
        """
        if self.all_statistics_present(cached_statistics):
            return cached_statistics

        (
            instances_dataset,
            positives_dataset,
            instances_subgroup,
            positives_subgroup,
        ) = self.get_base_statistics(subgroup, data)

        # Both complement counts are needed several times below.
        size_complement = instances_dataset - instances_subgroup
        positives_complement = positives_dataset - positives_subgroup

        statistics = {}
        statistics["size_sg"] = instances_subgroup
        statistics["size_dataset"] = instances_dataset
        statistics["positives_sg"] = positives_subgroup
        statistics["positives_dataset"] = positives_dataset
        statistics["size_complement"] = size_complement
        statistics["relative_size_sg"] = instances_subgroup / instances_dataset
        statistics["relative_size_complement"] = size_complement / instances_dataset
        statistics["coverage_sg"] = positives_subgroup / positives_dataset
        statistics["coverage_complement"] = positives_complement / positives_dataset
        statistics["target_share_sg"] = positives_subgroup / instances_subgroup
        if instances_dataset == instances_subgroup:
            # Empty complement: its target share is undefined.
            statistics["target_share_complement"] = float("nan")
        else:
            statistics["target_share_complement"] = (
                positives_complement / size_complement
            )
        statistics["target_share_dataset"] = positives_dataset / instances_dataset
        statistics["lift"] = (
            statistics["target_share_sg"] / statistics["target_share_dataset"]
        )
        return statistics
class SimplePositivesQF(
    AbstractInterestingnessMeasure
):  # pylint: disable=abstract-method
    """
    Base for quality functions that only need the size and the number of
    positives of a subgroup.

    Statistics are exchanged as ``tpl`` named tuples with the fields
    ``size_sg`` and ``positives_count``.
    """

    tpl = namedtuple("PositivesQF_parameters", ("size_sg", "positives_count"))

    def __init__(self):
        # Filled in by calculate_constant_statistics.
        self.dataset_statistics = None
        self.positives = None
        self.has_constant_statistics = False
        self.required_stat_attrs = ("size_sg", "positives_count")

    def calculate_constant_statistics(self, data, target):
        """Cache the target cover of ``data`` and the dataset-level counts."""
        assert isinstance(target, BinaryTarget)
        covered = target.covers(data)
        self.positives = covered
        self.dataset_statistics = SimplePositivesQF.tpl(
            size_sg=len(data),
            positives_count=np.sum(covered),
        )
        self.has_constant_statistics = True

    def calculate_statistics(
        self, subgroup, target, data, statistics=None
    ):  # pylint: disable=unused-argument
        """Return the ``tpl`` (size, positives) of ``subgroup``."""
        n_rows = len(self.positives)
        cover_arr, size_sg = get_cover_array_and_size(subgroup, n_rows, data)
        positives_in_sg = np.count_nonzero(self.positives[cover_arr])
        return SimplePositivesQF.tpl(size_sg=size_sg, positives_count=positives_in_sg)

    # <<< GpGrowth >>>
    def gp_get_stats(self, row_index):
        """Return ``[1, positives[row_index]]`` as an integer array."""
        is_positive = self.positives[row_index]
        return np.array([1, is_positive], dtype=int)

    def gp_get_null_vector(self):
        """Return a zero vector of length two."""
        return np.zeros(2)

    def gp_merge(self, left, right):
        """Add ``right`` onto ``left`` in place; nothing is returned."""
        left += right

    def gp_get_params(self, _cover_arr, v):
        """Wrap the first two entries of ``v`` in a ``tpl``."""
        return SimplePositivesQF.tpl(size_sg=v[0], positives_count=v[1])

    def gp_to_str(self, stats):
        """Join the string forms of all entries of ``stats`` with blanks."""
        return " ".join(str(entry) for entry in stats)

    def gp_size_sg(self, stats):
        """Return the first entry of ``stats``, i.e. the subgroup size slot."""
        return stats[0]

    @property
    def gp_requires_cover_arr(self):
        return False
# TODO Make ChiSquared useful for real nominal data not just binary
# Introduce Enum for direction
# Maybe it is possible to give an optimistic estimate for ChiSquared
class ChiSquaredQF(SimplePositivesQF):  # pragma: no cover
    """
    ChiSquaredQF which tests for statistical independence
    of a subgroup against its complement

    The quality is derived from ``scipy.stats.chi2_contingency`` applied to
    the 2x2 table of positives/negatives in the subgroup and its complement.
    """

    @staticmethod
    def chi_squared_qf(
        instances_dataset,
        positives_dataset,
        instances_subgroup,
        positives_subgroup,
        min_instances=5,
        bidirect=True,
        direction_positive=True,
        index=0,
    ):
        """
        Performs chi2 test of statistical independence

        Test whether a subgroup is statistically independent
        from its complement (see scipy.stats.chi2_contingency).

        Parameters
        ----------
        instances_dataset,
        positives_dataset,
        instances_subgroup,
        positives_subgroup : int
            counts of subgroup and dataset
        min_instances : int, optional
            number of required instances, if less -inf is returned for that subgroup
        bidirect : bool, optional
            If true both directions are considered interesting
            else direction_positive decides which direction is interesting
        direction_positive: bool, optional
            Only used if bidirect=False; specifies whether you are interested
            in positive (True) or negative deviations
        index : {0, 1}, optional
            decides whether the test statistic (0) or the p-value (1) should be used

        Returns
        -------
        float
            the selected test result; ``-inf`` if the subgroup or its
            complement has fewer than ``min_instances`` instances; the negated
            value if ``bidirect`` is False and the deviation does not point in
            the requested direction
        """
        import scipy.stats  # pylint:disable=import-outside-toplevel

        instances_complement = instances_dataset - instances_subgroup
        if (instances_subgroup < min_instances) or (
            instances_complement < min_instances
        ):
            return float("-inf")

        negatives_subgroup = instances_subgroup - positives_subgroup
        negatives_dataset = instances_dataset - positives_dataset
        negatives_complement = negatives_dataset - negatives_subgroup
        positives_complement = positives_dataset - positives_subgroup

        val = scipy.stats.chi2_contingency(
            [
                [positives_subgroup, positives_complement],
                [negatives_subgroup, negatives_complement],
            ],
            correction=False,
        )[index]
        if bidirect:
            return val

        p_subgroup = positives_subgroup / instances_subgroup
        p_dataset = positives_dataset / instances_dataset
        if direction_positive and p_subgroup > p_dataset:
            return val
        if not direction_positive and p_subgroup < p_dataset:
            return val
        # Deviation in the uninteresting direction (or no deviation at all).
        return -val

    @staticmethod
    def chi_squared_qf_weighted(
        subgroup,
        data,
        weighting_attribute,
        effective_sample_size=0,
        min_instances=5,
    ):
        """
        Weighted variant returning the survival function value of the
        rescaled chi2 statistic.

        Parameters
        ----------
        subgroup, data
            subgroup and dataset the base statistics are computed for
        weighting_attribute
            name of the column in ``data`` holding the weights
        effective_sample_size : optional
            if 0, it is derived from ``data[weighting_attribute]``
        min_instances : int, optional
            number of required instances in the subgroup and in its
            complement; if less, ``inf`` is returned

        Returns
        -------
        float
            ``scipy.stats.chi2.sf`` of the statistic scaled by
            ``effective_sample_size / instances_dataset`` (1 degree of
            freedom), or ``inf`` for too small subgroups/complements
        """
        import scipy.stats  # pylint:disable=import-outside-toplevel

        # NOTE(review): relies on ``subgroup`` offering
        # ``get_base_statistics(data, weighting_attribute)`` -- confirm that
        # the subgroup objects passed in still provide it.
        (
            instances_dataset,
            positives_dataset,
            instances_subgroup,
            positives_subgroup,
        ) = subgroup.get_base_statistics(data, weighting_attribute)

        # The complement is checked against min_instances as well (it used to
        # be compared with a hard-coded 5, which ignored the parameter).
        if (instances_subgroup < min_instances) or (
            (instances_dataset - instances_subgroup) < min_instances
        ):
            return float("inf")
        if effective_sample_size == 0:
            effective_sample_size = derive_effective_sample_size(
                data[weighting_attribute]
            )

        negatives_subgroup = instances_subgroup - positives_subgroup
        negatives_dataset = instances_dataset - positives_dataset
        positives_complement = positives_dataset - positives_subgroup
        negatives_complement = negatives_dataset - negatives_subgroup
        val = scipy.stats.chi2_contingency(
            [
                [positives_subgroup, positives_complement],
                [negatives_subgroup, negatives_complement],
            ],
            correction=True,
        )[0]
        return scipy.stats.chi2.sf(val * effective_sample_size / instances_dataset, 1)

    def __init__(self, direction="both", min_instances=5, stat="chi2"):
        """
        Parameters
        ----------
        direction : {'both', 'positive', 'negative'}
            direction of deviation that is of interest
        min_instances : int, optional
            number of required instances, if less -inf is returned for that subgroup
        stat : {'chi2', 'p'}
            whether to report the test statistic
            or the p-value (see scipy.stats.chi2_contingency)

        Raises
        ------
        ValueError
            if ``direction`` is not one of the supported values
        KeyError
            if ``stat`` is not one of the supported values
        """
        if direction == "both":
            self.bidirect = True
            self.direction_positive = True
        elif direction == "positive":
            self.bidirect = False
            self.direction_positive = True
        elif direction == "negative":
            self.bidirect = False
            self.direction_positive = False
        else:
            # Fail at construction time instead of leaving the flags unset,
            # which would only surface later as an AttributeError in evaluate.
            raise ValueError(
                "direction should be one of ('both', 'positive', 'negative'), "
                f"got {direction!r}"
            )
        self.min_instances = min_instances
        self.index = {"chi2": 0, "p": 1}[stat]
        super().__init__()

    def evaluate(self, subgroup, target, data, statistics=None):
        """Evaluate ``chi_squared_qf`` with the settings of this instance."""
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        dataset = self.dataset_statistics
        return ChiSquaredQF.chi_squared_qf(
            dataset.size_sg,
            dataset.positives_count,
            statistics.size_sg,
            statistics.positives_count,
            self.min_instances,
            self.bidirect,
            self.direction_positive,
            self.index,
        )
class StandardQF(SimplePositivesQF, BoundedInterestingnessMeasure):
    """
    StandardQF which weights the relative size against the difference in averages

    The StandardQF is a general form of quality function
    which for different values of a is order equivalent to
    many popular quality measures.

    Attributes
    ----------
    a : float
        used as an exponent to scale the relative size to the difference in averages
    """

    @staticmethod
    def standard_qf(
        a, instances_dataset, positives_dataset, instances_subgroup, positives_subgroup
    ):
        """
        Return ``relative_size ** a * (share_subgroup - share_dataset)``.

        NaN is returned for a scalar ``instances_subgroup`` equal to zero;
        operands exposing ``__array_interface__`` skip that check and are
        divided with ``np.divide``.
        """
        is_array_like = hasattr(instances_subgroup, "__array_interface__")
        # The scalar comparison must only run for non-array operands.
        if not is_array_like and instances_subgroup == 0:
            return np.nan
        share_subgroup = np.divide(positives_subgroup, instances_subgroup)
        share_dataset = positives_dataset / instances_dataset
        relative_size = instances_subgroup / instances_dataset
        return relative_size**a * (share_subgroup - share_dataset)

    def __init__(self, a):
        """
        Parameters
        ----------
        a : float
            exponent to trade-off the relative size with the difference in means
        """
        self.a = a
        super().__init__()

    def evaluate(self, subgroup, target, data, statistics=None):
        """Quality of ``subgroup`` computed from its size and positives."""
        sg = self.ensure_statistics(subgroup, target, data, statistics)
        whole = self.dataset_statistics
        return StandardQF.standard_qf(
            a=self.a,
            instances_dataset=whole.size_sg,
            positives_dataset=whole.positives_count,
            instances_subgroup=sg.size_sg,
            positives_subgroup=sg.positives_count,
        )

    def optimistic_estimate(self, subgroup, target, data, statistics=None):
        """
        Quality of the hypothetical subgroup made up of exactly the positives
        of ``subgroup`` (size and positives both equal its positives count).
        """
        sg = self.ensure_statistics(subgroup, target, data, statistics)
        whole = self.dataset_statistics
        only_positives = sg.positives_count
        return StandardQF.standard_qf(
            a=self.a,
            instances_dataset=whole.size_sg,
            positives_dataset=whole.positives_count,
            instances_subgroup=only_positives,
            positives_subgroup=only_positives,
        )

    def optimistic_generalisation(self, subgroup, target, data, statistics=None):
        """
        Quality of the hypothetical generalisation that adds every positive
        of the dataset not yet contained in ``subgroup``.
        """
        sg = self.ensure_statistics(subgroup, target, data, statistics)
        whole = self.dataset_statistics
        missing_positives = whole.positives_count - sg.positives_count
        return StandardQF.standard_qf(
            a=self.a,
            instances_dataset=whole.size_sg,
            positives_dataset=whole.positives_count,
            instances_subgroup=sg.size_sg + missing_positives,
            positives_subgroup=whole.positives_count,
        )
class LiftQF(StandardQF):
    """
    Lift Quality Function

    A StandardQF with the exponent fixed to a=0: the relative size of the
    subgroup has no influence, so the quality is the difference in ratios.
    """

    def __init__(self):
        """Set up the parent StandardQF with ``a = 0.0``."""
        super().__init__(a=0.0)
# TODO add true binomial quality function as in # https://opus.bibliothek.uni-wuerzburg.de/opus4-wuerzburg/frontdoor/index/index/docId/1786 # noqa: E501
class SimpleBinomialQF(StandardQF):
    """
    Simple Binomial Quality Function

    A StandardQF with the exponent fixed to a=0.5.
    It is an order equivalent approximation of the full binomial test
    if the subgroup size is much smaller than the size of the entire dataset.
    """

    def __init__(self):
        """Set up the parent StandardQF with ``a = 0.5``."""
        super().__init__(a=0.5)
class WRAccQF(StandardQF):
    """
    Weighted Relative Accuracy Quality Function

    A StandardQF with the exponent fixed to a=1.
    It is order equivalent to the difference in the observed
    and expected number of positive instances.
    """

    def __init__(self):
        """Set up the parent StandardQF with ``a = 1.0``."""
        super().__init__(a=1.0)
##### # GeneralizationAware Interestingness Measures #####
class GeneralizationAware_StandardQF(
    GeneralizationAwareQF_stats, BoundedInterestingnessMeasure
):
    """
    Generalization-aware variant of the StandardQF.

    The share of positives of a subgroup is compared against a value read
    from the aggregated statistics of its generalizations (``read_p``)
    instead of the share in the whole dataset.
    """

    # Aggregate carried along by the difference-based strategy.
    ga_sQF_agg_tuple = namedtuple(
        "ga_sQF_agg_tuple", ["max_p", "min_delta_negatives", "min_negatives"]
    )

    def __init__(self, a, optimistic_estimate_strategy="default"):
        """
        Parameters
        ----------
        a : float
            exponent of the wrapped StandardQF
        optimistic_estimate_strategy : {'default', 'difference', 'max'}
            selects which optimistic estimate, aggregation function and
            ``read_p`` implementation are bound to this instance;
            'default' is the same as 'difference'

        Raises
        ------
        ValueError
            if ``optimistic_estimate_strategy`` is not a supported value
        """
        super().__init__(StandardQF(a))
        if optimistic_estimate_strategy in ("default", "difference"):
            self.optimistic_estimate = self.difference_based_optimistic_estimate
            self.aggregate_statistics = self.difference_based_agg_function
            self.read_p = self.difference_based_read_p
        elif optimistic_estimate_strategy == "max":
            self.optimistic_estimate = self.max_based_optimistic_estimate
            self.aggregate_statistics = self.max_based_aggregate_statistics
            self.read_p = self.max_based_read_p
        else:
            raise ValueError(
                "optimistic_estimate_strategy should be one of "
                "('default', 'max', 'difference')"
            )
        self.a = a

    def evaluate(self, subgroup, target, data, statistics=None):
        """
        Return ``relative_size ** a * (share_subgroup - read_p(aggregate))``,
        or NaN for an empty subgroup.
        """
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        sg_stats = statistics.subgroup_stats
        general_stats = statistics.generalisation_stats
        if sg_stats.size_sg == 0:
            return np.nan

        sg_ratio = sg_stats.positives_count / sg_stats.size_sg
        return (sg_stats.size_sg / self.stats0.size_sg) ** self.a * (
            sg_ratio - self.read_p(general_stats)
        )

    def max_based_aggregate_statistics(self, stats_subgroup, list_of_pairs):
        """
        Return the statistics with the highest share of positives among all
        entries of all pairs in ``list_of_pairs``.

        ``stats_subgroup`` is returned when there are no pairs; ``None`` is
        returned if every entry has size zero.
        """
        if len(list_of_pairs) == 0:
            return stats_subgroup
        max_ratio = -np.inf
        max_stats = None
        for pair in list_of_pairs:
            for agg_stat in pair:
                if agg_stat.size_sg == 0:  # pragma: no cover
                    continue
                ratio = agg_stat.positives_count / agg_stat.size_sg
                if ratio > max_ratio:
                    max_ratio = ratio
                    max_stats = agg_stat
        return max_stats

    def max_based_optimistic_estimate(self, subgroup, target, data, statistics=None):
        """
        Computes the oe as the hypothetical subgroup
        containing only positive instances
        """
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        sg_stats = statistics.subgroup_stats
        general_stats = statistics.generalisation_stats
        if sg_stats.size_sg == 0 or general_stats.size_sg == 0:
            return np.nan

        general_ratio = general_stats.positives_count / general_stats.size_sg
        return (sg_stats.positives_count / self.stats0.size_sg) ** self.a * (
            1 - general_ratio
        )

    def max_based_read_p(self, agg_tuple):
        """Share of positives of the aggregated statistics."""
        return agg_tuple.positives_count / agg_tuple.size_sg

    def difference_based_optimistic_estimate(
        self, subgroup, target, data, statistics=None
    ):
        """
        Optimistic estimate built from the minimal difference in negatives
        (``min_delta_negatives``) towards the generalizations.

        Returns ``inf`` while ``min_delta_negatives`` is ``+inf`` (the value
        assigned to the empty pattern by ``difference_based_agg_function``).
        """
        sg_stats, agg_stats = self.ensure_statistics(subgroup, target, data, statistics)
        if np.isposinf(agg_stats.min_delta_negatives):
            return np.inf
        delta_n = agg_stats.min_delta_negatives
        size_dataset = self.qf.dataset_statistics.size_sg

        # Number of positives used for the bound, depending on the exponent.
        if self.qf.a == 0:
            pos = 1
        elif self.qf.a == 1.0:
            pos = sg_stats.positives_count
        else:
            a = self.qf.a
            pos = min(np.ceil(a * delta_n / (1 - a)), sg_stats.positives_count)
        # NOTE(review): undefined when pos + delta_n is 0 -- confirm whether
        # that combination can reach this point.
        tau_diff = pos / (pos + delta_n)

        if sg_stats.size_sg > 0:
            tau_sg = sg_stats.positives_count / sg_stats.size_sg
        else:
            # An empty subgroup must never win the max below.
            tau_sg = -1
        tau_max = max(tau_diff, tau_sg, agg_stats.max_p)
        return (sg_stats.positives_count / size_dataset) ** self.a * (1 - tau_max)

    def difference_based_agg_function(self, stats_subgroup, list_of_pairs):
        """
        list_of_pairs is a list of (stats, agg_tuple)
        for all the generalizations

        Returns a ``ga_sQF_agg_tuple`` for the subgroup; for the empty pattern
        (no generalizations) both negative-based fields are ``inf``.
        """

        def get_negatives_count(sg_stats):
            return sg_stats.size_sg - sg_stats.positives_count

        def get_percentage_positives(sg_stats):
            if sg_stats.size_sg == 0:
                return np.nan
            return sg_stats.positives_count / sg_stats.size_sg

        if len(list_of_pairs) == 0:  # empty pattern
            # np.inf instead of np.infty: the alias was removed in NumPy 2.0.
            return GeneralizationAware_StandardQF.ga_sQF_agg_tuple(
                get_percentage_positives(stats_subgroup), np.inf, np.inf
            )

        subgroup_negatives = get_negatives_count(stats_subgroup)
        min_immediate_generalizations_negatives = min(
            get_negatives_count(x.subgroup_stats) for x in list_of_pairs
        )
        min_immediate_generalizations_delta_negatives = min(
            x.generalisation_stats.min_delta_negatives for x in list_of_pairs
        )
        max_percentage_positives = max(
            max(
                get_percentage_positives(x.subgroup_stats), x.generalisation_stats.max_p
            )
            for x in list_of_pairs
        )

        sg_delta_negatives = (
            min_immediate_generalizations_negatives - subgroup_negatives
        )
        min_delta_negatives = min(
            sg_delta_negatives, min_immediate_generalizations_delta_negatives
        )
        return GeneralizationAware_StandardQF.ga_sQF_agg_tuple(
            max_percentage_positives, min_delta_negatives, sg_delta_negatives
        )

    def difference_based_read_p(self, agg_tuple):
        """Return the ``max_p`` field of the aggregate."""
        return agg_tuple.max_p