# Copyright (C) 2017-2023  Cleanlab Inc.
# This file is part of cleanlab.
#
# cleanlab is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# cleanlab is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with cleanlab.  If not, see <https://www.gnu.org/licenses/>.
"""
Helper methods that are useful for benchmarking cleanlab’s core algorithms.
These methods introduce synthetic noise into the labels of a classification dataset.
Specifically, this module provides methods for generating valid noise matrices (for which learning with noise is possible),
generating noisy labels given a noise matrix, generating valid noise matrices with a specific trace value, and more.
"""
from typing import Optional
import numpy as np
from cleanlab.internal.util import value_counts
from cleanlab.internal.constants import FLOATING_POINT_COMPARISON
def noise_matrix_is_valid(noise_matrix, py, *, verbose=False) -> bool:
    """Check whether `noise_matrix` is learnable under the class prior `py`.

    Given a prior `py` representing ``p(true_label=k)``, checks if the given
    `noise_matrix` is a learnable matrix. Learnability means that it is
    possible to achieve better than random performance, on average, for the
    amount of noise in `noise_matrix`.

    Parameters
    ----------
    noise_matrix : np.ndarray
      An array of shape ``(K, K)`` representing the conditional probability
      matrix ``P(label=k_s|true_label=k_y)`` containing the fraction of
      examples in every class, labeled as every other class. Assumes columns of
      `noise_matrix` sum to 1.

    py : np.ndarray
      An array of shape ``(K,)`` representing the fraction (prior probability)
      of each true class label, ``P(true_label = k)``.

    verbose : bool, default=False
      If ``True``, prints per-class diagnostic quantities while checking.

    Returns
    -------
    is_valid : bool
      Whether the noise matrix is a learnable matrix.
    """
    # Number of classes
    K = len(py)

    # Marginal of the noisy labels: P(label=k) = sum_j P(label=k|true_label=j) * P(true_label=j)
    ps = np.dot(noise_matrix, py)

    # Joint distribution P(label=k, true_label=k'); column k' is scaled by py[k'].
    joint_noise = np.multiply(noise_matrix, py)

    # The joint must be a valid probability matrix (entries sum to 1).
    if not (abs(joint_noise.sum() - 1.0) < FLOATING_POINT_COMPARISON):
        return False

    # A class is learnable only if the labels agree with the truth more often
    # than chance: p(label=k) * p(true_label=k) < p(label=k, true_label=k)
    for i in range(K):
        chance_agreement = ps[i] * py[i]
        observed_agreement = joint_noise[i][i]
        is_learnable = bool(chance_agreement < observed_agreement)

        if verbose:
            # A notional number of examples, used only to scale the printed
            # diagnostics into counts; it does not affect the returned value.
            N = float(10000)
            C = N * joint_noise[i][i]
            E1 = N * joint_noise[i].sum() - C
            E2 = N * joint_noise.T[i].sum() - C
            O = N - E1 - E2 - C
            # C == 0 leaves the ratio undefined; report it as infinity instead
            # of letting round() fail on a non-finite value.
            ratio = E1 * E2 / C if C != 0 else float("inf")
            print(
                "E1E2/C",
                _round_if_finite(ratio),
                "E1",
                round(E1),
                "E2",
                round(E2),
                "C",
                round(C),
                "|",
                _round_if_finite(ratio + E1 + E2 + C),
                "|",
                _round_if_finite(ratio),
                "<",
                round(O),
            )
            # Probabilities are shown to 4 decimals; rounding them to whole
            # integers would always print 0 or 1 and hide the comparison.
            print(
                round(float(chance_agreement), 4),
                "<",
                round(float(observed_agreement), 4),
                ":",
                is_learnable,
            )

        if not is_learnable:
            return False

    return True


def _round_if_finite(value):
    """Round `value` to the nearest integer, passing inf/nan through unchanged."""
    return round(value) if np.isfinite(value) else value
def generate_noisy_labels(true_labels, noise_matrix) -> np.ndarray:
    """Generates noisy `labels` from perfect labels `true_labels`,
    "exactly" yielding the provided `noise_matrix` between `labels` and `true_labels`.

    Below we provide a for loop implementation of what this function does.
    We do not use this implementation as it is not a fast algorithm, but
    it explains as Python pseudocode what is happening in this function.

    Parameters
    ----------
    true_labels : np.ndarray
      An array of shape ``(N,)`` representing perfect labels, without any
      noise. Contains K distinct natural number classes, 0, 1, ..., K-1.
      Classes that do not occur in `true_labels` are treated as having
      prior probability 0.

    noise_matrix : np.ndarray
      An array of shape ``(K, K)`` representing the conditional probability
      matrix ``P(label=k_s|true_label=k_y)`` containing the fraction of
      examples in every class, labeled as every other class. Assumes columns of
      `noise_matrix` sum to 1.

    Returns
    -------
    labels : np.ndarray
      An array of shape ``(N,)`` of noisy labels.

    Examples
    --------

    .. code:: python

        # Generate labels
        count_joint = (noise_matrix * py * len(true_labels)).astype(int)
        labels = np.array(true_labels)
        for k_s in range(K):
            for k_y in range(K):
                if k_s != k_y:
                    idx_flip = np.where((labels == k_y) & (true_labels == k_y))[0]
                    if len(idx_flip): # pragma: no cover
                        labels[np.random.choice(
                            idx_flip,
                            count_joint[k_s][k_y],
                            replace=False,
                        )] = k_s
    """
    # Make the inputs numpy arrays, if they are not
    true_labels = np.asarray(true_labels)
    noise_matrix = np.asarray(noise_matrix)
    num_examples = len(true_labels)

    # Work on a copy so the caller's labels are never modified.
    labels = np.array(true_labels)
    if num_examples == 0:
        # Nothing to flip; also avoids dividing by zero when computing the prior.
        return labels

    # Number of classes
    K = len(noise_matrix)

    # Compute p(true_label=k) for every k in 0..K-1. Counting per class index
    # (rather than per observed value) keeps `py` aligned with the columns of
    # `noise_matrix` even when some class never occurs in `true_labels`.
    class_counts = np.array([np.count_nonzero(true_labels == k) for k in range(K)])
    py = class_counts / float(num_examples)

    # Counts of pairs (label, true_label); fractional counts are truncated.
    count_joint = (noise_matrix * py * num_examples).astype(int)
    # Remove diagonal entries as they do not involve flipping of labels.
    np.fill_diagonal(count_joint, 0)

    for k in range(K):  # Iterate over true_label == k
        # Noisy labels that class k must be flipped to, and how many of each.
        flip_targets = np.where(count_joint[:, k] != 0)[0]
        flip_counts = count_joint[flip_targets, k]
        # One entry per example to flip, holding its new noisy label.
        noise = [
            target for target, count in zip(flip_targets, flip_counts) for _ in range(count)
        ]
        # Examples of class k that still carry their original label.
        idx_flip = np.where((labels == k) & (true_labels == k))[0]
        # Skip the class when there is nothing to flip or not enough examples.
        if len(idx_flip) and len(noise) and len(idx_flip) >= len(noise):  # pragma: no cover
            labels[np.random.choice(idx_flip, len(noise), replace=False)] = noise

    return labels
def generate_noise_matrix_from_trace(
    K,
    trace,
    *,
    max_trace_prob=1.0,
    min_trace_prob=1e-5,
    max_noise_rate=1 - 1e-5,
    min_noise_rate=0.0,
    valid_noise_matrix=True,
    py=None,
    frac_zero_noise_rates=0.0,
    seed=0,
    max_iter=10000,
) -> Optional[np.ndarray]:
    """Generates a ``K x K`` noise matrix ``P(label=k_s|true_label=k_y)`` with
    ``np.sum(np.diagonal(noise_matrix))`` equal to the given `trace`.

    Parameters
    ----------
    K : int
      Creates a noise matrix of shape ``(K, K)``. Implies there are
      K classes for learning with noisy labels.

    trace : float
      Sum of diagonal entries of array of random probabilities returned.

    max_trace_prob : float
      Maximum probability of any entry in the trace of the return matrix.
      Only applied when ``K > 2``.

    min_trace_prob : float
      Minimum probability of any entry in the trace of the return matrix.
      Only applied when ``K > 2``.

    max_noise_rate : float
      Maximum noise_rate (non-diagonal entry) in the returned np.ndarray.
      Only applied when ``K > 2``.

    min_noise_rate : float
      Minimum noise_rate (non-diagonal entry) in the returned np.ndarray.
      Only applied when ``K > 2``.

    valid_noise_matrix : bool, default=True
      If ``True``, returns a matrix having all necessary conditions for
      learning with noisy labels. In particular, ``p(true_label=k)p(label=k) < p(true_label=k,label=k)``
      is satisfied. This requires that ``trace > 1``.

    py : np.ndarray
      An array of shape ``(K,)`` representing the fraction (prior probability) of each true class label, ``P(true_label = k)``.
      This argument is **required** when ``valid_noise_matrix=True`` and ``K > 2``.

    frac_zero_noise_rates : float
      The fraction of the ``n*(n-1)`` noise rates
      that will be set to 0. Note that if you set a high trace, it may be
      impossible to also have a low fraction of zero noise rates without
      forcing all non-1 diagonal values. Instead, when this happens we only
      guarantee to produce a noise matrix with `frac_zero_noise_rates` *or
      higher*. The opposite occurs with a small trace.
      When ``K == 2``, a value ``>= 0.5`` yields exactly one zero noise rate
      and any smaller value yields none.

    seed : int
      Seeds the random number generator for numpy. Note that this reseeds
      numpy's global random state via ``np.random.seed``.

    max_iter : int, default=10000
      The max number of tries to produce a valid matrix before returning ``None``.

    Returns
    -------
    noise_matrix : np.ndarray or None
      An array of shape ``(K, K)`` representing the noise matrix ``P(label=k_s|true_label=k_y)`` with `trace`
      equal to ``np.sum(np.diagonal(noise_matrix))``. This a conditional probability matrix and a
      left stochastic matrix. Returns ``None`` if `max_iter` is exceeded
      (or if ``max_iter < 1``).

    Raises
    ------
    ValueError
      If ``valid_noise_matrix=True`` and ``trace <= 1``; if
      ``valid_noise_matrix=True``, ``K > 2`` and `py` is ``None``; or if ``K <= 1``.
    """
    if valid_noise_matrix and trace <= 1:
        raise ValueError(
            "trace = {}. trace > 1 is necessary for a".format(trace)
            + " valid noise matrix to be returned (valid_noise_matrix == True)"
        )

    if valid_noise_matrix and py is None and K > 2:
        raise ValueError(
            "py must be provided (not None) if the input parameter" + " valid_noise_matrix == True"
        )

    if K <= 1:
        raise ValueError("K must be >= 2, but K = {}.".format(K))

    if max_iter < 1:
        return None

    np.random.seed(seed)

    # Special (highly constrained) case with faster solution.
    # Every 2 x 2 noise matrix with trace > 1 is valid because p(y) is not used
    if K == 2:
        if frac_zero_noise_rates >= 0.5:  # Include a single zero noise rate
            noise_matrix = np.array(
                [
                    [1.0, 1 - (trace - 1.0)],
                    [0.0, trace - 1.0],
                ]
            )
            # Randomly pick which of the two classes is the noise-free one.
            return noise_matrix if np.random.rand() > 0.5 else np.rot90(noise_matrix, k=2)
        # No zero noise rates: draw the diagonal, the rest follows from
        # each column summing to 1.
        diag = generate_n_rand_probabilities_that_sum_to_m(2, trace)
        return np.array(
            [
                [diag[0], 1 - diag[1]],
                [1 - diag[0], diag[1]],
            ]
        )

    # K > 2: rejection-sample candidate matrices until one passes validation.
    for _ in range(max_iter):
        noise_matrix = np.zeros(shape=(K, K))

        # Randomly generate noise_matrix diagonal.
        nm_diagonal = generate_n_rand_probabilities_that_sum_to_m(
            n=K,
            m=trace,
            max_prob=max_trace_prob,
            min_prob=min_trace_prob,
        )
        np.fill_diagonal(noise_matrix, nm_diagonal)

        # Columns whose diagonal is exactly 1 are already [0,..,1,..,0] and
        # carry no noise; only the remaining columns receive noise rates.
        noisy_cols = np.flatnonzero(nm_diagonal != 1)
        num_col_with_noise = len(noisy_cols)

        # Randomly distribute number of zero-noise-rates across columns
        num_zero_noise_rates = int(K * (K - 1) * frac_zero_noise_rates)
        # Remove zeros already in [1,0,..,0] columns
        num_zero_noise_rates -= (K - num_col_with_noise) * (K - 1)
        num_zero_noise_rates = max(num_zero_noise_rates, 0)  # Prevent negative
        num_zero_noise_rates_per_col = randomly_distribute_N_balls_into_K_bins(
            N=num_zero_noise_rates,
            K=num_col_with_noise,
            max_balls_per_bin=K - 2,
            # 2 = one for diagonal, and one to sum to 1
            min_balls_per_bin=0,
        )
        num_nonzero_noise_rates_per_col = K - 1 - num_zero_noise_rates_per_col

        # Randomly generate noise rates for columns with noise.
        for col, num_noise in zip(noisy_cols, num_nonzero_noise_rates_per_col):
            # Generate num_noise noise_rates for the given column.
            noise_rates_col = generate_n_rand_probabilities_that_sum_to_m(
                n=num_noise,
                m=1 - nm_diagonal[col],
                max_prob=max_noise_rate,
                min_prob=min_noise_rate,
            )
            # Randomly select which rows of the noisy column to assign the
            # random noise rates
            rows = np.random.choice(
                [row for row in range(K) if row != col], num_noise, replace=False
            )
            # Rates are handed out last-to-first across the chosen rows.
            noise_matrix[rows, col] = noise_rates_col[::-1]

        if not valid_noise_matrix or noise_matrix_is_valid(noise_matrix, py):
            return noise_matrix

    return None
def generate_n_rand_probabilities_that_sum_to_m(
    n,
    m,
    *,
    max_prob=1.0,
    min_prob=0.0,
) -> np.ndarray:
    """
    Generates `n` random probabilities that sum to `m`.

    When ``min_prob=0`` and ``max_prob = 1.0``, use
    ``np.random.dirichlet(np.ones(n))*m`` instead.

    Parameters
    ----------
    n : int
      Length of array of random probabilities to be returned.

    m : float
      Sum of array of random probabilities that is returned.

    max_prob : float, default=1.0
      Maximum probability of any entry in the returned array. Must be between 0 and 1.

    min_prob : float, default=0.0
      Minimum probability of any entry in the returned array. Must be between 0 and 1.

    Returns
    -------
    probabilities : np.ndarray
      An array of probabilities. Empty when ``n == 0``.

    Raises
    ------
    ValueError
      If the bounds cannot be met, i.e. ``max_prob < m / n`` or
      ``min_prob > m / n`` (up to a small floating-point tolerance).
    """
    if n == 0:
        return np.array([])

    if (max_prob + FLOATING_POINT_COMPARISON) < m / float(n):
        raise ValueError(
            "max_prob must be greater or equal to m / n, but "
            + "max_prob = "
            + str(max_prob)
            + ", m = "
            + str(m)
            + ", n = "
            + str(n)
            + ", m / n = "
            + str(m / float(n))
        )

    if min_prob > (m + FLOATING_POINT_COMPARISON) / float(n):
        raise ValueError(
            "min_prob must be less or equal to m / n, but "
            + "min_prob = "
            + str(min_prob)
            + ", m = "
            + str(m)
            + ", n = "
            + str(n)
            + ", m / n = "
            + str(m / float(n))
        )

    # When max_prob = 1, min_prob = 0, the next line is equivalent to:
    #   intermediate = np.sort(np.append(np.random.uniform(0, 1, n-1), [0, 1]))
    #   result = (intermediate[1:] - intermediate[:-1]) * m
    result = np.random.dirichlet(np.ones(n)) * m

    # Pull entries that exceed max_prob back down, moving the excess onto the
    # current smallest entry so the total stays equal to m.
    while result.max() > (max_prob + FLOATING_POINT_COMPARISON):
        min_val = result.min()
        max_val = result.max()
        new_min = min_val + (max_val - max_prob)
        # This adjustment prevents the new max from always being max_prob.
        adjustment = (max_prob - new_min) * np.random.rand()
        result[np.argmin(result)] = new_min + adjustment
        result[np.argmax(result)] = max_prob - adjustment

    # Push entries that fall below min_prob back up, taking the deficit from
    # the current largest entry so the total stays equal to m.
    while result.min() < (min_prob - FLOATING_POINT_COMPARISON):
        min_val = result.min()
        max_val = result.max()
        new_max = max_val - (min_prob - min_val)
        # This adjustment prevents the new min from always being min_prob.
        adjustment = (new_max - min_prob) * np.random.rand()
        result[np.argmax(result)] = new_max - adjustment
        result[np.argmin(result)] = min_prob + adjustment

    return result
def randomly_distribute_N_balls_into_K_bins(
    N,  # int
    K,  # int
    *,
    max_balls_per_bin=None,
    min_balls_per_bin=None,
) -> np.ndarray:
    """Returns a random numpy integer array of length `K` that sums to `N`.

    If `max_balls_per_bin` makes it impossible to place all `N` balls
    (``N / K > max_balls_per_bin``), the number of balls is reduced to
    ``max_balls_per_bin * K`` and the returned array sums to that instead.

    Parameters
    ----------
    N : int
      Number of balls.

    K : int
      Number of bins.

    max_balls_per_bin : int
      Ensure that each bin contains at most `max_balls_per_bin` balls.
      Defaults to `N` (no limit).

    min_balls_per_bin : int
      Ensure that each bin contains at least `min_balls_per_bin` balls.
      Defaults to 0.

    Returns
    -------
    int_array : np.array
      Length `K` array that sums to `N`.
    """
    if N == 0:
        return np.zeros(K, dtype=int)

    if max_balls_per_bin is None:
        max_balls_per_bin = N
    else:
        max_balls_per_bin = min(max_balls_per_bin, N)

    if min_balls_per_bin is None:
        min_balls_per_bin = 0
    else:
        min_balls_per_bin = min(min_balls_per_bin, N / K)

    # Not all balls fit: place only as many as the per-bin cap allows.
    if N / float(K) > max_balls_per_bin:
        N = max_balls_per_bin * K
        if N == 0:
            # A cap of zero leaves nothing to distribute (and would otherwise
            # divide by zero when converting the bounds to proportions below).
            return np.zeros(K, dtype=int)

    # Draw random proportions within the per-bin bounds, then scale to counts.
    arr = np.round(
        generate_n_rand_probabilities_that_sum_to_m(
            n=K,
            m=1,
            max_prob=max_balls_per_bin / float(N),
            min_prob=min_balls_per_bin / float(N),
        )
        * N
    )

    # Rounding can leave the total slightly off; move single balls out of the
    # fullest bin or into the emptiest bin until the total is N.
    while arr.sum() > N:  # pragma: no cover
        arr[np.argmax(arr)] -= 1
    while arr.sum() < N:
        arr[np.argmin(arr)] += 1

    return arr.astype(int)