Source code for kosmos.dqc_scheduling.assignment_strategies.greedy_assignment

from kosmos.dqc_scheduling.assignment_strategies.assignment_strategy import AssignmentStrategy
from kosmos.dqc_scheduling.space_time_matrix import SpaceTimeMatrix
from kosmos.partitioning.partition import Partition
from kosmos.topology.node import Node, NodeRole, QuantumNode


def _estimate_duration(partition: Partition) -> int:
    """Estimate execution duration for partition.

    Args:
        partition (Partition): Partition to estimate.

    Returns:
        int: Estimated timesteps needed.

    """
    return partition.circuit.depth()


def _update_availability(availability: dict[str, dict[int, int]], partition: Partition) -> None:
    """Update node availability after allocating partition.

    Args:
        availability (dict[str, dict[int, int]]): Availability tracker to update.
        partition (Partition): Allocated partition.

    """
    if partition.is_multi_node:
        # Multi-node assignment: update each node based on qubit assignment
        for node in partition.qubit_to_node_mapping.values():
            node_id = str(node.id)
            for t in range(partition.start_time, partition.end_time):
                if t in availability[node_id]:
                    availability[node_id][t] -= 1
    else:
        node_id = str(partition.node.id)
        qubits_used = len(partition.logical_qubit_mapping)

        for t in range(partition.start_time, partition.end_time):
            if t in availability[node_id]:
                availability[node_id][t] -= qubits_used


def _add_partition_to_matrix(matrix: SpaceTimeMatrix, partition: Partition) -> None:
    """Add allocated partition to matrix.

    Args:
        matrix (SpaceTimeMatrix): Reference to the matrix.
        partition (Partition): Partition we add to the matrix.

    """
    matrix.partition_info[partition.id] = partition

    for time_step in range(partition.start_time, partition.end_time):
        for logical_qubit in partition.logical_qubit_mapping.values():
            if logical_qubit >= matrix.num_qubits:
                msg = (
                    f"Partition {partition.id} has qubit {logical_qubit} "
                    f"which exceeds matrix capacity of {matrix.num_qubits}."
                )
                raise ValueError(msg)

            if time_step >= matrix.num_timesteps or time_step < 0:
                msg = (
                    f"Partition {partition.id} extends to timestep {time_step} "
                    f"which exceeds matrix capacity of {matrix.num_timesteps}."
                )
                raise ValueError(msg)
            if partition.is_multi_node:
                node = partition.get_node_for_qubit(logical_qubit)
            else:
                node = partition.node

            matrix.matrix[logical_qubit][time_step] = node


[docs] class GreedyAssignment(AssignmentStrategy): """Greedy assignment: assigns partition to nodes with capacity at earliest time."""
[docs] def allocate(self, partition: Partition) -> SpaceTimeMatrix: """Allocate partition greedily to nodes at earliest available time. Strategy: For each qubit, assign it to nodes in ascending ID order (QPU_1, QPU_2, QPU_3). Args: partition (Partition): Partition to allocate. Returns: SpaceTimeMatrix: Matrix with node assignments and timing. """ max_qubits_needed = len(partition.logical_qubit_mapping.values()) max_timesteps = partition.circuit.depth() node_availability = self._initialize_node_availability(max_timesteps * 2) completion_times: dict[str, int] = {} max_end_time = 0 earliest_start = 0 node, start_time = self._find_earliest_slot(partition, node_availability, earliest_start) if node is not None: partition.node = node partition.start_time = start_time else: qubit_assignment, start_time = self._find_earliest_multi_node_slot( partition, node_availability, earliest_start ) if qubit_assignment is None: msg = ( f"Cannot allocate partition {partition.id}. " "Insufficient resources across all nodes." ) raise RuntimeError(msg) partition.qubit_to_node_mapping = qubit_assignment partition.node = None partition.start_time = start_time max_end_time = max(max_end_time, partition.end_time) completion_times[partition.id] = partition.end_time _update_availability(node_availability, partition) matrix = SpaceTimeMatrix(max_qubits_needed, max_end_time) _add_partition_to_matrix(matrix, partition) return matrix
def _initialize_node_availability(self, max_timesteps: int) -> dict[str, dict[int, int]]: """Initialize availability tracker for all nodes. Args: max_timesteps (int): Maximum number of timesteps to track. Returns: dict[str, dict[int, int]]: Availability of qubits at timestep per node. """ availability = {} for node in self.network.nodes(): if isinstance(node, QuantumNode) and node.has_role(NodeRole.END_USER): availability[str(node.id)] = dict.fromkeys(range(max_timesteps), node.data_qubits) return availability def _find_earliest_slot( self, partition: Partition, availability: dict[str, dict[int, int]], earliest_start: int = 0, ) -> tuple[Node, int] | tuple[None, int]: """Find earliest time and node where partition can fit. Args: partition (Partition): Partition to allocate. availability (dict[str, dict[int, int]]): Current node availability. earliest_start (int): Earliest time to start searching for dependencies. Returns: tuple[QuantumNode | None, int]: Node with the start timestep, or (None, -1) if no slot found. """ qubits_needed = len(partition.logical_qubit_mapping) duration = _estimate_duration(partition) max_time = 0 for node_slots in availability.values(): if node_slots: max_time = max(max_time, *node_slots.keys()) for start_time in range(earliest_start, max_time + duration): for node in self.network.nodes(): if not (isinstance(node, QuantumNode) and node.has_role(NodeRole.END_USER)): continue if node.data_qubits < qubits_needed: continue time_slots = availability[str(node.id)] can_fit = True for t in range(start_time, start_time + duration): if t not in time_slots or time_slots[t] < qubits_needed: can_fit = False break if can_fit: return node, start_time return None, -1 def _find_earliest_multi_node_slot( # noqa: C901 self, partition: Partition, availability: dict[str, dict[int, int]], earliest_start: int = 0, ) -> tuple[dict[int, QuantumNode] | None, int]: """Find earliest time when partition can be distributed across multiple nodes. Args: partition (Partition): Partition to allocate. availability (dict[str, dict[int, int]]): Current node availability. earliest_start (int): Earliest time to start searching. Returns: tuple[dict[int, QuantumNode] | None, int]: Mapping of physical qubits to nodes and start time, or (None, -1) if assignment fails. """ physical_qubits = sorted(partition.physical_qubits_used) duration = _estimate_duration(partition) max_time = 0 for node_slots in availability.values(): if node_slots: max_time = max(max_time, *node_slots.keys()) for start_time in range(earliest_start, max_time + duration): qubit_to_node = {} available_nodes = sorted(self.network.nodes(), key=lambda n: n.id.value) for qubit in physical_qubits: assigned = False for node in available_nodes: if not (isinstance(node, QuantumNode) and node.has_role(NodeRole.END_USER)): continue node_id = str(node.id) time_slots = availability[node_id] qubits_on_node = sum(1 for n in qubit_to_node.values() if n == node) can_fit = True for t in range(start_time, start_time + duration): if t not in time_slots or time_slots[t] < (qubits_on_node + 1): can_fit = False break if can_fit: qubit_to_node[qubit] = node assigned = True break if not assigned: break if len(qubit_to_node) == len(physical_qubits): return qubit_to_node, start_time return None, -1