E-Voting Developer dfdf159198 fix: ElGamal encryption, vote deduplication, and frontend data validation
- Fixed ElGamal class instantiation in votes.py (ElGamalEncryption instead of ElGamal)
- Fixed public key serialization in admin.py (use public_key_bytes property)
- Implemented database migration with SQL-based key generation
- Added vote deduplication endpoint: GET /api/votes/check
- Protected all array accesses with type validation in frontend
- Fixed vote parameter type handling (string to number conversion)
- Removed all debug console logs for production
- Created missing dynamic route for vote history details

Fixes:
- JavaScript error: "can't access property length, e is undefined"
- Vote deduplication not preventing form display
- Frontend data validation issues
- Missing dynamic routes
2025-11-08 00:05:19 +01:00

280 lines
9.4 KiB
Python

"""
Routes administrateur pour maintenance et configuration du système.
Admin endpoints for database maintenance and system configuration.
"""
from fastapi import APIRouter, HTTPException, status, Depends
from sqlalchemy.orm import Session
from sqlalchemy import text
from ..dependencies import get_db
from ..crypto.encryption import ElGamalEncryption
import base64
import logging
router = APIRouter(prefix="/api/admin", tags=["admin"])
logger = logging.getLogger(__name__)
@router.post("/fix-elgamal-keys")
async def fix_elgamal_keys(db: Session = Depends(get_db)):
"""
Fix missing ElGamal encryption parameters for elections.
Updates all elections that have NULL elgamal_p or elgamal_g to use p=23, g=5.
This is needed for the voting system to function properly.
"""
try:
logger.info("🔧 Starting ElGamal key fix...")
# Get current status
result = db.execute(text(
"SELECT COUNT(*) FROM elections WHERE elgamal_p IS NULL OR elgamal_g IS NULL"
))
count_before = result.scalar()
logger.info(f"Elections needing fix: {count_before}")
# Update elections with missing ElGamal parameters
db.execute(text(
"UPDATE elections SET elgamal_p = 23, elgamal_g = 5 WHERE elgamal_p IS NULL OR elgamal_g IS NULL"
))
db.commit()
# Verify the fix
result = db.execute(text(
"SELECT id, name, elgamal_p, elgamal_g FROM elections WHERE is_active = TRUE"
))
fixed_elections = []
for row in result:
fixed_elections.append({
"id": row[0],
"name": row[1],
"elgamal_p": row[2],
"elgamal_g": row[3]
})
logger.info(f"✓ Fixed {count_before} elections with ElGamal keys")
logger.info(f"Active elections with keys: {len(fixed_elections)}")
return {
"status": "success",
"message": f"Fixed {count_before} elections with ElGamal parameters",
"elgamal_p": 23,
"elgamal_g": 5,
"active_elections": fixed_elections
}
except Exception as e:
logger.error(f"✗ Error fixing ElGamal keys: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error fixing ElGamal keys: {str(e)}"
)
@router.get("/elections/elgamal-status")
async def check_elgamal_status(db: Session = Depends(get_db)):
"""
Check which elections have ElGamal parameters set.
Useful for diagnostics before voting.
"""
try:
result = db.execute(text(
"""
SELECT
id,
name,
is_active,
elgamal_p,
elgamal_g,
public_key,
CASE WHEN elgamal_p IS NOT NULL AND elgamal_g IS NOT NULL AND public_key IS NOT NULL THEN 'ready' ELSE 'incomplete' END as status
FROM elections
ORDER BY is_active DESC, id ASC
"""
))
elections = []
incomplete_count = 0
ready_count = 0
for row in result:
status_val = "ready" if row[3] and row[4] and row[5] else "incomplete"
elections.append({
"id": row[0],
"name": row[1],
"is_active": row[2],
"elgamal_p": row[3],
"elgamal_g": row[4],
"has_public_key": row[5] is not None,
"status": status_val
})
if status_val == "incomplete":
incomplete_count += 1
else:
ready_count += 1
return {
"total_elections": len(elections),
"ready_for_voting": ready_count,
"incomplete": incomplete_count,
"elections": elections
}
except Exception as e:
logger.error(f"Error checking ElGamal status: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error checking status: {str(e)}"
)
@router.post("/init-election-keys")
async def init_election_keys(election_id: int, db: Session = Depends(get_db)):
"""
Initialize ElGamal public keys for an election.
Generates a public key for voting encryption if not already present.
"""
try:
# Get the election
from .. import models
election = db.query(models.Election).filter(models.Election.id == election_id).first()
if not election:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Election {election_id} not found"
)
logger.info(f"Initializing keys for election {election_id}: {election.name}")
# Generate ElGamal public key if missing or invalid
pubkey_is_invalid = False
if election.public_key:
try:
pubkey_str = election.public_key.decode('utf-8') if isinstance(election.public_key, bytes) else str(election.public_key)
# Check if it's valid (should be "p:g:h" format, not "pk_ongoing_X")
if not ':' in pubkey_str or pubkey_str.startswith('pk_') or pubkey_str.startswith('b\''):
pubkey_is_invalid = True
except:
pubkey_is_invalid = True
if not election.public_key or pubkey_is_invalid:
logger.info(f"Generating ElGamal public key for election {election_id}")
elgamal = ElGamalEncryption(p=election.elgamal_p or 23, g=election.elgamal_g or 5)
# Use the property that returns properly formatted bytes "p:g:h"
election.public_key = elgamal.public_key_bytes
db.commit()
logger.info(f"✓ Generated public key for election {election_id}")
else:
logger.info(f"Election {election_id} already has valid public key")
return {
"status": "success",
"election_id": election_id,
"election_name": election.name,
"elgamal_p": election.elgamal_p,
"elgamal_g": election.elgamal_g,
"public_key_generated": True,
"public_key": base64.b64encode(election.public_key).decode() if election.public_key else None
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error initializing election keys: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error initializing election keys: {str(e)}"
)
@router.get("/validators/health")
async def check_validators_health():
"""
Check the health status of all PoA validator nodes.
Returns:
- Each validator's health status (healthy, degraded, unreachable)
- Timestamp of the check
- Number of healthy validators
"""
from ..blockchain_client import BlockchainClient
try:
async with BlockchainClient() as client:
await client.refresh_validator_status()
validators_status = []
for validator in client.validators:
validators_status.append({
"node_id": validator.node_id,
"rpc_url": validator.rpc_url,
"p2p_url": validator.p2p_url,
"status": validator.status.value
})
healthy_count = len(client.healthy_validators)
total_count = len(client.validators)
logger.info(f"Validator health check: {healthy_count}/{total_count} healthy")
return {
"timestamp": datetime.utcnow().isoformat(),
"validators": validators_status,
"summary": {
"healthy": healthy_count,
"total": total_count,
"health_percentage": (healthy_count / total_count * 100) if total_count > 0 else 0
}
}
except Exception as e:
logger.error(f"Error checking validator health: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error checking validator health: {str(e)}"
)
@router.post("/validators/refresh-status")
async def refresh_validator_status():
"""
Force a refresh of validator node health status.
Useful for immediate status checks without waiting for automatic intervals.
"""
from ..blockchain_client import BlockchainClient
try:
async with BlockchainClient() as client:
await client.refresh_validator_status()
validators_status = []
for validator in client.validators:
validators_status.append({
"node_id": validator.node_id,
"status": validator.status.value
})
logger.info("Validator status refreshed")
return {
"message": "Validator status refreshed",
"validators": validators_status,
"timestamp": datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Error refreshing validator status: {e}", exc_info=True)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Error refreshing validator status: {str(e)}"
)
from datetime import datetime