Source code for pysubgroup.model_target

from collections import namedtuple

import numpy as np

import pysubgroup as ps

beta_tuple = namedtuple("beta_tuple", ["beta", "size_sg"])


[docs]class EMM_Likelihood(ps.AbstractInterestingnessMeasure): tpl = namedtuple( "EMM_Likelihood", ["model_params", "subgroup_likelihood", "inverse_likelihood", "size"], ) def __init__(self, model): self.model = model self.has_constant_statistics = False self.required_stat_attrs = EMM_Likelihood.tpl._fields self.data_size = None
[docs] def calculate_constant_statistics(self, data, target): self.model.calculate_constant_statistics(data, target) self.data_size = len(data) self.has_constant_statistics = True
[docs] def calculate_statistics( self, subgroup, target, data, statistics=None ): # pylint: disable=unused-argument cover_arr, sg_size = ps.get_cover_array_and_size(subgroup, self.data_size, data) params = self.model.fit(cover_arr, data) return self.get_tuple(sg_size, params, cover_arr)
[docs] def get_tuple(self, sg_size, params, cover_arr): # numeric stability? all_likelihood = self.model.likelihood( params, np.ones(self.data_size, dtype=bool) ) sg_likelihood_sum = np.sum(all_likelihood[cover_arr]) total_likelihood_sum = np.sum(all_likelihood) dataset_average = np.nan if (self.data_size - sg_size) > 0: dataset_average = (total_likelihood_sum - sg_likelihood_sum) / ( self.data_size - sg_size ) sg_average = np.nan if sg_size > 0: sg_average = sg_likelihood_sum / sg_size return EMM_Likelihood.tpl(params, sg_average, dataset_average, sg_size)
[docs] def evaluate(self, subgroup, target, data, statistics=None): statistics = self.ensure_statistics(subgroup, target, data, statistics) # numeric stability? return statistics.subgroup_likelihood - statistics.inverse_likelihood
[docs] def gp_get_params(self, cover_arr, v): params = self.model.gp_get_params(v) sg_size = params.size_sg return self.get_tuple(sg_size, params, cover_arr)
@property def gp_requires_cover_arr(self): return True def __getattr__(self, name): return getattr(self.model, name)
[docs]class PolyRegression_ModelClass: def __init__(self, x_name="x", y_name="y", degree=1): self.x_name = x_name self.y_name = y_name if degree != 1: raise ValueError("Currently only degree == 1 is supported") self.degree = degree self.x = None self.y = None self.has_constant_statistics = True super().__init__()
[docs] def calculate_constant_statistics( self, data, target ): # pylint: disable=unused-argument self.x = data[self.x_name].to_numpy() self.y = data[self.y_name].to_numpy() self.has_constant_statistics = True
[docs] @staticmethod def gp_merge(u, v): v0 = v[0] u0 = u[0] if v0 == 0 or u0 == 0: d = 0 else: d = v0 * u0 / (v0 + u0) * (v[1] / v0 - u[1] / u0) * (v[2] / v0 - u[2] / u0) u += v u[3] += d
[docs] def gp_get_null_vector(self): return np.zeros(5)
[docs] def gp_get_stats(self, row_index): x = self.x[row_index] return np.array([1, x, self.y[row_index], 0, x * x])
[docs] def gp_get_params(self, v): size = v[0] if size < self.degree: return beta_tuple(np.full(self.degree + 1, np.nan), size) v1 = v[1] slope = v[0] * v[3] / (v[0] * v[4] - v1 * v1) intersept = v[2] / v[0] - slope * v[1] / v[0] return beta_tuple(np.array([slope, intersept]), v[0])
[docs] def gp_to_str(self, stats): return " ".join(map(str, stats))
[docs] def gp_size_sg(self, stats): return stats[0]
@property def gp_requires_cover_arr(self): return False
[docs] def fit(self, subgroup, data=None): cover_arr, size = ps.get_cover_array_and_size(subgroup, len(self.x), data) if size <= self.degree + 1: return beta_tuple(np.full(self.degree + 1, np.nan), size) return beta_tuple( np.polyfit(self.x[cover_arr], self.y[cover_arr], deg=self.degree), size )
[docs] def likelihood(self, stats, sg): from scipy.stats import norm # pylint: disable=import-outside-toplevel if any(np.isnan(stats.beta)): return np.full(self.x[sg].shape, np.nan) return norm.pdf(np.polyval(stats.beta, self.x[sg]) - self.y[sg])
[docs] def loglikelihood(self, stats, sg): from scipy.stats import norm # pylint: disable=import-outside-toplevel return norm.logpdf(np.polyval(stats.beta, self.x[sg]) - self.y[sg])