From 2e05b83d9981faf846a7c14dcbb9bc7fb0e6ca79 Mon Sep 17 00:00:00 2001 From: Michael Pivato Date: Thu, 17 Jan 2019 14:17:42 +1030 Subject: [PATCH] Use messages module instead of messagepack in commander --- .../CentralisedDecision/commander.py | 60 ++++++++++--------- 1 file changed, 33 insertions(+), 27 deletions(-) diff --git a/DecisionSystem/CentralisedDecision/commander.py b/DecisionSystem/CentralisedDecision/commander.py index 840a822..490dce7 100644 --- a/DecisionSystem/CentralisedDecision/commander.py +++ b/DecisionSystem/CentralisedDecision/commander.py @@ -1,6 +1,8 @@ import paho.mqtt.client as mqtt import time -import umsgpack +from DecisionSystem.messages import Message, CommanderWill, RequestVote +import json +import numpy as np class Commander: currentVote = None @@ -16,15 +18,21 @@ class Commander: ''' Initial/default waiting time is 1 minute for votes to come in. ''' - self.timeout = timeout + self.timeout = timeout + # Default configuration file for testing. + cfg = None + with open('config.json') as json_config: + cfg = json.load(json_config) + self._cfg = cfg self.client = mqtt.Client() - self.client.connect('172.16.13.128') + self.client.connect(cfg["mqtt"]["host"], cfg["mqtt"]["port"], cfg["mqtt"]["host"]) self.client.subscribe("swarm1/commander") # Commander needs a will message too, for the decentralised version, so the # voters know to pick a new commander. # If using apache zookeeper this won't be needed. + self.client.publish("swarm1/voters", CommanderWill(self.client._client_id)) def make_decision(self): # Should change this to follow strategy pattern, for different implementations of @@ -41,6 +49,8 @@ class Commander: max_vote = None max_vote_num = 0 + # Should try using a numpy array for this. + for vote in dif_votes.keys(): if dif_votes[vote] > max_vote_num: max_vote = vote @@ -50,42 +60,38 @@ class Commander: def get_votes(self): # Should consider abstracting these messages to another class for portability. - message = {"type": "reqVote"} - message_packed = umsgpack.packb(message) self._taking_votes = True # Publish a message that votes are needed. - self.client.publish("swarm1/voters", message_packed) + self.client.publish("swarm1/voters", RequestVote(self.client._client_id)) time.sleep(self.timeout) self._taking_votes = False self.make_decision() def on_message(self, client, userdata, message): + messageD = None try: - messageDict = umsgpack.unpackb(message.payload) + messageD = Message().deserialise(message.payload) except: - print("Incorrect Message") + print("Incorrect Message Has Been Sent") return - if "type" in messageDict.keys: - # Need to consider that a malicious message may have a type with incorrect subtypes. - if messageDict["type"] == "connect": - # Voter just connected/reconnnected. - if not messageDict["client"] in self._connectedVoters: - self._connectedVoters.append(messageDict["client"]) - elif messageDict["type"] == "vote": - # Voter is sending in their vote. - if self._taking_votes: - # Commander must have requested their taking votes, and the timeout - # has not occurred. - # Only add vote to list if the client has not already voted. - if messageDict["client"] not in self._votes: - self._votes[messageDict["client"]] = messageDict["vote"] + + # Need to consider that a malicious message may have a type with incorrect subtypes. + if messageD.type == "connect": + # Voter just connected/reconnnected. + if not messageD["client"] in self._connectedVoters: + self._connectedVoters.append(messageD["client"]) + elif messageD.type == "vote": + # Voter is sending in their vote. + if self._taking_votes: + # Commander must have requested their taking votes, and the timeout + # has not occurred. + # Only add vote to list if the client has not already voted. + if messageD.sender not in self._votes: + self._votes[messageD.sender] = messageD.data["vote"] - elif messageDict["type"] == "disconnected": - self._connectedVoters.remove(messageDict["type"]) - else: - # Not a message we should be using. - pass + elif messageD.type == "disconnected": + self._connectedVoters.remove(messageD.sender) def on_connect(self, client, userdata, flags, rc): self.client.subscribe("swarm1/commander")