""" Scrutateur (Vote Counting & Verification Module) Module de dépouillement pour: - Vérifier l'intégrité de la blockchain - Compter les votes chiffrés - Générer des rapports de vérification - Valider les résultats avec preuves cryptographiques Usage: python -m backend.scripts.scrutator --election-id 1 --verify python -m backend.scripts.scrutator --election-id 1 --count python -m backend.scripts.scrutator --election-id 1 --report """ import argparse import json from datetime import datetime from typing import Dict, List, Tuple from sqlalchemy.orm import Session from backend.blockchain import BlockchainManager from backend.models import Vote, Election, Candidate from backend.database import SessionLocal from backend.crypto.hashing import SecureHash class Scrutator: """ Scrutateur - Compteur et vérificateur de votes. Responsabilités: 1. Vérifier l'intégrité de la blockchain 2. Compter les votes chiffrés 3. Générer des rapports 4. Valider les résultats """ def __init__(self, election_id: int): """ Initialiser le scrutateur pour une élection. Args: election_id: ID de l'élection à dépouiller """ self.election_id = election_id self.db = SessionLocal() self.blockchain_manager = BlockchainManager() self.blockchain = None self.election = None self.votes = [] def load_election(self) -> bool: """ Charger les données de l'élection. Returns: True si l'élection existe, False sinon """ try: self.election = self.db.query(Election).filter( Election.id == self.election_id ).first() if not self.election: print(f"✗ Élection {self.election_id} non trouvée") return False print(f"✓ Élection chargée: {self.election.name}") return True except Exception as e: print(f"✗ Erreur lors du chargement de l'élection: {e}") return False def load_blockchain(self) -> bool: """ Charger la blockchain de l'élection. Returns: True si la blockchain est chargée """ try: self.blockchain = self.blockchain_manager.get_or_create_blockchain( self.election_id ) print(f"✓ Blockchain chargée: {self.blockchain.get_block_count()} blocs") return True except Exception as e: print(f"✗ Erreur lors du chargement de la blockchain: {e}") return False def load_votes(self) -> bool: """ Charger les votes de la base de données. Returns: True si les votes sont chargés """ try: self.votes = self.db.query(Vote).filter( Vote.election_id == self.election_id ).all() print(f"✓ {len(self.votes)} votes chargés") return True except Exception as e: print(f"✗ Erreur lors du chargement des votes: {e}") return False def verify_blockchain_integrity(self) -> bool: """ Vérifier l'intégrité de la blockchain. Vérifie: - La chaîne de hachage (chaque bloc lie au précédent) - L'absence de modification - La validité des signatures Returns: True si la blockchain est valide """ print("\n" + "=" * 60) print("VÉRIFICATION DE L'INTÉGRITÉ DE LA BLOCKCHAIN") print("=" * 60) if not self.blockchain: print("✗ Blockchain non chargée") return False is_valid = self.blockchain.verify_chain_integrity() if is_valid: print("✓ Chaîne de hachage valide") print(f"✓ {self.blockchain.get_block_count()} blocs vérifiés") print(f"✓ Aucune modification détectée") else: print("✗ ERREUR: Intégrité compromise!") print(" La blockchain a été modifiée") return is_valid def count_votes(self) -> Dict[str, int]: """ Compter les votes par candidat. Returns: Dictionnaire {candidat_name: count} """ print("\n" + "=" * 60) print("DÉPOUILLEMENT DES VOTES") print("=" * 60) vote_counts: Dict[str, int] = {} for vote in self.votes: candidate = self.db.query(Candidate).filter( Candidate.id == vote.candidate_id ).first() if candidate: if candidate.name not in vote_counts: vote_counts[candidate.name] = 0 vote_counts[candidate.name] += 1 # Afficher les résultats total = sum(vote_counts.values()) print(f"\nTotal de votes: {total}") print() for candidate_name in sorted(vote_counts.keys()): count = vote_counts[candidate_name] percentage = (count / total * 100) if total > 0 else 0 bar_length = int(percentage / 2) bar = "█" * bar_length + "░" * (50 - bar_length) print(f"{candidate_name:<20} {count:>6} votes ({percentage:>5.1f}%)") print(f"{'':20} {bar}") return vote_counts def verify_vote_count_consistency(self, vote_counts: Dict[str, int]) -> bool: """ Vérifier la cohérence entre la base de données et la blockchain. Args: vote_counts: Résultats du dépouillement Returns: True si les comptes sont cohérents """ print("\n" + "=" * 60) print("VÉRIFICATION DE LA COHÉRENCE") print("=" * 60) blockchain_vote_count = self.blockchain.get_vote_count() db_vote_count = len(self.votes) print(f"Votes en base de données: {db_vote_count}") print(f"Votes dans la blockchain: {blockchain_vote_count}") if db_vote_count == blockchain_vote_count: print("✓ Les comptes sont cohérents") return True else: print("✗ ERREUR: Incohérence détectée!") print(f" Différence: {abs(db_vote_count - blockchain_vote_count)} votes") return False def generate_report(self, vote_counts: Dict[str, int]) -> dict: """ Générer un rapport complet de vérification. Args: vote_counts: Résultats du dépouillement Returns: Rapport complet avec tous les détails """ print("\n" + "=" * 60) print("RAPPORT DE VÉRIFICATION") print("=" * 60) blockchain_valid = self.blockchain.verify_chain_integrity() total_votes = sum(vote_counts.values()) report = { "timestamp": datetime.utcnow().isoformat(), "election": { "id": self.election.id, "name": self.election.name, "description": self.election.description, "start_date": self.election.start_date.isoformat(), "end_date": self.election.end_date.isoformat() }, "blockchain": { "total_blocks": self.blockchain.get_block_count(), "total_votes": self.blockchain.get_vote_count(), "chain_valid": blockchain_valid, "genesis_block": { "index": 0, "hash": self.blockchain.chain[0].block_hash if self.blockchain.chain else None, "timestamp": self.blockchain.chain[0].timestamp if self.blockchain.chain else None } }, "results": { "total_votes": total_votes, "candidates": [] }, "verification": { "blockchain_integrity": blockchain_valid, "vote_count_consistency": len(self.votes) == self.blockchain.get_vote_count(), "status": "VALID" if (blockchain_valid and len(self.votes) == self.blockchain.get_vote_count()) else "INVALID" } } # Ajouter les résultats par candidat for candidate_name in sorted(vote_counts.keys()): count = vote_counts[candidate_name] percentage = (count / total_votes * 100) if total_votes > 0 else 0 report["results"]["candidates"].append({ "name": candidate_name, "votes": count, "percentage": round(percentage, 2) }) # Afficher le résumé print(f"\nÉlection: {self.election.name}") print(f"Votes valides: {total_votes}") print(f"Intégrité blockchain: {'✓ VALIDE' if blockchain_valid else '✗ INVALIDE'}") print(f"Cohérence votes: {'✓ COHÉRENTE' if report['verification']['vote_count_consistency'] else '✗ INCOHÉRENTE'}") print(f"\nStatut général: {report['verification']['status']}") return report def export_report(self, report: dict, filename: str = None) -> str: """ Exporter le rapport en JSON. Args: report: Rapport à exporter filename: Nom du fichier (si None, génère automatiquement) Returns: Chemin du fichier exporté """ if filename is None: timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S") filename = f"election_{self.election_id}_report_{timestamp}.json" try: with open(filename, "w") as f: json.dump(report, f, indent=2, default=str) print(f"\n✓ Rapport exporté: {filename}") return filename except Exception as e: print(f"✗ Erreur lors de l'export: {e}") return "" def close(self): """Fermer la session de base de données.""" self.db.close() def run_full_scrutiny(self) -> Tuple[bool, dict]: """ Exécuter le dépouillement complet. Returns: (success, report) - Tuple avec succès et rapport """ print("\n" + "█" * 60) print("█ DÉMARRAGE DU DÉPOUILLEMENT ÉLECTORAL") print("█" * 60) # 1. Charger les données if not self.load_election(): return False, {} if not self.load_blockchain(): return False, {} if not self.load_votes(): return False, {} # 2. Vérifier l'intégrité blockchain_valid = self.verify_blockchain_integrity() # 3. Compter les votes vote_counts = self.count_votes() # 4. Vérifier la cohérence consistency_valid = self.verify_vote_count_consistency(vote_counts) # 5. Générer le rapport report = self.generate_report(vote_counts) print("\n" + "█" * 60) print("█ DÉPOUILLEMENT TERMINÉ") print("█" * 60 + "\n") success = blockchain_valid and consistency_valid return success, report def main(): """Entrée principale du scrutateur.""" parser = argparse.ArgumentParser( description="Scrutateur - Vote counting and verification" ) parser.add_argument( "--election-id", type=int, required=True, help="ID de l'élection à dépouiller" ) parser.add_argument( "--verify", action="store_true", help="Vérifier l'intégrité de la blockchain" ) parser.add_argument( "--count", action="store_true", help="Compter les votes" ) parser.add_argument( "--report", action="store_true", help="Générer un rapport complet" ) parser.add_argument( "--export", type=str, help="Exporter le rapport en JSON" ) args = parser.parse_args() scrutator = Scrutator(args.election_id) try: if args.verify or args.count or args.report or args.export: # Mode spécifique if not scrutator.load_election() or not scrutator.load_blockchain(): return if args.verify: scrutator.verify_blockchain_integrity() if args.count or args.report or args.export: if not scrutator.load_votes(): return vote_counts = scrutator.count_votes() if args.report or args.export: report = scrutator.generate_report(vote_counts) if args.export: scrutator.export_report(report, args.export) else: # Mode complet success, report = scrutator.run_full_scrutiny() if report: # Export par défaut scrutator.export_report(report) exit(0 if success else 1) finally: scrutator.close() if __name__ == "__main__": main()