Source code for pysubgroup.measures

"""
Created on 28.04.2016

@author: lemmerfn
"""
from abc import ABC
from collections import namedtuple
from itertools import combinations

import numpy as np

import pysubgroup as ps


[docs]class AbstractInterestingnessMeasure(ABC): # pylint: disable=no-member
[docs] def ensure_statistics(self, subgroup, target, data, statistics=None): if not self.has_constant_statistics: self.calculate_constant_statistics(data, target) if any(not hasattr(statistics, attr) for attr in self.required_stat_attrs): if getattr(subgroup, "statistics", False): return subgroup.statistics return self.calculate_statistics(subgroup, target, data, statistics) return statistics
# pylint: enable=no-member # def optimistic_estimate_from_dataset( # self, # data, # subgroup, # weighting_attribute=None): #pylint: disable=unused-argument # return float("inf")
[docs]class BoundedInterestingnessMeasure(AbstractInterestingnessMeasure): pass
# @abstractmethod # def optimistic_estimate_from_dataset( # self, data, subgroup, weighting_attribute=None): # pass ##### # FIX ME: This is currently not working anymore #####
[docs]class CombinedInterestingnessMeasure(BoundedInterestingnessMeasure): def __init__(self, measures, weights=None): self.measures = measures if weights is None: weights = [1] * len(measures) assert len(weights) == len(measures) self.weights = weights
[docs] def calculate_constant_statistics(self, data, target): pass
[docs] def calculate_statistics(self, subgroup, target, data, cached_statistics=None): pass
[docs] def evaluate(self, subgroup, target, data, statistics=None): # FIX USE of constant statistics return np.dot( [m.evaluate(subgroup, target, data, None) for m in self.measures], self.weights, )
[docs] def optimistic_estimate(self, subgroup, target, data, statistics=None): # FIX USE of constant statistics return np.dot( [ m.optimistic_estimate(subgroup, target, data, None) for m in self.measures ], self.weights, )
[docs] def evaluate_from_statistics( self, instances_dataset, positives_dataset, instances_subgroup, positives_subgroup, ): return np.dot( [ m.evaluate_from_statistics( instances_dataset, positives_dataset, instances_subgroup, positives_subgroup, ) for m in self.measures ], self.weights, )
# def optimistic_estimate_from_statistics( # self, # instances_dataset, # positives_dataset, # instances_subgroup, # positives_subgroup): # return np.dot( # [m.evaluate_from_statistics( # instances_dataset, # positives_dataset, # instances_subgroup, # positives_subgroup) # for m in self.measures], # self.weights) ########## # Filter ##########
[docs]def unique_attributes(result_set, data): result = [] used_attributes = [] for q, sg in result_set: atts = sg.subgroup_description.get_attributes() if atts not in used_attributes or all( ps.is_categorical_attribute(data, x) for x in atts ): result.append((q, sg)) used_attributes.append(atts) return result
[docs]def minimum_statistic_filter(result_set, statistic, minimum, data): result = [] for q, sg in result_set: if len(sg.statistics) == 0: sg.calculate_statistics(data) if sg.statistics[statistic] >= minimum: result.append((q, sg)) return result
[docs]def minimum_quality_filter(result_set, minimum): result = [] for q, sg in result_set: if q >= minimum: result.append((q, sg)) return result
[docs]def maximum_statistic_filter(result_set, statistic, maximum): result = [] for q, sg in result_set: if sg.statistics[statistic] <= maximum: result.append((q, sg)) return result
[docs]def overlap_filter(result_set, data, similarity_level=0.9): result = [] result_sgs = [] for q, sg in result_set: if not overlaps_list(sg, result_sgs, data, similarity_level): result_sgs.append(sg) result.append((q, sg)) return result
[docs]def overlaps_list(sg, list_of_sgs, data, similarity_level=0.9): for anotherSG in list_of_sgs: if ps.overlap(sg, anotherSG, data) > similarity_level: return True return False
[docs]class CountCallsInterestingMeasure(BoundedInterestingnessMeasure): def __init__(self, qf): self.qf = qf self.calls = 0
[docs] def calculate_statistics(self, sg, target, data, statistics=None): self.calls += 1 return self.qf.calculate_statistics(sg, target, data, statistics)
def __getattr__(self, name): return getattr(self.qf, name) def __hasattr__(self, name): return hasattr(self.qf, name)
##### # GeneralizationAware Interestingness Measures #####
[docs]class GeneralizationAwareQF(AbstractInterestingnessMeasure): ga_tuple = namedtuple("ga_tuple", ["subgroup_quality", "generalisation_quality"]) def __init__(self, qf): self.qf = qf # this cache maps the representation of descriptions to tuples # the first entry is the quality and the second one is # the largest quality of all its predessors self.cache = {} self.has_constant_statistics = False self.required_stat_attrs = ["subgroup_quality", "generalisation_quality"] self.q0 = 0
[docs] def calculate_constant_statistics(self, data, target): self.cache = {} self.qf.calculate_constant_statistics(data, target) self.q0 = self.qf.evaluate(slice(None), target, data) self.has_constant_statistics = self.qf.has_constant_statistics
[docs] def calculate_statistics(self, subgroup, target, data, statistics=None): sg_repr = repr(subgroup) if sg_repr in self.cache: return GeneralizationAwareQF.ga_tuple(*self.cache[sg_repr]) (q_sg, q_prev) = self.get_qual_and_previous_qual(subgroup, target, data) self.cache[sg_repr] = (q_sg, q_prev) return GeneralizationAwareQF.ga_tuple(q_sg, q_prev)
[docs] def get_qual_and_previous_qual(self, subgroup, target, data): q_subgroup = self.qf.evaluate(subgroup, target, data) max_q = 0 selectors = subgroup.selectors if len(selectors) > 0: # compute quality of all generalizations generalizations = combinations(selectors, len(selectors) - 1) for sels in generalizations: sgd = ps.Conjunction(list(sels)) (q_sg, q_prev) = self.calculate_statistics(sgd, target, data) max_q = max(max_q, q_sg, q_prev) return (q_subgroup, max_q)
[docs] def evaluate(self, subgroup, target, data, statistics=None): statistics = self.ensure_statistics(subgroup, target, data, statistics) return statistics.subgroup_quality - statistics.generalisation_quality
##### # GeneralizationAware Interestingness Measures #####
[docs]class GeneralizationAwareQF_stats(AbstractInterestingnessMeasure): ga_tuple = namedtuple("ga_stats_tuple", ["subgroup_stats", "generalisation_stats"]) def __init__(self, qf): self.qf = qf # this cache maps the representation of descriptions to tuples # the first entry is the quality and the second one is # the largest quality of all its predecessors self.cache = {} self.has_constant_statistics = False self.required_stat_attrs = GeneralizationAwareQF_stats.ga_tuple._fields self.stats0 = None
[docs] def calculate_constant_statistics(self, data, target): self.cache = {} self.qf.calculate_constant_statistics(data, target) self.stats0 = self.qf.calculate_statistics(slice(None), target, data) self.has_constant_statistics = self.qf.has_constant_statistics
[docs] def calculate_statistics(self, subgroup, target, data, statistics=None): sg_repr = repr(subgroup) if sg_repr in self.cache: return GeneralizationAwareQF_stats.ga_tuple(*self.cache[sg_repr]) (stats_sg, stats_prev) = self.get_stats_and_previous_stats( subgroup, target, data ) self.cache[sg_repr] = (stats_sg, stats_prev) return GeneralizationAwareQF_stats.ga_tuple(stats_sg, stats_prev)
[docs] def get_stats_and_previous_stats(self, subgroup, target, data): stats_subgroup = self.qf.calculate_statistics(subgroup, target, data) max_stats = self.stats0 selectors = subgroup.selectors if len(selectors) > 0: # compute quality of all generalizations generalizations = combinations(selectors, len(selectors) - 1) for sels in generalizations: sgd = ps.Conjunction(list(sels)) (stats_sg, stats_prev) = self.calculate_statistics(sgd, target, data) max_stats = self.get_max(max_stats, stats_sg, stats_prev) return (stats_subgroup, max_stats)
[docs] def evaluate(self, subgroup, target, data, statistics=None): raise NotImplementedError
[docs] def get_max(self, *args): raise NotImplementedError