# Source code for django_river_ml.announce
import queue
# [docs]
class MessageAnnouncer:
    """Fan a message out to every queue held in ``listeners``.

    Each listener is expected to expose ``put_nowait`` and to raise
    ``queue.Full`` when it cannot accept another item, as
    ``queue.Queue`` does.
    """

    def __init__(self):
        # Listener queues that ``announce`` delivers to.
        self.listeners = []

    def announce(self, msg):
        """Deliver ``msg`` to every listener, dropping listeners that are full.

        Args:
            msg: The object handed to each listener's ``put_nowait``.

        A listener whose ``put_nowait`` raises ``queue.Full`` is removed
        from ``self.listeners`` in place. Any other exception propagates
        to the caller.
        """
        # Walk the indexes from the end so that deleting entry ``i`` never
        # shifts the position of an entry that has not been visited yet.
        for i in reversed(range(len(self.listeners))):
            try:
                self.listeners[i].put_nowait(msg)
            except queue.Full:
                del self.listeners[i]
# Module-level MessageAnnouncer instances, created once at import time.
# Each keeps its own independent ``listeners`` list.
# NOTE(review): judging by the names, one is presumably used for metric
# updates and the other for events — confirm against the callers.
METRICS_ANNOUNCER = MessageAnnouncer()
EVENTS_ANNOUNCER = MessageAnnouncer()