diff --git a/DecisionSystem/DecentralisedActivityFusion/voter.py b/DecisionSystem/DecentralisedActivityFusion/voter.py index 39b00a9..da5b345 100644 --- a/DecisionSystem/DecentralisedActivityFusion/voter.py +++ b/DecisionSystem/DecentralisedActivityFusion/voter.py @@ -1,5 +1,8 @@ import paho.mqtt.client as mqtt import time +import json +import umsgpack +import numpy as np class Voter: ''' @@ -17,26 +20,90 @@ class Voter: The original approach in the paper requires some previous training before sensing, so that there is a probability of a given action based upon the previous set of actions. ''' - def __init__(self, on_vote): + _votes = {} + _connected_voters = [] + _taking_votes = False + + def __init__(self, on_vote, swarm_name): ''' on_vote: Callback to get the required vote to broadcast. ''' + # Load config file + cfg = None + with open('config.json') as json_config: + cfg = json.load(json_config) + self._cfg = cfg self.on_vote = on_vote - self.client = mqtt.Client() + self._swarm = swarm_name + self._client = mqtt.Client() + self._client.on_message = self.on_message + self._client.on_connect = self.on_connect + self._client.connect(cfg["mqtt"]["host"], cfg["mqtt"]["port"], cfg["mqtt"]["timeout"]) + self._client.loop_start() def submit_vote(self): # Publish to swarm where all other voters will receive a vote. - - + self._client.publish(self._swarm, self.collect_vote) + self._taking_votes = True + time.sleep(self._cfg["mqtt"]["timeout"]) + self._taking_votes = False # Wait a certain amount of time for responses, then fuse the information. self.fuse_algorithm() # Need the error and number of timestamps since voting started to finalise the consensus. def fuse_algorithm(self): + # First calculate vi -> the actual vote that is taken + # (Or the probability that the observation is a label for each) + # We're just going to be doing 1 for the detected and 0 for all others. + # vi = np.zeros(6,1) + # ANDvi = np.zeros(6,6) + + # # Set diagonal of ANDvi to elements of vi. + # for i in np.size(vi): + # ANDvi[i,i] = vi[i] + + # M is the probability of going from one state to the next, which + # is assumed to be uniform for our situation - someone is just as likely + # to raise 5 fingers from two or any other. + # And so a 6x6 matrix is generated with all same probability to show this. + # Remember they could be holding up no fingers... + # m = np.full((6,6), 0.2) + + # Y1T = np.full((6,1),1) + + # Moving to an approach that does not require the previous + # timestep (or so much math...) + # First take other information and fuse, using algorithm + # as appropriate. pass + def on_message(self, client, userdata, message): - pass - - \ No newline at end of file + try: + message_dict = umsgpack.unpackb(message.payload) + except: + print("Incorrect message received") + return + + if message_dict["type"] == "vote": + # received a vote + if self._taking_votes: + self._votes[message_dict["client"]] = message_dict["vote"] + + elif message_dict["type"] == "connect": + # voter connected to the swarm + self._connected_voters.append(message_dict["client"]) + + elif message_dict["type"] == "disconnect": + # Sent as the voter's will message + self._connected_voters.remove(message_dict["client"]) + + def on_connect(self, client, userdata, flags, rc): + print("Connected with result code " + str(rc)) + self._client.subscribe(self._swarm) + + def collect_vote(self): + vote_message = umsgpack.packb({"type": "vote", + "client":self._client._client_id, "vote": self.on_vote()}) + return vote_message \ No newline at end of file