"""
Created on 29.04.2016
@author: lemmerfn
"""
import copy
import warnings
from collections import Counter, defaultdict, namedtuple
from heapq import heappop, heappush
from itertools import chain, combinations
from math import factorial
import numpy as np
import pysubgroup as ps
[docs]
class SubgroupDiscoveryTask:
"""
Encapsulates all parameters required to perform standard subgroup discovery.
"""
def __init__(
self,
data,
target,
search_space,
qf,
result_set_size=10,
depth=3,
min_quality=float("-inf"),
constraints=None,
):
"""
Initializes a new SubgroupDiscoveryTask.
Parameters:
data: The dataset to be analyzed.
target: The target concept for subgroup discovery.
search_space: The search space of possible selectors.
qf: The quality function to evaluate subgroups.
result_set_size: The maximum number of subgroups to return.
depth: The maximum depth (length) of the subgroups.
min_quality: The minimal quality threshold for subgroups.
constraints: A list of constraints to be satisfied by subgroups.
"""
self.data = data
self.target = target
self.search_space = search_space
self.qf = qf
self.result_set_size = result_set_size
self.depth = depth
self.min_quality = min_quality
if constraints is None:
constraints = []
self.constraints = constraints
self.constraints_monotone = [
constr for constr in constraints if constr.is_monotone
]
self.constraints_other = [
constr for constr in constraints if not constr.is_monotone
]
[docs]
def constraints_satisfied(constraints, subgroup, statistics=None, data=None):
"""
Checks if all constraints are satisfied for a given subgroup.
Parameters:
constraints: A list of constraints to check.
subgroup: The subgroup to be evaluated.
statistics: Precomputed statistics for the subgroup (optional).
data: The dataset to be analyzed (optional).
Returns:
True if all constraints are satisfied, False otherwise.
"""
return all(
constr.is_satisfied(subgroup, statistics, data) for constr in constraints
)
try: # pragma: no cover
from numba import ( # pylint: disable=import-error, import-outside-toplevel
int32,
int64,
njit,
)
@njit([(int32[:, :], int64[:])], cache=True)
def getNewCandidates(candidates, hashes): # pragma: no cover
"""
Generates new candidate pairs for the next level using Numba for acceleration.
Parameters:
candidates: A 2D numpy array of candidate selector IDs.
hashes: A 1D numpy array of hash values for the candidates.
Returns:
A list of tuples, each containing indices of candidate pairs to be combined.
"""
result = []
for i in range(len(candidates) - 1):
for j in range(i + 1, len(candidates)):
if hashes[i] == hashes[j]:
if np.all(candidates[i, :-1] == candidates[j, :-1]):
result.append((i, j))
return result
except ImportError: # pragma: no cover
pass
[docs]
class Apriori:
"""
Implementation of the Apriori algorithm for subgroup discovery.
This class provides methods to perform level-wise search for subgroups
using the Apriori algorithm.
"""
def __init__(
self, representation_type=None, combination_name="Conjunction", use_numba=True
):
"""
Initializes the Apriori algorithm.
Parameters:
representation_type: The representation type to use for subgroups
(default is BitSetRepresentation).
combination_name: The name of the combination method
(e.g., "Conjunction" or "Disjunction").
use_numba: Whether to use Numba for performance optimization.
"""
self.combination_name = combination_name
if representation_type is None:
representation_type = ps.BitSetRepresentation
self.representation_type = representation_type
self.use_vectorization = True
self.optimistic_estimate_name = "optimistic_estimate"
self.next_level = self.get_next_level
self.compiled_func = None
if use_numba: # pragma: no cover
try:
import numba # pylint: disable=unused-import, import-outside-toplevel # noqa: F401, E501
self.next_level = self.get_next_level_numba
print("Apriori: Using numba for speedup")
except ImportError:
pass
[docs]
def get_next_level_candidates(self, task, result, next_level_candidates):
"""
Evaluates candidates at the current level and filters promising ones for
the next level.
Parameters:
task: The subgroup discovery task.
result: The current list of discovered subgroups.
next_level_candidates: List of subgroups to be evaluated at the current
level.
Returns:
A list of promising candidates (selectors) for the next level.
"""
promising_candidates = []
optimistic_estimate_function = getattr(task.qf, self.optimistic_estimate_name)
for sg in next_level_candidates:
statistics = task.qf.calculate_statistics(sg, task.target, task.data)
ps.add_if_required(
result,
sg,
task.qf.evaluate(sg, task.target, task.data, statistics),
task,
statistics=statistics,
)
optimistic_estimate = optimistic_estimate_function(
sg, task.target, task.data, statistics
)
if optimistic_estimate >= ps.minimum_required_quality(
result, task
) and ps.constraints_satisfied(
task.constraints_monotone, sg, statistics, task.data
):
promising_candidates.append((optimistic_estimate, sg.selectors))
min_quality = ps.minimum_required_quality(result, task)
promising_candidates = [
selectors
for estimate, selectors in promising_candidates
if estimate > min_quality
]
return promising_candidates
[docs]
def get_next_level_candidates_vectorized(self, task, result, next_level_candidates):
"""
Vectorized evaluation of candidates at the current level to filter promising
ones for the next level.
Parameters:
task: The subgroup discovery task.
result: The current list of discovered subgroups.
next_level_candidates: List of subgroups to be evaluated at the current
level.
Returns:
A list of promising candidates (selectors) for the next level.
"""
promising_candidates = []
statistics = []
optimistic_estimate_function = getattr(task.qf, self.optimistic_estimate_name)
next_level_candidates = list(next_level_candidates)
if len(next_level_candidates) == 0:
return []
for sg in next_level_candidates:
statistics.append(task.qf.calculate_statistics(sg, task.target, task.data))
tpl_class = statistics[0].__class__
vec_statistics = tpl_class._make(np.array(tpl) for tpl in zip(*statistics))
qualities = task.qf.evaluate(
slice(None), task.target, task.data, vec_statistics
)
optimistic_estimates = optimistic_estimate_function(
None, None, None, vec_statistics
)
for sg, quality, stats in zip(next_level_candidates, qualities, statistics):
ps.add_if_required(result, sg, quality, task, statistics=stats)
min_quality = ps.minimum_required_quality(result, task)
for sg, optimistic_estimate in zip(next_level_candidates, optimistic_estimates):
if optimistic_estimate >= min_quality:
promising_candidates.append(sg.selectors)
return promising_candidates
[docs]
def get_next_level_numba(self, promising_candidates): # pragma: no cover
"""
Generates the next level of candidates using Numba for acceleration.
Parameters:
promising_candidates: A list of promising candidate selectors.
Returns:
A list of new candidate selectors for the next level.
"""
if not hasattr(self, "compiled_func") or self.compiled_func is None:
self.compiled_func = getNewCandidates
# Map selectors to unique IDs
all_selectors = Counter(chain.from_iterable(promising_candidates))
all_selectors_ids = {selector: i for i, selector in enumerate(all_selectors)}
promising_candidates_selector_ids = [
tuple(all_selectors_ids[sel] for sel in selectors)
for selectors in promising_candidates
]
shape1 = len(promising_candidates_selector_ids)
if shape1 == 0:
return []
shape2 = len(promising_candidates_selector_ids[0])
arr = np.array(promising_candidates_selector_ids, dtype=np.int32).reshape(
shape1, shape2
)
print(len(arr))
hashes = np.array(
[hash(tuple(x[:-1])) for x in promising_candidates_selector_ids],
dtype=np.int64,
)
print(len(arr), arr.dtype, hashes.dtype)
candidates_int = self.compiled_func(arr, hashes)
return [
(*promising_candidates[i], promising_candidates[j][-1])
for i, j in candidates_int
]
[docs]
def get_next_level(self, promising_candidates):
"""
Generates the next level of candidates based on the current promising
candidates.
Parameters:
promising_candidates: A list of promising candidate selectors.
Returns:
A list of new candidate selectors for the next level.
"""
by_prefix_dict = defaultdict(list)
for sg in promising_candidates:
by_prefix_dict[tuple(sg[:-1])].append(sg[-1])
return [
prefix + real_suffix
for prefix, suffixes in by_prefix_dict.items()
for real_suffix in combinations(sorted(suffixes), 2)
]
[docs]
def execute(self, task):
"""
Executes the Apriori algorithm on the given task.
Parameters:
task: The subgroup discovery task to be executed.
Returns:
A SubgroupDiscoveryResult containing the discovered subgroups.
"""
if not isinstance(
task.qf, ps.BoundedInterestingnessMeasure
): # pragma: no cover
warnings.warn(
"Quality function is unbounded, long runtime expected", RuntimeWarning
)
task.qf.calculate_constant_statistics(task.data, task.target)
with self.representation_type(task.data, task.search_space) as representation:
combine_selectors = getattr(representation.__class__, self.combination_name)
result = []
# Initialize the first level candidates
next_level_candidates = []
for sel in task.search_space:
sg = combine_selectors([sel])
if ps.constraints_satisfied(
task.constraints_monotone, sg, None, task.data
):
next_level_candidates.append(sg)
# Level-wise search
depth = 1
while next_level_candidates:
# Evaluate subgroups from the last level
if self.use_vectorization:
promising_candidates = self.get_next_level_candidates_vectorized(
task, result, next_level_candidates
)
else:
promising_candidates = self.get_next_level_candidates(
task, result, next_level_candidates
)
if len(promising_candidates) == 0:
break
if depth == task.depth:
break
next_level_candidates_no_pruning = self.next_level(promising_candidates)
# Select selectors and build subgroups for which all subsets are in the
# set of promising candidates
curr_depth = depth # Need copy of depth for lazy evaluation
set_promising_candidates = set(tuple(p) for p in promising_candidates)
next_level_candidates = (
combine_selectors(selectors)
for selectors in next_level_candidates_no_pruning
if all(
(subset in set_promising_candidates)
for subset in combinations(selectors, curr_depth)
)
)
depth = depth + 1
result = ps.prepare_subgroup_discovery_result(result, task)
return ps.SubgroupDiscoveryResult(result, task)
[docs]
class BestFirstSearch:
"""
Implements the Best-First Search algorithm for subgroup discovery.
"""
[docs]
def execute(self, task):
"""
Executes the Best-First Search algorithm on the given task.
Parameters:
task: The subgroup discovery task to be executed.
Returns:
A SubgroupDiscoveryResult containing the discovered subgroups.
"""
result = []
queue = [(float("-inf"), ps.Conjunction([]))]
operator = ps.StaticSpecializationOperator(task.search_space)
task.qf.calculate_constant_statistics(task.data, task.target)
while queue:
q, old_description = heappop(queue)
q = -q
if not q > ps.minimum_required_quality(result, task):
break
for candidate_description in operator.refinements(old_description):
sg = candidate_description
statistics = task.qf.calculate_statistics(sg, task.target, task.data)
ps.add_if_required(
result,
sg,
task.qf.evaluate(sg, task.target, task.data, statistics),
task,
statistics=statistics,
)
if len(candidate_description) < task.depth:
if hasattr(task.qf, "optimistic_estimate"):
optimistic_estimate = task.qf.optimistic_estimate(
sg, task.target, task.data, statistics
)
else:
optimistic_estimate = np.inf
# Compute refinements and fill the queue
if optimistic_estimate >= ps.minimum_required_quality(result, task):
if ps.constraints_satisfied(
task.constraints_monotone,
candidate_description,
statistics,
task.data,
):
heappush(
queue, (-optimistic_estimate, candidate_description)
)
result = ps.prepare_subgroup_discovery_result(result, task)
return ps.SubgroupDiscoveryResult(result, task)
[docs]
class GeneralisingBFS: # pragma: no cover
"""
Implements a Generalizing Best-First Search algorithm for subgroup discovery.
"""
def __init__(self):
self.alpha = 1.10
self.discarded = [0, 0, 0, 0, 0, 0, 0]
self.refined = [0, 0, 0, 0, 0, 0, 0]
[docs]
def execute(self, task):
"""
Executes the Generalizing Best-First Search algorithm on the given task.
Parameters:
task: The subgroup discovery task to be executed.
Returns:
A SubgroupDiscoveryResult containing the discovered subgroups.
"""
result = []
queue = []
operator = ps.StaticGeneralizationOperator(task.search_space)
# Initialize the first level
for sel in task.search_space:
queue.append((float("-inf"), ps.Disjunction([sel])))
task.qf.calculate_constant_statistics(task.data, task.target)
while queue:
q, candidate_description = heappop(queue)
q = -q
if q < ps.minimum_required_quality(result, task):
break
sg = candidate_description
statistics = task.qf.calculate_statistics(sg, task.target, task.data)
quality = task.qf.evaluate(sg, task.target, task.data, statistics)
ps.add_if_required(result, sg, quality, task, statistics=statistics)
qual = ps.minimum_required_quality(result, task)
if (quality, sg) in result:
new_queue = []
for q_tmp, c_tmp in queue:
if (-q_tmp) > qual:
heappush(new_queue, (q_tmp, c_tmp))
queue = new_queue
optimistic_estimate = task.qf.optimistic_estimate(
sg, task.target, task.data, statistics
)
# Compute refinements and fill the queue
if len(candidate_description) < task.depth and (
optimistic_estimate / self.alpha ** (len(candidate_description) + 1)
) >= ps.minimum_required_quality(result, task):
self.refined[len(candidate_description)] += 1
for new_description in operator.refinements(candidate_description):
heappush(queue, (-optimistic_estimate, new_description))
else:
self.discarded[len(candidate_description)] += 1
result.sort(key=lambda x: x[0], reverse=True)
print("discarded " + str(self.discarded))
return ps.SubgroupDiscoveryResult(result, task)
[docs]
class BeamSearch:
"""
Implements the Beam Search algorithm for subgroup discovery.
"""
def __init__(self, beam_width=20, beam_width_adaptive=False):
"""
Initializes the Beam Search algorithm.
Parameters:
beam_width: Width of the beam (number of candidates to keep at each level).
beam_width_adaptive: Whether to adapt the beam width to the result set size.
"""
self.beam_width = beam_width
self.beam_width_adaptive = beam_width_adaptive
[docs]
def execute(self, task):
"""
Executes the Beam Search algorithm on the given task.
Parameters:
task: The subgroup discovery task to be executed.
Returns:
A SubgroupDiscoveryResult containing the discovered subgroups.
"""
# Adapt beam width to the result set size if desired
beam_width = self.beam_width
if self.beam_width_adaptive:
beam_width = task.result_set_size
# Check if beam size is too small for result set
if beam_width < task.result_set_size:
raise RuntimeError(
"Beam width in the beam search algorithm "
"is smaller than the result set size!"
)
task.qf.calculate_constant_statistics(task.data, task.target)
# Initialize
beam = [
(
0,
ps.Conjunction([]),
task.qf.calculate_statistics(slice(None), task.target, task.data),
)
]
previous_beam = None
depth = 0
while beam != previous_beam and depth < task.depth:
previous_beam = beam.copy()
for _, last_sg, _ in previous_beam:
if getattr(last_sg, "visited", False):
continue
setattr(last_sg, "visited", True)
for sel in task.search_space:
# Create a clone
if sel in last_sg.selectors:
continue
sg = ps.Conjunction(last_sg.selectors + (sel,))
statistics = task.qf.calculate_statistics(
sg, task.target, task.data
)
quality = task.qf.evaluate(sg, task.target, task.data, statistics)
ps.add_if_required(
beam,
sg,
quality,
task,
check_for_duplicates=True,
statistics=statistics,
explicit_result_set_size=beam_width,
)
depth += 1
# Trim the beam to the result set size
while len(beam) > task.result_set_size:
heappop(beam)
result = beam
result = ps.prepare_subgroup_discovery_result(result, task)
return ps.SubgroupDiscoveryResult(result, task)
[docs]
class SimpleSearch:
"""
Implements a simple exhaustive search algorithm for subgroup discovery.
"""
def __init__(self, show_progress=True):
"""
Initializes the Simple Search algorithm.
Parameters:
show_progress: Whether to display a progress bar during the search.
"""
self.show_progress = show_progress
[docs]
def execute(self, task):
"""
Executes the Simple Search algorithm on the given task.
Parameters:
task: The subgroup discovery task to be executed.
Returns:
A SubgroupDiscoveryResult containing the discovered subgroups.
"""
task.qf.calculate_constant_statistics(task.data, task.target)
result = []
all_selectors = chain.from_iterable(
combinations(task.search_space, r) for r in range(1, task.depth + 1)
)
if self.show_progress:
try:
from tqdm.auto import tqdm # pylint: disable=import-outside-toplevel
def binomial(x, y):
try:
binom = factorial(x) // factorial(y) // factorial(x - y)
except ValueError: # pragma: no cover
binom = 0
return binom
total = sum(
binomial(len(task.search_space), k)
for k in range(1, task.depth + 1)
)
all_selectors = tqdm(all_selectors, total=total)
except ImportError: # pragma: no cover
warnings.warn(
"tqdm not installed but show_progress=True", ImportWarning
)
for selectors in all_selectors:
sg = ps.Conjunction(selectors)
statistics = task.qf.calculate_statistics(sg, task.target, task.data)
quality = task.qf.evaluate(sg, task.target, task.data, statistics)
ps.add_if_required(result, sg, quality, task, statistics=statistics)
result = ps.prepare_subgroup_discovery_result(result, task)
return ps.SubgroupDiscoveryResult(result, task)
[docs]
class SimpleDFS:
"""
Implements a simple Depth-First Search algorithm for subgroup discovery.
It is the most elementary (and thus probably slow) algorithm implementation.
"""
[docs]
def execute(self, task, use_optimistic_estimates=True):
"""
Executes the Simple DFS algorithm on the given task.
Parameters:
task: The subgroup discovery task to be executed.
use_optimistic_estimates: Whether to use optimistic estimates for pruning.
Returns:
A SubgroupDiscoveryResult containing the discovered subgroups.
"""
task.qf.calculate_constant_statistics(task.data, task.target)
result = self.search_internal(
task, [], task.search_space, [], use_optimistic_estimates
)
result = ps.prepare_subgroup_discovery_result(result, task)
return ps.SubgroupDiscoveryResult(result, task)
[docs]
def search_internal(
self, task, prefix, modification_set, result, use_optimistic_estimates
):
"""
Recursively searches for subgroups in a depth-first manner.
Parameters:
task: The subgroup discovery task.
prefix: The current list of selectors in the subgroup description.
modification_set: The remaining selectors to consider.
result: The current list of discovered subgroups.
use_optimistic_estimates: Whether to use optimistic estimates for pruning.
Returns:
The updated list of discovered subgroups.
"""
sg = ps.Conjunction(copy.copy(prefix))
statistics = task.qf.calculate_statistics(sg, task.target, task.data)
if (
use_optimistic_estimates
and len(prefix) < task.depth
and isinstance(task.qf, ps.BoundedInterestingnessMeasure)
):
optimistic_estimate = task.qf.optimistic_estimate(
sg, task.target, task.data, statistics
)
if not optimistic_estimate > ps.minimum_required_quality(result, task):
return result
quality = task.qf.evaluate(sg, task.target, task.data, statistics)
ps.add_if_required(result, sg, quality, task, statistics=statistics)
if not ps.constraints_satisfied(
task.constraints_monotone, sg, statistics=statistics, data=task.data
):
return result
if len(prefix) < task.depth:
new_modification_set = copy.copy(modification_set)
for sel in modification_set:
prefix.append(sel)
new_modification_set.pop(0)
self.search_internal(
task, prefix, new_modification_set, result, use_optimistic_estimates
)
# Remove the selector again
prefix.pop(-1)
return result
[docs]
class DFS:
"""
Depth-first search with look-ahead for a provided data structure.
"""
def __init__(self, apply_representation=None):
"""
Initializes the DFS algorithm.
Parameters:
apply_representation: The representation type to use for subgroups.
"""
self.target_bitset = None
if apply_representation is None:
apply_representation = ps.BitSetRepresentation
self.apply_representation = apply_representation
self.operator = None
self.params_tpl = namedtuple(
"StandardQF_parameters", ("size_sg", "positives_count")
)
[docs]
def execute(self, task):
"""
Executes the DFS algorithm on the given task.
Parameters:
task: The subgroup discovery task to be executed.
Returns:
A SubgroupDiscoveryResult containing the discovered subgroups.
"""
self.operator = ps.StaticSpecializationOperator(task.search_space)
task.qf.calculate_constant_statistics(task.data, task.target)
result = []
with self.apply_representation(task.data, task.search_space) as representation:
self.search_internal(task, result, representation.Conjunction([]))
result = ps.prepare_subgroup_discovery_result(result, task)
return ps.SubgroupDiscoveryResult(result, task)
[docs]
def search_internal(self, task, result, sg):
"""
Recursively searches for subgroups in a depth-first manner.
Parameters:
task: The subgroup discovery task.
result: The current list of discovered subgroups.
sg: The current subgroup being evaluated.
"""
statistics = task.qf.calculate_statistics(sg, task.target, task.data)
if not constraints_satisfied(
task.constraints_monotone, sg, statistics, task.data
):
return
optimistic_estimate = task.qf.optimistic_estimate(
sg, task.target, task.data, statistics
)
if not optimistic_estimate > ps.minimum_required_quality(result, task):
return
quality = task.qf.evaluate(sg, task.target, task.data, statistics)
ps.add_if_required(result, sg, quality, task, statistics=statistics)
if sg.depth < task.depth:
for new_sg in self.operator.refinements(sg):
self.search_internal(task, result, new_sg)
[docs]
class DFSNumeric:
"""
Implements a specialized DFS algorithm for numeric quality functions.
"""
tpl = namedtuple("size_mean_parameters", ("size_sg", "mean"))
def __init__(self):
self.pop_size = 0
self.f = None
self.target_values = None
self.bitsets = {}
self.num_calls = 0
self.evaluate = None
[docs]
def execute(self, task):
"""
Executes the DFSNumeric algorithm on the given task.
Parameters:
task: The subgroup discovery task to be executed.
Returns:
A SubgroupDiscoveryResult containing the discovered subgroups.
"""
if not isinstance(task.qf, ps.StandardQFNumeric):
raise RuntimeError(
"BSD_numeric so far is only implemented for StandardQFNumeric"
)
self.pop_size = len(task.data)
sorted_data = task.data.sort_values(
task.target.get_attributes()[0], ascending=False
)
# Generate target values
self.target_values = sorted_data[task.target.get_attributes()[0]].to_numpy()
task.qf.calculate_constant_statistics(task.data, task.target)
# Generate selector bitsets
self.bitsets = {}
for sel in task.search_space:
# Generate bitset
self.bitsets[sel] = sel.covers(sorted_data)
result = self.search_internal(
task, [], task.search_space, [], np.ones(len(sorted_data), dtype=bool)
)
result = ps.prepare_subgroup_discovery_result(result, task)
return ps.SubgroupDiscoveryResult(result, task)
[docs]
def search_internal(self, task, prefix, modification_set, result, bitset):
"""
Recursively searches in a dfs-manner for numeric quality functions.
Parameters:
task: The subgroup discovery task.
prefix: The current list of selectors in the subgroup description.
modification_set: The remaining selectors to consider.
result: The current list of discovered subgroups.
bitset: The current bitset representing the subgroup.
Returns:
The updated list of discovered subgroups.
"""
self.num_calls += 1
sg_size = bitset.sum()
if sg_size == 0:
return result
target_values_sg = self.target_values[bitset]
target_values_cs = np.cumsum(target_values_sg, dtype=np.float64)
sizes = np.arange(1, len(target_values_cs) + 1)
mean_values_cs = target_values_cs / sizes
tpl = DFSNumeric.tpl(sizes, mean_values_cs)
qualities = task.qf.evaluate(None, None, None, tpl)
optimistic_estimate = np.max(qualities)
if optimistic_estimate <= ps.minimum_required_quality(result, task):
return result
sg = ps.Conjunction(copy.copy(prefix))
quality = qualities[-1]
ps.add_if_required(result, sg, quality, task)
if len(prefix) < task.depth:
new_modification_set = copy.copy(modification_set)
for sel in modification_set:
prefix.append(sel)
new_bitset = bitset & self.bitsets[sel]
new_modification_set.pop(0)
self.search_internal(
task, prefix, new_modification_set, result, new_bitset
)
# Remove the selector again
prefix.pop(-1)
return result