Source code for kosmos.dqc_scheduling.utils.timing_utils

from collections.abc import Callable

from kosmos.dqc_scheduling.event import EventId
from kosmos.topology.net import Network
from kosmos.topology.node import NodeId, QuantumNode


def calculate_parallel_comm_time(
    comm_ops: list[tuple[EventId, int]],
    network: Network,
    get_nodes_func: Callable[[EventId], list[NodeId]],
) -> int:
    """Compute the total communication time when operations may overlap.

    The operations are split into groups of operations that can run at the
    same time. Groups run one after another, and each group takes as long as
    its slowest operation, so the result is the sum of the per-group maxima.

    Args:
        comm_ops (list[tuple[EventId, int]]): List of (event_id, execution_time)
            for communication operations.
        network (Network): The quantum network topology.
        get_nodes_func (Callable[[EventId], list[NodeId]]): Function to extract
            node IDs from an event ID.

    Returns:
        int: Actual execution time considering which ops can run in parallel;
        0 when ``comm_ops`` is empty.

    """
    if not comm_ops:
        return 0

    total_time = 0
    for batch in _create_parallel_groups(comm_ops, network, get_nodes_func):
        # Operations in a batch overlap, so the batch costs its longest duration.
        total_time += max(duration for _, duration in batch)
    return total_time


def _create_parallel_groups(
    comm_ops: list[tuple[EventId, int]],
    network: Network,
    get_nodes_func: Callable[[EventId], list[NodeId]],
) -> list[list[tuple[EventId, int]]]:
    """Group communication operations that can execute in parallel.

    Groups are built greedily: each pass walks the operations that are still
    pending, in their original order, and takes every operation that fits
    within the communication-qubit capacity of all nodes it involves.

    Note:
        If a pass cannot place a single operation (for example because a node
        is unknown to the network, is not a ``QuantumNode``, or has no
        communication qubits), grouping stops and the operations still pending
        are not part of any returned group.

    Args:
        comm_ops (list[tuple[EventId, int]]): List of (event_id, execution_time) tuples.
        network (Network): The quantum network topology.
        get_nodes_func (Callable[[EventId], list[NodeId]]): Function to extract
            node IDs from an event ID.

    Returns:
        list[list[tuple[EventId, int]]]: Groups of operations that can run in parallel.

    """
    parallel_groups: list[list[tuple[EventId, int]]] = []
    remaining_ops = list(comm_ops)

    while remaining_ops:
        current_group, deferred_ops = _split_parallel_group(
            remaining_ops, network, get_nodes_func
        )
        if not current_group:
            # Nothing fits even into an empty group; stop instead of looping forever.
            break
        parallel_groups.append(current_group)
        # Swap in the deferred list wholesale instead of calling list.remove()
        # once per selected operation (which rescans the list every time).
        remaining_ops = deferred_ops

    return parallel_groups


def _split_parallel_group(
    remaining_ops: list[tuple[EventId, int]],
    network: Network,
    get_nodes_func: Callable[[EventId], list[NodeId]],
) -> tuple[list[tuple[EventId, int]], list[tuple[EventId, int]]]:
    """Partition pending operations into the next parallel group and the rest.

    Args:
        remaining_ops (list[tuple[EventId, int]]): Operations not yet assigned to a group.
        network (Network): The quantum network topology.
        get_nodes_func (Callable[[EventId], list[NodeId]]): Function to extract
            node IDs from an event ID.

    Returns:
        tuple[list[tuple[EventId, int]], list[tuple[EventId, int]]]:
        (operations selected for the group, operations deferred to a later
        group), both in their original relative order.

    """
    current_group: list[tuple[EventId, int]] = []
    deferred_ops: list[tuple[EventId, int]] = []
    node_capacity_used: dict[NodeId, int] = {}

    for op in remaining_ops:
        event_id, exec_time = op
        nodes_involved = get_nodes_func(event_id)
        if _can_add_to_group(nodes_involved, node_capacity_used, network):
            current_group.append((event_id, exec_time))
            _update_node_capacity(nodes_involved, node_capacity_used)
        else:
            deferred_ops.append(op)

    return current_group, deferred_ops


def _build_parallel_group(
    remaining_ops: list[tuple[EventId, int]],
    network: Network,
    get_nodes_func: Callable[[EventId], list[NodeId]],
) -> tuple[list[tuple[EventId, int]], list[tuple[EventId, int]]]:
    """Build a single group of operations that can execute in parallel.

    Args:
        remaining_ops (list[tuple[EventId, int]]): Operations not yet assigned to a group.
        network (Network): The quantum network topology.
        get_nodes_func (Callable[[EventId], list[NodeId]]): Function to extract
            node IDs from an event ID.

    Returns:
        tuple[list[tuple[EventId, int]], list[tuple[EventId, int]]]:
        (group of parallel ops, ops to remove from remaining_ops). Both lists
        hold the same operations; they are separate list objects.

    """
    current_group, _ = _split_parallel_group(remaining_ops, network, get_nodes_func)
    # The selected operations are exactly the ones to drop from the pending list.
    return current_group, list(current_group)


def _can_add_to_group(
    nodes_involved: list[NodeId],
    node_capacity_used: dict[NodeId, int],
    network: Network,
) -> bool:
    """Check if operation can be added to current parallel group.

    Args:
        nodes_involved (list[NodeId]): Node IDs involved in the operation.
        node_capacity_used (dict[NodeId, int]): Current capacity usage per node.
        network (Network): The quantum network topology.

    Returns:
        bool: True if every involved node is a ``QuantumNode`` whose usage in
        the current group is still below its ``communication_qubits``; False
        otherwise.

    """
    for node_id in nodes_involved:
        node = network.get_node(node_id)
        # isinstance() is False for None, so a missing node is rejected here too.
        if not isinstance(node, QuantumNode):
            return False
        if node_capacity_used.get(node_id, 0) >= node.communication_qubits:
            return False
    return True


def _update_node_capacity(
    nodes_involved: list[NodeId],
    node_capacity_used: dict[NodeId, int],
) -> None:
    """Update the capacity usage for involved nodes.

    Increments the usage counter once per entry in ``nodes_involved``; the
    mapping is modified in place.

    Args:
        nodes_involved (list[NodeId]): Node IDs involved in the operation.
        node_capacity_used (dict[NodeId, int]): Current capacity usage per node.

    """
    for node_id in nodes_involved:
        node_capacity_used[node_id] = node_capacity_used.get(node_id, 0) + 1