"""Airflow DAG: daily CS:GO (HLTV) data ingestion — extract, validate, transform."""
# Standard library
from datetime import datetime, timedelta

# Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
# Default task-level settings applied to every operator in this DAG.
default_args = {
    'owner': 'mlops-team',                # team responsible for this pipeline
    'retries': 2,                         # re-run a failed task up to twice
    'retry_delay': timedelta(minutes=5),  # pause between retry attempts
}
def extract_data():
|
|
"""Extraction des données HLTV"""
|
|
import pandas as pd
|
|
# Simuler extraction (remplacer par scraping réel)
|
|
df = pd.read_csv('data/raw/results.csv')
|
|
df.to_parquet('data/staging/results.parquet')
|
|
return len(df)
|
|
|
|
def validate_data(src_path='data/staging/results.parquet'):
    """Validate staged results with Great Expectations.

    Args:
        src_path: Parquet file produced by the extract step.

    Raises:
        ValueError: If any expectation fails. A plain ``assert`` (as in the
            original) is stripped under ``python -O``, silently disabling
            validation, so we raise explicitly instead.
    """
    import pandas as pd
    import great_expectations as ge

    df = pd.read_parquet(src_path)
    ge_df = ge.from_pandas(df)

    # Structural and content expectations on the staged data.
    checks = [
        ge_df.expect_column_to_exist('date'),
        ge_df.expect_column_values_to_not_be_null('team_1'),
        ge_df.expect_column_values_to_be_between('rank_1', 1, 50),
    ]

    # Collect every failure instead of stopping at the first one, so a single
    # run surfaces all data-quality problems.
    failures = [check.result for check in checks if not check.success]
    if failures:
        raise ValueError(f"Validation failed: {failures}")
def transform_data():
|
|
"""Feature engineering"""
|
|
import pandas as pd
|
|
df = pd.read_parquet('data/staging/results.parquet')
|
|
|
|
# Créer features
|
|
df['rank_diff'] = df['rank_1'] - df['rank_2']
|
|
df['win_rate_1'] = df.groupby('team_1')['result_1'].transform('mean')
|
|
|
|
df.to_parquet('data/processed/features.parquet')
|
|
|
|
# Daily ingestion pipeline: extract raw results, validate them, build features.
with DAG(
    'csgo_data_ingestion',
    default_args=default_args,
    schedule_interval='@daily',  # NOTE(review): deprecated alias of `schedule` in Airflow 2.4+ — confirm target version
    start_date=datetime(2025, 1, 1),
    catchup=False,
) as dag:

    # One operator per pipeline stage, built from a (task_id, callable) table.
    extract, validate, transform = (
        PythonOperator(task_id=name, python_callable=fn)
        for name, fn in (
            ('extract', extract_data),
            ('validate', validate_data),
            ('transform', transform_data),
        )
    )

    # Run the stages strictly in order.
    extract >> validate >> transform