from collections import namedtuple

import numpy as np

import pysubgroup as ps

# beta: fitted coefficients (highest degree first, as returned by np.polyfit);
# size_sg: number of instances used for the fit.
beta_tuple = namedtuple("beta_tuple", ["beta", "size_sg"])


class EMM_Likelihood(ps.AbstractInterestingnessMeasure):
    """EMM interestingness measure: mean per-instance likelihood of the model
    (fitted on the subgroup) inside the subgroup minus the mean likelihood on
    the rest of the data."""

    tpl = namedtuple(
        "EMM_Likelihood",
        ["model_params", "subgroup_likelihood", "inverse_likelihood", "size"],
    )

    def __init__(self, model):
self.model = model
self.has_constant_statistics = False
self.required_stat_attrs = EMM_Likelihood.tpl._fields
self.data_size = None

    def calculate_constant_statistics(self, data, target):
self.model.calculate_constant_statistics(data, target)
self.data_size = len(data)
self.has_constant_statistics = True

    def calculate_statistics(
        self, subgroup, target, data, statistics=None
    ):  # pylint: disable=unused-argument
cover_arr, sg_size = ps.get_cover_array_and_size(subgroup, self.data_size, data)
params = self.model.fit(cover_arr, data)
return self.get_tuple(sg_size, params, cover_arr)

    def get_tuple(self, sg_size, params, cover_arr):
        # Likelihood of every instance under the parameters fitted on the subgroup.
        # numeric stability? raw (non-log) densities can underflow for extreme residuals.
        all_likelihood = self.model.likelihood(
            params, np.ones(self.data_size, dtype=bool)
        )
sg_likelihood_sum = np.sum(all_likelihood[cover_arr])
total_likelihood_sum = np.sum(all_likelihood)
        # Mean likelihood over the complement of the subgroup (the "inverse" part).
        dataset_average = np.nan
        if (self.data_size - sg_size) > 0:
            dataset_average = (total_likelihood_sum - sg_likelihood_sum) / (
                self.data_size - sg_size
            )
sg_average = np.nan
if sg_size > 0:
sg_average = sg_likelihood_sum / sg_size
return EMM_Likelihood.tpl(params, sg_average, dataset_average, sg_size)

    def evaluate(self, subgroup, target, data, statistics=None):
        statistics = self.ensure_statistics(subgroup, target, data, statistics)
        # numeric stability? the quality is a difference of two averaged densities.
        return statistics.subgroup_likelihood - statistics.inverse_likelihood

    def gp_get_params(self, cover_arr, v):
params = self.model.gp_get_params(v)
sg_size = params.size_sg
return self.get_tuple(sg_size, params, cover_arr)

    @property
def gp_requires_cover_arr(self):
return True

    def __getattr__(self, name):
        # Delegate unknown attribute lookups to the wrapped model.
        return getattr(self.model, name)
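
# Quality of a subgroup S under a model class, as computed by evaluate():
#
#     q(S) = mean_{i in S} L(i | theta_S)  -  mean_{i not in S} L(i | theta_S)
#
# where theta_S are the parameters fitted on S and L is the per-instance
# likelihood. A minimal usage sketch (``model``, ``subgroup`` and ``data`` are
# illustrative placeholders, not names defined in this module):
#
#     qf = EMM_Likelihood(model)
#     qf.calculate_constant_statistics(data, target=None)
#     stats = qf.calculate_statistics(subgroup, None, data)
#     quality = qf.evaluate(subgroup, None, data, stats)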


class PolyRegression_ModelClass:
    """Ordinary least-squares polynomial regression model (currently limited
    to degree 1) for use as the model class of EMM_Likelihood."""

def __init__(self, x_name="x", y_name="y", degree=1):
self.x_name = x_name
self.y_name = y_name
if degree != 1:
raise ValueError("Currently only degree == 1 is supported")
self.degree = degree
self.x = None
self.y = None
self.has_constant_statistics = True
super().__init__()

    def calculate_constant_statistics(
        self, data, target
    ):  # pylint: disable=unused-argument
        # Cache the model's x and y columns as NumPy arrays.
self.x = data[self.x_name].to_numpy()
self.y = data[self.y_name].to_numpy()
self.has_constant_statistics = True

    @staticmethod
    def gp_merge(u, v):
        # In-place merge of two statistics vectors
        # (layout: [n, sum_x, sum_y, co-moment S_xy, sum_x^2]).
        v0 = v[0]
        u0 = u[0]
        if v0 == 0 or u0 == 0:
            d = 0
        else:
            # Pairwise co-moment update: correction term for combining two
            # partitions whose x- and y-means differ.
            d = v0 * u0 / (v0 + u0) * (v[1] / v0 - u[1] / u0) * (v[2] / v0 - u[2] / u0)
u += v
u[3] += d

    def gp_get_null_vector(self):
        # Empty statistics vector: [n, sum_x, sum_y, S_xy, sum_x^2].
        return np.zeros(5)

    def gp_get_stats(self, row_index):
        # Single-row statistics: count 1, x, y, zero co-moment, x^2.
        x = self.x[row_index]
        return np.array([1, x, self.y[row_index], 0, x * x])

    def gp_get_params(self, v):
        size = v[0]
        # A degree-d fit needs more than d points; with size <= degree the
        # denominator below is zero, so return NaN coefficients instead.
        if size <= self.degree:
            return beta_tuple(np.full(self.degree + 1, np.nan), size)
        v1 = v[1]
        # Closed-form OLS: slope = S_xy / S_xx, intercept = mean_y - slope * mean_x.
        slope = v[0] * v[3] / (v[0] * v[4] - v1 * v1)
        intercept = v[2] / v[0] - slope * v[1] / v[0]
        return beta_tuple(np.array([slope, intercept]), v[0])

    def gp_to_str(self, stats):
return " ".join(map(str, stats))

    def gp_size_sg(self, stats):
return stats[0]

    @property
def gp_requires_cover_arr(self):
return False

    def fit(self, subgroup, data=None):
        cover_arr, size = ps.get_cover_array_and_size(subgroup, len(self.x), data)
        # Too few covered points for a meaningful fit: return NaN coefficients.
        if size <= self.degree + 1:
            return beta_tuple(np.full(self.degree + 1, np.nan), size)
return beta_tuple(
np.polyfit(self.x[cover_arr], self.y[cover_arr], deg=self.degree), size
)

    def likelihood(self, stats, sg):
        from scipy.stats import norm  # pylint: disable=import-outside-toplevel

        if any(np.isnan(stats.beta)):
            return np.full(self.x[sg].shape, np.nan)
        # Standard-normal density of the regression residuals.
        return norm.pdf(np.polyval(stats.beta, self.x[sg]) - self.y[sg])

    def loglikelihood(self, stats, sg):
        from scipy.stats import norm  # pylint: disable=import-outside-toplevel

        # Unlike likelihood(), NaN coefficients are not guarded here; they
        # simply propagate through logpdf.
        return norm.logpdf(np.polyval(stats.beta, self.x[sg]) - self.y[sg])
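

if __name__ == "__main__":
    # Minimal end-to-end sketch (not part of the library): discover subgroups
    # whose linear x -> y model deviates from the rest of the data. It assumes
    # pysubgroup's standard task/search API (SubgroupDiscoveryTask, BeamSearch,
    # FITarget, create_selectors); all column names, sizes, and parameter
    # values below are illustrative only.
    import pandas as pd

    rng = np.random.default_rng(0)
    n = 500
    group = rng.choice(["a", "b"], size=n)
    x = rng.normal(size=n)
    # Give group "b" a different slope so an exceptional model exists.
    y = np.where(group == "b", 2.0 * x, 0.5 * x) + rng.normal(scale=0.1, size=n)
    df = pd.DataFrame({"x": x, "y": y, "group": group})

    quality = EMM_Likelihood(PolyRegression_ModelClass(x_name="x", y_name="y"))
    task = ps.SubgroupDiscoveryTask(
        df,
        ps.FITarget(),
        ps.create_selectors(df, ignore=["x", "y"]),
        result_set_size=5,
        depth=1,
        qf=quality,
    )
    print(ps.BeamSearch().execute(task).to_dataframe())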