Source code for cleanlab.internal.multiannotator_utils
"""
Helper methods used internally in cleanlab.multiannotator
"""
import warnings
from typing import Optional, Tuple
import numpy as np
import pandas as pd
from cleanlab.internal.numerics import softmax
from cleanlab.internal.util import get_num_classes, value_counts
from cleanlab.internal.validation import assert_valid_class_labels
from cleanlab.typing import LabelLike
SMALL_CONST = 1e-30
def assert_valid_inputs_multiannotator(
    labels_multiannotator: np.ndarray,
    pred_probs: Optional[np.ndarray] = None,
    ensemble: bool = False,
    allow_single_label: bool = False,
    annotator_ids: Optional[pd.Index] = None,
) -> None:
    """Validate format of multi-annotator labels (and of ``pred_probs`` when given).

    Parameters
    ----------
    labels_multiannotator
        2D array where each row is an example and each column is an annotator;
        ``NaN`` marks a missing annotation.
    pred_probs
        Optional predicted probabilities. Must be 2D when ``ensemble=False`` and
        3D when ``ensemble=True`` (examples are then indexed along axis 1).
    ensemble
        Whether ``pred_probs`` is validated as a 3D (ensemble) array.
    allow_single_label
        If False, additionally require more than one annotator column and at least
        one example with more than one label, and warn when no two annotators
        agree on any example.
    annotator_ids
        Optional annotator names, used only to make the "empty column" error
        message more readable.

    Raises
    ------
    ValueError
        If the labels or ``pred_probs`` have an invalid shape/content.
    TypeError
        If ``pred_probs`` is given but is not a numpy array.
    """
    # Check that labels_multiannotator is a 2D array
    if labels_multiannotator.ndim != 2:
        raise ValueError(
            "labels_multiannotator must be a 2D array or dataframe, "
            "each row represents an example and each column represents an annotator."
        )

    # Raise error if labels are not formatted properly.
    # Only object ("O") and unicode ("U") arrays can contain ``str`` elements, so the
    # per-element Python scan is skipped for ordinary numeric arrays.
    if labels_multiannotator.dtype.kind in "OU" and any(
        isinstance(label, str) for label in labels_multiannotator.ravel()
    ):
        raise ValueError(
            "Labels cannot be strings, they must be zero-indexed integers corresponding to class indices."
        )

    # The missing-annotation mask is needed by several checks below; compute it once.
    is_missing = np.isnan(labels_multiannotator)

    # Raise error if labels_multiannotator has NaN rows
    nan_row_mask = is_missing.all(axis=1)
    if nan_row_mask.any():
        # .tolist() yields plain Python ints so the message reads "[0, 2]" on every NumPy version
        nan_rows = np.flatnonzero(nan_row_mask).tolist()
        raise ValueError(
            "labels_multiannotator cannot have rows with all NaN, each example must have at least one label.\n"
            f"Examples {nan_rows} do not have any labels."
        )

    # Raise error if labels_multiannotator has NaN columns
    nan_col_mask = is_missing.all(axis=0)
    if nan_col_mask.any():
        nan_col_idx = np.flatnonzero(nan_col_mask)
        if annotator_ids is not None:
            nan_columns = annotator_ids[nan_col_idx].tolist()
        else:
            nan_columns = nan_col_idx.tolist()
        raise ValueError(
            "labels_multiannotator cannot have columns with all NaN, each annotator must annotate at least one example.\n"
            f"Annotators {nan_columns} did not label any examples."
        )

    if not allow_single_label:
        # Raise error if labels_multiannotator has <= 1 column
        if labels_multiannotator.shape[1] <= 1:
            raise ValueError(
                "labels_multiannotator must have more than one column.\n"
                "If there is only one annotator, use cleanlab.rank.get_label_quality_scores instead"
            )

        # Raise error if labels_multiannotator only has 1 label per example
        if (np.sum(~is_missing, axis=1) == 1).all():
            raise ValueError(
                "Each example only has one label, collapse the labels into a 1-D array and use "
                "cleanlab.rank.get_label_quality_scores instead"
            )

        # Raise warning if no examples with 2 or more annotators agree
        # TODO: might shift this later in the code to avoid extra compute
        has_agreement = np.zeros(labels_multiannotator.shape[0], dtype=bool)
        # NaN never compares equal to anything, so a NaN entry in np.unique is harmless here
        for class_label in np.unique(labels_multiannotator):
            has_agreement |= (labels_multiannotator == class_label).sum(axis=1) > 1
        if not has_agreement.any():
            warnings.warn("Annotators do not agree on any example. Check input data.")

    # Check labels (missing annotations are excluded first)
    all_labels_flatten = labels_multiannotator.ravel()[~is_missing.ravel()]
    assert_valid_class_labels(all_labels_flatten, allow_one_class=True)

    # Raise error if number of classes in labels_multiannotator does not match number of classes in pred_probs
    if pred_probs is not None:
        if not isinstance(pred_probs, np.ndarray):
            raise TypeError("pred_probs must be a numpy array.")

        if ensemble:
            if pred_probs.ndim != 3:
                error_message = "pred_probs must be a 3d array."
                if pred_probs.ndim == 2:
                    error_message += " If you have a 2d pred_probs array, use the non-ensemble version of this function."
                raise ValueError(error_message)

            if pred_probs.shape[1] != len(labels_multiannotator):
                raise ValueError("each pred_probs and labels_multiannotator must have same length.")

            num_classes = pred_probs.shape[2]
        else:
            if pred_probs.ndim != 2:
                error_message = "pred_probs must be a 2d array."
                if pred_probs.ndim == 3:
                    error_message += " If you have a 3d pred_probs array, use the ensemble version of this function."
                raise ValueError(error_message)

            if len(pred_probs) != len(labels_multiannotator):
                raise ValueError("pred_probs and labels_multiannotator must have same length.")

            num_classes = pred_probs.shape[1]

        highest_class = np.nanmax(labels_multiannotator) + 1

        # this allows for missing labels, but not missing columns in pred_probs
        if num_classes < highest_class:
            raise ValueError(
                f"pred_probs must have at least {int(highest_class)} columns based on the largest class label "
                "which appears in labels_multiannotator. Perhaps some rarely-annotated classes were lost while "
                "establishing consensus labels used to train your classifier."
            )
def assert_valid_pred_probs(
    pred_probs: Optional[np.ndarray] = None,
    pred_probs_unlabeled: Optional[np.ndarray] = None,
    ensemble: bool = False,
) -> None:
    """Validate format of pred_probs for multiannotator active learning functions.

    Parameters
    ----------
    pred_probs
        Optional predicted probabilities; must be a 3D numpy array when
        ``ensemble=True`` and a 2D numpy array otherwise.
    pred_probs_unlabeled
        Optional predicted probabilities, validated with the same rules as
        ``pred_probs``.
    ensemble
        Selects whether 3D (ensemble) or 2D (single model) arrays are expected.

    Raises
    ------
    ValueError
        If both arrays are None, if an array has the wrong number of dimensions,
        or if the two arrays disagree on the size of their last (class) axis.
    TypeError
        If a provided array is not a numpy array.
    """
    if pred_probs is None and pred_probs_unlabeled is None:
        raise ValueError(
            "pred_probs and pred_probs_unlabeled cannot both be None, specify at least one of the two."
        )

    # `wrong_ndim` is the dimensionality a user most plausibly passed by mistake;
    # it gets an extra hint pointing to the other variant of the function.
    if ensemble:
        expected_ndim, wrong_ndim = 3, 2
        hints = {
            "pred_probs": " If you have a 2d pred_probs array (ie. only one predictor), use the non-ensemble version of this function.",
            "pred_probs_unlabeled": " If you have a 2d pred_probs_unlabeled array, use the non-ensemble version of this function.",
        }
    else:
        expected_ndim, wrong_ndim = 2, 3
        hints = {
            "pred_probs": " If you have a 3d pred_probs array, use the ensemble version of this function.",
            # A 3d array belongs to the *ensemble* variant (the message previously said "non-ensemble").
            "pred_probs_unlabeled": " If you have a 3d pred_probs_unlabeled array, use the ensemble version of this function.",
        }

    for name, array in (
        ("pred_probs", pred_probs),
        ("pred_probs_unlabeled", pred_probs_unlabeled),
    ):
        if array is None:
            continue
        if not isinstance(array, np.ndarray):
            raise TypeError(f"{name} must be a numpy array.")
        if array.ndim != expected_ndim:
            error_message = f"{name} must be a {expected_ndim}d array."
            if array.ndim == wrong_ndim:
                error_message += hints[name]
            raise ValueError(error_message)

    if pred_probs is not None and pred_probs_unlabeled is not None:
        # the class axis is the last one in both the 2d and the 3d layout
        if pred_probs.shape[-1] != pred_probs_unlabeled.shape[-1]:
            raise ValueError(
                "pred_probs and pred_probs_unlabeled must have the same number of classes"
            )
def format_multiannotator_labels(labels: LabelLike) -> Tuple[pd.DataFrame, dict]:
    """Takes an array of labels and formats it such that labels are in the set ``0, 1, ..., K-1``,
    where ``K`` is the number of classes. The labels are assigned based on lexicographic order.

    Missing annotations (``np.nan``, ``None``, ``pd.NA`` or the literal string ``"nan"``)
    are not treated as classes and are left in place.

    Parameters
    ----------
    labels
        2D numpy array or pandas DataFrame of annotations.

    Returns
    -------
    formatted_labels
        Returns pd.DataFrame of shape ``(N,M)``. The return labels will be properly formatted and can be passed to
        cleanlab.multiannotator functions.
    mapping
        A dictionary showing the mapping of new to old labels, such that ``mapping[k]`` returns the name of the k-th class.

    Raises
    ------
    TypeError
        If ``labels`` is neither a numpy array nor a pandas DataFrame.
    """
    if isinstance(labels, pd.DataFrame):
        np_labels = labels.values
    elif isinstance(labels, np.ndarray):
        np_labels = labels
    else:
        raise TypeError("labels must be 2D numpy array or pandas DataFrame")

    unique_labels = pd.unique(np_labels.ravel())

    # pd.isna recognises every missing-value sentinel (np.nan, float("nan"), None, pd.NA)
    # for numeric and object arrays alike, unlike identity checks such as `x is np.nan`.
    is_missing = pd.isna(unique_labels)
    if unique_labels.dtype.kind in "OU":
        # String-typed data may encode a missing annotation as the text "nan".
        # `missing or ...` short-circuits so pd.NA is never compared to a string.
        is_missing = np.array(
            [bool(missing) or label == "nan" for label, missing in zip(unique_labels, is_missing)],
            dtype=bool,
        )
    unique_labels = np.sort(unique_labels[~is_missing])

    # convert float labels (that arose because np.nan is float type) to int
    if unique_labels.dtype == "float":
        unique_labels = unique_labels.astype("int")

    label_map = {label: i for i, label in enumerate(unique_labels)}
    inverse_map = {i: label for label, i in label_map.items()}

    if isinstance(labels, np.ndarray):
        labels = pd.DataFrame(labels)

    formatted_labels = labels.replace(label_map)

    return formatted_labels, inverse_map
def check_consensus_label_classes(
    labels_multiannotator: np.ndarray,
    consensus_label: np.ndarray,
    consensus_method: str,
) -> None:
    """Check if any classes no longer appear in the set of consensus labels (established using the consensus_method stated).

    Prints a caution message listing, in ascending order, every class that occurs in
    ``labels_multiannotator`` but not in ``consensus_label``. Nothing is printed (and
    nothing is raised) when no class was lost.

    Parameters
    ----------
    labels_multiannotator
        Array of annotations; ``NaN`` entries are ignored.
    consensus_label
        Consensus labels derived from ``labels_multiannotator``.
    consensus_method
        Name of the consensus method, only used in the printed message.
    """
    annotated_classes = np.unique(labels_multiannotator)
    annotated_classes = annotated_classes[~np.isnan(annotated_classes)]

    # Sorted so the message is deterministic rather than following set iteration order.
    omitted_classes = sorted(set(annotated_classes) - set(consensus_label))

    if len(omitted_classes) > 0:
        print(
            "CAUTION: Number of unique classes has been reduced from the original data when establishing consensus labels "
            f"using consensus method '{consensus_method}', likely due to some classes being rarely annotated. "
            "If training a classifier on these consensus labels, it will never see any of the omitted classes unless you "
            "manually replace some of the consensus labels.\n"
            f"Classes in the original data but not in consensus labels: {list(map(int, omitted_classes))}"
        )
def compute_soft_cross_entropy(
    labels_multiannotator: np.ndarray,
    pred_probs: np.ndarray,
) -> np.ndarray:
    """Compute soft cross entropy between the annotators' empirical label distribution and model pred_probs.

    Parameters
    ----------
    labels_multiannotator
        2D array of annotations, one row per example; ``NaN`` marks a missing annotation.
    pred_probs
        Predicted class probabilities, one row per example.

    Returns
    -------
    soft_cross_entropy
        One value per example (the sum below runs over the class axis only), i.e. an
        array rather than a single float. Values are divided by ``log(num_classes)``.
    """
    num_classes = get_num_classes(pred_probs=pred_probs)

    # Row i holds the fraction of example i's (non-missing) annotations given to each class.
    empirical_label_distribution = np.full((len(labels_multiannotator), num_classes), np.nan)
    for i, labels in enumerate(labels_multiannotator):
        labels_subset = labels[~np.isnan(labels)]
        empirical_label_distribution[i, :] = value_counts(
            labels_subset, num_classes=num_classes
        ) / len(labels_subset)

    # Floor the probabilities so that log(0) cannot occur.
    clipped_pred_probs = np.clip(pred_probs, a_min=SMALL_CONST, a_max=None)

    soft_cross_entropy = -np.sum(
        empirical_label_distribution * np.log(clipped_pred_probs), axis=1
    ) / np.log(num_classes)

    return soft_cross_entropy
def find_best_temp_scaler(
    labels_multiannotator: np.ndarray,
    pred_probs: np.ndarray,
    coarse_search_range: list = [0.1, 0.2, 0.5, 0.8, 1, 2, 3, 5, 8],
    fine_search_size: int = 4,
) -> float:
    """Find the best temperature scaling factor that minimizes the soft cross entropy between the annotators' empirical label distribution
    and model pred_probs.

    The search is two-staged: every temperature in ``coarse_search_range`` is scored
    first, then a finer grid built around the best coarse temperature (its density is
    controlled by ``fine_search_size``) is scored, and the best fine temperature is returned.
    """
    # Zero probabilities map to -inf: np.log only runs where pred_probs > 0 and the
    # remaining entries keep the -inf the output buffer was pre-filled with.
    log_pred_probs = np.full(pred_probs.shape, -np.inf)
    np.log(pred_probs, where=pred_probs > 0, out=log_pred_probs)

    def mean_entropy_at(temperature):
        # Mean soft cross entropy of the predictions re-scaled with `temperature`.
        rescaled = softmax(log_pred_probs, temperature=temperature, axis=1, shift=False)
        return np.mean(compute_soft_cross_entropy(labels_multiannotator, rescaled))

    coarse_scores = np.array([mean_entropy_at(t) for t in coarse_search_range], dtype=float)
    best_coarse_index = np.argmin(coarse_scores)

    fine_search_range = _set_fine_search_range(
        coarse_search_range, fine_search_size, best_coarse_index
    )

    fine_scores = np.array([mean_entropy_at(t) for t in fine_search_range], dtype=float)
    return fine_search_range[np.argmin(fine_scores)]
def _set_fine_search_range(
    coarse_search_range: list, fine_search_size: int, min_entropy_ind: np.intp
) -> np.ndarray:
    """Build the fine temperature grid around the best coarse temperature.

    The grid covers the interval between the best coarse value and each of its existing
    neighbours in ``coarse_search_range``. The best coarse value itself is always part of
    the result, so a search over the returned grid can never do worse than the coarse
    search, and the result is never empty.

    Parameters
    ----------
    coarse_search_range
        Coarse grid of temperatures.
    fine_search_size
        Number of points placed in the interval below the best value; the interval
        above it receives ``fine_search_size + 1`` points (both ends included).
    min_entropy_ind
        Index of the best temperature within ``coarse_search_range``.

    Returns
    -------
    fine_search_range
        1D float array of candidate temperatures in ascending grid order.
    """
    best_temp = coarse_search_range[min_entropy_ind]
    segments = []

    if min_entropy_ind > 0:
        # [previous, best): the best value is excluded here and contributed below instead
        segments.append(
            np.linspace(
                coarse_search_range[min_entropy_ind - 1],
                best_temp,
                fine_search_size,
                endpoint=False,
            )
        )

    if min_entropy_ind < len(coarse_search_range) - 1:
        # [best, next]
        segments.append(
            np.linspace(
                best_temp,
                coarse_search_range[min_entropy_ind + 1],
                fine_search_size + 1,
                endpoint=True,
            )
        )
    else:
        # No upper neighbour: still keep the best coarse value as a candidate.
        segments.append(np.array([best_temp], dtype=float))

    return np.concatenate(segments)
def temp_scale_pred_probs(
    pred_probs: np.ndarray,
    temp: float,
) -> np.ndarray:
    """Scales pred_probs by the given temperature factor. Temperature of <1 will sharpen the pred_probs while temperatures of >1 will smoothen it.

    Parameters
    ----------
    pred_probs
        Predicted probabilities; normalisation is carried out along axis 1.
    temp
        Temperature passed on to ``softmax``.

    Returns
    -------
    scaled_pred_probs
        Temperature-scaled probabilities, re-normalised along axis 1.
    """
    # Floor the probabilities so the logarithm below never sees 0, then re-normalise.
    floored = np.clip(pred_probs, a_min=SMALL_CONST, a_max=None)
    floored = floored / np.sum(floored, axis=1, keepdims=True)

    # Apply the temperature in log space.
    log_probs = np.log(floored)
    scaled_pred_probs = softmax(log_probs, temperature=temp, axis=1, shift=False)

    # Normalise once more along axis 1 before returning.
    return scaled_pred_probs / np.sum(scaled_pred_probs, axis=1, keepdims=True)