# Copyright (C) 2017-2023 Cleanlab Inc.
# This file is part of cleanlab.
#
# cleanlab is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# cleanlab is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with cleanlab. If not, see <https://www.gnu.org/licenses/>.
from __future__ import annotations
from typing import (
    TYPE_CHECKING,
    Any,
    ClassVar,
    Dict,
    List,
    Optional,
    Union,
)
import warnings
import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix
from sklearn.exceptions import NotFittedError
from sklearn.neighbors import NearestNeighbors
from sklearn.utils.validation import check_is_fitted
from cleanlab.datalab.internal.issue_manager import IssueManager
if TYPE_CHECKING:  # pragma: no cover
    import numpy.typing as npt
    import pandas as pd

    from cleanlab.datalab.datalab import Datalab

class DataValuationIssueManager(IssueManager):
    """
    Detect which examples in a dataset are least valuable via an approximate Data Shapley value.

    Examples
    --------
    >>> from cleanlab import Datalab
    >>> import numpy as np
    >>> from sklearn.neighbors import NearestNeighbors
    >>>
    >>> # Generate two distinct clusters
    >>> X = np.vstack([
    ...     np.random.normal(-1, 1, (25, 2)),
    ...     np.random.normal(1, 1, (25, 2)),
    ... ])
    >>> y = np.array([0]*25 + [1]*25)
    >>>
    >>> # Initialize Datalab with the labels
    >>> lab = Datalab(data={"y": y}, label_name="y")
    >>>
    >>> # Create a knn_graph for data valuation
    >>> knn = NearestNeighbors(n_neighbors=10).fit(X)
    >>> knn_graph = knn.kneighbors_graph(mode='distance')
    >>>
    >>> # Specify issue types for data valuation
    >>> issue_types = {"data_valuation": {}}
    >>> lab.find_issues(knn_graph=knn_graph, issue_types=issue_types)
    """
    description: ClassVar[
        str
    ] = """
    Examples that contribute minimally to a model's training
    receive lower valuation scores.
    Since the original knn-shapley value is in [-1, 1], we transform it to [0, 1] by:

    .. math::
        0.5 \\times (\\text{shapley} + 1)

    where shapley is the original knn-shapley value.
    """
    issue_name: ClassVar[str] = "data_valuation"
    issue_score_key: ClassVar[str]
    verbosity_levels: ClassVar[Dict[int, List[str]]] = {
        0: [],
        1: [],
        2: [],
        3: ["average_data_valuation"],
    }

    DEFAULT_THRESHOLD = 0.5
    def __init__(
        self,
        datalab: Datalab,
        metric: Optional[str] = None,
        threshold: Optional[float] = None,
        k: int = 10,
        **kwargs,
    ):
        super().__init__(datalab)
        self.metric = metric
        self.k = k
        self.threshold = threshold if threshold is not None else self.DEFAULT_THRESHOLD
    def find_issues(
        self,
        features: Optional[npt.NDArray] = None,
        **kwargs,
    ) -> None:
        """Calculate the data valuation score with a provided or existing knn graph.
        Based on the KNN-Shapley value described in https://arxiv.org/abs/1911.07128.
        The larger the score, the more valuable the data point is and the more it
        contributes to the model's training.

        Parameters
        ----------
        features : Optional[npt.NDArray]
            Feature array used to fit a new knn graph when one is not provided.
        knn_graph : csr_matrix, optional
            A sparse matrix representing the knn graph, passed via ``kwargs``.
        """
        self.k = kwargs.get("k", self.k)
        knn_graph = self._process_knn_graph_from_inputs(kwargs)
        old_knn_metric = self.datalab.get_info("statistics").get("knn_metric")
        metric_changes = self.metric and self.metric != old_knn_metric

        labels = self.datalab.labels
        if not isinstance(labels, np.ndarray):
            error_msg = (
                f"Expected labels to be a numpy array of shape (n_samples,) to use with DataValuationIssueManager, "
                f"but got {type(labels)} instead."
            )
            raise TypeError(error_msg)

        if knn_graph is None or metric_changes:
            if features is None:
                raise ValueError(
                    "If a knn_graph is not provided, features must be provided to fit a new knn."
                )
            if self.metric is None:
                # Default to cosine distance for higher-dimensional features.
                self.metric = "cosine" if features.shape[1] > 3 else "euclidean"
            knn = NearestNeighbors(n_neighbors=self.k, metric=self.metric).fit(features)

            if self.metric and self.metric != knn.metric:
                warnings.warn(
                    f"Metric {self.metric} does not match metric {knn.metric} used to fit knn. "
                    "Most likely an existing NearestNeighbors object was passed in, but a different "
                    "metric was specified."
                )
            self.metric = knn.metric

            try:
                check_is_fitted(knn)
            except NotFittedError:
                knn.fit(features)

            knn_graph = knn.kneighbors_graph(mode="distance")
        if labels is None:
            raise ValueError("labels must be provided to run data valuation")
        scores = _knn_shapley_score(knn_graph, labels, self.k)

        self.issues = pd.DataFrame(
            {
                f"is_{self.issue_name}_issue": scores < self.threshold,
                self.issue_score_key: scores,
            },
        )
        self.summary = self.make_summary(score=scores.mean())

        self.info = self.collect_info(issues=self.issues, knn_graph=knn_graph)
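
    # A minimal usage sketch (hypothetical variable names), assuming `lab` is an
    # existing Datalab instance and `graph` is a precomputed sparse knn graph:
    #
    #     manager = DataValuationIssueManager(datalab=lab, k=10)
    #     manager.find_issues(knn_graph=graph)
    #     manager.issues.head()  # per-example scores and boolean issue flags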
    def _process_knn_graph_from_inputs(self, kwargs: Dict[str, Any]) -> Union[csr_matrix, None]:
        """Determine if a knn_graph is provided in the kwargs or if one is already stored in the associated Datalab instance."""
        knn_graph_kwargs: Optional[csr_matrix] = kwargs.get("knn_graph", None)
        knn_graph_stats = self.datalab.get_info("statistics").get("weighted_knn_graph", None)

        knn_graph: Optional[csr_matrix] = None
        if knn_graph_kwargs is not None:
            knn_graph = knn_graph_kwargs
        elif knn_graph_stats is not None:
            knn_graph = knn_graph_stats

        if isinstance(knn_graph, csr_matrix) and self.k > (knn_graph.nnz // knn_graph.shape[0]):
            self.k = knn_graph.nnz // knn_graph.shape[0]
            warnings.warn(
                f"k is larger than the number of neighbors in the knn graph. Using k={self.k} instead."
            )
        return knn_graph
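
    # For example, a csr_matrix storing 500 distances over 100 rows encodes
    # 500 // 100 = 5 neighbors per example, so a requested k of 10 would be
    # capped to 5 before the shapley scores are computed.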
    def collect_info(self, issues: pd.DataFrame, knn_graph: csr_matrix) -> dict:
        issues_info = {
            "num_low_valuation_issues": sum(issues[f"is_{self.issue_name}_issue"]),
            "average_data_valuation": issues[self.issue_score_key].mean(),
        }

        params_dict = {
            "metric": self.metric,
            "k": self.k,
            "threshold": self.threshold,
        }

        statistics_dict = self._build_statistics_dictionary(knn_graph=knn_graph)

        info_dict = {
            **issues_info,
            **params_dict,
            **statistics_dict,
        }

        return info_dict
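
    # The merged dict has this shape (illustrative values only):
    #
    #     {"num_low_valuation_issues": 3, "average_data_valuation": 0.61,
    #      "metric": "euclidean", "k": 10, "threshold": 0.5,
    #      "statistics": {"weighted_knn_graph": <csr_matrix>, "knn_metric": "euclidean"}}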
    def _build_statistics_dictionary(self, knn_graph: csr_matrix) -> Dict[str, Dict[str, Any]]:
        statistics_dict: Dict[str, Dict[str, Any]] = {"statistics": {}}

        # Add the knn graph as a statistic if necessary
        graph_key = "weighted_knn_graph"
        old_knn_graph = self.datalab.get_info("statistics").get(graph_key, None)
        old_graph_exists = old_knn_graph is not None
        prefer_new_graph = (
            not old_graph_exists
            or knn_graph.nnz > old_knn_graph.nnz
            or self.metric != self.datalab.get_info("statistics").get("knn_metric", None)
        )
        if prefer_new_graph:
            statistics_dict["statistics"][graph_key] = knn_graph
            if self.metric is not None:
                statistics_dict["statistics"]["knn_metric"] = self.metric

        return statistics_dict

def _knn_shapley_score(knn_graph: csr_matrix, labels: np.ndarray, k: int) -> np.ndarray:
    """Compute the Shapley values of data points based on a knn graph."""
    N = labels.shape[0]
    scores = np.zeros((N, N))
    dist = knn_graph.indices.reshape(N, -1)  # k neighbor indices per example

    for y, s, dist_i in zip(labels, scores, dist):
        idx = dist_i[::-1]  # reverse the stored neighbor ordering
        ans = labels[idx]
        # Initialize the recurrence at one end of the reversed ordering, then
        # propagate it through the remaining neighbors (Jia et al., 2019).
        s[idx[k - 1]] = float(ans[k - 1] == y)
        ans_matches = (ans == y).flatten()
        for j in range(k - 2, -1, -1):
            s[idx[j]] = s[idx[j + 1]] + float(int(ans_matches[j]) - int(ans_matches[j + 1]))

    # Average over all examples and rescale from [-1, 1] to [0, 1].
    return 0.5 * (np.mean(scores / k, axis=0) + 1)
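
# A minimal sketch exercising the internal _knn_shapley_score helper directly
# (not part of the public API); assumes scikit-learn is installed and mirrors
# the synthetic data from the class docstring example:
if __name__ == "__main__":  # pragma: no cover
    rng = np.random.default_rng(0)
    X_demo = np.vstack([rng.normal(-1, 1, (25, 2)), rng.normal(1, 1, (25, 2))])
    y_demo = np.array([0] * 25 + [1] * 25)
    demo_graph = NearestNeighbors(n_neighbors=10).fit(X_demo).kneighbors_graph(mode="distance")
    demo_scores = _knn_shapley_score(demo_graph, y_demo, k=10)
    # Scores fall in [0, 1]; values below the 0.5 default threshold get flagged.
    print(demo_scores.min(), demo_scores.mean(), demo_scores.max())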