"""
Created on 29.09.2017
@author: lemmerfn
"""
import numbers
from collections import namedtuple
from functools import total_ordering
import numpy as np
import pysubgroup as ps
[docs]
@total_ordering
class NumericTarget:
statistic_types = (
"size_sg",
"size_dataset",
"mean_sg",
"mean_dataset",
"std_sg",
"std_dataset",
"median_sg",
"median_dataset",
"max_sg",
"max_dataset",
"min_sg",
"min_dataset",
"mean_lift",
"median_lift",
)
def __init__(self, target_variable):
self.target_variable = target_variable
def __repr__(self):
return "T: " + str(self.target_variable)
def __eq__(self, other):
return self.__dict__ == other.__dict__ # pragma: no cover
def __lt__(self, other):
return str(self) < str(other) # pragma: no cover
[docs]
def get_attributes(self):
return [self.target_variable]
[docs]
def get_base_statistics(self, subgroup, data):
cover_arr, size_sg = ps.get_cover_array_and_size(subgroup, len(data), data)
all_target_values = data[self.target_variable]
sg_target_values = all_target_values[cover_arr]
instances_dataset = len(data)
instances_subgroup = size_sg
mean_sg = np.mean(sg_target_values)
mean_dataset = np.mean(all_target_values)
return (instances_dataset, mean_dataset, instances_subgroup, mean_sg)
[docs]
def calculate_statistics(self, subgroup, data, cached_statistics=None):
if cached_statistics is None or not isinstance(cached_statistics, dict):
statistics = {}
elif all(k in cached_statistics for k in NumericTarget.statistic_types):
return cached_statistics
else:
statistics = cached_statistics
cover_arr, _ = ps.get_cover_array_and_size(subgroup, len(data), data)
all_target_values = data[self.target_variable].to_numpy()
sg_target_values = all_target_values[cover_arr]
statistics["size_sg"] = len(sg_target_values)
statistics["size_dataset"] = len(data)
statistics["mean_sg"] = np.mean(sg_target_values)
statistics["mean_dataset"] = np.mean(all_target_values)
statistics["std_sg"] = np.std(sg_target_values)
statistics["std_dataset"] = np.std(all_target_values)
statistics["median_sg"] = np.median(sg_target_values)
statistics["median_dataset"] = np.median(all_target_values)
statistics["max_sg"] = np.max(sg_target_values)
statistics["max_dataset"] = np.max(all_target_values)
statistics["min_sg"] = np.min(sg_target_values)
statistics["min_dataset"] = np.min(all_target_values)
statistics["mean_lift"] = statistics["mean_sg"] / statistics["mean_dataset"]
statistics["median_lift"] = (
statistics["median_sg"] / statistics["median_dataset"]
)
return statistics
[docs]
def read_mean(tpl):
return tpl.mean
[docs]
class StandardQFNumeric(ps.BoundedInterestingnessMeasure):
tpl = namedtuple("StandardQFNumeric_parameters", ("size_sg", "mean", "estimate"))
mean_tpl = tpl
median_tpl = namedtuple(
"StandardQFNumeric_median_parameters", ("size_sg", "median", "estimate")
)
[docs]
@staticmethod
def standard_qf_numeric(a, _, mean_dataset, instances_subgroup, mean_sg):
return instances_subgroup**a * (mean_sg - mean_dataset)
def __init__(self, a, invert=False, estimator="default", centroid="mean"):
if not isinstance(a, numbers.Number):
raise ValueError(f"a is not a number. Received a={a}")
self.a = a
self.invert = invert
self.dataset_statistics = None
self.all_target_values = None
self.has_constant_statistics = False
if centroid == "median":
if estimator == "default":
estimator = "max"
assert estimator in (
"max",
"order",
), "For median only estimator = max or order are possible"
self.required_stat_attrs = ("size_sg", "median")
self.agg = np.median
self.tpl = StandardQFNumeric.median_tpl
self.read_centroid = read_median
elif centroid == "sorted_median":
if estimator == "default":
estimator = "max"
assert estimator in (
"max",
"order",
), "For median only estimator = max or order are possible"
self.required_stat_attrs = ("size_sg", "median")
self.agg = calc_sorted_median
self.tpl = StandardQFNumeric.median_tpl
self.read_centroid = read_median
elif centroid == "mean":
if estimator == "default":
estimator = "sum"
self.required_stat_attrs = ("size_sg", "mean")
self.agg = np.mean
self.tpl = StandardQFNumeric.mean_tpl
self.read_centroid = read_mean
else:
raise ValueError(
f"centroid was {centroid} which is not in (median, sorted_median, mean)"
)
if estimator == "sum":
self.estimator = StandardQFNumeric.Summation_Estimator(self)
elif estimator == "max":
self.estimator = StandardQFNumeric.Max_Estimator(self)
elif estimator == "average":
self.estimator = StandardQFNumeric.Max_Estimator(self)
elif estimator == "order":
if centroid == "mean":
self.estimator = StandardQFNumeric.MeanOrdering_Estimator(self)
else:
raise NotImplementedError(
"Order estimation is not implemented for median qf"
)
else:
raise ValueError(
"estimator is not one of the following: "
+ str(["sum", "average", "order"])
)
[docs]
def calculate_constant_statistics(self, data, target):
data = self.estimator.get_data(data, target)
self.all_target_values = data[target.target_variable].to_numpy()
target_centroid = self.agg(self.all_target_values)
data_size = len(data)
self.dataset_statistics = self.tpl(data_size, target_centroid, None)
self.estimator.calculate_constant_statistics(data, target)
self.has_constant_statistics = True
[docs]
def evaluate(self, subgroup, target, data, statistics=None):
statistics = self.ensure_statistics(subgroup, target, data, statistics)
dataset = self.dataset_statistics
return StandardQFNumeric.standard_qf_numeric(
self.a,
dataset.size_sg,
self.read_centroid(dataset),
statistics.size_sg,
self.read_centroid(statistics),
)
[docs]
def calculate_statistics(
self, subgroup, target, data, statistics=None
): # pylint: disable=unused-argument
cover_arr, sg_size = ps.get_cover_array_and_size(
subgroup, len(self.all_target_values), data
)
sg_centroid = 0
sg_target_values = 0
if sg_size > 0:
sg_target_values = self.all_target_values[cover_arr]
sg_centroid = self.agg(sg_target_values)
estimate = self.estimator.get_estimate(
subgroup, sg_size, sg_centroid, cover_arr, sg_target_values
)
else:
estimate = float("-inf")
return self.tpl(sg_size, sg_centroid, estimate)
[docs]
def optimistic_estimate(self, subgroup, target, data, statistics=None):
statistics = self.ensure_statistics(subgroup, target, data, statistics)
return statistics.estimate
[docs]
class Summation_Estimator:
r"""\
This estimator calculates the optimistic estimate as a hyppothetical subgroup\
which contains only instances with value greater than the dataset mean and\
is of maximal size.
.. math::
oe(sg) = \sum_{x \in sg, T(x)>0} (T(sg) - \mu_0)
From Florian Lemmerich's Dissertation [section 4.2.2.1, Theorem 2 (page 81)]
"""
def __init__(self, qf):
self.qf = qf
self.indices_greater_centroid = None
self.target_values_greater_centroid = None
[docs]
def get_data(self, data, target): # pylint: disable=unused-argument
return data
[docs]
def calculate_constant_statistics(
self, data, target
): # pylint: disable=unused-argument
self.indices_greater_centroid = (
self.qf.all_target_values
> self.qf.read_centroid(self.qf.dataset_statistics)
)
self.target_values_greater_centroid = (
self.qf.all_target_values
) # [self.indices_greater_mean]
[docs]
def get_estimate(
self, subgroup, sg_size, sg_centroid, cover_arr, _
): # pylint: disable=unused-argument
larger_than_centroid = self.target_values_greater_centroid[cover_arr][
self.indices_greater_centroid[cover_arr]
]
size_greater_centroid = len(larger_than_centroid)
sum_greater_centroid = np.sum(larger_than_centroid)
return sum_greater_centroid - size_greater_centroid * self.qf.read_centroid(
self.qf.dataset_statistics
)
[docs]
class Max_Estimator:
r"""
This estimator calculates the optimistic estimate
.. math::
oe(sg) = n_{>\mu_0}^a (T^{\max}(sg) - \mu_0)
From Florian Lemmerich's Dissertation [section 4.2.2.1, Theorem 4 (page 82)]
"""
def __init__(self, qf):
self.qf = qf
self.indices_greater_centroid = None
self.target_values_greater_centroid = None
[docs]
def get_data(self, data, target): # pylint: disable=unused-argument
return data
[docs]
def calculate_constant_statistics(
self, data, target
): # pylint: disable=unused-argument
self.indices_greater_centroid = (
self.qf.all_target_values
> self.qf.read_centroid(self.qf.dataset_statistics)
)
self.target_values_greater_centroid = self.qf.all_target_values
[docs]
def get_estimate(
self, subgroup, sg_size, sg_centroid, cover_arr, _
): # pylint: disable=unused-argument
larger_than_centroid = self.target_values_greater_centroid[cover_arr][
self.indices_greater_centroid[cover_arr]
]
size_greater_centroid = len(larger_than_centroid)
if size_greater_centroid == 0:
return -np.inf
max_greater_centroid = np.max(larger_than_centroid)
return size_greater_centroid**self.qf.a * (
max_greater_centroid - self.qf.read_centroid(self.qf.dataset_statistics)
)
[docs]
class MeanOrdering_Estimator:
def __init__(self, qf):
self.qf = qf
self.indices_greater_centroid = None
self._get_estimate = self.get_estimate_numpy
self.use_numba = True
self.numba_in_place = False
[docs]
def get_data(self, data, target):
data.sort_values(target.get_attributes()[0], ascending=False, inplace=True)
return data
[docs]
def calculate_constant_statistics(
self, data, target
): # pylint: disable=unused-argument
if not self.use_numba or self.numba_in_place:
return
try:
from numba import njit # pylint: disable=import-outside-toplevel
# print('StandardQf_Numeric: Using numba for speedup')
except ImportError: # pragma: no cover
return
@njit
def estimate_numba(values_sg, a, mean_dataset): # pragma: no cover
n = 1
sum_values = 0
max_value = -(10**10)
for val in values_sg:
sum_values += val
mean_sg = sum_values / n
quality = n**a * (mean_sg - mean_dataset)
if quality > max_value:
max_value = quality
n += 1
return max_value
self._get_estimate = estimate_numba
self.numba_in_place = True
[docs]
def get_estimate(
self, subgroup, sg_size, sg_mean, cover_arr, target_values_sg
): # pylint: disable=unused-argument
if self.numba_in_place:
return self._get_estimate(
target_values_sg, self.qf.a, self.qf.dataset_statistics.mean
)
else:
return self._get_estimate(
target_values_sg, self.qf.a, self.qf.dataset_statistics.mean
)
[docs]
def get_estimate_numpy(self, values_sg, _, mean_dataset):
target_values_cs = np.cumsum(values_sg)
sizes = np.arange(1, len(target_values_cs) + 1)
mean_values = target_values_cs / sizes
stats = StandardQFNumeric.mean_tpl(sizes, mean_values, mean_dataset)
qualities = self.qf.evaluate(None, None, None, stats)
optimistic_estimate = np.max(qualities)
return optimistic_estimate
[docs]
class StandardQFNumericTscore(ps.BoundedInterestingnessMeasure):
tpl = namedtuple(
"StandardQFNumericTscore_parameters", ("size_sg", "mean", "std", "estimate")
)
[docs]
@staticmethod
def t_score(mean_dataset, instances_subgroup, mean_sg, std_sg):
if std_sg == 0:
return 0
else:
return (instances_subgroup**0.5 * (mean_sg - mean_dataset)) / std_sg
def __init__(self, invert=False):
self.invert = invert
self.required_stat_attrs = ("size_sg", "mean", "std")
self.dataset_statistics = None
self.all_target_values = None
self.has_constant_statistics = False
[docs]
def calculate_constant_statistics(self, data, target):
self.all_target_values = data[target.target_variable].to_numpy()
target_mean = np.mean(self.all_target_values)
target_std = np.std(self.all_target_values)
data_size = len(data)
self.dataset_statistics = StandardQFNumericTscore.tpl(
data_size, target_mean, target_std, np.inf
)
self.has_constant_statistics = True
[docs]
def evaluate(self, subgroup, target, data, statistics=None):
statistics = self.ensure_statistics(subgroup, target, data, statistics)
dataset = self.dataset_statistics
return StandardQFNumericTscore.t_score(
dataset.mean,
statistics.size_sg,
statistics.mean,
statistics.std,
)
[docs]
def calculate_statistics(
self, subgroup, target, data, statistics=None
): # pylint: disable=unused-argument
cover_arr, sg_size = ps.get_cover_array_and_size(
subgroup, len(self.all_target_values), data
)
sg_mean = np.array([0])
sg_std = np.array([0])
sg_target_values = 0
if sg_size > 0:
sg_target_values = self.all_target_values[cover_arr]
sg_mean = np.mean(sg_target_values)
sg_std = np.std(sg_target_values)
estimate = np.inf
else:
estimate = float("-inf")
return StandardQFNumericTscore.tpl(sg_size, sg_mean, sg_std, estimate)
[docs]
def optimistic_estimate(self, subgroup, target, data, statistics=None):
statistics = self.ensure_statistics(subgroup, target, data, statistics)
return statistics.estimate
[docs]
class GeneralizationAware_StandardQFNumeric(ps.GeneralizationAwareQF_stats):
def __init__(self, a, invert=False, estimator="default", centroid="mean"):
super().__init__(
StandardQFNumeric(a, invert=invert, estimator=estimator, centroid=centroid)
)
[docs]
def evaluate(self, subgroup, target, data, statistics=None):
statistics = self.ensure_statistics(subgroup, target, data, statistics)
sg_stats = statistics.subgroup_stats
general_stats = statistics.generalisation_stats
if sg_stats.size_sg == 0:
return np.nan
read_centroid = self.qf.read_centroid
return (sg_stats.size_sg / self.stats0.size_sg) ** self.qf.a * (
read_centroid(sg_stats) - read_centroid(general_stats)
)
[docs]
def aggregate_statistics(self, stats_subgroup, list_of_pairs):
read_centroid = self.qf.read_centroid
if len(list_of_pairs) == 0:
return stats_subgroup
max_centroid = 0.0
max_stats = None
for stat, agg_stat in list_of_pairs:
if stat.size_sg == 0:
continue
centroid = max(read_centroid(agg_stat), read_centroid(stat))
if centroid > max_centroid:
max_centroid = centroid
max_stats = stat
return max_stats