from kosmos.dqc_scheduling.assignment_strategies.assignment_strategy import AssignmentStrategy
from kosmos.dqc_scheduling.space_time_matrix import SpaceTimeMatrix
from kosmos.partitioning.partition import Partition
from kosmos.topology.node import Node, NodeRole, QuantumNode
def _estimate_duration(partition: Partition) -> int:
"""Estimate execution duration for partition.
Args:
partition (Partition): Partition to estimate.
Returns:
int: Estimated timesteps needed.
"""
return partition.circuit.depth()
def _update_availability(availability: dict[str, dict[int, int]], partition: Partition) -> None:
"""Update node availability after allocating partition.
Args:
availability (dict[str, dict[int, int]]): Availability tracker to update.
partition (Partition): Allocated partition.
"""
if partition.is_multi_node:
# Multi-node assignment: update each node based on qubit assignment
for node in partition.qubit_to_node_mapping.values():
node_id = str(node.id)
for t in range(partition.start_time, partition.end_time):
if t in availability[node_id]:
availability[node_id][t] -= 1
else:
node_id = str(partition.node.id)
qubits_used = len(partition.logical_qubit_mapping)
for t in range(partition.start_time, partition.end_time):
if t in availability[node_id]:
availability[node_id][t] -= qubits_used
def _add_partition_to_matrix(matrix: SpaceTimeMatrix, partition: Partition) -> None:
"""Add allocated partition to matrix.
Args:
matrix (SpaceTimeMatrix): Reference to the matrix.
partition (Partition): Partition we add to the matrix.
"""
matrix.partition_info[partition.id] = partition
for time_step in range(partition.start_time, partition.end_time):
for logical_qubit in partition.logical_qubit_mapping.values():
if logical_qubit >= matrix.num_qubits:
msg = (
f"Partition {partition.id} has qubit {logical_qubit} "
f"which exceeds matrix capacity of {matrix.num_qubits}."
)
raise ValueError(msg)
if time_step >= matrix.num_timesteps or time_step < 0:
msg = (
f"Partition {partition.id} extends to timestep {time_step} "
f"which exceeds matrix capacity of {matrix.num_timesteps}."
)
raise ValueError(msg)
if partition.is_multi_node:
node = partition.get_node_for_qubit(logical_qubit)
else:
node = partition.node
matrix.matrix[logical_qubit][time_step] = node
[docs]
class GreedyAssignment(AssignmentStrategy):
"""Greedy assignment: assigns partition to nodes with capacity at earliest time."""
[docs]
def allocate(self, partition: Partition) -> SpaceTimeMatrix:
"""Allocate partition greedily to nodes at earliest available time.
Strategy: For each qubit, assign it to nodes in ascending ID order (QPU_1, QPU_2, QPU_3).
Args:
partition (Partition): Partition to allocate.
Returns:
SpaceTimeMatrix: Matrix with node assignments and timing.
"""
max_qubits_needed = len(partition.logical_qubit_mapping.values())
max_timesteps = partition.circuit.depth()
node_availability = self._initialize_node_availability(max_timesteps * 2)
completion_times: dict[str, int] = {}
max_end_time = 0
earliest_start = 0
node, start_time = self._find_earliest_slot(partition, node_availability, earliest_start)
if node is not None:
partition.node = node
partition.start_time = start_time
else:
qubit_assignment, start_time = self._find_earliest_multi_node_slot(
partition, node_availability, earliest_start
)
if qubit_assignment is None:
msg = (
f"Cannot allocate partition {partition.id}. "
"Insufficient resources across all nodes."
)
raise RuntimeError(msg)
partition.qubit_to_node_mapping = qubit_assignment
partition.node = None
partition.start_time = start_time
max_end_time = max(max_end_time, partition.end_time)
completion_times[partition.id] = partition.end_time
_update_availability(node_availability, partition)
matrix = SpaceTimeMatrix(max_qubits_needed, max_end_time)
_add_partition_to_matrix(matrix, partition)
return matrix
def _initialize_node_availability(self, max_timesteps: int) -> dict[str, dict[int, int]]:
"""Initialize availability tracker for all nodes.
Args:
max_timesteps (int): Maximum number of timesteps to track.
Returns:
dict[str, dict[int, int]]: Availability of qubits at timestep per node.
"""
availability = {}
for node in self.network.nodes():
if isinstance(node, QuantumNode) and node.has_role(NodeRole.END_USER):
availability[str(node.id)] = dict.fromkeys(range(max_timesteps), node.data_qubits)
return availability
def _find_earliest_slot(
self,
partition: Partition,
availability: dict[str, dict[int, int]],
earliest_start: int = 0,
) -> tuple[Node, int] | tuple[None, int]:
"""Find earliest time and node where partition can fit.
Args:
partition (Partition): Partition to allocate.
availability (dict[str, dict[int, int]]): Current node availability.
earliest_start (int): Earliest time to start searching for dependencies.
Returns:
tuple[QuantumNode | None, int]: Node with the start timestep,
or (None, -1) if no slot found.
"""
qubits_needed = len(partition.logical_qubit_mapping)
duration = _estimate_duration(partition)
max_time = 0
for node_slots in availability.values():
if node_slots:
max_time = max(max_time, *node_slots.keys())
for start_time in range(earliest_start, max_time + duration):
for node in self.network.nodes():
if not (isinstance(node, QuantumNode) and node.has_role(NodeRole.END_USER)):
continue
if node.data_qubits < qubits_needed:
continue
time_slots = availability[str(node.id)]
can_fit = True
for t in range(start_time, start_time + duration):
if t not in time_slots or time_slots[t] < qubits_needed:
can_fit = False
break
if can_fit:
return node, start_time
return None, -1
def _find_earliest_multi_node_slot( # noqa: C901
self,
partition: Partition,
availability: dict[str, dict[int, int]],
earliest_start: int = 0,
) -> tuple[dict[int, QuantumNode] | None, int]:
"""Find earliest time when partition can be distributed across multiple nodes.
Args:
partition (Partition): Partition to allocate.
availability (dict[str, dict[int, int]]): Current node availability.
earliest_start (int): Earliest time to start searching.
Returns:
tuple[dict[int, QuantumNode] | None, int]: Mapping of physical qubits to nodes
and start time, or (None, -1) if assignment fails.
"""
physical_qubits = sorted(partition.physical_qubits_used)
duration = _estimate_duration(partition)
max_time = 0
for node_slots in availability.values():
if node_slots:
max_time = max(max_time, *node_slots.keys())
for start_time in range(earliest_start, max_time + duration):
qubit_to_node = {}
available_nodes = sorted(self.network.nodes(), key=lambda n: n.id.value)
for qubit in physical_qubits:
assigned = False
for node in available_nodes:
if not (isinstance(node, QuantumNode) and node.has_role(NodeRole.END_USER)):
continue
node_id = str(node.id)
time_slots = availability[node_id]
qubits_on_node = sum(1 for n in qubit_to_node.values() if n == node)
can_fit = True
for t in range(start_time, start_time + duration):
if t not in time_slots or time_slots[t] < (qubits_on_node + 1):
can_fit = False
break
if can_fit:
qubit_to_node[qubit] = node
assigned = True
break
if not assigned:
break
if len(qubit_to_node) == len(physical_qubits):
return qubit_to_node, start_time
return None, -1