Source code for kosmos.dqc_scheduling.event_queue

import heapq

from kosmos.dqc_scheduling.event import Event, EventId
from kosmos.protocols.eg_protocol import EGProtocol
from kosmos.protocols.protocol import Protocol, ProtocolStatus
from kosmos.protocols.protocol_result import ProtocolResult


[docs] class EventQueue: """Event queue for scheduling protocols within the simulation. Attributes: queue (list[Event]): Priority queue of events ready to execute, ordered by time. current_time (int): Global clock for simulation. waiting_events (dict[EventId, Event]): Events waiting for their dependencies to be satisfied before being added to the execution queue. event_results (dict[EventId, ProtocolResult]): Results from the executed protocols. """ def __init__(self) -> None: """Initialize the event queue.""" self.queue: list[Event] = [] self.current_time: int = 0 self._dependencies: dict[EventId, set[EventId]] = {} self.waiting_events: dict[EventId, Event] = {} self._completed_events = set() self.event_results: dict[EventId, ProtocolResult] = {} self._event_counter = 0
[docs] def add_event_with_dependencies( self, protocol: Protocol, delay: int = 0, dependencies: list[EventId] | None = None, event_id: EventId | None = None, ) -> EventId | None: """Add an event with dependencies to the system. Args: protocol (Protocol): Protocol to execute. delay (int): Time delay from current time. Defaults to 0. dependencies (list[str] | None): List of event IDs this event depends on. Defaults to None. event_id (EventId | None): Optional explicit ID for this event. Defaults to None. Returns: EventId or None: ID of the added event. """ if event_id is None: event_id = event_id or EventId(f"event_{self._event_counter}") self._event_counter += 1 event = Event(self.current_time + delay, event_id, protocol) if not dependencies or all(dep in self._completed_events for dep in dependencies): heapq.heappush(self.queue, event) else: self.waiting_events[event_id] = event self._dependencies[event_id] = set(dependencies) return event_id
[docs] def run(self) -> None: """Run all events in the queue until it's empty.""" while self.queue: event = heapq.heappop(self.queue) self.current_time = max(self.current_time, event.time) result = event.protocol.execute() if result.status == ProtocolStatus.FAILED: if isinstance(event.protocol, EGProtocol): msg = ( "Simulation failed during entanglement generation protocol. " "Not enough communication qubits are available." ) raise RuntimeError(msg) msg = f"Execution failed during: {event.protocol} (event_id={event.id})." raise RuntimeError(msg) self.current_time += result.execution_time self._completed_events.add(event.id) self.event_results[event.id] = result self._check_waiting_events()
def _check_waiting_events(self) -> None: """Check if any waiting events have all dependencies satisfied.""" ready_events = [] for event_id, dependencies in self._dependencies.items(): if dependencies.issubset(self._completed_events): ready_events.append(event_id) for event_id in ready_events: event = self.waiting_events.pop(event_id) del self._dependencies[event_id] heapq.heappush(self.queue, event)