Source code for django_river_ml.announce
import queue
[docs]
class MessageAnnouncer:
    """Fan a message out to a list of queue-like listeners.

    ``listeners`` is a plain list that callers populate with objects
    exposing ``put_nowait`` (e.g. bounded ``queue.Queue`` instances).
    """

    def __init__(self):
        """Start with no registered listeners."""
        self.listeners = []

    def announce(self, msg):
        """Offer ``msg`` to every listener without blocking.

        A listener whose ``put_nowait`` raises ``queue.Full`` is removed
        from ``self.listeners``; all other listeners are kept.
        """
        # Walk from the tail towards the head so that removing an entry
        # never shifts the positions of listeners still to be visited.
        idx = len(self.listeners) - 1
        while idx >= 0:
            listener = self.listeners[idx]
            try:
                listener.put_nowait(msg)
            except queue.Full:
                # The listener's buffer is full: drop it from the list.
                self.listeners.pop(idx)
            idx -= 1
# Two independent module-level MessageAnnouncer instances, created at import
# time. NOTE(review): judging by the names, one carries metrics messages and
# the other event messages -- confirm against the callers.
METRICS_ANNOUNCER = MessageAnnouncer()
EVENTS_ANNOUNCER = MessageAnnouncer()