import paho.mqtt.client as mqtt import json import random class Messenger: _connect_callbacks = [] _disconnect_callbacks = [] _message_callbacks = [] def __init__(self): pass def broadcast_message(self, message): """ Broadcasts the specified message to the swarm based upon its topic(or group). """ raise NotImplementedError def unicast_message(self, target, message): """ Broadcasts the specified message to the single target. """ raise NotImplementedError def connect(self): """ Connect to the swarm. """ raise NotImplementedError def disconnect(self): """ Disconnect from the swarm. """ raise NotImplementedError def add_connect(self, connect): """ Adds a callback to do something else once we are connected. """ self._connect_callbacks.append(connect) def on_connect(self, code = None): """ Called once the messenger connects to the swarm. """ for cb in self._connect_callbacks: cb(code) def on_disconnect(self, code = None): """ Called when the messenger is disconnected from the swarm. """ for cb in self._disconnect_callbacks: cb(code) def add_disconnect_callback(self, on_disconnect): """ Adds a callback for when the messenger is disconnected. """ self._disconnect_callbacks.append(on_disconnect) def add_message_callback(self, on_message): """ Adds a callback """ self._message_callbacks.append(on_message) def on_message(self, message): """ Called when the messenger receives a message. """ for cb in self._message_callbacks: cb(message) @property def id(self): """ The id for this messenger that is being used in communication. """ raise NotImplementedError @property def swarm(self): """ Gets the name of the swarm this instance is a part of. """ raise NotImplementedError class MqttMessenger(Messenger): """A messenger that uses MQTT.""" def __init__(self): with open('config.json') as json_config: self._cfg = json.load(json_config) self._client = mqtt.Client(client_id=str(random.randint(0,500))) self._client.on_connect = self.on_connect self._client.on_message = self.on_message self._client.on_disconnect = self.on_disconnect def on_message(self, client, userdata, message): Messenger.on_message(self, message) def on_connect(self, client, userdata, flags, rc): # Subscribe to the swarm specified in the config. self._client.subscribe(self._cfg['mqtt']['swarm']) # Also subscribe to our own topic for unicast messages. self._client.subscribe(self._cfg['mqtt']['swarm'] + str(self._client._client_id)) Messenger.on_connect(self, rc) def on_disconnect(self, client, userdata, rc): Messenger.on_disconnect(self, rc) def broadcast_message(self, message): self._client.publish(self._cfg['mqtt']['swarm'], message, qos=1) def unicast_message(self, target, message): self._client.publish(target, message, qos=1) def connect(self): self._client.connect(self._cfg['mqtt']['host'], \ int(self._cfg['mqtt']['port']), \ int(self._cfg['mqtt']['timeout'])) self._client.loop_start() def disconnec(self): self._client.disconnect() @property def id(self): return self._client._client_id @property def swarm(self): return self._cfg['mqtt']['swarm']