Add structured logging throughout the backend: - logging_config.py: Centralized logging configuration with colored output - main.py: Enhanced startup logging showing initialization progress - init_blockchain.py: Detailed blockchain initialization logging - services.py: Election creation logging Logging features: - Emoji prefixes for different log levels (INFO, DEBUG, ERROR, etc.) - Color-coded output for better visibility - Timestamp and module information - Exception stack traces on errors - Separate loggers for different modules This helps debug: - Backend startup sequence - Database initialization - Blockchain election recording - Service operations - Configuration issues
246 lines
7.5 KiB
Python
246 lines
7.5 KiB
Python
"""
|
|
Service de base de données - Opérations CRUD.
|
|
"""
|
|
|
|
import logging
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import func
|
|
from . import models, schemas
|
|
from .auth import hash_password, verify_password
|
|
from datetime import datetime
|
|
from .blockchain_elections import record_election_to_blockchain
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class VoterService:
|
|
"""Service pour gérer les électeurs"""
|
|
|
|
@staticmethod
|
|
def create_voter(db: Session, voter: schemas.VoterRegister) -> models.Voter:
|
|
"""Créer un nouvel électeur"""
|
|
db_voter = models.Voter(
|
|
email=voter.email,
|
|
first_name=voter.first_name,
|
|
last_name=voter.last_name,
|
|
citizen_id=voter.citizen_id,
|
|
password_hash=hash_password(voter.password)
|
|
)
|
|
db.add(db_voter)
|
|
db.commit()
|
|
db.refresh(db_voter)
|
|
return db_voter
|
|
|
|
@staticmethod
|
|
def get_voter_by_email(db: Session, email: str) -> models.Voter:
|
|
"""Récupérer un électeur par email"""
|
|
return db.query(models.Voter).filter(
|
|
models.Voter.email == email
|
|
).first()
|
|
|
|
@staticmethod
|
|
def verify_voter_credentials(
|
|
db: Session,
|
|
email: str,
|
|
password: str
|
|
) -> models.Voter:
|
|
"""Vérifier les identifiants et retourner l'électeur"""
|
|
voter = VoterService.get_voter_by_email(db, email)
|
|
if not voter:
|
|
return None
|
|
if not verify_password(password, voter.password_hash):
|
|
return None
|
|
return voter
|
|
|
|
@staticmethod
|
|
def mark_as_voted(db: Session, voter_id: int) -> None:
|
|
"""Marquer l'électeur comme ayant voté"""
|
|
voter = db.query(models.Voter).filter(
|
|
models.Voter.id == voter_id
|
|
).first()
|
|
if voter:
|
|
voter.has_voted = True
|
|
voter.updated_at = datetime.utcnow()
|
|
db.commit()
|
|
|
|
|
|
class ElectionService:
|
|
"""Service pour gérer les élections"""
|
|
|
|
@staticmethod
|
|
def create_election(
|
|
db: Session,
|
|
name: str,
|
|
description: str,
|
|
start_date: datetime,
|
|
end_date: datetime,
|
|
elgamal_p: int = None,
|
|
elgamal_g: int = None,
|
|
is_active: bool = True,
|
|
creator_id: int = 0
|
|
) -> models.Election:
|
|
"""
|
|
Créer une nouvelle élection et l'enregistrer sur la blockchain.
|
|
|
|
Args:
|
|
db: Database session
|
|
name: Election name
|
|
description: Election description
|
|
start_date: Election start date
|
|
end_date: Election end date
|
|
elgamal_p: ElGamal prime (optional)
|
|
elgamal_g: ElGamal generator (optional)
|
|
is_active: Whether election is active
|
|
creator_id: ID of admin creating this election
|
|
|
|
Returns:
|
|
The created Election model
|
|
"""
|
|
# Create election in database
|
|
db_election = models.Election(
|
|
name=name,
|
|
description=description,
|
|
start_date=start_date,
|
|
end_date=end_date,
|
|
elgamal_p=elgamal_p,
|
|
elgamal_g=elgamal_g,
|
|
is_active=is_active
|
|
)
|
|
db.add(db_election)
|
|
db.commit()
|
|
db.refresh(db_election)
|
|
|
|
# Record to blockchain immediately after creation
|
|
try:
|
|
logger.debug(f"Recording election {db_election.id} ({name}) to blockchain")
|
|
|
|
# Get candidates for this election to include in blockchain record
|
|
candidates = db.query(models.Candidate).filter(
|
|
models.Candidate.election_id == db_election.id
|
|
).all()
|
|
|
|
logger.debug(f" Found {len(candidates)} candidates for election {db_election.id}")
|
|
|
|
candidates_data = [
|
|
{
|
|
"id": c.id,
|
|
"name": c.name,
|
|
"description": c.description or "",
|
|
"order": c.order or 0
|
|
}
|
|
for c in candidates
|
|
]
|
|
|
|
# Record election to blockchain
|
|
block = record_election_to_blockchain(
|
|
election_id=db_election.id,
|
|
election_name=name,
|
|
election_description=description,
|
|
candidates=candidates_data,
|
|
start_date=start_date.isoformat(),
|
|
end_date=end_date.isoformat(),
|
|
is_active=is_active,
|
|
creator_id=creator_id
|
|
)
|
|
logger.info(
|
|
f"✓ Election {db_election.id} recorded to blockchain "
|
|
f"(Block #{block.index}, Hash: {block.block_hash[:16]}...)"
|
|
)
|
|
except Exception as e:
|
|
# Log error but don't fail election creation
|
|
logger.error(
|
|
f"Warning: Could not record election {db_election.id} to blockchain: {e}",
|
|
exc_info=True
|
|
)
|
|
|
|
return db_election
|
|
|
|
@staticmethod
|
|
def get_active_election(db: Session) -> models.Election:
|
|
"""Récupérer l'élection active"""
|
|
now = datetime.utcnow()
|
|
return db.query(models.Election).filter(
|
|
models.Election.is_active == True,
|
|
models.Election.start_date <= now,
|
|
models.Election.end_date > now
|
|
).first()
|
|
|
|
@staticmethod
|
|
def get_election(db: Session, election_id: int) -> models.Election:
|
|
"""Récupérer une élection par ID"""
|
|
return db.query(models.Election).filter(
|
|
models.Election.id == election_id
|
|
).first()
|
|
|
|
|
|
class VoteService:
|
|
"""Service pour gérer les votes"""
|
|
|
|
@staticmethod
|
|
def record_vote(
|
|
db: Session,
|
|
voter_id: int,
|
|
election_id: int,
|
|
candidate_id: int,
|
|
encrypted_vote: bytes,
|
|
ballot_hash: str,
|
|
ip_address: str = None
|
|
) -> models.Vote:
|
|
"""Enregistrer un vote chiffré"""
|
|
db_vote = models.Vote(
|
|
voter_id=voter_id,
|
|
election_id=election_id,
|
|
candidate_id=candidate_id,
|
|
encrypted_vote=encrypted_vote,
|
|
ballot_hash=ballot_hash,
|
|
ip_address=ip_address,
|
|
timestamp=datetime.utcnow()
|
|
)
|
|
db.add(db_vote)
|
|
db.commit()
|
|
db.refresh(db_vote)
|
|
return db_vote
|
|
|
|
@staticmethod
|
|
def has_voter_voted(
|
|
db: Session,
|
|
voter_id: int,
|
|
election_id: int
|
|
) -> bool:
|
|
"""Vérifier si l'électeur a déjà voté"""
|
|
vote = db.query(models.Vote).filter(
|
|
models.Vote.voter_id == voter_id,
|
|
models.Vote.election_id == election_id
|
|
).first()
|
|
return vote is not None
|
|
|
|
@staticmethod
|
|
def get_election_results(
|
|
db: Session,
|
|
election_id: int
|
|
) -> list[schemas.ResultResponse]:
|
|
"""Calculer les résultats d'une élection"""
|
|
results = db.query(
|
|
models.Candidate.name,
|
|
func.count(models.Vote.id).label("vote_count")
|
|
).join(
|
|
models.Vote,
|
|
models.Candidate.id == models.Vote.candidate_id
|
|
).filter(
|
|
models.Vote.election_id == election_id
|
|
).group_by(
|
|
models.Candidate.id,
|
|
models.Candidate.name
|
|
).all()
|
|
|
|
total_votes = sum(r.vote_count for r in results)
|
|
|
|
return [
|
|
schemas.ResultResponse(
|
|
candidate_name=r.name,
|
|
vote_count=r.vote_count,
|
|
percentage=(r.vote_count / total_votes * 100) if total_votes > 0 else 0
|
|
)
|
|
for r in results
|
|
]
|