Source code for kosmos.dqc_scheduling.remote_operation_detector
from dataclasses import dataclass
from kosmos.dqc_scheduling.space_time_matrix import TWO_QUBIT_GATE_SIZE, SpaceTimeMatrix
from kosmos.partitioning.partition import Partition
from kosmos.topology.node import QuantumNode
[docs]
@dataclass
class RemoteOperationInfo:
"""Information about a remote two-qubit gate.
Attributes:
source_node (QuantumNode): Node that initiates the remote operation.
target_node (QuantumNode): Node that receives the remote operation.
gate_time (int): Scheduled time or timestamp for the remote operation.
qubit (int | None): Qubit index when the operation targets one qubit. Defaults to None.
control_qubit (int | None): Index of the control qubit for the two-qubit gate.
Defaults to None.
target_qubit (int | None): Index of the target qubit for the two-qubit gate.
Defaults to None.
"""
source_node: QuantumNode
target_node: QuantumNode
gate_time: int
qubit: int | None = None
control_qubit: int | None = None
target_qubit: int | None = None
[docs]
class RemoteOperationDetector:
"""Detects remote gates and required teleportations from space-time matrix.
Teleportations occur when the qubit assignment changes during execution.
Remote operations occur when a gate is applied to qubits assigned to a different node.
"""
def __init__(self, matrix: SpaceTimeMatrix) -> None:
"""Initialize remote operation detector.
Args:
matrix (SpaceTimeMatrix): Matrix containing qubit assignments over time.
"""
self._matrix: SpaceTimeMatrix = matrix
self._partitions: dict[str, Partition] = matrix.partition_info
[docs]
def detect_teleportations(self) -> list[RemoteOperationInfo]:
"""Detect teleportations by scanning the matrix for qubit transitions.
A teleportation is needed when the same qubit is allocated on different QPUs in consecutive
timesteps.
Returns:
list[RemoteOperationInfo]: All required teleportations with timing.
"""
teleportations = []
for qubit in range(self._matrix.num_qubits):
prev_node = None
for time in range(self._matrix.num_timesteps):
current_node = self._matrix.matrix[qubit][time]
if current_node is None:
prev_node = None
continue
if prev_node is not None and current_node.id != prev_node.id:
teleportations.append(
RemoteOperationInfo(
qubit=qubit,
source_node=prev_node,
target_node=current_node,
gate_time=time,
)
)
prev_node = current_node
return teleportations
[docs]
def detect_remote_gates(self) -> list[RemoteOperationInfo]:
"""Detect remote gates by analyzing partition circuits.
A remote gate is needed when a multi-qubit gate operates on qubits allocated to different
QPUs at gate's execution time.
Returns:
list[RemoteOperationInfo]: All remote gates with timing and node info.
"""
remote_gates = []
for partition_id, partition in self._partitions.items():
for gate_idx, instruction in enumerate(partition.circuit.data):
circuit_qubits = [partition.circuit.find_bit(q).index for q in instruction.qubits]
if len(circuit_qubits) != TWO_QUBIT_GATE_SIZE:
continue
circuit_q0, circuit_q1 = circuit_qubits
logical_q0 = partition.logical_qubit_mapping.get(circuit_q0)
logical_q1 = partition.logical_qubit_mapping.get(circuit_q1)
if logical_q0 is None or logical_q1 is None:
msg = (
f"Partition {partition_id}: Gate {gate_idx} uses unmapped qubits "
f"(circuit qubits {circuit_q0}, {circuit_q1})"
)
raise ValueError(msg)
partition_duration = partition.end_time - partition.start_time
circuit_depth = partition.circuit.depth()
relative_time = (
int(gate_idx / len(partition.circuit.data) * circuit_depth)
if circuit_depth > 0
else 0
)
gate_time = partition.start_time + min(relative_time, partition_duration - 1)
node_q0 = self._matrix.get_node_at(logical_q0, gate_time)
node_q1 = self._matrix.get_node_at(logical_q1, gate_time)
if node_q0 is None or node_q1 is None:
msg = (
f"Partition {partition_id}: Gate {gate_idx} at time {gate_time} "
f"operates on unallocated qubits (q{logical_q0} on {node_q0}, "
f"q{logical_q1} on {node_q1})."
)
raise ValueError(msg)
if node_q0 and node_q1 and node_q0.id != node_q1.id:
remote_gates.append(
RemoteOperationInfo(
source_node=node_q0,
target_node=node_q1,
control_qubit=logical_q0,
target_qubit=logical_q1,
gate_time=gate_time,
)
)
return remote_gates