Source code for pysubgroup.binary_target

"""
Created on 29.09.2017

@author: lemmerfn
"""
from collections import namedtuple
from functools import total_ordering

import numpy as np

from pysubgroup.measures import (
    AbstractInterestingnessMeasure,
    BoundedInterestingnessMeasure,
    GeneralizationAwareQF_stats,
)

from .subgroup_description import EqualitySelector, get_cover_array_and_size
from .utils import BaseTarget, derive_effective_sample_size


@total_ordering
class BinaryTarget(BaseTarget):
    """Binary target for classic subgroup discovery with boolean targets.

    Holds the selector that marks the positive instances and derives
    statistics about that target inside a subgroup and in the whole dataset.
    """

    # Keys of the dictionary produced by ``calculate_statistics``.
    statistic_types = (
        "size_sg",
        "size_dataset",
        "positives_sg",
        "positives_dataset",
        "size_complement",
        "relative_size_sg",
        "relative_size_complement",
        "coverage_sg",
        "coverage_complement",
        "target_share_sg",
        "target_share_complement",
        "target_share_dataset",
        "lift",
    )

    def __init__(self, target_attribute=None, target_value=None, target_selector=None):
        """Create a target for the boolean model class.

        The target is defined EITHER by an attribute/value pair (an
        ``EqualitySelector`` is built from it) OR by a ready-made selector.

        Parameters:
            target_attribute (str, optional): The name of the target attribute.
            target_value (any, optional): The value of the target attribute.
            target_selector (Selector, optional): A predefined target selector.

        Raises:
            ValueError: If a selector is given together with a complete
                attribute/value pair, or if no selector results from the
                arguments (an incomplete attribute/value pair is ignored).
        """
        if target_attribute is not None and target_value is not None:
            if target_selector is not None:
                raise ValueError(
                    "BinaryTarget is to be constructed "
                    "EITHER by a selector OR by attribute/value pair"
                )
            target_selector = EqualitySelector(target_attribute, target_value)
        if target_selector is None:
            raise ValueError("No target selector given")
        self.target_selector = target_selector

    def __repr__(self):
        """Return ``"T: "`` followed by the string form of the selector."""
        return "T: " + str(self.target_selector)

    def __eq__(self, other):
        """Compare two targets by their instance dictionaries.

        Objects that carry no instance dictionary (``None``, numbers, ...)
        cannot be equal to a target; ``NotImplemented`` is returned for them
        so that ``==`` evaluates to ``False`` instead of raising
        ``AttributeError``.
        """
        other_state = getattr(other, "__dict__", None)
        if other_state is None:
            return NotImplemented
        return self.__dict__ == other_state

    def __lt__(self, other):
        """Order targets by their string representation (used for sorting)."""
        return str(self) < str(other)

    def covers(self, instance):
        """Determine whether the target selector covers the given instance.

        Parameters:
            instance (pandas DataFrame): The data to check.

        Returns:
            Whatever ``target_selector.covers`` returns for ``instance``
            (used below as a boolean mask over the rows).
        """
        return self.target_selector.covers(instance)

    def get_attributes(self):
        """Get the attribute names used in the target.

        Returns:
            tuple: A 1-tuple containing the selector's attribute name.
        """
        return (self.target_selector.attribute_name,)

    def get_base_statistics(self, subgroup, data):
        """Compute the four base counts for the subgroup and the dataset.

        Parameters:
            subgroup: The subgroup for which to compute statistics.
            data (pandas DataFrame): The dataset.

        Returns:
            tuple: ``(instances_dataset, positives_dataset,
            instances_subgroup, positives_subgroup)``.
        """
        cover_arr, size_sg = get_cover_array_and_size(subgroup, len(data), data)
        positives = self.covers(data)
        instances_subgroup = size_sg
        positives_dataset = np.sum(positives)
        instances_dataset = len(data)
        # Restrict the positives mask to the rows covered by the subgroup.
        positives_subgroup = np.sum(positives[cover_arr])
        return (
            instances_dataset,
            positives_dataset,
            instances_subgroup,
            positives_subgroup,
        )

    def calculate_statistics(self, subgroup, data, cached_statistics=None):
        """Calculate the statistics listed in ``statistic_types``.

        Parameters:
            subgroup: The subgroup for which to calculate statistics.
            data (pandas DataFrame): The dataset.
            cached_statistics (dict, optional): Previously computed statistics;
                returned unchanged when ``all_statistics_present`` accepts them.

        Returns:
            dict: Maps every name in ``statistic_types`` to its value.
        """
        if self.all_statistics_present(cached_statistics):
            return cached_statistics

        (
            instances_dataset,
            positives_dataset,
            instances_subgroup,
            positives_subgroup,
        ) = self.get_base_statistics(subgroup, data)

        statistics = {}
        statistics["size_sg"] = instances_subgroup
        statistics["size_dataset"] = instances_dataset
        statistics["positives_sg"] = positives_subgroup
        statistics["positives_dataset"] = positives_dataset
        statistics["size_complement"] = instances_dataset - instances_subgroup
        statistics["relative_size_sg"] = instances_subgroup / instances_dataset
        statistics["relative_size_complement"] = (
            instances_dataset - instances_subgroup
        ) / instances_dataset
        # NOTE(review): the ratios below are not guarded against a zero
        # denominator (no positives in the dataset / empty subgroup).
        statistics["coverage_sg"] = positives_subgroup / positives_dataset
        statistics["coverage_complement"] = (
            positives_dataset - positives_subgroup
        ) / positives_dataset
        statistics["target_share_sg"] = positives_subgroup / instances_subgroup
        if instances_dataset == instances_subgroup:
            # The complement is empty, so its target share is undefined.
            statistics["target_share_complement"] = float("nan")
        else:
            statistics["target_share_complement"] = (
                positives_dataset - positives_subgroup
            ) / (instances_dataset - instances_subgroup)
        statistics["target_share_dataset"] = positives_dataset / instances_dataset
        statistics["lift"] = (
            statistics["target_share_sg"] / statistics["target_share_dataset"]
        )
        return statistics
class SimplePositivesQF(
    AbstractInterestingnessMeasure
):  # pylint: disable=abstract-method
    """Base quality function for binary targets that counts positive instances.

    Statistics are exchanged as ``tpl`` named tuples holding the number of
    instances (``size_sg``) and the number of positives (``positives_count``).
    """

    tpl = namedtuple("PositivesQF_parameters", ("size_sg", "positives_count"))

    def __init__(self):
        """Set up an instance without any dataset-level statistics yet."""
        self.required_stat_attrs = ("size_sg", "positives_count")
        self.has_constant_statistics = False
        self.positives = None
        self.dataset_statistics = None

    def calculate_constant_statistics(self, data, target):
        """Cache the positives mask and the dataset-wide counts.

        Parameters:
            data (pandas DataFrame): The dataset.
            target (BinaryTarget): The target definition.

        Raises:
            AssertionError: If the target is not an instance of BinaryTarget.
        """
        assert isinstance(target, BinaryTarget)
        positives_mask = target.covers(data)
        self.positives = positives_mask
        n_instances = len(data)
        n_positives = np.sum(self.positives)
        self.dataset_statistics = SimplePositivesQF.tpl(n_instances, n_positives)
        self.has_constant_statistics = True

    def calculate_statistics(
        self, subgroup, target, data, statistics=None
    ):  # pylint: disable=unused-argument
        """Count the instances and positives covered by the subgroup.

        Parameters:
            subgroup: The subgroup for which to calculate statistics.
            target (BinaryTarget): The target definition (not read here).
            data (pandas DataFrame): The dataset.
            statistics (any, optional): Unused in this implementation.

        Returns:
            namedtuple: ``tpl(size_sg, positives_count)`` for the subgroup.
        """
        n_rows = len(self.positives)
        cover_arr, size_sg = get_cover_array_and_size(subgroup, n_rows, data)
        covered_positives = self.positives[cover_arr]
        return SimplePositivesQF.tpl(size_sg, np.count_nonzero(covered_positives))

    # <<< GpGrowth >>>
    def gp_get_stats(self, row_index):
        """Get the statistics vector of a single row (GP-Growth).

        Parameters:
            row_index (int): The index of the row.

        Returns:
            numpy.ndarray: Integer array ``[1, positives[row_index]]``.
        """
        is_positive = self.positives[row_index]
        return np.array([1, is_positive], dtype=int)

    def gp_get_null_vector(self):
        """Get the neutral statistics vector (GP-Growth).

        Returns:
            numpy.ndarray: Zero-initialized array of size 2.
        """
        return np.zeros(2)

    def gp_merge(self, left, right):
        """Add ``right`` onto ``left`` in place (GP-Growth).

        Parameters:
            left (numpy.ndarray): Statistics vector that is updated.
            right (numpy.ndarray): Statistics vector that is added.
        """
        left += right

    def gp_get_params(self, _cover_arr, v):
        """Turn a statistics vector into a ``tpl`` named tuple (GP-Growth).

        Parameters:
            _cover_arr: Unused parameter.
            v (numpy.ndarray): Statistics vector.

        Returns:
            namedtuple: ``tpl(size_sg=v[0], positives_count=v[1])``.
        """
        size_sg, positives_count = v[0], v[1]
        return SimplePositivesQF.tpl(size_sg, positives_count)

    def gp_to_str(self, stats):
        """Render a statistics vector as space-separated values (GP-Growth).

        Parameters:
            stats (numpy.ndarray): Statistics vector.

        Returns:
            str: The entries of ``stats`` joined by single spaces.
        """
        return " ".join(str(entry) for entry in stats)

    def gp_size_sg(self, stats):
        """Read the subgroup size from a statistics vector (GP-Growth).

        Parameters:
            stats (numpy.ndarray): Statistics vector.

        Returns:
            The first entry of ``stats``.
        """
        return stats[0]

    @property
    def gp_requires_cover_arr(self):
        """Tell GP-Growth whether a cover array has to be supplied.

        Returns:
            bool: Always False.
        """
        return False
# TODO Make ChiSquared useful for real nominal data, not just binary
# Introduce Enum for direction
# Maybe it is possible to give an optimistic estimate for ChiSquared
class ChiSquaredQF(SimplePositivesQF):  # pragma: no cover
    """Chi-squared test of a subgroup against its complement.

    Builds the 2x2 contingency table (positives/negatives in the subgroup
    versus its complement) and reports either the chi-squared statistic or
    the p-value as the quality of the subgroup.
    """

    @staticmethod
    def chi_squared_qf(
        instances_dataset,
        positives_dataset,
        instances_subgroup,
        positives_subgroup,
        min_instances=5,
        bidirect=True,
        direction_positive=True,
        index=0,
    ):
        """Perform a chi-squared test of statistical independence.

        Tests whether a subgroup is statistically independent from its
        complement (see scipy.stats.chi2_contingency).

        Parameters:
            instances_dataset (int): Total number of instances in the dataset.
            positives_dataset (int): Total number of positives in the dataset.
            instances_subgroup (int): Number of instances in the subgroup.
            positives_subgroup (int): Number of positives in the subgroup.
            min_instances (int, optional): Minimum size required of both the
                subgroup and its complement; ``-inf`` is returned otherwise.
            bidirect (bool, optional): If True, both directions of deviation
                are considered interesting.
            direction_positive (bool, optional): If ``bidirect`` is False,
                the direction of deviation that is considered interesting.
            index (int, optional): Element of the ``chi2_contingency`` result
                to use: statistic (0) or p-value (1).

        Returns:
            float: The selected value. For a one-directional test the value
            is negated when the subgroup does not deviate strictly in the
            requested direction.
        """
        import scipy.stats  # pylint:disable=import-outside-toplevel

        instances_complement = instances_dataset - instances_subgroup
        if instances_subgroup < min_instances or instances_complement < min_instances:
            return float("-inf")

        negatives_subgroup = instances_subgroup - positives_subgroup
        negatives_dataset = instances_dataset - positives_dataset
        negatives_complement = negatives_dataset - negatives_subgroup
        positives_complement = positives_dataset - positives_subgroup

        val = scipy.stats.chi2_contingency(
            [
                [positives_subgroup, positives_complement],
                [negatives_subgroup, negatives_complement],
            ],
            correction=False,
        )[index]
        if bidirect:
            return val

        p_subgroup = positives_subgroup / instances_subgroup
        p_dataset = positives_dataset / instances_dataset
        if direction_positive and p_subgroup > p_dataset:
            return val
        if not direction_positive and p_subgroup < p_dataset:
            return val
        return -val

    @staticmethod
    def chi_squared_qf_weighted(
        subgroup,
        data,
        weighting_attribute,
        effective_sample_size=0,
        min_instances=5,
    ):
        """Perform the chi-squared test for weighted data.

        Parameters:
            subgroup: The subgroup for which to calculate the statistic; its
                ``get_base_statistics(data, weighting_attribute)`` supplies
                the four base counts.
            data (pandas DataFrame): The dataset.
            weighting_attribute (str): The attribute used for weighting.
            effective_sample_size (int, optional): Effective sample size;
                when 0 it is derived from ``data[weighting_attribute]``.
            min_instances (int, optional): Minimum size required of both the
                subgroup and its complement; ``inf`` is returned otherwise.

        Returns:
            float: The p-value (chi-squared survival function with one degree
            of freedom) of the statistic rescaled by
            ``effective_sample_size / instances_dataset``.
        """
        import scipy.stats  # pylint:disable=import-outside-toplevel

        (
            instances_dataset,
            positives_dataset,
            instances_subgroup,
            positives_subgroup,
        ) = subgroup.get_base_statistics(data, weighting_attribute)

        # Both the subgroup and its complement must satisfy the configured
        # minimum (the complement was previously compared to a literal 5).
        instances_complement = instances_dataset - instances_subgroup
        if instances_subgroup < min_instances or instances_complement < min_instances:
            return float("inf")

        if effective_sample_size == 0:
            effective_sample_size = derive_effective_sample_size(
                data[weighting_attribute]
            )

        negatives_subgroup = instances_subgroup - positives_subgroup
        negatives_dataset = instances_dataset - positives_dataset
        positives_complement = positives_dataset - positives_subgroup
        negatives_complement = negatives_dataset - negatives_subgroup

        val = scipy.stats.chi2_contingency(
            [
                [positives_subgroup, positives_complement],
                [negatives_subgroup, negatives_complement],
            ],
            correction=True,
        )[0]
        return scipy.stats.chi2.sf(val * effective_sample_size / instances_dataset, 1)

    def __init__(self, direction="both", min_instances=5, stat="chi2"):
        """Initialize the ChiSquaredQF.

        Parameters:
            direction (str, optional): Direction of deviation of interest
                ('both', 'positive', 'negative').
            min_instances (int, optional): Minimum required instances;
                ``evaluate`` returns -inf if there are fewer.
            stat (str, optional): Use the test statistic ('chi2') or the
                p-value ('p').

        Raises:
            ValueError: If ``direction`` is not one of the three valid values.
            KeyError: If ``stat`` is neither 'chi2' nor 'p'.
        """
        if direction == "both":
            self.bidirect = True
            self.direction_positive = True
        elif direction == "positive":
            self.bidirect = False
            self.direction_positive = True
        elif direction == "negative":
            self.bidirect = False
            self.direction_positive = False
        else:
            raise ValueError(
                "Invalid direction; must be 'both', 'positive', or 'negative'"
            )
        self.min_instances = min_instances
        # Position of the requested value in the chi2_contingency result.
        self.index = {"chi2": 0, "p": 1}[stat]
        super().__init__()

    def evaluate(self, subgroup, target, data, statistics=None):
        """Evaluate the quality of the subgroup using the chi-squared test.

        Parameters:
            subgroup: The subgroup to evaluate.
            target (BinaryTarget): The target definition.
            data (pandas DataFrame): The dataset.
            statistics (any, optional): Precomputed statistics, passed on to
                ``ensure_statistics``.

        Returns:
            float: The chi-squared statistic or p-value.
        """
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        dataset = self.dataset_statistics
        return ChiSquaredQF.chi_squared_qf(
            dataset.size_sg,
            dataset.positives_count,
            statistics.size_sg,
            statistics.positives_count,
            self.min_instances,
            self.bidirect,
            self.direction_positive,
            self.index,
        )
class StandardQF(SimplePositivesQF, BoundedInterestingnessMeasure):
    """Quality function weighting relative size against the gain in target share.

    The StandardQF is a general form of quality function which for different
    values of 'a' is order equivalent to many popular quality measures.
    """

    @staticmethod
    def standard_qf(
        a, instances_dataset, positives_dataset, instances_subgroup, positives_subgroup
    ):
        """Compute the standard quality function.

        The value is
        ``(instances_subgroup / instances_dataset) ** a * (p_subgroup - p_dataset)``
        where ``p_*`` are the shares of positives.

        Parameters:
            a (float): Exponent to trade-off the relative size with the
                difference in target shares.
            instances_dataset (int): Total number of instances in the dataset.
            positives_dataset (int): Total number of positives in the dataset.
            instances_subgroup (int or array): Number of instances in the
                subgroup.
            positives_subgroup (int or array): Number of positives in the
                subgroup.

        Returns:
            float or numpy.ndarray: The quality; NaN for an empty subgroup
            given as a scalar.
        """
        # np.ndim is 0 for Python numbers AND NumPy scalars; the latter expose
        # __array_interface__ and would otherwise reach the division below and
        # emit a divide-by-zero RuntimeWarning before yielding the same NaN.
        if np.ndim(instances_subgroup) == 0 and instances_subgroup == 0:
            return np.nan
        p_subgroup = np.divide(positives_subgroup, instances_subgroup)
        p_dataset = positives_dataset / instances_dataset
        return (instances_subgroup / instances_dataset) ** a * (p_subgroup - p_dataset)

    def __init__(self, a):
        """Initialize the StandardQF.

        Parameters:
            a (float): Exponent to trade-off the relative size with the
                difference in target shares.
        """
        self.a = a
        super().__init__()

    def evaluate(self, subgroup, target, data, statistics=None):
        """Evaluate the quality of the subgroup.

        Parameters:
            subgroup: The subgroup to evaluate.
            target (BinaryTarget): The target definition.
            data (pandas DataFrame): The dataset.
            statistics (any, optional): Precomputed statistics, passed on to
                ``ensure_statistics``.

        Returns:
            float: The computed quality value.
        """
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        dataset = self.dataset_statistics
        return StandardQF.standard_qf(
            self.a,
            dataset.size_sg,
            dataset.positives_count,
            statistics.size_sg,
            statistics.positives_count,
        )

    def optimistic_estimate(self, subgroup, target, data, statistics=None):
        """Compute the optimistic estimate of the quality function.

        The estimate is the quality of a hypothetical subgroup made up of
        exactly the positives of the given subgroup.

        Parameters:
            subgroup: The subgroup for which to compute the estimate.
            target (BinaryTarget): The target definition.
            data (pandas DataFrame): The dataset.
            statistics (any, optional): Precomputed statistics, passed on to
                ``ensure_statistics``.

        Returns:
            float: The optimistic estimate of the quality value.
        """
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        dataset = self.dataset_statistics
        return StandardQF.standard_qf(
            self.a,
            dataset.size_sg,
            dataset.positives_count,
            # Size and positives coincide: only the positives are kept.
            statistics.positives_count,
            statistics.positives_count,
        )

    def optimistic_generalisation(self, subgroup, target, data, statistics=None):
        """Compute the optimistic generalization of the quality function.

        The estimate is the quality of a hypothetical subgroup that extends
        the given one by all positives it does not yet cover.

        Parameters:
            subgroup: The subgroup for which to compute the estimate.
            target (BinaryTarget): The target definition.
            data (pandas DataFrame): The dataset.
            statistics (any, optional): Precomputed statistics, passed on to
                ``ensure_statistics``.

        Returns:
            float: The optimistic generalization of the quality value.
        """
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        dataset = self.dataset_statistics
        pos_remaining = dataset.positives_count - statistics.positives_count
        return StandardQF.standard_qf(
            self.a,
            dataset.size_sg,
            dataset.positives_count,
            statistics.size_sg + pos_remaining,
            dataset.positives_count,
        )
class LiftQF(StandardQF):
    """Lift quality function: the StandardQF with exponent ``a = 0``.

    With a zero exponent the relative size of the subgroup drops out, so the
    quality is just the difference between the target shares.
    """

    def __init__(self):
        """Create a StandardQF whose exponent is fixed to 0."""
        super().__init__(a=0.0)
# TODO add true binomial quality function as in
# https://opus.bibliothek.uni-wuerzburg.de/opus4-wuerzburg/frontdoor/index/index/docId/1786
class SimpleBinomialQF(StandardQF):
    """Simple binomial quality function: the StandardQF with ``a = 0.5``.

    It is an order-equivalent approximation of the full binomial test if the
    subgroup size is much smaller than the size of the entire dataset.
    """

    def __init__(self):
        """Create a StandardQF whose exponent is fixed to 0.5."""
        super().__init__(a=0.5)
class WRAccQF(StandardQF):
    """Weighted relative accuracy: the StandardQF with exponent ``a = 1``.

    It is order-equivalent to the difference between the observed and the
    expected number of positive instances.
    """

    def __init__(self):
        """Create a StandardQF whose exponent is fixed to 1."""
        super().__init__(a=1.0)
#####
# Generalization-Aware Interestingness Measures
#####
class GeneralizationAware_StandardQF(
    GeneralizationAwareQF_stats, BoundedInterestingnessMeasure
):
    """Generalization-aware variant of the StandardQF.

    The target share of a subgroup is compared against a value read from the
    aggregated statistics of its generalizations instead of the dataset-wide
    share. Two strategies ('difference' and 'max') provide the matching
    aggregation, read-out and optimistic-estimate functions.
    """

    ga_sQF_agg_tuple = namedtuple(
        "ga_sQF_agg_tuple", ["max_p", "min_delta_negatives", "min_negatives"]
    )

    def __init__(self, a, optimistic_estimate_strategy="default"):
        """Initialize the GeneralizationAware_StandardQF.

        Parameters:
            a (float): Exponent to trade-off the relative size with the
                difference in target shares.
            optimistic_estimate_strategy (str, optional): One of 'default',
                'difference' (same as 'default') or 'max'.

        Raises:
            ValueError: If the strategy name is not recognised.
        """
        super().__init__(StandardQF(a))
        # Bind the strategy-specific implementations onto the instance.
        if optimistic_estimate_strategy in ("default", "difference"):
            self.optimistic_estimate = self.difference_based_optimistic_estimate
            self.aggregate_statistics = self.difference_based_agg_function
            self.read_p = self.difference_based_read_p
        elif optimistic_estimate_strategy == "max":
            self.optimistic_estimate = self.max_based_optimistic_estimate
            self.aggregate_statistics = self.max_based_aggregate_statistics
            self.read_p = self.max_based_read_p
        else:
            raise ValueError(
                "optimistic_estimate_strategy should be one of "
                "('default', 'max', 'difference')"
            )
        self.a = a

    def evaluate(self, subgroup, target, data, statistics=None):
        """Evaluate the quality of the subgroup considering generalizations.

        Parameters:
            subgroup: The subgroup to evaluate.
            target (BinaryTarget): The target definition.
            data (pandas DataFrame): The dataset.
            statistics (any, optional): Precomputed statistics, passed on to
                ``ensure_statistics``.

        Returns:
            float: The computed quality value; NaN for an empty subgroup.
        """
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        sg_stats = statistics.subgroup_stats
        if sg_stats.size_sg == 0:
            return np.nan

        general_stats = statistics.generalisation_stats
        sg_ratio = sg_stats.positives_count / sg_stats.size_sg
        return (sg_stats.size_sg / self.stats0.size_sg) ** self.a * (
            sg_ratio - self.read_p(general_stats)
        )

    def max_based_aggregate_statistics(self, stats_subgroup, list_of_pairs):
        """Aggregate statistics using the maximum-based strategy.

        Parameters:
            stats_subgroup: Statistics of the current subgroup.
            list_of_pairs: List of (stats, agg_tuple) for all generalizations.

        Returns:
            The statistics with the highest share of positives among all
            entries of all pairs (entries of size 0 are skipped), or
            ``stats_subgroup`` when there are no generalizations.
        """
        if len(list_of_pairs) == 0:
            return stats_subgroup
        max_ratio = -np.inf
        max_stats = None
        for pair in list_of_pairs:
            for agg_stat in pair:
                if agg_stat.size_sg == 0:  # pragma: no cover
                    continue
                ratio = agg_stat.positives_count / agg_stat.size_sg
                if ratio > max_ratio:
                    max_ratio = ratio
                    max_stats = agg_stat
        return max_stats

    def max_based_optimistic_estimate(self, subgroup, target, data, statistics=None):
        """Compute the optimistic estimate using the maximum-based strategy.

        Parameters:
            subgroup: The subgroup for which to compute the estimate.
            target (BinaryTarget): The target definition.
            data (pandas DataFrame): The dataset.
            statistics (any, optional): Precomputed statistics, passed on to
                ``ensure_statistics``.

        Returns:
            float: The optimistic estimate; NaN if the subgroup or the
            aggregated generalization is empty.
        """
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        sg_stats = statistics.subgroup_stats
        general_stats = statistics.generalisation_stats
        if sg_stats.size_sg == 0 or general_stats.size_sg == 0:
            return np.nan

        general_ratio = general_stats.positives_count / general_stats.size_sg
        return (sg_stats.positives_count / self.stats0.size_sg) ** self.a * (
            1 - general_ratio
        )

    def max_based_read_p(self, agg_tuple):
        """Read the reference target share (maximum-based strategy).

        Parameters:
            agg_tuple: The aggregate statistics.

        Returns:
            float: The ratio of positives in the aggregate statistics.
        """
        return agg_tuple.positives_count / agg_tuple.size_sg

    def difference_based_optimistic_estimate(
        self, subgroup, target, data, statistics=None
    ):
        """Compute the optimistic estimate using the difference-based strategy.

        Parameters:
            subgroup: The subgroup for which to compute the estimate.
            target (BinaryTarget): The target definition.
            data (pandas DataFrame): The dataset.
            statistics (any, optional): Precomputed statistics, passed on to
                ``ensure_statistics``.

        Returns:
            float: The optimistic estimate; ``inf`` when no finite
            ``min_delta_negatives`` is available.
        """
        sg_stats, agg_stats = self.ensure_statistics(subgroup, target, data, statistics)
        if np.isposinf(agg_stats.min_delta_negatives):
            return np.inf
        delta_n = agg_stats.min_delta_negatives
        size_dataset = self.qf.dataset_statistics.size_sg
        # Number of positives assumed for the best possible refinement,
        # depending on the exponent of the wrapped StandardQF.
        if self.qf.a == 0:
            pos = 1
        elif self.qf.a == 1.0:
            pos = sg_stats.positives_count
        else:
            a = self.qf.a
            p_hat = min(np.ceil(a * delta_n / (1 - a)), sg_stats.positives_count)
            pos = p_hat
        # NOTE(review): pos + delta_n can be 0 (no positives and delta_n == 0);
        # the division is not guarded against that case.
        tau_diff = pos / (pos + delta_n)
        if sg_stats.size_sg > 0:
            tau_sg = sg_stats.positives_count / sg_stats.size_sg
        else:
            tau_sg = -1
        tau_max = max(tau_diff, tau_sg, agg_stats.max_p)
        return (sg_stats.positives_count / size_dataset) ** self.a * (1 - tau_max)

    def difference_based_agg_function(self, stats_subgroup, list_of_pairs):
        """Aggregate statistics using the difference-based strategy.

        Parameters:
            stats_subgroup: Statistics of the current subgroup.
            list_of_pairs: List of (stats, agg_tuple) for all generalizations.

        Returns:
            namedtuple: ``ga_sQF_agg_tuple`` holding the maximum target share
            seen among the generalizations, the minimum delta of negatives,
            and the delta of negatives towards the immediate generalizations
            (stored in the ``min_negatives`` field).
        """

        def get_negatives_count(sg_stats):
            return sg_stats.size_sg - sg_stats.positives_count

        def get_percentage_positives(sg_stats):
            if sg_stats.size_sg == 0:
                return np.nan
            return sg_stats.positives_count / sg_stats.size_sg

        if len(list_of_pairs) == 0:  # empty pattern
            # np.inf instead of the np.infty alias, which NumPy 2.0 removed.
            return GeneralizationAware_StandardQF.ga_sQF_agg_tuple(
                get_percentage_positives(stats_subgroup), np.inf, np.inf
            )

        subgroup_negatives = get_negatives_count(stats_subgroup)
        min_immediate_generalizations_negatives = min(
            get_negatives_count(x.subgroup_stats) for x in list_of_pairs
        )
        min_immediate_generalizations_delta_negatives = min(
            x.generalisation_stats.min_delta_negatives for x in list_of_pairs
        )
        max_percentage_positives = max(
            max(
                get_percentage_positives(x.subgroup_stats), x.generalisation_stats.max_p
            )
            for x in list_of_pairs
        )

        sg_delta_negatives = (
            min_immediate_generalizations_negatives - subgroup_negatives
        )
        min_delta_negatives = min(
            sg_delta_negatives, min_immediate_generalizations_delta_negatives
        )
        return GeneralizationAware_StandardQF.ga_sQF_agg_tuple(
            max_percentage_positives, min_delta_negatives, sg_delta_negatives
        )

    def difference_based_read_p(self, agg_tuple):
        """Read the reference target share (difference-based strategy).

        Parameters:
            agg_tuple: The aggregate statistics tuple.

        Returns:
            float: The ``max_p`` field of the tuple.
        """
        return agg_tuple.max_p