Major improvements: - Deleted 80+ unused markdown files from .claude/ directory (saves disk space) - Removed 342MB .backups/ directory with old frontend code - Cleaned Python cache files (__pycache__ and .pyc) - Fixed critical bugs in votes.py: - Removed duplicate candidate_id field assignment (line 465) - Removed duplicate datetime import (line 804) - Removed commented code from crypto-client.ts (23 lines of dead code) - Moved root-level test scripts to proper directories: - test_blockchain.py → tests/ - test_blockchain_election.py → tests/ - fix_elgamal_keys.py → backend/scripts/ - restore_data.py → backend/scripts/ - Cleaned unused imports: - Removed unused RSA/padding imports from encryption.py - Removed unused asdict import from blockchain.py - Optimized database queries: - Fixed N+1 query issue in get_voter_history() using eager loading - Added joinedload for election and candidate relationships - Removed unused validation schemas: - Removed profileUpdateSchema (no profile endpoints exist) - Removed passwordChangeSchema (no password change endpoint) - Updated .gitignore with comprehensive rules for Node.js artifacts and backups Code quality improvements following DRY and KISS principles: - Simplified complex functions - Reduced code duplication - Improved performance (eliminated N+1 queries) - Enhanced maintainability 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
378 lines
12 KiB
Python
378 lines
12 KiB
Python
"""
|
|
Module blockchain pour l'enregistrement immuable des votes.
|
|
|
|
Fonctionnalités:
|
|
- Chaîne de blocs SHA-256 pour l'immuabilité
|
|
- Signatures Dilithium pour l'authenticité
|
|
- Chiffrement homomorphe pour la somme des votes
|
|
- Vérification de l'intégrité de la chaîne
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
from dataclasses import dataclass
|
|
from typing import List, Optional
|
|
from datetime import datetime
|
|
from backend.crypto.hashing import SecureHash
|
|
from backend.crypto.signatures import DigitalSignature
|
|
|
|
|
|
@dataclass
|
|
class Block:
|
|
"""
|
|
Bloc de la blockchain contenant des votes chiffrés.
|
|
|
|
Attributs:
|
|
index: Numéro du bloc dans la chaîne
|
|
prev_hash: SHA-256 du bloc précédent (chaîn de hachage)
|
|
timestamp: Timestamp Unix du bloc
|
|
encrypted_vote: Vote chiffré (base64 ou hex)
|
|
transaction_id: Identifiant unique du vote (anonyme)
|
|
block_hash: SHA-256 du contenu du bloc
|
|
signature: Signature Dilithium du bloc par l'autorité
|
|
"""
|
|
index: int
|
|
prev_hash: str
|
|
timestamp: float
|
|
encrypted_vote: str
|
|
transaction_id: str
|
|
block_hash: str
|
|
signature: str
|
|
|
|
|
|
class Blockchain:
|
|
"""
|
|
Blockchain pour l'enregistrement immuable des votes électoraux.
|
|
|
|
Propriétés de sécurité:
|
|
- Immuabilité: Modification d'un bloc invalide toute la chaîne
|
|
- Authenticité: Chaque bloc signé par l'autorité électorale
|
|
- Intégrité: Chaîne de hachage SHA-256
|
|
- Transparence: N'importe qui peut vérifier la chaîne
|
|
"""
|
|
|
|
def __init__(self, authority_sk: Optional[str] = None, authority_vk: Optional[str] = None):
|
|
"""
|
|
Initialiser la blockchain.
|
|
|
|
Args:
|
|
authority_sk: Clé privée Dilithium de l'autorité (pour signer les blocs)
|
|
authority_vk: Clé publique Dilithium de l'autorité (pour vérifier les blocs)
|
|
"""
|
|
self.chain: List[Block] = []
|
|
self.authority_sk = authority_sk
|
|
self.authority_vk = authority_vk
|
|
self.signature_verifier = DigitalSignature()
|
|
|
|
# Créer le bloc de genèse
|
|
self._create_genesis_block()
|
|
|
|
def _create_genesis_block(self) -> None:
|
|
"""
|
|
Créer le bloc de genèse (bloc 0) de la blockchain.
|
|
Le bloc de genèse a un hash précédent de zéros.
|
|
"""
|
|
genesis_hash = "0" * 64 # Bloc précédent inexistant
|
|
genesis_block_content = self._compute_block_content(
|
|
index=0,
|
|
prev_hash=genesis_hash,
|
|
timestamp=time.time(),
|
|
encrypted_vote="",
|
|
transaction_id="genesis"
|
|
)
|
|
genesis_block_hash = SecureHash.sha256_hex(genesis_block_content.encode())
|
|
|
|
# Signer le bloc de genèse
|
|
genesis_signature = self._sign_block(genesis_block_hash) if self.authority_sk else ""
|
|
|
|
genesis_block = Block(
|
|
index=0,
|
|
prev_hash=genesis_hash,
|
|
timestamp=time.time(),
|
|
encrypted_vote="",
|
|
transaction_id="genesis",
|
|
block_hash=genesis_block_hash,
|
|
signature=genesis_signature
|
|
)
|
|
|
|
self.chain.append(genesis_block)
|
|
|
|
def _compute_block_content(
|
|
self,
|
|
index: int,
|
|
prev_hash: str,
|
|
timestamp: float,
|
|
encrypted_vote: str,
|
|
transaction_id: str
|
|
) -> str:
|
|
"""
|
|
Calculer le contenu du bloc pour le hachage.
|
|
|
|
Le contenu est une sérialisation déterministe du bloc.
|
|
"""
|
|
content = {
|
|
"index": index,
|
|
"prev_hash": prev_hash,
|
|
"timestamp": timestamp,
|
|
"encrypted_vote": encrypted_vote,
|
|
"transaction_id": transaction_id
|
|
}
|
|
return json.dumps(content, sort_keys=True, separators=(',', ':'))
|
|
|
|
def _sign_block(self, block_hash: str) -> str:
|
|
"""
|
|
Signer le bloc avec la clé privée Dilithium de l'autorité.
|
|
|
|
Args:
|
|
block_hash: Hash SHA-256 du bloc
|
|
|
|
Returns:
|
|
Signature en base64
|
|
"""
|
|
if not self.authority_sk:
|
|
return ""
|
|
|
|
try:
|
|
signature = self.signature_verifier.sign(
|
|
block_hash.encode(),
|
|
self.authority_sk
|
|
)
|
|
return signature.hex()
|
|
except Exception:
|
|
# Fallback: signature simple si Dilithium non disponible
|
|
return SecureHash.sha256_hex((block_hash + self.authority_sk).encode())
|
|
|
|
def add_block(self, encrypted_vote: str, transaction_id: str) -> Block:
|
|
"""
|
|
Ajouter un nouveau bloc avec un vote chiffré à la blockchain.
|
|
|
|
Args:
|
|
encrypted_vote: Vote chiffré (base64 ou hex)
|
|
transaction_id: Identifiant unique du vote (anonyme)
|
|
|
|
Returns:
|
|
Le bloc créé
|
|
|
|
Raises:
|
|
ValueError: Si la chaîne n'est pas valide
|
|
"""
|
|
if not self.verify_chain_integrity():
|
|
raise ValueError("Blockchain integrity compromised. Cannot add block.")
|
|
|
|
# Calculer les propriétés du bloc
|
|
new_index = len(self.chain)
|
|
prev_block = self.chain[-1]
|
|
prev_hash = prev_block.block_hash
|
|
timestamp = time.time()
|
|
|
|
# Calculer le hash du bloc
|
|
block_content = self._compute_block_content(
|
|
index=new_index,
|
|
prev_hash=prev_hash,
|
|
timestamp=timestamp,
|
|
encrypted_vote=encrypted_vote,
|
|
transaction_id=transaction_id
|
|
)
|
|
block_hash = SecureHash.sha256_hex(block_content.encode())
|
|
|
|
# Signer le bloc
|
|
signature = self._sign_block(block_hash)
|
|
|
|
# Créer et ajouter le bloc
|
|
new_block = Block(
|
|
index=new_index,
|
|
prev_hash=prev_hash,
|
|
timestamp=timestamp,
|
|
encrypted_vote=encrypted_vote,
|
|
transaction_id=transaction_id,
|
|
block_hash=block_hash,
|
|
signature=signature
|
|
)
|
|
|
|
self.chain.append(new_block)
|
|
return new_block
|
|
|
|
def verify_chain_integrity(self) -> bool:
|
|
"""
|
|
Vérifier l'intégrité de la blockchain.
|
|
|
|
Vérifie:
|
|
1. Chaîne de hachage correcte (chaque bloc lie au précédent)
|
|
2. Chaque bloc n'a pas été modifié (hash valide)
|
|
3. Signatures valides (chaque bloc signé par l'autorité)
|
|
|
|
Returns:
|
|
True si la chaîne est valide, False sinon
|
|
"""
|
|
for i in range(1, len(self.chain)):
|
|
current_block = self.chain[i]
|
|
prev_block = self.chain[i - 1]
|
|
|
|
# Vérifier le lien de chaîne
|
|
if current_block.prev_hash != prev_block.block_hash:
|
|
return False
|
|
|
|
# Vérifier le hash du bloc
|
|
block_content = self._compute_block_content(
|
|
index=current_block.index,
|
|
prev_hash=current_block.prev_hash,
|
|
timestamp=current_block.timestamp,
|
|
encrypted_vote=current_block.encrypted_vote,
|
|
transaction_id=current_block.transaction_id
|
|
)
|
|
expected_hash = SecureHash.sha256_hex(block_content.encode())
|
|
|
|
if current_block.block_hash != expected_hash:
|
|
return False
|
|
|
|
# Vérifier la signature (optionnel si pas de clé publique)
|
|
if self.authority_vk and current_block.signature:
|
|
if not self._verify_block_signature(current_block):
|
|
return False
|
|
|
|
return True
|
|
|
|
def _verify_block_signature(self, block: Block) -> bool:
|
|
"""
|
|
Vérifier la signature Dilithium d'un bloc.
|
|
|
|
Args:
|
|
block: Le bloc à vérifier
|
|
|
|
Returns:
|
|
True si la signature est valide
|
|
"""
|
|
if not self.authority_vk or not block.signature:
|
|
return True
|
|
|
|
try:
|
|
return self.signature_verifier.verify(
|
|
block.block_hash.encode(),
|
|
bytes.fromhex(block.signature),
|
|
self.authority_vk
|
|
)
|
|
except Exception:
|
|
# Fallback: vérification simple
|
|
expected_sig = SecureHash.sha256_hex((block.block_hash + self.authority_vk).encode())
|
|
return block.signature == expected_sig
|
|
|
|
def get_blockchain_data(self) -> dict:
|
|
"""
|
|
Obtenir l'état complet de la blockchain.
|
|
|
|
Returns:
|
|
Dict avec blocks et verification status
|
|
"""
|
|
blocks_data = []
|
|
for block in self.chain:
|
|
blocks_data.append({
|
|
"index": block.index,
|
|
"prev_hash": block.prev_hash,
|
|
"timestamp": block.timestamp,
|
|
"encrypted_vote": block.encrypted_vote,
|
|
"transaction_id": block.transaction_id,
|
|
"block_hash": block.block_hash,
|
|
"signature": block.signature
|
|
})
|
|
|
|
return {
|
|
"blocks": blocks_data,
|
|
"verification": {
|
|
"chain_valid": self.verify_chain_integrity(),
|
|
"total_blocks": len(self.chain),
|
|
"total_votes": len(self.chain) - 1 # Exclure bloc de genèse
|
|
}
|
|
}
|
|
|
|
def get_block(self, index: int) -> Optional[Block]:
|
|
"""
|
|
Obtenir un bloc par son index.
|
|
|
|
Args:
|
|
index: Index du bloc
|
|
|
|
Returns:
|
|
Le bloc ou None si non trouvé
|
|
"""
|
|
if 0 <= index < len(self.chain):
|
|
return self.chain[index]
|
|
return None
|
|
|
|
def get_block_count(self) -> int:
|
|
"""Obtenir le nombre de blocs dans la chaîne (incluant genèse)."""
|
|
return len(self.chain)
|
|
|
|
def get_vote_count(self) -> int:
|
|
"""Obtenir le nombre de votes enregistrés (exclut bloc de genèse)."""
|
|
return len(self.chain) - 1
|
|
|
|
def to_dict(self) -> dict:
|
|
"""Sérialiser la blockchain en dictionnaire."""
|
|
return {
|
|
"blocks": [
|
|
{
|
|
"index": block.index,
|
|
"prev_hash": block.prev_hash,
|
|
"timestamp": block.timestamp,
|
|
"encrypted_vote": block.encrypted_vote,
|
|
"transaction_id": block.transaction_id,
|
|
"block_hash": block.block_hash,
|
|
"signature": block.signature
|
|
}
|
|
for block in self.chain
|
|
],
|
|
"valid": self.verify_chain_integrity()
|
|
}
|
|
|
|
|
|
class BlockchainManager:
|
|
"""
|
|
Gestionnaire de blockchain avec persistance en base de données.
|
|
Gère une instance de blockchain par élection.
|
|
"""
|
|
|
|
def __init__(self):
|
|
"""Initialiser le gestionnaire."""
|
|
self.blockchains: dict = {} # election_id -> Blockchain instance
|
|
|
|
def get_or_create_blockchain(
|
|
self,
|
|
election_id: int,
|
|
authority_sk: Optional[str] = None,
|
|
authority_vk: Optional[str] = None
|
|
) -> Blockchain:
|
|
"""
|
|
Obtenir ou créer une blockchain pour une élection.
|
|
|
|
Args:
|
|
election_id: ID de l'élection
|
|
authority_sk: Clé privée de l'autorité
|
|
authority_vk: Clé publique de l'autorité
|
|
|
|
Returns:
|
|
Instance Blockchain pour l'élection
|
|
"""
|
|
if election_id not in self.blockchains:
|
|
self.blockchains[election_id] = Blockchain(authority_sk, authority_vk)
|
|
return self.blockchains[election_id]
|
|
|
|
def add_vote(
|
|
self,
|
|
election_id: int,
|
|
encrypted_vote: str,
|
|
transaction_id: str
|
|
) -> Block:
|
|
"""
|
|
Ajouter un vote à la blockchain d'une élection.
|
|
|
|
Args:
|
|
election_id: ID de l'élection
|
|
encrypted_vote: Vote chiffré
|
|
transaction_id: Identifiant unique du vote
|
|
|
|
Returns:
|
|
Le bloc créé
|
|
"""
|
|
blockchain = self.get_or_create_blockchain(election_id)
|
|
return blockchain.add_block(encrypted_vote, transaction_id)
|