diff --git a/README.md b/README.md index 3939935..8bf0dc5 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # LinearBoost Classifier -![Lastest Release](https://img.shields.io/badge/release-v0.1.6-green) +![Latest Release](https://img.shields.io/badge/release-v0.1.7-green) [![PyPI Version](https://img.shields.io/pypi/v/linearboost)](https://pypi.org/project/linearboost/) ![Python Versions](https://img.shields.io/badge/python-3.8%20%7C%203.9%20%7C%203.10%20%7C%203.11%20%7C%203.12%20%7C%203.13-blue) [![PyPI Downloads](https://static.pepy.tech/badge/linearboost)](https://pepy.tech/projects/linearboost) @@ -32,9 +32,45 @@ Key Features: --- -## 🚀 New in Version 0.1.6 +## 🚀 New in Version 0.1.7 -The latest release introduces major architectural improvements designed for **scalability**, **robustness on imbalanced data**, and **training speed**. +### Gradient Boosting Mode + +LinearBoost now supports **gradient boosting** in addition to AdaBoost via the `boosting_type` parameter: + +- **`boosting_type='adaboost'`** (default): Classic AdaBoost (SAMME or SAMME.R) that reweights samples by classification error. +- **`boosting_type='gradient'`**: Fits each base estimator to pseudo-residuals (negative gradient of log-loss). Often better for highly non-linear or XOR-like patterns and smoother decision boundaries. + +```python +# Gradient boosting for complex non-linear patterns +clf = LinearBoostClassifier( + boosting_type='gradient', + n_estimators=200, + kernel='rbf' +) +``` + +### Class Weighting & Custom Loss + +- **`class_weight`**: Use `'balanced'` or a dict of class weights for imbalanced data. Weights are applied in the boosting loop. +- **`loss_function`**: Optional callable `(y_true, y_pred, sample_weight) -> float` for custom optimization objectives. + +```python +clf = LinearBoostClassifier( + class_weight='balanced', # Adjust for imbalanced classes + n_estimators=200 +) +``` + +### Default Algorithm + +The default **`algorithm`** is now **`'SAMME.R'`** for faster convergence and typically lower test error with fewer iterations (when using `boosting_type='adaboost'`). + +--- + +## 🚀 New in Version 0.1.5 + +Version 0.1.5 introduced major architectural improvements designed for **scalability**, **robustness on imbalanced data**, and **training speed**. ### ⚡ Scalable Kernel Approximation @@ -157,8 +193,7 @@ Version 0.1.2 of **LinearBoost Classifier** is released. Here are the changes: - Improved Scikit-learn compatibility. -Get Started and Documentation ------------------------------ +## Get Started and Documentation The documentation is available at https://linearboost.readthedocs.io/. @@ -172,13 +207,20 @@ The following parameters yielded optimal results during testing. All results are - **`learning_rate`**: Values between 0.01 and 1 typically perform well. Adjust based on the dataset's complexity and noise. -- **`algorithm`**: - Use either `SAMME` or `SAMME.R`. The choice depends on the specific problem: +- **`algorithm`** (when `boosting_type='adaboost'`): + Use either `SAMME` or `SAMME.R` (default). SAMME.R typically converges faster with lower test error. - `SAMME`: May be better for datasets with clearer separations between classes. - - `SAMME.R`: Can handle more nuanced class probabilities. + - `SAMME.R`: Uses class probabilities; often better for nuanced boundaries. **Note:** As of scikit-learn v1.6, the `algorithm` parameter is deprecated and will be removed in v1.8. LinearBoostClassifier will only implement the 'SAMME' algorithm in newer versions. +- **`boosting_type`** *(new in v0.1.7)*: + - `'adaboost'`: Classic AdaBoost (default). + - `'gradient'`: Gradient boosting on pseudo-residuals; try for highly non-linear or XOR-like data. + +- **`class_weight`** *(new in v0.1.7)*: + Use `'balanced'` for imbalanced datasets so class weights are adjusted automatically. + - **`scaler`**: The following scaling methods are recommended based on dataset characteristics: - `minmax`: Best for datasets where features are on different scales but bounded. @@ -193,25 +235,24 @@ The following parameters yielded optimal results during testing. All results are - `poly`: For polynomial relationships. - `sigmoid`: For sigmoid-like decision boundaries. -- **`kernel_approx`** *(new in v0.1.6)*: +- **`kernel_approx`** *(new in v0.1.5)*: For large datasets with non-linear kernels: - `None`: Use full kernel matrix (default, exact but \(O(n^2)\) memory). - `'rff'`: Random Fourier Features (only with `kernel='rbf'`). - `'nystrom'`: Nyström approximation (works with any kernel). -- **`subsample`** *(new in v0.1.6)*: +- **`subsample`** *(new in v0.1.5)*: Values in (0, 1] control stochastic boosting. Use `0.8` for variance reduction while maintaining speed. -- **`shrinkage`** *(new in v0.1.6)*: +- **`shrinkage`** *(new in v0.1.5)*: Values in (0, 1] scale each estimator's contribution. Use `0.8-0.95` to improve generalization. -- **`early_stopping`** *(new in v0.1.6)*: +- **`early_stopping`** *(new in v0.1.5)*: Set to `True` with `n_iter_no_change=5` and `tol=1e-4` to automatically stop training when validation performance plateaus. These parameters should serve as a solid starting point for most datasets. For fine-tuning, consider using hyperparameter optimization tools like [Optuna](https://optuna.org/). -Results -------- +## Results All of the results are reported based on 10-fold Cross-Validation. The weighted F1 score is reported, i.e. f1_score(y_valid, y_pred, average = 'weighted'). @@ -337,6 +378,8 @@ params = { 'algorithm': trial.suggest_categorical('algorithm', ['SAMME', 'SAMME.R']), 'scaler': trial.suggest_categorical('scaler', ['minmax', 'robust', 'quantile-uniform', 'quantile-normal']), 'kernel': trial.suggest_categorical('kernel', ['linear', 'rbf', 'poly']), + 'boosting_type': trial.suggest_categorical('boosting_type', ['adaboost', 'gradient']), + 'class_weight': trial.suggest_categorical('class_weight', [None, 'balanced']), 'subsample': trial.suggest_float('subsample', 0.6, 1.0), 'shrinkage': trial.suggest_float('shrinkage', 0.7, 1.0), 'early_stopping': True, @@ -353,19 +396,18 @@ LinearBoost's combination of **runtime efficiency** and **high accuracy** makes *Discusses how LinearBoost outperforms traditional boosting frameworks in terms of speed while maintaining accuracy.* -Future Developments ------------------------------ +## Future Developments + These are not yet supported in this current version, but are in the future plans: - Supporting categorical variables natively - Adding regression support (`LinearBoostRegressor`) - Multi-output classification -Reference Paper ------------------------------ +## Reference Paper + The paper is written by Hamidreza Keshavarz (Independent Researcher based in Berlin, Germany) and Reza Rawassizadeh (Department of Computer Science, Metropolitan college, Boston University, United States). It will be available soon. -License -------- +## License This project is licensed under the terms of the MIT license. See [LICENSE](https://github.com/LinearBoost/linearboost-classifier/blob/main/LICENSE) for additional details. diff --git a/pyproject.toml b/pyproject.toml index 8adb6a0..e2638b1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,6 @@ authors = [ ] description = "LinearBoost Classifier is a rapid and accurate classification algorithm that builds upon a very fast, linear classifier." readme = "README.md" -readme-content-type = "text/markdown" keywords = [ "classification", "classifier", "linear", "adaboost", "boosting", "boost" ] diff --git a/src/linearboost/__init__.py b/src/linearboost/__init__.py index 344dff5..6c2c0a8 100644 --- a/src/linearboost/__init__.py +++ b/src/linearboost/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.1.6" +__version__ = "0.1.7" from .linear_boost import LinearBoostClassifier from .sefr import SEFR diff --git a/src/linearboost/linear_boost.py b/src/linearboost/linear_boost.py index cb6d190..4088de8 100644 --- a/src/linearboost/linear_boost.py +++ b/src/linearboost/linear_boost.py @@ -42,8 +42,7 @@ from sklearn.utils.multiclass import check_classification_targets, type_of_target from sklearn.utils.validation import check_is_fitted from sklearn.kernel_approximation import RBFSampler, Nystroem -from sklearn.model_selection import train_test_split -from sklearn.metrics import accuracy_score, f1_score, roc_auc_score +from sklearn.metrics import f1_score, roc_auc_score from ._utils import SKLEARN_V1_6_OR_LATER, check_X_y, validate_data from .sefr import SEFR @@ -307,7 +306,7 @@ class LinearBoostClassifier(_DenseAdaBoostClassifier): kernel : {'linear', 'poly', 'rbf', 'sigmoid'} or callable, default='linear' Specifies the kernel type to be used in the algorithm. If a callable is given, it is used to pre-compute the kernel matrix. - + kernel_approx : {'rff', 'nystrom'} or None, default=None Optional kernel approximation strategy for non-linear kernels. @@ -358,7 +357,7 @@ class LinearBoostClassifier(_DenseAdaBoostClassifier): early_stopping : bool, default=False Whether to use early stopping to terminate training when validation score is not improving. If True, it requires ``n_iter_no_change`` to be set. - + If ``subsample < 1.0`` (subsampling is enabled), Out-of-Bag (OOB) evaluation is automatically used instead of a fixed validation split. This is more data-efficient as it uses all training data while still providing validation @@ -394,13 +393,30 @@ class LinearBoostClassifier(_DenseAdaBoostClassifier): each base learner, helping to prevent overfitting and improve generalization. This is similar to the shrinkage used in gradient boosting methods. - + - If `shrinkage = 1.0`: no shrinkage (full weight) - If `shrinkage < 1.0`: apply shrinkage (e.g., 0.8 means 80% weight) - + Values must be in the range `(0, 1]`. Typical values are in the range `[0.8, 1.0]` for moderate regularization or `1.0` for no regularization. + boosting_type : {'adaboost', 'gradient'}, default='adaboost' + The type of boosting algorithm to use: + + - 'adaboost': Use the AdaBoost algorithm (SAMME or SAMME.R) which + reweights samples based on classification errors. This is the + original LinearBoost approach. + - 'gradient': Use gradient boosting which fits each new estimator + to the pseudo-residuals (negative gradient of log-loss). This can + be more effective for complex non-linear patterns and provides + smoother decision boundaries. + + When ``boosting_type='gradient'``: + - The ``algorithm`` parameter is ignored + - Each estimator predicts pseudo-residuals instead of class labels + - The ensemble prediction is the sum of estimator predictions + - Better suited for XOR-like and highly non-linear patterns + Attributes ---------- estimator_ : estimator @@ -450,6 +466,14 @@ class LinearBoostClassifier(_DenseAdaBoostClassifier): The precomputed kernel matrix on training data, stored when kernel != 'linear'. + F_ : ndarray of shape (n_samples,) + The raw prediction scores (log-odds) from gradient boosting. + Only present when ``boosting_type='gradient'``. + + init_score_ : float + The initial score (log-odds of class prior) for gradient boosting. + Only present when ``boosting_type='gradient'``. + Notes ----- This classifier only supports binary classification tasks. @@ -493,6 +517,7 @@ class LinearBoostClassifier(_DenseAdaBoostClassifier): "tol": [Interval(Real, 0, None, closed="left")], "subsample": [Interval(Real, 0, 1, closed="right")], "shrinkage": [Interval(Real, 0, 1, closed="right")], + "boosting_type": [StrOptions({"adaboost", "gradient"})], } def __init__( @@ -516,6 +541,7 @@ def __init__( tol=1e-4, subsample=1.0, shrinkage=1.0, + boosting_type="adaboost", ): self.algorithm = algorithm self.scaler = scaler @@ -533,6 +559,7 @@ def __init__( self.tol = tol self.subsample = subsample self.shrinkage = shrinkage + self.boosting_type = boosting_type # Decide how SEFR sees the input: # - If we use a kernel approximation, the base estimator should work @@ -568,6 +595,7 @@ def __init__( self.tol = tol self.subsample = subsample self.shrinkage = shrinkage + self.boosting_type = boosting_type if SKLEARN_V1_6_OR_LATER: @@ -648,6 +676,7 @@ def _get_kernel_matrix(self, X, Y=None): degree=self.degree, coef0=self.coef0, ) + def _use_kernel_approx(self) -> bool: """Return True if we should use kernel approximation.""" return self.kernel != "linear" and self.kernel_approx is not None @@ -775,14 +804,19 @@ def fit(self, X, y, sample_weight=None) -> Self: validation_data = None y_val = None training_data_val = None - X_val_transformed = None # Store original features for validation (needed for exact kernels) + X_val_transformed = ( + None # Store original features for validation (needed for exact kernels) + ) use_oob = False # Flag to use OOB evaluation instead of fixed validation split - + # Use OOB evaluation if subsampling is enabled and early stopping is requested - if (self.early_stopping and self.n_iter_no_change is not None and - self.subsample < 1.0): + if ( + self.early_stopping + and self.n_iter_no_change is not None + and self.subsample < 1.0 + ): # Check if we can use OOB (skip for exact kernels) - is_exact_kernel = (not self._use_kernel_approx() and self.kernel != "linear") + is_exact_kernel = not self._use_kernel_approx() and self.kernel != "linear" if not is_exact_kernel: use_oob = True # Store full data for OOB evaluation @@ -792,7 +826,7 @@ def fit(self, X, y, sample_weight=None) -> Self: validation_data = (X_transformed, y, sample_weight) else: validation_data = (training_data, y, sample_weight) - + if self.early_stopping and self.n_iter_no_change is not None and not use_oob: # Split BEFORE kernel computation for exact kernels # For exact kernels, we need to split X_transformed, not the kernel matrix @@ -800,33 +834,32 @@ def fit(self, X, y, sample_weight=None) -> Self: # For exact kernels, split the original features n_samples = X_transformed.shape[0] n_val_samples = max(1, int(self.validation_fraction * n_samples)) - + from sklearn.model_selection import StratifiedShuffleSplit + splitter = StratifiedShuffleSplit( - n_splits=1, - test_size=n_val_samples, - random_state=42 + n_splits=1, test_size=n_val_samples, random_state=42 ) train_idx, val_idx = next(splitter.split(X_transformed, y)) - + # Split original features X_train_transformed = X_transformed[train_idx] X_val_transformed = X_transformed[val_idx] y_train = y[train_idx] y_val = y[val_idx] - + # Recompute kernel matrix for training only self.X_fit_ = X_train_transformed self.K_train_ = self._get_kernel_matrix(X_train_transformed) training_data = self.K_train_ - + # Split sample weights if provided if sample_weight is not None: sample_weight_val = sample_weight[val_idx] sample_weight = sample_weight[train_idx] else: sample_weight_val = None - + # Store validation data (original features for kernel computation) validation_data = (X_val_transformed, y_val, sample_weight_val) y = y_train @@ -834,28 +867,27 @@ def fit(self, X, y, sample_weight=None) -> Self: # For linear or approximate kernels, split after transformation n_samples = training_data.shape[0] n_val_samples = max(1, int(self.validation_fraction * n_samples)) - + from sklearn.model_selection import StratifiedShuffleSplit + splitter = StratifiedShuffleSplit( - n_splits=1, - test_size=n_val_samples, - random_state=42 + n_splits=1, test_size=n_val_samples, random_state=42 ) train_idx, val_idx = next(splitter.split(training_data, y)) - + # Split training data training_data_val = training_data[val_idx] y_val = y[val_idx] training_data = training_data[train_idx] y_train = y[train_idx] - + # Split sample weights if provided if sample_weight is not None: sample_weight_val = sample_weight[val_idx] sample_weight = sample_weight[train_idx] else: sample_weight_val = None - + # Store validation data for checking validation_data = (training_data_val, y_val, sample_weight_val) y = y_train @@ -874,17 +906,39 @@ def fit(self, X, y, sample_weight=None) -> Self: category=FutureWarning, message=".*parameter 'algorithm' is deprecated.*", ) - + + # Use gradient boosting if specified + if self.boosting_type == "gradient": + return self._fit_gradient_boosting( + training_data, + y_train, + sample_weight, + validation_data, + use_oob=use_oob, + ) + # If early stopping is enabled, use custom boosting loop - if self.early_stopping and self.n_iter_no_change is not None and validation_data is not None: - return self._fit_with_early_stopping(training_data, y_train, sample_weight, validation_data, use_oob=use_oob) - else: - # Pass the precomputed kernel matrix (or raw features for linear) - return super().fit(training_data, y_train, sample_weight) + if ( + self.early_stopping + and self.n_iter_no_change is not None + and validation_data is not None + ): + return self._fit_with_early_stopping( + training_data, + y_train, + sample_weight, + validation_data, + use_oob=use_oob, + ) - def _fit_with_early_stopping(self, X, y, sample_weight, validation_data, use_oob=False): + # Pass the precomputed kernel matrix (or raw features for linear) + return super().fit(training_data, y_train, sample_weight) + + def _fit_with_early_stopping( + self, X, y, sample_weight, validation_data, use_oob=False + ): """Fit with early stopping based on validation error or OOB evaluation. - + Parameters ---------- X : array-like @@ -907,46 +961,49 @@ def _fit_with_early_stopping(self, X, y, sample_weight, validation_data, use_oob else: # Traditional validation split X_val, y_val, sample_weight_val = validation_data - + # Initialize from parent class from sklearn.utils import check_random_state - + # Initialize attributes needed for boosting # Ensure estimator_ is set (needed by _make_estimator) - if not hasattr(self, 'estimator_') or self.estimator_ is None: + if not hasattr(self, "estimator_") or self.estimator_ is None: # Reuse the same logic from __init__ to create base estimator try: if self.kernel_approx is not None or self.kernel == "linear": from .sefr import SEFR + self.estimator_ = SEFR(kernel="linear") else: from .sefr import SEFR + self.estimator_ = SEFR(kernel="precomputed") except (ValueError, TypeError): from .sefr import SEFR + self.estimator_ = SEFR(kernel="linear") - + self.estimators_ = [] self.estimator_weights_ = np.zeros(self.n_estimators, dtype=np.float64) self.estimator_errors_ = np.ones(self.n_estimators, dtype=np.float64) - + # Initialize sample weights if sample_weight is None: sample_weight = np.ones(X.shape[0], dtype=np.float64) sample_weight /= sample_weight.sum() - + random_state = check_random_state(None) - + # Track best validation score and iterations without improvement best_val_score = -np.inf n_no_improvement = 0 best_n_estimators = 0 - + # For OOB, we need to store X_fit_ reference for exact kernels - if use_oob and hasattr(self, 'X_fit_') and self.X_fit_ is not None: + if use_oob and hasattr(self, "X_fit_") and self.X_fit_ is not None: # Store reference to training features for kernel computation pass # Already stored - + # Early stopping loop for iboost in range(self.n_estimators): # Perform a single boost @@ -956,7 +1013,9 @@ def _fit_with_early_stopping(self, X, y, sample_weight, validation_data, use_oob iboost, X, y, sample_weight, random_state, return_oob_indices=True ) if len(boost_result) == 4: - sample_weight, estimator_weight, estimator_error, oob_indices = boost_result + sample_weight, estimator_weight, estimator_error, oob_indices = ( + boost_result + ) oob_indices_history.append(oob_indices) else: sample_weight, estimator_weight, estimator_error = boost_result @@ -965,34 +1024,42 @@ def _fit_with_early_stopping(self, X, y, sample_weight, validation_data, use_oob sample_weight, estimator_weight, estimator_error = self._boost( iboost, X, y, sample_weight, random_state ) - + if sample_weight is None: break - + # Store results self.estimator_weights_[iboost] = estimator_weight self.estimator_errors_[iboost] = estimator_error - + # Evaluate on validation set or OOB samples using F1/ROC-AUC - if use_oob and len(oob_indices_history) > 0 and oob_indices_history[-1] is not None: + if ( + use_oob + and len(oob_indices_history) > 0 + and oob_indices_history[-1] is not None + ): # Use OOB samples from current iteration oob_idx = oob_indices_history[-1] if len(oob_idx) > 0: # Get OOB data (X_full is already transformed features/kernel matrix) X_oob = X_full[oob_idx] y_oob = y_full[oob_idx] - + # Get predictions and probabilities for F1/ROC-AUC val_pred = self._staged_predict_single(X_oob, iboost + 1) val_proba = self._staged_predict_proba_single(X_oob, iboost + 1) - + # Compute F1 score (primary metric) - f1_val = f1_score(y_oob, val_pred, average='weighted', zero_division=0.0) - + f1_val = f1_score( + y_oob, val_pred, average="weighted", zero_division=0.0 + ) + # Compute ROC-AUC if possible (requires probabilities) try: if val_proba is not None and val_proba.shape[1] >= 2: - roc_auc_val = roc_auc_score(y_oob, val_proba[:, 1], average='weighted') + roc_auc_val = roc_auc_score( + y_oob, val_proba[:, 1], average="weighted" + ) # Combined metric: 70% F1, 30% ROC-AUC val_score = 0.7 * f1_val + 0.3 * roc_auc_val else: @@ -1007,14 +1074,18 @@ def _fit_with_early_stopping(self, X, y, sample_weight, validation_data, use_oob # Traditional validation split val_pred = self._staged_predict_single(X_val, iboost + 1) val_proba = self._staged_predict_proba_single(X_val, iboost + 1) - + # Compute F1 score (primary metric) - f1_val = f1_score(y_val, val_pred, average='weighted', zero_division=0.0) - + f1_val = f1_score( + y_val, val_pred, average="weighted", zero_division=0.0 + ) + # Compute ROC-AUC if possible try: if val_proba is not None and val_proba.shape[1] >= 2: - roc_auc_val = roc_auc_score(y_val, val_proba[:, 1], average='weighted') + roc_auc_val = roc_auc_score( + y_val, val_proba[:, 1], average="weighted" + ) # Combined metric: 70% F1, 30% ROC-AUC val_score = 0.7 * f1_val + 0.3 * roc_auc_val else: @@ -1022,7 +1093,7 @@ def _fit_with_early_stopping(self, X, y, sample_weight, validation_data, use_oob except (ValueError, IndexError): # Fallback to F1 only if ROC-AUC fails val_score = f1_val - + # Check for improvement if val_score > best_val_score + self.tol: best_val_score = val_score @@ -1030,21 +1101,502 @@ def _fit_with_early_stopping(self, X, y, sample_weight, validation_data, use_oob best_n_estimators = iboost + 1 else: n_no_improvement += 1 - + # Early stopping check if n_no_improvement >= self.n_iter_no_change: # Trim estimators to best point if best_n_estimators > 0: self.estimators_ = self.estimators_[:best_n_estimators] - self.estimator_weights_ = self.estimator_weights_[:best_n_estimators] + self.estimator_weights_ = self.estimator_weights_[ + :best_n_estimators + ] self.estimator_errors_ = self.estimator_errors_[:best_n_estimators] break - + return self - + + def _fit_gradient_boosting( + self, X, y, sample_weight, validation_data=None, use_oob=False + ): + """Fit using gradient boosting instead of AdaBoost. + + Gradient boosting fits each new estimator to the pseudo-residuals + (negative gradient of the log-loss), which can handle non-linear + patterns more effectively than sample reweighting. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + Training data (features or kernel matrix) + y : array-like of shape (n_samples,) + Target labels (0 or 1) + sample_weight : array-like of shape (n_samples,) or None + Sample weights + validation_data : tuple or None + Validation data for early stopping + use_oob : bool + Whether to use OOB evaluation + + Returns + ------- + self : object + Fitted estimator + """ + from sklearn.utils import check_random_state + + n_samples = X.shape[0] + + # Determine if we're using exact (precomputed) kernels + # Exact kernel: non-linear kernel without approximation + self._gradient_exact_kernel = ( + not self._use_kernel_approx() and self.kernel != "linear" + ) + + # For exact kernels, X is the kernel matrix K_train_ + # We also have X_fit_ which contains the original transformed features + if self._gradient_exact_kernel: + # Store all training indices for computing prediction kernels + self._gradient_train_indices = np.arange(n_samples) + + # Initialize estimator list and weights + self.estimators_ = [] + self.estimator_weights_ = np.zeros(self.n_estimators, dtype=np.float64) + self.estimator_errors_ = np.ones(self.n_estimators, dtype=np.float64) + + # For exact kernels, store estimator training info + self._gradient_estimator_info = [] + + # Map labels to 0/1 if needed + y_binary = np.where(y == self.classes_[0], 0, 1).astype(np.float64) + + # Initialize with log-odds of class prior + pos_rate = np.clip(y_binary.mean(), 1e-10, 1 - 1e-10) + self.init_score_ = np.log(pos_rate / (1 - pos_rate)) + + # Current predictions (log-odds space) + F = np.full(n_samples, self.init_score_, dtype=np.float64) + self.F_ = F # Store for reference + + # Sample weights + if sample_weight is None: + sample_weight = np.ones(n_samples, dtype=np.float64) + sample_weight = sample_weight / sample_weight.sum() + + random_state = check_random_state(None) + + # Early stopping tracking + best_val_score = -np.inf + n_no_improvement = 0 + best_n_estimators = 0 + + # Validation data setup + if validation_data is not None: + X_val, y_val, _ = validation_data + if use_oob: + y_val_binary = np.where(y_val == self.classes_[0], 0, 1).astype( + np.float64 + ) + else: + y_val_binary = np.where(y_val == self.classes_[0], 0, 1).astype( + np.float64 + ) + + for iboost in range(self.n_estimators): + # Compute probabilities from current predictions + p = 1 / (1 + np.exp(-F)) + p = np.clip(p, 1e-10, 1 - 1e-10) + + # Compute pseudo-residuals (negative gradient of log-loss) + # For log-loss: gradient = p - y, so negative gradient = y - p + residuals = y_binary - p + + # Apply subsample if enabled + if self.subsample < 1.0: + n_subsample = max(1, int(self.subsample * n_samples)) + subsample_idx = random_state.choice( + n_samples, size=n_subsample, replace=False + ) + X_train = X[subsample_idx] + residuals_train = residuals[subsample_idx] + weights_train = sample_weight[subsample_idx] + oob_idx = np.setdiff1d(np.arange(n_samples), subsample_idx) + else: + X_train = X + residuals_train = residuals + weights_train = sample_weight + oob_idx = None + + # Convert residuals to binary labels for SEFR + # Positive residual (y > p) -> class 1 (need to increase prediction) + # Negative residual (y < p) -> class 0 (need to decrease prediction) + residual_labels = (residuals_train > 0).astype(int) + + # Use magnitude of residuals as sample weights (larger residuals = more important) + residual_weights = np.abs(residuals_train) * weights_train + residual_weights = residual_weights / (residual_weights.sum() + 1e-10) + + # Create SEFR estimator with appropriate kernel settings + # For exact kernels, we need to handle the kernel matrix properly + if self._gradient_exact_kernel: + estimator = SEFR(kernel="precomputed") + # For subsampling, extract the relevant submatrix of the kernel + if self.subsample < 1.0: + # X_train is already indexed from subsample_idx + # But for precomputed kernel, we need K[subsample_idx][:, subsample_idx] + X_train_kernel = X[np.ix_(subsample_idx, subsample_idx)] + else: + X_train_kernel = X # Full kernel matrix + else: + estimator = SEFR(kernel="linear") + X_train_kernel = X_train # Original features or approximated features + + try: + # Check if we have both classes in the residual labels + unique_labels = np.unique(residual_labels) + if len(unique_labels) < 2: + # All residuals have the same sign - use a constant prediction + # Store a dummy estimator and skip + self.estimators_.append(None) + self.estimator_weights_[iboost] = 0.0 + self._gradient_estimator_info.append(None) + continue + + estimator.fit( + X_train_kernel, residual_labels, sample_weight=residual_weights + ) + except Exception: + # If fitting fails, stop boosting + break + + # Store info about which training samples were used (for exact kernel prediction) + if self._gradient_exact_kernel: + if self.subsample < 1.0: + estimator_info = {"train_idx": subsample_idx.copy()} + else: + estimator_info = {"train_idx": np.arange(n_samples)} + else: + estimator_info = None + + # Get continuous predictions from SEFR using predict_proba + # Use the probability of class 1 (positive residual direction) + # Transform to [-1, 1] range: 2 * proba - 1 + if self._gradient_exact_kernel: + # For exact kernels, compute kernel between all training and this estimator's training + if self.subsample < 1.0: + # K_pred[i, j] = kernel(X[i], X_train[j]) where X_train are the subsampled points + K_pred = X[:, subsample_idx] # X is K_train_, get relevant columns + else: + K_pred = X # Full kernel matrix + proba = estimator.predict_proba(K_pred) + else: + proba = estimator.predict_proba(X) + h = 2 * proba[:, 1] - 1 # Maps [0, 1] to [-1, 1] + + # Line search for optimal step size (simplified Newton step) + # For log-loss, optimal step is approximately residuals / (p * (1-p)) + # We use a simplified approach with learning_rate * shrinkage + step_size = self.learning_rate * self.shrinkage + + # Update predictions + F = F + step_size * h + + # Store estimator and its info + self.estimators_.append(estimator) + self.estimator_weights_[iboost] = step_size + self._gradient_estimator_info.append(estimator_info) + + # Compute training error (log-loss) + p_new = 1 / (1 + np.exp(-F)) + p_new = np.clip(p_new, 1e-10, 1 - 1e-10) + train_loss = -np.mean( + y_binary * np.log(p_new) + (1 - y_binary) * np.log(1 - p_new) + ) + self.estimator_errors_[iboost] = train_loss + + # Early stopping check + if self.early_stopping and validation_data is not None: + if use_oob and oob_idx is not None and len(oob_idx) > 0: + # Use OOB samples + # For exact kernels, we need kernel between OOB samples and training + if self._gradient_exact_kernel: + # Pass the kernel submatrix for OOB samples + val_pred = self._gradient_predict_internal( + X, oob_idx, iboost + 1 + ) + val_proba = self._gradient_predict_proba_internal( + X, oob_idx, iboost + 1 + ) + else: + val_pred = self._gradient_predict(X[oob_idx], iboost + 1) + val_proba = self._gradient_predict_proba(X[oob_idx], iboost + 1) + y_oob_binary = y_binary[oob_idx] + + f1_val = f1_score( + (y_oob_binary > 0.5).astype(int), + val_pred, + average="weighted", + zero_division=0.0, + ) + try: + roc_auc_val = roc_auc_score(y_oob_binary, val_proba[:, 1]) + val_score = 0.7 * f1_val + 0.3 * roc_auc_val + except (ValueError, IndexError): + val_score = f1_val + else: + # Use validation set + val_pred = self._gradient_predict(X_val, iboost + 1) + val_proba = self._gradient_predict_proba(X_val, iboost + 1) + + f1_val = f1_score( + y_val_binary.astype(int), + val_pred, + average="weighted", + zero_division=0.0, + ) + try: + roc_auc_val = roc_auc_score(y_val_binary, val_proba[:, 1]) + val_score = 0.7 * f1_val + 0.3 * roc_auc_val + except (ValueError, IndexError): + val_score = f1_val + + if val_score > best_val_score + self.tol: + best_val_score = val_score + n_no_improvement = 0 + best_n_estimators = iboost + 1 + else: + n_no_improvement += 1 + + if ( + self.n_iter_no_change is not None + and n_no_improvement >= self.n_iter_no_change + ): + if best_n_estimators > 0: + self.estimators_ = self.estimators_[:best_n_estimators] + self.estimator_weights_ = self.estimator_weights_[ + :best_n_estimators + ] + self.estimator_errors_ = self.estimator_errors_[ + :best_n_estimators + ] + self._gradient_estimator_info = self._gradient_estimator_info[ + :best_n_estimators + ] + break + + # Trim arrays to actual number of estimators + n_fitted = len(self.estimators_) + self.estimator_weights_ = self.estimator_weights_[:n_fitted] + self.estimator_errors_ = self.estimator_errors_[:n_fitted] + self._gradient_estimator_info = self._gradient_estimator_info[:n_fitted] + + return self + + def _gradient_predict_proba_internal(self, K_train, sample_idx, n_estimators=None): + """Internal method for exact kernel prediction during training. + + Used for OOB evaluation where we have the full training kernel matrix. + + Parameters + ---------- + K_train : ndarray of shape (n_train, n_train) + Full training kernel matrix + sample_idx : ndarray + Indices of samples to predict (rows to use) + n_estimators : int or None + Number of estimators to use + + Returns + ------- + proba : ndarray of shape (len(sample_idx), 2) + """ + if n_estimators is None: + n_estimators = len(self.estimators_) + + n_samples = len(sample_idx) + F = np.full(n_samples, self.init_score_, dtype=np.float64) + + for i in range(min(n_estimators, len(self.estimators_))): + estimator = self.estimators_[i] + if estimator is None: + continue + + info = self._gradient_estimator_info[i] + if info is not None: + # Exact kernel: get K[sample_idx, train_idx] + train_idx = info["train_idx"] + K_pred = K_train[np.ix_(sample_idx, train_idx)] + else: + K_pred = K_train[sample_idx] + + proba_est = estimator.predict_proba(K_pred) + h = 2 * proba_est[:, 1] - 1 + F = F + self.estimator_weights_[i] * h + + p = 1 / (1 + np.exp(-F)) + p = np.clip(p, 1e-10, 1 - 1e-10) + return np.column_stack([1 - p, p]) + + def _gradient_predict_internal(self, K_train, sample_idx, n_estimators=None): + """Internal method for exact kernel prediction during training.""" + proba = self._gradient_predict_proba_internal(K_train, sample_idx, n_estimators) + return self.classes_.take(np.argmax(proba, axis=1), axis=0) + + def _gradient_predict_proba(self, X, n_estimators=None): + """Predict class probabilities using gradient boosting ensemble. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + Input samples (original features - will be transformed) + n_estimators : int or None + Number of estimators to use (None = all) + + Returns + ------- + proba : ndarray of shape (n_samples, 2) + Class probabilities + """ + if n_estimators is None: + n_estimators = len(self.estimators_) + + n_samples = X.shape[0] + + # Start with initial score + F = np.full(n_samples, self.init_score_, dtype=np.float64) + + # For exact kernels, we need to compute kernel between X and training samples + # X here is the transformed (scaled) features + is_exact_kernel = getattr(self, "_gradient_exact_kernel", False) + + # Add contributions from estimators + for i in range(min(n_estimators, len(self.estimators_))): + estimator = self.estimators_[i] + if estimator is None: + # Skip None estimators (from failed fits or single-class residuals) + continue + + if is_exact_kernel: + # Get estimator info to know which training samples were used + info = self._gradient_estimator_info[i] + if info is not None: + train_idx = info["train_idx"] + # Compute kernel between X and the training samples used by this estimator + X_train_subset = self.X_fit_[train_idx] + else: + X_train_subset = self.X_fit_ + + # Compute kernel matrix between test and training samples + K_pred = self._compute_kernel_matrix(X, X_train_subset) + proba_est = estimator.predict_proba(K_pred) + else: + # For linear or approximated kernels, X is already the right format + proba_est = estimator.predict_proba(X) + + h = 2 * proba_est[:, 1] - 1 # Maps [0, 1] to [-1, 1] + F = F + self.estimator_weights_[i] * h + + # Convert to probabilities + p = 1 / (1 + np.exp(-F)) + p = np.clip(p, 1e-10, 1 - 1e-10) + + proba = np.column_stack([1 - p, p]) + return proba + + def _gradient_predict(self, X, n_estimators=None): + """Predict class labels using gradient boosting ensemble. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + Input samples + n_estimators : int or None + Number of estimators to use (None = all) + + Returns + ------- + y_pred : ndarray of shape (n_samples,) + Predicted class labels + """ + proba = self._gradient_predict_proba(X, n_estimators) + return self.classes_.take(np.argmax(proba, axis=1), axis=0) + + def _gradient_decision_function(self, X): + """Compute decision function using gradient boosting. + + Returns the raw log-odds scores. + """ + n_samples = X.shape[0] + + # Start with initial score + F = np.full(n_samples, self.init_score_, dtype=np.float64) + + # For exact kernels, we need to compute kernel between X and training samples + is_exact_kernel = getattr(self, "_gradient_exact_kernel", False) + + # Add contributions from estimators + for i in range(len(self.estimators_)): + estimator = self.estimators_[i] + if estimator is None: + # Skip None estimators (from failed fits or single-class residuals) + continue + + if is_exact_kernel: + # Get estimator info to know which training samples were used + info = self._gradient_estimator_info[i] + if info is not None: + train_idx = info["train_idx"] + X_train_subset = self.X_fit_[train_idx] + else: + X_train_subset = self.X_fit_ + + # Compute kernel matrix between test and training samples + K_pred = self._compute_kernel_matrix(X, X_train_subset) + proba_est = estimator.predict_proba(K_pred) + else: + proba_est = estimator.predict_proba(X) + + h = 2 * proba_est[:, 1] - 1 # Maps [0, 1] to [-1, 1] + F = F + self.estimator_weights_[i] * h + + return F + + def _compute_kernel_matrix(self, X, Y=None): + """Compute kernel matrix with appropriate parameters for the kernel type. + + Parameters + ---------- + X : array-like of shape (n_samples_X, n_features) + First input + Y : array-like of shape (n_samples_Y, n_features), optional + Second input. If None, compute K(X, X). + + Returns + ------- + K : ndarray of shape (n_samples_X, n_samples_Y) + Kernel matrix + """ + gamma = self.gamma if self.gamma is not None else 1.0 / X.shape[1] + + # Build kernel parameters based on kernel type + if self.kernel == "rbf": + return pairwise_kernels(X, Y, metric="rbf", gamma=gamma) + elif self.kernel == "poly": + return pairwise_kernels( + X, Y, metric="poly", gamma=gamma, degree=self.degree, coef0=self.coef0 + ) + elif self.kernel == "sigmoid": + return pairwise_kernels( + X, Y, metric="sigmoid", gamma=gamma, coef0=self.coef0 + ) + elif self.kernel == "linear": + return pairwise_kernels(X, Y, metric="linear") + else: + # Custom or callable kernel + return pairwise_kernels(X, Y, metric=self.kernel) + def _staged_predict_single(self, X, n_estimators): """Predict using first n_estimators for validation. - + X can be either: - Transformed features (for linear/approximate kernels) - Kernel matrix (for exact kernels) @@ -1053,18 +1605,23 @@ def _staged_predict_single(self, X, n_estimators): if n_estimators == 0: # Return majority class return np.full(X.shape[0], self.classes_[0]) - + # For exact kernels, if X is original features, compute kernel matrix - if (not self._use_kernel_approx() and self.kernel != "linear" and - hasattr(self, 'X_fit_') and self.X_fit_ is not None and - X.shape[1] == self.X_fit_.shape[1] and X.shape[1] != self.X_fit_.shape[0]): + if ( + not self._use_kernel_approx() + and self.kernel != "linear" + and hasattr(self, "X_fit_") + and self.X_fit_ is not None + and X.shape[1] == self.X_fit_.shape[1] + and X.shape[1] != self.X_fit_.shape[0] + ): # X appears to be original features, compute kernel matrix X = self._get_kernel_matrix(X, self.X_fit_) - + if self.algorithm == "SAMME.R": classes = self.classes_ n_classes = len(classes) - + pred = sum( self._samme_proba(estimator, n_classes, X) for estimator in self.estimators_[:n_estimators] @@ -1082,33 +1639,33 @@ def _staged_predict_single(self, X, n_estimators): # SAMME algorithm classes = self.classes_ pred = np.zeros((X.shape[0], n_classes)) - + for i, estimator in enumerate(self.estimators_[:n_estimators]): predictions = estimator.predict(X) for j, class_label in enumerate(classes): - pred[:, j] += ( - self.estimator_weights_[i] * (predictions == class_label) + pred[:, j] += self.estimator_weights_[i] * ( + predictions == class_label ) - + decision = pred - + if self.n_classes_ == 2: return self.classes_.take((decision > 0).astype(int), axis=0) else: return self.classes_.take(np.argmax(decision, axis=1), axis=0) - + def _staged_predict_proba_single(self, X, n_estimators): """Predict probabilities using first n_estimators for validation. - + Similar to _staged_predict_single but returns probabilities instead of predictions. - + Parameters ---------- X : array-like Validation data (features or kernel matrix) n_estimators : int Number of estimators to use - + Returns ------- proba : ndarray of shape (n_samples, n_classes) @@ -1117,20 +1674,25 @@ def _staged_predict_proba_single(self, X, n_estimators): if n_estimators == 0: # Return uniform probabilities return np.ones((X.shape[0], self.n_classes_)) / self.n_classes_ - + # For exact kernels, if X is original features, compute kernel matrix - if (not self._use_kernel_approx() and self.kernel != "linear" and - hasattr(self, 'X_fit_') and self.X_fit_ is not None and - X.shape[1] == self.X_fit_.shape[1] and X.shape[1] != self.X_fit_.shape[0]): + if ( + not self._use_kernel_approx() + and self.kernel != "linear" + and hasattr(self, "X_fit_") + and self.X_fit_ is not None + and X.shape[1] == self.X_fit_.shape[1] + and X.shape[1] != self.X_fit_.shape[0] + ): # X appears to be original features, compute kernel matrix X = self._get_kernel_matrix(X, self.X_fit_) - + if self.algorithm == "SAMME.R": # Use decision function and convert to probabilities # This matches how predict_proba works in the parent class classes = self.classes_ n_classes = len(classes) - + pred = sum( self._samme_proba(estimator, n_classes, X) for estimator in self.estimators_[:n_estimators] @@ -1142,7 +1704,7 @@ def _staged_predict_proba_single(self, X, n_estimators): else: # No valid weights, return uniform return np.ones((X.shape[0], n_classes)) / n_classes - + # Convert SAMME.R output to probabilities # _samme_proba returns log-probability-like values (n_samples, n_classes) if n_classes == 2: @@ -1158,16 +1720,16 @@ def _staged_predict_proba_single(self, X, n_estimators): # Multi-class: use softmax exp_pred = np.exp(pred - np.max(pred, axis=1, keepdims=True)) proba = exp_pred / np.sum(exp_pred, axis=1, keepdims=True) - + return proba else: # SAMME algorithm: use weighted voting classes = self.classes_ n_classes = len(classes) proba = np.zeros((X.shape[0], n_classes)) - + for i, estimator in enumerate(self.estimators_[:n_estimators]): - if hasattr(estimator, 'predict_proba'): + if hasattr(estimator, "predict_proba"): estimator_proba = estimator.predict_proba(X) weight = self.estimator_weights_[i] proba += weight * estimator_proba @@ -1177,12 +1739,12 @@ def _staged_predict_proba_single(self, X, n_estimators): weight = self.estimator_weights_[i] for j, class_label in enumerate(classes): proba[:, j] += weight * (predictions == class_label) - + # Normalize proba_sum = np.sum(proba, axis=1, keepdims=True) proba_sum[proba_sum == 0] = 1.0 # Avoid division by zero proba /= proba_sum - + return proba @staticmethod @@ -1206,10 +1768,12 @@ def _samme_proba(estimator, n_classes, X): log_proba - (1.0 / n_classes) * log_proba.sum(axis=1)[:, np.newaxis] ) - def _compute_adaptive_learning_rate(self, iboost, estimator_error, base_learning_rate): + def _compute_adaptive_learning_rate( + self, iboost, estimator_error, base_learning_rate + ): """ Compute adaptive learning rate based on iteration and estimator error. - + Parameters ---------- iboost : int @@ -1218,7 +1782,7 @@ def _compute_adaptive_learning_rate(self, iboost, estimator_error, base_learning Classification error of the current estimator (0-0.5) base_learning_rate : float Base learning rate from user parameter - + Returns ------- adaptive_lr : float @@ -1227,20 +1791,22 @@ def _compute_adaptive_learning_rate(self, iboost, estimator_error, base_learning # Exponential decay: reduce learning rate as we progress # Factor starts at 1.0 and decays to ~0.7 over all iterations iteration_decay = 1.0 - (iboost / max(self.n_estimators, 1)) * 0.3 - + # Error-based adjustment: lower rate for high error estimators # High error (0.5) -> factor ~0.57, Low error (0.0) -> factor 1.0 error_factor = 1.0 / (1.0 + estimator_error * 1.5) - + # Combine factors adaptive_lr = base_learning_rate * iteration_decay * error_factor - + # Clamp to reasonable range: at least 0.01, at most base_learning_rate adaptive_lr = np.clip(adaptive_lr, 0.01, base_learning_rate) - + return adaptive_lr - def _boost(self, iboost, X, y, sample_weight, random_state, return_oob_indices=False): + def _boost( + self, iboost, X, y, sample_weight, random_state, return_oob_indices=False + ): """ Implement a single boost using precomputed kernel matrix or raw features. @@ -1254,31 +1820,36 @@ def _boost(self, iboost, X, y, sample_weight, random_state, return_oob_indices=F """ estimator = self._make_estimator(random_state=random_state) oob_indices = None - + # Apply subsampling if enabled # Note: For exact kernels (precomputed kernel matrices), subsampling is skipped # because it would require tracking subsample indices per estimator for correct prediction - is_exact_kernel = (X.shape[0] == X.shape[1] and X.shape[0] == y.shape[0] and - not self._use_kernel_approx() and self.kernel != "linear") - + is_exact_kernel = ( + X.shape[0] == X.shape[1] + and X.shape[0] == y.shape[0] + and not self._use_kernel_approx() + and self.kernel != "linear" + ) + if self.subsample < 1.0 and not is_exact_kernel: n_samples = X.shape[0] n_subsample = max(1, int(self.subsample * n_samples)) - + # Use stratified sampling to maintain class distribution from sklearn.model_selection import StratifiedShuffleSplit + splitter = StratifiedShuffleSplit( n_splits=1, train_size=n_subsample, - random_state=random_state.randint(0, 2**31 - 1) + random_state=random_state.randint(0, 2**31 - 1), ) subsample_idx, _ = next(splitter.split(X, y)) - + # Track OOB indices if requested if return_oob_indices: all_indices = np.arange(n_samples) oob_indices = np.setdiff1d(all_indices, subsample_idx) - + # Subsample data and weights (for feature matrices, subsample rows only) X_subsample = X[subsample_idx] y_subsample = y[subsample_idx] @@ -1288,9 +1859,11 @@ def _boost(self, iboost, X, y, sample_weight, random_state, return_oob_indices=F sample_weight_subsample /= sample_weight_subsample.sum() else: sample_weight_subsample = None - + # Fit estimator on subsampled data - estimator.fit(X_subsample, y_subsample, sample_weight=sample_weight_subsample) + estimator.fit( + X_subsample, y_subsample, sample_weight=sample_weight_subsample + ) else: # No subsampling - use all data estimator.fit(X, y, sample_weight=sample_weight) @@ -1319,19 +1892,19 @@ def _boost(self, iboost, X, y, sample_weight, random_state, return_oob_indices=F adaptive_lr = self._compute_adaptive_learning_rate( iboost, estimator_error, self.learning_rate ) - + # Compute F1 score for this estimator to inform weight calculation # This aligns estimator weighting with F1 optimization target - f1 = f1_score(y, y_pred, sample_weight=sample_weight, average='weighted') - + f1 = f1_score(y, y_pred, sample_weight=sample_weight, average="weighted") + # F1 bonus: reward estimators with good F1 performance # Scale: 0.5 F1 -> 1.0x multiplier, 1.0 F1 -> 1.2x multiplier # This ensures estimators contributing to F1 get higher weights f1_bonus = 1.0 + (f1 - 0.5) * 0.6 - + # Compute base weight from error rate base_weight = np.log((1 - estimator_error) / max(estimator_error, 1e-10)) - + # Apply F1 bonus to estimator weight estimator_weight = self.shrinkage * adaptive_lr * base_weight * f1_bonus @@ -1340,18 +1913,22 @@ def _boost(self, iboost, X, y, sample_weight, random_state, return_oob_indices=F # This gives higher weight boosts to minority class samples when misclassified unique_classes, class_counts = np.unique(y, return_counts=True) class_freq = class_counts / len(y) - class_weights = {cls: 1.0 / freq for cls, freq in zip(unique_classes, class_freq)} - + class_weights = { + cls: 1.0 / freq for cls, freq in zip(unique_classes, class_freq) + } + # Apply class-aware weight updates (minority class gets higher boost) for cls in unique_classes: cls_mask = y == cls cls_weight = class_weights[cls] # Inverse frequency weighting sample_weight[cls_mask] = np.exp( np.log(sample_weight[cls_mask] + 1e-10) - + estimator_weight * incorrect[cls_mask] * cls_weight + + estimator_weight + * incorrect[cls_mask] + * cls_weight * (sample_weight[cls_mask] > 0) ) - + # Normalize to prevent numerical issues sample_weight /= np.sum(sample_weight) @@ -1383,19 +1960,19 @@ def _boost(self, iboost, X, y, sample_weight, random_state, return_oob_indices=F adaptive_lr = self._compute_adaptive_learning_rate( iboost, estimator_error, self.learning_rate ) - + # Compute F1 score for this estimator to inform weight calculation # This aligns estimator weighting with F1 optimization target - f1 = f1_score(y, y_pred, sample_weight=sample_weight, average='weighted') - + f1 = f1_score(y, y_pred, sample_weight=sample_weight, average="weighted") + # F1 bonus: reward estimators with good F1 performance # Scale: 0.5 F1 -> 1.0x multiplier, 1.0 F1 -> 1.2x multiplier # This ensures estimators contributing to F1 get higher weights f1_bonus = 1.0 + (f1 - 0.5) * 0.6 - + # Compute base weight from error rate base_weight = np.log((1.0 - estimator_error) / max(estimator_error, 1e-10)) - + # Apply F1 bonus to estimator weight estimator_weight = self.shrinkage * adaptive_lr * base_weight * f1_bonus @@ -1403,8 +1980,10 @@ def _boost(self, iboost, X, y, sample_weight, random_state, return_oob_indices=F # This gives higher weight boosts to minority class samples when misclassified unique_classes, class_counts = np.unique(y, return_counts=True) class_freq = class_counts / len(y) - class_weights = {cls: 1.0 / freq for cls, freq in zip(unique_classes, class_freq)} - + class_weights = { + cls: 1.0 / freq for cls, freq in zip(unique_classes, class_freq) + } + # Apply class-aware weight updates (minority class gets higher boost) for cls in unique_classes: cls_mask = y == cls @@ -1441,7 +2020,18 @@ class in ``classes_``, respectively. check_is_fitted(self) X_transformed = self.scaler_.transform(X) - # Decide which representation to use at prediction time: + # For gradient boosting, handle kernels differently + if self.boosting_type == "gradient": + if self.kernel == "linear": + test_data = X_transformed + elif self._use_kernel_approx(): + test_data = self.kernel_approx_.transform(X_transformed) + else: + # For exact kernels, pass transformed features + test_data = X_transformed + return self._gradient_decision_function(test_data) + + # Decide which representation to use at prediction time (for AdaBoost): if self.kernel == "linear": test_data = X_transformed @@ -1498,9 +2088,64 @@ def predict(self, X): y : ndarray of shape (n_samples,) The predicted classes. """ + # For gradient boosting, use sigmoid threshold + if self.boosting_type == "gradient": + check_is_fitted(self) + X_transformed = self.scaler_.transform(X) + + # Transform data based on kernel type + if self.kernel == "linear": + test_data = X_transformed + elif self._use_kernel_approx(): + test_data = self.kernel_approx_.transform(X_transformed) + else: + # For exact kernels, pass transformed features - kernel will be computed + # inside _gradient_predict for each estimator + test_data = X_transformed + + return self._gradient_predict(test_data) + pred = self.decision_function(X) if self.n_classes_ == 2: return self.classes_.take(pred > 0, axis=0) return self.classes_.take(np.argmax(pred, axis=1), axis=0) + + def predict_proba(self, X): + """Predict class probabilities for X. + + The predicted class probabilities of an input sample is computed as + the weighted mean predicted class probabilities of the classifiers + in the ensemble. + + Parameters + ---------- + X : {array-like} of shape (n_samples, n_features) + The training input samples. + + Returns + ------- + p : ndarray of shape (n_samples, n_classes) + The class probabilities of the input samples. The order of + outputs is the same of that of the :term:`classes_` attribute. + """ + # For gradient boosting, use sigmoid of log-odds + if self.boosting_type == "gradient": + check_is_fitted(self) + X_transformed = self.scaler_.transform(X) + + # Transform data based on kernel type + if self.kernel == "linear": + test_data = X_transformed + elif self._use_kernel_approx(): + test_data = self.kernel_approx_.transform(X_transformed) + else: + # For exact kernels, pass transformed features - kernel will be computed + # inside _gradient_predict_proba for each estimator + test_data = X_transformed + + return self._gradient_predict_proba(test_data) + + # For AdaBoost, use parent implementation + return super().predict_proba(X) diff --git a/src/linearboost/sefr.py b/src/linearboost/sefr.py index 0b4e723..1a328c1 100644 --- a/src/linearboost/sefr.py +++ b/src/linearboost/sefr.py @@ -280,6 +280,7 @@ def fit(self, X, y, sample_weight=None) -> Self: else: K = self._get_kernel_matrix(X) + # Validate sample weights pos_labels = y_ == 1 neg_labels = y_ == 0