""" Module blockchain pour l'enregistrement immuable des votes. Fonctionnalités: - Chaîne de blocs SHA-256 pour l'immuabilité - Signatures Dilithium pour l'authenticité - Chiffrement homomorphe pour la somme des votes - Vérification de l'intégrité de la chaîne """ import json import time from dataclasses import dataclass from typing import List, Optional from datetime import datetime from backend.crypto.hashing import SecureHash from backend.crypto.signatures import DigitalSignature @dataclass class Block: """ Bloc de la blockchain contenant des votes chiffrés. Attributs: index: Numéro du bloc dans la chaîne prev_hash: SHA-256 du bloc précédent (chaîn de hachage) timestamp: Timestamp Unix du bloc encrypted_vote: Vote chiffré (base64 ou hex) transaction_id: Identifiant unique du vote (anonyme) block_hash: SHA-256 du contenu du bloc signature: Signature Dilithium du bloc par l'autorité """ index: int prev_hash: str timestamp: float encrypted_vote: str transaction_id: str block_hash: str signature: str class Blockchain: """ Blockchain pour l'enregistrement immuable des votes électoraux. Propriétés de sécurité: - Immuabilité: Modification d'un bloc invalide toute la chaîne - Authenticité: Chaque bloc signé par l'autorité électorale - Intégrité: Chaîne de hachage SHA-256 - Transparence: N'importe qui peut vérifier la chaîne """ def __init__(self, authority_sk: Optional[str] = None, authority_vk: Optional[str] = None): """ Initialiser la blockchain. Args: authority_sk: Clé privée Dilithium de l'autorité (pour signer les blocs) authority_vk: Clé publique Dilithium de l'autorité (pour vérifier les blocs) """ self.chain: List[Block] = [] self.authority_sk = authority_sk self.authority_vk = authority_vk self.signature_verifier = DigitalSignature() # Créer le bloc de genèse self._create_genesis_block() def _create_genesis_block(self) -> None: """ Créer le bloc de genèse (bloc 0) de la blockchain. Le bloc de genèse a un hash précédent de zéros. """ genesis_hash = "0" * 64 # Bloc précédent inexistant genesis_block_content = self._compute_block_content( index=0, prev_hash=genesis_hash, timestamp=time.time(), encrypted_vote="", transaction_id="genesis" ) genesis_block_hash = SecureHash.sha256_hex(genesis_block_content.encode()) # Signer le bloc de genèse genesis_signature = self._sign_block(genesis_block_hash) if self.authority_sk else "" genesis_block = Block( index=0, prev_hash=genesis_hash, timestamp=time.time(), encrypted_vote="", transaction_id="genesis", block_hash=genesis_block_hash, signature=genesis_signature ) self.chain.append(genesis_block) def _compute_block_content( self, index: int, prev_hash: str, timestamp: float, encrypted_vote: str, transaction_id: str ) -> str: """ Calculer le contenu du bloc pour le hachage. Le contenu est une sérialisation déterministe du bloc. """ content = { "index": index, "prev_hash": prev_hash, "timestamp": timestamp, "encrypted_vote": encrypted_vote, "transaction_id": transaction_id } return json.dumps(content, sort_keys=True, separators=(',', ':')) def _sign_block(self, block_hash: str) -> str: """ Signer le bloc avec la clé privée Dilithium de l'autorité. Args: block_hash: Hash SHA-256 du bloc Returns: Signature en base64 """ if not self.authority_sk: return "" try: signature = self.signature_verifier.sign( block_hash.encode(), self.authority_sk ) return signature.hex() except Exception: # Fallback: signature simple si Dilithium non disponible return SecureHash.sha256_hex((block_hash + self.authority_sk).encode()) def add_block(self, encrypted_vote: str, transaction_id: str) -> Block: """ Ajouter un nouveau bloc avec un vote chiffré à la blockchain. Args: encrypted_vote: Vote chiffré (base64 ou hex) transaction_id: Identifiant unique du vote (anonyme) Returns: Le bloc créé Raises: ValueError: Si la chaîne n'est pas valide """ if not self.verify_chain_integrity(): raise ValueError("Blockchain integrity compromised. Cannot add block.") # Calculer les propriétés du bloc new_index = len(self.chain) prev_block = self.chain[-1] prev_hash = prev_block.block_hash timestamp = time.time() # Calculer le hash du bloc block_content = self._compute_block_content( index=new_index, prev_hash=prev_hash, timestamp=timestamp, encrypted_vote=encrypted_vote, transaction_id=transaction_id ) block_hash = SecureHash.sha256_hex(block_content.encode()) # Signer le bloc signature = self._sign_block(block_hash) # Créer et ajouter le bloc new_block = Block( index=new_index, prev_hash=prev_hash, timestamp=timestamp, encrypted_vote=encrypted_vote, transaction_id=transaction_id, block_hash=block_hash, signature=signature ) self.chain.append(new_block) return new_block def verify_chain_integrity(self) -> bool: """ Vérifier l'intégrité de la blockchain. Vérifie: 1. Chaîne de hachage correcte (chaque bloc lie au précédent) 2. Chaque bloc n'a pas été modifié (hash valide) 3. Signatures valides (chaque bloc signé par l'autorité) Returns: True si la chaîne est valide, False sinon """ for i in range(1, len(self.chain)): current_block = self.chain[i] prev_block = self.chain[i - 1] # Vérifier le lien de chaîne if current_block.prev_hash != prev_block.block_hash: return False # Vérifier le hash du bloc block_content = self._compute_block_content( index=current_block.index, prev_hash=current_block.prev_hash, timestamp=current_block.timestamp, encrypted_vote=current_block.encrypted_vote, transaction_id=current_block.transaction_id ) expected_hash = SecureHash.sha256_hex(block_content.encode()) if current_block.block_hash != expected_hash: return False # Vérifier la signature (optionnel si pas de clé publique) if self.authority_vk and current_block.signature: if not self._verify_block_signature(current_block): return False return True def _verify_block_signature(self, block: Block) -> bool: """ Vérifier la signature Dilithium d'un bloc. Args: block: Le bloc à vérifier Returns: True si la signature est valide """ if not self.authority_vk or not block.signature: return True try: return self.signature_verifier.verify( block.block_hash.encode(), bytes.fromhex(block.signature), self.authority_vk ) except Exception: # Fallback: vérification simple expected_sig = SecureHash.sha256_hex((block.block_hash + self.authority_vk).encode()) return block.signature == expected_sig def get_blockchain_data(self) -> dict: """ Obtenir l'état complet de la blockchain. Returns: Dict avec blocks et verification status """ blocks_data = [] for block in self.chain: blocks_data.append({ "index": block.index, "prev_hash": block.prev_hash, "timestamp": block.timestamp, "encrypted_vote": block.encrypted_vote, "transaction_id": block.transaction_id, "block_hash": block.block_hash, "signature": block.signature }) return { "blocks": blocks_data, "verification": { "chain_valid": self.verify_chain_integrity(), "total_blocks": len(self.chain), "total_votes": len(self.chain) - 1 # Exclure bloc de genèse } } def get_block(self, index: int) -> Optional[Block]: """ Obtenir un bloc par son index. Args: index: Index du bloc Returns: Le bloc ou None si non trouvé """ if 0 <= index < len(self.chain): return self.chain[index] return None def get_block_count(self) -> int: """Obtenir le nombre de blocs dans la chaîne (incluant genèse).""" return len(self.chain) def get_vote_count(self) -> int: """Obtenir le nombre de votes enregistrés (exclut bloc de genèse).""" return len(self.chain) - 1 def to_dict(self) -> dict: """Sérialiser la blockchain en dictionnaire.""" return { "blocks": [ { "index": block.index, "prev_hash": block.prev_hash, "timestamp": block.timestamp, "encrypted_vote": block.encrypted_vote, "transaction_id": block.transaction_id, "block_hash": block.block_hash, "signature": block.signature } for block in self.chain ], "valid": self.verify_chain_integrity() } class BlockchainManager: """ Gestionnaire de blockchain avec persistance en base de données. Gère une instance de blockchain par élection. """ def __init__(self): """Initialiser le gestionnaire.""" self.blockchains: dict = {} # election_id -> Blockchain instance def get_or_create_blockchain( self, election_id: int, authority_sk: Optional[str] = None, authority_vk: Optional[str] = None ) -> Blockchain: """ Obtenir ou créer une blockchain pour une élection. Args: election_id: ID de l'élection authority_sk: Clé privée de l'autorité authority_vk: Clé publique de l'autorité Returns: Instance Blockchain pour l'élection """ if election_id not in self.blockchains: self.blockchains[election_id] = Blockchain(authority_sk, authority_vk) return self.blockchains[election_id] def add_vote( self, election_id: int, encrypted_vote: str, transaction_id: str ) -> Block: """ Ajouter un vote à la blockchain d'une élection. Args: election_id: ID de l'élection encrypted_vote: Vote chiffré transaction_id: Identifiant unique du vote Returns: Le bloc créé """ blockchain = self.get_or_create_blockchain(election_id) return blockchain.add_block(encrypted_vote, transaction_id)