Source code for kosmos.dqc_scheduling.remote_operation_detector

from dataclasses import dataclass

from kosmos.dqc_scheduling.space_time_matrix import TWO_QUBIT_GATE_SIZE, SpaceTimeMatrix
from kosmos.partitioning.partition import Partition
from kosmos.topology.node import QuantumNode


@dataclass
class RemoteOperationInfo:
    """Record describing one remote operation between two quantum nodes.

    The record carries the two nodes involved and the timestep of the
    operation. The qubit fields are all optional: an operation that concerns
    a single qubit sets ``qubit``, while a two-qubit gate sets
    ``control_qubit`` and ``target_qubit``.

    Attributes:
        source_node (QuantumNode): Node that initiates the remote operation.
        target_node (QuantumNode): Node that receives the remote operation.
        gate_time (int): Scheduled time or timestamp for the remote operation.
        qubit (int | None): Qubit index when the operation targets one qubit.
            Defaults to None.
        control_qubit (int | None): Index of the control qubit for the
            two-qubit gate. Defaults to None.
        target_qubit (int | None): Index of the target qubit for the
            two-qubit gate. Defaults to None.
    """

    # Required fields: the endpoints of the operation and when it happens.
    source_node: QuantumNode
    target_node: QuantumNode
    gate_time: int

    # Optional qubit indices; which ones are set depends on the operation.
    qubit: int | None = None
    control_qubit: int | None = None
    target_qubit: int | None = None
class RemoteOperationDetector:
    """Detects remote gates and required teleportations from a space-time matrix.

    Teleportations occur when the node assignment of a qubit changes between
    two consecutive timesteps. Remote gates occur when a two-qubit gate is
    applied to qubits that are assigned to different nodes at the gate's
    (estimated) execution time.
    """

    def __init__(self, matrix: SpaceTimeMatrix) -> None:
        """Initialize remote operation detector.

        Args:
            matrix (SpaceTimeMatrix): Matrix containing qubit assignments over
                time. Its ``partition_info`` mapping is kept as the set of
                partitions to analyze.
        """
        self._matrix: SpaceTimeMatrix = matrix
        self._partitions: dict[str, Partition] = matrix.partition_info

    def detect_teleportations(self) -> list[RemoteOperationInfo]:
        """Detect teleportations by scanning the matrix for qubit transitions.

        A teleportation is reported when the same qubit is allocated on
        different nodes in two consecutive timesteps. A timestep in which the
        qubit is unallocated (``None``) resets the scan, so a transition that
        is separated by an unallocated gap is not reported.

        Returns:
            list[RemoteOperationInfo]: All required teleportations, each with
            ``qubit``, ``source_node``, ``target_node`` and ``gate_time`` set
            (``gate_time`` is the first timestep on the new node).
        """
        teleportations: list[RemoteOperationInfo] = []

        for qubit in range(self._matrix.num_qubits):
            prev_node = None
            for time in range(self._matrix.num_timesteps):
                current_node = self._matrix.matrix[qubit][time]

                if current_node is None:
                    # Unallocated slot: forget the previous node so the next
                    # allocation is not compared across the gap.
                    prev_node = None
                    continue

                if prev_node is not None and current_node.id != prev_node.id:
                    teleportations.append(
                        RemoteOperationInfo(
                            qubit=qubit,
                            source_node=prev_node,
                            target_node=current_node,
                            gate_time=time,
                        )
                    )
                prev_node = current_node

        return teleportations

    def detect_remote_gates(self) -> list[RemoteOperationInfo]:
        """Detect remote gates by analyzing partition circuits.

        Every instruction acting on exactly ``TWO_QUBIT_GATE_SIZE`` qubits is
        mapped to its logical qubits and to an estimated execution time inside
        the partition's time window. If the two qubits sit on different nodes
        at that time, the gate is reported as remote. Instructions acting on
        any other number of qubits are skipped.

        The execution time is an estimate: the instruction's position in the
        circuit's instruction list is scaled to the circuit depth, then
        clamped to the partition's ``[start_time, end_time)`` window.

        Returns:
            list[RemoteOperationInfo]: All remote gates with timing and node
            info (``control_qubit``/``target_qubit`` hold the logical indices
            of the first/second qubit of the instruction).

        Raises:
            ValueError: If a two-qubit gate uses a circuit qubit that has no
                logical mapping, or operates on a qubit that is unallocated at
                the estimated gate time.
        """
        remote_gates: list[RemoteOperationInfo] = []

        for partition_id, partition in self._partitions.items():
            circuit = partition.circuit
            instructions = circuit.data

            # Per-partition invariants, computed once instead of once per
            # instruction (the depth computation walks the whole circuit).
            num_instructions = len(instructions)
            circuit_depth = circuit.depth()
            partition_duration = partition.end_time - partition.start_time
            # Largest offset that still lies inside the partition window.
            # Clamped at 0 so a zero-length (or inverted) window cannot push
            # the gate time before ``start_time``.
            max_offset = max(partition_duration - 1, 0)

            for gate_idx, instruction in enumerate(instructions):
                circuit_qubits = [circuit.find_bit(q).index for q in instruction.qubits]
                if len(circuit_qubits) != TWO_QUBIT_GATE_SIZE:
                    continue

                circuit_q0, circuit_q1 = circuit_qubits
                logical_q0 = partition.logical_qubit_mapping.get(circuit_q0)
                logical_q1 = partition.logical_qubit_mapping.get(circuit_q1)
                if logical_q0 is None or logical_q1 is None:
                    msg = (
                        f"Partition {partition_id}: Gate {gate_idx} uses unmapped qubits "
                        f"(circuit qubits {circuit_q0}, {circuit_q1})"
                    )
                    raise ValueError(msg)

                # floor(gate_idx / num_instructions * circuit_depth) in exact
                # integer arithmetic. The float form can truncate one step too
                # low (e.g. int(29 / 100 * 100) == 28). A depth of 0 yields 0.
                relative_time = gate_idx * circuit_depth // num_instructions
                gate_time = partition.start_time + min(relative_time, max_offset)

                node_q0 = self._matrix.get_node_at(logical_q0, gate_time)
                node_q1 = self._matrix.get_node_at(logical_q1, gate_time)
                if node_q0 is None or node_q1 is None:
                    msg = (
                        f"Partition {partition_id}: Gate {gate_idx} at time {gate_time} "
                        f"operates on unallocated qubits (q{logical_q0} on {node_q0}, "
                        f"q{logical_q1} on {node_q1})."
                    )
                    raise ValueError(msg)

                # Both nodes are known to be present here; only the node
                # identity decides whether the gate is remote.
                if node_q0.id != node_q1.id:
                    remote_gates.append(
                        RemoteOperationInfo(
                            source_node=node_q0,
                            target_node=node_q1,
                            control_qubit=logical_q0,
                            target_qubit=logical_q1,
                            gate_time=gate_time,
                        )
                    )

        return remote_gates