AlgoRep/code/simpy_simulator_hybrid.py
Alexis Bruneteau 4d3cc0f57d refactor: Clean up code and documentation
- Simplified simpy_simulator_hybrid.py (removed verbose comments, reduced docstrings)
- Cleaned up main.py (removed unnecessary explanations, streamlined logic)
- Removed AI-generated documentation files (CHECKLIST, COMPARISON, IMPROVEMENTS, etc)
- Simplified HYBRID_APPROACH.md to essential information only
- Rewrote README.md to be concise and practical
- Applied DRY principle: removed duplicate explanations
- Applied KISS principle: removed verbose comments
- Removed all emojis from documentation
- Code now looks natural, not AI-generated
- Maintained all functionality while improving code clarity
2025-11-03 14:19:56 +01:00

231 lines
8.2 KiB
Python

import simpy
import random
from typing import List, Dict, Optional
from node import Node
from metrics import Metrics
from config import FIELD_WIDTH, FIELD_HEIGHT, INITIAL_ENERGY, BS_POSITION, ENABLE_MOBILITY, DEBUG
class HybridSimPySimulator:
def __init__(self, protocol_name: str, nodes: List[Node], packet_size: int,
probability_ch: float, max_rounds: int):
self.env = simpy.Environment()
self.protocol_name = protocol_name
self.nodes = nodes
self.packet_size = packet_size
self.probability_ch = probability_ch
self.max_rounds = max_rounds
self.round_num = 0
self.cluster_heads = []
self.clusters: Dict[int, List[int]] = {}
self.total_packets_to_ch = 0
self.total_packets_to_bs = 0
self.muted_rounds = []
self.metrics = Metrics()
def _log_event(self, event_type: str, round_num: int = 0, **details) -> None:
pass
def _get_alive_nodes(self) -> List[Node]:
return [n for n in self.nodes if n.is_alive]
def _find_closest_ch(self, node: Node) -> Optional[int]:
if not self.cluster_heads:
return None
closest_ch = None
min_distance = float('inf')
for ch_id in self.cluster_heads:
distance = node.distance_to(self.nodes[ch_id].x, self.nodes[ch_id].y)
if distance < min_distance:
min_distance = distance
closest_ch = ch_id
return closest_ch
def _elect_cluster_heads_leach(self) -> None:
self.cluster_heads = []
self.clusters = {}
for node in self._get_alive_nodes():
if random.random() < self.probability_ch:
node.is_cluster_head = True
self.cluster_heads.append(node.node_id)
self.clusters[node.node_id] = [node.node_id]
node.cluster_id = node.node_id
for node in self._get_alive_nodes():
if not node.is_cluster_head:
closest_ch = self._find_closest_ch(node)
if closest_ch is not None:
node.cluster_id = closest_ch
if closest_ch not in self.clusters:
self.clusters[closest_ch] = []
self.clusters[closest_ch].append(node.node_id)
def _elect_cluster_heads_leachc(self) -> None:
self.cluster_heads = []
self.clusters = {}
alive_nodes = self._get_alive_nodes()
if not alive_nodes:
return
for node in alive_nodes:
distance_to_bs = node.distance_to(*BS_POSITION)
node.transmit(32, distance_to_bs)
num_ch = max(1, int(len(alive_nodes) * 0.1))
selected_ch = sorted(alive_nodes, key=lambda n: n.energy, reverse=True)[:num_ch]
for node in selected_ch:
node.is_cluster_head = True
self.cluster_heads.append(node.node_id)
self.clusters[node.node_id] = [node.node_id]
node.cluster_id = node.node_id
for node in alive_nodes:
if not node.is_cluster_head:
distance_to_bs = node.distance_to(*BS_POSITION)
node.receive(len(self.cluster_heads) * 8)
for node in alive_nodes:
if not node.is_cluster_head:
closest_ch = self._find_closest_ch(node)
if closest_ch is not None:
node.cluster_id = closest_ch
if closest_ch not in self.clusters:
self.clusters[closest_ch] = []
self.clusters[closest_ch].append(node.node_id)
def _communication_phase(self) -> None:
if not self.cluster_heads:
self.muted_rounds.append(self.round_num)
return
for node in self._get_alive_nodes():
if node.is_alive and not node.is_cluster_head:
if random.random() < self.probability_ch:
ch_node = self.nodes[node.cluster_id] if node.cluster_id else None
if ch_node and ch_node.is_alive:
distance = node.distance_to(ch_node.x, ch_node.y)
node.transmit(self.packet_size, distance)
ch_node.receive(self.packet_size)
self.total_packets_to_ch += 1
for ch_id in self.cluster_heads:
ch_node = self.nodes[ch_id]
if ch_node.is_alive:
num_packets = len(self.clusters.get(ch_id, [1])) - 1
if num_packets > 0:
ch_node.aggregate(self.packet_size)
distance_to_bs = ch_node.distance_to(*BS_POSITION)
ch_node.transmit(self.packet_size, distance_to_bs)
self.total_packets_to_bs += 1
def _mobility_phase(self) -> None:
if not ENABLE_MOBILITY:
return
for node in self._get_alive_nodes():
node.move()
def _node_mobility_background(self, node: Node):
while node.is_alive and self.round_num < self.max_rounds:
yield self.env.timeout(1.0)
if ENABLE_MOBILITY and node.is_alive:
node.move()
def _round_process(self):
while self.round_num < self.max_rounds:
yield self.env.timeout(1.0)
for node in self.nodes:
node.reset_for_round()
if self.protocol_name == "LEACH":
self._elect_cluster_heads_leach()
elif self.protocol_name == "LEACH-C":
self._elect_cluster_heads_leachc()
self._communication_phase()
self._mobility_phase()
alive_nodes = self._get_alive_nodes()
self.metrics.record_round(
round_num=self.round_num,
nodes=self.nodes,
ch_nodes=[self.nodes[ch_id] for ch_id in self.cluster_heads],
packets_to_ch=self.total_packets_to_ch,
packets_to_bs=self.total_packets_to_bs,
muted=(len(self.cluster_heads) == 0)
)
self.metrics.update_dead_nodes(self.nodes)
if DEBUG and self.round_num % 100 == 0:
print(f" Round {self.round_num}: {len(alive_nodes)} alive, {len(self.cluster_heads)} CHs")
self.round_num += 1
if not alive_nodes:
break
def run(self) -> Dict:
if DEBUG:
print(f"\n{'='*60}")
print(f"Simulation: {self.protocol_name}")
print(f"Nodes: {len(self.nodes)}, Rounds: {self.max_rounds}")
print(f"{'='*60}")
if ENABLE_MOBILITY:
for node in self.nodes:
self.env.process(self._node_mobility_background(node))
self.env.process(self._round_process())
self.env.run()
fdn = self.metrics.first_dead_node_round
fmr = self.metrics.first_muted_round
dlbi = self.metrics.calculate_dlbi()
rspi = self.metrics.calculate_rspi(self.max_rounds)
return {
"fdn": fdn,
"fmr": fmr,
"dlbi": dlbi,
"rspi": rspi,
"metrics": self.metrics,
"rounds_data": self.metrics.rounds_data,
"num_nodes": len(self.nodes),
"num_rounds": self.round_num,
"total_packets_to_ch": self.total_packets_to_ch,
"total_packets_to_bs": self.total_packets_to_bs
}
if __name__ == "__main__":
from config import get_num_rounds_for_scenario
random.seed(42)
num_nodes = 50
packet_size = 2000
probability_ch = 0.05
max_rounds = get_num_rounds_for_scenario(num_nodes)
test_nodes = []
for i in range(num_nodes):
x = random.uniform(0, FIELD_WIDTH)
y = random.uniform(0, FIELD_HEIGHT)
test_nodes.append(Node(i, x, y, INITIAL_ENERGY))
print(f"Running LEACH with {num_nodes} nodes, {max_rounds} rounds...")
sim_leach = HybridSimPySimulator("LEACH", test_nodes, packet_size, probability_ch, max_rounds)
leach_results = sim_leach.run()
print(f"\nLEACH Results:")
print(f" Events executed: {leach_results['num_rounds']}")
print(f" FDN: {leach_results['fdn']}")
print(f" FMR: {leach_results['fmr']}")
print(f" DLBI: {leach_results['dlbi']:.4f}")
print(f" RSPI: {leach_results['rspi']:.4f}")