# Source code for cleanlab.internal.multiannotator_utils
# Copyright (C) 2017-2023 Cleanlab Inc.
# This file is part of cleanlab.
#
# cleanlab is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# cleanlab is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with cleanlab. If not, see <https://www.gnu.org/licenses/>.
"""
Helper methods used internally in cleanlab.multiannotator
"""
from cleanlab.typing import LabelLike
from typing import Optional, Tuple
import warnings
import numpy as np
import pandas as pd
from cleanlab.internal.validation import assert_valid_class_labels
from cleanlab.internal.numerics import softmax
from cleanlab.internal.util import get_num_classes, value_counts
SMALL_CONST = 1e-30  # floor used to clip probabilities before taking logs, to avoid log(0)
def assert_valid_inputs_multiannotator(
    labels_multiannotator: np.ndarray,
    pred_probs: Optional[np.ndarray] = None,
    ensemble: bool = False,
    allow_single_label: bool = False,
    annotator_ids: Optional[pd.Index] = None,
) -> None:
    """Validate format of multi-annotator labels.

    Parameters
    ----------
    labels_multiannotator :
        2D array of shape ``(num_examples, num_annotators)``; NaN marks a missing label.
    pred_probs :
        Model-predicted class probabilities: 2D array, or 3D array when ``ensemble=True``.
    ensemble :
        Whether ``pred_probs`` holds predictions from multiple models (3D array).
    allow_single_label :
        If False, raise an error when the data effectively has only one annotator/label.
    annotator_ids :
        Optional annotator identifiers, used for clearer error messages about empty columns.

    Raises
    ------
    ValueError
        If any formatting requirement on the inputs is violated.
    TypeError
        If ``pred_probs`` is not a numpy array.
    """
    # Check that labels_multiannotator is a 2D array
    if labels_multiannotator.ndim != 2:
        raise ValueError(
            "labels_multiannotator must be a 2D array or dataframe, "
            "each row represents an example and each column represents an annotator."
        )

    # Raise error if labels are not formatted properly
    if any(isinstance(label, str) for label in labels_multiannotator.ravel()):
        raise ValueError(
            "Labels cannot be strings, they must be zero-indexed integers corresponding to class indices."
        )

    # Compute the missing-label mask once; it is reused by several checks below.
    missing_mask = np.isnan(labels_multiannotator)

    # Raise error if labels_multiannotator has NaN rows
    nan_row_mask = missing_mask.all(axis=1)
    if nan_row_mask.any():
        nan_rows = list(np.where(nan_row_mask)[0])
        raise ValueError(
            "labels_multiannotator cannot have rows with all NaN, each example must have at least one label.\n"
            f"Examples {nan_rows} do not have any labels."
        )

    # Raise error if labels_multiannotator has NaN columns
    nan_col_mask = missing_mask.all(axis=0)
    if nan_col_mask.any():
        if annotator_ids is not None:
            nan_columns = list(annotator_ids[np.where(nan_col_mask)[0]])
        else:
            nan_columns = list(np.where(nan_col_mask)[0])
        # fixed message typo: "must annotator" -> "must annotate"
        raise ValueError(
            "labels_multiannotator cannot have columns with all NaN, each annotator must annotate at least one example.\n"
            f"Annotators {nan_columns} did not label any examples."
        )

    if not allow_single_label:
        # Raise error if labels_multiannotator has <= 1 column
        if labels_multiannotator.shape[1] <= 1:
            raise ValueError(
                "labels_multiannotator must have more than one column.\n"
                "If there is only one annotator, use cleanlab.rank.get_label_quality_scores instead"
            )
        # Raise error if labels_multiannotator only has 1 label per example
        if (np.sum(~missing_mask, axis=1) == 1).all():
            raise ValueError(
                "Each example only has one label, collapse the labels into a 1-D array and use "
                "cleanlab.rank.get_label_quality_scores instead"
            )
        # Raise warning if no examples with 2 or more annotators agree
        # TODO: might shift this later in the code to avoid extra compute
        # An example shows agreement iff some label appears more than once in its row.
        # (Fixed: the previous check compared np.unique's sorted output against the raw
        # row, so distinct-but-unsorted rows were miscounted as agreement.)
        def _row_all_distinct(row: np.ndarray) -> bool:
            vals = row[~np.isnan(row)]
            return np.unique(vals).size == vals.size

        if np.apply_along_axis(_row_all_distinct, axis=1, arr=labels_multiannotator).all():
            warnings.warn("Annotators do not agree on any example. Check input data.")

    # Check labels
    all_labels_flatten = labels_multiannotator.ravel()
    all_labels_flatten = all_labels_flatten[~np.isnan(all_labels_flatten)]
    assert_valid_class_labels(all_labels_flatten, allow_one_class=True)

    # Raise error if number of classes in labels_multiannotator does not match number of classes in pred_probs
    if pred_probs is not None:
        if not isinstance(pred_probs, np.ndarray):
            raise TypeError("pred_probs must be a numpy array.")
        if ensemble:
            if pred_probs.ndim != 3:
                error_message = "pred_probs must be a 3d array."
                if pred_probs.ndim == 2:
                    error_message += " If you have a 2d pred_probs array, use the non-ensemble version of this function."
                raise ValueError(error_message)
            if pred_probs.shape[1] != len(labels_multiannotator):
                raise ValueError("each pred_probs and labels_multiannotator must have same length.")
            num_classes = pred_probs.shape[2]
        else:
            if pred_probs.ndim != 2:
                error_message = "pred_probs must be a 2d array."
                if pred_probs.ndim == 3:
                    error_message += " If you have a 3d pred_probs array, use the ensemble version of this function."
                raise ValueError(error_message)
            if len(pred_probs) != len(labels_multiannotator):
                raise ValueError("pred_probs and labels_multiannotator must have same length.")
            num_classes = pred_probs.shape[1]

        highest_class = np.nanmax(labels_multiannotator) + 1
        # this allows for missing labels, but not missing columns in pred_probs
        if num_classes < highest_class:
            raise ValueError(
                f"pred_probs must have at least {int(highest_class)} columns based on the largest class label "
                "which appears in labels_multiannotator. Perhaps some rarely-annotated classes were lost while "
                "establishing consensus labels used to train your classifier."
            )
def assert_valid_pred_probs(
    pred_probs: Optional[np.ndarray] = None,
    pred_probs_unlabeled: Optional[np.ndarray] = None,
    ensemble: bool = False,
):
    """Validate format of pred_probs for multiannotator active learning functions.

    Parameters
    ----------
    pred_probs :
        Predicted class probabilities for labeled examples (2D, or 3D when ``ensemble=True``).
    pred_probs_unlabeled :
        Predicted class probabilities for unlabeled examples (same dimensionality rule).
    ensemble :
        Whether the arrays hold predictions from multiple models (3D arrays).

    Raises
    ------
    ValueError
        If both arrays are None, have the wrong number of dimensions, or disagree on the
        number of classes.
    TypeError
        If a provided array is not a numpy array.
    """
    if pred_probs is None and pred_probs_unlabeled is None:
        raise ValueError(
            "pred_probs and pred_probs_unlabeled cannot both be None, specify at least one of the two."
        )

    expected_ndim = 3 if ensemble else 2
    other_ndim = 2 if ensemble else 3
    if ensemble:
        pred_probs_hint = " If you have a 2d pred_probs array (ie. only one predictor), use the non-ensemble version of this function."
        unlabeled_hint = " If you have a 2d pred_probs_unlabeled array, use the non-ensemble version of this function."
    else:
        pred_probs_hint = " If you have a 3d pred_probs array, use the ensemble version of this function."
        # NOTE: fixed message — previously suggested the "non-ensemble" version for a 3d array.
        unlabeled_hint = " If you have a 3d pred_probs_unlabeled array, use the ensemble version of this function."

    # Same type/dimensionality validation for both arrays (when provided).
    for arr, name, hint in (
        (pred_probs, "pred_probs", pred_probs_hint),
        (pred_probs_unlabeled, "pred_probs_unlabeled", unlabeled_hint),
    ):
        if arr is None:
            continue
        if not isinstance(arr, np.ndarray):
            raise TypeError(f"{name} must be a numpy array.")
        if arr.ndim != expected_ndim:
            error_message = f"{name} must be a {expected_ndim}d array."
            if arr.ndim == other_ndim:
                error_message += hint
            raise ValueError(error_message)

    # Both arrays (if given) must agree on the number of classes (last axis).
    if pred_probs is not None and pred_probs_unlabeled is not None:
        if pred_probs.shape[-1] != pred_probs_unlabeled.shape[-1]:
            raise ValueError(
                "pred_probs and pred_probs_unlabeled must have the same number of classes"
            )
[docs]def format_multiannotator_labels(labels: LabelLike) -> Tuple[pd.DataFrame, dict]:
"""Takes an array of labels and formats it such that labels are in the set ``0, 1, ..., K-1``,
where ``K`` is the number of classes. The labels are assigned based on lexicographic order.
Returns
-------
formatted_labels
Returns pd.DataFrame of shape ``(N,M)``. The return labels will be properly formatted and can be passed to
cleanlab.multiannotator functions.
mapping
A dictionary showing the mapping of new to old labels, such that ``mapping[k]`` returns the name of the k-th class.
"""
if isinstance(labels, pd.DataFrame):
np_labels = labels.values
elif isinstance(labels, np.ndarray):
np_labels = labels
else:
raise TypeError("labels must be 2D numpy array or pandas DataFrame")
unique_labels = pd.unique(np_labels.ravel())
try:
unique_labels = unique_labels[~np.isnan(unique_labels)]
unique_labels.sort()
except TypeError: # np.unique / np.sort cannot handle string values or pd.NA types
nan_mask = np.array([(l is np.NaN) or (l is pd.NA) or (l == "nan") for l in unique_labels])
unique_labels = unique_labels[~nan_mask]
unique_labels.sort()
# convert float labels (that arose because np.nan is float type) to int
if unique_labels.dtype == "float":
unique_labels = unique_labels.astype("int")
label_map = {label: i for i, label in enumerate(unique_labels)}
inverse_map = {i: label for label, i in label_map.items()}
if isinstance(labels, np.ndarray):
labels = pd.DataFrame(labels)
formatted_labels = labels.replace(label_map)
return formatted_labels, inverse_map
def check_consensus_label_classes(
    labels_multiannotator: np.ndarray,
    consensus_label: np.ndarray,
    consensus_method: str,
) -> None:
    """Print a caution message if classes seen in the multi-annotator labels
    no longer appear among the consensus labels produced by ``consensus_method``."""
    observed_classes = np.unique(labels_multiannotator)
    observed_classes = observed_classes[~np.isnan(observed_classes)]
    missing_classes = set(observed_classes) - set(consensus_label)
    if missing_classes:
        print(
            "CAUTION: Number of unique classes has been reduced from the original data when establishing consensus labels "
            f"using consensus method '{consensus_method}', likely due to some classes being rarely annotated. "
            "If training a classifier on these consensus labels, it will never see any of the omitted classes unless you "
            "manually replace some of the consensus labels.\n"
            f"Classes in the original data but not in consensus labels: {list(map(int, missing_classes))}"
        )
def compute_soft_cross_entropy(
    labels_multiannotator: np.ndarray,
    pred_probs: np.ndarray,
) -> np.ndarray:
    """Compute soft cross entropy between the annotators' empirical label distribution and model pred_probs.

    Parameters
    ----------
    labels_multiannotator :
        2D array of shape ``(num_examples, num_annotators)``; NaN marks a missing label.
    pred_probs :
        2D array of model-predicted class probabilities, shape ``(num_examples, num_classes)``.

    Returns
    -------
    soft_cross_entropy :
        1D array of per-example soft cross entropy values, normalized by ``log(num_classes)``.
        (The previous ``-> float`` annotation was wrong: an array is returned.)
    """
    num_classes = get_num_classes(pred_probs=pred_probs)

    # Empirical class distribution per example, estimated from its non-missing annotator labels.
    empirical_label_distribution = np.full((len(labels_multiannotator), num_classes), np.nan)
    for i, labels in enumerate(labels_multiannotator):
        labels_subset = labels[~np.isnan(labels)]
        empirical_label_distribution[i, :] = value_counts(
            labels_subset, num_classes=num_classes
        ) / len(labels_subset)

    # Clip probabilities away from 0 so np.log stays finite.
    clipped_pred_probs = np.clip(pred_probs, a_min=SMALL_CONST, a_max=None)
    soft_cross_entropy = -np.sum(
        empirical_label_distribution * np.log(clipped_pred_probs), axis=1
    ) / np.log(num_classes)
    return soft_cross_entropy
def find_best_temp_scaler(
    labels_multiannotator: np.ndarray,
    pred_probs: np.ndarray,
    coarse_search_range: Optional[list] = None,
    fine_search_size: int = 4,
) -> float:
    """Find the best temperature scaling factor that minimizes the soft cross entropy between the annotators' empirical label distribution
    and model pred_probs.

    Parameters
    ----------
    labels_multiannotator :
        2D array of multi-annotator labels; NaN marks a missing label.
    pred_probs :
        2D array of model-predicted class probabilities.
    coarse_search_range :
        Candidate temperatures for the coarse search; defaults to
        ``[0.1, 0.2, 0.5, 0.8, 1, 2, 3, 5, 8]``. (Default is now built inside the
        function to avoid a shared mutable default argument.)
    fine_search_size :
        Number of extra temperatures probed around the coarse minimum.

    Returns
    -------
    best_temp :
        Temperature with minimal mean soft cross entropy over the fine search grid.
    """
    if coarse_search_range is None:
        coarse_search_range = [0.1, 0.2, 0.5, 0.8, 1, 2, 3, 5, 8]

    # Log-probs computed once; zero probabilities map to -inf instead of warning/NaN.
    log_pred_probs = np.log(
        pred_probs, where=pred_probs > 0, out=np.full(pred_probs.shape, -np.inf)
    )

    # Coarse pass: evaluate each candidate temperature.
    soft_cross_entropy_coarse = np.full(len(coarse_search_range), np.nan)
    for i, curr_temp in enumerate(coarse_search_range):
        scaled_pred_probs = softmax(log_pred_probs, temperature=curr_temp, axis=1, shift=False)
        soft_cross_entropy_coarse[i] = np.mean(
            compute_soft_cross_entropy(labels_multiannotator, scaled_pred_probs)
        )

    # Fine pass: search a denser grid around the coarse minimum.
    min_entropy_ind = np.argmin(soft_cross_entropy_coarse)
    fine_search_range = _set_fine_search_range(
        coarse_search_range, fine_search_size, min_entropy_ind
    )
    soft_cross_entropy_fine = np.full(len(fine_search_range), np.nan)
    for i, curr_temp in enumerate(fine_search_range):
        scaled_pred_probs = softmax(log_pred_probs, temperature=curr_temp, axis=1, shift=False)
        soft_cross_entropy_fine[i] = np.mean(
            compute_soft_cross_entropy(labels_multiannotator, scaled_pred_probs)
        )

    best_temp = fine_search_range[np.argmin(soft_cross_entropy_fine)]
    return best_temp
def _set_fine_search_range(
coarse_search_range: list, fine_search_size: int, min_entropy_ind: np.intp
) -> np.ndarray:
fine_search_range = np.array([])
if min_entropy_ind != 0:
fine_search_range = np.append(
np.linspace(
coarse_search_range[min_entropy_ind - 1],
coarse_search_range[min_entropy_ind],
fine_search_size,
endpoint=False,
),
fine_search_range,
)
if min_entropy_ind != len(coarse_search_range) - 1:
fine_search_range = np.append(
fine_search_range,
np.linspace(
coarse_search_range[min_entropy_ind],
coarse_search_range[min_entropy_ind + 1],
fine_search_size + 1,
endpoint=True,
),
)
return fine_search_range
def temp_scale_pred_probs(
    pred_probs: np.ndarray,
    temp: float,
) -> np.ndarray:
    """Scales pred_probs by the given temperature factor. Temperature of <1 will sharpen the pred_probs while temperatures of >1 will smoothen it."""
    # Floor probabilities at a tiny constant so the log below never sees 0, then renormalize rows.
    floored = np.clip(pred_probs, a_min=SMALL_CONST, a_max=None)
    normalized = floored / floored.sum(axis=1, keepdims=True)
    # Temperature scaling is applied in log space.
    scaled = softmax(np.log(normalized), temperature=temp, axis=1, shift=False)
    # Renormalize so every row sums to exactly 1.
    return scaled / scaled.sum(axis=1, keepdims=True)