"""
Created on 28.04.2016
@author: lemmerfn
"""
from abc import ABC
from collections import namedtuple
from itertools import combinations
import numpy as np
import pysubgroup as ps
class AbstractInterestingnessMeasure(ABC):
    """Common base class of the interestingness measures in this module.

    The class itself defines no state.  ``ensure_statistics`` reads
    ``has_constant_statistics`` and ``required_stat_attrs`` and calls
    ``calculate_constant_statistics`` and ``calculate_statistics``; all four
    have to be supplied by the concrete measure.
    """

    # pylint: disable=no-member
    def ensure_statistics(self, subgroup, target, data, statistics=None):
        """Return a statistics object usable by this measure.

        Parameters
        ----------
        subgroup : object
            Subgroup the statistics belong to.  If it has a truthy
            ``statistics`` attribute, that value is reused.
        target : object
            Forwarded unchanged to the ``calculate_*`` hooks.
        data : object
            Forwarded unchanged to the ``calculate_*`` hooks.
        statistics : object, optional
            Candidate statistics.  Returned as-is when it has every attribute
            named in ``self.required_stat_attrs``.

        Returns
        -------
        object
            ``statistics`` if complete, otherwise ``subgroup.statistics`` if
            truthy, otherwise the result of ``self.calculate_statistics``.
        """
        # Constant statistics are computed lazily on first use.
        if not self.has_constant_statistics:
            self.calculate_constant_statistics(data, target)

        # Fast path: the caller already handed us everything we need.
        if all(hasattr(statistics, attr) for attr in self.required_stat_attrs):
            return statistics

        # Prefer statistics already stored on the subgroup over recomputing.
        stored = getattr(subgroup, "statistics", False)
        if stored:
            return stored
        return self.calculate_statistics(subgroup, target, data, statistics)

    # pylint: enable=no-member
# def optimistic_estimate_from_dataset(
# self,
# data,
# subgroup,
# weighting_attribute=None): #pylint: disable=unused-argument
# return float("inf")
class BoundedInterestingnessMeasure(AbstractInterestingnessMeasure):
    """Base class that adds nothing to ``AbstractInterestingnessMeasure`` here.

    It exists so that measures can be grouped under a common type.
    NOTE(review): presumably meant for measures that provide an optimistic
    estimate -- confirm against the subclasses.
    """
# @abstractmethod
# def optimistic_estimate_from_dataset(
# self, data, subgroup, weighting_attribute=None):
# pass
#####
# FIX ME: This is currently not working anymore
#####
class CombinedInterestingnessMeasure(BoundedInterestingnessMeasure):
    """Weighted linear combination of several interestingness measures.

    NOTE: the surrounding file marks this class as currently not working.
    Both statistics hooks are no-ops and every wrapped measure is invoked
    with ``statistics=None``.
    """

    def __init__(self, measures, weights=None):
        """Store the measures and their weights.

        Parameters
        ----------
        measures : sequence
            Measures to combine; each must offer the method that is later
            called on the combination.
        weights : sequence of numbers, optional
            One weight per measure.  Defaults to a weight of 1 for each.
        """
        if weights is None:
            weights = [1] * len(measures)
        assert len(weights) == len(measures)
        self.measures = measures
        self.weights = weights

    def _weighted_sum(self, values):
        """Return the dot product of *values* with ``self.weights``."""
        return np.dot(values, self.weights)

    def calculate_constant_statistics(self, data, target):
        """Do nothing; the combination keeps no constant statistics."""

    def calculate_statistics(self, subgroup, target, data, cached_statistics=None):
        """Do nothing and return ``None``; no combined statistics exist."""

    def evaluate(self, subgroup, target, data, statistics=None):
        """Return the weighted sum of every measure's ``evaluate`` result.

        ``statistics`` is accepted for interface compatibility but ignored;
        each wrapped measure is called with ``None`` instead.
        """
        # FIX USE of constant statistics
        return self._weighted_sum(
            [m.evaluate(subgroup, target, data, None) for m in self.measures]
        )

    def optimistic_estimate(self, subgroup, target, data, statistics=None):
        """Return the weighted sum of every measure's ``optimistic_estimate``.

        ``statistics`` is accepted for interface compatibility but ignored;
        each wrapped measure is called with ``None`` instead.
        """
        # FIX USE of constant statistics
        return self._weighted_sum(
            [
                m.optimistic_estimate(subgroup, target, data, None)
                for m in self.measures
            ]
        )

    def evaluate_from_statistics(
        self,
        instances_dataset,
        positives_dataset,
        instances_subgroup,
        positives_subgroup,
    ):
        """Return the weighted sum of ``evaluate_from_statistics`` results.

        The four arguments are forwarded unchanged to every wrapped measure.
        """
        return self._weighted_sum(
            [
                m.evaluate_from_statistics(
                    instances_dataset,
                    positives_dataset,
                    instances_subgroup,
                    positives_subgroup,
                )
                for m in self.measures
            ]
        )
# def optimistic_estimate_from_statistics(
# self,
# instances_dataset,
# positives_dataset,
# instances_subgroup,
# positives_subgroup):
# return np.dot(
# [m.evaluate_from_statistics(
# instances_dataset,
# positives_dataset,
# instances_subgroup,
# positives_subgroup)
# for m in self.measures],
# self.weights)
##########
# Filter
##########
def unique_attributes(result_set, data):
    """Drop results whose attribute combination has already been seen.

    A ``(quality, subgroup)`` pair is kept when the attributes of its
    description were not seen on an earlier kept pair, or when every one of
    those attributes is categorical according to
    ``ps.is_categorical_attribute``.

    Parameters
    ----------
    result_set : iterable of (quality, subgroup)
        Each subgroup must expose
        ``subgroup_description.get_attributes()``.
    data : object
        Forwarded to ``ps.is_categorical_attribute``.

    Returns
    -------
    list of (quality, subgroup)
        The kept pairs in their original order.
    """
    kept = []
    seen_attribute_sets = []
    for quality, sg in result_set:
        attributes = sg.subgroup_description.get_attributes()
        is_new = attributes not in seen_attribute_sets
        # The categorical check is only evaluated for repeated combinations.
        if is_new or all(
            ps.is_categorical_attribute(data, att) for att in attributes
        ):
            kept.append((quality, sg))
            seen_attribute_sets.append(attributes)
    return kept
def minimum_statistic_filter(result_set, statistic, minimum, data):
    """Keep results whose named statistic is at least *minimum*.

    Parameters
    ----------
    result_set : iterable of (quality, subgroup)
        Each subgroup must expose a ``statistics`` mapping and a
        ``calculate_statistics(data)`` method.
    statistic : hashable
        Key looked up in ``subgroup.statistics``.
    minimum : number
        Inclusive lower bound.
    data : object
        Passed to ``calculate_statistics`` when a subgroup's statistics are
        still empty.

    Returns
    -------
    list of (quality, subgroup)
        The kept pairs in their original order.
    """
    kept = []
    for quality, sg in result_set:
        # Statistics are filled in on demand for subgroups that lack them.
        if len(sg.statistics) == 0:
            sg.calculate_statistics(data)
        if sg.statistics[statistic] >= minimum:
            kept.append((quality, sg))
    return kept
def minimum_quality_filter(result_set, minimum):
    """Keep the ``(quality, subgroup)`` pairs whose quality is >= *minimum*.

    Parameters
    ----------
    result_set : iterable of (quality, subgroup)
        Results to filter.
    minimum : number
        Inclusive lower bound on the quality.

    Returns
    -------
    list of (quality, subgroup)
        The kept pairs in their original order.
    """
    return [(quality, sg) for quality, sg in result_set if quality >= minimum]
def maximum_statistic_filter(result_set, statistic, maximum, data=None):
    """Keep results whose named statistic is at most *maximum*.

    Parameters
    ----------
    result_set : iterable of (quality, subgroup)
        Each subgroup must expose a ``statistics`` mapping.
    statistic : hashable
        Key looked up in ``subgroup.statistics``.
    maximum : number
        Inclusive upper bound.
    data : object, optional
        When given, subgroups with empty statistics get
        ``calculate_statistics(data)`` called first, mirroring
        ``minimum_statistic_filter``.  When omitted, statistics are used
        exactly as found.

    Returns
    -------
    list of (quality, subgroup)
        The kept pairs in their original order.
    """
    kept = []
    for quality, sg in result_set:
        if data is not None and len(sg.statistics) == 0:
            sg.calculate_statistics(data)
        if sg.statistics[statistic] <= maximum:
            kept.append((quality, sg))
    return kept
def overlap_filter(result_set, data, similarity_level=0.9):
    """Greedily drop results that overlap too much with an earlier kept one.

    Results are visited in order.  A subgroup is kept unless
    ``overlaps_list`` reports that it overlaps with one already kept.

    Parameters
    ----------
    result_set : iterable of (quality, subgroup)
        Results to filter.
    data : object
        Forwarded to ``overlaps_list``.
    similarity_level : float, optional
        Overlap threshold forwarded to ``overlaps_list``.

    Returns
    -------
    list of (quality, subgroup)
        The kept pairs in their original order.
    """
    kept = []
    kept_sgs = []
    for quality, sg in result_set:
        if overlaps_list(sg, kept_sgs, data, similarity_level):
            continue
        kept_sgs.append(sg)
        kept.append((quality, sg))
    return kept
def overlaps_list(sg, list_of_sgs, data, similarity_level=0.9):
    """Tell whether *sg* overlaps with any subgroup in *list_of_sgs*.

    Parameters
    ----------
    sg : object
        Subgroup to test.
    list_of_sgs : iterable
        Subgroups to compare against.
    data : object
        Forwarded to ``ps.overlap``.
    similarity_level : float, optional
        A pair counts as overlapping when ``ps.overlap`` is strictly greater
        than this value.

    Returns
    -------
    bool
        ``True`` as soon as one overlapping subgroup is found, else ``False``
        (also for an empty list).
    """
    return any(
        ps.overlap(sg, other, data) > similarity_level for other in list_of_sgs
    )
class CountCallsInterestingMeasure(BoundedInterestingnessMeasure):
    """Proxy around a quality function that counts statistics computations.

    ``calculate_statistics`` increments ``calls`` and delegates to the
    wrapped ``qf``.  Every attribute not found on the proxy itself is looked
    up on ``qf``.
    """

    def __init__(self, qf):
        """Wrap *qf* and start the call counter at zero."""
        self.qf = qf
        self.calls = 0

    def calculate_statistics(self, sg, target, data, statistics=None):
        """Count the call, then return ``qf.calculate_statistics(...)``."""
        self.calls += 1
        return self.qf.calculate_statistics(sg, target, data, statistics)

    def __getattr__(self, name):
        """Delegate lookups that failed on the proxy to the wrapped ``qf``.

        Raises
        ------
        AttributeError
            If ``qf`` itself is not set yet, or if ``qf`` lacks *name*.
        """
        # __getattr__ runs only after normal lookup has failed.  If "qf" is
        # the missing name (instance built without __init__, e.g. while being
        # copied or unpickled), touching self.qf would re-enter this method
        # and recurse until RecursionError.
        if name == "qf":
            raise AttributeError(name)
        return getattr(self.qf, name)

    def __hasattr__(self, name):
        """Return whether the wrapped ``qf`` has attribute *name*.

        NOTE: ``__hasattr__`` is not a hook Python calls; the built-in
        ``hasattr`` goes through ``__getattr__``.  Kept for callers that
        invoke it explicitly.
        """
        return hasattr(self.qf, name)
#####
# GeneralizationAware Interestingness Measures
#####
class GeneralizationAwareQF(AbstractInterestingnessMeasure):
    """Quality function that compares a subgroup with its generalizations.

    The score is the quality of the subgroup under the wrapped ``qf`` minus
    the largest quality found among its generalizations (descriptions with
    selectors removed), with that maximum never below 0.
    """

    ga_tuple = namedtuple("ga_tuple", ["subgroup_quality", "generalisation_quality"])

    def __init__(self, qf):
        """Wrap the quality function *qf* and start with an empty cache."""
        self.qf = qf
        # this cache maps the representation of descriptions to tuples
        # the first entry is the quality and the second one is
        # the largest quality of all its predecessors
        self.cache = {}
        self.has_constant_statistics = False
        self.required_stat_attrs = ["subgroup_quality", "generalisation_quality"]
        self.q0 = 0

    def calculate_constant_statistics(self, data, target):
        """Reset the cache and prepare the wrapped quality function.

        Also stores ``qf.evaluate(slice(None), target, data)`` in ``q0``
        (presumably the quality of the whole dataset -- confirm).  ``q0`` is
        not read anywhere else in this class.
        """
        self.cache = {}
        self.qf.calculate_constant_statistics(data, target)
        self.q0 = self.qf.evaluate(slice(None), target, data)
        self.has_constant_statistics = self.qf.has_constant_statistics

    def calculate_statistics(self, subgroup, target, data, statistics=None):
        """Return the ``ga_tuple`` for *subgroup*, using the cache if possible.

        Results are cached under ``repr(subgroup)``.  ``statistics`` is
        accepted for interface compatibility but not used.
        """
        sg_repr = repr(subgroup)
        if sg_repr in self.cache:
            return GeneralizationAwareQF.ga_tuple(*self.cache[sg_repr])

        (q_sg, q_prev) = self.get_qual_and_previous_qual(subgroup, target, data)
        self.cache[sg_repr] = (q_sg, q_prev)
        return GeneralizationAwareQF.ga_tuple(q_sg, q_prev)

    def get_qual_and_previous_qual(self, subgroup, target, data):
        """Compute the subgroup quality and the best generalization quality.

        Returns
        -------
        tuple
            ``(q_subgroup, max_q)`` where ``max_q`` is the maximum of 0 and
            the qualities obtained recursively for every description that
            results from dropping exactly one selector.
        """
        q_subgroup = self.qf.evaluate(subgroup, target, data)
        max_q = 0
        selectors = subgroup.selectors
        if len(selectors) > 0:
            # compute quality of all generalizations
            generalizations = combinations(selectors, len(selectors) - 1)

            for sels in generalizations:
                sgd = ps.Conjunction(list(sels))
                # Recursion goes through the cache, so each description is
                # evaluated once per cache lifetime.
                (q_sg, q_prev) = self.calculate_statistics(sgd, target, data)
                max_q = max(max_q, q_sg, q_prev)
        return (q_subgroup, max_q)

    def evaluate(self, subgroup, target, data, statistics=None):
        """Return ``subgroup_quality - generalisation_quality`` for *subgroup*."""
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        return statistics.subgroup_quality - statistics.generalisation_quality
#####
# GeneralizationAware Interestingness Measures
#####
class GeneralizationAwareQF_stats(AbstractInterestingnessMeasure):
    """Generalization-aware base class that works on statistics objects.

    It collects the statistics of a subgroup together with the "maximum"
    statistics over its generalizations.  What "maximum" means and how the
    pair is turned into a score is left to subclasses, which must implement
    ``get_max`` and ``evaluate``.
    """

    ga_tuple = namedtuple("ga_stats_tuple", ["subgroup_stats", "generalisation_stats"])

    def __init__(self, qf):
        """Wrap the quality function *qf* and start with an empty cache."""
        self.qf = qf
        # this cache maps the representation of descriptions to tuples
        # the first entry is the quality and the second one is
        # the largest quality of all its predecessors
        self.cache = {}
        self.has_constant_statistics = False
        self.required_stat_attrs = GeneralizationAwareQF_stats.ga_tuple._fields
        self.stats0 = None

    def calculate_constant_statistics(self, data, target):
        """Reset the cache and prepare the wrapped quality function.

        Also stores ``qf.calculate_statistics(slice(None), target, data)`` in
        ``stats0``, which serves as the starting value when maximizing over
        generalizations.
        """
        self.cache = {}
        self.qf.calculate_constant_statistics(data, target)
        self.stats0 = self.qf.calculate_statistics(slice(None), target, data)
        self.has_constant_statistics = self.qf.has_constant_statistics

    def calculate_statistics(self, subgroup, target, data, statistics=None):
        """Return the ``ga_tuple`` for *subgroup*, using the cache if possible.

        Results are cached under ``repr(subgroup)``.  ``statistics`` is
        accepted for interface compatibility but not used.
        """
        sg_repr = repr(subgroup)
        if sg_repr in self.cache:
            return GeneralizationAwareQF_stats.ga_tuple(*self.cache[sg_repr])

        (stats_sg, stats_prev) = self.get_stats_and_previous_stats(
            subgroup, target, data
        )
        self.cache[sg_repr] = (stats_sg, stats_prev)
        return GeneralizationAwareQF_stats.ga_tuple(stats_sg, stats_prev)

    def get_stats_and_previous_stats(self, subgroup, target, data):
        """Compute the subgroup statistics and the best generalization ones.

        Returns
        -------
        tuple
            ``(stats_subgroup, max_stats)`` where ``max_stats`` starts at
            ``self.stats0`` and is folded with ``get_max`` over the
            statistics obtained recursively for every description that
            results from dropping exactly one selector.
        """
        stats_subgroup = self.qf.calculate_statistics(subgroup, target, data)
        max_stats = self.stats0
        selectors = subgroup.selectors
        if len(selectors) > 0:
            # compute quality of all generalizations
            generalizations = combinations(selectors, len(selectors) - 1)

            for sels in generalizations:
                sgd = ps.Conjunction(list(sels))
                # Recursion goes through the cache, so each description is
                # evaluated once per cache lifetime.
                (stats_sg, stats_prev) = self.calculate_statistics(sgd, target, data)
                max_stats = self.get_max(max_stats, stats_sg, stats_prev)
        return (stats_subgroup, max_stats)

    def evaluate(self, subgroup, target, data, statistics=None):
        """Score *subgroup*; must be implemented by subclasses.

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        raise NotImplementedError

    def get_max(self, *args):
        """Pick the "largest" of several statistics objects.

        Must be implemented by subclasses.

        Raises
        ------
        NotImplementedError
            Always, in this base class.
        """
        raise NotImplementedError