Alexis Bruneteau 6cd555a552 feat: Add transaction broadcasting between PoA validators
Problem: Votes were being submitted to one validator but not shared with
other validators, preventing them from being included in blocks.

Root cause: When a validator received a transaction via eth_sendTransaction,
it added it to its pending_transactions pool but did NOT broadcast it to
peer validators. Only blocks were being broadcast.

This meant:
- validator-1 receives vote → adds to pending_transactions
- validator-2 (responsible for next block) never receives the vote
- validator-2 can't include vote in block because it doesn't know about it
- Result: votes sit in pending queue forever

Solution:
- Add broadcast_transaction() method following same pattern as broadcast_block()
- Broadcast transaction to all known peers via /p2p/new_transaction endpoint
- Call broadcast on receipt of each transaction
- Peer validators receive and add to their pending_transactions pool
- All validators now have same pending transactions
- Any validator can create blocks with all pending transactions

The /p2p/new_transaction endpoint already existed, so validators can now
receive and process transactions from peers.

This fixes the issue where votes were submitted successfully but never
appeared on the blockchain.

🤖 Generated with Claude Code

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-07 17:00:14 +01:00

755 lines
25 KiB
Python

"""
PoA Validator Node - Blockchain Consensus & Management
This service implements a Proof-of-Authority validator that:
- Participates in PoA consensus (round-robin block creation)
- Maintains a distributed ledger of votes
- Communicates with other validators via P2P
- Exposes JSON-RPC interface for vote submission
- Registers with bootnode for peer discovery
Features:
- Block creation and validation
- Transaction pool management
- Peer synchronization
- JSON-RPC endpoints (eth_sendTransaction, eth_getTransactionReceipt, etc.)
- P2P networking with gossip
"""
import os
import sys
import json
import time
import logging
import asyncio
import hashlib
import uuid
from typing import Dict, List, Optional, Any
from datetime import datetime, timezone
import aiohttp
from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel
# Root logging setup: INFO threshold; every record carries a timestamp, the
# emitting logger's name and the severity.
logging.basicConfig(
    format='%(asctime)s - [%(name)s] - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-wide logger used by the classes and endpoints in this file.
logger = logging.getLogger(__name__)
# ============================================================================
# Data Models
# ============================================================================
class Transaction(BaseModel):
    """Vote transaction.

    Instances are queued in the validator's pending pool, embedded in blocks,
    and serialised field-by-field whenever a block is hashed or exported.
    """

    voter_id: str
    election_id: int
    encrypted_vote: str
    ballot_hash: str
    # The only optional field; it is serialised as null when absent.
    proof: Optional[str] = None
    timestamp: int
class Block:
    """Blockchain block: a batch of transactions plus its linkage metadata.

    The attributes mirror the constructor arguments one-to-one. ``block_hash``
    and ``signature`` default to ``None`` so they can be assigned after the
    block has been built.
    """

    def __init__(
        self,
        index: int,
        prev_hash: str,
        timestamp: int,
        transactions: List["Transaction"],
        validator: str,
        block_hash: Optional[str] = None,
        signature: Optional[str] = None
    ):
        # Position in the chain and link to the predecessor.
        self.index, self.prev_hash = index, prev_hash
        # Payload and authorship.
        self.timestamp, self.transactions, self.validator = (
            timestamp, transactions, validator
        )
        # Seal; both parts may be filled in later.
        self.block_hash, self.signature = block_hash, signature

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict (JSON-serialisable) view of the block."""
        serialised_txs = []
        for tx in self.transactions:
            serialised_txs.append({
                "voter_id": tx.voter_id,
                "election_id": tx.election_id,
                "encrypted_vote": tx.encrypted_vote,
                "ballot_hash": tx.ballot_hash,
                "proof": tx.proof,
                "timestamp": tx.timestamp
            })
        return {
            "index": self.index,
            "prev_hash": self.prev_hash,
            "timestamp": self.timestamp,
            "transactions": serialised_txs,
            "validator": self.validator,
            "block_hash": self.block_hash,
            "signature": self.signature
        }

    @staticmethod
    def from_dict(data: Dict[str, Any]) -> 'Block':
        """Rebuild a block from the dictionary form produced by ``to_dict``.

        ``index``, ``prev_hash``, ``timestamp`` and ``validator`` are required
        keys; ``transactions`` defaults to an empty list and ``block_hash`` /
        ``signature`` default to ``None``.
        """
        # Transactions are materialised first, so a malformed transaction is
        # reported before any missing header key.
        parsed_txs = []
        for raw_tx in data.get("transactions", []):
            parsed_txs.append(Transaction(**raw_tx))
        return Block(
            index=data["index"],
            prev_hash=data["prev_hash"],
            timestamp=data["timestamp"],
            transactions=parsed_txs,
            validator=data["validator"],
            block_hash=data.get("block_hash"),
            signature=data.get("signature")
        )
class Blockchain:
    """Append-only chain of blocks held in memory, plus its validation rules.

    Every instance starts with the same fixed genesis block, so all nodes
    running this code agree on block 0.
    """

    # Genesis block configuration
    GENESIS_INDEX = 0
    GENESIS_PREV_HASH = "0" * 64
    GENESIS_TIMESTAMP = 1699360000
    # Node ids allowed to author blocks after genesis.
    AUTHORIZED_VALIDATORS = [
        "validator-1",
        "validator-2",
        "validator-3"
    ]

    def __init__(self):
        self.chain: List["Block"] = []
        self._create_genesis_block()

    def _create_genesis_block(self) -> None:
        """Append the fixed genesis block to the chain."""
        # The genesis hash deliberately covers only these four fields (no
        # validator), so it differs from calculate_block_hash(); that is fine
        # because verify_integrity() never re-hashes block 0.
        genesis_payload = {
            "index": self.GENESIS_INDEX,
            "prev_hash": self.GENESIS_PREV_HASH,
            "timestamp": self.GENESIS_TIMESTAMP,
            "transactions": []
        }
        genesis_hash = "0x" + hashlib.sha256(
            json.dumps(genesis_payload, sort_keys=True).encode()
        ).hexdigest()
        genesis_block = Block(
            index=self.GENESIS_INDEX,
            prev_hash=self.GENESIS_PREV_HASH,
            timestamp=self.GENESIS_TIMESTAMP,
            transactions=[],
            validator="genesis",
            block_hash=genesis_hash,
            signature="genesis"
        )
        self.chain.append(genesis_block)
        logger.info("Genesis block created")

    def add_block(self, block: "Block") -> bool:
        """Validate ``block`` and append it to the chain.

        Returns:
            True if the block was appended, False if validation failed.
        """
        if not self.validate_block(block):
            logger.error(f"Block validation failed: {block.index}")
            return False
        self.chain.append(block)
        logger.info(
            f"Block {block.index} added to chain "
            f"(validator: {block.validator}, txs: {len(block.transactions)})"
        )
        return True

    def validate_block(self, block: "Block") -> bool:
        """Check whether ``block`` is a valid successor of the current tip.

        A block is accepted when its index is the next free position, its
        ``prev_hash`` equals the tip's hash, its validator is one of
        ``AUTHORIZED_VALIDATORS`` and its ``block_hash`` matches the
        recomputed hash. The signature is not checked here.
        """
        # Check block index
        if block.index != len(self.chain):
            logger.warning(f"Invalid block index: {block.index}, expected {len(self.chain)}")
            return False
        # Check previous hash
        prev_block = self.chain[-1]
        if block.prev_hash != prev_block.block_hash:
            logger.warning(f"Invalid prev_hash for block {block.index}")
            return False
        # Check validator is authorized. The "genesis" pseudo-validator is
        # NOT accepted here: block 0 is appended directly by
        # _create_genesis_block() and never passes through validation, so
        # allowing "genesis" would let anyone bypass the authorised set.
        if block.validator not in self.AUTHORIZED_VALIDATORS:
            logger.warning(f"Unauthorized validator: {block.validator}")
            return False
        # Check block hash
        if block.block_hash != self.calculate_block_hash(block):
            logger.warning(f"Invalid block hash for block {block.index}")
            return False
        return True

    @staticmethod
    def calculate_block_hash(block: "Block") -> str:
        """Return the ``0x``-prefixed SHA-256 hex digest of a block.

        The digest covers index, prev_hash, timestamp, every transaction field
        and the validator, serialised as JSON with sorted keys. ``block_hash``
        and ``signature`` are not part of the hashed data.
        """
        block_data = {
            "index": block.index,
            "prev_hash": block.prev_hash,
            "timestamp": block.timestamp,
            "transactions": [
                {
                    "voter_id": t.voter_id,
                    "election_id": t.election_id,
                    "encrypted_vote": t.encrypted_vote,
                    "ballot_hash": t.ballot_hash,
                    "proof": t.proof,
                    "timestamp": t.timestamp
                }
                for t in block.transactions
            ],
            "validator": block.validator
        }
        block_json = json.dumps(block_data, sort_keys=True)
        return "0x" + hashlib.sha256(block_json.encode()).hexdigest()

    def get_block(self, index: int) -> Optional["Block"]:
        """Return the block at ``index``, or None when out of range."""
        if 0 <= index < len(self.chain):
            return self.chain[index]
        return None

    def get_latest_block(self) -> "Block":
        """Return the current tip of the chain."""
        return self.chain[-1]

    def get_chain_length(self) -> int:
        """Return the number of blocks, genesis included."""
        return len(self.chain)

    def verify_integrity(self) -> bool:
        """Re-check hash links and block hashes for every block after genesis."""
        for i in range(1, len(self.chain)):
            current = self.chain[i]
            previous = self.chain[i - 1]
            if current.prev_hash != previous.block_hash:
                logger.error(f"Chain integrity broken at block {i}")
                return False
            if current.block_hash != Blockchain.calculate_block_hash(current):
                logger.error(f"Block hash mismatch at block {i}")
                return False
        return True
# ============================================================================
# PoA Validator
# ============================================================================
class PoAValidator:
    """Proof-of-Authority validator node.

    Owns the local chain, a queue of pending transactions and the table of
    known peers. Blocks are proposed in round-robin order over
    ``Blockchain.AUTHORIZED_VALIDATORS``; transactions and blocks are pushed
    to peers over HTTP.
    """

    # Upper bound on the number of pending transactions packed into one block.
    MAX_TRANSACTIONS_PER_BLOCK = 32

    def __init__(
        self,
        node_id: str,
        private_key: str,
        bootnode_url: str,
        rpc_port: int = 8001,
        p2p_port: int = 30303
    ):
        self.node_id = node_id
        self.private_key = private_key
        self.bootnode_url = bootnode_url
        self.rpc_port = rpc_port
        self.p2p_port = p2p_port
        self.public_key = f"0x{uuid.uuid4().hex[:40]}"
        # State management
        self.blockchain = Blockchain()
        self.pending_transactions: List["Transaction"] = []
        self.peer_connections: Dict[str, str] = {}  # node_id -> url
        self.transaction_pool: Dict[str, "Transaction"] = {}  # tx_id -> transaction
        # Block creation state
        self.last_block_time = time.time()
        self.block_creation_interval = 5  # seconds
        # Handle of the background block-creation task (set by startup()).
        self._block_task: Optional["asyncio.Task"] = None
        logger.info(f"PoAValidator initialized: {node_id}")

    async def startup(self) -> None:
        """Register with the bootnode, discover peers and start block creation.

        Raises:
            Exception: whatever registration raised; it is logged and re-raised.
        """
        logger.info(f"Validator {self.node_id} starting up...")
        try:
            # 1. Register with bootnode
            await self.register_with_bootnode()
            # 2. Discover peers
            await self.discover_peers()
            # 3. Start block creation task. The handle is stored because the
            # event loop keeps only a weak reference to tasks; an unreferenced
            # task may be garbage-collected while still running.
            self._block_task = asyncio.create_task(self.block_creation_loop())
            logger.info(f"Validator {self.node_id} is ready")
        except Exception as e:
            logger.error(f"Startup error: {e}")
            raise

    async def register_with_bootnode(self) -> None:
        """Announce this validator to the bootnode.

        A non-200 reply is only logged; a transport error is logged and
        re-raised.
        """
        try:
            payload = {
                "node_id": self.node_id,
                "ip": self.node_id,  # Docker service name
                "p2p_port": self.p2p_port,
                "rpc_port": self.rpc_port,
                "public_key": self.public_key
            }
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    f"{self.bootnode_url}/register_peer",
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=10)
                ) as resp:
                    if resp.status == 200:
                        logger.info(f"Registered with bootnode: {self.node_id}")
                    else:
                        logger.error(f"Failed to register with bootnode: {resp.status}")
        except Exception as e:
            logger.error(f"Error registering with bootnode: {e}")
            raise

    async def discover_peers(self) -> None:
        """Fetch the peer list from the bootnode and record each peer's RPC URL.

        Errors are logged and swallowed. Within this class the method is only
        called from startup(), so peers that register later are not picked up
        unless it is called again.
        """
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"{self.bootnode_url}/discover?node_id={self.node_id}",
                    timeout=aiohttp.ClientTimeout(total=10)
                ) as resp:
                    if resp.status == 200:
                        data = await resp.json()
                        peers = data.get("peers", [])
                        for peer in peers:
                            self.peer_connections[peer["node_id"]] = (
                                f"http://{peer['ip']}:{peer['rpc_port']}"
                            )
                        logger.info(
                            f"Discovered {len(peers)} peers: "
                            f"{list(self.peer_connections.keys())}"
                        )
                    else:
                        logger.error(f"Failed to discover peers: {resp.status}")
        except Exception as e:
            logger.error(f"Error discovering peers: {e}")

    def should_create_block(self) -> bool:
        """Return True when the next block index falls on this node's turn."""
        authorized = Blockchain.AUTHORIZED_VALIDATORS
        if self.node_id not in authorized:
            return False
        next_block_index = self.blockchain.get_chain_length()
        return next_block_index % len(authorized) == authorized.index(self.node_id)

    async def block_creation_loop(self) -> None:
        """Poll once per second and propose a block when one is due.

        A block is proposed when the creation interval has elapsed, it is this
        node's turn and at least one transaction is pending. Errors are logged
        and followed by a 5 second pause.
        """
        while True:
            try:
                current_time = time.time()
                time_since_last = current_time - self.last_block_time
                if time_since_last >= self.block_creation_interval:
                    if self.should_create_block() and self.pending_transactions:
                        await self.create_and_broadcast_block()
                    self.last_block_time = current_time
                await asyncio.sleep(1)
            except Exception as e:
                logger.error(f"Error in block creation loop: {e}")
                await asyncio.sleep(5)

    async def create_and_broadcast_block(self) -> None:
        """Build a block from pending transactions, append it and broadcast it."""
        try:
            # Peek at the batch; it is dequeued only after the block has been
            # accepted locally, so a failed attempt does not lose votes.
            batch = self.pending_transactions[:self.MAX_TRANSACTIONS_PER_BLOCK]
            # Create block
            prev_block = self.blockchain.get_latest_block()
            new_block = Block(
                index=self.blockchain.get_chain_length(),
                prev_hash=prev_block.block_hash,
                timestamp=int(time.time()),
                transactions=batch,
                validator=self.node_id
            )
            # Calculate hash
            new_block.block_hash = Blockchain.calculate_block_hash(new_block)
            # Sign (simplified - just use hash)
            new_block.signature = new_block.block_hash[:16]
            # Add to local chain
            if self.blockchain.add_block(new_block):
                # There is no await between the peek above and this point, so
                # the first len(batch) entries are still exactly the batch.
                self.pending_transactions = self.pending_transactions[len(batch):]
                logger.info(f"Block {new_block.index} created successfully")
                # Broadcast to peers
                await self.broadcast_block(new_block)
        except Exception as e:
            logger.error(f"Error creating block: {e}")

    async def broadcast_transaction(self, transaction: "Transaction") -> None:
        """POST a transaction to ``/p2p/new_transaction`` on every known peer.

        The request body is the bare transaction object, which is the shape
        the receiving endpoint in this file validates against. Failures are
        logged per peer and do not stop the loop.
        """
        if not self.peer_connections:
            logger.debug("No peers to broadcast transaction to")
            return
        payload = json.dumps({
            "voter_id": transaction.voter_id,
            "election_id": transaction.election_id,
            "encrypted_vote": transaction.encrypted_vote,
            "ballot_hash": transaction.ballot_hash,
            "proof": transaction.proof,
            "timestamp": transaction.timestamp
        })
        async with aiohttp.ClientSession() as session:
            # Iterate over a snapshot: the peer table may change while a
            # request is awaited.
            for node_id, peer_url in list(self.peer_connections.items()):
                try:
                    async with session.post(
                        f"{peer_url}/p2p/new_transaction",
                        data=payload,
                        headers={"Content-Type": "application/json"},
                        timeout=aiohttp.ClientTimeout(total=5)
                    ) as resp:
                        if resp.status == 200:
                            logger.debug(f"Transaction broadcast to {node_id}")
                        else:
                            logger.warning(
                                f"Peer {node_id} rejected transaction: HTTP {resp.status}"
                            )
                except Exception as e:
                    logger.warning(f"Failed to broadcast transaction to {node_id}: {e}")

    async def broadcast_block(self, block: "Block") -> None:
        """POST a block to ``/p2p/new_block`` on every known peer.

        The body is the envelope ``{"type": "new_block", "block": {...}}``;
        handle_new_block() unwraps it on the receiving side.
        """
        if not self.peer_connections:
            logger.debug("No peers to broadcast to")
            return
        payload = json.dumps({
            "type": "new_block",
            "block": block.to_dict()
        })
        async with aiohttp.ClientSession() as session:
            for node_id, peer_url in list(self.peer_connections.items()):
                try:
                    async with session.post(
                        f"{peer_url}/p2p/new_block",
                        data=payload,
                        headers={"Content-Type": "application/json"},
                        timeout=aiohttp.ClientTimeout(total=5)
                    ) as resp:
                        if resp.status == 200:
                            logger.debug(f"Block broadcast to {node_id}")
                        else:
                            logger.warning(
                                f"Peer {node_id} rejected block: HTTP {resp.status}"
                            )
                except Exception as e:
                    logger.warning(f"Failed to broadcast to {node_id}: {e}")

    async def handle_new_block(self, block_data: Dict[str, Any]) -> None:
        """Process a block received from a peer.

        Accepts either the broadcast envelope (``{"block": {...}}``) or a bare
        block dictionary. An accepted block is appended, its transactions are
        removed from the local pending queue, and it is re-broadcast. All
        errors are logged and swallowed.
        """
        try:
            inner = block_data.get("block") if isinstance(block_data, dict) else None
            payload = inner if isinstance(inner, dict) else block_data
            block = Block.from_dict(payload)
            # add_block() validates internally, so no separate check is needed.
            if self.blockchain.add_block(block):
                # Without this, votes already mined by a peer would be mined
                # again when it is this node's turn.
                self._prune_pending(block.transactions)
                # Broadcast to other peers
                await self.broadcast_block(block)
                logger.info(f"Block {block.index} accepted and propagated")
            else:
                logger.warning(f"Invalid block received: {block.index}")
        except Exception as e:
            logger.error(f"Error handling new block: {e}")

    @staticmethod
    def _transaction_key(tx: "Transaction") -> tuple:
        """Return a hashable identity built from all six transaction fields."""
        return (
            tx.voter_id,
            tx.election_id,
            tx.encrypted_vote,
            tx.ballot_hash,
            tx.proof,
            tx.timestamp,
        )

    def _prune_pending(self, included: List["Transaction"]) -> None:
        """Drop from the pending queue every transaction present in ``included``."""
        included_keys = {self._transaction_key(tx) for tx in included}
        self.pending_transactions = [
            tx for tx in self.pending_transactions
            if self._transaction_key(tx) not in included_keys
        ]

    def add_transaction(self, transaction: "Transaction") -> str:
        """Queue a transaction and return a freshly generated id for it.

        The id is random and local to this node; a peer that receives the same
        transaction assigns its own id.
        """
        tx_id = f"0x{uuid.uuid4().hex}"
        self.transaction_pool[tx_id] = transaction
        self.pending_transactions.append(transaction)
        logger.info(f"Transaction added: {tx_id}")
        return tx_id

    def get_transaction_receipt(self, tx_id: str) -> Optional[Dict[str, Any]]:
        """Return a receipt for a mined transaction, or None if not found.

        ``tx_id`` may be an id returned by add_transaction() on this node, a
        voter id, or a ballot hash.
        """
        # Resolve ids issued by add_transaction() back to the transaction so
        # it can be located in the chain.
        pooled = self.transaction_pool.get(tx_id) if isinstance(tx_id, str) else None
        pooled_key = self._transaction_key(pooled) if pooled is not None else None
        # Look through blockchain for transaction
        for block in self.blockchain.chain:
            for tx in block.transactions:
                matches = (
                    tx.voter_id == tx_id
                    or tx.ballot_hash == tx_id
                    or (pooled_key is not None and self._transaction_key(tx) == pooled_key)
                )
                if matches:
                    return {
                        "transactionHash": tx_id,
                        "blockNumber": block.index,
                        "blockHash": block.block_hash,
                        "status": 1,
                        "timestamp": block.timestamp
                    }
        return None

    def get_blockchain_data(self) -> Dict[str, Any]:
        """Return every block as a dict together with chain-level statistics."""
        return {
            "blocks": [block.to_dict() for block in self.blockchain.chain],
            "verification": {
                "chain_valid": self.blockchain.verify_integrity(),
                "total_blocks": self.blockchain.get_chain_length(),
                "total_votes": sum(
                    len(block.transactions)
                    for block in self.blockchain.chain
                    if block.index > 0  # Exclude genesis
                )
            }
        }
# ============================================================================
# FastAPI Application
# ============================================================================
# ASGI application object; the route handlers below attach themselves to it.
app = FastAPI(
    version="1.0.0",
    title="PoA Validator Node",
    description="Proof-of-Authority blockchain validator",
)
# Process-wide validator instance; stays None until the startup hook has run.
validator: Optional[PoAValidator] = None
# ============================================================================
# Startup/Shutdown
# ============================================================================
@app.on_event("startup")
async def startup():
    """Build the global validator from environment variables and start it.

    Reads NODE_ID, PRIVATE_KEY, BOOTNODE_URL, RPC_PORT and P2P_PORT, each with
    a built-in default, then awaits the validator's own startup sequence.
    """
    global validator
    env = os.getenv
    validator = PoAValidator(
        node_id=env("NODE_ID", "validator-1"),
        # Default is a random key generated for this process.
        private_key=env("PRIVATE_KEY", f"0x{uuid.uuid4().hex}"),
        bootnode_url=env("BOOTNODE_URL", "http://bootnode:8546"),
        rpc_port=int(env("RPC_PORT", "8001")),
        p2p_port=int(env("P2P_PORT", "30303")),
    )
    await validator.startup()
# ============================================================================
# Health Check
# ============================================================================
@app.get("/health")
async def health_check():
    """Report liveness plus chain length and pending-queue size.

    Responds with HTTP 503 while the validator has not been created yet.
    """
    node = validator
    if node is None:
        raise HTTPException(status_code=503, detail="Validator not initialized")
    report = {"status": "healthy", "node_id": node.node_id}
    report["chain_length"] = node.blockchain.get_chain_length()
    report["pending_transactions"] = len(node.pending_transactions)
    # Timestamp is taken in UTC and rendered as ISO-8601.
    report["timestamp"] = datetime.now(timezone.utc).isoformat()
    return report
# ============================================================================
# JSON-RPC Interface
# ============================================================================
# Strong references to fire-and-forget broadcast tasks. The event loop keeps
# only weak references to tasks, so an unreferenced task may be
# garbage-collected before it finishes.
_background_tasks: set = set()


def _rpc_result(request_id, result):
    """Build a JSON-RPC 2.0 success response."""
    return {"jsonrpc": "2.0", "result": result, "id": request_id}


def _rpc_error(request_id, code, message):
    """Build a JSON-RPC 2.0 error response."""
    return {
        "jsonrpc": "2.0",
        "error": {"code": code, "message": message},
        "id": request_id
    }


def _decode_transaction_data(data_hex):
    """Decode a ``0x``-prefixed hex string holding UTF-8 JSON.

    Raises:
        ValueError: if the prefix is missing or the content cannot be decoded.
    """
    if not data_hex.startswith("0x"):
        raise ValueError("Invalid data format")
    try:
        # bytes.fromhex, bytes.decode and json.loads all signal bad input
        # with ValueError (or a subclass of it).
        return json.loads(bytes.fromhex(data_hex[2:]).decode())
    except ValueError as err:
        raise ValueError("Invalid transaction data") from err


def _parse_block_number(tag, latest):
    """Turn a block parameter into an index.

    Accepts an integer, a numeric string in any base understood by
    ``int(tag, 0)`` (e.g. ``"0x1"``), or the tags ``"latest"``/``"earliest"``.
    """
    if isinstance(tag, int) and not isinstance(tag, bool):
        return tag
    if tag == "latest":
        return latest
    if tag == "earliest":
        return 0
    return int(tag, 0)


@app.post("/rpc")
async def handle_json_rpc(request: dict):
    """Handle JSON-RPC requests.

    Supported methods: eth_sendTransaction, eth_getTransactionReceipt,
    eth_blockNumber and eth_getBlockByNumber. Failures are returned as
    JSON-RPC error objects: -32601 for an unknown method, -32602 for
    malformed parameters and -32603 for anything else.
    """
    if validator is None:
        raise HTTPException(status_code=503, detail="Validator not initialized")
    method = request.get("method")
    params = request.get("params", [])
    request_id = request.get("id")
    try:
        if method == "eth_sendTransaction":
            # Submit a vote transaction
            tx_data = params[0] if params else {}
            tx_dict = _decode_transaction_data(tx_data.get("data", "0x"))
            transaction = Transaction(**tx_dict)
            tx_id = validator.add_transaction(transaction)
            # Broadcast to other validators without delaying the response.
            task = asyncio.create_task(validator.broadcast_transaction(transaction))
            _background_tasks.add(task)
            task.add_done_callback(_background_tasks.discard)
            return _rpc_result(request_id, tx_id)
        if method == "eth_getTransactionReceipt":
            tx_id = params[0] if params else None
            return _rpc_result(request_id, validator.get_transaction_receipt(tx_id))
        if method == "eth_blockNumber":
            block_number = validator.blockchain.get_chain_length() - 1
            return _rpc_result(request_id, hex(block_number))
        if method == "eth_getBlockByNumber":
            latest = validator.blockchain.get_chain_length() - 1
            block_num = _parse_block_number(params[0], latest) if params else 0
            block = validator.blockchain.get_block(block_num)
            return _rpc_result(request_id, block.to_dict() if block else None)
    except (ValueError, TypeError) as e:
        # Bad hex/JSON, failed model validation or a wrongly typed parameter.
        logger.error(f"JSON-RPC error: {e}")
        return _rpc_error(request_id, -32602, str(e))
    except Exception as e:
        logger.error(f"JSON-RPC error: {e}")
        return _rpc_error(request_id, -32603, str(e))
    message = f"Unknown method: {method}"
    logger.error(f"JSON-RPC error: {message}")
    return _rpc_error(request_id, -32601, message)
# ============================================================================
# P2P Networking
# ============================================================================
@app.post("/p2p/new_block")
async def handle_new_block(block_data: dict):
    """Receive a block pushed by a peer and hand it to the validator.

    Responds 503 before the validator exists and 400 if the validator call
    raises.
    """
    if validator is None:
        raise HTTPException(status_code=503, detail="Validator not initialized")
    try:
        await validator.handle_new_block(block_data)
    except Exception as exc:
        logger.error(f"Error handling P2P block: {exc}")
        raise HTTPException(status_code=400, detail=str(exc))
    return {"status": "ok"}
@app.post("/p2p/new_transaction")
async def handle_new_transaction(transaction: Transaction):
    """Receive a transaction pushed by a peer and queue it locally.

    The request body must be a transaction object. Responds 503 before the
    validator exists and 400 if queuing raises.
    """
    if validator is None:
        raise HTTPException(status_code=503, detail="Validator not initialized")
    try:
        validator.add_transaction(transaction)
    except Exception as exc:
        logger.error(f"Error handling P2P transaction: {exc}")
        raise HTTPException(status_code=400, detail=str(exc))
    return {"status": "ok"}
# ============================================================================
# Admin Endpoints
# ============================================================================
@app.get("/blockchain")
async def get_blockchain():
    """Return every block plus verification statistics (503 until ready)."""
    node = validator
    if node is None:
        raise HTTPException(status_code=503, detail="Validator not initialized")
    chain_dump = node.get_blockchain_data()
    return chain_dump
@app.get("/peers")
async def get_peers():
    """List the ids of the peers this node knows about (503 until ready)."""
    node = validator
    if node is None:
        raise HTTPException(status_code=503, detail="Validator not initialized")
    # Iterating the dict yields its keys, i.e. the peer node ids.
    peer_ids = list(node.peer_connections)
    return {
        "node_id": node.node_id,
        "peers": peer_ids,
        "peer_count": len(node.peer_connections)
    }
# ============================================================================
# Main
# ============================================================================
if __name__ == "__main__":
    # Imported here so uvicorn is only required when run as a script.
    import uvicorn

    # The HTTP server listens on the RPC port (default 8001) on all interfaces.
    rpc_port = int(os.getenv("RPC_PORT", "8001"))
    logger.info(f"Starting validator on RPC port {rpc_port}")
    uvicorn.run(app, host="0.0.0.0", port=rpc_port, log_level="info")