""" Implémentation du protocole LEACH (Low-Energy Adaptive Clustering Hierarchy) """ import random import math from node import Node from metrics import Metrics from config import BS_POSITION class LEACH: """ Implémentation de LEACH en version décentralisée. Chaque nœud a une probabilité p d'être élu cluster head. """ def __init__(self, nodes, probability_ch, packet_size): """ Initialise le protocole LEACH. Args: nodes (list): Liste des nœuds probability_ch (float): Probabilité d'élection comme CH packet_size (int): Taille des paquets en bits """ self.nodes = nodes self.probability_ch = probability_ch self.packet_size = packet_size self.metrics = Metrics() self.ch_nodes = [] self.round_num = 0 def elect_cluster_heads(self): """ Élection aléatoire des cluster heads. Chaque nœud vivant a probabilité p de devenir CH. """ self.ch_nodes = [] for node in self.nodes: if node.is_alive: if random.random() < self.probability_ch: node.is_cluster_head = True self.ch_nodes.append(node) return len(self.ch_nodes) > 0 def form_clusters(self): """ Formation des clusters : chaque nœud non-CH rejoigne le CH le plus proche. """ # Reset cluster affiliation for node in self.nodes: if not node.is_cluster_head: node.cluster_id = None # Assignation des nœuds aux CHs for node in self.nodes: if not node.is_alive or node.is_cluster_head: continue # Trouver le CH le plus proche closest_ch = None min_distance = float('inf') for ch in self.ch_nodes: if ch.is_alive: dist = node.distance_to(ch.x, ch.y) if dist < min_distance: min_distance = dist closest_ch = ch if closest_ch: node.cluster_id = closest_ch.node_id def communication_phase(self): """ Phase de communication : - Les nœuds envoient les données à leur CH - Les CHs agrègent et envoient à la BS Returns: tuple: (packets_to_ch, packets_to_bs, total_energy_consumed) """ packets_to_ch = 0 packets_to_bs = 0 # Phase 1 : Nœuds → CHs for node in self.nodes: if not node.is_alive or node.is_cluster_head: continue # Déterminer si le nœud doit envoyer (probabilité p) if random.random() < self.probability_ch: # Utilisé comme probabilité d'activité closest_ch = None min_distance = float('inf') for ch in self.ch_nodes: if ch.is_alive: dist = node.distance_to(ch.x, ch.y) if dist < min_distance: min_distance = dist closest_ch = ch if closest_ch: # Transmission du nœud au CH energy_tx = node.transmit(self.packet_size, min_distance) # Réception au CH energy_rx = closest_ch.receive(self.packet_size) packets_to_ch += 1 # Phase 2 : CHs → BS (après agrégation) for ch in self.ch_nodes: if not ch.is_alive: continue # Compter les nœuds dans le cluster cluster_members = sum(1 for n in self.nodes if n.cluster_id == ch.node_id and n.is_alive) if cluster_members > 0: # Agrégation des données reçues aggregated_bits = cluster_members * self.packet_size energy_agg = ch.aggregate(aggregated_bits) # Transmission à la BS distance_to_bs = ch.distance_to(BS_POSITION[0], BS_POSITION[1]) energy_tx_bs = ch.transmit(aggregated_bits, distance_to_bs) packets_to_bs += 1 return packets_to_ch, packets_to_bs def run_round(self): """ Exécute une ronde complète du protocole LEACH. Returns: bool: True si la ronde a réussi, False si muted (pas de CH) """ self.round_num += 1 # Reset pour la nouvelle ronde for node in self.nodes: node.reset_for_round() # Phase 1 : Élection des CHs ch_elected = self.elect_cluster_heads() if not ch_elected: # Muted round - pas de CH self.metrics.record_round(self.round_num, self.nodes, [], 0, 0, muted=True) self.metrics.update_dead_nodes(self.nodes, self.round_num) return False # Phase 2 : Formation des clusters self.form_clusters() # Phase 3 : Communication packets_to_ch, packets_to_bs = self.communication_phase() # Phase 4 : Mobilité - déplacement aléatoire des nœuds for node in self.nodes: if node.is_alive: node.move() # Enregistrement des métriques self.metrics.record_round(self.round_num, self.nodes, self.ch_nodes, packets_to_ch, packets_to_bs, muted=False) self.metrics.update_dead_nodes(self.nodes, self.round_num) return True def run_simulation(self, num_rounds): """ Lance la simulation complète. Args: num_rounds (int): Nombre de rounds à simuler """ for _ in range(num_rounds): # Vérifier s'il y a encore des nœuds vivants alive_count = sum(1 for n in self.nodes if n.is_alive) if alive_count == 0: break self.run_round() def get_metrics(self, total_rounds): """Retourne les métriques de la simulation.""" return self.metrics.get_summary(total_rounds) def get_detailed_metrics(self): """Retourne les données détaillées de chaque round.""" return self.metrics.get_rounds_data()