Changes:
- Add next-themes dependency for theme management
- Create ThemeProvider wrapper for app root layout
- Set dark mode as default theme
- Create ThemeToggle component with Sun/Moon icons
- Add theme toggle to home page navigation
- Add theme toggle to dashboard header
- App now starts in dark mode with ability to switch to light mode
Styling uses existing Tailwind dark mode variables configured in
tailwind.config.ts and globals.css. All existing components automatically
support dark theme.
🤖 Generated with Claude Code
Co-Authored-By: Claude <noreply@anthropic.com>
246 lines
7.5 KiB
Python
246 lines
7.5 KiB
Python
"""
|
|
Database service - CRUD operations.
|
|
"""
|
|
|
|
import logging
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import func
|
|
from . import models, schemas
|
|
from .auth import hash_password, verify_password
|
|
from datetime import datetime, timezone
|
|
from .blockchain_elections import record_election_to_blockchain
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class VoterService:
    """Service for managing voters."""

    @staticmethod
    def create_voter(db: Session, voter: schemas.VoterRegister) -> models.Voter:
        """Create and persist a new voter.

        The plain-text password carried by ``voter`` is passed through
        ``hash_password``; only the resulting hash is stored on the row.

        Args:
            db: Database session used for the insert.
            voter: Registration payload (email, first name, last name,
                citizen ID and password).

        Returns:
            The persisted ``Voter``, refreshed from the database after commit.

        Raises:
            Exception: Whatever ``db.commit()`` raises is propagated
                unchanged, after the session has been rolled back.
        """
        db_voter = models.Voter(
            email=voter.email,
            first_name=voter.first_name,
            last_name=voter.last_name,
            citizen_id=voter.citizen_id,
            password_hash=hash_password(voter.password),
        )
        db.add(db_voter)
        try:
            db.commit()
        except Exception:
            # A failed commit leaves the session unusable until it is rolled
            # back; restore it for the caller, then propagate the same error.
            db.rollback()
            raise
        db.refresh(db_voter)
        return db_voter

    @staticmethod
    def get_voter_by_email(db: Session, email: str) -> models.Voter:
        """Fetch a voter by email address.

        Args:
            db: Database session used for the lookup.
            email: Email address to match exactly.

        Returns:
            The first matching ``Voter``, or ``None`` when no row matches.
        """
        return (
            db.query(models.Voter)
            .filter(models.Voter.email == email)
            .first()
        )

    @staticmethod
    def verify_voter_credentials(
        db: Session,
        email: str,
        password: str
    ) -> models.Voter:
        """Check an email/password pair and return the matching voter.

        Args:
            db: Database session used for the lookup.
            email: Email address identifying the voter.
            password: Plain-text password to check against the stored hash.

        Returns:
            The ``Voter`` when the email exists and the password verifies,
            otherwise ``None``.
        """
        voter = VoterService.get_voter_by_email(db, email)
        # NOTE(review): an unknown email returns before any password hashing
        # happens, so its timing may differ from the wrong-password case --
        # confirm whether that matters for this deployment.
        if not voter:
            return None
        if not verify_password(password, voter.password_hash):
            return None
        return voter

    @staticmethod
    def mark_as_voted(db: Session, voter_id: int) -> None:
        """Flag a voter as having voted.

        Does nothing when no voter has the given ID.

        Args:
            db: Database session used for the update.
            voter_id: Primary key of the voter to update.

        Raises:
            Exception: Whatever ``db.commit()`` raises is propagated
                unchanged, after the session has been rolled back.
        """
        voter = (
            db.query(models.Voter)
            .filter(models.Voter.id == voter_id)
            .first()
        )
        if not voter:
            return

        voter.has_voted = True
        voter.updated_at = datetime.now(timezone.utc)
        try:
            db.commit()
        except Exception:
            # Keep the session usable for the caller before propagating.
            db.rollback()
            raise
|
|
|
|
|
|
class ElectionService:
    """Service for managing elections."""

    @staticmethod
    def create_election(
        db: Session,
        name: str,
        description: str,
        start_date: datetime,
        end_date: datetime,
        elgamal_p: int = None,
        elgamal_g: int = None,
        is_active: bool = True,
        creator_id: int = 0
    ) -> models.Election:
        """Create a new election and record it on the blockchain.

        The database insert is authoritative. The blockchain recording that
        follows is best-effort: any error it raises is logged and swallowed
        so the election is still returned.

        Args:
            db: Database session.
            name: Election name.
            description: Election description.
            start_date: Election start date.
            end_date: Election end date.
            elgamal_p: ElGamal prime (optional).
            elgamal_g: ElGamal generator (optional).
            is_active: Whether the election is active.
            creator_id: ID of the admin creating this election; it is only
                passed to the blockchain record, not stored on the row here.

        Returns:
            The created ``Election`` model, refreshed after commit.

        Raises:
            Exception: Whatever ``db.commit()`` raises for the insert is
                propagated unchanged, after the session has been rolled back.
        """
        # Create the election in the database.
        db_election = models.Election(
            name=name,
            description=description,
            start_date=start_date,
            end_date=end_date,
            elgamal_p=elgamal_p,
            elgamal_g=elgamal_g,
            is_active=is_active,
        )
        db.add(db_election)
        try:
            db.commit()
        except Exception:
            # A failed commit leaves the session unusable until it is rolled
            # back; restore it for the caller, then propagate the same error.
            db.rollback()
            raise
        db.refresh(db_election)

        # Record to the blockchain immediately after creation.
        try:
            logger.debug(f"Recording election {db_election.id} ({name}) to blockchain")

            # NOTE(review): the election was inserted just above, so this only
            # finds candidates that already reference its ID -- presumably
            # none at creation time; confirm this is intended.
            candidates = (
                db.query(models.Candidate)
                .filter(models.Candidate.election_id == db_election.id)
                .all()
            )

            logger.debug(f" Found {len(candidates)} candidates for election {db_election.id}")

            candidates_data = [
                {
                    "id": c.id,
                    "name": c.name,
                    "description": c.description or "",
                    "order": c.order or 0,
                }
                for c in candidates
            ]

            block = record_election_to_blockchain(
                election_id=db_election.id,
                election_name=name,
                election_description=description,
                candidates=candidates_data,
                start_date=start_date.isoformat(),
                end_date=end_date.isoformat(),
                is_active=is_active,
                creator_id=creator_id,
            )
            # Kept inside the try on purpose: a malformed block object must
            # not turn a successful election creation into a failure.
            logger.info(
                f"✓ Election {db_election.id} recorded to blockchain "
                f"(Block #{block.index}, Hash: {block.block_hash[:16]}...)"
            )
        except Exception as e:
            # Log the error but do not fail election creation.
            logger.error(
                f"Warning: Could not record election {db_election.id} to blockchain: {e}",
                exc_info=True
            )

        return db_election

    @staticmethod
    def get_active_election(db: Session) -> models.Election:
        """Fetch the currently active election.

        An election matches when it is flagged active, its start date is not
        after the current UTC time, and its end date is strictly after it.

        Args:
            db: Database session used for the lookup.

        Returns:
            The first matching ``Election``, or ``None`` when none matches.
            No ordering is applied, so if several elections match, which one
            is returned is left to the database.
        """
        now = datetime.now(timezone.utc)
        return (
            db.query(models.Election)
            .filter(
                # ``== True`` builds a SQL expression here; it is not a
                # Python truthiness check.
                models.Election.is_active == True,  # noqa: E712
                models.Election.start_date <= now,
                models.Election.end_date > now,
            )
            .first()
        )

    @staticmethod
    def get_election(db: Session, election_id: int) -> models.Election:
        """Fetch an election by ID.

        Args:
            db: Database session used for the lookup.
            election_id: Primary key of the election.

        Returns:
            The matching ``Election``, or ``None`` when no row matches.
        """
        return (
            db.query(models.Election)
            .filter(models.Election.id == election_id)
            .first()
        )
|
|
|
|
|
|
class VoteService:
    """Service for managing votes."""

    @staticmethod
    def record_vote(
        db: Session,
        voter_id: int,
        election_id: int,
        candidate_id: int,
        encrypted_vote: bytes,
        ballot_hash: str,
        ip_address: str = None
    ) -> models.Vote:
        """Persist an encrypted vote.

        The vote is timestamped with the current UTC time at insertion.

        Args:
            db: Database session used for the insert.
            voter_id: ID of the voter casting the vote.
            election_id: ID of the election the vote belongs to.
            candidate_id: ID of the chosen candidate.
            encrypted_vote: Encrypted ballot payload, stored as given.
            ballot_hash: Hash identifying the ballot, stored as given.
            ip_address: Optional client IP address, stored as given.

        Returns:
            The persisted ``Vote``, refreshed from the database after commit.

        Raises:
            Exception: Whatever ``db.commit()`` raises is propagated
                unchanged, after the session has been rolled back.
        """
        db_vote = models.Vote(
            voter_id=voter_id,
            election_id=election_id,
            candidate_id=candidate_id,
            encrypted_vote=encrypted_vote,
            ballot_hash=ballot_hash,
            ip_address=ip_address,
            timestamp=datetime.now(timezone.utc),
        )
        db.add(db_vote)
        try:
            db.commit()
        except Exception:
            # A failed commit leaves the session unusable until it is rolled
            # back; restore it for the caller, then propagate the same error.
            db.rollback()
            raise
        db.refresh(db_vote)
        return db_vote

    @staticmethod
    def has_voter_voted(
        db: Session,
        voter_id: int,
        election_id: int
    ) -> bool:
        """Check whether a voter has already voted in an election.

        Args:
            db: Database session used for the lookup.
            voter_id: ID of the voter.
            election_id: ID of the election.

        Returns:
            ``True`` when at least one vote row exists for this voter and
            election, ``False`` otherwise.
        """
        existing_vote = (
            db.query(models.Vote)
            .filter(
                models.Vote.voter_id == voter_id,
                models.Vote.election_id == election_id,
            )
            .first()
        )
        return existing_vote is not None

    @staticmethod
    def get_election_results(
        db: Session,
        election_id: int
    ) -> list[schemas.ResultResponse]:
        """Compute per-candidate results for an election.

        Args:
            db: Database session used for the aggregate query.
            election_id: ID of the election to tally.

        Returns:
            One ``ResultResponse`` per candidate that received at least one
            vote, carrying the candidate name, the vote count and the share
            of all counted votes as a percentage. The list is empty when the
            election has no votes.
        """
        # NOTE(review): this is an inner join from candidates to votes, so
        # candidates without any vote do not appear in the results -- confirm
        # that callers expect this.
        rows = (
            db.query(
                models.Candidate.name,
                func.count(models.Vote.id).label("vote_count"),
            )
            .join(
                models.Vote,
                models.Candidate.id == models.Vote.candidate_id,
            )
            .filter(models.Vote.election_id == election_id)
            .group_by(models.Candidate.id, models.Candidate.name)
            .all()
        )

        total_votes = sum(row.vote_count for row in rows)

        return [
            schemas.ResultResponse(
                candidate_name=row.name,
                vote_count=row.vote_count,
                # Guard against division by zero when nothing was counted.
                percentage=(row.vote_count / total_votes * 100) if total_votes > 0 else 0,
            )
            for row in rows
        ]
|