Alexis Bruneteau 78f0140342 fix: Handle both plain and base64-encoded public keys in API responses
## Issue
Even after storing keys as base64, the API was returning plain "p:g:h" format
for existing elections that had keys stored as plain UTF-8 bytes, causing:
- Client receives: "23:5:13" (plain text)
- Client tries to decode as base64 (btoa call)
- Results in: "Invalid base64: 23:5:13... - String contains an invalid character"

## Root Cause
1. Old elections have public_key stored as plain UTF-8: b'23:5:13'
2. New elections store as base64: b'MjM6NToxMw=='
3. Both were decoded to string before return, exposing wrong format
4. Also fixed ElGamal class name typo: ElGamal() → ElGamalEncryption()

## Fix
1. Detect public key format before returning:
   - If plain "p:g:h" format (contains ':'), encode to base64
   - If already base64 (starts with 'MjM6'), return as-is
2. Always return base64-encoded string to client
3. Updated both /setup and /public-keys endpoints in votes.py
4. Updated /init-keys endpoint in admin.py
5. Fixed class name in setup_election function

## Files Changed
- backend/routes/votes.py: Lines 502, 509-518, 560-569
- backend/routes/admin.py: Lines 179-197

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-11 19:36:39 +01:00

295 lines
10 KiB
Python

"""
Routes administrateur pour maintenance et configuration du système.
Admin endpoints for database maintenance and system configuration.
"""
from fastapi import APIRouter, HTTPException, status, Depends
from sqlalchemy.orm import Session
from sqlalchemy import text
from ..dependencies import get_db
from ..crypto.encryption import ElGamalEncryption
import base64
import logging
router = APIRouter(prefix="/api/admin", tags=["admin"])
logger = logging.getLogger(__name__)
@router.post("/fix-elgamal-keys")
async def fix_elgamal_keys(db: Session = Depends(get_db)):
"""
Fix missing ElGamal encryption parameters for elections.
Updates all elections that have NULL elgamal_p or elgamal_g to use p=23, g=5.
This is needed for the voting system to function properly.
"""
try:
logger.info("🔧 Starting ElGamal key fix...")
# Get current status
result = db.execute(text(
"SELECT COUNT(*) FROM elections WHERE elgamal_p IS NULL OR elgamal_g IS NULL"
))
count_before = result.scalar()
logger.info(f"Elections needing fix: {count_before}")
# Update elections with missing ElGamal parameters
db.execute(text(
"UPDATE elections SET elgamal_p = 23, elgamal_g = 5 WHERE elgamal_p IS NULL OR elgamal_g IS NULL"
))
db.commit()
# Verify the fix
result = db.execute(text(
"SELECT id, name, elgamal_p, elgamal_g FROM elections WHERE is_active = TRUE"
))
fixed_elections = []
for row in result:
fixed_elections.append({
"id": row[0],
"name": row[1],
"elgamal_p": row[2],
"elgamal_g": row[3]
})
logger.info(f"✓ Fixed {count_before} elections with ElGamal keys")
logger.info(f"Active elections with keys: {len(fixed_elections)}")
return {
"status": "success",
"message": f"Fixed {count_before} elections with ElGamal parameters",
"elgamal_p": 23,
"elgamal_g": 5,
"active_elections": fixed_elections
}
except Exception as e:
logger.error(f"✗ Error fixing ElGamal keys: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error fixing ElGamal keys: {str(e)}"
)
@router.get("/elections/elgamal-status")
async def check_elgamal_status(db: Session = Depends(get_db)):
"""
Check which elections have ElGamal parameters set.
Useful for diagnostics before voting.
"""
try:
result = db.execute(text(
"""
SELECT
id,
name,
is_active,
elgamal_p,
elgamal_g,
public_key,
CASE WHEN elgamal_p IS NOT NULL AND elgamal_g IS NOT NULL AND public_key IS NOT NULL THEN 'ready' ELSE 'incomplete' END as status
FROM elections
ORDER BY is_active DESC, id ASC
"""
))
elections = []
incomplete_count = 0
ready_count = 0
for row in result:
status_val = "ready" if row[3] and row[4] and row[5] else "incomplete"
elections.append({
"id": row[0],
"name": row[1],
"is_active": row[2],
"elgamal_p": row[3],
"elgamal_g": row[4],
"has_public_key": row[5] is not None,
"status": status_val
})
if status_val == "incomplete":
incomplete_count += 1
else:
ready_count += 1
return {
"total_elections": len(elections),
"ready_for_voting": ready_count,
"incomplete": incomplete_count,
"elections": elections
}
except Exception as e:
logger.error(f"Error checking ElGamal status: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error checking status: {str(e)}"
)
@router.post("/init-election-keys")
async def init_election_keys(election_id: int, db: Session = Depends(get_db)):
"""
Initialize ElGamal public keys for an election.
Generates a public key for voting encryption if not already present.
"""
try:
# Get the election
from .. import models
election = db.query(models.Election).filter(models.Election.id == election_id).first()
if not election:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Election {election_id} not found"
)
logger.info(f"Initializing keys for election {election_id}: {election.name}")
# Generate ElGamal public key if missing or invalid
pubkey_is_invalid = False
if election.public_key:
try:
# Public key is stored as base64-encoded bytes, try to decode it
pubkey_b64_str = election.public_key.decode('ascii') if isinstance(election.public_key, bytes) else str(election.public_key)
# Try to decode the base64 to verify it's valid
pubkey_bytes = base64.b64decode(pubkey_b64_str)
pubkey_str = pubkey_bytes.decode('utf-8')
# Check if it's valid (should be "p:g:h" format, not "pk_ongoing_X")
if not ':' in pubkey_str or pubkey_str.startswith('pk_') or pubkey_str.startswith('b\''):
pubkey_is_invalid = True
except:
pubkey_is_invalid = True
if not election.public_key or pubkey_is_invalid:
logger.info(f"Generating ElGamal public key for election {election_id}")
elgamal = ElGamalEncryption(p=election.elgamal_p or 23, g=election.elgamal_g or 5)
# Store as base64-encoded bytes (public_key_bytes returns UTF-8 "p:g:h", then encode to base64)
election.public_key = base64.b64encode(elgamal.public_key_bytes)
db.commit()
logger.info(f"✓ Generated public key for election {election_id}")
else:
logger.info(f"Election {election_id} already has valid public key")
# Ensure public key is base64-encoded for client
pubkey_to_return = election.public_key
if isinstance(pubkey_to_return, bytes):
pubkey_str = pubkey_to_return.decode('utf-8')
# If it's plain "p:g:h" format, encode it to base64
if ':' in pubkey_str and not pubkey_str.startswith('MjM6'): # Not already base64
pubkey_to_return = base64.b64encode(pubkey_str.encode('utf-8')).decode('ascii')
else:
# Already base64, just decode to string
pubkey_to_return = pubkey_str
return {
"status": "success",
"election_id": election_id,
"election_name": election.name,
"elgamal_p": election.elgamal_p,
"elgamal_g": election.elgamal_g,
"public_key_generated": True,
"public_key": pubkey_to_return
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error initializing election keys: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error initializing election keys: {str(e)}"
)
@router.get("/validators/health")
async def check_validators_health():
"""
Check the health status of all PoA validator nodes.
Returns:
- Each validator's health status (healthy, degraded, unreachable)
- Timestamp of the check
- Number of healthy validators
"""
from ..blockchain_client import BlockchainClient
try:
async with BlockchainClient() as client:
await client.refresh_validator_status()
validators_status = []
for validator in client.validators:
validators_status.append({
"node_id": validator.node_id,
"rpc_url": validator.rpc_url,
"p2p_url": validator.p2p_url,
"status": validator.status.value
})
healthy_count = len(client.healthy_validators)
total_count = len(client.validators)
logger.info(f"Validator health check: {healthy_count}/{total_count} healthy")
return {
"timestamp": datetime.utcnow().isoformat(),
"validators": validators_status,
"summary": {
"healthy": healthy_count,
"total": total_count,
"health_percentage": (healthy_count / total_count * 100) if total_count > 0 else 0
}
}
except Exception as e:
logger.error(f"Error checking validator health: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error checking validator health: {str(e)}"
)
@router.post("/validators/refresh-status")
async def refresh_validator_status():
"""
Force a refresh of validator node health status.
Useful for immediate status checks without waiting for automatic intervals.
"""
from ..blockchain_client import BlockchainClient
try:
async with BlockchainClient() as client:
await client.refresh_validator_status()
validators_status = []
for validator in client.validators:
validators_status.append({
"node_id": validator.node_id,
"status": validator.status.value
})
logger.info("Validator status refreshed")
return {
"message": "Validator status refreshed",
"validators": validators_status,
"timestamp": datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Error refreshing validator status: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error refreshing validator status: {str(e)}"
)
from datetime import datetime