diff --git a/DecisionSystem/CentralisedDecision/messenger.py b/DecisionSystem/CentralisedDecision/messenger.py index 53a2e9f..e867611 100644 --- a/DecisionSystem/CentralisedDecision/messenger.py +++ b/DecisionSystem/CentralisedDecision/messenger.py @@ -1,5 +1,6 @@ import paho.mqtt.client as mqtt import json +import random class Messenger: _connect_callbacks = [] @@ -9,93 +10,90 @@ class Messenger: def __init__(self): pass - def broadcast_message(self, topic, message): - ''' + def broadcast_message(self, message): + """ Broadcasts the specified message to the swarm based upon its topic(or group). - ''' + """ raise NotImplementedError def unicast_message(self, target, message): - ''' + """ Broadcasts the specified message to the single target. - ''' + """ raise NotImplementedError def connect(self): - ''' + """ Connect to the swarm. - ''' + """ raise NotImplementedError def disconnect(self): - ''' + """ Disconnect from the swarm. - ''' + """ raise NotImplementedError def add_connect(self, connect): - ''' + """ Adds a callback to do something else once we are connected. - ''' + """ self._connect_callbacks.append(connect) def on_connect(self, code = None): - ''' + """ Called once the messenger connects to the swarm. - ''' + """ for cb in self._connect_callbacks: cb(code) def on_disconnect(self, code = None): - ''' + """ Called when the messenger is disconnected from the swarm. - ''' + """ for cb in self._disconnect_callbacks: cb(code) def add_disconnect_callback(self, on_disconnect): - ''' + """ Adds a callback for when the messenger is disconnected. - ''' + """ self._disconnect_callbacks.append(on_disconnect) def add_message_callback(self, on_message): - ''' + """ Adds a callback - ''' + """ self._message_callbacks.append(on_message) def on_message(self, message): - ''' + """ Called when the messenger receives a message. - ''' + """ for cb in self._message_callbacks: cb(message) @property def id(self): - ''' + """ The id for this messenger that is being used in communication. - ''' + """ raise NotImplementedError @property def swarm(self): - ''' + """ Gets the name of the swarm this instance is a part of. - ''' + """ raise NotImplementedError class MqttMessenger(Messenger): - ''' - A messenger that uses MQTT. - ''' + """A messenger that uses MQTT.""" def __init__(self): with open('config.json') as json_config: self._cfg = json.load(json_config) - - self._client = mqtt.Client() + self._client = mqtt.Client(client_id=str(random.randint(0,500))) self._client.on_connect = self.on_connect self._client.on_message = self.on_message self._client.on_disconnect = self.on_disconnect @@ -106,13 +104,19 @@ class MqttMessenger(Messenger): def on_connect(self, client, userdata, flags, rc): # Subscribe to the swarm specified in the config. self._client.subscribe(self._cfg['mqtt']['swarm']) + + # Also subscribe to our own topic for unicast messages. + self._client.subscribe(self._cfg['mqtt']['swarm'] + str(self._client._client_id)) Messenger.on_connect(self, rc) def on_disconnect(self, client, userdata, rc): Messenger.on_disconnect(self, rc) - def broadcast_message(self, topic, message): - self._client.publish(topic, message, qos=1) + def broadcast_message(self, message): + self._client.publish(self._cfg['mqtt']['swarm'], message, qos=1) + + def unicast_message(self, target, message): + self._client.publish(target, message, qos=1) def connect(self): self._client.connect(self._cfg['mqtt']['host'], \