From 57db02a402a17972614a1aa607ad46ec34827c3f Mon Sep 17 00:00:00 2001 From: Michael Pivato Date: Thu, 24 Jan 2019 10:59:26 +1030 Subject: [PATCH] Change to use external messaging module --- .../CentralisedDecision/ballotvoter.py | 45 +++--- .../CentralisedDecision/commander.py | 34 ++--- .../CentralisedDecision/messenger.py | 133 ++++++++++++++++++ 3 files changed, 174 insertions(+), 38 deletions(-) create mode 100644 DecisionSystem/CentralisedDecision/messenger.py diff --git a/DecisionSystem/CentralisedDecision/ballotvoter.py b/DecisionSystem/CentralisedDecision/ballotvoter.py index 125d523..48552ed 100644 --- a/DecisionSystem/CentralisedDecision/ballotvoter.py +++ b/DecisionSystem/CentralisedDecision/ballotvoter.py @@ -1,28 +1,27 @@ -import paho.mqtt.client as mqtt import json from DecisionSystem.messages import ConnectSwarm, SubmitVote, Message, deserialise, RequestVote +from multiprocessing import Pool +from messenger import Messenger class BallotVoter: - def __init__(self, on_vote): + def __init__(self, on_vote, messenger: Messenger): # Load config file cfg = None - with open('../config.json') as json_config: + with open('config.json') as json_config: cfg = json.load(json_config) self._cfg = cfg - self.client = mqtt.Client() - self.client.on_connect = self.on_connect - self.client.on_message = self.on_message - # id is generated automatically. - self.client.connect(cfg["mqtt"]["host"], cfg["mqtt"]["port"], cfg["mqtt"]["timeout"]) - self.client.loop_start() + self.messenger = messenger + self.messenger.add_message_callback(self.on_message) + self.messenger.add_connect(self.on_connect) self.on_vote = on_vote - def on_connect(self, client, userdata, flags, rc): + def on_connect(self, rc): print("Connected with result code " + str(rc)) - self.client.subscribe('swarm1/voters', qos=1) + # Should subscribes be handled by the messenger? + #self.client.subscribe('swarm1/voters', qos=1) # Need to set a will as well to broadcast on unexpected disconnection, so commander # knows it is no longer part of the set of voters. @@ -30,9 +29,9 @@ class BallotVoter: #will_message = {"type": "UDisconnect"} # Tell commander we are now connected. - # self.send_connect() + self.send_connect() - def on_message(self, client, userdata, message): + def on_message(self, message): print("Message Received!") messageD = deserialise(message.payload) print("Message Type: " + messageD.type) @@ -51,14 +50,24 @@ class BallotVoter: def submit_vote(self): v = self.on_vote() + if v == None: + print('Could not vote on the frame') + return print("Got Vote") - vote = SubmitVote(v, self.client._client_id) + vote = SubmitVote(v, self.messenger.id) print('created vote') - self.client.publish('swarm1/commander', vote.serialise()) + self.messenger.broadcast_message(self.messenger.swarm, vote.serialise()) print('published vote') - + + def submit_vote_multicore(self): + ''' + Uses multiple processes to work on multiple frames at the same time + to take an average for this voter, or if it could not find a hand + in one frame but can in another. + ''' + pass + def send_connect(self): # Send a connected message to let any commanders know that # it is available. - self.client.publish("swarm1/commander", ConnectSwarm(self.client._client_id).serialise()) - \ No newline at end of file + self.messenger.broadcast_message(self.messenger.swarm, ConnectSwarm(self.messenger.id).serialise()) \ No newline at end of file diff --git a/DecisionSystem/CentralisedDecision/commander.py b/DecisionSystem/CentralisedDecision/commander.py index fcb7973..3e55969 100644 --- a/DecisionSystem/CentralisedDecision/commander.py +++ b/DecisionSystem/CentralisedDecision/commander.py @@ -1,4 +1,3 @@ -import paho.mqtt.client as mqtt import time from DecisionSystem.messages import Message, CommanderWill, RequestVote, GetSwarmParticipants, deserialise import json @@ -14,24 +13,17 @@ class Commander: _votes = {} _taking_votes = False - def __init__(self, timeout = 60): + def __init__(self, messenger, timeout = 60): ''' Initial/default waiting time is 1 minute for votes to come in. ''' self.timeout = timeout - # Default configuration file for testing. - cfg = None - with open('config.json') as json_config: - cfg = json.load(json_config) - self._cfg = cfg - self.client = mqtt.Client() - self.client.on_connect = self.on_connect - self.client.on_message = self.on_message - self.client.on_disconnect = self.on_disconnect - self.client.connect(cfg["mqtt"]["host"], int(cfg["mqtt"]["port"]), int(cfg["mqtt"]["timeout"])) - # Don't forget to start the event loop!!! - self.client.loop_start() + self._messenger = messenger + self._messenger.add_connect(self.on_connect) + self._messenger.add_message_callback(self.on_message) + self._messenger.add_disconnect_callback(self.on_disconnect) + print('Connecting') def make_decision(self): # Should change this to follow strategy pattern, for different implementations of @@ -65,13 +57,13 @@ class Commander: self._taking_votes = True # Publish a message that votes are needed. print("Sending request message") - self.client.publish("swarm1/voters", RequestVote(self.client._client_id).serialise()) + self._messenger.broadcast_message(self._messenger.swarm, RequestVote(self._messenger.id).serialise()) print("published message") time.sleep(self.timeout) self._taking_votes = False return self.make_decision() - def on_message(self, client, userdata, message): + def on_message(self, message): print("Message Received") messageD = None try: @@ -101,16 +93,18 @@ class Commander: print("Voter disconnected :(") self._connectedVoters.remove(messageD.sender) - def on_connect(self, client, userdata, flags, rc): - self.client.subscribe("swarm1/commander") + def on_connect(self, rc): + # Subscribes now handled by the mqtt messenger, this is just here + # for convenience later. + pass def get_participants(self): - self.client.publish("swarm1/voters", GetSwarmParticipants().serialise()) + self._messenger.broadcast_message(self._messenger.swarm, GetSwarmParticipants().serialise()) # Commander needs a will message too, for the decentralised version, so the # voters know to pick a new commander. # If using apache zookeeper this won't be needed. # That's the wrong method for setting a will. # self.client.publish("swarm1/voters", CommanderWill(self.client._client_id).serialise()) - def on_disconnect(self, client, userdata, rc): + def on_disconnect(self, rc): pass \ No newline at end of file diff --git a/DecisionSystem/CentralisedDecision/messenger.py b/DecisionSystem/CentralisedDecision/messenger.py new file mode 100644 index 0000000..53a2e9f --- /dev/null +++ b/DecisionSystem/CentralisedDecision/messenger.py @@ -0,0 +1,133 @@ +import paho.mqtt.client as mqtt +import json + +class Messenger: + _connect_callbacks = [] + _disconnect_callbacks = [] + _message_callbacks = [] + + def __init__(self): + pass + + def broadcast_message(self, topic, message): + ''' + Broadcasts the specified message to the swarm based upon its topic(or group). + ''' + raise NotImplementedError + + def unicast_message(self, target, message): + ''' + Broadcasts the specified message to the single target. + ''' + raise NotImplementedError + + def connect(self): + ''' + Connect to the swarm. + ''' + raise NotImplementedError + + def disconnect(self): + ''' + Disconnect from the swarm. + ''' + raise NotImplementedError + + def add_connect(self, connect): + ''' + Adds a callback to do something else once we are connected. + ''' + self._connect_callbacks.append(connect) + + def on_connect(self, code = None): + ''' + Called once the messenger connects to the swarm. + ''' + for cb in self._connect_callbacks: + cb(code) + + def on_disconnect(self, code = None): + ''' + Called when the messenger is disconnected from the swarm. + ''' + for cb in self._disconnect_callbacks: + cb(code) + + def add_disconnect_callback(self, on_disconnect): + ''' + Adds a callback for when the messenger is disconnected. + ''' + self._disconnect_callbacks.append(on_disconnect) + + def add_message_callback(self, on_message): + ''' + Adds a callback + ''' + self._message_callbacks.append(on_message) + + def on_message(self, message): + ''' + Called when the messenger receives a message. + ''' + for cb in self._message_callbacks: + cb(message) + + @property + def id(self): + ''' + The id for this messenger that is being used in communication. + ''' + raise NotImplementedError + + @property + def swarm(self): + ''' + Gets the name of the swarm this instance is a part of. + ''' + raise NotImplementedError + + +class MqttMessenger(Messenger): + ''' + A messenger that uses MQTT. + ''' + def __init__(self): + with open('config.json') as json_config: + self._cfg = json.load(json_config) + + self._client = mqtt.Client() + self._client.on_connect = self.on_connect + self._client.on_message = self.on_message + self._client.on_disconnect = self.on_disconnect + + def on_message(self, client, userdata, message): + Messenger.on_message(self, message) + + def on_connect(self, client, userdata, flags, rc): + # Subscribe to the swarm specified in the config. + self._client.subscribe(self._cfg['mqtt']['swarm']) + Messenger.on_connect(self, rc) + + def on_disconnect(self, client, userdata, rc): + Messenger.on_disconnect(self, rc) + + def broadcast_message(self, topic, message): + self._client.publish(topic, message, qos=1) + + def connect(self): + self._client.connect(self._cfg['mqtt']['host'], \ + int(self._cfg['mqtt']['port']), \ + int(self._cfg['mqtt']['timeout'])) + + self._client.loop_start() + + def disconnec(self): + self._client.disconnect() + + @property + def id(self): + return self._client._client_id + + @property + def swarm(self): + return self._cfg['mqtt']['swarm'] \ No newline at end of file