64 lines
1.6 KiB
Python
64 lines
1.6 KiB
Python
import zmq
|
|
|
|
|
|
class ZmqPubSubStreamer:
|
|
'''
|
|
Not thread-safe. Always get this inside the thread/process where you intend
|
|
to use it.
|
|
'''
|
|
|
|
def __init__(self, port):
|
|
self._socket = zmq.Context.instance().socket(zmq.PUB)
|
|
print('Starting socket with address: ' + 'tcp://*:' + str(port))
|
|
self._socket.bind("tcp://*:" + str(port))
|
|
|
|
|
|
def send_message(self, message):
|
|
'''
|
|
Args
|
|
----
|
|
message: A message type that has the serialise() method.
|
|
'''
|
|
self.send_message_topic("", message)
|
|
|
|
def send_message_topic(self, topic, message):
|
|
self._socket.send_multipart([bytes(topic), message.serialise()])
|
|
|
|
|
|
class BluetoothStreamer:
|
|
def __init__(self):
|
|
pass
|
|
|
|
def send_message(self, message_bytes):
|
|
pass
|
|
|
|
class TestStreamer:
|
|
def __init__(self):
|
|
self._listeners = []
|
|
|
|
def send_message(self, message_bytes):
|
|
print('Got a message')
|
|
|
|
def send_message_topic(self, topic, message):
|
|
print('Got a message with topic: ' + str(topic))
|
|
self._fire_message_received(message)
|
|
|
|
def add_message_listener(self, listener):
|
|
self._listeners.append(listener)
|
|
|
|
def _fire_message_received(self, message):
|
|
for listener in self._listeners:
|
|
listener(message)
|
|
|
|
def getZmqPubSubStreamer(port):
|
|
'''
|
|
Not thread-safe. Always get this inside the thread/process where you intend
|
|
to use it.
|
|
'''
|
|
return ZmqPubSubStreamer(port)
|
|
|
|
def getTestingStreamer():
|
|
return TestStreamer()
|
|
|
|
# TODO: Create a general get method that will get the streamer based on an
|
|
# environment variable that is set. |