import json
import time
from collections import Counter

import numpy as np
import paho.mqtt.client as mqtt

from DecisionSystem.messages import Message, CommanderWill, RequestVote


class Commander:
    '''
    Collects votes from voters over MQTT and picks the most common vote.

    The commander listens on "swarm1/commander" and publishes requests on
    "swarm1/voters". The callbacks are registered on the paho client in
    __init__, but no network loop is started here: the caller still has to
    run one (e.g. ``commander.client.loop_start()``) for messages to arrive.
    '''

    # Result of the most recent voting round (None until a round finishes).
    currentVote = None

    # Stores voters that connect to maintain a majority.
    # NOTE(review): the original comment said voters who do not vote in the
    # latest round are removed, but nothing in this class does that; voters
    # are only removed when a "disconnected" message arrives.
    # Kept at class level for backward compatibility; __init__ rebinds a
    # fresh list per instance so instances do not share state.
    _connectedVoters = []

    # Dict has format: {clientId: vote}
    # Rebound per instance in __init__ for the same reason as above.
    _votes = {}

    # True only while a voting round is open (see get_votes).
    _taking_votes = False

    def __init__(self, timeout = 60):
        '''
        Load config.json, connect to the MQTT broker and subscribe.

        timeout -- seconds get_votes() waits for votes to come in.
                   Initial/default waiting time is 1 minute.
        '''
        self.timeout = timeout

        # Per-instance containers; the class-level ones would otherwise be
        # shared (and mutated) by every Commander instance.
        self._connectedVoters = []
        self._votes = {}

        # Default configuration file for testing.
        with open('config.json', encoding='utf-8') as json_config:
            cfg = json.load(json_config)
        self._cfg = cfg
        mqtt_cfg = cfg["mqtt"]

        self.client = mqtt.Client()

        # paho only invokes callbacks that are assigned on the client, so
        # hook them up before connecting.
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message

        # The third positional argument of connect() is the keepalive in
        # seconds; the host used to be passed there by mistake.
        self.client.connect(
            mqtt_cfg["host"],
            mqtt_cfg["port"],
            mqtt_cfg.get("keepalive", 60),
        )
        self.client.subscribe("swarm1/commander")

        # Commander needs a will message too, for the decentralised version, so the
        # voters know to pick a new commander.
        # If using apache zookeeper this won't be needed.
        # NOTE(review): this publishes the message immediately rather than
        # registering it as an MQTT last-will (Client.will_set before
        # connect) - confirm which was intended. It also assumes
        # CommanderWill is accepted by publish() as a payload - TODO confirm.
        self.client.publish("swarm1/voters", CommanderWill(self.client._client_id))

    def make_decision(self):
        '''
        Return the most common vote of the current round.

        Votes are compared by their str() form and the winner is returned
        as that string. On a tie the vote that was recorded first wins.
        Returns None when no votes have been recorded.
        '''
        # Should change this to follow strategy pattern, for different implementations of
        # making a decision on the votes.

        # Tally the vote values, not the dict keys (which are client ids).
        # list() snapshots the values in case on_message adds a vote from
        # another thread while we are counting.
        tally = Counter(str(vote) for vote in list(self._votes.values()))
        if not tally:
            return None

        # max() returns the first maximal key in insertion order, which
        # matches the original "strictly greater" comparison loop.
        return max(tally, key=tally.get)

    def get_votes(self):
        '''
        Run one voting round and return its result.

        Clears the previous round's votes, asks the voters to vote, blocks
        for self.timeout seconds while on_message collects votes, then
        stores the decision in self.currentVote and returns it.
        '''
        # Should consider abstracting these messages to another class for portability.

        # Start from a clean slate: on_message ignores repeat votes from the
        # same sender, so stale votes would block every later round.
        self._votes = {}
        self._taking_votes = True
        try:
            # Publish a message that votes are needed.
            # NOTE(review): assumes RequestVote is a valid publish payload.
            self.client.publish("swarm1/voters", RequestVote(self.client._client_id))
            time.sleep(self.timeout)
        finally:
            # Always close the round, even if the wait is interrupted.
            self._taking_votes = False

        self.currentVote = self.make_decision()
        return self.currentVote

    def on_message(self, client, userdata, message):
        '''
        paho on_message callback: handle connect/vote/disconnected messages.

        Payloads that cannot be deserialised are reported and dropped.
        '''
        try:
            messageD = Message().deserialise(message.payload)
        except Exception:
            print("Incorrect Message Has Been Sent")
            return

        # Need to consider that a malicious message may have a type with incorrect subtypes.
        if messageD.type == "connect":
            # Voter just connected/reconnected.
            # NOTE(review): this branch reads messageD["client"] while the
            # others read messageD.sender - confirm Message supports both.
            voter = messageD["client"]
            if voter not in self._connectedVoters:
                self._connectedVoters.append(voter)
        elif messageD.type == "vote":
            # Voter is sending in their vote.
            # Only accepted while a round is open (the commander requested
            # votes and the timeout has not occurred), and only the first
            # vote from each sender counts.
            if self._taking_votes and messageD.sender not in self._votes:
                self._votes[messageD.sender] = messageD.data["vote"]
        elif messageD.type == "disconnected":
            # Ignore disconnects from voters we never registered instead of
            # letting list.remove raise ValueError inside the callback.
            if messageD.sender in self._connectedVoters:
                self._connectedVoters.remove(messageD.sender)

    def on_connect(self, client, userdata, flags, rc):
        '''paho on_connect callback: (re)subscribe to the commander topic.'''
        self.client.subscribe("swarm1/commander")

    def get_participants(self):
        '''
        Publish to "swarm1/voters" without a payload.

        NOTE(review): no payload is passed, so paho sends a zero-length
        message; presumably a participant request message was intended -
        TODO confirm.
        '''
        self.client.publish("swarm1/voters")