import json

import paho.mqtt.client as mqtt


class Messenger:
    '''
    Base class for swarm messengers.

    Concrete transports subclass this and implement ``broadcast_message``,
    ``unicast_message``, ``connect``, ``disconnect``, ``id`` and ``swarm``.
    The base class only stores the registered callbacks and fans events
    out to them.
    '''

    # Class-level fallbacks, kept only so that a subclass which does not call
    # Messenger.__init__ still finds these attributes. Messenger.__init__
    # shadows them with per-instance lists so that callbacks registered on
    # one messenger are not fired by every other messenger.
    _connect_callbacks = []
    _disconnect_callbacks = []
    _message_callbacks = []

    def __init__(self):
        '''
        Creates the per-instance callback lists.
        '''
        self._connect_callbacks = []
        self._disconnect_callbacks = []
        self._message_callbacks = []

    def broadcast_message(self, topic, message):
        '''
        Broadcasts the specified message to the swarm based upon its
        topic (or group).
        '''
        raise NotImplementedError

    def unicast_message(self, target, message):
        '''
        Sends the specified message to the single target.
        '''
        raise NotImplementedError

    def connect(self):
        '''
        Connect to the swarm.
        '''
        raise NotImplementedError

    def disconnect(self):
        '''
        Disconnect from the swarm.
        '''
        raise NotImplementedError

    def add_connect_callback(self, on_connect):
        '''
        Adds a callback to run once the messenger is connected.
        The callback is called with the connection result code.
        '''
        self._connect_callbacks.append(on_connect)

    def add_connect(self, connect):
        '''
        Adds a callback to do something else once we are connected.
        Older spelling of ``add_connect_callback``, kept for existing callers.
        '''
        self.add_connect_callback(connect)

    def on_connect(self, code=None):
        '''
        Called once the messenger connects to the swarm.
        Passes ``code`` to every registered connect callback.
        '''
        for cb in self._connect_callbacks:
            cb(code)

    def on_disconnect(self, code=None):
        '''
        Called when the messenger is disconnected from the swarm.
        Passes ``code`` to every registered disconnect callback.
        '''
        for cb in self._disconnect_callbacks:
            cb(code)

    def add_disconnect_callback(self, on_disconnect):
        '''
        Adds a callback for when the messenger is disconnected.
        The callback is called with the disconnect result code.
        '''
        self._disconnect_callbacks.append(on_disconnect)

    def add_message_callback(self, on_message):
        '''
        Adds a callback for when the messenger receives a message.
        The callback is called with the received message.
        '''
        self._message_callbacks.append(on_message)

    def on_message(self, message):
        '''
        Called when the messenger receives a message.
        Passes ``message`` to every registered message callback.
        '''
        for cb in self._message_callbacks:
            cb(message)

    @property
    def id(self):
        '''
        The id for this messenger that is being used in communication.
        '''
        raise NotImplementedError

    @property
    def swarm(self):
        '''
        Gets the name of the swarm this instance is a part of.
        '''
        raise NotImplementedError


class MqttMessenger(Messenger):
    '''
    A messenger that uses MQTT.

    Settings are read from a JSON file whose ``mqtt`` object provides the
    ``host``, ``port``, ``timeout`` and ``swarm`` keys.
    '''

    def __init__(self, config_path='config.json'):
        '''
        Loads the configuration and wires this object's handlers into a new
        paho client. No network connection is made until ``connect``.

        config_path: path of the JSON configuration file; defaults to
        ``config.json`` resolved against the current working directory.
        '''
        # Without this the callback lists would be the shared class-level ones.
        Messenger.__init__(self)

        with open(config_path) as json_config:
            self._cfg = json.load(json_config)

        # NOTE(review): the handler signatures below are the ones this file
        # was written against; newer paho releases may want an explicit
        # callback API version passed to Client() - confirm against the
        # installed paho-mqtt version.
        self._client = mqtt.Client()
        self._client.on_connect = self.on_connect
        self._client.on_message = self.on_message
        self._client.on_disconnect = self.on_disconnect

    def on_message(self, client, userdata, message):
        '''
        paho message handler; forwards ``message`` to the registered
        message callbacks.
        '''
        Messenger.on_message(self, message)

    def on_connect(self, client, userdata, flags, rc):
        '''
        paho connect handler; subscribes to the configured swarm topic and
        then forwards ``rc`` to the registered connect callbacks.
        '''
        # Subscribe to the swarm specified in the config.
        self._client.subscribe(self._cfg['mqtt']['swarm'])
        Messenger.on_connect(self, rc)

    def on_disconnect(self, client, userdata, rc):
        '''
        paho disconnect handler; forwards ``rc`` to the registered
        disconnect callbacks.
        '''
        Messenger.on_disconnect(self, rc)

    def broadcast_message(self, topic, message):
        '''
        Publishes ``message`` on ``topic`` with QoS 1.
        '''
        self._client.publish(topic, message, qos=1)

    def connect(self):
        '''
        Connects to the configured broker and starts the client's loop.

        The ``timeout`` setting is passed as the third positional argument
        of the client's ``connect`` call.
        '''
        mqtt_cfg = self._cfg['mqtt']
        self._client.connect(mqtt_cfg['host'],
                             int(mqtt_cfg['port']),
                             int(mqtt_cfg['timeout']))
        self._client.loop_start()

    def disconnect(self):
        '''
        Disconnects the client from the broker.
        '''
        self._client.disconnect()

    # The method was originally misspelled, which left ``disconnect`` raising
    # NotImplementedError. The old name is kept for existing callers.
    disconnec = disconnect

    @property
    def id(self):
        '''
        The client id held by the underlying paho client.
        '''
        return self._client._client_id

    @property
    def swarm(self):
        '''
        The swarm name from the configuration, also used as the
        subscription topic.
        '''
        return self._cfg['mqtt']['swarm']