import zmq


def _topic_to_bytes(topic):
    '''
    Convert a topic into the bytes frame sent ahead of the payload.

    ``bytes`` values are returned untouched and text (``str``) is encoded
    as UTF-8. Anything else is handed to ``bytes()`` as before.
    '''
    if isinstance(topic, bytes):
        # Already a bytes frame (this also covers Python 2 ``str``).
        return topic
    if isinstance(topic, str):
        # On Python 3, ``bytes("some text")`` raises TypeError because no
        # encoding is given, so text topics must be encoded explicitly.
        return topic.encode('utf-8')
    return bytes(topic)


class ZmqPubSubStreamer:
    '''
    Publishes serialised messages on a ZeroMQ PUB socket.

    Not thread-safe. Always get this inside the thread/process where you
    intend to use it.
    '''

    def __init__(self, port):
        '''
        Create a PUB socket on the shared zmq context and bind it.

        Args
        ----
        port: Port number (anything ``str()`` can render) that is appended
            to ``tcp://*:`` to form the bind address.
        '''
        address = 'tcp://*:' + str(port)
        self._socket = zmq.Context.instance().socket(zmq.PUB)
        print('Starting socket with address: ' + address)
        self._socket.bind(address)

    def send_message(self, message):
        '''
        Publish ``message`` with an empty topic.

        Args
        ----
        message: A message type that has the serialise() method.
        '''
        self.send_message_topic("", message)

    def send_message_topic(self, topic, message):
        '''
        Publish ``message`` as a two-part message: topic frame, then payload.

        Args
        ----
        topic: Topic as ``str`` (sent UTF-8 encoded) or ``bytes``.
        message: A message type that has the serialise() method. Its result
            is sent as the second frame, so it is presumably bytes-like
            (confirm against the message classes).
        '''
        self._socket.send_multipart(
            [_topic_to_bytes(topic), message.serialise()])


class BluetoothStreamer:
    '''
    Placeholder streamer: construction and send_message() currently do
    nothing.
    '''

    def __init__(self):
        pass

    def send_message(self, message_bytes):
        '''Accept a message and discard it (not implemented).'''
        pass


class TestStreamer:
    '''
    In-process streamer that prints what it receives and forwards topic
    messages to registered listener callables.
    '''

    def __init__(self):
        # Callables invoked with each message sent via send_message_topic().
        self._listeners = []

    def send_message(self, message_bytes):
        '''Print a notice only; listeners are NOT notified on this path.'''
        print('Got a message')

    def send_message_topic(self, topic, message):
        '''
        Print the topic and pass ``message`` to every registered listener.

        Args
        ----
        topic: Any value; rendered with ``str()`` for the printed notice.
        message: Object handed unchanged to each listener.
        '''
        print('Got a message with topic: ' + str(topic))
        self._fire_message_received(message)

    def add_message_listener(self, listener):
        '''
        Register a callable to be invoked with each topic message.

        Args
        ----
        listener: Callable taking the message as its single argument.
        '''
        self._listeners.append(listener)

    def _fire_message_received(self, message):
        '''Call every listener with ``message``, in registration order.'''
        for listener in self._listeners:
            listener(message)


def getZmqPubSubStreamer(port):
    '''
    Return a new ZmqPubSubStreamer bound to ``port``.

    Not thread-safe. Always get this inside the thread/process where you
    intend to use it.
    '''
    return ZmqPubSubStreamer(port)


def getTestingStreamer():
    '''Return a new TestStreamer with no listeners registered.'''
    return TestStreamer()