Source code for starboost.boosting

import abc
import collections
import copy
import itertools

import numpy as np
from scipy.special import expit as sigmoid
from sklearn import base
from sklearn import ensemble
from sklearn import preprocessing
from sklearn import tree
from sklearn import utils

from . import losses


__all__ = ['BoostingClassifier', 'BoostingRegressor']


class BaseBoosting(abc.ABC, ensemble.BaseEnsemble):
    """Implements logic common to all other boosting classes."""

    def __init__(self, loss=None, base_estimator=None, base_estimator_is_tree=False,
                 n_estimators=30, init_estimator=None, line_searcher=None, learning_rate=0.1,
                 row_sampling=1.0, col_sampling=1.0, eval_metric=None, early_stopping_rounds=None,
                 random_state=None):
        self.loss = loss
        self.base_estimator = base_estimator
        self.base_estimator_is_tree = base_estimator_is_tree
        self.n_estimators = n_estimators
        self.init_estimator = init_estimator
        self.line_searcher = line_searcher
        self.learning_rate = learning_rate
        self.row_sampling = row_sampling
        self.col_sampling = col_sampling
        self.eval_metric = eval_metric
        self.early_stopping_rounds = early_stopping_rounds
        self.random_state = random_state

    @abc.abstractmethod
    def _transform_y_pred(self, y_pred):
        pass

    @property
    @abc.abstractmethod
    def _default_loss(self):
        pass

    def fit(self, X, y, eval_set=None):
        """Fit a gradient boosting procedure to a dataset.

        Args:
            X (array-like or sparse matrix of shape (n_samples, n_features)): The training input
                samples. Sparse matrices are accepted only if they are supported by the weak model.
            y (array-like of shape (n_samples,)): The training target values (strings or integers
                in classification, real numbers in regression).
            eval_set (tuple of length 2, optional, default=None): The evaluation set is a tuple
                ``(X_val, y_val)``. It has to respect the same conventions as ``X`` and ``y``.
        Returns:
            self
        """
        # Verify the input parameters
        base_estimator = self.base_estimator
        base_estimator_is_tree = self.base_estimator_is_tree
        if base_estimator is None:
            base_estimator = tree.DecisionTreeRegressor(max_depth=1, random_state=self.random_state)
            base_estimator_is_tree = True

        if not isinstance(base_estimator, base.RegressorMixin):
            raise ValueError('base_estimator must be a RegressorMixin')

        loss = self.loss or self._default_loss

        self.init_estimator_ = base.clone(self.init_estimator or loss.default_init_estimator)

        eval_metric = self.eval_metric or loss

        line_searcher = self.line_searcher
        if line_searcher is None and base_estimator_is_tree:
            line_searcher = loss.tree_line_searcher

        self._rng = utils.check_random_state(self.random_state)

        # At this point we assume the input data has been checked
        if y.ndim == 1:
            y = y.reshape(-1, 1)

        # Instantiate some training variables
        self.estimators_ = []
        self.line_searchers_ = []
        self.columns_ = []
        self.eval_scores_ = [] if eval_set else None

        # Use init_estimator for the first fit
        self.init_estimator_.fit(X, y)
        y_pred = self.init_estimator_.predict(X)

        # We keep training weak learners until we reach n_estimators or early stopping occurs
        for _ in range(self.n_estimators):

            # If row_sampling is lower than 1 then we're doing stochastic gradient descent
            rows = None
            if self.row_sampling < 1:
                n_rows = int(X.shape[0] * self.row_sampling)
                rows = self._rng.choice(X.shape[0], n_rows, replace=False)

            # If col_sampling is lower than 1 then we only use a subset of the features
            cols = None
            if self.col_sampling < 1:
                n_cols = int(X.shape[1] * self.col_sampling)
                cols = self._rng.choice(X.shape[1], n_cols, replace=False)

            # Subset X
            X_fit = X
            if rows is not None:
                X_fit = X_fit[rows, :]
            if cols is not None:
                X_fit = X_fit[:, cols]

            # Compute the gradients of the loss for the current prediction
            gradients = loss.gradient(y, self._transform_y_pred(y_pred))

            # We will train one weak model per column in y
            estimators = []
            line_searchers = []

            for i, gradient in enumerate(gradients.T):

                # Fit a weak learner to the negative gradient of the loss function
                estimator = base.clone(base_estimator)
                estimator = estimator.fit(X_fit, -gradient if rows is None else -gradient[rows])
                estimators.append(estimator)

                # Estimate the descent direction using the weak learner
                direction = estimator.predict(X if cols is None else X[:, cols])

                # Apply line search if a line searcher has been provided
                if line_searcher:
                    ls = copy.copy(line_searcher)
                    ls = ls.fit(y[:, i], y_pred[:, i], gradient, direction)
                    line_searchers.append(ls)
                    direction = ls.update(direction)

                # Move the predictions along the estimated descent direction
                y_pred[:, i] += self.learning_rate * direction

            # Store the estimator and the step
            self.estimators_.append(estimators)
            if line_searchers:
                self.line_searchers_.append(line_searchers)
            self.columns_.append(cols)

            # We're now at the end of a round so we can evaluate the model on the validation set
            if not eval_set:
                continue
            X_val, y_val = eval_set
            self.eval_scores_.append(eval_metric(y_val, self.predict(X_val)))

            # Check for early stopping
            if self.early_stopping_rounds and len(self.eval_scores_) > self.early_stopping_rounds:
                if self.eval_scores_[-1] > self.eval_scores_[-(self.early_stopping_rounds + 1)]:
                    break

        return self

    def iter_predict(self, X, include_init=False):
        """Returns the predictions for ``X`` at every stage of the boosting procedure.

        Args:
            X (array-like or sparse matrix of shape (n_samples, n_features): The input samples.
                Sparse matrices are accepted only if they are supported by the weak model.
            include_init (bool, default=False): If ``True`` then the prediction from
                ``init_estimator`` will also be returned.
        Returns:
            iterator of arrays of shape (n_samples,) containing the predicted values at each stage
        """
        utils.validation.check_is_fitted(self, 'init_estimator_')
        X = utils.check_array(X, accept_sparse=['csr', 'csc'], dtype=None, force_all_finite=False)

        y_pred = self.init_estimator_.predict(X)

        # The user decides if the initial prediction should be included or not
        if include_init:
            yield y_pred

        for estimators, line_searchers, cols in itertools.zip_longest(self.estimators_,
                                                                      self.line_searchers_,
                                                                      self.columns_):

            for i, (estimator, line_searcher) in enumerate(itertools.zip_longest(estimators,
                                                                                 line_searchers or [])):

                # If we used column sampling then we have to make sure the columns of X are arranged
                # in the correct order
                if cols is None:
                    direction = estimator.predict(X)
                else:
                    direction = estimator.predict(X[:, cols])

                if line_searcher:
                    direction = line_searcher.update(direction)

                y_pred[:, i] += self.learning_rate * direction

            yield y_pred

    def predict(self, X):
        """Returns the predictions for ``X``.

        Under the hood this method simply goes through the outputs of ``iter_predict`` and returns
        the final one.

        Arguments:
            X (array-like or sparse matrix of shape (n_samples, n_features)): The input samples.
                Sparse matrices are accepted only if they are supported by the weak model.

        Returns:
            array of shape (n_samples,) containing the predicted values.
        """
        return collections.deque(self.iter_predict(X), maxlen=1).pop()


[docs]class BoostingRegressor(BaseBoosting, base.RegressorMixin): """Gradient boosting for regression. Arguments: loss (starboost.losses.Loss, default=starboost.loss.L2Loss) The loss function that will be optimized. At every stage a weak learner will be fit to the negative gradient of the loss. The provided value must be a class that at the very least implements a ``__call__`` method and a ``gradient`` method. base_estimator (sklearn.base.RegressorMixin, default=None): The weak learner. This must be a regression model. If `None` then a decision stump will be used. base_estimator_is_tree (bool, default=False): Indicates if the provided ``base_estimator`` is a tree model or not. Various boosting optimizations specific to trees can be made to improve the overall performance. n_estimators (int, default=30): The maximum number of weak learners to train. The final number of trained weak learners will be lower than ``n_estimators`` if early stopping happens. init_estimator (sklearn.base.BaseEstimator, default=None): The estimator used to make the initial guess. If ``None`` then the ``init_estimator`` property from the ``loss`` will be used. line_searcher (starboost.line_searchers.LineSearcher, default=None): A line searcher which can be used to find the optimal step size during gradient descent. If you've set ``base_estimator_is_tree`` to ``True`` and are using one of StarBoost's losses then an optimal line searcher will be used, meaning you safely set this field to ``None``. learning_rate (float, default=0.1): The learning rate shrinks the contribution of each tree. Specifically the descent direction estimated by each weak learner will be multiplied by ``learning_rate``. There is a trade-off between learning_rate and ``n_estimators``. row_sampling (float, default=1.0): The ratio of rows to sample at each stage. col_sampling (float, default=1.0): The ratio of columns to sample at each stage. eval_metric (function, default=None): The evaluation metric used to check for early stopping. If ``None`` it will default to ``loss``. random_state (int, RandomState instance or None, default=None): If int, ``random_state`` is the seed used by the random number generator; if ``RandomState`` instance, ``random_state`` is the random number generator; if ``None``, the random number generator is the ``RandomState`` instance used by ``np.random``. """ def _transform_y_pred(self, y_pred): return y_pred @property def _default_loss(self): return losses.L2Loss()
[docs] def fit(self, X, y, eval_set=None): X, y = utils.check_X_y(X, y, ['csr', 'csc'], dtype=None, force_all_finite=False) return super().fit(X=X, y=y, eval_set=eval_set)
[docs] def iter_predict(self, X, include_init=False): for y_pred in super().iter_predict(X, include_init=include_init): yield y_pred[:, 0]
def softmax(x): """Can be replaced once scipy 1.3 is released, although numeric stability should be checked.""" e_x = np.exp(x - np.max(x)) return e_x / e_x.sum(axis=1)[:, None]
[docs]class BoostingClassifier(BaseBoosting, base.ClassifierMixin): """Gradient boosting for regression. Arguments: loss (starboost.losses.Loss, default=starboost.loss.L2Loss) The loss function that will be optimized. At every stage a weak learner will be fit to the negative gradient of the loss. The provided value must be a class that at the very least implements a ``__call__`` method and a ``gradient`` method. base_estimator (sklearn.base.RegressorMixin, default=None): The weak learner. This must be a regression model, even though the task is classification. If `None` then a decision stump will be used. base_estimator_is_tree (bool, default=False): Indicates if the provided ``base_estimator`` is a tree model or not. Various boosting optimizations specific to trees can be made to improve the overall performance. n_estimators (int, default=30): The maximum number of weak learners to train. The final number of trained weak learners will be lower than ``n_estimators`` if early stopping happens. init_estimator (sklearn.base.BaseEstimator, default=None): The estimator used to make the initial guess. If ``None`` then the ``init_estimator`` property from the ``loss`` will be used. line_searcher (starboost.line_searchers.LineSearcher, default=None): A line searcher which can be used to find the optimal step size during gradient descent. If you've set ``base_estimator_is_tree`` to ``True`` and are using one of StarBoost's losses then an optimal line searcher will be used, meaning you safely set this field to ``None``. learning_rate (float, default=0.1): The learning rate shrinks the contribution of each tree. Specifically the descent direction estimated by each weak learner will be multiplied by ``learning_rate``. There is a trade-off between learning_rate and ``n_estimators``. row_sampling (float, default=1.0): The ratio of rows to sample at each stage. col_sampling (float, default=1.0): The ratio of columns to sample at each stage. eval_metric (function, default=None): The evaluation metric used to check for early stopping. If ``None`` it will default to ``loss``. random_state (int, RandomState instance or None, default=None): If int, ``random_state`` is the seed used by the random number generator; if ``RandomState`` instance, ``random_state`` is the random number generator; if ``None``, the random number generator is the ``RandomState`` instance used by ``np.random``. """ def _transform_y_pred(self, y_pred): if len(self.classes_) > 2: return softmax(y_pred) return sigmoid(y_pred) @property def _default_loss(self): return losses.LogLoss()
[docs] def fit(self, X, y, eval_set=None): # Check the inputs X, y = utils.check_X_y( X, y, ['csr', 'csc'], dtype=None, force_all_finite=False, multi_output=True ) utils.multiclass.check_classification_targets(y) # We first encode the labels into integers starting from 0 self.encoder_ = preprocessing.LabelEncoder().fit(y) y = self.encoder_.transform(y) self.classes_ = self.encoder_.classes_ # Next we one-hot encode the labels self.binarizer_ = preprocessing.LabelBinarizer(sparse_output=False).fit(y) y = self.binarizer_.transform(y) return super().fit(X=X, y=y, eval_set=eval_set)
[docs] def iter_predict_proba(self, X, include_init=False): """Returns the predicted probabilities for ``X`` at every stage of the boosting procedure. Arguments: X (array-like or sparse matrix of shape (n_samples, n_features)): The input samples. Sparse matrices are accepted only if they are supported by the weak model. include_init (bool, default=False): If ``True`` then the prediction from ``init_estimator`` will also be returned. Returns: iterator of arrays of shape (n_samples, n_classes) containing the predicted probabilities at each stage """ utils.validation.check_is_fitted(self, 'init_estimator_') X = utils.check_array(X, accept_sparse=['csr', 'csc'], dtype=None, force_all_finite=False) probas = np.empty(shape=(len(X), len(self.classes_)), dtype=np.float64) for y_pred in super().iter_predict(X, include_init=include_init): if len(self.classes_) == 2: probas[:, 1] = sigmoid(y_pred[:, 0]) probas[:, 0] = 1. - probas[:, 1] else: probas[:] = softmax(y_pred) yield probas
[docs] def iter_predict(self, X, include_init=False): """Returns the predicted classes for ``X`` at every stage of the boosting procedure. Arguments: X (array-like or sparse matrix of shape (n_samples, n_features)): The input samples. Sparse matrices are accepted only if they are supported by the weak model. include_init (bool, default=False): If ``True`` then the prediction from ``init_estimator`` will also be returned. Returns: iterator of arrays of shape (n_samples, n_classes) containing the predicted classes at each stage. """ for probas in self.iter_predict_proba(X, include_init=include_init): yield self.encoder_.inverse_transform(np.argmax(probas, axis=1))
[docs] def predict_proba(self, X): """Returns the predicted probabilities for ``X``. Arguments: X (array-like or sparse matrix of shape (n_samples, n_features)): The input samples. Sparse matrices are accepted only if they are supported by the weak model. Returns: array of shape (n_samples, n_classes) containing the predicted probabilities. """ return collections.deque(self.iter_predict_proba(X), maxlen=1).pop()