## Issue

ElGamal encryption failed with an "Invalid base64: 23:5:9..." error because:
- `/api/votes/setup` stored the public key as base64-encoded bytes
- `/api/admin/init-election-keys` stored the public key as raw UTF-8 bytes
- The client received plain "p:g:h" text instead of base64, causing a decoding failure

## Root Cause

Inconsistent storage format:
- votes.py line 505: `base64.b64encode(elgamal.public_key_bytes)`
- admin.py line 169: `elgamal.public_key_bytes` (no encoding)
- Return paths decoded base64 as UTF-8, exposing the plain format to the client

## Fix

1. Both endpoints now consistently store `base64.b64encode(elgamal.public_key_bytes)`
2. Return paths decode the stored base64 bytes to ASCII text (which is still valid base64)
3. Updated validation in admin.py to properly decode base64 before validating
4. Frontend ElGamalEncryption.encrypt() expects base64 input and now receives it correctly

## Files Changed

- backend/routes/votes.py: Lines 505, 513, 550
- backend/routes/admin.py: Lines 159-162, 169, 182

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
284 lines
9.7 KiB
Python
284 lines
9.7 KiB
Python
"""
|
|
Routes administrateur pour maintenance et configuration du système.
|
|
|
|
Admin endpoints for database maintenance and system configuration.
|
|
"""
|
|
|
|
from fastapi import APIRouter, HTTPException, status, Depends
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import text
|
|
from ..dependencies import get_db
|
|
from ..crypto.encryption import ElGamalEncryption
|
|
import base64
|
|
import logging
|
|
|
|
# Router that groups every endpoint declared in this module under the
# "/api/admin" URL prefix and the "admin" tag.
router = APIRouter(prefix="/api/admin", tags=["admin"])
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@router.post("/fix-elgamal-keys")
async def fix_elgamal_keys(db: Session = Depends(get_db)):
    """
    Backfill ElGamal parameters on elections that are missing them.

    Every election whose ``elgamal_p`` or ``elgamal_g`` is NULL is updated to
    p=23, g=5 and the change is committed. The active elections are then read
    back and returned together with the number of rows that needed the fix.

    Args:
        db: Database session injected through ``get_db``.

    Returns:
        A dict with ``status``, ``message``, the applied ``elgamal_p`` /
        ``elgamal_g`` values and the list of ``active_elections``.

    Raises:
        HTTPException: 500, wrapping any error raised while querying or updating.
    """
    try:
        logger.info("🔧 Starting ElGamal key fix...")

        # Count the affected rows first: once the UPDATE has run, no row
        # matches the NULL filter any more.
        pending_query = text(
            "SELECT COUNT(*) FROM elections WHERE elgamal_p IS NULL OR elgamal_g IS NULL"
        )
        count_before = db.execute(pending_query).scalar()
        logger.info(f"Elections needing fix: {count_before}")

        # Backfill the missing parameters and persist the change.
        backfill_statement = text(
            "UPDATE elections SET elgamal_p = 23, elgamal_g = 5 WHERE elgamal_p IS NULL OR elgamal_g IS NULL"
        )
        db.execute(backfill_statement)
        db.commit()

        # Read back the active elections so the caller can verify the result.
        active_rows = db.execute(text(
            "SELECT id, name, elgamal_p, elgamal_g FROM elections WHERE is_active = TRUE"
        ))
        fixed_elections = [
            {
                "id": row[0],
                "name": row[1],
                "elgamal_p": row[2],
                "elgamal_g": row[3],
            }
            for row in active_rows
        ]

        logger.info(f"✓ Fixed {count_before} elections with ElGamal keys")
        logger.info(f"Active elections with keys: {len(fixed_elections)}")

        response = {
            "status": "success",
            "message": f"Fixed {count_before} elections with ElGamal parameters",
            "elgamal_p": 23,
            "elgamal_g": 5,
            "active_elections": fixed_elections,
        }
        return response

    except Exception as e:
        # Boundary handler: log with traceback, then surface a 500 to the client.
        logger.error(f"✗ Error fixing ElGamal keys: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error fixing ElGamal keys: {str(e)}",
        )
|
|
|
|
|
|
@router.get("/elections/elgamal-status")
async def check_elgamal_status(db: Session = Depends(get_db)):
    """
    Report, for every election, whether its ElGamal setup is complete.

    An election is reported as ``"ready"`` when ``elgamal_p``, ``elgamal_g``
    and ``public_key`` are all truthy, otherwise as ``"incomplete"``.

    Args:
        db: Database session injected through ``get_db``.

    Returns:
        A dict with ``total_elections``, ``ready_for_voting``, ``incomplete``
        and the per-election ``elections`` list.

    Raises:
        HTTPException: 500, wrapping any error raised while querying.
    """
    try:
        # NOTE: the query also computes a `status` column, but it is not read
        # below; readiness is recomputed in Python from the raw column values.
        rows = db.execute(text(
            """
            SELECT
            id,
            name,
            is_active,
            elgamal_p,
            elgamal_g,
            public_key,
            CASE WHEN elgamal_p IS NOT NULL AND elgamal_g IS NOT NULL AND public_key IS NOT NULL THEN 'ready' ELSE 'incomplete' END as status
            FROM elections
            ORDER BY is_active DESC, id ASC
            """
        ))

        elections = []
        for row in rows:
            if row[3] and row[4] and row[5]:
                status_val = "ready"
            else:
                status_val = "incomplete"

            elections.append({
                "id": row[0],
                "name": row[1],
                "is_active": row[2],
                "elgamal_p": row[3],
                "elgamal_g": row[4],
                # Presence check only: an empty (non-NULL) key still counts here.
                "has_public_key": row[5] is not None,
                "status": status_val,
            })

        # Tally after the loop instead of maintaining two running counters.
        incomplete_count = sum(
            1 for entry in elections if entry["status"] == "incomplete"
        )
        ready_count = len(elections) - incomplete_count

        return {
            "total_elections": len(elections),
            "ready_for_voting": ready_count,
            "incomplete": incomplete_count,
            "elections": elections,
        }

    except Exception as e:
        # Boundary handler: log with traceback, then surface a 500 to the client.
        logger.error(f"Error checking ElGamal status: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error checking status: {str(e)}",
        )
|
|
|
|
|
|
def _stored_public_key_is_valid(raw_key) -> bool:
    """Return True when *raw_key* is base64 text that decodes to a "p:g:h"-style string."""
    try:
        # The key is stored as base64-encoded bytes; normalise to text first.
        key_b64 = raw_key.decode('ascii') if isinstance(raw_key, bytes) else str(raw_key)
        key_text = base64.b64decode(key_b64).decode('utf-8')
    except (ValueError, TypeError):
        # binascii.Error and UnicodeDecodeError are both ValueError subclasses,
        # so this covers malformed base64 as well as non-ASCII / non-UTF-8 data
        # without swallowing KeyboardInterrupt or SystemExit.
        return False

    # A usable key contains ':' separators and is neither a "pk_..."
    # placeholder nor the repr of a bytes object ("b'...").
    return ':' in key_text and not key_text.startswith(('pk_', "b'"))


@router.post("/init-election-keys")
async def init_election_keys(election_id: int, db: Session = Depends(get_db)):
    """
    Ensure an election has a valid ElGamal public key.

    Looks up the election, validates the stored public key and, when the key
    is missing or invalid, generates a new one and stores it base64-encoded.
    The key is generated with the election's ``elgamal_p`` / ``elgamal_g``,
    falling back to p=23, g=5 when those are unset.

    Args:
        election_id: Identifier of the election to initialise.
        db: Database session injected through ``get_db``.

    Returns:
        A dict with the election id and name, its ElGamal parameters, the
        base64 public key as text, and ``public_key_generated`` - True only
        when a new key was actually generated by this call.

    Raises:
        HTTPException: 404 when the election does not exist; 500 for any
            other error.
    """
    try:
        # Imported here rather than at module level, as in the original code.
        from .. import models

        election = db.query(models.Election).filter(models.Election.id == election_id).first()

        if not election:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Election {election_id} not found"
            )

        logger.info(f"Initializing keys for election {election_id}: {election.name}")

        # Regenerate when the key is absent or fails validation.
        key_generated = (
            not election.public_key
            or not _stored_public_key_is_valid(election.public_key)
        )

        if key_generated:
            logger.info(f"Generating ElGamal public key for election {election_id}")
            elgamal = ElGamalEncryption(p=election.elgamal_p or 23, g=election.elgamal_g or 5)
            # Store as base64-encoded bytes (public_key_bytes is the UTF-8 "p:g:h" form).
            election.public_key = base64.b64encode(elgamal.public_key_bytes)
            db.commit()
            logger.info(f"✓ Generated public key for election {election_id}")
        else:
            logger.info(f"Election {election_id} already has valid public key")

        stored_key = election.public_key
        return {
            "status": "success",
            "election_id": election_id,
            "election_name": election.name,
            "elgamal_p": election.elgamal_p,
            "elgamal_g": election.elgamal_g,
            # Previously hard-coded to True; now reflects what actually happened.
            "public_key_generated": key_generated,
            "public_key": stored_key.decode('ascii') if isinstance(stored_key, bytes) else stored_key
        }

    except HTTPException:
        # Let the deliberate 404 pass through instead of rewrapping it as a 500.
        raise
    except Exception as e:
        logger.error(f"Error initializing election keys: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error initializing election keys: {str(e)}"
        )
|
|
|
|
|
|
@router.get("/validators/health")
async def check_validators_health():
    """
    Check the health status of all PoA validator nodes.

    Opens a ``BlockchainClient``, refreshes the validator status and reports
    each validator together with an aggregate summary.

    Returns:
        A dict with:
        - ``timestamp``: naive UTC time of the check, ISO 8601 formatted
        - ``validators``: node id, RPC/P2P URLs and status value per validator
        - ``summary``: healthy count, total count and healthy percentage

    Raises:
        HTTPException: 500, wrapping any error raised during the check.
    """
    # Local imports: keeps this endpoint independent of where (or whether) the
    # module imports datetime, and mirrors the existing local client import.
    from datetime import datetime, timezone

    from ..blockchain_client import BlockchainClient

    try:
        async with BlockchainClient() as client:
            await client.refresh_validator_status()

            validators_status = [
                {
                    "node_id": validator.node_id,
                    "rpc_url": validator.rpc_url,
                    "p2p_url": validator.p2p_url,
                    "status": validator.status.value,
                }
                for validator in client.validators
            ]

            healthy_count = len(client.healthy_validators)
            total_count = len(client.validators)

            logger.info(f"Validator health check: {healthy_count}/{total_count} healthy")

            # datetime.utcnow() is deprecated since Python 3.12. Take an aware
            # UTC "now" and drop tzinfo so the emitted string keeps the same
            # naive format clients already receive.
            checked_at = datetime.now(timezone.utc).replace(tzinfo=None)

            return {
                "timestamp": checked_at.isoformat(),
                "validators": validators_status,
                "summary": {
                    "healthy": healthy_count,
                    "total": total_count,
                    # Guard against division by zero when no validator is configured.
                    "health_percentage": (healthy_count / total_count * 100) if total_count > 0 else 0
                }
            }

    except Exception as e:
        logger.error(f"Error checking validator health: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error checking validator health: {str(e)}"
        )
|
|
|
|
|
|
@router.post("/validators/refresh-status")
async def refresh_validator_status():
    """
    Force a refresh of validator node health status.

    Opens a ``BlockchainClient``, triggers a status refresh and returns the
    resulting status value of every validator.

    Returns:
        A dict with a confirmation ``message``, the ``validators`` list
        (node id and status value) and a naive UTC ISO 8601 ``timestamp``.

    Raises:
        HTTPException: 500, wrapping any error raised during the refresh.
    """
    # Local imports: keeps this endpoint independent of where (or whether) the
    # module imports datetime, and mirrors the existing local client import.
    from datetime import datetime, timezone

    from ..blockchain_client import BlockchainClient

    try:
        async with BlockchainClient() as client:
            await client.refresh_validator_status()

            validators_status = [
                {
                    "node_id": validator.node_id,
                    "status": validator.status.value,
                }
                for validator in client.validators
            ]

            logger.info("Validator status refreshed")

            # datetime.utcnow() is deprecated since Python 3.12. Take an aware
            # UTC "now" and drop tzinfo so the emitted string keeps the same
            # naive format clients already receive.
            refreshed_at = datetime.now(timezone.utc).replace(tzinfo=None)

            return {
                "message": "Validator status refreshed",
                "validators": validators_status,
                "timestamp": refreshed_at.isoformat()
            }

    except Exception as e:
        logger.error(f"Error refreshing validator status: {e}", exc_info=True)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Error refreshing validator status: {str(e)}"
        )
|
|
|
|
|
|
from datetime import datetime
|