From 9440f4eecd961133cd2351d208c19cb2f4cbbcc7 Mon Sep 17 00:00:00 2001
From: Alexis Bruneteau
Date: Wed, 1 Oct 2025 20:28:06 +0200
Subject: [PATCH] Implement multi-task learning pipeline for CSGO predictions
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Created comprehensive multi-objective modeling system:

**6 Prediction Tasks:**
1. Match Winner (Binary Classification) - Who wins the match?
2. Map Winner (Binary Classification) - Who wins this specific map?
3. Team 1 Score (Regression) - Predict exact round score for team 1
4. Team 2 Score (Regression) - Predict exact round score for team 2
5. Round Difference (Regression) - Predict score margin
6. Total Maps (Regression) - Predict number of maps in match

**Implementation:**
- Updated preprocessing to generate all target variables
- Created train_multitask.py with separate models per task
- Classification tasks use Random Forest Classifier
- Regression tasks use Random Forest Regressor
- All models logged to MLflow experiment 'csgo-match-prediction-multitask'
- Metrics tracked per task (accuracy/precision for classification, MAE/RMSE for regression)
- Updated DVC pipeline to use new training script

**No Data Leakage:**
- All features are pre-match only (rankings, map, starting side)
- Target variables properly separated and saved with 'target_' prefix

This enables comprehensive match analysis and multiple betting/analytics use cases.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude
---
 dvc.yaml                      |   6 +-
 src/data/preprocess.py        |  81 ++++++++---
 src/models/train_multitask.py | 260 ++++++++++++++++++++++++++++++++++
 3 files changed, 324 insertions(+), 23 deletions(-)
 create mode 100644 src/models/train_multitask.py

diff --git a/dvc.yaml b/dvc.yaml
index a66a999..520eab1 100644
--- a/dvc.yaml
+++ b/dvc.yaml
@@ -16,9 +16,9 @@ stages:
           cache: false
 
   train:
-    cmd: python src/models/train.py
+    cmd: python src/models/train_multitask.py
     deps:
-      - src/models/train.py
+      - src/models/train_multitask.py
       - data/processed/train.csv
       - data/processed/test.csv
     params:
@@ -26,7 +26,7 @@ stages:
       - train.max_depth
      - train.random_state
     outs:
-      - models/model.pkl
+      - models/
     metrics:
      - models/metrics.json:
          cache: false
diff --git a/src/data/preprocess.py b/src/data/preprocess.py
index 0d3f745..85b5f5a 100644
--- a/src/data/preprocess.py
+++ b/src/data/preprocess.py
@@ -43,10 +43,28 @@ def engineer_features(df):
     features['both_top_tier'] = ((features['rank_1'] <= 10) & (features['rank_2'] <= 10)).astype(int)
     features['underdog_matchup'] = (abs(features['rank_diff']) > 50).astype(int)
-    # Target: match_winner (1 or 2) -> convert to 0 or 1
-    target = df['match_winner'] - 1
+    # Multi-task targets
+    targets = {}
 
-    return features, target
+    # Task 1: Match Winner (Binary Classification)
+    targets['match_winner'] = df['match_winner'] - 1  # Convert 1/2 to 0/1
+
+    # Task 2: Exact Score (Regression - two outputs)
+    targets['score_team1'] = df['result_1']
+    targets['score_team2'] = df['result_2']
+
+    # Task 3: Round Difference (Regression)
+    targets['round_diff'] = df['result_1'] - df['result_2']
+
+    # Task 4: Map Count for the match (Multi-class)
+    # Group by match_id to get total maps played
+    match_maps = df.groupby('match_id').size().to_dict()
+    targets['total_maps'] = df['match_id'].map(match_maps)
+
+    # Task 5: Map Winner (Binary Classification for this specific map)
+    targets['map_winner'] = df['map_winner'] - 1  # Convert 1/2 to 0/1
+
+    return features, targets
 
 
 def save_metrics(X_train, X_test, y_train, y_test):
     """Save dataset metrics"""
@@ -72,44 +90,67 @@ def main():
     print("Loading raw data...")
     df = load_raw_data()
-    print(f"Loaded {len(df)} matches")
+    print(f"Loaded {len(df)} maps")
 
     print("Engineering features...")
-    X, y = engineer_features(df)
+    X, targets = engineer_features(df)
     print(f"Created {X.shape[1]} features")
+    print(f"Created {len(targets)} prediction targets:")
+    for target_name in targets.keys():
+        print(f" - {target_name}")
 
     print("Splitting data...")
-    X_train, X_test, y_train, y_test = train_test_split(
-        X, y,
+    # Use match_winner for stratification
+    X_train, X_test, idx_train, idx_test = train_test_split(
+        X, X.index,
         test_size=params["test_size"],
         random_state=params["random_state"],
-        stratify=y
+        stratify=targets['match_winner']
     )
 
     print("Saving processed data...")
     Path("data/processed").mkdir(parents=True, exist_ok=True)
 
-    # Save full features
-    full_features = X.copy()
-    full_features['target'] = y
-    full_features.to_csv("data/processed/features.csv", index=False)
-
-    # Save train set
+    # Save train set with all targets
     train_data = X_train.copy()
-    train_data['target'] = y_train
+    for target_name, target_values in targets.items():
+        train_data[f'target_{target_name}'] = target_values.iloc[idx_train].values
     train_data.to_csv("data/processed/train.csv", index=False)
 
-    # Save test set
+    # Save test set with all targets
    test_data = X_test.copy()
-    test_data['target'] = y_test
+    for target_name, target_values in targets.items():
+        test_data[f'target_{target_name}'] = target_values.iloc[idx_test].values
     test_data.to_csv("data/processed/test.csv", index=False)
 
-    # Save metrics
-    save_metrics(X_train, X_test, y_train, y_test)
+    # Save full features with all targets
+    full_features = X.copy()
+    for target_name, target_values in targets.items():
+        full_features[f'target_{target_name}'] = target_values.values
+    full_features.to_csv("data/processed/features.csv", index=False)
 
-    print("Preprocessing completed successfully!")
+    # Save metrics
+    print("\nDataset statistics:")
     print(f"Train set: {len(X_train)} samples")
     print(f"Test set: {len(X_test)} samples")
+    print(f"Features: {X.shape[1]}")
+
+    metrics = {
+        "n_samples": len(X),
+        "n_train": len(X_train),
+        "n_test": len(X_test),
+        "n_features": X.shape[1],
+        "targets": list(targets.keys()),
+        "class_balance_match_winner": {
+            "class_0": int((targets['match_winner'] == 0).sum()),
+            "class_1": int((targets['match_winner'] == 1).sum())
+        }
+    }
+
+    with open("data/processed/data_metrics.json", "w") as f:
+        json.dump(metrics, f, indent=2)
+
+    print("Preprocessing completed successfully!")
 
 
 if __name__ == "__main__":
     main()
diff --git a/src/models/train_multitask.py b/src/models/train_multitask.py
new file mode 100644
index 0000000..d0719f4
--- /dev/null
+++ b/src/models/train_multitask.py
@@ -0,0 +1,260 @@
+"""
+Multi-task model training pipeline for CSGO match prediction.
+Trains separate models for different prediction objectives and logs to MLflow.
+"""
+import mlflow
+import mlflow.sklearn
+import yaml
+import json
+import pickle
+import os
+from pathlib import Path
+from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
+from sklearn.metrics import (
+    accuracy_score, precision_score, recall_score, f1_score, roc_auc_score,
+    mean_absolute_error, mean_squared_error, r2_score
+)
+import pandas as pd
+import numpy as np
+
+# Configure MLflow
+# MLflow will automatically use MLFLOW_TRACKING_USERNAME and MLFLOW_TRACKING_PASSWORD env vars
+tracking_uri = os.getenv("MLFLOW_TRACKING_URI", "https://mlflow.sortifal.dev")
+mlflow.set_tracking_uri(tracking_uri)
+
+if os.getenv("MLFLOW_TRACKING_USERNAME") and os.getenv("MLFLOW_TRACKING_PASSWORD"):
+    print(f"MLflow configured with authentication for {tracking_uri}")
+else:
+    print(f"MLflow configured without authentication for {tracking_uri}")
+
+# Try to set experiment, but handle auth errors gracefully
+USE_MLFLOW = True
+try:
+    mlflow.set_experiment("csgo-match-prediction-multitask")
+    print(f"Connected to MLflow at {mlflow.get_tracking_uri()}")
+except Exception as e:
+    print(f"Warning: Could not connect to MLflow: {e}")
+    print("Training will continue without MLflow tracking.")
+    USE_MLFLOW = False
+
+def load_params():
+    """Load training parameters from params.yaml"""
+    with open("params.yaml") as f:
+        params = yaml.safe_load(f)
+    return params["train"]
+
+def load_data():
+    """Load preprocessed training and test data"""
+    train_df = pd.read_csv("data/processed/train.csv")
+    test_df = pd.read_csv("data/processed/test.csv")
+
+    # Separate features and targets
+    feature_cols = [col for col in train_df.columns if not col.startswith('target_')]
+    target_cols = [col for col in train_df.columns if col.startswith('target_')]
+
+    X_train = train_df[feature_cols]
+    X_test = test_df[feature_cols]
+
+    # Extract all targets
+    targets_train = {col.replace('target_', ''): train_df[col] for col in target_cols}
+    targets_test = {col.replace('target_', ''): test_df[col] for col in target_cols}
+
+    return X_train, X_test, targets_train, targets_test
+
+def train_classification_model(X_train, y_train, params, task_name):
+    """Train a classification model"""
+    print(f"\n[{task_name}] Training Random Forest Classifier...")
+    model = RandomForestClassifier(
+        n_estimators=params["n_estimators"],
+        max_depth=params["max_depth"],
+        random_state=params["random_state"],
+        n_jobs=-1
+    )
+    model.fit(X_train, y_train)
+    return model
+
+def train_regression_model(X_train, y_train, params, task_name):
+    """Train a regression model"""
+    print(f"\n[{task_name}] Training Random Forest Regressor...")
+    model = RandomForestRegressor(
+        n_estimators=params["n_estimators"],
+        max_depth=params["max_depth"],
+        random_state=params["random_state"],
+        n_jobs=-1
+    )
+    model.fit(X_train, y_train)
+    return model
+
+def evaluate_classification(model, X_test, y_test, task_name):
+    """Evaluate classification model"""
+    print(f"[{task_name}] Evaluating...")
+    y_pred = model.predict(X_test)
+    y_pred_proba = model.predict_proba(X_test)[:, 1]
+
+    metrics = {
+        f"{task_name}_accuracy": float(accuracy_score(y_test, y_pred)),
+        f"{task_name}_precision": float(precision_score(y_test, y_pred, zero_division=0)),
+        f"{task_name}_recall": float(recall_score(y_test, y_pred, zero_division=0)),
+        f"{task_name}_f1_score": float(f1_score(y_test, y_pred, zero_division=0)),
+        f"{task_name}_roc_auc": float(roc_auc_score(y_test, y_pred_proba))
+    }
+
+    return metrics
+
+def evaluate_regression(model, X_test, y_test, task_name):
+    """Evaluate regression model"""
+    print(f"[{task_name}] Evaluating...")
+    y_pred = model.predict(X_test)
+
+    metrics = {
+        f"{task_name}_mae": float(mean_absolute_error(y_test, y_pred)),
+        f"{task_name}_mse": float(mean_squared_error(y_test, y_pred)),
+        f"{task_name}_rmse": float(np.sqrt(mean_squared_error(y_test, y_pred))),
+        f"{task_name}_r2": float(r2_score(y_test, y_pred))
+    }
+
+    return metrics
+
+def save_models(models, all_metrics):
+    """Save all models and metrics locally"""
+    Path("models").mkdir(parents=True, exist_ok=True)
+
+    # Save each model
+    for task_name, model in models.items():
+        model_path = f"models/model_{task_name}.pkl"
+        with open(model_path, "wb") as f:
+            pickle.dump(model, f)
+        print(f"Saved {task_name} model to {model_path}")
+
+    # Save all metrics
+    with open("models/metrics.json", "w") as f:
+        json.dump(all_metrics, f, indent=2)
+    print(f"Metrics saved to models/metrics.json")
+
+def main():
+    """Main multi-task training pipeline"""
+    print("=" * 70)
+    print("CSGO Match Prediction - Multi-Task Model Training")
+    print("=" * 70)
+
+    # Load parameters and data
+    params = load_params()
+    X_train, X_test, targets_train, targets_test = load_data()
+
+    print(f"\nDataset info:")
+    print(f" Training samples: {len(X_train)}")
+    print(f" Test samples: {len(X_test)}")
+    print(f" Features: {X_train.shape[1]}")
+    print(f" Prediction tasks: {len(targets_train)}")
+
+    # Define tasks
+    tasks = {
+        'match_winner': {'type': 'classification', 'description': 'Match Winner Prediction'},
+        'map_winner': {'type': 'classification', 'description': 'Map Winner Prediction'},
+        'score_team1': {'type': 'regression', 'description': 'Team 1 Score Prediction'},
+        'score_team2': {'type': 'regression', 'description': 'Team 2 Score Prediction'},
+        'round_diff': {'type': 'regression', 'description': 'Round Difference Prediction'},
+        'total_maps': {'type': 'regression', 'description': 'Total Maps Prediction'}
+    }
+
+    models = {}
+    all_metrics = {}
+
+    if USE_MLFLOW:
+        with mlflow.start_run(run_name="multitask-rf-csgo"):
+            # Log parameters
+            mlflow.log_params(params)
+            mlflow.log_param("n_features", X_train.shape[1])
+            mlflow.log_param("n_train_samples", len(X_train))
+            mlflow.log_param("n_test_samples", len(X_test))
+            mlflow.log_param("n_tasks", len(tasks))
+
+            # Train and evaluate each task
+            for task_name, task_config in tasks.items():
+                print(f"\n{'='*70}")
+                print(f"Task: {task_config['description']}")
+                print(f"{'='*70}")
+
+                if task_name not in targets_train:
+                    print(f"Warning: {task_name} not found in training data, skipping...")
+                    continue
+
+                y_train = targets_train[task_name]
+                y_test = targets_test[task_name]
+
+                # Train model based on task type
+                if task_config['type'] == 'classification':
+                    model = train_classification_model(X_train, y_train, params, task_name)
+                    metrics = evaluate_classification(model, X_test, y_test, task_name)
+                else:
+                    model = train_regression_model(X_train, y_train, params, task_name)
+                    metrics = evaluate_regression(model, X_test, y_test, task_name)
+
+                models[task_name] = model
+                all_metrics.update(metrics)
+
+                # Log metrics to MLflow
+                mlflow.log_metrics(metrics)
+
+                # Print results
+                print(f"\n{task_name} Results:")
+                for metric, value in metrics.items():
+                    print(f" {metric}: {value:.4f}")
+
+            # Save models and metrics
+            save_models(models, all_metrics)
+
+            # Print summary
+            print("\n" + "=" * 70)
+            print("Training Summary:")
+            print("=" * 70)
+            print(f"Models trained: {len(models)}")
+            print(f"Total metrics: {len(all_metrics)}")
+            print("=" * 70)
+
+            print(f"\nMLflow run ID: {mlflow.active_run().info.run_id}")
+            print(f"View run at: {mlflow.get_tracking_uri()}")
+    else:
+        # Train without MLflow
+        for task_name, task_config in tasks.items():
+            print(f"\n{'='*70}")
+            print(f"Task: {task_config['description']}")
+            print(f"{'='*70}")
+
+            if task_name not in targets_train:
+                print(f"Warning: {task_name} not found in training data, skipping...")
+                continue
+
+            y_train = targets_train[task_name]
+            y_test = targets_test[task_name]
+
+            # Train model based on task type
+            if task_config['type'] == 'classification':
+                model = train_classification_model(X_train, y_train, params, task_name)
+                metrics = evaluate_classification(model, X_test, y_test, task_name)
+            else:
+                model = train_regression_model(X_train, y_train, params, task_name)
+                metrics = evaluate_regression(model, X_test, y_test, task_name)
+
+            models[task_name] = model
+            all_metrics.update(metrics)
+
+            # Print results
+            print(f"\n{task_name} Results:")
+            for metric, value in metrics.items():
+                print(f" {metric}: {value:.4f}")
+
+        # Save models and metrics
+        save_models(models, all_metrics)
+
+        print("\n" + "=" * 70)
+        print("Training Summary:")
+        print("=" * 70)
+        print(f"Models trained: {len(models)}")
+        print(f"Total metrics: {len(all_metrics)}")
+        print("=" * 70)
+
+    print("\nMulti-task training pipeline completed successfully!")
+
+if __name__ == "__main__":
+    main()
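
A minimal usage sketch, assuming the repository root as working directory and the file layout this patch produces (`models/model_<task>.pkl` pickles from `save_models()`, `target_`-prefixed columns in `data/processed/test.csv`); using the saved test split as example input is also an assumption:

```python
# Hypothetical downstream consumer: reload each per-task Random Forest pickle
# written by save_models() and score the held-out maps from test.csv.
import pickle
from pathlib import Path

import pandas as pd

test_df = pd.read_csv("data/processed/test.csv")
feature_cols = [c for c in test_df.columns if not c.startswith("target_")]
X = test_df[feature_cols]

predictions = {}
for model_path in sorted(Path("models").glob("model_*.pkl")):
    task_name = model_path.stem.replace("model_", "", 1)
    with open(model_path, "rb") as f:
        model = pickle.load(f)
    predictions[task_name] = model.predict(X)

# First test map: 0/1 winners from the classifiers, real-valued scores from the regressors
print({task: preds[0] for task, preds in predictions.items()})
```

Because the winner targets were shifted to 0/1 during preprocessing, a predicted 0 corresponds to team 1 and a predicted 1 to team 2.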