Source code for kosmos.dqc_scheduling.event_queue
import heapq
from kosmos.dqc_scheduling.event import Event, EventId
from kosmos.protocols.eg_protocol import EGProtocol
from kosmos.protocols.protocol import Protocol, ProtocolStatus
from kosmos.protocols.protocol_result import ProtocolResult
[docs]
class EventQueue:
    """Event queue for scheduling protocols within the simulation.

    Events whose dependencies are already completed (or that have none) go
    straight onto a heap; the others are parked in ``waiting_events`` and are
    moved to the heap once every event they depend on has been executed.

    Attributes:
        queue (list[Event]): Priority queue of events ready to execute, ordered by time.
        current_time (int): Global clock for simulation.
        waiting_events (dict[EventId, Event]): Events waiting for their dependencies to be
            satisfied before being added to the execution queue.
        event_results (dict[EventId, ProtocolResult]): Results from the executed protocols.

    """

    def __init__(self) -> None:
        """Initialize the event queue."""
        self.queue: list[Event] = []
        self.current_time: int = 0
        # Maps a waiting event to the full set of event IDs it depends on.
        self._dependencies: dict[EventId, set[EventId]] = {}
        self.waiting_events: dict[EventId, Event] = {}
        # IDs of events whose protocol has been executed by ``run``.
        self._completed_events: set[EventId] = set()
        self.event_results: dict[EventId, ProtocolResult] = {}
        # Source of the numeric suffix for auto-generated event IDs.
        self._event_counter = 0

    def add_event_with_dependencies(
        self,
        protocol: Protocol,
        delay: int = 0,
        dependencies: list[EventId] | None = None,
        event_id: EventId | None = None,
    ) -> EventId | None:
        """Add an event with dependencies to the system.

        The event is scheduled at ``current_time + delay``. It is pushed onto the
        execution queue immediately when it has no dependencies or all of them are
        already completed; otherwise it is stored in ``waiting_events``.

        Args:
            protocol (Protocol): Protocol to execute.
            delay (int): Time delay from current time. Defaults to 0.
            dependencies (list[EventId] | None): List of event IDs this event depends on.
                Defaults to None.
            event_id (EventId | None): Optional explicit ID for this event. When omitted,
                an ID of the form ``event_<counter>`` is generated. Defaults to None.

        Returns:
            EventId or None: ID of the added event.

        """
        if event_id is None:
            event_id = EventId(f"event_{self._event_counter}")
            # NOTE(review): the counter is advanced only when an ID is generated;
            # confirm this matches the intended numbering of auto-generated IDs.
            self._event_counter += 1

        event = Event(self.current_time + delay, event_id, protocol)

        if not dependencies or all(dep in self._completed_events for dep in dependencies):
            heapq.heappush(self.queue, event)
        else:
            self.waiting_events[event_id] = event
            self._dependencies[event_id] = set(dependencies)

        return event_id

    def run(self) -> None:
        """Run all events in the queue until it's empty.

        For each popped event the clock is first moved forward to the event's
        scheduled time (it never moves backwards), the protocol is executed, and
        the clock is then advanced by the reported execution time. The result is
        stored in ``event_results`` and waiting events whose dependencies are now
        complete are moved to the queue.

        Events that still have unmet dependencies when the queue runs dry are
        left in ``waiting_events``.

        Raises:
            RuntimeError: If a protocol reports ``ProtocolStatus.FAILED``.

        """
        while self.queue:
            event = heapq.heappop(self.queue)
            # An event released late (after waiting on dependencies) may carry a
            # timestamp in the past; keep the clock monotonic.
            self.current_time = max(self.current_time, event.time)

            result = event.protocol.execute()

            if result.status == ProtocolStatus.FAILED:
                if isinstance(event.protocol, EGProtocol):
                    msg = (
                        "Simulation failed during entanglement generation protocol. "
                        "Not enough communication qubits are available."
                    )
                    raise RuntimeError(msg)
                msg = f"Execution failed during: {event.protocol} (event_id={event.id})."
                raise RuntimeError(msg)

            self.current_time += result.execution_time
            self._completed_events.add(event.id)
            self.event_results[event.id] = result
            self._check_waiting_events()

    def _check_waiting_events(self) -> None:
        """Check if any waiting events have all dependencies satisfied.

        Every waiting event whose dependency set is fully contained in the set of
        completed events is removed from the waiting structures and pushed onto
        the execution queue.
        """
        # Collect the IDs first: the dictionaries are mutated below and must not
        # change size while being iterated.
        ready_ids = [
            event_id
            for event_id, dependencies in self._dependencies.items()
            if dependencies.issubset(self._completed_events)
        ]
        for event_id in ready_ids:
            event = self.waiting_events.pop(event_id)
            del self._dependencies[event_id]
            heapq.heappush(self.queue, event)