Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

model_selection module: Group splitter #74

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions molpipeline/experimental/model_selection/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
"""Model selection module."""

from molpipeline.experimental.model_selection.splitter import GroupShuffleSplit


__all__ = ["GroupShuffleSplit"]
224 changes: 224 additions & 0 deletions molpipeline/experimental/model_selection/splitter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
from sklearn.model_selection import (
BaseShuffleSplit,
)

import numpy as np
from sklearn.model_selection._split import _validate_shuffle_split
from sklearn.utils import check_array, shuffle
from sklearn.utils.validation import _num_samples, check_random_state


class GroupShuffleSplit(BaseShuffleSplit):
    """Shuffle-split cross-validator that keeps groups intact.

    All samples sharing a group label are assigned to either the train or
    the test set, never both. The ``group_by`` option selects how
    ``train_size`` / ``test_size`` are interpreted:

    - ``"number"``: sizes refer to the number of *groups* (mirrors
      ``sklearn.model_selection.GroupShuffleSplit``).
    - ``"size"``: sizes refer to the number of *samples*; whole groups are
      packed greedily into the train/test buckets so the realized sample
      counts approximate the requested sizes.
    """

    def __init__(
        self,
        n_splits=5,
        *,
        test_size=None,
        train_size=None,
        group_by="number",
        random_state=None,
    ):
        """Initialize the group-aware shuffle splitter.

        Parameters
        ----------
        n_splits : int, default=5
            Number of re-shuffling and splitting iterations.
        test_size : float or int, optional
            Fraction (float) or absolute count (int) of the test set.
            When both ``test_size`` and ``train_size`` are unspecified,
            defaults to 0.2.
        train_size : float or int, optional
            Fraction (float) or absolute count (int) of the train set.
        group_by : {"number", "size"}, default="number"
            Whether the requested sizes refer to the number of groups
            ("number") or the number of samples ("size").
        random_state : int, RandomState instance or None, optional
            Controls the randomness of the splits.

        Raises
        ------
        ValueError
            If ``group_by`` is neither ``"size"`` nor ``"number"``.
        """
        # Fail fast on an invalid strategy before any other setup.
        if group_by not in ("size", "number"):
            raise ValueError(
                "Bad parameter 'group_by'. Allowed are 'size' and 'number'."
            )
        super().__init__(
            n_splits=n_splits,
            test_size=test_size,
            train_size=train_size,
            random_state=random_state,
        )
        # Mirrors sklearn's GroupShuffleSplit default test fraction.
        self._default_test_size = 0.2
        self._group_by = group_by

    def _iter_indices_by_group_size(self, X, groups):
        """Yield train/test sample indices balancing *sample* counts.

        Groups are visited in random order and greedily packed into two
        buckets (train, test) whose target sizes are the requested sample
        counts. Each group first tries a randomly chosen bucket, then the
        other; if neither has room, the bucket with the smaller overshoot
        of its target size wins.

        Parameters
        ----------
        X : array-like
            Data to split; only its length is used.
        groups : array-like
            Group label for each sample.

        Yields
        ------
        tuple of (ndarray, ndarray)
            Shuffled train and test sample indices for each split.
        """
        n_samples = _num_samples(X)
        n_train, n_test = _validate_shuffle_split(
            n_samples,
            self.test_size,
            self.train_size,
            default_test_size=self._default_test_size,
        )
        rng = check_random_state(self.random_state)

        classes, group_indices, group_counts = np.unique(
            groups,
            return_inverse=True,
            return_counts=True,
        )
        class_indices = np.arange(len(classes))

        for _ in range(self.n_splits):
            # Pre-compute a random preferred bucket (0=train, 1=test) per group.
            random_bucket_assignments = rng.randint(0, 2, size=len(classes))

            # Randomize the group order for assignment to train/test.
            group_counts_shuffled, class_indices_shuffled = shuffle(
                group_counts, class_indices, random_state=rng
            )

            # Track train and test sets in arrays of length 2.
            samples_sizes = np.array([n_train, n_test], dtype=np.int_)
            bucket_sizes = np.zeros(2, dtype=np.int_)
            bucket_elements = [[], []]

            for class_index, group_size, bucket_index in zip(
                class_indices_shuffled, group_counts_shuffled, random_bucket_assignments
            ):
                first_bucket_size = bucket_sizes[bucket_index] + group_size
                second_bucket_size = bucket_sizes[1 - bucket_index] + group_size

                # First, try to assign the group to its preferred bucket,
                # then to the other one.
                bucket_selection = bucket_index
                if first_bucket_size <= samples_sizes[bucket_index]:
                    bucket_selection = bucket_index
                elif second_bucket_size <= samples_sizes[1 - bucket_index]:
                    bucket_selection = 1 - bucket_index
                else:
                    # The group does not fit in any bucket. It is assigned to
                    # the bucket which will be closer to its target sample size.
                    first_diff = first_bucket_size - samples_sizes[bucket_index]
                    second_diff = second_bucket_size - samples_sizes[1 - bucket_index]
                    if second_diff < first_diff:
                        bucket_selection = 1 - bucket_index

                bucket_elements[bucket_selection].append(class_index)
                bucket_sizes[bucket_selection] += group_size

            # Map group indices back to sample indices.
            train = np.flatnonzero(np.isin(group_indices, bucket_elements[0]))
            test = np.flatnonzero(np.isin(group_indices, bucket_elements[1]))

            train = rng.permutation(train)
            test = rng.permutation(test)

            yield train, test

    def _iter_indices(self, X, y=None, groups=None):
        """Yield train/test sample indices according to ``group_by``.

        Parameters
        ----------
        X : array-like
            Data to split; only its length is used.
        y : array-like, optional
            Ignored; present for API compatibility.
        groups : array-like
            Group label for each sample. Must not be None.

        Yields
        ------
        tuple of (ndarray, ndarray)
            Train and test sample indices for each split.

        Raises
        ------
        ValueError
            If ``groups`` is None.
        """
        if groups is None:
            raise ValueError("The 'groups' parameter should not be None.")
        groups = check_array(groups, input_name="groups", ensure_2d=False, dtype=None)

        if self._group_by == "number":
            classes, group_indices = np.unique(groups, return_inverse=True)
            # Delegate the shuffle split to the unique group labels, then
            # invert the resulting group partition into sample indices.
            for group_train, group_test in super()._iter_indices(X=classes):
                train = np.flatnonzero(np.isin(group_indices, group_train))
                test = np.flatnonzero(np.isin(group_indices, group_test))

                yield train, test
        elif self._group_by == "size":
            yield from self._iter_indices_by_group_size(X, groups)
        else:
            # Unreachable: __init__ validates group_by.
            raise AssertionError("Unknown parameter for 'group_by'.")

    def split(self, X, y=None, groups=None):
        """Generate indices to split data into training and test set.

        Parameters
        ----------
        X : array-like
            Training data.
        y : array-like, optional
            Target variable (ignored for splitting).
        groups : array-like
            Group label for each sample.

        Yields
        ------
        tuple of (ndarray, ndarray)
            Train and test sample indices for each split.
        """
        return super().split(X, y, groups)
Empty file.
87 changes: 87 additions & 0 deletions tests/test_experimental/test_model_selection/test_splitter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import unittest

import numpy as np
from numpy.testing import assert_array_equal
from sklearn.model_selection import GroupShuffleSplit

from molpipeline.experimental.model_selection.splitter import GroupShuffleSplit


class TestSampleBasedGroupShuffleSplit(unittest.TestCase):
    """Unit tests for the group-aware GroupShuffleSplit cross-validator."""

    def test_group_shuffle_split_default_test_size(self) -> None:
        """Test that default and explicit train sizes yield expected split sizes."""
        for train_size, exp_train, exp_test in [(None, 8, 2), (7, 7, 3), (0.7, 7, 3)]:
            # Check that the default value has the expected behavior, i.e. 0.2
            # if both unspecified or complement train_size unless both are
            # specified.
            features = np.ones(10)
            targets = np.ones(10)
            groups = range(10)

            for group_by in ["size", "number"]:
                # next() takes the first split; the values are index arrays.
                train_indices, test_indices = next(
                    GroupShuffleSplit(train_size=train_size, group_by=group_by).split(
                        features, targets, groups
                    )
                )

                self.assertEqual(len(train_indices), exp_train)
                self.assertEqual(len(test_indices), exp_test)

    # NOTE: renamed from a duplicate of the method above, which silently
    # shadowed it so the first test never ran.
    def test_group_shuffle_split_partitions_groups(self) -> None:
        """Test that splits keep groups intact and match requested sizes."""
        test_groups = (
            np.array([1, 1, 1, 1, 2, 2, 2, 3, 3, 3, 3, 3]),
            np.array([0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3]),
            np.array([0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2]),
            np.array([1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4]),
            [1, 1, 1, 1, 2, 2, 2, 3, 3, 3, 3, 3],
            ["1", "1", "1", "1", "2", "2", "2", "3", "3", "3", "3", "3"],
        )

        for groups_i in test_groups:
            features = targets = np.ones(len(groups_i))
            n_splits = 6
            test_size = 1.0 / 3
            for group_by in ["size", "number"]:
                splitter = GroupShuffleSplit(
                    n_splits, test_size=test_size, random_state=0, group_by=group_by
                )

                # Smoke test: repr must not raise.
                repr(splitter)

                # Test that the reported number of splits is correct.
                self.assertEqual(
                    splitter.get_n_splits(features, targets, groups=groups_i),
                    n_splits,
                )

                unique_groups = np.unique(groups_i)
                group_labels = np.asarray(groups_i)

                for train, test in splitter.split(features, targets, groups=groups_i):
                    train_group_set = np.unique(group_labels[train])
                    test_group_set = np.unique(group_labels[test])

                    # First test: no train group is in the test set and
                    # vice versa.
                    self.assertFalse(
                        np.any(np.isin(group_labels[train], test_group_set))
                    )
                    self.assertFalse(
                        np.any(np.isin(group_labels[test], train_group_set))
                    )

                    # Second test: train and test add up to all the data.
                    self.assertEqual(
                        group_labels[train].size + group_labels[test].size,
                        group_labels.size,
                    )

                    # Third test: train and test indices are disjoint.
                    assert_array_equal(np.intersect1d(train, test), [])

                    # Fourth test: unique train and test group counts match
                    # the requested split, +- 1 for rounding error.
                    self.assertLessEqual(
                        abs(len(test_group_set) - round(test_size * len(unique_groups))),
                        1,
                    )
                    self.assertLessEqual(
                        abs(
                            len(train_group_set)
                            - round((1.0 - test_size) * len(unique_groups))
                        ),
                        1,
                    )
Loading