from collections.abc import Callable
from kosmos.dqc_scheduling.event import EventId
from kosmos.topology.net import Network
from kosmos.topology.node import NodeId, QuantumNode
[docs]
def calculate_parallel_comm_time(
    comm_ops: list[tuple[EventId, int]],
    network: Network,
    get_nodes_func: Callable[[EventId], list[NodeId]],
) -> int:
    """Compute the total communication time when operations are batched in parallel.

    The operations are partitioned into groups whose members can run at the same
    time. Each group costs as much as its slowest member, and the groups are
    executed one after another, so the result is the sum of the per-group maxima.

    Args:
        comm_ops (list[tuple[EventId, int]]): List of (event_id, execution_time) for
            communication operations.
        network (Network): The quantum network topology.
        get_nodes_func (Callable[[EventId], list[NodeId]]): Function to extract node IDs
            from an event ID.

    Returns:
        int: Sum over all parallel groups of the longest execution time in the group;
            0 when ``comm_ops`` is empty.
    """
    # Nothing to schedule: skip grouping entirely.
    if not comm_ops:
        return 0

    total_time = 0
    for group in _create_parallel_groups(comm_ops, network, get_nodes_func):
        # A group finishes when its slowest operation finishes.
        total_time += max(duration for _, duration in group)
    return total_time
def _create_parallel_groups(
    comm_ops: list[tuple[EventId, int]],
    network: Network,
    get_nodes_func: Callable[[EventId], list[NodeId]],
) -> list[list[tuple[EventId, int]]]:
    """Partition communication operations into successive parallel groups.

    Groups are built one at a time from the operations that are still pending.
    If a pass cannot place any pending operation, grouping stops and the
    operations still pending at that point are not part of any returned group.

    Args:
        comm_ops (list[tuple[EventId, int]]): List of (event_id, execution_time) tuples.
        network (Network): The quantum network topology.
        get_nodes_func (Callable[[EventId], list[NodeId]]): Function to extract node IDs
            from an event ID.

    Returns:
        list[list[tuple[EventId, int]]]: Groups of operations that can run in parallel,
            in the order the groups were built.
    """
    groups: list[list[tuple[EventId, int]]] = []
    # Work on a copy so the caller's list is never mutated.
    pending = list(comm_ops)

    while pending:
        group, scheduled = _build_parallel_group(pending, network, get_nodes_func)
        if not group:
            # No pending operation fits even into an empty group; stop here so the
            # loop cannot spin forever.
            break
        groups.append(group)
        for scheduled_op in scheduled:
            pending.remove(scheduled_op)

    return groups
def _build_parallel_group(
    remaining_ops: list[tuple[EventId, int]],
    network: Network,
    get_nodes_func: Callable[[EventId], list[NodeId]],
) -> tuple[list[tuple[EventId, int]], list[tuple[EventId, int]]]:
    """Collect one group of operations that can run at the same time.

    Operations are visited in the order given. Each one that still fits within the
    capacity already claimed by earlier members of the group is accepted, and its
    nodes are charged before the next operation is considered.

    Args:
        remaining_ops (list[tuple[EventId, int]]): Operations not yet assigned to a group.
        network (Network): The quantum network topology.
        get_nodes_func (Callable[[EventId], list[NodeId]]): Function to extract node IDs
            from an event ID.

    Returns:
        tuple[list[tuple[EventId, int]], list[tuple[EventId, int]]]: (group of parallel
            ops, ops to remove from remaining_ops). Both lists hold the same entries
            but are separate list objects.
    """
    usage: dict[NodeId, int] = {}
    accepted = []

    for op in remaining_ops:
        event_id, exec_time = op
        nodes = get_nodes_func(event_id)
        if not _can_add_to_group(nodes, usage, network):
            continue
        accepted.append((event_id, exec_time))
        _update_node_capacity(nodes, usage)

    # Hand back an independent copy so callers may mutate either list freely.
    return accepted, list(accepted)
def _can_add_to_group(
    nodes_involved: list[NodeId],
    node_capacity_used: dict[NodeId, int],
    network: Network,
) -> bool:
    """Check if operation can be added to current parallel group.

    Every occurrence of a node in ``nodes_involved`` counts as one unit of demand on
    that node, matching the per-occurrence accounting in ``_update_node_capacity``.
    An operation that lists the same node more than once is therefore only accepted
    when the node has that many free communication qubits.

    Args:
        nodes_involved (list[NodeId]): Node IDs involved in the operation.
        node_capacity_used (dict[NodeId, int]): Current capacity usage per node.
        network (Network): The quantum network topology.

    Returns:
        bool: True if operation can be added without exceeding node capacity. False if
            any involved node is missing from the network, is not a ``QuantumNode``,
            or lacks enough free communication qubits.
    """
    # Tally how many units this operation needs on each distinct node.
    demand: dict[NodeId, int] = {}
    for node_id in nodes_involved:
        demand[node_id] = demand.get(node_id, 0) + 1

    for node_id, needed in demand.items():
        node = network.get_node(node_id)
        if node is None or not isinstance(node, QuantumNode):
            return False
        # Compare the usage *after* adding this operation, not just the current usage,
        # so repeated nodes cannot push usage past the node's capacity.
        if node_capacity_used.get(node_id, 0) + needed > node.communication_qubits:
            return False
    return True
def _update_node_capacity(
    nodes_involved: list[NodeId],
    node_capacity_used: dict[NodeId, int],
) -> None:
    """Charge one unit of capacity to every node an operation touches.

    ``node_capacity_used`` is modified in place. A node that appears several times
    in ``nodes_involved`` is charged once per occurrence.

    Args:
        nodes_involved (list[NodeId]): Node IDs involved in the operation.
        node_capacity_used (dict[NodeId, int]): Current capacity usage per node.
    """
    for node_id in nodes_involved:
        if node_id in node_capacity_used:
            node_capacity_used[node_id] += 1
        else:
            # First time this node is used in the current group.
            node_capacity_used[node_id] = 1