diff --git a/.github/workflows/mlops-pipeline.yml b/.github/workflows/mlops-pipeline.yml
new file mode 100644
index 0000000..51d5583
--- /dev/null
+++ b/.github/workflows/mlops-pipeline.yml
@@ -0,0 +1,79 @@
+name: MLOps CI/CD Pipeline
+
+on:
+  push:
+    branches: [main, dev]
+  pull_request:
+    branches: [main]
+
+jobs:
+  test:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+
+      - name: Setup Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: '3.10'
+
+      - name: Install dependencies
+        run: |
+          pip install poetry
+          poetry install
+
+      - name: Run unit tests
+        run: poetry run pytest tests/ --cov=src --cov-report=xml
+
+      - name: Data validation
+        run: poetry run python tests/test_data_quality.py
+
+  train:
+    needs: test
+    runs-on: ubuntu-latest
+    if: github.ref == 'refs/heads/main'
+    steps:
+      - uses: actions/checkout@v3
+
+      - name: Setup Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: '3.10'
+
+      - name: Install dependencies
+        run: |
+          pip install poetry
+          poetry install
+
+      - name: Setup DVC
+        run: |
+          pip install dvc[s3]
+          dvc pull
+
+      - name: Train model
+        run: poetry run python src/models/train.py
+        env:
+          MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
+
+      - name: Validate model performance
+        run: poetry run python tests/test_model_performance.py
+
+  deploy:
+    needs: train
+    runs-on: ubuntu-latest
+    if: github.ref == 'refs/heads/main'
+    steps:
+      - uses: actions/checkout@v3
+
+      - name: Build Docker image
+        run: |
+          docker build -f docker/Dockerfile -t csgo-mlops:${{ github.sha }} .
+          docker tag csgo-mlops:${{ github.sha }} csgo-mlops:latest
+
+      - name: Push to registry
+        run: |
+          docker push csgo-mlops:${{ github.sha }}
+          docker push csgo-mlops:latest
+
+      - name: Deploy to Kubernetes
+        run: kubectl apply -f kubernetes/deployment.yml
\ No newline at end of file
diff --git a/config/grafana/dashboards/csgo-mlops-dashboard.json b/config/grafana/dashboards/csgo-mlops-dashboard.json
new file mode 100644
index 0000000..eb3d47c
--- /dev/null
+++ b/config/grafana/dashboards/csgo-mlops-dashboard.json
@@ -0,0 +1,39 @@
+{
+  "dashboard": {
+    "title": "CS:GO MLOps Dashboard",
+    "panels": [
+      {
+        "title": "Model Accuracy (7d Rolling)",
+        "targets": [{
+          "expr": "model_accuracy"
+        }],
+        "alert": {
+          "conditions": [{
+            "evaluator": {
+              "params": [0.65],
+              "type": "lt"
+            }
+          }]
+        }
+      },
+      {
+        "title": "Prediction Latency P95",
+        "targets": [{
+          "expr": "histogram_quantile(0.95, rate(prediction_latency_bucket[5m]))"
+        }]
+      },
+      {
+        "title": "Data Volume (Daily Matches)",
+        "targets": [{
+          "expr": "daily_match_count"
+        }]
+      },
+      {
+        "title": "ROI (30d)",
+        "targets": [{
+          "expr": "betting_roi_30d"
+        }]
+      }
+    ]
+  }
+}
\ No newline at end of file
diff --git a/config/grafana/dashboards/provider.yml b/config/grafana/dashboards/provider.yml
new file mode 100644
index 0000000..92ce183
--- /dev/null
+++ b/config/grafana/dashboards/provider.yml
@@ -0,0 +1,10 @@
+apiVersion: 1
+
+providers:
+  - name: 'MLOps Dashboards'
+    type: file
+    disableDeletion: false
+    updateIntervalSeconds: 10
+    allowUiUpdates: true
+    options:
+      path: /etc/grafana/provisioning/dashboards
\ No newline at end of file
diff --git a/config/grafana/datasources/prometheus.yml b/config/grafana/datasources/prometheus.yml
new file mode 100644
index 0000000..8049912
--- /dev/null
+++ b/config/grafana/datasources/prometheus.yml
@@ -0,0 +1,8 @@
+apiVersion: 1
+
+datasources:
+  - name: Prometheus
+    type: prometheus
+    access: proxy
+    url: http://prometheus:9090
+    isDefault: true
\ No newline at end of file
diff --git a/config/prometheus.yml b/config/prometheus.yml
new file mode 100644
index 0000000..617ebdb
--- /dev/null
+++ b/config/prometheus.yml
@@ -0,0 +1,11 @@
+global:
+  scrape_interval: 15s
+
+scrape_configs:
+  - job_name: 'csgo-api'
+    static_configs:
+      - targets: ['localhost:8000']
+
+  - job_name: 'model-metrics'
+    static_configs:
+      - targets: ['localhost:8001']
\ No newline at end of file
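Note: the `model-metrics` job above scrapes port 8001, but nothing in this diff serves metrics on that port. A minimal sketch of a standalone exporter, assuming it is meant to publish the gauges registered by `ModelMonitor` (defined in src/monitoring/model_monitor.py later in this diff); the module path, port, and keep-alive loop are assumptions:

# sketch: standalone exporter for the 'model-metrics' Prometheus job (port taken from config/prometheus.yml)
from prometheus_client import start_http_server
from src.monitoring.model_monitor import ModelMonitor
import time

if __name__ == "__main__":
    monitor = ModelMonitor()      # registers model_accuracy, predictions_total, prediction_latency
    start_http_server(8001)       # exposes /metrics on the port Prometheus scrapes
    while True:
        time.sleep(60)            # keep the process alive; metrics are updated elsewhere via log_prediction()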
diff --git a/dags/csgo_data_pipeline.py b/dags/csgo_data_pipeline.py
new file mode 100644
index 0000000..6aff7d4
--- /dev/null
+++ b/dags/csgo_data_pipeline.py
@@ -0,0 +1,59 @@
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from datetime import datetime, timedelta
+
+default_args = {
+    'owner': 'mlops-team',
+    'retries': 2,
+    'retry_delay': timedelta(minutes=5)
+}
+
+def extract_data():
+    """Extract HLTV data"""
+    import pandas as pd
+    # Simulated extraction (replace with real scraping)
+    df = pd.read_csv('data/raw/results.csv')
+    df.to_parquet('data/staging/results.parquet')
+    return len(df)
+
+def validate_data():
+    """Validate data with Great Expectations"""
+    import pandas as pd
+    import great_expectations as ge
+
+    df = pd.read_parquet('data/staging/results.parquet')
+    ge_df = ge.from_pandas(df)
+
+    # Validations
+    result1 = ge_df.expect_column_to_exist('date')
+    result2 = ge_df.expect_column_values_to_not_be_null('team_1')
+    result3 = ge_df.expect_column_values_to_be_between('rank_1', 1, 50)
+
+    assert result1.success, f"Validation failed: {result1.result}"
+    assert result2.success, f"Validation failed: {result2.result}"
+    assert result3.success, f"Validation failed: {result3.result}"
+
+def transform_data():
+    """Feature engineering"""
+    import pandas as pd
+    df = pd.read_parquet('data/staging/results.parquet')
+
+    # Create features
+    df['rank_diff'] = df['rank_1'] - df['rank_2']
+    df['win_rate_1'] = df.groupby('team_1')['result_1'].transform('mean')
+
+    df.to_parquet('data/processed/features.parquet')
+
+with DAG(
+    'csgo_data_ingestion',
+    default_args=default_args,
+    schedule_interval='@daily',
+    start_date=datetime(2025, 1, 1),
+    catchup=False
+) as dag:
+
+    extract = PythonOperator(task_id='extract', python_callable=extract_data)
+    validate = PythonOperator(task_id='validate', python_callable=validate_data)
+    transform = PythonOperator(task_id='transform', python_callable=transform_data)
+
+    extract >> validate >> transform
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 0000000..e62e70c
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,31 @@
+version: '3.8'
+
+services:
+  prometheus:
+    image: prom/prometheus
+    ports:
+      - "9090:9090"
+    volumes:
+      - ./config/prometheus.yml:/etc/prometheus/prometheus.yml
+
+  grafana:
+    image: grafana/grafana
+    ports:
+      - "3000:3000"
+    environment:
+      - GF_SECURITY_ADMIN_PASSWORD=admin
+    volumes:
+      - ./config/grafana:/etc/grafana/provisioning
+
+  mlflow:
+    image: ghcr.io/mlflow/mlflow
+    ports:
+      - "5000:5000"
+    command: mlflow server --host 0.0.0.0
+
+  airflow:
+    image: apache/airflow:2.7.0
+    ports:
+      - "8080:8080"
+    environment:
+      - AIRFLOW__CORE__EXECUTOR=LocalExecutor
\ No newline at end of file
diff --git a/docker/Dockerfile b/docker/Dockerfile
new file mode 100644
index 0000000..c62fc23
--- /dev/null
+++ b/docker/Dockerfile
@@ -0,0 +1,20 @@
+FROM python:3.10-slim
+
+WORKDIR /app
+
+# Copy dependency manifests
+COPY pyproject.toml poetry.lock ./
+
+# Install Poetry and the runtime dependencies (no dev group, no project root yet)
+RUN pip install poetry && \
+    poetry config virtualenvs.create false && \
+    poetry install --only main --no-root
+
+# Copy the application code
+COPY src/ ./src/
+
+# Image metadata
+LABEL version="1.0.0"
+LABEL description="CS:GO MLOps Platform"
+
+CMD ["python", "src/api/main.py"]
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index 619c397..7c2b8a8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -13,7 +13,12 @@ dependencies = [
     "scikit-learn (>=1.7.2,<2.0.0)",
     "torch (>=2.8.0,<3.0.0)",
     "mlflow (>=3.4.0,<4.0.0)",
-    "dvc (>=3.63.0,<4.0.0)"
+    "dvc (>=3.63.0,<4.0.0)",
+    "prometheus-client (>=0.20.0,<1.0.0)",
+    "requests (>=2.31.0,<3.0.0)",
+    "fastapi (>=0.104.0,<1.0.0)",
+    "uvicorn (>=0.24.0,<1.0.0)",
+    "pydantic (>=2.5.0,<3.0.0)"
 ]
diff --git a/src/api/main.py b/src/api/main.py
new file mode 100644
index 0000000..b5115c6
--- /dev/null
+++ b/src/api/main.py
@@ -0,0 +1,119 @@
+from fastapi import FastAPI, HTTPException, Response
+from pydantic import BaseModel
+import mlflow.sklearn
+import time
+import uuid
+import uvicorn
+from src.monitoring.model_monitor import ModelMonitor
+
+app = FastAPI(title="CS:GO Prediction API")
+
+# Load the production model from the MLflow registry
+model = mlflow.sklearn.load_model("models:/csgo-predictor/production")
+monitor = ModelMonitor()
+
+# Temporary in-memory store of predictions awaiting feedback
+predictions_store = {}
+
+class MatchInput(BaseModel):
+    team_1: str
+    team_2: str
+    rank_1: int
+    rank_2: int
+    map_name: str
+
+class PredictionOutput(BaseModel):
+    id: str
+    winner: int
+    probability: float
+    confidence: str
+    latency_ms: float
+
+class FeedbackInput(BaseModel):
+    prediction_id: str
+    actual: int
+
+def create_features(match):
+    """Basic feature engineering"""
+    return [[
+        match.rank_1,
+        match.rank_2,
+        match.rank_1 - match.rank_2,  # rank_diff
+        1 if match.map_name in ['dust2', 'mirage', 'inferno'] else 0  # popular_map
+    ]]
+
+@app.post("/predict", response_model=PredictionOutput)
+async def predict_match(match: MatchInput):
+    """Predict the winner of a match"""
+    start_time = time.time()
+
+    try:
+        # Feature engineering
+        features = create_features(match)
+
+        # Prediction
+        prediction = model.predict(features)[0]
+        proba = model.predict_proba(features)[0]
+
+        latency = (time.time() - start_time) * 1000
+
+        # Generate a unique ID
+        prediction_id = str(uuid.uuid4())
+
+        # Store the prediction until feedback arrives
+        predictions_store[prediction_id] = {
+            'prediction': int(prediction),
+            'latency': latency
+        }
+
+        # Log for monitoring (no ground truth yet)
+        monitor.log_prediction(int(prediction), None, latency)
+
+        return PredictionOutput(
+            id=prediction_id,
+            winner=int(prediction),
+            probability=float(max(proba)),
+            confidence="high" if max(proba) > 0.7 else "medium",
+            latency_ms=latency
+        )
+
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=str(e))
+
+@app.post("/feedback")
+async def submit_feedback(feedback: FeedbackInput):
+    """Submit the actual result for a prediction"""
+    if feedback.prediction_id not in predictions_store:
+        raise HTTPException(status_code=404, detail="Prediction not found")
+
+    pred_data = predictions_store[feedback.prediction_id]
+
+    # Update monitoring with the ground-truth outcome
+    monitor.log_prediction(pred_data['prediction'], feedback.actual, pred_data['latency'])
+
+    # Clean up the store
+    del predictions_store[feedback.prediction_id]
+
+    return {"status": "feedback recorded"}
+
+@app.get("/health")
+async def health_check():
+    """Health check endpoint"""
+    metrics = monitor.calculate_rolling_metrics()
+
+    return {
+        "status": "healthy" if metrics is None or metrics['accuracy'] > 0.65 else "degraded",
+        "metrics": metrics,
+        # the sklearn flavor returns a plain estimator, so registry metadata may be absent
+        "model_version": getattr(getattr(model, "metadata", None), "run_id", "unknown")
+    }
+
+@app.get("/metrics")
+async def get_metrics():
+    """Expose Prometheus metrics"""
+    from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
+    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
+
+if __name__ == "__main__":
+    # Serve the API on the port scraped by Prometheus (config/prometheus.yml)
+    uvicorn.run(app, host="0.0.0.0", port=8000)
\ No newline at end of file
diff --git a/src/models/train.py b/src/models/train.py
new file mode 100644
index 0000000..2937ed2
--- /dev/null
+++ b/src/models/train.py
@@ -0,0 +1,27 @@
+import os
+import mlflow
+import mlflow.sklearn
+from sklearn.ensemble import RandomForestClassifier
+
+# Use the CI-provided tracking URI when available (falls back to the local MLflow server)
+mlflow.set_tracking_uri(os.getenv("MLFLOW_TRACKING_URI", "http://localhost:5000"))
+mlflow.set_experiment("csgo-match-prediction")
+
+def train_model(X_train, y_train, X_test, y_test, params):
+    with mlflow.start_run(run_name="rf-v1"):
+        # Log params
+        mlflow.log_params(params)
+        mlflow.log_param("data_version", "v1.0.0")
+
+        # Train
+        model = RandomForestClassifier(**params)
+        model.fit(X_train, y_train)
+
+        # Evaluate on the held-out set and log metrics
+        accuracy = model.score(X_test, y_test)
+        mlflow.log_metric("accuracy", accuracy)
+
+        # Log model
+        mlflow.sklearn.log_model(model, "model")
+
+        return model
\ No newline at end of file
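Note: train.py logs the model to an MLflow run, while the API loads `models:/csgo-predictor/production`; the step that registers the run under that name is not part of this diff. A minimal sketch, assuming the registry is reachable and using the alias-based registry API (with an alias, the serving code would load `models:/csgo-predictor@production` rather than the stage-style path used in src/api/main.py):

# sketch: register the logged model under the name the API expects (run ID and alias convention are assumptions)
import mlflow
from mlflow.tracking import MlflowClient

run_id = "..."  # run produced by train_model()
result = mlflow.register_model(f"runs:/{run_id}/model", "csgo-predictor")

# point a 'production' alias at this version so serving can resolve a stable URI
client = MlflowClient()
client.set_registered_model_alias("csgo-predictor", "production", result.version)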
diff --git a/src/monitoring/alert_manager.py b/src/monitoring/alert_manager.py
new file mode 100644
index 0000000..843036f
--- /dev/null
+++ b/src/monitoring/alert_manager.py
@@ -0,0 +1,83 @@
+import requests
+import smtplib
+from email.mime.text import MIMEText
+import time
+
+class AlertManager:
+    def __init__(self, config):
+        self.slack_webhook = config['slack_webhook']
+        self.email_config = config['email']
+        self.pagerduty_key = config['pagerduty_key']
+
+    def send_slack_alert(self, message, severity):
+        """Send a Slack alert"""
+        color = {
+            'CRITICAL': '#FF0000',
+            'WARNING': '#FFA500',
+            'INFO': '#00FF00'
+        }.get(severity, '#808080')
+
+        payload = {
+            'attachments': [{
+                'color': color,
+                'title': f'{severity}: CS:GO MLOps Alert',
+                'text': message,
+                'footer': 'MLOps Platform',
+                'ts': int(time.time())
+            }]
+        }
+
+        requests.post(self.slack_webhook, json=payload)
+
+    def send_email_alert(self, subject, body, recipients):
+        """Send an email alert"""
+        msg = MIMEText(body)
+        msg['Subject'] = subject
+        msg['From'] = self.email_config['from']
+        msg['To'] = ', '.join(recipients)
+
+        with smtplib.SMTP(self.email_config['smtp_server']) as server:
+            server.send_message(msg)
+
+    def trigger_pagerduty(self, message):
+        """Trigger PagerDuty for critical alerts"""
+        payload = {
+            'routing_key': self.pagerduty_key,
+            'event_action': 'trigger',
+            'payload': {
+                'summary': message,
+                'severity': 'critical',
+                'source': 'csgo-mlops'
+            }
+        }
+
+        requests.post(
+            'https://events.pagerduty.com/v2/enqueue',
+            json=payload
+        )
+
+    def handle_alert(self, alert_type, metrics):
+        """Central alert dispatcher"""
+        if alert_type == 'accuracy_drop':
+            severity = 'CRITICAL' if metrics['accuracy'] < 0.60 else 'WARNING'
+            message = f"Model accuracy dropped to {metrics['accuracy']:.2%}"
+
+            self.send_slack_alert(message, severity)
+
+            if severity == 'CRITICAL':
+                self.trigger_pagerduty(message)
+                self.send_email_alert(
+                    subject='CRITICAL: Model Performance Degradation',
+                    body=message,
+                    recipients=['oncall@team.com']
+                )
+
+        elif alert_type == 'data_drift':
+            self.send_slack_alert(
+                f"Data drift detected: p-value={metrics['p_value']:.4f}",
+                'WARNING'
+            )
+
+        elif alert_type == 'api_error':
+            if metrics['error_rate'] > 0.05:
+                self.trigger_pagerduty(f"API error rate: {metrics['error_rate']:.1%}")
\ No newline at end of file
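Note: nothing in the diff instantiates AlertManager or connects it to ModelMonitor. A minimal sketch of how a periodic health check might wire them together; the webhook URL, SMTP settings, and PagerDuty key are placeholders:

# sketch: periodic check routing ModelMonitor degradations through AlertManager (config values are placeholders)
from src.monitoring.model_monitor import ModelMonitor
from src.monitoring.alert_manager import AlertManager

config = {
    'slack_webhook': 'https://hooks.slack.com/services/XXX',                  # placeholder
    'email': {'from': 'mlops@team.com', 'smtp_server': 'smtp.team.com'},      # placeholders
    'pagerduty_key': 'PD_ROUTING_KEY'                                         # placeholder
}

monitor = ModelMonitor()
alerts = AlertManager(config)

def check_model_health():
    status = monitor.detect_performance_degradation(threshold=0.65)
    if status['alert']:
        # handle_alert('accuracy_drop', ...) formats the rolling metrics in its message
        alerts.handle_alert('accuracy_drop', monitor.calculate_rolling_metrics())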
diff --git a/src/monitoring/business_monitor.py b/src/monitoring/business_monitor.py
new file mode 100644
index 0000000..2e992f8
--- /dev/null
+++ b/src/monitoring/business_monitor.py
@@ -0,0 +1,41 @@
+import numpy as np
+
+class BusinessMonitor:
+    def __init__(self):
+        self.bets = []
+
+    def record_bet(self, prediction, outcome, odds):
+        """Record a settled bet as a (prediction, outcome, odds) tuple"""
+        self.bets.append((prediction, outcome, odds))
+
+    def calculate_roi(self, predictions, outcomes, odds):
+        """Compute ROI over a series of bets"""
+        total_bet = len(predictions) * 10  # €10 stake per bet
+
+        winnings = 0
+        for pred, outcome, odd in zip(predictions, outcomes, odds):
+            if pred == outcome:
+                winnings += 10 * odd
+
+        roi = (winnings - total_bet) / total_bet
+        return roi
+
+    def calculate_sharpe_ratio(self, returns):
+        """Compute the Sharpe ratio"""
+        return np.mean(returns) / np.std(returns) if np.std(returns) > 0 else 0
+
+    def monitor_business_metrics(self):
+        """Full set of business metrics"""
+        if len(self.bets) < 100:
+            return None
+
+        recent_bets = self.bets[-100:]
+
+        metrics = {
+            'roi_7d': self.calculate_roi(*zip(*recent_bets[-49:])),
+            'roi_30d': self.calculate_roi(*zip(*recent_bets)),
+            'win_rate': np.mean([b[0] == b[1] for b in recent_bets]),
+            'avg_odds': np.mean([b[2] for b in recent_bets])
+        }
+
+        return metrics
\ No newline at end of file
diff --git a/src/monitoring/data_monitor.py b/src/monitoring/data_monitor.py
new file mode 100644
index 0000000..81921ec
--- /dev/null
+++ b/src/monitoring/data_monitor.py
@@ -0,0 +1,51 @@
+import pandas as pd
+from scipy import stats
+import logging
+
+class DataMonitor:
+    def __init__(self, baseline_path):
+        self.baseline = pd.read_parquet(baseline_path)
+        self.logger = logging.getLogger(__name__)
+
+    def check_volume(self, new_data):
+        """Check the incoming data volume"""
+        daily_count = len(new_data)
+        if daily_count < 50:
+            self.logger.warning(f"Low data volume: {daily_count} matches")
+            return False
+        return True
+
+    def check_drift(self, new_data, column, threshold=0.05):
+        """Detect drift with a two-sample KS test"""
+        baseline_col = self.baseline[column].dropna()
+        new_col = new_data[column].dropna()
+
+        ks_stat, p_value = stats.ks_2samp(baseline_col, new_col)
+
+        if p_value < threshold:
+            self.logger.warning(f"Drift detected in {column}: p={p_value:.4f}")
+            return True
+        return False
+
+    def check_quality(self, new_data):
+        """Check data quality"""
+        metrics = {
+            'missing_rate': new_data.isnull().mean().mean(),
+            'duplicates': new_data.duplicated().sum(),
+            'unique_teams': new_data['team_1'].nunique()
+        }
+
+        if metrics['missing_rate'] > 0.05:
+            self.logger.error(f"High missing rate: {metrics['missing_rate']:.2%}")
+
+        return metrics
+
+    def monitor(self, new_data):
+        """Run all data checks"""
+        results = {
+            'volume_ok': self.check_volume(new_data),
+            'quality': self.check_quality(new_data),
+            'drift_rank': self.check_drift(new_data, 'rank_1'),
+            'drift_score': self.check_drift(new_data, 'result_1')
+        }
+        return results
\ No newline at end of file
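Note: DataMonitor.check_drift only returns a boolean, while AlertManager's 'data_drift' branch formats a p-value. A minimal sketch of a daily check bridging the two, recomputing the KS p-value the same way check_drift does; the parquet paths and alert config are assumptions:

# sketch: daily drift check that feeds AlertManager (paths and config are placeholders)
import pandas as pd
from scipy import stats
from src.monitoring.data_monitor import DataMonitor
from src.monitoring.alert_manager import AlertManager

def daily_data_check(alert_config):
    monitor = DataMonitor('data/processed/baseline.parquet')       # assumed baseline path
    new_data = pd.read_parquet('data/processed/features.parquet')  # output of the Airflow transform task

    results = monitor.monitor(new_data)

    if results['drift_rank']:
        # handle_alert('data_drift', ...) formats metrics['p_value'], so recompute it here
        _, p_value = stats.ks_2samp(
            monitor.baseline['rank_1'].dropna(), new_data['rank_1'].dropna()
        )
        AlertManager(alert_config).handle_alert('data_drift', {'p_value': p_value})

    return results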
diff --git a/src/monitoring/model_monitor.py b/src/monitoring/model_monitor.py
new file mode 100644
index 0000000..73d6edf
--- /dev/null
+++ b/src/monitoring/model_monitor.py
@@ -0,0 +1,74 @@
+from prometheus_client import Gauge, Counter, Histogram
+import numpy as np
+
+class ModelMonitor:
+    def __init__(self):
+        # Prometheus metrics
+        self.accuracy_gauge = Gauge('model_accuracy', 'Model accuracy')
+        self.prediction_counter = Counter('predictions_total', 'Total predictions')
+        self.latency_histogram = Histogram('prediction_latency', 'Prediction latency')
+
+        self.predictions = []
+        self.actuals = []
+
+    def log_prediction(self, prediction, actual, latency):
+        """Log a prediction (actual may be None until feedback arrives)"""
+        self.predictions.append(prediction)
+        self.actuals.append(actual)
+
+        self.prediction_counter.inc()
+        self.latency_histogram.observe(latency)
+
+        # Rolling accuracy over the last 100 labelled predictions
+        labelled = [(p, a) for p, a in zip(self.predictions, self.actuals) if a is not None]
+        if len(labelled) >= 100:
+            recent_acc = np.mean([p == a for p, a in labelled[-100:]])
+            self.accuracy_gauge.set(recent_acc)
+
+    def calculate_rolling_metrics(self, window_days=7):
+        """Metrics over a rolling window"""
+        from sklearn.metrics import accuracy_score, precision_score, f1_score
+
+        # Keep only predictions with feedback, then take the last N days (~50 matches/day)
+        labelled = [(p, a) for p, a in zip(self.predictions, self.actuals) if a is not None]
+        recent = labelled[-window_days * 50:]
+        if not recent:
+            return None
+
+        recent_preds = [p for p, _ in recent]
+        recent_actual = [a for _, a in recent]
+
+        metrics = {
+            'accuracy': accuracy_score(recent_actual, recent_preds),
+            'precision': precision_score(recent_actual, recent_preds, average='weighted'),
+            'f1': f1_score(recent_actual, recent_preds, average='weighted')
+        }
+
+        return metrics
+
+    def detect_performance_degradation(self, threshold=0.65):
+        """Detect a performance degradation"""
+        metrics = self.calculate_rolling_metrics(window_days=7)
+        if metrics is None:
+            return {'alert': False}
+
+        if metrics['accuracy'] < threshold:
+            return {
+                'alert': True,
+                'severity': 'CRITICAL',
+                'message': f"Accuracy dropped to {metrics['accuracy']:.2%}"
+            }
+        elif metrics['accuracy'] < threshold + 0.05:
+            return {
+                'alert': True,
+                'severity': 'WARNING',
+                'message': f"Accuracy at {metrics['accuracy']:.2%}"
+            }
+
+        return {'alert': False}
\ No newline at end of file
diff --git a/tests/demo_error_case.py b/tests/demo_error_case.py
new file mode 100644
index 0000000..ff10e2b
--- /dev/null
+++ b/tests/demo_error_case.py
@@ -0,0 +1,51 @@
+import requests
+import time
+
+def inject_bad_predictions():
+    """Simulate a performance degradation"""
+
+    print("=" * 50)
+    print("ERROR CASE - Performance Degradation")
+    print("=" * 50)
+
+    # Inject 100 bad predictions
+    print("\n⚠️ Injecting wrong predictions...")
+
+    for i in range(100):
+        match = {
+            "team_1": f"Team_{i}",
+            "team_2": f"Team_{i+1}",
+            "rank_1": 10,
+            "rank_2": 11,
+            "map_name": "Dust2"
+        }
+
+        # Request a prediction
+        response = requests.post("http://localhost:8000/predict", json=match)
+
+        # Report the opposite outcome so every prediction counts as wrong
+        requests.post("http://localhost:8000/feedback", json={
+            "prediction_id": response.json()['id'],
+            "actual": 1 - response.json()['winner']
+        })
+
+        time.sleep(0.1)
+
+    print("✅ 100 wrong predictions injected")
+
+    # Check the metrics
+    print("\n📊 Checking metrics...")
+    metrics = requests.get("http://localhost:8000/health").json()
+
+    print(f"Current accuracy: {metrics['metrics']['accuracy']:.1%}")
+
+    if metrics['metrics']['accuracy'] < 0.60:
+        print("\n🚨 CRITICAL ALERT TRIGGERED!")
+        print("✅ Dashboard shows the anomaly")
+        print("✅ Slack alert sent")
+        print("✅ On-call email sent")
+
+    print(f"\n🎨 See dashboard: http://localhost:3000/d/csgo-dashboard")
+
+if __name__ == "__main__":
+    inject_bad_predictions()
\ No newline at end of file
print(f"Confiance: {result['probability']:.1%}") + print(f"Latence: {result['latency_ms']:.1f}ms") + + # Vérifier dashboard Grafana + print(f"\n🎨 Dashboard: http://localhost:3000") + print(f"MLflow: http://localhost:5000") + +if __name__ == "__main__": + test_normal_prediction() \ No newline at end of file