# Source code for kosmos.protocols.eg_protocol
import math
from kosmos.protocols.config.protocol import EGProtocolConfig, RoutingProtocolConfig
from kosmos.protocols.protocol import Protocol, ProtocolStatus
from kosmos.protocols.protocol_result import CommunicationProtocolResult, RoutingProtocolResult
from kosmos.protocols.routing.dijkstra_routing import DijkstraRoutingProtocol
from kosmos.protocols.routing.path import Path
from kosmos.quantum_logic.quantum_register_manager import QuantumRegisterManager
from kosmos.quantum_logic.qubit import QubitId, QubitType
from kosmos.topology.link import LinkType, QuantumLink
from kosmos.topology.net import Network
from kosmos.topology.node import QuantumNode
def _calculate_time_estimate(path: Path, success_probability: float) -> int:
    """Estimate how long entanglement generation takes, retries included.

    The time of a single attempt is the summed delay of the quantum links on
    the path plus a swap overhead of ``5 + len(path.links)`` per intermediate
    node. That single-attempt time is scaled by the expected number of
    attempts, which is ``1 / success_probability`` clamped to the range
    ``[1, 2 * len(path.nodes)]``.

    Args:
        path (Path): Path between nodes to be entangled.
        success_probability (float): Probability of successful entanglement.
            Non-positive values fall back to the maximum attempt count.

    Returns:
        int: Estimated time in picoseconds needed.

    """
    node_count = len(path.nodes)
    link_count = len(path.links)
    attempt_cap = node_count * 2

    # Only quantum links contribute to the propagation delay.
    quantum_delay = 0
    for link in path.links:
        if isinstance(link, QuantumLink):
            quantum_delay += link.delay

    # Every node that is not an endpoint performs one swap.
    swap_count = max(0, node_count - 2)
    single_attempt_time = quantum_delay + swap_count * (5 + link_count)

    if success_probability > 0:
        attempts = 1.0 / success_probability
    else:
        attempts = attempt_cap
    # At least one attempt, at most two per node on the path.
    attempts = min(max(attempts, 1), attempt_cap)

    return int(single_attempt_time * attempts)
def _get_link_success_probability(link: QuantumLink) -> float:
    """Compute the entanglement generation success probability of one link.

    The probability is the product of the transmission probability
    (``1 - link.loss``), the link's polarization fidelity, and an exponential
    decay term ``exp(-0.0001 * link.distance)``.

    Args:
        link (QuantumLink): Quantum link in the network.

    Returns:
        float: Success probability of entanglement generation over the link.

    """
    # Exponential decay with distance.
    attenuation = math.exp(-0.0001 * link.distance)
    survival = 1.0 - link.loss
    return survival * link.polarization_fidelity * attenuation
# [docs]  (Sphinx viewcode link label left over from HTML extraction)
class EGProtocol(Protocol):
    """Entanglement generation protocol.

    This protocol allocates communication qubits at the source and target nodes,
    then creates entanglement between them based on the quantum link properties.

    Attributes:
        config (EGProtocolConfig): Entanglement generation protocol configuration.
        quantum_manager (QuantumRegisterManager): The quantum register manager.
        source_node (QuantumNode): The source node.
        target_node (QuantumNode): The target node.

    """

    def __init__(
        self,
        config: EGProtocolConfig,
        network: Network,
        quantum_manager: QuantumRegisterManager,
        source_node: QuantumNode,
        target_node: QuantumNode,
    ) -> None:
        """Initialize the entanglement generation protocol.

        Args:
            config (EGProtocolConfig): Entanglement generation protocol configuration.
            network (Network): The network topology.
            quantum_manager (QuantumRegisterManager): The quantum register manager.
            source_node (QuantumNode): The source node.
            target_node (QuantumNode): The target node.

        """
        super().__init__(config, network)
        self.config = config
        self.quantum_manager = quantum_manager
        self.source_node = source_node
        self.target_node = target_node

    def _get_path(self) -> RoutingProtocolResult:
        """Get the path between the source and the target node.

        Runs a Dijkstra routing protocol restricted to quantum links, using
        ``"cost"`` as the cost function.

        Returns:
            RoutingProtocolResult: Result of the routing protocol.

        """
        routing_config = RoutingProtocolConfig(
            allowed_link_types=[LinkType.QUANTUM], cost_function="cost"
        )
        # NOTE(review): self.network is presumably set by Protocol.__init__ - confirm.
        routing_protocol = DijkstraRoutingProtocol(
            routing_config, self.network, self.source_node, self.target_node
        )
        return routing_protocol.execute()

    def _calculate_success_probability(self, path: Path) -> float:
        """Calculate the success probability of entanglement generation between two nodes.

        The result is the product of the per-link success probabilities of all
        quantum links on the path, multiplied by the configured swap success
        rate once per intermediate node.

        Args:
            path (Path): Path between nodes that will be entangled.

        Returns:
            float: Probability of successful entanglement generation.

        """
        entanglement_success = 1.0
        for link in path.links:
            if isinstance(link, QuantumLink):
                entanglement_success *= _get_link_success_probability(link)

        # Every node that is not an endpoint has to perform one swap.
        n_swaps = max(0, len(path.nodes) - 2)
        swap_success = self.config.swap_success_rate**n_swaps
        return entanglement_success * swap_success

    def _create_entanglement(self, fidelity: float) -> None:
        """Create a Bell pair and register it in the manager.

        Args:
            fidelity (float): Fidelity of the Bell pair.

        """
        # id(self) distinguishes the qubit ids of this protocol instance from
        # those of other protocol instances that are alive at the same time.
        protocol_id = id(self)
        qubit_ids = [
            QubitId(f"EG_{protocol_id}_{self.source_node.id}"),
            QubitId(f"EG_{protocol_id}_{self.target_node.id}"),
        ]
        self.quantum_manager.allocate_bell_pair(
            [self.source_node, self.target_node],
            qubit_ids,
            fidelity,
        )

    def _get_used_communication_qubits(self, node: QuantumNode) -> int:
        """Get the count of communication qubits currently in use at a node.

        Args:
            node (QuantumNode): Reference to the node.

        Returns:
            int: Number of communication qubits allocated to the node.

        """
        communication_count = 0
        for qubit_id in self.quantum_manager.get_qubits_by_node(node):
            qubit = self.quantum_manager.qubits.get(qubit_id)
            # Ids without a registered qubit are skipped.
            if qubit is not None and qubit.qubit_type == QubitType.COMMUNICATION:
                communication_count += 1
        return communication_count

    def _fail(self) -> CommunicationProtocolResult:
        """Mark the protocol as failed and build the corresponding result."""
        self.status = ProtocolStatus.FAILED
        return CommunicationProtocolResult(status=self.status)

    def execute(self) -> CommunicationProtocolResult:
        """Execute the entanglement generation protocol.

        Steps:
            1. Check that source and target each have a free communication qubit.
            2. Route between them over quantum links.
            3. Estimate the execution time from the path and its success
               probability.
            4. Start from a fidelity of ``1.0 - 0.05 * len(path.links)`` and add
               purification rounds until the configured threshold is reached;
               each round adds one time estimate and
               ``config.purification_improvement`` fidelity.
            5. Register the Bell pair with the resulting fidelity (capped at 1.0).

        Returns:
            CommunicationProtocolResult: Result of the entanglement generation
            protocol execution. The status is ``FAILED`` when an endpoint has no
            free communication qubit, when no path exists, or when the fidelity
            threshold cannot be reached because the configured purification
            improvement is not positive.

        """
        self.status = ProtocolStatus.RUNNING

        # Both endpoints need at least one unused communication qubit.
        if any(
            node.communication_qubits <= self._get_used_communication_qubits(node)
            for node in (self.source_node, self.target_node)
        ):
            return self._fail()

        routing_result = self._get_path()
        if routing_result.path is None:
            return self._fail()
        path = routing_result.path

        success_prob = self._calculate_success_probability(path)
        time_estimate = _calculate_time_estimate(path, success_prob)
        execution_time = time_estimate

        fidelity = 1.0 - (0.05 * len(path.links))
        threshold = self.config.fidelity_threshold
        improvement = self.config.purification_improvement

        # Without a positive improvement per round the loop below would never
        # reach the threshold, so fail instead of spinning forever.
        if fidelity < threshold and improvement <= 0:
            return self._fail()

        while fidelity < threshold:
            execution_time += time_estimate
            fidelity += improvement
        # Purification may overshoot; a fidelity above 1.0 is not meaningful.
        fidelity = min(1.0, fidelity)

        self._create_entanglement(fidelity)
        self.status = ProtocolStatus.SUCCESS
        return CommunicationProtocolResult(
            status=self.status,
            execution_time=execution_time,
        )