The backend container needs to reach validators using their Docker service names (validator-1, validator-2, validator-3) instead of localhost:PORT. This fixes the 'validators unreachable' warning on backend startup. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
474 lines
14 KiB
Python
474 lines
14 KiB
Python
"""
|
|
BlockchainClient for communicating with PoA validator nodes.
|
|
|
|
This client submits votes to the distributed PoA blockchain network
|
|
and queries the state of votes on the blockchain.
|
|
"""
|
|
|
|
import logging
|
|
import httpx
|
|
import json
|
|
from typing import Optional, Dict, Any, List
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
import asyncio
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ValidatorStatus(str, Enum):
|
|
"""Status of a validator node"""
|
|
HEALTHY = "healthy"
|
|
DEGRADED = "degraded"
|
|
UNREACHABLE = "unreachable"
|
|
|
|
|
|
@dataclass
|
|
class ValidatorNode:
|
|
"""Represents a PoA validator node"""
|
|
node_id: str
|
|
rpc_url: str # JSON-RPC endpoint
|
|
p2p_url: str # P2P networking endpoint
|
|
status: ValidatorStatus = ValidatorStatus.UNREACHABLE
|
|
|
|
@property
|
|
def health_check_url(self) -> str:
|
|
"""Health check endpoint"""
|
|
return f"{self.rpc_url}/health"
|
|
|
|
|
|
class BlockchainClient:
|
|
"""
|
|
Client for PoA blockchain network.
|
|
|
|
Features:
|
|
- Load balancing across multiple validators
|
|
- Health monitoring
|
|
- Automatic failover
|
|
- Vote submission and confirmation tracking
|
|
"""
|
|
|
|
# Default validator configuration
|
|
# Use Docker service names for internal container communication
|
|
# For external access (outside Docker), use localhost:PORT
|
|
DEFAULT_VALIDATORS = [
|
|
ValidatorNode(
|
|
node_id="validator-1",
|
|
rpc_url="http://validator-1:8001",
|
|
p2p_url="http://validator-1:30303"
|
|
),
|
|
ValidatorNode(
|
|
node_id="validator-2",
|
|
rpc_url="http://validator-2:8001",
|
|
p2p_url="http://validator-2:30303"
|
|
),
|
|
ValidatorNode(
|
|
node_id="validator-3",
|
|
rpc_url="http://validator-3:8001",
|
|
p2p_url="http://validator-3:30303"
|
|
),
|
|
]
|
|
|
|
def __init__(self, validators: Optional[List[ValidatorNode]] = None, timeout: float = 5.0):
|
|
"""
|
|
Initialize blockchain client.
|
|
|
|
Args:
|
|
validators: List of validator nodes (uses defaults if None)
|
|
timeout: HTTP request timeout in seconds
|
|
"""
|
|
self.validators = validators or self.DEFAULT_VALIDATORS
|
|
self.timeout = timeout
|
|
self.healthy_validators: List[ValidatorNode] = []
|
|
self._client: Optional[httpx.AsyncClient] = None
|
|
|
|
async def __aenter__(self):
|
|
"""Async context manager entry"""
|
|
self._client = httpx.AsyncClient(timeout=self.timeout)
|
|
await self.refresh_validator_status()
|
|
return self
|
|
|
|
async def __aexit__(self, exc_type, exc_val, exc_tb):
|
|
"""Async context manager exit"""
|
|
if self._client:
|
|
await self._client.aclose()
|
|
|
|
async def refresh_validator_status(self) -> None:
|
|
"""
|
|
Check health of all validators.
|
|
|
|
Updates the list of healthy validators for load balancing.
|
|
"""
|
|
if not self._client:
|
|
self._client = httpx.AsyncClient(timeout=self.timeout)
|
|
|
|
tasks = [self._check_validator_health(v) for v in self.validators]
|
|
await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
|
self.healthy_validators = [
|
|
v for v in self.validators
|
|
if v.status == ValidatorStatus.HEALTHY
|
|
]
|
|
|
|
logger.info(
|
|
f"Validator health check: {len(self.healthy_validators)}/{len(self.validators)} healthy"
|
|
)
|
|
|
|
async def _check_validator_health(self, validator: ValidatorNode) -> None:
|
|
"""Check if a validator is healthy"""
|
|
try:
|
|
if not self._client:
|
|
return
|
|
|
|
response = await self._client.get(
|
|
validator.health_check_url,
|
|
timeout=self.timeout
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
validator.status = ValidatorStatus.HEALTHY
|
|
logger.debug(f"✓ {validator.node_id} is healthy")
|
|
else:
|
|
validator.status = ValidatorStatus.DEGRADED
|
|
logger.warning(f"⚠ {validator.node_id} returned status {response.status_code}")
|
|
except Exception as e:
|
|
validator.status = ValidatorStatus.UNREACHABLE
|
|
logger.warning(f"✗ {validator.node_id} is unreachable: {e}")
|
|
|
|
def _get_healthy_validator(self) -> Optional[ValidatorNode]:
|
|
"""
|
|
Get a healthy validator for the next request.
|
|
Uses round-robin for load balancing.
|
|
"""
|
|
if not self.healthy_validators:
|
|
logger.error("No healthy validators available!")
|
|
return None
|
|
|
|
# Simple round-robin: return first healthy validator
|
|
# In production, implement proper round-robin state management
|
|
return self.healthy_validators[0]
|
|
|
|
async def submit_vote(
|
|
self,
|
|
voter_id: str,
|
|
election_id: int,
|
|
encrypted_vote: str,
|
|
transaction_id: Optional[str] = None
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Submit a vote to the PoA blockchain network.
|
|
|
|
Args:
|
|
voter_id: Voter identifier
|
|
election_id: Election ID
|
|
encrypted_vote: Encrypted vote (base64 or hex)
|
|
transaction_id: Optional transaction ID (generated if not provided)
|
|
|
|
Returns:
|
|
Transaction receipt with block hash and index
|
|
|
|
Raises:
|
|
Exception: If all validators are unreachable
|
|
"""
|
|
validator = self._get_healthy_validator()
|
|
if not validator:
|
|
raise Exception("No healthy validators available")
|
|
|
|
# Generate transaction ID if not provided
|
|
if not transaction_id:
|
|
import uuid
|
|
transaction_id = f"tx-{uuid.uuid4().hex[:12]}"
|
|
|
|
# Prepare JSON-RPC request
|
|
rpc_request = {
|
|
"jsonrpc": "2.0",
|
|
"method": "eth_sendTransaction",
|
|
"params": [{
|
|
"from": voter_id,
|
|
"to": f"election-{election_id}",
|
|
"data": encrypted_vote,
|
|
"gas": "0x5208"
|
|
}],
|
|
"id": transaction_id
|
|
}
|
|
|
|
logger.info(f"Submitting vote to {validator.node_id}: tx_id={transaction_id}")
|
|
|
|
try:
|
|
if not self._client:
|
|
raise Exception("AsyncClient not initialized")
|
|
|
|
response = await self._client.post(
|
|
f"{validator.rpc_url}/rpc",
|
|
json=rpc_request,
|
|
timeout=self.timeout
|
|
)
|
|
|
|
response.raise_for_status()
|
|
result = response.json()
|
|
|
|
# Check for JSON-RPC errors
|
|
if "error" in result:
|
|
logger.error(f"RPC error from {validator.node_id}: {result['error']}")
|
|
raise Exception(f"RPC error: {result['error']}")
|
|
|
|
logger.info(f"✓ Vote submitted successfully: {transaction_id}")
|
|
|
|
return {
|
|
"transaction_id": transaction_id,
|
|
"block_hash": result.get("result"),
|
|
"validator": validator.node_id,
|
|
"status": "pending"
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to submit vote to {validator.node_id}: {e}")
|
|
raise
|
|
|
|
async def get_transaction_receipt(
|
|
self,
|
|
transaction_id: str,
|
|
election_id: int
|
|
) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Get the receipt for a submitted vote.
|
|
|
|
Args:
|
|
transaction_id: Transaction ID returned from submit_vote
|
|
election_id: Election ID
|
|
|
|
Returns:
|
|
Transaction receipt with confirmation status and block info
|
|
"""
|
|
validator = self._get_healthy_validator()
|
|
if not validator:
|
|
return None
|
|
|
|
rpc_request = {
|
|
"jsonrpc": "2.0",
|
|
"method": "eth_getTransactionReceipt",
|
|
"params": [transaction_id],
|
|
"id": transaction_id
|
|
}
|
|
|
|
try:
|
|
if not self._client:
|
|
raise Exception("AsyncClient not initialized")
|
|
|
|
response = await self._client.post(
|
|
f"{validator.rpc_url}/rpc",
|
|
json=rpc_request,
|
|
timeout=self.timeout
|
|
)
|
|
|
|
result = response.json()
|
|
|
|
if "error" in result:
|
|
logger.warning(f"RPC error: {result['error']}")
|
|
return None
|
|
|
|
receipt = result.get("result")
|
|
if receipt:
|
|
logger.debug(f"✓ Got receipt for {transaction_id}: block {receipt.get('blockNumber')}")
|
|
|
|
return receipt
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Failed to get receipt for {transaction_id}: {e}")
|
|
return None
|
|
|
|
async def get_vote_confirmation_status(
|
|
self,
|
|
transaction_id: str,
|
|
election_id: int
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Check if a vote has been confirmed on the blockchain.
|
|
|
|
Args:
|
|
transaction_id: Transaction ID
|
|
election_id: Election ID
|
|
|
|
Returns:
|
|
Status information including block number and finality
|
|
"""
|
|
receipt = await self.get_transaction_receipt(transaction_id, election_id)
|
|
|
|
if receipt is None:
|
|
return {
|
|
"status": "pending",
|
|
"confirmed": False,
|
|
"transaction_id": transaction_id
|
|
}
|
|
|
|
return {
|
|
"status": "confirmed",
|
|
"confirmed": True,
|
|
"transaction_id": transaction_id,
|
|
"block_number": receipt.get("blockNumber"),
|
|
"block_hash": receipt.get("blockHash"),
|
|
"gas_used": receipt.get("gasUsed")
|
|
}
|
|
|
|
async def get_blockchain_state(self, election_id: int) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Get the current state of the blockchain for an election.
|
|
|
|
Args:
|
|
election_id: Election ID
|
|
|
|
Returns:
|
|
Blockchain state with block count and verification status
|
|
"""
|
|
validator = self._get_healthy_validator()
|
|
if not validator:
|
|
return None
|
|
|
|
try:
|
|
if not self._client:
|
|
raise Exception("AsyncClient not initialized")
|
|
|
|
# Query blockchain info endpoint on validator
|
|
response = await self._client.get(
|
|
f"{validator.rpc_url}/blockchain",
|
|
params={"election_id": election_id},
|
|
timeout=self.timeout
|
|
)
|
|
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Failed to get blockchain state: {e}")
|
|
return None
|
|
|
|
async def verify_blockchain_integrity(self, election_id: int) -> bool:
|
|
"""
|
|
Verify that the blockchain for an election is valid and unmodified.
|
|
|
|
Args:
|
|
election_id: Election ID
|
|
|
|
Returns:
|
|
True if blockchain is valid, False otherwise
|
|
"""
|
|
state = await self.get_blockchain_state(election_id)
|
|
|
|
if state is None:
|
|
return False
|
|
|
|
verification = state.get("verification", {})
|
|
is_valid = verification.get("chain_valid", False)
|
|
|
|
if is_valid:
|
|
logger.info(f"✓ Blockchain for election {election_id} is valid")
|
|
else:
|
|
logger.error(f"✗ Blockchain for election {election_id} is INVALID")
|
|
|
|
return is_valid
|
|
|
|
async def get_election_results(self, election_id: int) -> Optional[Dict[str, Any]]:
|
|
"""
|
|
Get the current vote counts for an election from the blockchain.
|
|
|
|
Args:
|
|
election_id: Election ID
|
|
|
|
Returns:
|
|
Vote counts by candidate and verification status
|
|
"""
|
|
validator = self._get_healthy_validator()
|
|
if not validator:
|
|
return None
|
|
|
|
try:
|
|
if not self._client:
|
|
raise Exception("AsyncClient not initialized")
|
|
|
|
# Query results endpoint on validator
|
|
response = await self._client.get(
|
|
f"{validator.rpc_url}/results",
|
|
params={"election_id": election_id},
|
|
timeout=self.timeout
|
|
)
|
|
|
|
response.raise_for_status()
|
|
return response.json()
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Failed to get election results: {e}")
|
|
return None
|
|
|
|
async def wait_for_confirmation(
|
|
self,
|
|
transaction_id: str,
|
|
election_id: int,
|
|
max_wait_seconds: int = 30,
|
|
poll_interval_seconds: float = 1.0
|
|
) -> bool:
|
|
"""
|
|
Wait for a vote to be confirmed on the blockchain.
|
|
|
|
Args:
|
|
transaction_id: Transaction ID
|
|
election_id: Election ID
|
|
max_wait_seconds: Maximum time to wait in seconds
|
|
poll_interval_seconds: Time between status checks
|
|
|
|
Returns:
|
|
True if vote was confirmed, False if timeout
|
|
"""
|
|
import time
|
|
|
|
start_time = time.time()
|
|
while time.time() - start_time < max_wait_seconds:
|
|
status = await self.get_vote_confirmation_status(transaction_id, election_id)
|
|
|
|
if status.get("confirmed"):
|
|
logger.info(f"✓ Vote confirmed: {transaction_id}")
|
|
return True
|
|
|
|
logger.debug(f"Waiting for confirmation... ({status['status']})")
|
|
await asyncio.sleep(poll_interval_seconds)
|
|
|
|
logger.warning(f"Confirmation timeout for {transaction_id}")
|
|
return False
|
|
|
|
|
|
# Singleton instance for use throughout the backend
|
|
_blockchain_client: Optional[BlockchainClient] = None
|
|
|
|
|
|
async def get_blockchain_client() -> BlockchainClient:
|
|
"""
|
|
Get or create the global blockchain client instance.
|
|
|
|
Returns:
|
|
BlockchainClient instance
|
|
"""
|
|
global _blockchain_client
|
|
|
|
if _blockchain_client is None:
|
|
_blockchain_client = BlockchainClient()
|
|
await _blockchain_client.refresh_validator_status()
|
|
|
|
return _blockchain_client
|
|
|
|
|
|
def get_blockchain_client_sync() -> BlockchainClient:
|
|
"""
|
|
Get the blockchain client (for sync contexts).
|
|
|
|
Note: This returns the client without initializing it.
|
|
Use with caution in async contexts.
|
|
|
|
Returns:
|
|
BlockchainClient instance
|
|
"""
|
|
global _blockchain_client
|
|
|
|
if _blockchain_client is None:
|
|
_blockchain_client = BlockchainClient()
|
|
|
|
return _blockchain_client
|