import paho.mqtt.client as mqtt import time import umsgpack class Commander: currentVote = None # Stores voters that connect to maintain a majority. # Voters who do not vote in latest round are removed. _connectedVoters = [] # Dict has format: {clientId: vote} _votes = {} _taking_votes = False def __init__(self, timeout = 60): ''' Initial/default waiting time is 1 minute for votes to come in. ''' self.timeout = timeout self.client = mqtt.Client() self.client.connect('172.16.13.128') self.client.subscribe("swarm1/commander") # Commander needs a will message too, for the decentralised version, so the # voters know to pick a new commander. # If using apache zookeeper this won't be needed. def make_decision(self): # Should change this to be a strategy, for different implementations of making # a decision on the votes. votes = self._votes dif_votes = {} for vote in votes: # Get the average/max, etc. if str(vote) in dif_votes: dif_votes[str(vote)] = dif_votes[str(vote)] + 1 else: dif_votes[str(vote)] = 1 max_vote = None max_vote_num = 0 for vote in dif_votes.keys(): if dif_votes[vote] > max_vote_num: max_vote = vote max_vote_num = dif_votes[vote] return max_vote def get_votes(self): message = {"type": "reqVote"} message_packed = umsgpack.packb(message) self._taking_votes = True # Publish a message that votes are needed. self.client.publish("swarm1/voters", message_packed, qos) time.sleep(self.timeout) self._taking_votes = False self.make_decision() # Shouldn't be needed anymore. # def add_subscription(self, topic, callback=None, qos=0): # self.client.subscribe(topic) # if callback is not None: # selfclient.message_callback_add(topic, callback) def on_message(self, client, userdata, message): messageDict = umsgpack.unpackb() if "type" in messageDict.keys: # Need to consider that a malicious message may have a type with incorrect subtypes. if messageDict["type"] == "connect": # Voter just connected/reconnnected. if not messageDict["client"] in self._connectedVoters: self._connectedVoters.append(messageDict["client"]) elif messageDict["type"] == "vote": # Voter is sending in their vote. if self._taking_votes: # Commander must have requested their taking votes, and the timeout # has not occurred. # Only add vote to list if the client has not already voted. if messageDict["client"] not in self._votes.keys: self._votes[messageDict["client"]] = messageDict["vote"] elif messageDict["type"] == "disconnected": self._connectedVoters.remove(messageDict["type"]) else: # Not a message we should be using. pass def on_connect(self, client, userdata, flags, rc): self.client.subscribe("swarm1/commander")