# Source code for scparadise.sceve

import warnings
warnings.filterwarnings("ignore")

import numpy as np
import anndata as ad
import muon as mu
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, Subset
from pytorch_tabnet.tab_model import TabNetRegressor
from scipy.sparse import csr_matrix
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, root_mean_squared_log_error
from tqdm import tqdm
import os
import json
import fsspec
import optuna
from sklearn.model_selection import KFold, StratifiedKFold

# load scParadise dust module
import scparadise.dust as dust 


# Dataset for MuData object
class scRNAseqDataset_multi(Dataset):
    """
    Torch dataset that pairs two modalities of a MuData object for scEve.

    Indexing returns a tuple ``(x, y)``: ``x`` is the per-feature min-max
    scaled profile of the first modality and ``y`` is the second-modality
    profile clipped at zero (not rescaled).
    """

    def __init__(
        self,
        mdata,
        first_modality_name = 'rna',
        second_modality_name = 'prot',
        first_layer = None,
        second_layer = None
    ):
        """
        Function for converting MuData objects with 2 modalities to a scEve model compatible format.

        Parameters
        ----------
        mdata : MuData
            MuData object.
        first_modality_name: str (default: 'rna')
            Name of first modality in MuData object.
        second_modality_name: str (default: 'prot')
            Name of second modality in MuData object.
        first_layer: str (default: None)
            If specified, use mdata.mod[first_modality_name].layers[first_layer] for expression values instead of mdata.mod[first_modality_name].X.
            If the layer does not exist, .X is used.
        second_layer: str (default: None)
            If specified, use mdata.mod[second_modality_name].layers[second_layer] for expression values instead of mdata.mod[second_modality_name].X.
            If the layer does not exist, .X is used.

        Raises
        ------
        ValueError
            If a modality is missing from ``mdata.mod`` or if either matrix
            contains NaN or infinite values.

        Internal abbreviations:
            n_first_features: number of features in first modality
            n_second_features: number of features in second modality
            first_feature_names: feature names of first modality
            second_feature_names: feature names of second modality
            obs_names: barcode (cell) names
        """
        self.mdata = mdata
        self.first_modality_name = first_modality_name
        self.second_modality_name = second_modality_name

        for modality_name in (first_modality_name, second_modality_name):
            if modality_name not in mdata.mod:
                raise ValueError(f"Modality '{modality_name}' not found in mdata")

        adata_first = mdata.mod[first_modality_name]
        adata_second = mdata.mod[second_modality_name]

        # First modality: clip at zero, then min-max scale every feature.
        # The 1e-10 term keeps constant features from dividing by zero.
        X_first = self._dense_nonnegative(adata_first, first_layer)
        X_first_norm = (X_first - X_first.min(0)) / (np.ptp(X_first, axis=0) + 1e-10)
        self.X_first = torch.FloatTensor(X_first_norm)

        # Second modality: clipped at zero but deliberately NOT min-max scaled,
        # so the model learns to predict the values exactly as stored.
        X_second = self._dense_nonnegative(adata_second, second_layer)
        self.X_second = torch.FloatTensor(X_second)

        # Explicit raises instead of `assert`: asserts are removed under
        # `python -O`, which would let NaN/Inf reach training unnoticed.
        self._require_finite(self.X_first, "first")
        self._require_finite(self.X_second, "second")

        self.n_first_features = self.X_first.shape[1]
        self.n_second_features = self.X_second.shape[1]
        self.first_feature_names = adata_first.var_names.tolist()
        self.second_feature_names = adata_second.var_names.tolist()
        self.obs_names = mdata.obs_names.tolist()

    @staticmethod
    def _dense_nonnegative(adata, layer):
        """Return ``adata.layers[layer]`` (or ``adata.X``) as a dense array clipped at zero."""
        if layer is not None and layer in adata.layers:
            X = adata.layers[layer]
        else:
            X = adata.X
        # Sparse matrices expose `toarray`; dense arrays pass through unchanged
        if hasattr(X, 'toarray'):
            X = X.toarray()
        return np.maximum(X, 0)

    @staticmethod
    def _require_finite(tensor, modality_label):
        """Raise ``ValueError`` if ``tensor`` contains NaN or infinite entries."""
        if torch.isnan(tensor).any():
            raise ValueError(f"NaN in {modality_label} modality")
        if torch.isinf(tensor).any():
            raise ValueError(f"Inf in {modality_label} modality")

    def __len__(self):
        """Number of cells (rows of the first-modality matrix)."""
        return self.X_first.shape[0]

    def __getitem__(self, idx):
        """Return ``(first_modality_row, second_modality_row)`` for cell ``idx``."""
        x = self.X_first[idx]
        y = self.X_second[idx]
        return x, y


# scEve regressor model
class scEveRegressor(nn.Module):
    def __init__(
        self,
        input_dim,
        output_dim,
        hd = 256,
        dropout = 0.1
    ):
        """
        Feed-forward regression head of scEve: maps an embedding to the features of the predicted modality.

        Architecture: two hidden stages (Linear -> LayerNorm -> ReLU -> Dropout)
        of width ``hd`` and ``hd // 2``, followed by a plain Linear output layer.

        Parameters
        ----------
        input_dim: int
            Size of the input vector (width of the first Linear layer's input).
        output_dim: int
            Size of the output vector (number of predicted features).
        hd: int (default: 256)
            Width of the first hidden stage; the second stage uses hd // 2. hd = hidden dim
        dropout: float (default: 0.1)
            Dropout probability applied after each hidden stage.

        """
        super().__init__()
        half_hd = hd // 2

        # Hidden stages are created in input -> output order so that both the
        # Sequential indices (state_dict keys) and the weight-init order are stable.
        stages = []
        for in_width, out_width in ((input_dim, hd), (hd, half_hd)):
            stages.extend([
                nn.Linear(in_width, out_width),
                nn.LayerNorm(out_width),
                nn.ReLU(),
                nn.Dropout(dropout),
            ])
        # Output projection: no activation, so outputs are unbounded
        stages.append(nn.Linear(half_hd, output_dim))

        self.regressor = nn.Sequential(*stages)

    def forward(self, x):
        """Run ``x`` through the regression stack and return the result."""
        prediction = self.regressor(x)
        return prediction

        
# scEve transformer model
class scEveTransformer(nn.Module):
    def __init__(
        self,
        n_first_features,
        n_second_features,
        ed=256,
        nc=16,
        nb=5,
        nh=8,
        ff_hd = 512,
        regressor_hd = 256,
        dropout = 0.1
    ):
        """
        scEve transformer for regression-based imputation of one modality from another.
        Combines a chunked feature embedding, a stack of transformer blocks
        (both taken from ``scparadise.dust``) and the scEve regressor.

        Parameters
        ----------
        n_first_features: int
            Number of features (e.g., genes) in first modality.
        n_second_features: int
            Number of features (e.g., proteins) in second modality.
        ed: int (default: 256)
            Embedding dimensionality.
        nc: int (default: 16)
            Number of chunks for genes from adata.
        nb: int (default: 5)
            Number of blocks in scEve model.
        nh: int (default: 8)
            Number of heads in scEve model attention mechanism.
        ff_hd: int (default: 512)
            Number of nodes in each scEve model layer in feed forward network.
        regressor_hd: int (default: 256)
            Number of nodes in scEve regressor.
        dropout: float (default: 0.1)
            Portion of neurons that temporarily ignored during training (prevents overfitting).

        """
        super().__init__()

        # Hyperparameters are kept on the instance because save_model
        # serialises them and load_model rebuilds the network from them.
        self.n_first_features = n_first_features
        self.n_second_features = n_second_features
        self.ed = ed
        self.nc = nc
        self.nb = nb
        self.nh = nh
        self.ff_hd = ff_hd
        self.regressor_hd = regressor_hd
        self.dropout = dropout

        # Chunked embedding of the first-modality features
        self.feature_embedding = dust.Embedding(n_first_features, ed, nc)

        # Stack of `nb` transformer blocks
        transformer_blocks = [dust.RibBlock(ed, nh, ff_hd, dropout) for _ in range(nb)]
        self.blocks = nn.ModuleList(transformer_blocks)

        # Regression head: embedding -> second-modality features
        self.regressor = scEveRegressor(
            ed,
            n_second_features,
            regressor_hd,
            dropout
        )

        # Final layer norm applied before pooling
        self.norm = nn.LayerNorm(ed)

    def forward(self, x, return_attention=False):
        """
        Predict second-modality features from first-modality input ``x``.

        Parameters
        ----------
        x: torch.Tensor
            Batch of first-modality profiles.
        return_attention: bool (default: False)
            If True, also return the list of attention outputs, one entry per block.

        Returns
        -------
        Predictions, or the tuple (predictions, attention_weights) when
        ``return_attention`` is True.
        """
        hidden = self.feature_embedding(x)

        # Each block returns the updated hidden state and its attention output
        collected_attention = []
        for transformer_block in self.blocks:
            hidden, block_attention = transformer_block(hidden)
            if return_attention:
                collected_attention.append(block_attention)

        # Normalise, then average over dim 1 (the chunk axis, per the original comment)
        pooled = self.norm(hidden).mean(dim=1)

        prediction = self.regressor(pooled)

        if not return_attention:
            return prediction
        return prediction, collected_attention


# Save scEve model function
def save_model(model, path, model_name, verbose = True):
    """
    Function for scEve model saving.
    Saves model in a folder 'model_name' as '<path>/<model_name>/model_v2.pth'.

    Parameters
    ----------
    model: scEveTransformer
        Model to save. Must expose the architecture attributes
        (n_first_features, n_second_features, ed, nc, nb, nh, ff_hd,
        regressor_hd, dropout) that load_model needs to rebuild the network.
    path: str, path object
        Parent directory of the model folder.
    model_name: str
        Name of the folder created inside 'path'.
    verbose: bool (default: True)
        Print the destination folder after saving.

    Notes
    -----
    Training metadata (modality names, feature names, history) is stored only
    when present on the model, which matches the optional way load_model
    restores those keys.
    """
    architecture_keys = (
        'n_first_features', 'n_second_features',
        'ed', 'nc', 'nb', 'nh', 'ff_hd', 'regressor_hd', 'dropout'
    )
    metadata_keys = (
        'first_modality_name', 'second_modality_name',
        'first_feature_names', 'second_feature_names', 'history'
    )

    save_dict = {'state_dict': model.state_dict()}
    # Architecture attributes are mandatory: a missing one raises AttributeError
    for key in architecture_keys:
        save_dict[key] = getattr(model, key)
    # Metadata is attached by train(); tolerate models that do not carry it
    for key in metadata_keys:
        if hasattr(model, key):
            save_dict[key] = getattr(model, key)

    # Build the folder path once so the created directory and the written
    # file always agree (previously only the makedirs call rewrote
    # backslashes, which could point the two at different locations).
    model_dir = os.path.join(path, model_name)
    os.makedirs(model_dir, exist_ok = True)
    torch.save(save_dict, os.path.join(model_dir, 'model_v2.pth'))

    if verbose:
        print(f"Model saved to {model_dir}")


# Load scEve model function
def load_model(path, device = 'auto', verbose = True):
    """
    Function for scEve model loading.
    Loads model from a folder 'path' (expects the file 'model_v2.pth' inside it).

    Parameters
    ----------
    path: str, path object
        Folder containing 'model_v2.pth'.
    device: str or torch.device (default: 'auto')
        Device to place the model on. 'auto' selects CUDA when available, otherwise CPU.
    verbose: bool (default: True)
        Print the folder the model was loaded from.

    Returns
    -------
    scEveTransformer in eval mode, with training metadata attached when the checkpoint holds it.
    """
    # Resolve the target device
    if device == 'auto':
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # NOTE(security): weights_only=False unpickles arbitrary Python objects;
    # only load checkpoints that come from a trusted source.
    checkpoint_file = os.path.join(path, 'model_v2.pth')
    checkpoint = torch.load(checkpoint_file, map_location=device, weights_only = False)

    # Rebuild the network from the stored architecture hyperparameters
    architecture_keys = (
        'n_first_features', 'n_second_features',
        'ed', 'nc', 'nb', 'nh', 'ff_hd', 'regressor_hd', 'dropout'
    )
    architecture = {key: checkpoint[key] for key in architecture_keys}
    model = scEveTransformer(**architecture)

    # Restore weights, move to the device and switch to inference mode
    model.load_state_dict(checkpoint['state_dict'])
    model = model.to(device)
    model.eval()

    # Training metadata is optional in the checkpoint
    optional_keys = (
        'first_modality_name', 'second_modality_name',
        'first_feature_names', 'second_feature_names', 'history'
    )
    for key in optional_keys:
        if key in checkpoint:
            setattr(model, key, checkpoint[key])

    if verbose:
        print(f"scEve model loaded from {path}")

    return model

# Available metric functions
def compute_rmse(y_true, y_pred):
    """Root mean squared error between observed and predicted values."""
    mse = mean_squared_error(y_true, y_pred)
    return np.sqrt(mse)
def compute_mse(y_true, y_pred):
    """Mean squared error between observed and predicted values."""
    mse = mean_squared_error(y_true, y_pred)
    return mse
def compute_mae(y_true, y_pred):
    """Mean absolute error between observed and predicted values."""
    mae = mean_absolute_error(y_true, y_pred)
    return mae
def compute_rmsle(y_true, y_pred):
    """
    Root mean squared log error between observed and predicted values.

    Predictions are clamped at zero before scoring: the scEve regressor ends
    in an unconstrained linear layer, so it can emit negative values, and
    sklearn's RMSLE raises ValueError on negative inputs. Negative values in
    ``y_true`` are left untouched and still raise.
    """
    y_pred = np.clip(y_pred, 0, None)
    return root_mean_squared_log_error(y_true, y_pred)

# Function for training scEve model
def train(
    mdata,
    first_modality_name,
    second_modality_name,
    first_layer = None,
    second_layer = None,
    detailed_annotation = None,
    path = '',
    model_name = 'scEve_model',
    test_size = 0.2,
    epochs = 200,
    eval_metric = ['rmse'],
    batch_size = 128,
    patience = 10,
    use_augmentation = True,
    aug_probability = 0.5,
    prob = 0.15,
    noise_std = 0.1,
    dropout_aug = 0.1,
    alpha = 0.2,
    nc = 4,
    nb = 4,
    nh = 8,
    ed_nh_ratio = 32,
    ff_hd = 512,
    regressor_hd = 512,
    dropout = 0.1,
    lr = 1e-4,
    weight_decay = 1e-4,
    device = 'auto',
    random_state = 0,
    verbose = True,
    return_model = False
):
    """
    Train custom scEve model using MuData object with different modalities.

    The trained model is always saved with save_model to
    '<path>/<model_name>/model_v2.pth'.

    Parameters
    ----------
    mdata : MuData
        MuData object.
    first_modality_name: str
        Name of first (input) modality in MuData object, e.g. 'rna'.
    second_modality_name: str
        Name of second (predicted) modality in MuData object, e.g. 'prot'.
    first_layer: str (default: None)
        If specified, use mdata.mod[first_modality_name].layers[first_layer] for expression values instead of mdata.mod[first_modality_name].X.
    second_layer: str (default: None)
        If specified, use mdata.mod[second_modality_name].layers[second_layer] for expression values instead of mdata.mod[second_modality_name].X.
    detailed_annotation : str, (default: None)
        The most detailed level of cell annotation. Key in mdata.obs dataframe.
        If given, the train/validation split is stratified by this column,
        which may increase model evaluation score.
    path: str, path object (default: '')
        Directory in which the model folder is created.
    model_name: str (default: 'scEve_model')
        Name of a folder to save model.
    test_size: float or int (default: 0.2)
        If float, should be between 0.0 and 1.0 and represent the proportion of the dataset to include in the test split.
        If int, represents the absolute number of test cells.
    epochs: int (default: 200)
        Maximum number of epochs for scEve model training
    eval_metric: list of str or str (default: ['rmse'])
        Metrics computed on the train and validation data every epoch.
        The last metric is used as the target and for early stopping.
        A single metric may be given as a plain string.
        Available metrics: 'mse', 'mae', 'rmse', 'rmsle'.
    batch_size: int, (default: 128)
        Number of examples per batch.
    patience: int (default: 10)
        Number of consecutive epochs without improvement before performing early stopping.
        The value is passed to dust.EarlyStopping; after training the model
        state kept by dust.EarlyStopping is loaded back into the model.
    use_augmentation: bool (default: True)
        Use data augmentation or not.
    aug_probability: float (default: 0.5)
        The probability of applying augmentation to a batch
    prob: float (default: 0.15)
        Gene masking probability.
    noise_std: float (default: 0.1)
        Gaussian noise standard deviation.
    dropout_aug: float (default: 0.1)
        Dropout probability for simulating technical noise.
    alpha: float (default: 0.2)
        Alpha parameter for mixup augmentation.
    nc: int (default: 4)
        Number of chunks for genes from adata.
    nb: int (default 4)
        Number of blocks in scEve model.
    nh: int (default: 8)
        Number of heads in scEve model attention mechanism.
    ed_nh_ratio: int (default: 32)
        Used for calculating embedding dimensionality ('ed') from 'nh'.
        Default ed = nh * ed_nh_ratio = 8 * 32 = 256.
    ff_hd: int (default: 512)
        Number of nodes in each scEve model layer in feed forward network.
    regressor_hd: int (default: 512)
        Number of nodes in scEve regressor.
    dropout: float (default: 0.1)
        Portion of neurons that temporarily ignored during training (prevents overfitting).
    lr: float (default: 1e-4)
        Determines the step size at each iteration while moving toward a minimum of a loss function.
    weight_decay: float (default: 1e-4)
        Weight decay coefficient.
    device: str (default: 'auto')
        Type of device to use in training model ('cpu', 'cuda'). Set 'auto' for automatic selection.
    random_state: int (default: 0)
        Controls the data shuffling, splitting to folds and model training.
        Pass an int for reproducible output across multiple function calls.
    verbose: bool (default: True)
        Show progress bar for each epoch during training.
    return_model: bool (default: False)
        Return model after training or not.

    Returns
    -------
    The trained scEveTransformer if 'return_model' is True, otherwise None.

    Raises
    ------
    ValueError
        If 'eval_metric' is empty or contains an unknown metric, or if
        'ed' = nh * ed_nh_ratio is not divisible by 'nh'.
    """
    # Accept a single metric name as well as a list; copying also guarantees
    # that the shared default list is never mutated.
    if isinstance(eval_metric, str):
        eval_metric = [eval_metric]
    else:
        eval_metric = list(eval_metric)
    if not eval_metric:
        raise ValueError("'eval_metric' must contain at least one metric name.")

    # Resolve metric functions up front so a typo fails before any heavy work
    available_metric_functions = {
        'rmse': compute_rmse,
        'mse': compute_mse,
        'mae': compute_mae,
        'rmsle': compute_rmsle
    }
    metric_functions = {}
    for metric in eval_metric:
        if metric not in available_metric_functions:
            raise ValueError(f"Unknown metric: {metric}")
        metric_functions[metric] = available_metric_functions[metric]

    # Check 'ed' - 'nh' compatibility (only fails for a non-integer ed_nh_ratio)
    ed = nh * ed_nh_ratio
    if ed % nh != 0:
        raise ValueError("Incompatible parameters: 'ed' must be divisible by 'nh' without a remainder.")

    # Set random state (for reproducibility)
    np.random.seed(random_state)
    torch.manual_seed(random_state)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(random_state)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

    # Device selection
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') if device == 'auto' else device

    if verbose:
        print(f"Device: {device}")
        print(f"Task: Predict '{second_modality_name}' from '{first_modality_name}'")

    # Create dataset
    dataset = scRNAseqDataset_multi(
        mdata,
        first_modality_name=first_modality_name,
        second_modality_name=second_modality_name,
        first_layer=first_layer,
        second_layer=second_layer
    )

    # Number of features in modalities
    n_first_features = dataset.n_first_features
    n_second_features = dataset.n_second_features

    # Train/val split (stratified by annotation when one is given)
    indices = np.arange(len(dataset))
    if detailed_annotation is not None:
        train_idx, val_idx = train_test_split(
            indices,
            test_size=test_size,
            random_state=random_state,
            stratify = mdata.obs[detailed_annotation]
        )
    else:
        train_idx, val_idx = train_test_split(
            indices,
            test_size=test_size,
            random_state=random_state
        )

    if verbose:
        print(f"Number of first modality features: {n_first_features}")
        print(f"Number of second modality features: {n_second_features}")
        print(f"\nDataset split:")
        print(f'Train dataset contains: {len(train_idx)} cells, it is {round(100*(len(train_idx)/(len(train_idx) + len(val_idx))), ndigits=2)} % of input dataset')
        print(f'Validation dataset contains: {len(val_idx)} cells, it is {round(100*(len(val_idx)/(len(train_idx) + len(val_idx))), ndigits=2)} % of input dataset')

    # Create dataloaders
    train_loader = DataLoader(
        Subset(dataset, train_idx),
        batch_size=batch_size,
        shuffle=True,
        num_workers=0,
        drop_last=False
    )
    val_loader = DataLoader(
        Subset(dataset, val_idx),
        batch_size=batch_size,
        shuffle=False,
        num_workers=0,
        drop_last=False
    )

    # Initialize model
    model = scEveTransformer(
        n_first_features=n_first_features,
        n_second_features=n_second_features,
        ed=ed,
        nc=nc,
        nb=nb,
        nh=nh,
        ff_hd=ff_hd,
        regressor_hd=regressor_hd,
        dropout=dropout
    ).to(device)

    # Loss function
    criterion = nn.MSELoss()

    # Optimizer
    optimizer = torch.optim.AdamW(
        model.parameters(),
        lr=lr,
        betas=(0.9, 0.95),
        weight_decay=weight_decay
    )

    # Learning rate scheduler: cosine decay of the lr multiplier from 1 towards 0 over `epochs`
    def lr_lambda(epoch):
        return 0.5 * (1.0 + np.cos(np.pi * (epoch / epochs)))

    scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)

    # Early stopping
    early_stopping = dust.EarlyStopping(
        patience=patience,
        delta=1e-4,
        mode='min',  # Min due to error metrics
        verbose=verbose
    )

    # Augmentation
    augmenter = None
    if use_augmentation:
        augmenter = dust.Augmentation(
            prob = prob,
            noise_std = noise_std,
            dropout_prob = dropout_aug,
            alpha = alpha
        )

    # Create training history dictionary
    history = {
        'train_loss': [],
        'val_loss': [],
        'lr': []
    }
    for metric_name in metric_functions:
        history['train_' + metric_name] = []
        history['val_' + metric_name] = []

    # Training Loop
    for epoch in tqdm(range(epochs), desc='Training scEve model', colour='blue', disable = not verbose):
        # Model training
        model.train()
        train_loss = 0.0
        train_preds = []
        train_targets = []

        for batch_x, batch_y in train_loader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)

            # Augmentation block
            if use_augmentation and np.random.rand() < aug_probability:
                if np.random.rand() < 0.5:
                    # Apply augmentations: masking_augmentation, noise_augmentation, dropout_augmentation
                    batch_x = augmenter(batch_x)
                elif batch_x.size(0) > 1:
                    # mix cells - generates a new cell using 2 cells
                    perm = torch.randperm(batch_x.size(0))
                    batch_x_mix, batch_y_mix = augmenter.mix_cells(
                        batch_x, batch_x[perm],
                        batch_y, batch_y[perm]
                    )
                    batch_x, batch_y = batch_x_mix, batch_y_mix

            # Get predictions using model
            predictions = model(batch_x)

            # Calculate model loss
            loss = criterion(predictions, batch_y)

            optimizer.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

            train_loss += loss.item()

            with torch.no_grad():
                train_preds.append(predictions.detach().cpu().numpy())
                train_targets.append(batch_y.detach().cpu().numpy())

        train_loss /= len(train_loader)
        history['train_loss'].append(train_loss)

        # Concatenate all train predictions and observed data
        train_preds = np.concatenate(train_preds)
        train_targets = np.concatenate(train_targets)

        # Calculate train metrics
        for metric_name, metric_func in metric_functions.items():
            score = metric_func(train_targets, train_preds)
            history[f'train_{metric_name}'].append(score)

        # Model validation
        model.eval()
        val_loss = 0.0
        val_preds = []
        val_targets = []

        with torch.no_grad():
            for batch_x, batch_y in val_loader:
                batch_x = batch_x.to(device)
                batch_y = batch_y.to(device)

                # Get predictions using model
                predictions = model(batch_x)

                # Calculate model loss
                loss = criterion(predictions, batch_y)
                val_loss += loss.item()

                val_preds.append(predictions.cpu().numpy())
                val_targets.append(batch_y.cpu().numpy())

        val_loss /= len(val_loader)
        history['val_loss'].append(val_loss)

        # Concatenate all validation predictions and observed data
        val_preds = np.concatenate(val_preds)
        val_targets = np.concatenate(val_targets)

        # Calculate validation metrics
        for metric_name, metric_func in metric_functions.items():
            score = metric_func(val_targets, val_preds)
            history[f'val_{metric_name}'].append(score)

        # Record the learning rate used during this epoch (before the scheduler step)
        current_lr = optimizer.param_groups[0]['lr']
        history['lr'].append(current_lr)

        # Early stopping of model training on the last requested metric
        es_score = history[f'val_{eval_metric[-1]}'][-1]
        if early_stopping(es_score, model):
            break

        scheduler.step()

    # Load best model state
    early_stopping.load_bm(model)
    model.eval()

    # Add elements to model (read back by save_model / predict)
    model.first_modality_name = first_modality_name
    model.second_modality_name = second_modality_name
    model.first_feature_names = dataset.first_feature_names
    model.second_feature_names = dataset.second_feature_names
    model.history = history

    if verbose:
        print("Training completed!")

    # Save trained model
    save_model(model, path, model_name, verbose)

    if return_model:
        return model
# Function for imputation modality using trained model
def predict(
    adata,
    path_model,
    used_mod_name = 'auto',
    imputed_mod_name = 'auto',
    layer = None,
    batch_size = 256,
    device = 'auto',
    return_mdata = False,
    verbose = True
):
    """
    Predict (impute) the second modality in cells using the pretrained scEve model.

    If 'path_model' contains 'model_v2.pth', the 2nd-version (transformer)
    model is used. Otherwise the 1st-version model is loaded from
    'genes.csv', 'proteins.csv' and a TabNet '.zip' file, and the input
    values are taken from adata.raw.

    Parameters
    ----------
    adata: AnnData
        Annotated data matrix.
    path_model: str, path object
        Path to the folder containing the trained scEve model.
    used_mod_name: str (default: 'auto')
        Used for imputation modality name.
        If the value is "auto", then the name is selected from the model
        ('used_mod' for 1st-version models).
    imputed_mod_name: str (default: 'auto')
        Imputed modality name.
        If the value is "auto", then the name is selected from the model
        ('imputed_mod' for 1st-version models).
    layer: str (default: None)
        If specified, use adata.layers[layer] for expression values instead of adata.X.
        Used only by version 2 scEve models.
    batch_size: int, (default: 256)
        Number of examples per batch. Used only by version 2 scEve models.
    device: str (default: 'auto')
        Type of device to use for prediction ('cpu', 'cuda'). Set 'auto' for automatic selection.
        Used only by version 2 scEve models.
    return_mdata: bool (default: False)
        If set 'True' return MuData object.
        If set 'False' return AnnData object with predicted (imputed) modality.
    verbose: bool (default: True)
        Show progress bar for each batch during prediction.

    Returns
    -------
    MuData or AnnData with imputed modality.

    Raises
    ------
    ValueError
        If a 2nd-version model carries no feature names.
    FileNotFoundError
        If a 1st-version model folder contains no '.zip' model file.
    """

    def _warn(message):
        # This module installs a global "ignore" warnings filter at import
        # time, which would hide these user-facing data-quality warnings.
        with warnings.catch_warnings():
            warnings.simplefilter("always")
            warnings.warn(message, stacklevel=3)

    # ------------------------------------------------------------------
    # 2nd version of the scEve model (transformer checkpoint)
    # ------------------------------------------------------------------
    if os.path.exists(os.path.join(path_model, 'model_v2.pth')):
        if verbose:
            print('Feature prediction using the 2nd version of the scEve model')

        # Device selection; the resolved device is handed to load_model so the
        # checkpoint is mapped straight to the requested device.
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') if device == 'auto' else device
        model = load_model(path_model, device = device, verbose=verbose)
        model = model.to(device)
        model.eval()

        if not hasattr(model, 'first_feature_names') or model.first_feature_names is None:
            raise ValueError("Model does not have feature names. Train the model first.")

        # Get data
        if layer is not None and layer in adata.layers:
            X = adata.layers[layer]
        else:
            X = adata.X

        # Convert sparse to dense if needed
        if hasattr(X, 'toarray'):
            X = X.toarray()

        # Get feature names from new data
        new_feature_names = adata.var_names.tolist()
        model_feature_names = model.first_feature_names

        # Align genes: create matrix with genes in the same order as training;
        # features missing from the new data stay zero.
        n_cells = X.shape[0]
        n_model_features = len(model_feature_names)
        X_aligned = np.zeros((n_cells, n_model_features), dtype=np.float32)

        # Find intersection and assign values
        matched_features = 0
        name_to_idx = {g: i for i, g in enumerate(new_feature_names)}
        for i, feature in enumerate(model_feature_names):
            if feature in name_to_idx:
                feature_idx = name_to_idx[feature]
                X_aligned[:, i] = X[:, feature_idx]
                matched_features += 1

        if verbose:
            print("Feature alignment:")
            print(f"Model features: {n_model_features}")
            print(f"New data features: {len(new_feature_names)}")
            print(f"Matched features: {matched_features} ({100*matched_features/n_model_features:.1f}%)")

        # Raise warning if matched genes is lower than 80% from used for training
        if matched_features < 0.8 * n_model_features:
            _warn(
                f"Only {matched_features}/{n_model_features} features matched! "
                "This may lead to poor predictions."
            )

        # Normalize the same way as during training
        X_aligned = np.maximum(X_aligned, 0)
        X_norm = (X_aligned - X_aligned.min(0)) / (np.ptp(X_aligned, axis=0) + 1e-10)

        # Check for NaN/Inf
        if np.isnan(X_norm).any() or np.isinf(X_norm).any():
            _warn("NaN or Inf found in normalized data. Replacing with zeros.")
            X_norm = np.nan_to_num(X_norm, nan=0.0, posinf=0.0, neginf=0.0)

        X_tensor = torch.FloatTensor(X_norm)

        # Create dataloader
        dataset = torch.utils.data.TensorDataset(X_tensor)
        loader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

        # Predict
        all_predictions = []
        with torch.no_grad():
            for (batch_x,) in tqdm(loader, desc='Predicting', colour='blue', disable = not verbose):
                batch_x = batch_x.to(device)
                preds = model(batch_x)
                all_predictions.append(preds.cpu().numpy())

        # Concatenate all predictions
        all_predictions = np.concatenate(all_predictions)

        # Create anndata object using imputed modality
        adata_predicted = ad.AnnData(
            X = csr_matrix(all_predictions, dtype = np.float32),
            obs = adata.obs.copy(),
            var = pd.DataFrame(index = model.second_feature_names)
        )
        adata_predicted.var_names = adata_predicted.var_names + '_pred'

        if not return_mdata:
            return adata_predicted

        # Create mdata object using imputed modality and used modality from adata
        if used_mod_name == 'auto':
            used_mod_name = model.first_modality_name
        if imputed_mod_name == 'auto':
            imputed_mod_name = model.second_modality_name
        return mu.MuData({used_mod_name: adata, imputed_mod_name: adata_predicted})

    # ------------------------------------------------------------------
    # 1st version of the scEve model (TabNet)
    # ------------------------------------------------------------------
    if verbose:
        print('Feature prediction using the 1st version of the scEve model')

    # load features of trained model
    features = pd.read_csv(os.path.join(path_model, 'genes.csv'))
    features = list(features['feature_name'])
    if verbose:
        print('The list of features used to train the model has been loaded successfully')
        print()

    # Create dataset for imputation from adata.raw (dense or sparse)
    data_genes = adata.raw.var_names.tolist()
    raw_X = adata.raw.X
    if hasattr(raw_X, 'toarray'):
        raw_X = raw_X.toarray()
    data_predict = pd.DataFrame(raw_X, columns = data_genes)

    # Order columns as in training; features absent from the data are filled
    # with 0. A set gives O(1) membership tests and the frame is built in one
    # step instead of inserting thousands of columns one by one.
    available_genes = set(data_genes)
    sorted_val_dataset = pd.DataFrame(
        {
            column: (data_predict[column] if column in available_genes else 0)
            for column in features
        },
        index = data_predict.index
    )

    # Load names of the predicted features
    proteins = pd.read_csv(os.path.join(path_model, 'proteins.csv'))
    proteins = list(proteins['feature_name'])
    proteins = [s + '_pred' for s in proteins]
    if verbose:
        print('The list of predicted features has been loaded successfully')
        print()

    # Load pretrained model
    loaded_model = TabNetRegressor()
    zip_files = [file for file in os.listdir(path_model) if file.endswith('.zip')]
    if not zip_files:
        raise FileNotFoundError(f"No '.zip' model file found in {path_model}")
    for file in zip_files:
        loaded_model.load_model(os.path.join(path_model, file))
    if verbose:
        print('Model has been loaded successfully')
        print()

    # Impute modality
    predictions = loaded_model.predict(sorted_val_dataset.values)

    # Create DataFrame with imputed modality
    predictions = pd.DataFrame(predictions, columns = proteins)

    # Create anndata object using imputed modality
    adata_predicted = ad.AnnData(
        X = csr_matrix(predictions.values, dtype = np.float32),
        obs = adata.obs.copy(),
        var = pd.DataFrame(index = predictions.columns)
    )

    if not return_mdata:
        return adata_predicted

    # Create mdata object using imputed modality and used modality from adata
    if used_mod_name == 'auto':
        used_mod_name = 'used_mod'
    if imputed_mod_name == 'auto':
        imputed_mod_name = 'imputed_mod'
    return mu.MuData({used_mod_name: adata, imputed_mod_name: adata_predicted})
# Function to get dictionary with default tuning parameters and their ranges
def get_default_tune_params():
    """
    Get default ranges of tuned hyperparameters.

    For integer parameters ('nc', 'nb', 'nh', 'ed_nh_ratio', 'ff_hd', 'regressor_hd', 'batch_size', 'patience', 'epochs')
    a list is used where the first value is the minimum, the second is the maximum, and the third is the step.

    For float parameters ('dropout', 'lr', 'weight_decay', 'aug_probability', 'prob', 'noise_std', 'dropout_aug', 'alpha')
    a list is used where the first value is the minimum, the second is the maximum.

    For categorical parameters ('use_augmentation') a list [True, False] is used.

    ed_nh_ratio - Used for "ed" calculation: ed = nh * ed_nh_ratio.

    Returns
    -------
    dict
        A new dictionary on every call, so callers may modify it freely.
    """
    return {
        # Integer parameters: [min, max, step]
        "nc": [2, 16, 2],
        "nb": [1, 8, 1],
        "nh": [2, 16, 2],
        "ed_nh_ratio": [8, 32, 4],
        "ff_hd": [128, 1024, 128],
        "regressor_hd": [128, 1024, 128],
        # Float parameters: [min, max]
        "dropout": [0.0, 0.2],
        "lr": [1e-5, 1e-2],
        "weight_decay": [1e-6, 1e-2],
        # Integer training parameters: [min, max, step]
        "batch_size": [64, 2048, 64],
        "patience": [5, 30, 5],
        "epochs": [50, 200, 5],
        # Categorical parameter
        "use_augmentation": [True, False],
        # Float augmentation parameters: [min, max]
        "aug_probability": [0.1, 1.0],
        "prob": [0.05, 0.4],
        "noise_std": [0.0, 0.4],
        "dropout_aug": [0.0, 0.4],
        "alpha": [0.0, 0.4],
    }


# Function for hyperparameters tuning
def hyperparameter_tuning(
    mdata,
    path = '',
    first_modality_name = "rna",
    second_modality_name = "prot",
    first_layer = None,
    second_layer = None,
    detailed_annotation = None,
    model_name = "scEve_model_tuning",
    storage = "sceve_model_tuning.db",
    study_name = "study",
    load_if_exists = True,
    device = "auto",
    tune_params = "auto",
    random_state = 0,
    num_trials = 100,
    n_splits = 5,
    epochs = None,
    patience = None,
    batch_size = None,
    eval_metric = "rmse",
    use_augmentation = None,
    aug_probability = None,
    prob = None,
    noise_std = None,
    dropout_aug = None,
    alpha = None,
    nc = None,
    nb = None,
    nh = None,
    ed_nh_ratio = None,
    ff_hd = None,
    regressor_hd = None,
    dropout = None,
    lr = None,
    weight_decay = None,
    verbose = True
):
    """
    Hyperparameter tuning for scEve model with k-fold cross validation using Optuna.

    Notes
    -----
    The folder ``os.path.join(path, model_name)`` is created and receives
    'best_params.txt' and 'best_score.txt' (both JSON). They are updated every time a
    trial improves on the best mean fold score seen so far and are rewritten from the
    Optuna study when the optimization finishes. A 'best_score.txt' left by a previous
    run is used as the initial best score.
    The first trial of a new study uses a built-in set of default hyperparameters.
    If the last trial of a loaded study is in FAIL or RUNNING state,
    its parameters are enqueued again and run as a new trial.

    Parameters
    ----------
    mdata : MuData
        MuData object.
    path: str, path object
        Path to create a model folder containing the tuning results.
    first_modality_name: str (default: 'rna')
        Name of first modality in MuData object.
    second_modality_name: str (default: 'prot')
        Name of second modality in MuData object.
    first_layer: str (default: None)
        If specified, use mdata.mod[first_modality_name].layers[first_layer] for expression values
        instead of mdata.mod[first_modality_name].X.
    second_layer: str (default: None)
        If specified, use mdata.mod[second_modality_name].layers[second_layer] for expression values
        instead of mdata.mod[second_modality_name].X.
    detailed_annotation : str, (default: None)
        The most detailed level of cell annotation. Key in mdata.obs dataframe.
        If given, folds are created with StratifiedKFold on this column (KFold otherwise).
        If given may increase model evaluation score.
    model_name: str (default: 'scEve_model_tuning')
        Name of a folder to save tuning results.
    storage: str (default: 'sceve_model_tuning.db')
        Database URL. Values starting with 'sqlite:///', 'postgresql://' or 'mysql://' are used as is;
        any other string is treated as a SQLite file name inside the model folder.
        If this argument is set to None, in-memory (RAM) storage is used, and the study will not be persistent.
        We don't recommend to use in-memory (RAM) storage to save optimization progress.
    study_name: str (default: 'study')
        Study's name. If this argument is set to None, a unique name is generated automatically.
    load_if_exists: bool (default: True)
        Flag to control the behavior to handle a conflict of study names.
        In the case where a study named study_name already exists in the storage,
        a DuplicatedStudyError is raised if load_if_exists is set to False.
        Otherwise, the creation of the study is skipped, and the existing one is returned.
        If the value is True, allows hyperparameter tuning to continue if interrupted
        (keyboard interrupt, or OS update).
    device: str (default: 'auto')
        Type of device to use in training model ('cpu', 'cuda'). Set 'auto' for automatic selection.
    tune_params: dict or 'auto' (default: 'auto')
        Dict specifying search spaces or "auto" to use built-in defaults.
        Ranges and step for scEve model and training parameters:
        [low, high, step] for integer parameters, [low, high] for float parameters,
        list of choices for 'use_augmentation'.
        Default tuning parameters are available using 'scparadise.sceve.get_default_tune_params'.
        The differences between setting parameters are available in '?scparadise.sceve.get_default_tune_params'.
        For a description of the parameters, see the 'scparadise.sceve.train' function.
    random_state: int (default: 0)
        Controls the data shuffling, splitting to folds and model training.
        Pass an int for reproducible output across multiple function calls.
    num_trials: int (default: 100)
        The number of trials to get optimized hyperparameters for model training.
    n_splits: int (default: 5)
        The number of data splits (folds) per trial.
        The data is divided into n_splits parts, where each part in turn is validation data,
        and the rest is training data. The number of folds determines the test_size.
        If n_splits = 5, then test_size = 0.2. If n_splits = 4, then test_size = 0.25.
    eval_metric: str or list (default: 'rmse')
        The metric is used as the target and for early stopping.
        If a list (or tuple) is given, the last metric is used as the target and for early stopping.
        Available metrics: 'mse', 'mae', 'rmse', 'rmsle'.
    epochs, patience, batch_size, nc, nb, nh, ed_nh_ratio, ff_hd, regressor_hd: int or None (default: None)
        If a value is specified, then the tuning of this parameter will not be performed.
    dropout, lr, weight_decay, aug_probability, prob, noise_std, dropout_aug, alpha: float or None (default: None)
        If a value is specified, then the tuning of this parameter will not be performed.
    use_augmentation: bool or None (default: None)
        If a value is specified, then the tuning of this parameter will not be performed.
    verbose: bool (default: True)
        Print information about the run and the score of each fold during hyperparameter tuning.

    Returns
    -------
    dict
        Best hyperparameters found by the Optuna study.

    Raises
    ------
    ValueError
        If eval_metric is an empty list or is not one of the available metrics.
    """

    # Set random state (for reproducibility)
    np.random.seed(random_state)
    torch.manual_seed(random_state)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(random_state)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

    # Resolve evaluation metric: a list/tuple is accepted, its last element is the target
    if isinstance(eval_metric, (list, tuple)):
        if len(eval_metric) == 0:
            raise ValueError("eval_metric must contain at least one metric name.")
        eval_metric = eval_metric[-1]
    available_metric_functions = {
        'rmse': compute_rmse,
        'mse': compute_mse,
        'mae': compute_mae,
        'rmsle': compute_rmsle
    }
    if eval_metric not in available_metric_functions:
        raise ValueError(
            f"Unknown eval_metric '{eval_metric}'. "
            f"Available metrics: {sorted(available_metric_functions)}"
        )

    # Device selection
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') if device == 'auto' else device
    if verbose:
        print(f"Device: {device}")
        print(f"Task: Predict '{second_modality_name}' from '{first_modality_name}'")
        print(f"Start model optimization using optuna...")

    # Paths of the tuning results
    model_dir = os.path.join(path, model_name).replace("\\", "/")
    best_params_path = os.path.join(path, model_name, 'best_params.txt').replace("\\", "/")
    best_score_path = os.path.join(path, model_name, 'best_score.txt').replace("\\", "/")

    # Set default best_score.
    # All metrics are minimized, so any finite score must beat the initial value.
    best_score = float("inf")
    if os.path.isfile(best_score_path):
        with open(best_score_path) as f:
            stored_score = json.load(f)
        # A finished run stores a dict, older intermediate saves store a bare number
        if isinstance(stored_score, dict):
            stored_score = stored_score.get("best_value")
        if isinstance(stored_score, (int, float)):
            best_score = float(stored_score)

    # Create folder to save tuning results of scEve model
    os.makedirs(model_dir, exist_ok = True)

    # Create dataset
    dataset = scRNAseqDataset_multi(
        mdata,
        first_modality_name=first_modality_name,
        second_modality_name=second_modality_name,
        first_layer=first_layer,
        second_layer=second_layer
    )

    # Indices of cells and optional labels for stratified folds
    indices = np.arange(len(dataset))
    y_strat = None
    if detailed_annotation is not None:
        y_strat = mdata.obs[detailed_annotation].values

    # Default tuning hyperparameters
    default_tune_params = {
        "nc": [2, 16, 2],
        "nb": [1, 8, 1],
        "nh": [2, 16, 2],
        "ed_nh_ratio": [8, 32, 4],
        "ff_hd": [128, 1024, 128],
        "regressor_hd": [128, 1024, 128],
        "dropout": [0.0, 0.2],
        "lr": [1e-5, 1e-2],
        "weight_decay": [1e-6, 1e-2],
        "batch_size": [64, 2048, 64],
        "patience": [5, 30, 5],
        "epochs": [50, 200, 5],
        "use_augmentation": [True, False],
        "aug_probability": [0.1, 1.0],
        "prob": [0.05, 0.4],
        "noise_std": [0.0, 0.4],
        "dropout_aug": [0.0, 0.4],
        "alpha": [0.0, 0.4]
    }
    if tune_params == "auto":
        tune_params = default_tune_params

    # Function for writing the best parameters and score to the model folder
    def save_best(params_to_save, score_to_save):
        with open(best_params_path, "w") as f:
            f.write(json.dumps(params_to_save))
        with open(best_score_path, "w") as f:
            f.write(json.dumps({"best_value": score_to_save, "eval_metric": eval_metric}))

    # Function for training a single fold
    def train_fold(params, train_idx, val_idx, fold_id):
        # Create dataloaders
        train_loader = DataLoader(
            Subset(dataset, train_idx),
            batch_size=params["batch_size"],
            shuffle=True,
            num_workers=0,
            drop_last=False,
        )
        val_loader = DataLoader(
            Subset(dataset, val_idx),
            batch_size=params["batch_size"],
            shuffle=False,
            num_workers=0,
            drop_last=False,
        )

        # Initialize model
        model = scEveTransformer(
            n_first_features=dataset.n_first_features,
            n_second_features=dataset.n_second_features,
            ed=params["ed_nh_ratio"] * params["nh"],
            nc=params["nc"],
            nb=params["nb"],
            nh=params["nh"],
            ff_hd=params["ff_hd"],
            regressor_hd=params["regressor_hd"],
            dropout=params["dropout"],
        ).to(device)

        criterion = nn.MSELoss()
        optimizer = torch.optim.AdamW(
            model.parameters(),
            lr=params["lr"],
            betas=(0.9, 0.95),
            weight_decay=params["weight_decay"],
        )

        # Cosine scaling of the learning rate from 1.0 towards 0.0 over params["epochs"]
        def lr_lambda(epoch):
            return 0.5 * (1.0 + np.cos(np.pi * epoch / max(params["epochs"], 1)))

        scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)
        early_stopping = dust.EarlyStopping(
            patience=params["patience"],
            delta=1e-4,
            mode="min",
            verbose=False
        )

        augmenter = None
        if params["use_augmentation"]:
            augmenter = dust.Augmentation(
                prob=params["prob"],
                noise_std=params["noise_std"],
                dropout_prob=params["dropout_aug"],
                alpha=params["alpha"],
            )

        best_val = None
        for epoch in range(params["epochs"]):
            model.train()
            for batch_x, batch_y in train_loader:
                batch_x = batch_x.to(device)
                batch_y = batch_y.to(device)

                # Augmentation block.
                # Uses the values of the current trial: the function arguments
                # 'use_augmentation' and 'aug_probability' are None while they are tuned.
                if augmenter is not None and np.random.rand() < params["aug_probability"]:
                    if np.random.rand() < 0.5:
                        # Apply augmentations: masking_augmentation, noise_augmentation, dropout_augmentation
                        batch_x = augmenter(batch_x)
                    else:
                        # mix cells - generates a new cell using 2 cells
                        if batch_x.size(0) > 1:
                            perm = torch.randperm(batch_x.size(0))
                            batch_x_mix, batch_y_mix = augmenter.mix_cells(
                                batch_x, batch_x[perm], batch_y, batch_y[perm]
                            )
                            batch_x, batch_y = batch_x_mix, batch_y_mix

                # Get predictions using model
                predictions = model(batch_x)
                # Calculate model loss
                loss = criterion(predictions, batch_y)
                optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                optimizer.step()

            # validation
            model.eval()
            val_preds = []
            val_targets = []
            with torch.no_grad():
                for batch_x, batch_y in val_loader:
                    batch_x = batch_x.to(device)
                    batch_y = batch_y.to(device)
                    # Get predictions using model
                    predictions = model(batch_x)
                    val_preds.append(predictions.detach().cpu().numpy())
                    val_targets.append(batch_y.detach().cpu().numpy())

            # Concatenate all validation predictions and observed data
            val_preds = np.concatenate(val_preds, axis=0)
            val_targets = np.concatenate(val_targets, axis=0)

            # Calculate validation metrics
            fold_score = available_metric_functions[eval_metric](val_targets, val_preds)
            if best_val is None or fold_score < best_val:
                best_val = fold_score

            # early stop on chosen metric
            if early_stopping(fold_score, model):
                break
            scheduler.step()

        # presumably restores the best weights tracked by EarlyStopping - confirm in scparadise.dust
        early_stopping.load_bm(model)

        if verbose:
            print(f"Fold {fold_id} finished with {eval_metric} value = {best_val:.6f}", flush=True)
        return float(best_val)

    # Function for processing integer hyperparameters
    # (a fixed value is suggested as a degenerate range so that it is stored in the trial)
    def int_param(name, fixed_value, trial):
        if fixed_value is not None:
            return trial.suggest_int(name, fixed_value, fixed_value)
        if name in tune_params:
            lo, hi, step = tune_params[name][0], tune_params[name][1], tune_params[name][2]
            return trial.suggest_int(name, lo, hi, step=step)

    # Function for processing float hyperparameters
    def float_param(name, fixed_value, trial, log=False):
        if fixed_value is not None:
            return trial.suggest_float(name, fixed_value, fixed_value, log=log)
        if name in tune_params:
            lo, hi = tune_params[name][0], tune_params[name][1]
            return trial.suggest_float(name, lo, hi, log=log)

    # Function for processing categorical hyperparameters
    def categorical_param(name, fixed_value, trial):
        if fixed_value is not None:
            return trial.suggest_categorical(name, [fixed_value, fixed_value])
        if name in tune_params:
            return trial.suggest_categorical(name, tune_params[name])

    # Function to get hyperparameters in a trial
    # (values come either from tune_params or from fixed user-specified ones)
    def suggest_params(trial):
        params = {}
        # Model initialization parameters
        params["nc"] = int_param("nc", nc, trial)
        params["nb"] = int_param("nb", nb, trial)
        params["nh"] = int_param("nh", nh, trial)
        params["ed_nh_ratio"] = int_param("ed_nh_ratio", ed_nh_ratio, trial)
        params["ff_hd"] = int_param("ff_hd", ff_hd, trial)
        params["regressor_hd"] = int_param("regressor_hd", regressor_hd, trial)
        params["dropout"] = float_param("dropout", dropout, trial, log=False)
        # Optimizer parameters
        params["lr"] = float_param("lr", lr, trial, log=True)
        params["weight_decay"] = float_param("weight_decay", weight_decay, trial, log=True)
        # Training
        params["batch_size"] = int_param("batch_size", batch_size, trial)
        params["patience"] = int_param("patience", patience, trial)
        params["epochs"] = int_param("epochs", epochs, trial)
        # Augmentation parameters
        params["use_augmentation"] = categorical_param("use_augmentation", use_augmentation, trial)
        params["aug_probability"] = float_param("aug_probability", aug_probability, trial, log=False)
        params["prob"] = float_param("prob", prob, trial, log=False)
        params["noise_std"] = float_param("noise_std", noise_std, trial, log=False)
        params["dropout_aug"] = float_param("dropout_aug", dropout_aug, trial, log=False)
        params["alpha"] = float_param("alpha", alpha, trial, log=False)
        return params

    # Function for define objective and params
    def objective(trial):
        # The best score has to survive between trials
        nonlocal best_score

        # Get trial params
        params = suggest_params(trial)

        # N splits of data for K fold cross validation
        # Uses StratifiedKFold if detailed_annotation is specified
        if y_strat is not None:
            skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)
            split_iter = skf.split(indices, y_strat)
        else:
            kf = KFold(n_splits=n_splits, shuffle=True, random_state=random_state)
            split_iter = kf.split(indices)

        fold_scores = []
        for fold_id, (train_idx, val_idx) in enumerate(split_iter, start=1):
            score = train_fold(params, train_idx, val_idx, fold_id)
            fold_scores.append(score)
            # pruning hook
            trial.report(score, step=fold_id)
            if trial.should_prune():
                raise optuna.TrialPruned()

        # Get average score between folds
        score = float(np.mean(fold_scores))
        if score < best_score:
            best_score = score
            # Write best_params and best_score to model folder
            save_best(params, best_score)
        return score

    # Study storage
    storage_url = None
    if storage is not None:
        # optuna expects URL-like string or sqlite
        if storage.startswith(("sqlite:///", "postgresql://", "mysql://")):
            storage_url = storage
        else:
            storage_url = "sqlite:///" + os.path.join(path, model_name, storage).replace("\\", "/")

    # Create optuna study
    study = optuna.create_study(
        direction = "minimize",
        study_name = study_name,
        storage = storage_url,
        load_if_exists = load_if_exists,
        pruner = optuna.pruners.HyperbandPruner()
    )

    # Set default parameters
    params_default = {
        "nc": 4,
        "nb": 4,
        "nh": 8,
        "ed_nh_ratio": 32,
        "ff_hd": 512,
        "regressor_hd": 512,
        "dropout": 0.1,
        "lr": 1e-4,
        "weight_decay": 1e-4,
        "batch_size": 128,
        "patience": 10,
        "epochs": 200,
        "use_augmentation": True,
        "aug_probability": 0.5,
        "prob": 0.15,
        "noise_std": 0.1,
        "dropout_aug": 0.1,
        "alpha": 0.2
    }

    # Enqueue a trial which uses the default parameters
    if not study.trials:
        study.enqueue_trial(params_default)

    # Restart optimization
    trials = study.get_trials(deepcopy=False)
    if len(trials) > 0:
        last = trials[-1]
        if last.state in (optuna.trial.TrialState.FAIL, optuna.trial.TrialState.RUNNING):
            if len(last.params) > 0:
                if verbose:
                    print(f"Re-enqueue last interrupted trial: Trial number {last.number} (will run as new trial).", flush=True)
                study.enqueue_trial(last.params)

    # Study optimization
    study.optimize(objective, n_trials=num_trials, n_jobs=1)
    best_params = dict(study.best_params)
    best_score = float(study.best_value)

    # Save best parameters and score
    save_best(best_params, best_score)

    if verbose:
        print(f"Best value ({eval_metric}) = {best_score}")
        print(f"Best params saved to: {os.path.join(path, model_name, 'best_params.txt')}")

    return best_params
# Function for training model using parameters tuned by scparadise.sceve.hyperparameter_tuning
def train_tuned(
    mdata,
    first_modality_name = 'rna',
    second_modality_name = 'prot',
    first_layer = None,
    second_layer = None,
    detailed_annotation = None,
    path = '',
    path_tuned = '',
    model_name = 'scEve_model_tuned',
    test_size = 0.2,
    epochs = None,
    eval_metric = None,
    batch_size = None,
    patience = None,
    use_augmentation = None,
    aug_probability = None,
    prob = None,
    noise_std = None,
    dropout_aug = None,
    alpha = None,
    nc = None,
    nb = None,
    nh = None,
    ed_nh_ratio = None,
    ff_hd = None,
    regressor_hd = None,
    dropout = None,
    lr = None,
    weight_decay = None,
    device = 'auto',
    random_state = 0,
    verbose = True,
    return_model = False
):
    """
    Train custom scEve model with tuned hyperparameters.
    The function automatically uses the hyperparameters stored in 'best_params.txt'.
    However, you can change any hyperparameter by passing it via the corresponding parameter.
    A hyperparameter that is neither passed nor present in 'best_params.txt'
    (or stored there as null) falls back to a built-in default value.

    Parameters
    ----------
    mdata : MuData
        MuData object.
    first_modality_name: str (default: 'rna')
        Name of first modality in MuData object.
    second_modality_name: str (default: 'prot')
        Name of second modality in MuData object.
    first_layer: str (default: None)
        If specified, use mdata.mod[first_modality_name].layers[first_layer] for expression values
        instead of mdata.mod[first_modality_name].X.
    second_layer: str (default: None)
        If specified, use mdata.mod[second_modality_name].layers[second_layer] for expression values
        instead of mdata.mod[second_modality_name].X.
    detailed_annotation : str, (default: None)
        The most detailed level of cell annotation. Key in mdata.obs dataframe.
        If given may increase model evaluation score.
    path: str, path object
        Path to create a model folder containing the training history,
        cell annotation dictionary, and genes used for training.
    path_tuned: str, path object
        Path to folder with tuned parameters ('best_params.txt')
        created by 'scparadise.sceve.hyperparameter_tuning' function.
    model_name: str (default: 'scEve_model_tuned')
        Name of a folder to save model.
    test_size: float or int (default: 0.2)
        If float, should be between 0.0 and 1.0 and represent the proportion of the dataset
        to include in the test split. If int, represents the absolute number of test cells.
    epochs: int (default: None)
        Maximum number of epochs for scEve model training.
        If specified, the specified value is used.
    eval_metric: list (default: None, which means ['rmse'])
        The metric is used as the target and for early stopping.
        The last metric is used as the target and for early stopping.
        Available metrics: 'mse', 'mae', 'rmse', 'rmsle'.
    batch_size: int, (default: None)
        Number of examples per batch.
        If specified, the specified value is used.
    patience: int (default: None)
        Number of consecutive epochs without improvement before performing early stopping.
        If patience is set to 0, then no early stopping will be performed.
        Note that if patience is enabled, then best weights from best epoch
        will automatically be loaded at the end of the training.
        If specified, the specified value is used.
    use_augmentation: bool (default: None)
        Use data augmentation or not.
        If specified, the specified value is used.
    aug_probability: float (default: None)
        The probability of applying augmentation to a batch.
        If specified, the specified value is used.
    prob: float (default: None)
        Gene masking probability.
        If specified, the specified value is used.
    noise_std: float (default: None)
        Gaussian noise standard deviation.
        If specified, the specified value is used.
    dropout_aug: float (default: None)
        Dropout probability for simulating technical noise.
        If specified, the specified value is used.
    alpha: float (default: None)
        Alpha parameter for mixup augmentation.
        If specified, the specified value is used.
    nc: int (default: None)
        Number of chunks for genes from adata.
        If specified, the specified value is used.
    nb: int (default None)
        Number of blocks in scEve model.
        If specified, the specified value is used.
    nh: int (default: None)
        Number of heads in scEve model attention mechanism.
        If specified, the specified value is used.
    ed_nh_ratio: int (default: None)
        Used for calculating embedding dimensionality ('ed') from 'nh'.
        If specified, the specified value is used.
    ff_hd: int (default: None)
        Number of nodes in each scEve model layer in feed forward network.
        If specified, the specified value is used.
    regressor_hd: int (default: None)
        Number of nodes in scEve regressor.
        If specified, the specified value is used.
    dropout: float (default: None)
        Portion of neurons that temporarily ignored during training (prevents overfitting).
        If specified, the specified value is used.
    lr: float (default: None)
        Determines the step size at each iteration while moving toward a minimum of a loss function.
        If specified, the specified value is used.
    weight_decay: float (default: None)
        Weight decay coefficient.
        If specified, the specified value is used.
    device: str (default: 'auto')
        Type of device to use in training model ('cpu', 'cuda'). Set 'auto' for automatic selection.
    random_state: int (default: 0)
        Controls the data shuffling, splitting to folds and model training.
        Pass an int for reproducible output across multiple function calls.
    verbose: bool (default: True)
        Show progress bar for each epoch during training.
    return_model: bool (default: False)
        Return model after training or not.

    Returns
    -------
    The object returned by 'scparadise.sceve.train' if return_model is True, otherwise None.
    """

    # A list literal as a default argument would be shared between calls
    if eval_metric is None:
        eval_metric = ['rmse']

    # Create new directory with model and list of genes
    os.makedirs(os.path.join(path, model_name).replace("\\", "/"), exist_ok = True)

    # load parameters for scEve model training
    with open(os.path.join(path_tuned, 'best_params.txt')) as f:
        params = json.load(f)
    print('Successfully loaded tuned hyperparameters!')

    # Dictionary of given parameters for a function
    params_given = {
        'epochs': epochs,
        'batch_size': batch_size,
        'patience': patience,
        'use_augmentation': use_augmentation,
        'aug_probability': aug_probability,
        'prob': prob,
        'noise_std': noise_std,
        'dropout_aug': dropout_aug,
        'alpha': alpha,
        'nc': nc,
        'nb': nb,
        'nh': nh,
        'ed_nh_ratio': ed_nh_ratio,
        'ff_hd': ff_hd,
        'regressor_hd': regressor_hd,
        'dropout': dropout,
        'lr': lr,
        'weight_decay': weight_decay,
    }

    # Dictionary of default parameters
    params_default = {
        'epochs': 200,
        'batch_size': 128,
        'patience': 10,
        'use_augmentation': True,
        'aug_probability': 0.5,
        'prob': 0.15,
        'noise_std': 0.1,
        'dropout_aug': 0.1,
        'alpha': 0.2,
        'nc': 4,
        'nb': 4,
        'nh': 8,
        'ed_nh_ratio': 32,
        'ff_hd': 512,
        'regressor_hd': 512,
        'dropout': 0.1,
        'lr': 1e-4,
        'weight_decay': 1e-4,
    }

    # Replace param in loaded parameters with a given value
    for name, value in params_given.items():
        if value is not None:
            params[name] = value

    # Fill every required parameter that is missing or None with its default.
    # Iterating over the defaults (not over the loaded keys) also covers
    # parameters that are absent from 'best_params.txt'.
    for name, default_value in params_default.items():
        if params.get(name) is None:
            params[name] = default_value

    # Train model with loaded parameters (corrected if given)
    model = train(
        mdata = mdata,
        first_modality_name = first_modality_name,
        second_modality_name = second_modality_name,
        first_layer = first_layer,
        second_layer = second_layer,
        detailed_annotation = detailed_annotation,
        path = path,
        model_name = model_name,
        test_size = test_size,
        epochs = params['epochs'],
        eval_metric = eval_metric,
        batch_size = params['batch_size'],
        patience = params['patience'],
        use_augmentation = params['use_augmentation'],
        aug_probability = params['aug_probability'],
        prob = params['prob'],
        noise_std = params['noise_std'],
        dropout_aug = params['dropout_aug'],
        alpha = params['alpha'],
        nc = params['nc'],
        nb = params['nb'],
        nh = params['nh'],
        ed_nh_ratio = params['ed_nh_ratio'],
        ff_hd = params['ff_hd'],
        regressor_hd = params['regressor_hd'],
        dropout = params['dropout'],
        lr = params['lr'],
        weight_decay = params['weight_decay'],
        device = device,
        random_state = random_state,
        # always request the model from train(); it is handed to the caller only on demand
        return_model = True,
        verbose = verbose
    )

    if return_model:
        return model
# Function to display available models in github
def available_models(
):
    '''
    Download dataframe with available trained scEve models.

    Prints a short legend of the error metrics and reads
    'sceve_available_models.csv' from the scParadise GitHub repository
    (requires internet access).

    Returns
    -------
    pandas.DataFrame
        Table read from the CSV file.
    '''

    # "MASE" in the original message was a typo: the legend below defines MAE
    print("WARNING: RMSE, MAE, MSE are error metrics. Lower error metric value -> Better prediction.")
    print('RMSE - Root Mean Squared Error')
    print('MSE - Mean Squared Error')
    print('MAE - Mean Absolute Error')
    print()
    models = pd.read_csv('https://raw.githubusercontent.com/Chechekhins/scParadise/main/sceve_available_models.csv', sep=',')

    return models
# Function for downloading tuned pretrained models from github
def download_model(
    model_name='',
    save_path='',
    github_username=None,
    github_token=None
):
    """
    Download a pretrained scEve model from the scParadise GitHub repository.

    The content of the remote folder 'models_sceve/<model_name>_scEve' is copied
    to the local folder '<save_path>/<model_name>_scEve' (created if missing).

    Parameters
    ----------
    model_name: str
        Name of the model from column 'model' from scparadise.sceve.available_models().
    save_path: str, path object
        Path to save trained scEve model.
    github_username: str
        Your GitHub username. If not given, the GITHUB_USERNAME environment variable is used.
    github_token: str
        Token for GitHub API. If not given, the GITHUB_TOKEN and then
        the GH_TOKEN environment variables are used.
    """

    folder_name = model_name + '_scEve'

    # Local folder that receives the model files
    target_dir = os.path.join(save_path, folder_name).replace("\\", "/")
    os.makedirs(target_dir, exist_ok=True)

    # Credentials: explicit arguments win over environment variables
    username = github_username or os.getenv("GITHUB_USERNAME")
    token = github_token or os.getenv("GITHUB_TOKEN") or os.getenv("GH_TOKEN")

    # Credentials are passed to the filesystem only when both parts are known
    fs_options = {"org": "Chechekhins", "repo": "scParadise"}
    if username and token:
        fs_options["username"] = username
        fs_options["token"] = token
    github_fs = fsspec.filesystem("github", **fs_options)

    # Copy every entry listed in the remote model folder
    remote_dir = os.path.join("models_sceve", folder_name).replace("\\", "/")
    remote_entries = github_fs.ls(remote_dir)
    github_fs.get(remote_entries, target_dir)