from abc import ABC, abstractmethod
from multiprocessing import Queue
from queue import Empty as QueueEmpty
from typing import Generic, Iterator, Optional

from .api.message import Message, MessageTypeT
class MessageQueueR(Generic[MessageTypeT], Iterator[Message[MessageTypeT]]):
    """Read side of a message queue, usable directly as an iterator.

    Subclasses supply ``get`` and ``get_nowait``. Iterating over an
    instance simply calls ``get`` for every item; this class never raises
    ``StopIteration`` on its own, so whether iteration ends is entirely up
    to the ``get`` implementation.
    """

    @abstractmethod
    def get(self) -> Message[MessageTypeT]:
        """Return the next message from the queue.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError()

    @abstractmethod
    def get_nowait(self) -> Optional[Message[MessageTypeT]]:
        """Return a message, or ``None``, without waiting.

        The implementations in this module return ``None`` when nothing
        can be returned immediately.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError()

    def __next__(self) -> Message[MessageTypeT]:
        """Produce the next item of the iteration by delegating to ``get``."""
        next_message = self.get()
        return next_message

    def __iter__(self) -> Iterator[Message[MessageTypeT]]:
        """Return ``self``; the queue acts as its own iterator."""
        return self
class MessageQueueW(ABC, Generic[MessageTypeT]):
    """Write side of a message queue.

    Deriving from ``ABC`` is what makes the ``@abstractmethod`` markers
    below effective: without an ``ABCMeta``-based metaclass the decorator
    is inert and an incomplete subclass could be instantiated silently.
    Concrete subclasses must implement both ``put`` and ``close``.
    """

    @abstractmethod
    def put(self, message: Message[MessageTypeT]) -> None:
        """Add ``message`` to the queue.

        The base implementation does nothing, so an override may safely
        call ``super().put(message)``.
        """

    @abstractmethod
    def close(self) -> None:
        """Close the queue.

        The base implementation does nothing, so an override may safely
        call ``super().close()``.
        """
class DummyQueue(MessageQueueR[MessageTypeT]):
    """Placeholder reader that never yields a message.

    ``get_nowait`` always reports "nothing available" by returning
    ``None``, while ``get`` is unsupported and raises. Because iteration
    is built on ``get``, iterating over a ``DummyQueue`` raises
    ``NotImplementedError`` as well.
    """

    def get_nowait(self) -> Optional[Message[MessageTypeT]]:
        """Return ``None`` unconditionally."""
        return None

    def get(self) -> Message[MessageTypeT]:
        """Unsupported on the dummy queue.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError()
class IpcMessageQueue(Generic[MessageTypeT], MessageQueueR[MessageTypeT], MessageQueueW[MessageTypeT]):
    """Readable and writable message queue for inter-process use.

    Every operation is forwarded to a private ``multiprocessing.Queue``
    created in the constructor.
    """

    def __init__(self) -> None:
        """Create the underlying ``multiprocessing`` queue."""
        self._q: Queue = Queue()

    def put(self, message: Message[MessageTypeT]) -> None:
        """Enqueue ``message`` on the underlying queue."""
        self._q.put(message)

    def get(self) -> Message[MessageTypeT]:
        """Wait, with no timeout, until a message is available and return it."""
        # Queue.get() defaults to block=True, timeout=None.
        return self._q.get()

    def get_nowait(self) -> Optional[Message[MessageTypeT]]:
        """Return a message if one can be fetched immediately, else ``None``."""
        try:
            # Queue.get_nowait() is the library's spelling of get(block=False).
            message = self._q.get_nowait()
        except QueueEmpty:
            return None
        return message

    def close(self) -> None:
        """Close the underlying queue."""
        self._q.close()