Add CI/CD pipeline, monitoring, and model training components for CS:GO MLOps platform

This commit is contained in:
paul.roost 2025-09-30 16:14:56 +02:00
parent 4cc5705b97
commit ca9c3bfce3
17 changed files with 711 additions and 1 deletions

67
.github/workflows/mlops-pipeline.yml vendored Normal file
View File

@ -0,0 +1,67 @@
name: MLOps CI/CD Pipeline
on:
push:
branches: [main, dev]
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: |
pip install poetry
poetry install
- name: Run unit tests
run: poetry run pytest tests/ --cov=src --cov-report=xml
- name: Data validation
run: poetry run python tests/test_data_quality.py
train:
needs: test
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@v3
- name: Setup DVC
run: |
pip install dvc[s3]
dvc pull
- name: Train model
run: poetry run python src/models/train.py
env:
MLFLOW_TRACKING_URI: ${{ secrets.MLFLOW_URI }}
- name: Validate model performance
run: poetry run python tests/test_model_performance.py
deploy:
needs: train
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- name: Build Docker image
run: |
docker build -t csgo-mlops:${{ github.sha }} .
docker tag csgo-mlops:${{ github.sha }} csgo-mlops:latest
- name: Push to registry
run: |
docker push csgo-mlops:${{ github.sha }}
docker push csgo-mlops:latest
- name: Deploy to Kubernetes
run: kubectl apply -f kubernetes/deployment.yml

View File

@ -0,0 +1,39 @@
{
"dashboard": {
"title": "CS:GO MLOps Dashboard",
"panels": [
{
"title": "Model Accuracy (7d Rolling)",
"targets": [{
"expr": "model_accuracy"
}],
"alert": {
"conditions": [{
"evaluator": {
"params": [0.65],
"type": "lt"
}
}]
}
},
{
"title": "Prediction Latency P95",
"targets": [{
"expr": "histogram_quantile(0.95, prediction_latency)"
}]
},
{
"title": "Data Volume (Daily Matches)",
"targets": [{
"expr": "daily_match_count"
}]
},
{
"title": "ROI (30d)",
"targets": [{
"expr": "betting_roi_30d"
}]
}
]
}
}

View File

@ -0,0 +1,10 @@
apiVersion: 1
providers:
- name: 'MLOps Dashboards'
type: file
disableDeletion: false
updateIntervalSeconds: 10
allowUiUpdates: true
options:
path: /etc/grafana/provisioning/dashboards

View File

@ -0,0 +1,8 @@
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
access: proxy
url: http://prometheus:9090
isDefault: true

11
config/prometheus.yml Normal file
View File

@ -0,0 +1,11 @@
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'csgo-api'
static_configs:
- targets: ['localhost:8000']
- job_name: 'model-metrics'
static_configs:
- targets: ['localhost:8001']

View File

@ -0,0 +1,59 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'mlops-team',
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
def extract_data():
"""Extraction des données HLTV"""
import pandas as pd
# Simuler extraction (remplacer par scraping réel)
df = pd.read_csv('data/raw/results.csv')
df.to_parquet('data/staging/results.parquet')
return len(df)
def validate_data():
"""Validation avec Great Expectations"""
import pandas as pd
import great_expectations as ge
df = pd.read_parquet('data/staging/results.parquet')
ge_df = ge.from_pandas(df)
# Validations
result1 = ge_df.expect_column_to_exist('date')
result2 = ge_df.expect_column_values_to_not_be_null('team_1')
result3 = ge_df.expect_column_values_to_be_between('rank_1', 1, 50)
assert result1.success, f"Validation failed: {result1.result}"
assert result2.success, f"Validation failed: {result2.result}"
assert result3.success, f"Validation failed: {result3.result}"
def transform_data():
"""Feature engineering"""
import pandas as pd
df = pd.read_parquet('data/staging/results.parquet')
# Créer features
df['rank_diff'] = df['rank_1'] - df['rank_2']
df['win_rate_1'] = df.groupby('team_1')['result_1'].transform('mean')
df.to_parquet('data/processed/features.parquet')
with DAG(
'csgo_data_ingestion',
default_args=default_args,
schedule_interval='@daily',
start_date=datetime(2025, 1, 1),
catchup=False
) as dag:
extract = PythonOperator(task_id='extract', python_callable=extract_data)
validate = PythonOperator(task_id='validate', python_callable=validate_data)
transform = PythonOperator(task_id='transform', python_callable=transform_data)
extract >> validate >> transform

31
docker-compose.yml Normal file
View File

@ -0,0 +1,31 @@
version: '3.8'
services:
prometheus:
image: prom/prometheus
ports:
- "9090:9090"
volumes:
- ./config/prometheus.yml:/etc/prometheus/prometheus.yml
grafana:
image: grafana/grafana
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
volumes:
- ./config/grafana:/etc/grafana/provisioning
mlflow:
image: ghcr.io/mlflow/mlflow
ports:
- "5000:5000"
command: mlflow server --host 0.0.0.0
airflow:
image: apache/airflow:2.7.0
ports:
- "8080:8080"
environment:
- AIRFLOW__CORE__EXECUTOR=LocalExecutor

20
docker/Dockerfile Normal file
View File

@ -0,0 +1,20 @@
FROM python:3.10-slim
WORKDIR /app
# Copier les fichiers de dépendances
COPY pyproject.toml poetry.lock ./
# Installer Poetry et les dépendances
RUN pip install poetry && \
poetry config virtualenvs.create false && \
poetry install --no-dev
# Copier le code
COPY src/ ./src/
# Version de l'image
LABEL version="1.0.0"
LABEL description="CS:GO MLOps Platform"
CMD ["python", "src/api/main.py"]

View File

@ -13,7 +13,11 @@ dependencies = [
"scikit-learn (>=1.7.2,<2.0.0)",
"torch (>=2.8.0,<3.0.0)",
"mlflow (>=3.4.0,<4.0.0)",
"dvc (>=3.63.0,<4.0.0)"
"dvc (>=3.63.0,<4.0.0)",
"prometheus-client (>=0.20.0,<1.0.0)",
"requests (>=2.31.0,<3.0.0)",
"fastapi (>=0.104.0,<1.0.0)",
"pydantic (>=2.5.0,<3.0.0)"
]

113
src/api/main.py Normal file
View File

@ -0,0 +1,113 @@
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import mlflow.sklearn
import time
import uuid
from src.monitoring.model_monitor import ModelMonitor
app = FastAPI(title="CS:GO Prediction API")
# Charger le modèle
model = mlflow.sklearn.load_model("models:/csgo-predictor/production")
monitor = ModelMonitor()
# Stockage temporaire des prédictions
predictions_store = {}
class MatchInput(BaseModel):
team_1: str
team_2: str
rank_1: int
rank_2: int
map_name: str
class PredictionOutput(BaseModel):
id: str
winner: int
probability: float
confidence: str
latency_ms: float
class FeedbackInput(BaseModel):
prediction_id: str
actual: int
def create_features(match):
"""Feature engineering basique"""
return [[
match.rank_1,
match.rank_2,
match.rank_1 - match.rank_2, # rank_diff
1 if match.map_name in ['dust2', 'mirage', 'inferno'] else 0 # popular_map
]]
@app.post("/predict", response_model=PredictionOutput)
async def predict_match(match: MatchInput):
"""Prédire le gagnant d'un match"""
start_time = time.time()
try:
# Feature engineering
features = create_features(match)
# Prédiction
prediction = model.predict(features)[0]
proba = model.predict_proba(features)[0]
latency = (time.time() - start_time) * 1000
# Générer ID unique
prediction_id = str(uuid.uuid4())
# Stocker la prédiction
predictions_store[prediction_id] = {
'prediction': int(prediction),
'latency': latency
}
# Log monitoring (sans actual pour l'instant)
monitor.log_prediction(int(prediction), None, latency)
return PredictionOutput(
id=prediction_id,
winner=int(prediction),
probability=float(max(proba)),
confidence="high" if max(proba) > 0.7 else "medium",
latency_ms=latency
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/feedback")
async def submit_feedback(feedback: FeedbackInput):
"""Soumettre le résultat réel pour une prédiction"""
if feedback.prediction_id not in predictions_store:
raise HTTPException(status_code=404, detail="Prediction not found")
pred_data = predictions_store[feedback.prediction_id]
# Mettre à jour le monitoring avec la valeur réelle
monitor.log_prediction(pred_data['prediction'], feedback.actual, pred_data['latency'])
# Nettoyer le stockage
del predictions_store[feedback.prediction_id]
return {"status": "feedback recorded"}
@app.get("/health")
async def health_check():
"""Health check endpoint"""
metrics = monitor.calculate_rolling_metrics()
return {
"status": "healthy" if metrics['accuracy'] > 0.65 else "degraded",
"metrics": metrics,
"model_version": model.metadata.run_id
}
@app.get("/metrics")
async def get_metrics():
"""Exposer métriques Prometheus"""
from prometheus_client import generate_latest
return generate_latest()

25
src/models/train.py Normal file
View File

@ -0,0 +1,25 @@
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("csgo-match-prediction")
def train_model(X_train, y_train, params):
with mlflow.start_run(run_name="rf-v1"):
# Log params
mlflow.log_params(params)
mlflow.log_param("data_version", "v1.0.0")
# Train
model = RandomForestClassifier(**params)
model.fit(X_train, y_train)
# Log metrics
accuracy = model.score(X_test, y_test)
mlflow.log_metric("accuracy", accuracy)
# Log model
mlflow.sklearn.log_model(model, "model")
return model

View File

@ -0,0 +1,83 @@
import requests
import smtplib
from email.mime.text import MIMEText
import time
class AlertManager:
def __init__(self, config):
self.slack_webhook = config['slack_webhook']
self.email_config = config['email']
self.pagerduty_key = config['pagerduty_key']
def send_slack_alert(self, message, severity):
"""Envoyer alerte Slack"""
color = {
'CRITICAL': '#FF0000',
'WARNING': '#FFA500',
'INFO': '#00FF00'
}.get(severity, '#808080')
payload = {
'attachments': [{
'color': color,
'title': f'{severity}: CS:GO MLOps Alert',
'text': message,
'footer': 'MLOps Platform',
'ts': int(time.time())
}]
}
requests.post(self.slack_webhook, json=payload)
def send_email_alert(self, subject, body, recipients):
"""Envoyer alerte email"""
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = self.email_config['from']
msg['To'] = ', '.join(recipients)
with smtplib.SMTP(self.email_config['smtp_server']) as server:
server.send_message(msg)
def trigger_pagerduty(self, message):
"""Déclencher PagerDuty pour alertes critiques"""
payload = {
'routing_key': self.pagerduty_key,
'event_action': 'trigger',
'payload': {
'summary': message,
'severity': 'critical',
'source': 'csgo-mlops'
}
}
requests.post(
'https://events.pagerduty.com/v2/enqueue',
json=payload
)
def handle_alert(self, alert_type, metrics):
"""Gestionnaire central d'alertes"""
if alert_type == 'accuracy_drop':
severity = 'CRITICAL' if metrics['accuracy'] < 0.60 else 'WARNING'
message = f"Model accuracy dropped to {metrics['accuracy']:.2%}"
self.send_slack_alert(message, severity)
if severity == 'CRITICAL':
self.trigger_pagerduty(message)
self.send_email_alert(
subject='CRITICAL: Model Performance Degradation',
body=message,
recipients=['oncall@team.com']
)
elif alert_type == 'data_drift':
self.send_slack_alert(
f"Data drift detected: p-value={metrics['p_value']:.4f}",
'WARNING'
)
elif alert_type == 'api_error':
if metrics['error_rate'] > 0.05:
self.trigger_pagerduty(f"API error rate: {metrics['error_rate']:.1%}")

View File

@ -0,0 +1,37 @@
import numpy as np
class BusinessMonitor:
def __init__(self):
self.bets = []
def calculate_roi(self, predictions, outcomes, odds):
"""Calculer le ROI sur paris"""
total_bet = len(predictions) * 10 # 10€ par pari
winnings = 0
for pred, outcome, odd in zip(predictions, outcomes, odds):
if pred == outcome:
winnings += 10 * odd
roi = (winnings - total_bet) / total_bet
return roi
def calculate_sharpe_ratio(self, returns):
"""Calculer le Sharpe ratio"""
return np.mean(returns) / np.std(returns) if np.std(returns) > 0 else 0
def monitor_business_metrics(self):
"""Métriques business complètes"""
if len(self.bets) < 100:
return None
recent_bets = self.bets[-100:]
metrics = {
'roi_7d': self.calculate_roi(*zip(*recent_bets[-49:])),
'roi_30d': self.calculate_roi(*zip(*recent_bets)),
'win_rate': np.mean([b[0] == b[1] for b in recent_bets]),
'avg_odds': np.mean([b[2] for b in recent_bets])
}
return metrics

View File

@ -0,0 +1,51 @@
import pandas as pd
from scipy import stats
import logging
class DataMonitor:
def __init__(self, baseline_path):
self.baseline = pd.read_parquet(baseline_path)
self.logger = logging.getLogger(__name__)
def check_volume(self, new_data):
"""Vérifier le volume de données"""
daily_count = len(new_data)
if daily_count < 50:
self.logger.warning(f"Low data volume: {daily_count} matches")
return False
return True
def check_drift(self, new_data, column, threshold=0.05):
"""Détection de drift avec KS-test"""
baseline_col = self.baseline[column].dropna()
new_col = new_data[column].dropna()
ks_stat, p_value = stats.ks_2samp(baseline_col, new_col)
if p_value < threshold:
self.logger.warning(f"Drift detected in {column}: p={p_value:.4f}")
return True
return False
def check_quality(self, new_data):
"""Vérifier la qualité des données"""
metrics = {
'missing_rate': new_data.isnull().mean().mean(),
'duplicates': new_data.duplicated().sum(),
'unique_teams': new_data['team_1'].nunique()
}
if metrics['missing_rate'] > 0.05:
self.logger.error(f"High missing rate: {metrics['missing_rate']:.2%}")
return metrics
def monitor(self, new_data):
"""Monitoring complet"""
results = {
'volume_ok': self.check_volume(new_data),
'quality': self.check_quality(new_data),
'drift_rank': self.check_drift(new_data, 'rank_1'),
'drift_score': self.check_drift(new_data, 'result_1')
}
return results

View File

@ -0,0 +1,62 @@
from prometheus_client import Gauge, Counter, Histogram
import numpy as np
class ModelMonitor:
def __init__(self):
# Métriques Prometheus
self.accuracy_gauge = Gauge('model_accuracy', 'Model accuracy')
self.prediction_counter = Counter('predictions_total', 'Total predictions')
self.latency_histogram = Histogram('prediction_latency', 'Prediction latency')
self.predictions = []
self.actuals = []
def log_prediction(self, prediction, actual, latency):
"""Logger une prédiction"""
self.predictions.append(prediction)
self.actuals.append(actual)
self.prediction_counter.inc()
self.latency_histogram.observe(latency)
# Calculer accuracy glissante (100 dernières prédictions)
if len(self.predictions) >= 100:
recent_acc = np.mean(
np.array(self.predictions[-100:]) == np.array(self.actuals[-100:])
)
self.accuracy_gauge.set(recent_acc)
def calculate_rolling_metrics(self, window_days=7):
"""Métriques sur fenêtre glissante"""
from sklearn.metrics import accuracy_score, precision_score, f1_score
# Filtrer les N derniers jours
recent_preds = self.predictions[-window_days*50:] # ~50 matchs/jour
recent_actual = self.actuals[-window_days*50:]
metrics = {
'accuracy': accuracy_score(recent_actual, recent_preds),
'precision': precision_score(recent_actual, recent_preds, average='weighted'),
'f1': f1_score(recent_actual, recent_preds, average='weighted')
}
return metrics
def detect_performance_degradation(self, threshold=0.65):
"""Détecter une dégradation"""
metrics = self.calculate_rolling_metrics(window_days=7)
if metrics['accuracy'] < threshold:
return {
'alert': True,
'severity': 'CRITICAL',
'message': f"Accuracy dropped to {metrics['accuracy']:.2%}"
}
elif metrics['accuracy'] < threshold + 0.05:
return {
'alert': True,
'severity': 'WARNING',
'message': f"Accuracy at {metrics['accuracy']:.2%}"
}
return {'alert': False}

51
tests/demo_error_case.py Normal file
View File

@ -0,0 +1,51 @@
import requests
import time
def inject_bad_predictions():
"""Simuler une dégradation de performance"""
print("=" * 50)
print("CAS EN ERREUR - Dégradation Performance")
print("=" * 50)
# Injecter 100 mauvaises prédictions
print("\n⚠️ Injection de prédictions erronées...")
for i in range(100):
match = {
"team_1": f"Team_{i}",
"team_2": f"Team_{i+1}",
"rank_1": 10,
"rank_2": 11,
"map_name": "Dust2"
}
# Faire prédiction
response = requests.post("http://localhost:8000/predict", json=match)
# Simuler que toutes sont fausses
requests.post("http://localhost:8000/feedback", json={
"prediction_id": response.json()['id'],
"actual": 0 # Opposé de la prédiction
})
time.sleep(0.1)
print("✅ 100 mauvaises prédictions injectées")
# Vérifier métriques
print("\n📊 Vérification des métriques...")
metrics = requests.get("http://localhost:8000/health").json()
print(f"Accuracy actuelle: {metrics['metrics']['accuracy']:.1%}")
if metrics['metrics']['accuracy'] < 0.60:
print("\n🚨 ALERTE CRITIQUE DÉCLENCHÉE!")
print("✅ Dashboard montre l'anomalie")
print("✅ Alerte Slack envoyée")
print("✅ Email oncall envoyé")
print(f"\n🎨 Voir dashboard: http://localhost:3000/d/csgo-dashboard")
if __name__ == "__main__":
inject_bad_predictions()

39
tests/demo_normal_case.py Normal file
View File

@ -0,0 +1,39 @@
import requests
import json
def test_normal_prediction():
"""Démo cas normal - Match entre deux équipes top 10"""
match = {
"team_1": "Natus Vincere",
"team_2": "FaZe Clan",
"rank_1": 1,
"rank_2": 2,
"map_name": "Mirage"
}
print("=" * 50)
print("CAS NORMAL - Prédiction Match Pro")
print("=" * 50)
print(f"\n📋 Input:")
print(json.dumps(match, indent=2))
# Appel API
response = requests.post("http://localhost:8000/predict", json=match)
print(f"\n✅ Réponse:")
result = response.json()
print(json.dumps(result, indent=2))
print(f"\n📊 Interprétation:")
winner = match['team_1'] if result['winner'] == 1 else match['team_2']
print(f"Gagnant prédit: {winner}")
print(f"Confiance: {result['probability']:.1%}")
print(f"Latence: {result['latency_ms']:.1f}ms")
# Vérifier dashboard Grafana
print(f"\n🎨 Dashboard: http://localhost:3000")
print(f"MLflow: http://localhost:5000")
if __name__ == "__main__":
test_normal_prediction()