"""
Created on 29.04.2016
@author: lemmerfn
"""
import copy
import warnings
from collections import Counter, defaultdict, namedtuple
from heapq import heappop, heappush
from itertools import chain, combinations
from math import factorial
import numpy as np
import pysubgroup as ps
[docs]class SubgroupDiscoveryTask:
"""
Capsulates all parameters required to perform standard subgroup discovery
"""
def __init__(
self,
data,
target,
search_space,
qf,
result_set_size=10,
depth=3,
min_quality=float("-inf"),
constraints=None,
):
self.data = data
self.target = target
self.search_space = search_space
self.qf = qf
self.result_set_size = result_set_size
self.depth = depth
self.min_quality = min_quality
if constraints is None:
constraints = []
self.constraints = constraints
self.constraints_monotone = [
constr for constr in constraints if constr.is_monotone
]
self.constraints_other = [
constr for constr in constraints if not constr.is_monotone
]
[docs]def constraints_satisfied(constraints, subgroup, statistics=None, data=None):
return all(
constr.is_satisfied(subgroup, statistics, data) for constr in constraints
)
[docs]class Apriori:
def __init__(
self, representation_type=None, combination_name="Conjunction", use_numba=True
):
self.combination_name = combination_name
if representation_type is None:
representation_type = ps.BitSetRepresentation
self.representation_type = representation_type
self.use_vectorization = True
self.optimistic_estimate_name = "optimistic_estimate"
self.next_level = self.get_next_level
self.compiled_func = None
if use_numba:
try:
# TODO: used?
import numba # pylint: disable=unused-import, import-outside-toplevel # noqa: F401, E501
self.next_level = self.get_next_level_numba
print("Apriori: Using numba for speedup")
except ImportError:
pass
[docs] def get_next_level_candidates(self, task, result, next_level_candidates):
promising_candidates = []
optimistic_estimate_function = getattr(task.qf, self.optimistic_estimate_name)
for sg in next_level_candidates:
statistics = task.qf.calculate_statistics(sg, task.target, task.data)
ps.add_if_required(
result,
sg,
task.qf.evaluate(sg, statistics, task.target, task.data),
task,
statistics=statistics,
)
optimistic_estimate = optimistic_estimate_function(
sg, task.target, task.data, statistics
)
if optimistic_estimate >= ps.minimum_required_quality(result, task):
if ps.constraints_satisfied(
task.constraints_monotone, sg, statistics, task.data
):
promising_candidates.append((optimistic_estimate, sg.selectors))
min_quality = ps.minimum_required_quality(result, task)
promising_candidates = [
selectors
for estimate, selectors in promising_candidates
if estimate > min_quality
]
return promising_candidates
[docs] def get_next_level_candidates_vectorized(self, task, result, next_level_candidates):
promising_candidates = []
statistics = []
optimistic_estimate_function = getattr(task.qf, self.optimistic_estimate_name)
for sg in next_level_candidates:
statistics.append(task.qf.calculate_statistics(sg, task.target, task.data))
tpl_class = statistics[0].__class__
vec_statistics = tpl_class._make(np.array(tpl) for tpl in zip(*statistics))
qualities = task.qf.evaluate(None, task.target, task.data, vec_statistics)
optimistic_estimates = optimistic_estimate_function(
None, None, None, vec_statistics
)
for sg, quality, stats in zip(next_level_candidates, qualities, statistics):
ps.add_if_required(result, sg, quality, task, statistics=stats)
min_quality = ps.minimum_required_quality(result, task)
for sg, optimistic_estimate in zip(next_level_candidates, optimistic_estimates):
if optimistic_estimate >= min_quality:
promising_candidates.append(sg.selectors)
return promising_candidates
[docs] def get_next_level_numba(self, promising_candidates):
from numba import njit # pylint: disable=import-error, import-outside-toplevel
if not hasattr(self, "compiled_func") or self.compiled_func is None:
@njit
def getNewCandidates(candidates, hashes): # pragma: no cover
result = []
for i in range(len(candidates) - 1):
for j in range(i + 1, len(candidates)):
if hashes[i] == hashes[j]:
if np.all(candidates[i, :-1] == candidates[j, :-1]):
result.append((i, j))
return result
self.compiled_func = getNewCandidates
all_selectors = Counter(chain.from_iterable(promising_candidates))
all_selectors_ids = {selector: i for i, selector in enumerate(all_selectors)}
promising_candidates_selector_ids = [
tuple(all_selectors_ids[sel] for sel in selectors)
for selectors in promising_candidates
]
arr = np.array(promising_candidates_selector_ids, dtype=int)
print(len(arr))
hashes = np.array(
[hash(tuple(x[:-1])) for x in promising_candidates_selector_ids],
dtype=np.int64,
)
candidates_int = self.compiled_func(arr, hashes)
return list(
(*promising_candidates[i], promising_candidates[j][-1])
for i, j in candidates_int
)
[docs] def get_next_level(self, promising_candidates):
by_prefix_dict = defaultdict(list)
for sg in promising_candidates:
by_prefix_dict[tuple(sg[:-1])].append(sg[-1])
return [
prefix + real_suffix
for prefix, suffixes in by_prefix_dict.items()
for real_suffix in combinations(sorted(suffixes), 2)
]
[docs] def execute(self, task):
if not isinstance(task.qf, ps.BoundedInterestingnessMeasure):
raise RuntimeWarning("Quality function is unbounded, long runtime expected")
task.qf.calculate_constant_statistics(task.data, task.target)
with self.representation_type(task.data, task.search_space) as representation:
combine_selectors = getattr(representation.__class__, self.combination_name)
result = []
# init the first level
next_level_candidates = []
for sel in task.search_space:
sg = combine_selectors([sel])
if ps.constraints_satisfied(
task.constraints_monotone, sg, None, task.data
):
next_level_candidates.append(sg)
# level-wise search
depth = 1
while next_level_candidates:
# check sgs from the last level
if self.use_vectorization:
promising_candidates = self.get_next_level_candidates_vectorized(
task, result, next_level_candidates
)
else:
promising_candidates = self.get_next_level_candidates(
task, result, next_level_candidates
)
if depth == task.depth:
break
next_level_candidates_no_pruning = self.next_level(promising_candidates)
# select those selectors and build a subgroup from them
# for which all subsets of length depth (=candidate length -1)
# are in the set of promising candidates
set_promising_candidates = set(tuple(p) for p in promising_candidates)
next_level_candidates = [
combine_selectors(selectors)
for selectors in next_level_candidates_no_pruning
if all(
(subset in set_promising_candidates)
for subset in combinations(selectors, depth)
)
]
depth = depth + 1
result = ps.prepare_subgroup_discovery_result(result, task)
return ps.SubgroupDiscoveryResult(result, task)
[docs]class BestFirstSearch:
[docs] def execute(self, task):
result = []
queue = [(float("-inf"), ps.Conjunction([]))]
operator = ps.StaticSpecializationOperator(task.search_space)
task.qf.calculate_constant_statistics(task.data, task.target)
while queue:
q, old_description = heappop(queue)
q = -q
if not q > ps.minimum_required_quality(result, task):
break
for candidate_description in operator.refinements(old_description):
sg = candidate_description
statistics = task.qf.calculate_statistics(sg, task.target, task.data)
ps.add_if_required(
result,
sg,
task.qf.evaluate(sg, task.target, task.data, statistics),
task,
statistics=statistics,
)
if len(candidate_description) < task.depth:
optimistic_estimate = task.qf.optimistic_estimate(
sg, task.target, task.data, statistics
)
# compute refinements and fill the queue
if optimistic_estimate >= ps.minimum_required_quality(result, task):
if ps.constraints_satisfied(
task.constraints_monotone,
candidate_description,
statistics,
task.data,
):
heappush(
queue, (-optimistic_estimate, candidate_description)
)
result = ps.prepare_subgroup_discovery_result(result, task)
return ps.SubgroupDiscoveryResult(result, task)
[docs]class GeneralisingBFS: # pragma: no cover
def __init__(self):
self.alpha = 1.10
self.discarded = [0, 0, 0, 0, 0, 0, 0]
self.refined = [0, 0, 0, 0, 0, 0, 0]
[docs] def execute(self, task):
result = []
queue = []
operator = ps.StaticGeneralizationOperator(task.search_space)
# init the first level
for sel in task.search_space:
queue.append((float("-inf"), ps.Disjunction([sel])))
task.qf.calculate_constant_statistics(task.data, task.target)
while queue:
q, candidate_description = heappop(queue)
q = -q
if q < ps.minimum_required_quality(result, task):
break
sg = candidate_description
statistics = task.qf.calculate_statistics(sg, task.target, task.data)
quality = task.qf.evaluate(sg, statistics)
ps.add_if_required(result, sg, quality, task, statistics=statistics)
qual = ps.minimum_required_quality(result, task)
if (quality, sg) in result:
new_queue = []
for q_tmp, c_tmp in queue:
if (-q_tmp) > qual:
heappush(new_queue, (q_tmp, c_tmp))
queue = new_queue
optimistic_estimate = task.qf.optimistic_estimate(
sg, task.target, task.data, statistics
)
# else:
# ps.add_if_required(
# result, sg, task.qf.evaluate_from_dataset(task.data, sg), task)
# optimistic_estimate = task.qf.optimistic_generalisation_from_dataset(
# task.data, sg) if qf_is_bounded else float("inf")
# compute refinements and fill the queue
if len(candidate_description) < task.depth and (
optimistic_estimate / self.alpha ** (len(candidate_description) + 1)
) >= ps.minimum_required_quality(result, task):
# print(qual)
# print(optimistic_estimate)
self.refined[len(candidate_description)] += 1
# print(str(candidate_description))
for new_description in operator.refinements(candidate_description):
heappush(queue, (-optimistic_estimate, new_description))
else:
self.discarded[len(candidate_description)] += 1
result.sort(key=lambda x: x[0], reverse=True)
for qual, sg in result:
print(f"{qual} {sg}")
print("discarded " + str(self.discarded))
return ps.SubgroupDiscoveryResult(result, task)
[docs]class BeamSearch:
"""
Implements the BeamSearch algorithm. Its a basic implementation
"""
def __init__(self, beam_width=20, beam_width_adaptive=False):
self.beam_width = beam_width
self.beam_width_adaptive = beam_width_adaptive
[docs] def execute(self, task):
# adapt beam width to the result set size if desired
beam_width = self.beam_width
if self.beam_width_adaptive:
beam_width = task.result_set_size
# check if beam size is to small for result set
if beam_width < task.result_set_size:
raise RuntimeError(
"Beam width in the beam search algorithm "
"is smaller than the result set size!"
)
task.qf.calculate_constant_statistics(task.data, task.target)
# init
beam = [
(
0,
ps.Conjunction([]),
task.qf.calculate_statistics(slice(None), task.target, task.data),
)
]
previous_beam = None
depth = 0
while beam != previous_beam and depth < task.depth:
previous_beam = beam.copy()
for _, last_sg, _ in previous_beam:
if not getattr(last_sg, "visited", False):
setattr(last_sg, "visited", True)
for sel in task.search_space:
# create a clone
new_selectors = list(last_sg.selectors)
if sel not in new_selectors:
new_selectors.append(sel)
sg = ps.Conjunction(new_selectors)
statistics = task.qf.calculate_statistics(
sg, task.target, task.data
)
quality = task.qf.evaluate(
sg, task.target, task.data, statistics
)
ps.add_if_required(
beam,
sg,
quality,
task,
check_for_duplicates=True,
statistics=statistics,
explicit_result_set_size=beam_width,
)
depth += 1
# result = beam[-task.result_set_size:]
while len(beam) > task.result_set_size:
heappop(beam)
result = beam
result = ps.prepare_subgroup_discovery_result(result, task)
return ps.SubgroupDiscoveryResult(result, task)
[docs]class SimpleSearch:
def __init__(self, show_progress=True):
self.show_progress = show_progress
[docs] def execute(self, task):
task.qf.calculate_constant_statistics(task.data, task.target)
result = []
all_selectors = chain.from_iterable(
combinations(task.search_space, r) for r in range(1, task.depth + 1)
)
if self.show_progress:
try:
from tqdm.auto import tqdm # pylint: disable=import-outside-toplevel
def binomial(x, y):
try:
binom = factorial(x) // factorial(y) // factorial(x - y)
except ValueError: # pragma: no cover
binom = 0
return binom
total = sum(
binomial(len(task.search_space), k)
for k in range(1, task.depth + 1)
)
all_selectors = tqdm(all_selectors, total=total)
except ImportError:
warnings.warn(
"tqdm not installed but show_progress=True", ImportWarning
)
for selectors in all_selectors:
sg = ps.Conjunction(selectors)
statistics = task.qf.calculate_statistics(sg, task.target, task.data)
quality = task.qf.evaluate(sg, task.target, task.data, statistics)
ps.add_if_required(result, sg, quality, task, statistics=statistics)
result = ps.prepare_subgroup_discovery_result(result, task)
return ps.SubgroupDiscoveryResult(result, task)
[docs]class SimpleDFS:
[docs] def execute(self, task, use_optimistic_estimates=True):
task.qf.calculate_constant_statistics(task.data, task.target)
result = self.search_internal(
task, [], task.search_space, [], use_optimistic_estimates
)
result = ps.prepare_subgroup_discovery_result(result, task)
return ps.SubgroupDiscoveryResult(result, task)
[docs] def search_internal(
self, task, prefix, modification_set, result, use_optimistic_estimates
):
sg = ps.Conjunction(copy.copy(prefix))
statistics = task.qf.calculate_statistics(sg, task.target, task.data)
if (
use_optimistic_estimates
and len(prefix) < task.depth
and isinstance(task.qf, ps.BoundedInterestingnessMeasure)
):
optimistic_estimate = task.qf.optimistic_estimate(
sg, task.target, task.data, statistics
)
if not optimistic_estimate > ps.minimum_required_quality(result, task):
return result
quality = task.qf.evaluate(sg, task.target, task.data, statistics)
ps.add_if_required(result, sg, quality, task, statistics=statistics)
if not ps.constraints_satisfied(
task.constraints_monotone, sg, statistics=statistics, data=task.data
):
return result
if len(prefix) < task.depth:
new_modification_set = copy.copy(modification_set)
for sel in modification_set:
prefix.append(sel)
new_modification_set.pop(0)
self.search_internal(
task, prefix, new_modification_set, result, use_optimistic_estimates
)
# remove the sel again
prefix.pop(-1)
return result
[docs]class DFS:
"""
Implementation of a depth-first-search
with look-ahead using a provided datastructure.
"""
def __init__(self, apply_representation):
self.target_bitset = None
self.apply_representation = apply_representation
self.operator = None
self.params_tpl = namedtuple(
"StandardQF_parameters", ("size_sg", "positives_count")
)
[docs] def execute(self, task):
self.operator = ps.StaticSpecializationOperator(task.search_space)
task.qf.calculate_constant_statistics(task.data, task.target)
result = []
with self.apply_representation(task.data, task.search_space) as representation:
self.search_internal(task, result, representation.Conjunction([]))
result = ps.prepare_subgroup_discovery_result(result, task)
return ps.SubgroupDiscoveryResult(result, task)
[docs] def search_internal(self, task, result, sg):
statistics = task.qf.calculate_statistics(sg, task.target, task.data)
if not constraints_satisfied(
task.constraints_monotone, sg, statistics, task.data
):
return
optimistic_estimate = task.qf.optimistic_estimate(
sg, task.target, task.data, statistics
)
if not optimistic_estimate > ps.minimum_required_quality(result, task):
return
quality = task.qf.evaluate(sg, task.target, task.data, statistics)
ps.add_if_required(result, sg, quality, task, statistics=statistics)
if sg.depth < task.depth:
for new_sg in self.operator.refinements(sg):
self.search_internal(task, result, new_sg)
[docs]class DFSNumeric:
tpl = namedtuple("size_mean_parameters", ("size_sg", "mean"))
def __init__(self):
self.pop_size = 0
self.f = None
self.target_values = None
self.bitsets = {}
self.num_calls = 0
self.evaluate = None
[docs] def execute(self, task):
if not isinstance(task.qf, ps.StandardQFNumeric):
raise RuntimeError(
"BSD_numeric so far is only implemented for StandardQFNumeric"
)
self.pop_size = len(task.data)
sorted_data = task.data.sort_values(
task.target.get_attributes()[0], ascending=False
)
# generate target bitset
self.target_values = sorted_data[task.target.get_attributes()[0]].to_numpy()
task.qf.calculate_constant_statistics(task.data, task.target)
# generate selector bitsets
self.bitsets = {}
for sel in task.search_space:
# generate bitset
self.bitsets[sel] = sel.covers(sorted_data)
result = self.search_internal(
task, [], task.search_space, [], np.ones(len(sorted_data), dtype=bool)
)
result = ps.prepare_subgroup_discovery_result(result, task)
return ps.SubgroupDiscoveryResult(result, task)
[docs] def search_internal(self, task, prefix, modification_set, result, bitset):
self.num_calls += 1
sg_size = bitset.sum()
if sg_size == 0:
return result
target_values_sg = self.target_values[bitset]
target_values_cs = np.cumsum(target_values_sg, dtype=np.float64)
sizes = np.arange(1, len(target_values_cs) + 1)
mean_values_cs = target_values_cs / sizes
tpl = DFSNumeric.tpl(sizes, mean_values_cs)
qualities = task.qf.evaluate(None, None, None, tpl)
optimistic_estimate = np.max(qualities)
if optimistic_estimate <= ps.minimum_required_quality(result, task):
return result
sg = ps.Conjunction(copy.copy(prefix))
quality = qualities[-1]
ps.add_if_required(result, sg, quality, task)
if len(prefix) < task.depth:
new_modification_set = copy.copy(modification_set)
for sel in modification_set:
prefix.append(sel)
new_bitset = bitset & self.bitsets[sel]
new_modification_set.pop(0)
self.search_internal(
task, prefix, new_modification_set, result, new_bitset
)
# remove the sel again
prefix.pop(-1)
return result