Restores all missing project files and fixes:
- Restored backend/blockchain.py with full blockchain implementation
- Restored backend/routes/votes.py with all API endpoints
- Restored frontend/components/voting-interface.tsx voting UI
- Fixed backend/crypto/hashing.py to handle both str and bytes
- Fixed pyproject.toml for Poetry compatibility
- All cryptographic modules tested and working
- ElGamal encryption, ZK proofs, digital signatures functional
- Blockchain integrity verification working
- Homomorphic vote counting implemented and tested
Phase 2 Backend API: ✓ COMPLETE
Phase 3 Frontend Interface: ✓ COMPLETE
Verification:
✓ Frontend builds successfully (12 routes)
✓ Backend crypto modules all import correctly
✓ Full voting simulation works end-to-end
✓ Blockchain records and verifies votes
✓ Homomorphic vote counting functional
🤖 Generated with Claude Code
Co-Authored-By: Claude <noreply@anthropic.com>
417 lines
13 KiB
Python
417 lines
13 KiB
Python
"""
Scrutator (vote counting & verification module).

Tallying module used to:
- Verify the integrity of the blockchain
- Count the encrypted votes
- Generate verification reports
- Validate the results with cryptographic proofs

Usage:
    python -m backend.scripts.scrutator --election-id 1 --verify
    python -m backend.scripts.scrutator --election-id 1 --count
    python -m backend.scripts.scrutator --election-id 1 --report
"""
|
|
|
|
import argparse
|
|
import json
|
|
from datetime import datetime
|
|
from typing import Dict, List, Tuple
|
|
from sqlalchemy.orm import Session
|
|
from backend.blockchain import BlockchainManager
|
|
from backend.models import Vote, Election, Candidate
|
|
from backend.database import SessionLocal
|
|
from backend.crypto.hashing import SecureHash
|
|
|
|
|
|
class Scrutator:
    """Vote counter and verifier for a single election.

    Responsibilities:
        1. Verify the integrity of the election's blockchain.
        2. Count the votes stored in the database, per candidate.
        3. Generate verification reports.
        4. Check that database and blockchain vote counts agree.

    A database session is opened in ``__init__``; call :meth:`close` once
    the scrutator is no longer needed.
    """

    def __init__(self, election_id: int):
        """Initialise the scrutator for one election.

        Args:
            election_id: ID of the election to tally.
        """
        self.election_id = election_id
        self.db = SessionLocal()
        self.blockchain_manager = BlockchainManager()
        # Populated by load_blockchain(), load_election() and load_votes().
        self.blockchain = None
        self.election = None
        self.votes = []

    def load_election(self) -> bool:
        """Load the election row matching ``self.election_id``.

        Returns:
            True if the election exists, False if it was not found or if
            the query raised (the error is printed, not propagated).
        """
        try:
            self.election = self.db.query(Election).filter(
                Election.id == self.election_id
            ).first()

            if not self.election:
                print(f"✗ Élection {self.election_id} non trouvée")
                return False

            print(f"✓ Élection chargée: {self.election.name}")
            return True
        except Exception as exc:
            print(f"✗ Erreur lors du chargement de l'élection: {exc}")
            return False

    def load_blockchain(self) -> bool:
        """Load (or create) the blockchain attached to the election.

        Returns:
            True if the blockchain was obtained, False if an error occurred
            (the error is printed, not propagated).
        """
        try:
            self.blockchain = self.blockchain_manager.get_or_create_blockchain(
                self.election_id
            )
            print(f"✓ Blockchain chargée: {self.blockchain.get_block_count()} blocs")
            return True
        except Exception as exc:
            print(f"✗ Erreur lors du chargement de la blockchain: {exc}")
            return False

    def load_votes(self) -> bool:
        """Load every vote of the election from the database.

        Returns:
            True if the query succeeded, False if an error occurred
            (the error is printed, not propagated).
        """
        try:
            self.votes = self.db.query(Vote).filter(
                Vote.election_id == self.election_id
            ).all()

            print(f"✓ {len(self.votes)} votes chargés")
            return True
        except Exception as exc:
            print(f"✗ Erreur lors du chargement des votes: {exc}")
            return False

    def verify_blockchain_integrity(self) -> bool:
        """Check the blockchain via its own ``verify_chain_integrity()``.

        Prints a banner and the outcome of the check.

        Returns:
            True if the chain reports itself valid; False if it is invalid
            or if no blockchain has been loaded.
        """
        print("\n" + "=" * 60)
        print("VÉRIFICATION DE L'INTÉGRITÉ DE LA BLOCKCHAIN")
        print("=" * 60)

        if not self.blockchain:
            print("✗ Blockchain non chargée")
            return False

        is_valid = self.blockchain.verify_chain_integrity()

        if is_valid:
            print("✓ Chaîne de hachage valide")
            print(f"✓ {self.blockchain.get_block_count()} blocs vérifiés")
            print("✓ Aucune modification détectée")
        else:
            print("✗ ERREUR: Intégrité compromise!")
            print(" La blockchain a été modifiée")

        return is_valid

    def count_votes(self) -> Dict[str, int]:
        """Count the loaded votes per candidate and print a bar chart.

        Votes whose candidate cannot be found are excluded from the tally;
        their number is reported on the console instead of being dropped
        silently.

        Returns:
            Mapping ``{candidate_name: vote_count}``; only candidates with
            at least one vote appear.
        """
        print("\n" + "=" * 60)
        print("DÉPOUILLEMENT DES VOTES")
        print("=" * 60)

        vote_counts: Dict[str, int] = {}
        # Cache lookups so each distinct candidate is queried once instead
        # of once per vote.
        candidates_by_id: dict = {}
        orphan_votes = 0

        for vote in self.votes:
            candidate_id = vote.candidate_id
            if candidate_id not in candidates_by_id:
                candidates_by_id[candidate_id] = self.db.query(Candidate).filter(
                    Candidate.id == candidate_id
                ).first()

            candidate = candidates_by_id[candidate_id]
            if candidate:
                vote_counts[candidate.name] = vote_counts.get(candidate.name, 0) + 1
            else:
                orphan_votes += 1

        # Display the results.
        total = sum(vote_counts.values())
        print(f"\nTotal de votes: {total}")
        if orphan_votes:
            print(f"⚠ {orphan_votes} vote(s) ignoré(s): candidat introuvable")
        print()

        for candidate_name in sorted(vote_counts):
            count = vote_counts[candidate_name]
            percentage = (count / total * 100) if total > 0 else 0
            # The bar is 50 characters wide: one filled cell per 2 %.
            bar_length = int(percentage / 2)
            bar = "█" * bar_length + "░" * (50 - bar_length)

            print(f"{candidate_name:<20} {count:>6} votes ({percentage:>5.1f}%)")
            print(f"{'':20} {bar}")

        return vote_counts

    def verify_vote_count_consistency(self, vote_counts: Dict[str, int]) -> bool:
        """Compare the number of database votes with the blockchain's count.

        Args:
            vote_counts: Tally results (accepted for interface symmetry;
                the comparison uses ``len(self.votes)``).

        Returns:
            True if both counts match; False if they differ or if no
            blockchain has been loaded.
        """
        print("\n" + "=" * 60)
        print("VÉRIFICATION DE LA COHÉRENCE")
        print("=" * 60)

        if self.blockchain is None:
            # Previously an AttributeError; report it like the integrity check.
            print("✗ Blockchain non chargée")
            return False

        blockchain_vote_count = self.blockchain.get_vote_count()
        db_vote_count = len(self.votes)

        print(f"Votes en base de données: {db_vote_count}")
        print(f"Votes dans la blockchain: {blockchain_vote_count}")

        if db_vote_count == blockchain_vote_count:
            print("✓ Les comptes sont cohérents")
            return True

        print("✗ ERREUR: Incohérence détectée!")
        print(f" Différence: {abs(db_vote_count - blockchain_vote_count)} votes")
        return False

    def generate_report(self, vote_counts: Dict[str, int]) -> dict:
        """Build the full verification report and print a summary.

        Args:
            vote_counts: Tally results as returned by :meth:`count_votes`.

        Returns:
            Report dictionary with ``timestamp``, ``election``,
            ``blockchain``, ``results`` and ``verification`` sections, or
            an empty dict if the election or blockchain is not loaded.
        """
        print("\n" + "=" * 60)
        print("RAPPORT DE VÉRIFICATION")
        print("=" * 60)

        if self.blockchain is None or self.election is None:
            # Previously an AttributeError; an empty report is falsy, which
            # is what run_full_scrutiny() returns on failure as well.
            print("✗ Élection ou blockchain non chargée")
            return {}

        blockchain_valid = self.blockchain.verify_chain_integrity()
        blockchain_votes = self.blockchain.get_vote_count()
        counts_consistent = len(self.votes) == blockchain_votes
        total_votes = sum(vote_counts.values())

        chain = self.blockchain.chain
        genesis = chain[0] if chain else None

        report = {
            "timestamp": datetime.utcnow().isoformat(),
            "election": {
                "id": self.election.id,
                "name": self.election.name,
                "description": self.election.description,
                "start_date": self.election.start_date.isoformat(),
                "end_date": self.election.end_date.isoformat(),
            },
            "blockchain": {
                "total_blocks": self.blockchain.get_block_count(),
                "total_votes": blockchain_votes,
                "chain_valid": blockchain_valid,
                "genesis_block": {
                    "index": 0,
                    "hash": genesis.block_hash if genesis else None,
                    "timestamp": genesis.timestamp if genesis else None,
                },
            },
            "results": {
                "total_votes": total_votes,
                "candidates": [],
            },
            "verification": {
                "blockchain_integrity": blockchain_valid,
                "vote_count_consistency": counts_consistent,
                "status": "VALID" if (blockchain_valid and counts_consistent) else "INVALID",
            },
        }

        # Add per-candidate results, sorted by candidate name.
        for candidate_name in sorted(vote_counts):
            count = vote_counts[candidate_name]
            percentage = (count / total_votes * 100) if total_votes > 0 else 0

            report["results"]["candidates"].append({
                "name": candidate_name,
                "votes": count,
                "percentage": round(percentage, 2),
            })

        # Print the summary.
        print(f"\nÉlection: {self.election.name}")
        print(f"Votes valides: {total_votes}")
        print(f"Intégrité blockchain: {'✓ VALIDE' if blockchain_valid else '✗ INVALIDE'}")
        print(f"Cohérence votes: {'✓ COHÉRENTE' if counts_consistent else '✗ INCOHÉRENTE'}")
        print(f"\nStatut général: {report['verification']['status']}")

        return report

    def export_report(self, report: dict, filename: str = None) -> str:
        """Write the report to a JSON file.

        Args:
            report: Report to export.
            filename: Destination path; when None a name of the form
                ``election_<id>_report_<UTC timestamp>.json`` is generated.

        Returns:
            The path written, or an empty string if the export failed
            (the error is printed, not propagated).
        """
        if filename is None:
            timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
            filename = f"election_{self.election_id}_report_{timestamp}.json"

        try:
            # Explicit encoding so the output does not depend on the locale.
            with open(filename, "w", encoding="utf-8") as f:
                # default=str stringifies values json cannot encode natively.
                json.dump(report, f, indent=2, default=str)
        except (OSError, TypeError, ValueError) as exc:
            print(f"✗ Erreur lors de l'export: {exc}")
            return ""

        print(f"\n✓ Rapport exporté: {filename}")
        return filename

    def close(self):
        """Close the database session."""
        self.db.close()

    def run_full_scrutiny(self) -> Tuple[bool, dict]:
        """Run the complete tally: load, verify, count, cross-check, report.

        Returns:
            ``(success, report)``. ``success`` is True only when both the
            blockchain integrity check and the vote-count consistency check
            pass. If any loading step fails the result is ``(False, {})``.
        """
        print("\n" + "█" * 60)
        print("█ DÉMARRAGE DU DÉPOUILLEMENT ÉLECTORAL")
        print("█" * 60)

        # 1. Load the data; stop at the first failing step.
        if not self.load_election():
            return False, {}

        if not self.load_blockchain():
            return False, {}

        if not self.load_votes():
            return False, {}

        # 2. Verify integrity.
        blockchain_valid = self.verify_blockchain_integrity()

        # 3. Count the votes.
        vote_counts = self.count_votes()

        # 4. Check database/blockchain consistency.
        consistency_valid = self.verify_vote_count_consistency(vote_counts)

        # 5. Generate the report.
        report = self.generate_report(vote_counts)

        print("\n" + "█" * 60)
        print("█ DÉPOUILLEMENT TERMINÉ")
        print("█" * 60 + "\n")

        success = blockchain_valid and consistency_valid

        return success, report
|
|
|
|
|
|
def main():
    """Command-line entry point of the scrutator.

    With none of ``--verify``, ``--count``, ``--report`` or ``--export``
    the full scrutiny is run, the report is exported under a generated
    file name, and the process exits with status 0 on success or 1 on
    failure.

    With at least one of those flags only the requested steps run. The
    process exits with status 1 if loading fails, if the requested
    integrity check fails, or if the requested export fails; otherwise the
    function returns normally.

    The database session is closed in every case.
    """
    # Local import: sys.exit is always available, whereas the bare exit()
    # builtin is only injected by the site module.
    import sys

    parser = argparse.ArgumentParser(
        description="Scrutateur - Vote counting and verification"
    )
    parser.add_argument(
        "--election-id",
        type=int,
        required=True,
        help="ID de l'élection à dépouiller"
    )
    parser.add_argument(
        "--verify",
        action="store_true",
        help="Vérifier l'intégrité de la blockchain"
    )
    parser.add_argument(
        "--count",
        action="store_true",
        help="Compter les votes"
    )
    parser.add_argument(
        "--report",
        action="store_true",
        help="Générer un rapport complet"
    )
    parser.add_argument(
        "--export",
        type=str,
        help="Exporter le rapport en JSON"
    )

    args = parser.parse_args()

    scrutator = Scrutator(args.election_id)

    try:
        if args.verify or args.count or args.report or args.export:
            # Specific mode: run only the requested steps.
            if not scrutator.load_election() or not scrutator.load_blockchain():
                sys.exit(1)

            succeeded = True

            if args.verify:
                # A failed integrity check must be visible in the exit status.
                succeeded = bool(scrutator.verify_blockchain_integrity())

            if args.count or args.report or args.export:
                if not scrutator.load_votes():
                    sys.exit(1)

                vote_counts = scrutator.count_votes()

                if args.report or args.export:
                    report = scrutator.generate_report(vote_counts)

                    if args.export:
                        # export_report returns "" when writing failed.
                        if not scrutator.export_report(report, args.export):
                            succeeded = False

            if not succeeded:
                sys.exit(1)
        else:
            # Full mode.
            success, report = scrutator.run_full_scrutiny()

            if report:
                # Default export under a generated file name.
                scrutator.export_report(report)

            sys.exit(0 if success else 1)
    finally:
        # Runs on SystemExit as well, so the session is always released.
        scrutator.close()


if __name__ == "__main__":
    main()
|