Source code for kosmos.protocols.routing.dijkstra_routing
import heapq
import itertools
import math
from collections import deque
from kosmos.protocols.protocol_result import RoutingProtocolResult
from kosmos.protocols.routing.path import Path
from kosmos.protocols.routing.routing import RoutingProtocol
from kosmos.protocols.status import ProtocolStatus
from kosmos.topology.link import Link
from kosmos.topology.node import Node
[docs]
class DijkstraRoutingProtocol(RoutingProtocol):
    """Routing protocol that searches for a minimum-cost path with Dijkstra's algorithm."""

    def execute(self) -> RoutingProtocolResult:
        """Search for the cheapest path from the source node to the target node.

        Only links whose type is listed in ``self.config.allowed_link_types``
        are traversed. ``self.status`` is set to ``RUNNING`` at the start and
        to ``SUCCESS`` or ``FAILED`` before returning.

        Returns:
            RoutingProtocolResult: On success, carries the path together with
                its summed link cost and link distance. When the target cannot
                be reached, carries ``FAILED`` status with ``path``,
                ``total_cost`` and ``total_distance`` all set to ``None``.

        """
        self.status = ProtocolStatus.RUNNING

        settled = set()
        best_cost: dict[Node, float] = {node: math.inf for node in self.network.nodes()}
        best_cost[self.source_node] = 0.0

        # For every reached node: the node we arrived from and the link taken.
        came_from: dict[Node, tuple[Node, Link]] = {}

        # Heap entries are (cost, sequence number, node). The sequence number
        # breaks ties between equal costs in favour of the earlier push.
        sequence = itertools.count()
        frontier = [(0.0, next(sequence), self.source_node)]

        while frontier:
            cost_so_far, _, node = heapq.heappop(frontier)

            # Stale entry: a cheaper route to this node was recorded after
            # this entry was pushed.
            if cost_so_far > best_cost[node]:
                continue
            if node == self.target_node:
                break

            settled.add(node)
            for link in self.network.outgoing_links(node):
                if link.type not in self.config.allowed_link_types:
                    continue

                # The far end of the link is whichever endpoint is not `node`.
                if link.src == node:
                    neighbor = link.dst
                else:
                    neighbor = link.src
                if neighbor in settled:
                    continue

                candidate = cost_so_far + self._link_cost(link)
                if candidate < best_cost[neighbor]:
                    best_cost[neighbor] = candidate
                    came_from[neighbor] = (node, link)
                    heapq.heappush(frontier, (candidate, next(sequence), neighbor))

        unreachable = (
            self.target_node not in came_from
            and self.target_node != self.source_node
        )
        if unreachable:
            self.status = ProtocolStatus.FAILED
            return RoutingProtocolResult(
                status=self.status, path=None, total_cost=None, total_distance=None
            )

        # Walk back from the target to the source, collecting nodes and links
        # in reverse order and accumulating the totals along the way.
        nodes_backwards: list[Node] = []
        links_backwards: list[Link] = []
        total_cost = 0.0
        total_distance = 0.0
        cursor = self.target_node
        while cursor != self.source_node:
            previous, link_taken = came_from[cursor]
            nodes_backwards.append(cursor)
            links_backwards.append(link_taken)
            total_cost += self._link_cost(link_taken)
            total_distance += self._link_distance(link_taken)
            cursor = previous
        nodes_backwards.append(self.source_node)

        # Flip both sequences so they run from source to target.
        nodes_backwards.reverse()
        links_backwards.reverse()
        path = Path(nodes_backwards, links_backwards)

        self.status = ProtocolStatus.SUCCESS
        return RoutingProtocolResult(
            status=self.status,
            path=path,
            total_cost=total_cost,
            total_distance=total_distance,
        )