Source code for cleanlab.datalab.issue_manager.outlier

# Copyright (C) 2017-2023  Cleanlab Inc.
# This file is part of cleanlab.
#
# cleanlab is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# cleanlab is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with cleanlab.  If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations

from typing import TYPE_CHECKING, Any, ClassVar, Dict, Optional, Tuple, Union, cast

from scipy.sparse import csr_matrix
from scipy.stats import iqr
import numpy as np
import pandas as pd

from cleanlab.datalab.issue_manager import IssueManager
from cleanlab.outlier import OutOfDistribution, transform_distances_to_scores

if TYPE_CHECKING:  # pragma: no cover
    import numpy.typing as npt
    from sklearn.neighbors import NearestNeighbors
    from cleanlab.datalab.datalab import Datalab


class OutlierIssueManager(IssueManager):
    """Manages issues related to out-of-distribution examples."""

    description: ClassVar[
        str
    ] = """Examples that are very different from the rest of the dataset
    (i.e. potentially out-of-distribution or rare/anomalous instances).
    """
    issue_name: ClassVar[str] = "outlier"
    verbosity_levels = {
        0: [],
        1: [],
        2: ["average_ood_score"],
        3: [],
    }

    DEFAULT_THRESHOLDS = {
        "features": 0.37037,
        "pred_probs": 0.13,
    }
    """Default thresholds for outlier detection.

    If outlier detection is performed on the features, an example whose average
    distance to its k nearest neighbors is greater than
    Q3_avg_dist + (1 / threshold - 1) * IQR_avg_dist is considered an outlier.

    If outlier detection is performed on the predicted probabilities, an example
    whose average score is lower than threshold * median_outlier_score is
    considered an outlier.
    """

    def __init__(
        self,
        datalab: Datalab,
        threshold: Optional[float] = None,
        **kwargs,
    ):
        super().__init__(datalab)

        ood_kwargs = kwargs.get("ood_kwargs", {})
        valid_ood_params = OutOfDistribution.DEFAULT_PARAM_DICT.keys()
        # Collect any OutOfDistribution hyperparameters passed directly via kwargs
        # into the params dict expected by OutOfDistribution.
        params = {
            key: value
            for key, value in ((k, kwargs.get(k, None)) for k in valid_ood_params)
            if value is not None
        }
        if params:
            ood_kwargs["params"] = params

        self.ood: OutOfDistribution = OutOfDistribution(**ood_kwargs)

        self.threshold = threshold
        self._embeddings: Optional[np.ndarray] = None
        self._metric: str = None  # type: ignore
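    # Worked illustration of the "features" threshold described above (a sketch for the
    # reader; not used by the class itself): with the default threshold of 0.37037, the
    # IQR scale factor is 1 / 0.37037 - 1, which is about 1.7, so an example is flagged
    # as an outlier when its average distance to its k nearest neighbors exceeds
    # Q3_avg_dist + 1.7 * IQR_avg_dist.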
    def find_issues(
        self,
        features: Optional[npt.NDArray] = None,
        pred_probs: Optional[np.ndarray] = None,
        **kwargs,
    ) -> None:
        knn_graph = self._process_knn_graph_from_inputs(kwargs)
        distances: Optional[np.ndarray] = None

        if knn_graph is not None:
            # Reuse the precomputed kNN graph: recover k from the CSR structure and
            # turn the stored distances into outlier scores.
            N = knn_graph.shape[0]
            k = knn_graph.nnz // N
            t = cast(int, self.ood.params["t"])

            distances = knn_graph.data.reshape(-1, k)
            assert isinstance(distances, np.ndarray)
            scores = transform_distances_to_scores(distances, k=k, t=t)
        elif features is not None:
            scores = self._score_with_features(features, **kwargs)
        elif pred_probs is not None:
            scores = self._score_with_pred_probs(pred_probs, **kwargs)
        else:
            if kwargs.get("knn_graph", None) is not None:
                raise ValueError(
                    "knn_graph is provided, but not sufficiently large to compute the "
                    "scores based on the provided hyperparameters."
                )
            raise ValueError("Either `features` or `pred_probs` must be provided.")

        if features is not None or knn_graph is not None:
            if knn_graph is None:
                assert (
                    features is not None
                ), "features must be provided so that we can compute the knn graph."
                knn_graph = self._process_knn_graph_from_features(kwargs)

            distances = knn_graph.data.reshape(knn_graph.shape[0], -1)
            assert isinstance(distances, np.ndarray)
            (
                self.threshold,
                is_issue_column,
            ) = self._compute_threshold_and_issue_column_from_distances(distances, self.threshold)
        else:
            assert pred_probs is not None
            # Threshold based on pred_probs: very small scores are outliers.
            if self.threshold is None:
                self.threshold = self.DEFAULT_THRESHOLDS["pred_probs"]
            if not 0 <= self.threshold:
                raise ValueError(f"threshold must be non-negative, but got {self.threshold}.")
            is_issue_column = scores < self.threshold * np.median(scores)

        self.issues = pd.DataFrame(
            {
                f"is_{self.issue_name}_issue": is_issue_column,
                self.issue_score_key: scores,
            },
        )
        self.summary = self.make_summary(score=scores.mean())
        self.info = self.collect_info(knn_graph=knn_graph)
    def _process_knn_graph_from_inputs(self, kwargs: Dict[str, Any]) -> Union[csr_matrix, None]:
        """Determine if a knn_graph is provided in the kwargs or if one is already stored in the
        associated Datalab instance."""
        knn_graph_kwargs: Optional[csr_matrix] = kwargs.get("knn_graph", None)
        knn_graph_stats = self.datalab.get_info("statistics").get("weighted_knn_graph", None)

        knn_graph: Optional[csr_matrix] = None
        if knn_graph_kwargs is not None:
            knn_graph = knn_graph_kwargs
        elif knn_graph_stats is not None:
            knn_graph = knn_graph_stats

        if isinstance(knn_graph, csr_matrix) and kwargs.get("k", 0) > (
            knn_graph.nnz // knn_graph.shape[0]
        ):
            # If the provided knn graph is insufficient, then we need to recompute
            # the knn graph with the provided features.
            knn_graph = None
        return knn_graph

    def _compute_threshold_and_issue_column_from_distances(
        self, distances: np.ndarray, threshold: Optional[float] = None
    ) -> Tuple[float, np.ndarray]:
        avg_distances = distances.mean(axis=1)
        if threshold:
            if not (isinstance(threshold, (int, float)) and 0 <= threshold <= 1):
                raise ValueError(
                    f"threshold must be a number between 0 and 1, got {threshold} of type {type(threshold)}."
                )
        if threshold is None:
            threshold = OutlierIssueManager.DEFAULT_THRESHOLDS["features"]
        q3_distance = np.percentile(avg_distances, 75)
        iqr_scale = 1 / threshold - 1 if threshold != 0 else np.inf
        return threshold, avg_distances > q3_distance + iqr_scale * iqr(avg_distances)

    def _process_knn_graph_from_features(self, kwargs: Dict) -> csr_matrix:
        # Check if the weighted knn graph exists in info.
        knn_graph = self.datalab.get_info("statistics").get("weighted_knn_graph", None)

        # Used to check if the knn graph needs to be recomputed; k is already set in the knn object.
        k: int = 0
        if knn_graph is not None:
            k = knn_graph.nnz // knn_graph.shape[0]

        knn: NearestNeighbors = self.ood.params["knn"]  # type: ignore
        if kwargs.get("knn", None) is not None or knn.n_neighbors > k:  # type: ignore[union-attr]
            # If the pre-existing knn graph has fewer neighbors than the knn object,
            # then we need to recompute the knn graph.
            assert knn == self.ood.params["knn"]  # type: ignore[union-attr]
            knn_graph = knn.kneighbors_graph(mode="distance")  # type: ignore[union-attr]
            self._metric = knn.metric  # type: ignore[union-attr]
        return knn_graph
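    # Note on the recurring expression `knn_graph.nnz // knn_graph.shape[0]`: for a CSR
    # graph produced by NearestNeighbors.kneighbors_graph, every row stores exactly
    # n_neighbors nonzero entries, so the number of neighbors k can be recovered as
    # nnz // N (assuming a uniform number of neighbors per row).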
    def collect_info(self, *, knn_graph: Optional[csr_matrix] = None) -> dict:
        issues_dict = {
            "average_ood_score": self.issues[self.issue_score_key].mean(),
            "threshold": self.threshold,
        }
        pred_probs_issues_dict: Dict[str, Any] = {}
        feature_issues_dict = {}

        if knn_graph is not None:
            knn = self.ood.params["knn"]  # type: ignore
            N = knn_graph.shape[0]
            k = knn_graph.nnz // N
            dists = knn_graph.data.reshape(N, -1)[:, 0]
            nn_ids = knn_graph.indices.reshape(N, -1)[:, 0]

            feature_issues_dict.update(
                {
                    "k": k,  # type: ignore[union-attr]
                    "nearest_neighbor": nn_ids.tolist(),
                    "distance_to_nearest_neighbor": dists.tolist(),
                }
            )
        if self.ood.params["knn"] is not None:
            knn = self.ood.params["knn"]
            feature_issues_dict.update({"metric": knn.metric})  # type: ignore[union-attr]

        if self.ood.params["confident_thresholds"] is not None:
            pass

        statistics_dict = self._build_statistics_dictionary(knn_graph=knn_graph)

        ood_params_dict = self.ood.params
        knn_dict = {
            **pred_probs_issues_dict,
            **feature_issues_dict,
        }
        info_dict: Dict[str, Any] = {
            **issues_dict,
            **ood_params_dict,  # type: ignore[arg-type]
            **knn_dict,
            **statistics_dict,
        }
        return info_dict
    def _build_statistics_dictionary(
        self, *, knn_graph: Optional[csr_matrix]
    ) -> Dict[str, Dict[str, Any]]:
        statistics_dict: Dict[str, Dict[str, Any]] = {"statistics": {}}

        # Add the knn graph as a statistic if necessary.
        graph_key = "weighted_knn_graph"
        old_knn_graph = self.datalab.get_info("statistics").get(graph_key, None)
        old_graph_exists = old_knn_graph is not None
        prefer_new_graph = (
            not old_graph_exists
            or (isinstance(knn_graph, csr_matrix) and knn_graph.nnz > old_knn_graph.nnz)
            or self._metric != self.datalab.get_info("statistics").get("knn_metric", None)
        )
        if prefer_new_graph:
            if knn_graph is not None:
                statistics_dict["statistics"][graph_key] = knn_graph
                if self._metric is not None:
                    statistics_dict["statistics"]["knn_metric"] = self._metric

        return statistics_dict

    def _score_with_pred_probs(self, pred_probs: np.ndarray, **kwargs) -> np.ndarray:
        # Remove "threshold" from kwargs if it exists.
        kwargs.pop("threshold", None)
        scores = self.ood.fit_score(pred_probs=pred_probs, labels=self.datalab._labels, **kwargs)
        return scores

    def _score_with_features(self, features: npt.NDArray, **kwargs) -> npt.NDArray:
        scores = self.ood.fit_score(features=features)
        return scores
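
# Usage sketch (an illustrative assumption, not part of this module's API surface):
# outlier detection is normally run through Datalab rather than by instantiating
# OutlierIssueManager directly. The data below is synthetic and only for illustration.
if __name__ == "__main__":  # pragma: no cover
    from cleanlab.datalab.datalab import Datalab

    rng = np.random.default_rng(0)
    demo_features = rng.normal(size=(100, 5))
    demo_features[0] += 10.0  # shift one example far from the rest so it looks anomalous
    demo_labels = rng.integers(0, 2, size=100).tolist()

    lab = Datalab(data={"label": demo_labels}, label_name="label")
    lab.find_issues(features=demo_features, issue_types={"outlier": {}})
    print(lab.get_issues("outlier").head())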