Source code for kosmos.protocols.remote_operation_protocol
from typing import Literal
from kosmos.protocols.config.protocol import RemoteOperationProtocolConfig
from kosmos.protocols.protocol import Protocol
from kosmos.protocols.protocol_result import CommunicationProtocolResult
from kosmos.protocols.status import ProtocolStatus
from kosmos.quantum_logic.quantum_register_manager import QuantumRegisterManager
from kosmos.topology.net import Network
from kosmos.topology.node import QuantumNode
[docs]
class RemoteOperationProtocol(Protocol):
    """Remote operation protocol (remote CNOT or teleportation).

    Looks up an entanglement link between the two specified nodes, consumes
    it, and reports the estimated execution time of the configured remote
    operation in the returned result.

    NOTE(review): this class does not advance any clock itself; presumably the
    caller applies ``execution_time`` to the global clock - confirm.

    Attributes:
        config (RemoteOperationProtocolConfig): Remote operation protocol configuration.
        quantum_manager (QuantumRegisterManager): The quantum register manager.
        source_node (QuantumNode): The source node.
        target_node (QuantumNode): The target node.
    """

    def __init__(
        self,
        config: RemoteOperationProtocolConfig,
        network: Network,
        quantum_manager: QuantumRegisterManager,
        source_node: QuantumNode,
        target_node: QuantumNode,
    ) -> None:
        """Initialize the remote operation protocol.

        Args:
            config (RemoteOperationProtocolConfig): Remote operation protocol configuration.
            network (Network): The network topology.
            quantum_manager (QuantumRegisterManager): The quantum register manager.
            source_node (QuantumNode): The source node.
            target_node (QuantumNode): The target node.
        """
        super().__init__(config, network)
        self.config = config
        self.quantum_manager = quantum_manager
        self.source_node = source_node
        self.target_node = target_node

    def _calculate_time_estimate(
        self, fidelity: float, operation_type: Literal["teleportation", "remote_cnot"]
    ) -> int:
        """Estimate total time for a remote operation including expected retries.

        The duration of one attempt depends on the operation type; it is then
        scaled by the expected number of attempts ``1 / success_prob``, clamped
        to the range ``[1, max_retries]``.

        Args:
            fidelity (float): Fidelity of the entanglement.
            operation_type (Literal["teleportation", "remote_cnot"]): Type of remote operation.

        Returns:
            int: Estimated time (in picoseconds) needed.

        Raises:
            ValueError: If ``operation_type`` is not a supported operation.
        """
        if operation_type == "remote_cnot":
            single_attempt = int(
                2 * self.config.local_gate_time
                + self.config.measurement_time
                + self.config.classical_communication_delay
            )
        elif operation_type == "teleportation":
            single_attempt = int(
                self.config.local_gate_time
                + 2 * self.config.measurement_time
                + self.config.classical_communication_delay
            )
        else:
            msg = f'Unknown operation_type: {operation_type}. Use "remote_cnot" or "teleportation"'
            raise ValueError(msg)

        success_prob = (
            fidelity
            * self.source_node.gate_fid
            * self.source_node.meas_fid
            * self.target_node.gate_fid
            * self.target_node.meas_fid
        )

        # An executed operation always costs at least one attempt, so the cap
        # never drops below 1 even when max_retries is configured as 0.
        attempt_cap = max(1.0, float(self.config.max_retries))
        if success_prob <= 0.0:
            # No chance of success: assume every allowed attempt is used.
            expected_attempts = attempt_cap
        else:
            expected_attempts = min(max(1.0 / success_prob, 1.0), attempt_cap)

        return int(single_attempt * expected_attempts)

    def execute(self) -> CommunicationProtocolResult:
        """Execute the configured remote operation protocol.

        Fails when no entanglement between the two nodes is found or when the
        entangled state has no usable (positive) fidelity. On success the
        entanglement entry and its qubits are removed from the quantum manager.

        Returns:
            CommunicationProtocolResult: Result of the remote operation protocol.

        Raises:
            ValueError: If ``config.operation_type`` is not a supported operation.
        """
        self.status = ProtocolStatus.RUNNING

        key = self.quantum_manager.find_entanglement_between_nodes(
            self.source_node, self.target_node
        )
        if key is None:
            self.status = ProtocolStatus.FAILED
            return CommunicationProtocolResult(self.status)

        quantum_state = self.quantum_manager.states.get(key)
        fidelity = quantum_state.fidelity if quantum_state is not None else 0.0
        # Written as "not > 0" so that zero, negative and NaN fidelities are all
        # rejected instead of only an exact 0.0.
        if not fidelity > 0.0:
            self.status = ProtocolStatus.FAILED
            return CommunicationProtocolResult(self.status)

        # Computed before any state is removed so that an invalid operation type
        # raises without having consumed the entanglement.
        execution_time = self._calculate_time_estimate(fidelity, self.config.operation_type)

        # Consume the link: drop the entanglement entry, then every qubit of the
        # key that is still registered with the manager.
        self.quantum_manager.remove_com_entanglement(list(key))
        for qubit_id in key:
            if qubit_id in self.quantum_manager.qubits:
                self.quantum_manager.remove_qubit(qubit_id)

        self.status = ProtocolStatus.SUCCESS
        return CommunicationProtocolResult(
            status=self.status,
            execution_time=execution_time,
        )