Source code for kosmos.protocols.remote_operation_protocol

from typing import Literal

from kosmos.protocols.config.protocol import RemoteOperationProtocolConfig
from kosmos.protocols.protocol import Protocol
from kosmos.protocols.protocol_result import CommunicationProtocolResult
from kosmos.protocols.status import ProtocolStatus
from kosmos.quantum_logic.quantum_register_manager import QuantumRegisterManager
from kosmos.topology.net import Network
from kosmos.topology.node import QuantumNode


class RemoteOperationProtocol(Protocol):
    """Remote operation protocol (remote CNOT or teleportation).

    Consumes an entanglement link between the two specified nodes and reports the
    estimated execution time of the configured remote operation in its result.

    Attributes:
        config (RemoteOperationProtocolConfig): Remote operation protocol configuration.
        quantum_manager (QuantumRegisterManager): The quantum register manager.
        source_node (QuantumNode): The source node.
        target_node (QuantumNode): The target node.

    """

    def __init__(
        self,
        config: RemoteOperationProtocolConfig,
        network: Network,
        quantum_manager: QuantumRegisterManager,
        source_node: QuantumNode,
        target_node: QuantumNode,
    ) -> None:
        """Initialize the remote operation protocol.

        Args:
            config (RemoteOperationProtocolConfig): Remote operation protocol configuration.
            network (Network): The network topology.
            quantum_manager (QuantumRegisterManager): The quantum register manager.
            source_node (QuantumNode): The source node.
            target_node (QuantumNode): The target node.

        """
        super().__init__(config, network)
        self.config = config

        self.quantum_manager = quantum_manager
        self.source_node = source_node
        self.target_node = target_node

    def _calculate_time_estimate(
        self, fidelity: float, operation_type: Literal["teleportation", "remote_cnot"]
    ) -> int:
        """Estimate total time for the remote operation including expected retries.

        The success probability of a single attempt is modelled as the product of the
        link fidelity and the gate and measurement fidelities of both nodes. The
        expected number of attempts is its reciprocal, clamped to the range
        ``[1, config.max_retries]``.

        Args:
            fidelity (float): Fidelity of the entanglement.
            operation_type (Literal["teleportation", "remote_cnot"]): Type of remote operation.

        Returns:
            int: Estimated time (in picoseconds) needed.

        Raises:
            ValueError: If ``operation_type`` is not a supported operation.

        """
        success_prob = (
            fidelity
            * self.source_node.gate_fid
            * self.source_node.meas_fid
            * self.target_node.gate_fid
            * self.target_node.meas_fid
        )

        if operation_type == "remote_cnot":
            single_attempt = int(
                2 * self.config.local_gate_time
                + self.config.measurement_time
                + self.config.classical_communication_delay
            )
        elif operation_type == "teleportation":
            single_attempt = int(
                self.config.local_gate_time
                + 2 * self.config.measurement_time
                + self.config.classical_communication_delay
            )
        else:
            msg = f'Unknown operation_type: {operation_type}. \nUse "remote_cnot" or "teleportation"'
            raise ValueError(msg)

        if success_prob <= 0.0:
            # An attempt can never succeed: assume the full retry budget is spent.
            expected_attempts = self.config.max_retries
        else:
            expected_attempts = 1.0 / success_prob

        # At least one attempt is always made, and never more than max_retries.
        expected_attempts = min(max(expected_attempts, 1.0), float(self.config.max_retries))

        return int(single_attempt * expected_attempts)

    def execute(self) -> CommunicationProtocolResult:
        """Execute the configured remote operation.

        Looks up an entanglement link between the source and target node. If no link
        exists, or the link has no usable (positive) fidelity, the protocol fails and
        the link is left untouched. Otherwise the link and its qubits are removed from
        the quantum register manager and the estimated execution time is returned.

        Returns:
            CommunicationProtocolResult: Result of the remote operation protocol.

        Raises:
            ValueError: If ``config.operation_type`` is not a supported operation.

        """
        self.status = ProtocolStatus.RUNNING

        key = self.quantum_manager.find_entanglement_between_nodes(
            self.source_node, self.target_node
        )
        if key is None:
            self.status = ProtocolStatus.FAILED
            return CommunicationProtocolResult(self.status)

        quantum_state = self.quantum_manager.states.get(key)
        fidelity = quantum_state.fidelity if quantum_state is not None else 0.0

        # A non-positive fidelity can never yield a successful attempt (this matches the
        # ``success_prob <= 0.0`` handling in ``_calculate_time_estimate``), so fail
        # before consuming the entanglement link.
        if fidelity <= 0.0:
            self.status = ProtocolStatus.FAILED
            return CommunicationProtocolResult(self.status)

        # Computed before any state is removed so that an invalid operation type raises
        # without consuming the link.
        execution_time = self._calculate_time_estimate(fidelity, self.config.operation_type)

        self.quantum_manager.remove_com_entanglement(list(key))
        for qubit_id in key:
            if qubit_id in self.quantum_manager.qubits:
                self.quantum_manager.remove_qubit(qubit_id)

        self.status = ProtocolStatus.SUCCESS
        return CommunicationProtocolResult(
            status=self.status,
            execution_time=execution_time,
        )