Routing Examples

These examples demonstrate how to run and extend network routing algorithms in KOSMOS.


Dijkstra

dijkstra.py
 1from kosmos.protocols.config.protocol import RoutingProtocolConfig
 2from kosmos.protocols.routing.dijkstra_routing import DijkstraRoutingProtocol
 3from kosmos.topology.link import ClassicalLink, LinkId, LinkType, OpticalLink, QuantumLink
 4from kosmos.topology.net import Network
 5from kosmos.topology.node import ClassicalNode, NodeId, NodeRole, QuantumNode
 6
# Node table consumed by create_example_network(): one
# (node id, node class, constructor keyword arguments) tuple per node.
NODES = [
    ("A", ClassicalNode, {"roles": [NodeRole.END_USER]}),
    ("QB", QuantumNode, {"roles": [NodeRole.ROUTER], "num_qubits": 10, "coherence_time": 1.0}),
    ("C", ClassicalNode, {"roles": [NodeRole.ROUTER]}),
    ("D", ClassicalNode, {"roles": [NodeRole.ROUTER]}),
    ("QE", QuantumNode, {"roles": [NodeRole.END_USER], "num_qubits": 5, "coherence_time": 0.5}),
    ("F", ClassicalNode, {"roles": [NodeRole.ROUTER]}),
]
15
# Link table consumed by create_example_network(): one
# (link id, source node id, destination node id, link class, constructor keyword
# arguments) tuple per link. The node ids are looked up among the nodes built from NODES.
LINKS = [
    ("AB", "A", "QB", ClassicalLink, {"distance": 10000.0, "bandwidth": 1e9}),
    ("AC", "A", "C", ClassicalLink, {"distance": 5000.0, "bandwidth": 1e9}),
    ("AD", "A", "D", ClassicalLink, {"distance": 15000.0, "bandwidth": 1e9}),
    ("CD", "C", "D", ClassicalLink, {"distance": 8000.0, "bandwidth": 1e9}),
    ("DF", "D", "F", ClassicalLink, {"distance": 12000.0, "bandwidth": 1e9}),
    ("EF", "QE", "F", ClassicalLink, {"distance": 900000.0, "bandwidth": 1e9}),
    ("QBE", "QB", "QE", QuantumLink, {"distance": 2000.0, "repetition_rate": 1e6}),
]
25
26
def create_example_network() -> Network:
    """Build the example network described by the ``NODES`` and ``LINKS`` tables.

    Quantum nodes are created with ``has_transceiver=True``. Every link gets a default
    ``attenuation`` and ``signal_speed``, and quantum links additionally get a default
    ``polarization_fidelity``. A value given in a table entry takes precedence over the
    corresponding default.

    Returns:
        The network containing all nodes and links from the tables.

    Raises:
        KeyError: If a link entry refers to a node id that is not listed in ``NODES``.
    """
    network = Network()
    nodes = {}

    # Create nodes
    for node_id, node_class, params in NODES:
        defaults = {"has_transceiver": True} if issubclass(node_class, QuantumNode) else {}
        # Merge into a single mapping so a table entry may override a default; unpacking
        # both mappings with ``**`` would raise TypeError on a duplicated keyword.
        node = node_class(id=NodeId(node_id), **{**defaults, **params})
        network.add_node(node)
        nodes[node_id] = node

    # Create links
    for link_id, src_id, dst_id, link_class, params in LINKS:
        defaults = {"attenuation": 0.0002, "signal_speed": 200.0}
        if issubclass(link_class, QuantumLink):
            defaults["polarization_fidelity"] = 0.99
        link = link_class(
            id=LinkId(link_id),
            src=nodes[src_id],
            dst=nodes[dst_id],
            **{**defaults, **params},
        )
        network.add_link(link)

    return network
50
51
def main() -> None:
    """Route from A to QE with Dijkstra under both cost functions and print a report."""
    network = create_example_network()
    source = network.get_node("A")
    target = network.get_node("QE")

    # One Dijkstra run per cost function, executed in the order they are reported below.
    results = {}
    for cost_function in ("cost", "distance"):
        config = RoutingProtocolConfig(
            allowed_link_types=[LinkType.QUANTUM, LinkType.CLASSICAL],
            cost_function=cost_function,
        )
        protocol = DijkstraRoutingProtocol(config, network, source, target)
        results[cost_function] = protocol.execute()

    report = ["\n=== Nodes ==="]
    report += [f"{node.id}: {node.__class__.__name__}" for node in network.nodes()]

    report.append("\n=== Links ===")
    for link in network.links():
        # Guard before reading link.distance and link.weight below.
        if not isinstance(link, OpticalLink):
            msg = "Only optical links are supported."
            raise TypeError(msg)
        endpoints = f"{link.id}: {link.src.id} <--> {link.dst.id} ({link.__class__.__name__}); "
        metrics = f"Dist: {link.distance:.4f}, Cost: {link.weight:.4g}"
        report.append(endpoints + metrics)

    report += [
        "\n=== Dijkstra Routing (by Cost) ===",
        str(results["cost"]),
        "\n=== Dijkstra Routing (by Distance) ===",
        str(results["distance"]),
    ]

    print("\n".join(report))  # noqa: T201
89
90
# Run the example only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()

Custom Routing Protocol

This example demonstrates how to implement and use a custom routing protocol in KOSMOS.

create_own_protocol.py
  1from kosmos.protocols.config.protocol import RoutingProtocolConfig
  2from kosmos.protocols.protocol_result import RoutingProtocolResult
  3from kosmos.protocols.routing.path import Path
  4from kosmos.protocols.routing.routing import RoutingProtocol
  5from kosmos.protocols.status import ProtocolStatus
  6from kosmos.topology.link import ClassicalLink, LinkId, LinkType
  7from kosmos.topology.net import Network
  8from kosmos.topology.node import ClassicalNode, NodeId, NodeRole
  9
 10
 11class DirectNeighborRoutingProtocol(RoutingProtocol):
 12    """Succeeds only if source and target share a direct allowed link."""
 13
 14    def execute(self) -> RoutingProtocolResult:
 15        """Execute the direct neighbor routing protocol."""
 16        self.status = ProtocolStatus.RUNNING
 17
 18        # Iterate over the outgoing links of the source node to find the target node
 19        for link in self.network.outgoing_links(self.source_node):
 20            # Check if the type of the link is allowed
 21            if link.type not in self.config.allowed_link_types:
 22                continue
 23
 24            # Determine the node on the other end of this link (relative to the source)
 25            neighbor = link.dst if link.src == self.source_node else link.src
 26
 27            # Only accept the link if it leads directly to the target
 28            if neighbor != self.target_node:
 29                continue
 30
 31            # Create the path and calculate the total cost
 32            path = Path(nodes=[self.source_node, self.target_node], links=[link])
 33            total_cost = self._link_cost(link)
 34
 35            # Set the status to SUCCESS
 36            self.status = ProtocolStatus.SUCCESS
 37
 38            # Build the result object for this protocol execution
 39            # execute() must return a RoutingProtocolResult
 40            return RoutingProtocolResult(
 41                status=self.status,
 42                path=path,
 43                total_cost=total_cost,
 44                total_distance=None,
 45            )
 46
 47        self.status = ProtocolStatus.FAILED
 48        return RoutingProtocolResult(
 49            status=self.status,
 50            path=None,
 51            total_cost=None,
 52            total_distance=None,
 53        )
 54
 55
 56def create_network() -> Network:
 57    """Create a simple network with three nodes and one link."""
 58    network = Network()
 59
 60    node_a = ClassicalNode(id=NodeId("A"), roles=[NodeRole.END_USER])
 61    node_b = ClassicalNode(id=NodeId("B"), roles=[NodeRole.ROUTER])
 62    node_c = ClassicalNode(id=NodeId("C"), roles=[NodeRole.END_USER])
 63
 64    network.add_node(node_a)
 65    network.add_node(node_b)
 66    network.add_node(node_c)
 67
 68    link_ab = ClassicalLink(
 69        id=LinkId("AB"),
 70        src=node_a,
 71        dst=node_b,
 72        distance=5000.0,
 73        bandwidth=1e9,
 74        attenuation=0.0002,
 75        signal_speed=200.0,
 76    )
 77    network.add_link(link_ab)
 78
 79    return network
 80
 81
 82def execute_protocol() -> None:
 83    """Execute the custom routing protocol."""
 84    network = create_network()
 85    node_a, node_b = network.get_node("A"), network.get_node("B")
 86
 87    # Create protocol configuration
 88    config = RoutingProtocolConfig(
 89        allowed_link_types=[LinkType.CLASSICAL],
 90        cost_function="cost",
 91    )
 92
 93    # Execute protocol
 94    routing_protocol = DirectNeighborRoutingProtocol(config, network, node_a, node_b)
 95    result = routing_protocol.execute()
 96
 97    output = f"Routing from {node_a.id} to {node_b.id}:"
 98    output += f"\nStatus: {result.status}"
 99    if result.path:
100        output += f"\nPath: {' -> '.join([str(n.id) for n in result.path.nodes])}"
101        output += f"\nCost: {result.total_cost}"
102
103    print(output)  # noqa: T201
104
105
# Run the example only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    execute_protocol()