Source code for cleanlab.models.fasttext

# Copyright (C) 2017-2023  Cleanlab Inc.
# This file is part of cleanlab.
#
# cleanlab is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# cleanlab is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with cleanlab.  If not, see <https://www.gnu.org/licenses/>.

"""
Text classification with fastText models that are compatible with cleanlab.
This module allows you to easily find label issues in your text datasets.

You must have fastText installed: ``pip install fasttext``.

Tips:

* Check out our example using this class: `fasttext_amazon_reviews <https://github.com/cleanlab/examples/blob/master/fasttext_amazon_reviews/fasttext_amazon_reviews.ipynb>`_
* Our `unit tests <https://github.com/cleanlab/cleanlab/blob/master/tests/test_frameworks.py>`_ also provide basic usage examples.

"""

import time
import os
import copy
import numpy as np
from sklearn.base import BaseEstimator
from fasttext import train_supervised, load_model


LABEL = "__label__"
NEWLINE = " __newline__ "


[docs]def data_loader( fn=None, indices=None, label=LABEL, batch_size=1000, ): """Returns a generator, yielding two lists containing [labels], [text]. Items are always returned in the order in the file, regardless if indices are provided.""" def _split_labels_and_text(batch): l, t = [list(t) for t in zip(*(z.split(" ", 1) for z in batch))] return l, t # Prepare a stack of indices if indices is not None: stack_indices = sorted(indices, reverse=True) stack_idx = stack_indices.pop() with open(fn, "r") as f: len_label = len(label) idx = 0 batch_counter = 0 prev = f.readline() batch = [] while True: try: line = f.readline() line = line if line[:len_label] == label or line == "": if indices is None or stack_idx == idx: # Write out prev line and reset prev batch.append(prev.strip().replace("\n", NEWLINE)) batch_counter += 1 if indices is not None: if len(stack_indices): stack_idx = stack_indices.pop() else: # No more data in indices, quit loading data. yield _split_labels_and_text(batch) break prev = "" idx += 1 if batch_counter == batch_size: yield _split_labels_and_text(batch) # Reset batch batch_counter = 0 batch = [] prev += line if line == "": if len(batch) > 0: yield _split_labels_and_text(batch) break except EOFError: if indices is None or stack_idx == idx: # Write out prev line and reset prev batch.append(prev.strip().replace("\n", NEWLINE)) batch_counter += 1 yield _split_labels_and_text(batch) break
[docs]class FastTextClassifier(BaseEstimator): # Inherits sklearn base classifier """Instantiate a fastText classifier that is compatible with :py:class:`CleanLearning <cleanlab.classification.CleanLearning>`. Parameters ---------- train_data_fn: str File name of the training data in the format compatible with fastText. test_data_fn: str, optional File name of the test data in the format compatible with fastText. """ def __init__( self, train_data_fn, test_data_fn=None, labels=None, tmp_dir="", label=LABEL, del_intermediate_data=True, kwargs_train_supervised={}, p_at_k=1, batch_size=1000, ): self.train_data_fn = train_data_fn self.test_data_fn = test_data_fn self.tmp_dir = tmp_dir self.label = label self.del_intermediate_data = del_intermediate_data self.kwargs_train_supervised = kwargs_train_supervised self.p_at_k = p_at_k self.batch_size = batch_size self.clf = None self.labels = labels if labels is None: # Find all class labels across the train and test set (if provided) unique_labels = set([]) for labels, _ in data_loader(fn=train_data_fn, batch_size=batch_size): unique_labels = unique_labels.union(set(labels)) if test_data_fn is not None: for labels, _ in data_loader(fn=test_data_fn, batch_size=batch_size): unique_labels = unique_labels.union(set(labels)) else: # Prepend labels with self.label token (e.g. '__label__'). unique_labels = [label + str(l) for l in labels] # Create maps: label strings <-> integers when label strings are used unique_labels = sorted(list(unique_labels)) self.label2num = dict(zip(unique_labels, range(len(unique_labels)))) self.num2label = dict((y, x) for x, y in self.label2num.items()) def _create_train_data(self, data_indices): """Returns filename of the masked fasttext data file. Items are written in the order they are in the file, regardless if indices are provided.""" # If X indexes all training data, no need to rewrite the file. if data_indices is None: self.masked_data_was_created = False return self.train_data_fn # Mask training data by data_indices else: len_label = len(LABEL) data_indices = sorted(data_indices, reverse=True) masked_fn = "fastTextClf_" + str(int(time.time())) + ".txt" open(masked_fn, "w").close() # Read in training data one line at a time with open(self.train_data_fn, "r") as rf: idx = 0 data_idx = data_indices.pop() for line in rf: # Mask by data_indices if idx == data_idx: with open(masked_fn, "a") as wf: wf.write(line.strip().replace("\n", NEWLINE) + "\n") if line[:len_label] == LABEL: if len(data_indices): data_idx = data_indices.pop() else: break # Increment data index if starts with __label__ # This enables support for text data containing '\n'. if line[:len_label] == LABEL: idx += 1 self.masked_data_was_created = True return masked_fn def _remove_masked_data(self, fn): """Deletes intermediate data files.""" if self.del_intermediate_data and self.masked_data_was_created: os.remove(fn) def __deepcopy__(self, memo): if self.clf is None: self_clf_copy = None else: fn = "tmp_{}.fasttext.model".format(int(time.time())) self.clf.save_model(fn) self_clf_copy = load_model(fn) os.remove(fn) # Store self.clf params = self.__dict__ clf = params.pop("clf") # Copy params without self.clf (it can't be copied) params_copy = copy.deepcopy(params) # Add clf back to self.clf self.clf = clf # Create copy to return clf_copy = FastTextClassifier(self.train_data_fn) params_copy["clf"] = self_clf_copy clf_copy.__dict__ = params_copy return clf_copy
[docs] def fit(self, X=None, y=None, sample_weight=None): """Trains the fast text classifier. Typical usage requires NO parameters, just clf.fit() # No params. Parameters ---------- X : iterable, e.g. list, numpy array (default None) The list of indices of the data to use. When in doubt, set as None. None defaults to range(len(data)). y : None Leave this as None. It's a filler to suit sklearns reqs. sample_weight : None Leave this as None. It's a filler to suit sklearns reqs.""" train_fn = self._create_train_data(data_indices=X) self.clf = train_supervised(train_fn, **self.kwargs_train_supervised) self._remove_masked_data(train_fn)
[docs] def predict_proba(self, X=None, train_data=True, return_labels=False): """Produces a probability matrix with examples on rows and classes on columns, where each row sums to 1 and captures the probability of the example belonging to each class.""" fn = self.train_data_fn if train_data else self.test_data_fn pred_probs_list = [] if return_labels: labels_list = [] for labels, text in data_loader(fn=fn, indices=X, batch_size=self.batch_size): pred = self.clf.predict(text=text, k=len(self.clf.get_labels())) # Get p(label = k | x) matrix of shape (N x K) of pred probs for each x pred_probs = [ [p for _, p in sorted(list(zip(*l)), key=lambda x: x[0])] for l in list(zip(*pred)) ] pred_probs_list.append(np.array(pred_probs)) if return_labels: labels_list.append(labels) pred_probs = np.concatenate(pred_probs_list, axis=0) if return_labels: gold_labels = [self.label2num[z] for l in labels_list for z in l] return (pred_probs, np.array(gold_labels)) else: return pred_probs
[docs] def predict(self, X=None, train_data=True, return_labels=False): """Predict labels of X""" fn = self.train_data_fn if train_data else self.test_data_fn pred_list = [] if return_labels: labels_list = [] for labels, text in data_loader(fn=fn, indices=X, batch_size=self.batch_size): pred = [self.label2num[z[0]] for z in self.clf.predict(text)[0]] pred_list.append(pred) if return_labels: labels_list.append(labels) pred = np.array([z for l in pred_list for z in l]) if return_labels: gold_labels = [self.label2num[z] for l in labels_list for z in l] return (pred, np.array(gold_labels)) else: return pred
[docs] def score(self, X=None, y=None, sample_weight=None, k=None): """Compute the average precision @ k (single label) of the labels predicted from X and the true labels given by y. score expects a `y` variable. In this case, `y` is the noisy labels.""" # Set the k for precision@k. # For single label: 1 if label is in top k, else 0 if k is None: k = self.p_at_k fn = self.test_data_fn pred_list = [] if y is None: labels_list = [] for labels, text in data_loader(fn=fn, indices=X, batch_size=self.batch_size): pred = self.clf.predict(text, k=k)[0] pred_list.append(pred) if y is None: labels_list.append(labels) pred = np.array([z for l in pred_list for z in l]) if y is None: y = [z for l in labels_list for z in l] else: y = [self.num2label[z] for z in y] apk = np.mean([y[i] in l for i, l in enumerate(pred)]) return apk