import paho.mqtt.client as mqtt import time import json import umsgpack import numpy as np class Voter: ''' This class acts to replicate sensor information with the network to come to a consensus of an activity occurrance. This is based upon research by Song et al. available at: https://ieeexplore.ieee.org/document/5484586 The main advantage of this approach, as apposed to techniques such as by using zookeeper or consul, is it can be completely decentralised and so works without a central server, or needing to elect a central server. Additionally, it does not require all nodes to run a Zookeeper/Consul server instance, which were not designed for these constrained combat environments, which will fail if half the nodes fail, and also use a lot of resources for handling services not required by this task. The original approach in the paper requires some previous training before sensing, so that there is a probability of a given action based upon the previous set of actions. ''' _votes = {} _connected_voters = [] _taking_votes = False def __init__(self, on_vote, swarm_name): ''' on_vote: Callback to get the required vote to broadcast. ''' # Load config file cfg = None with open('config.json') as json_config: cfg = json.load(json_config) self._cfg = cfg self.on_vote = on_vote self._swarm = swarm_name self._client = mqtt.Client() self._client.on_message = self.on_message self._client.on_connect = self.on_connect self._client.connect(cfg["mqtt"]["host"], cfg["mqtt"]["port"], cfg["mqtt"]["timeout"]) self._client.loop_start() def submit_vote(self): # Publish to swarm where all other voters will receive a vote. self._client.publish(self._swarm, self.collect_vote) self._taking_votes = True time.sleep(self._cfg["mqtt"]["timeout"]) self._taking_votes = False # Wait a certain amount of time for responses, then fuse the information. self.fuse_algorithm() # Need the error and number of timestamps since voting started to finalise the consensus. def fuse_algorithm(self): # First calculate vi -> the actual vote that is taken # (Or the probability that the observation is a label for each) # We're just going to be doing 1 for the detected and 0 for all others. # vi is for each hand (action in paper), but we're just going to do a single # hand for our purposes. Will be able to use the CNN for all hands/gestures if we want to. vi = np.zeros(6,1) # Set correct vi. vote = self.on_vote() vi[vote] = 1 # Now send this off to the other nodes. Potentially using gossip... # Set diagonal of ANDvi to elements of vi. # This should actually be ANDvj, as it is for each observation received. ANDvi = np.diag(vi.flatten()) # Nee # M is the probability of going from one state to the next, which # is assumed to be uniform for our situation - someone is just as likely # to raise 5 fingers from two or any other. # And so a 6x6 matrix is generated with all same probability to show this. # Remember they could be holding up no fingers... # m = np.full((6,6), 0.2) # Y1T = np.full((6,1),1) # Compute consensus state estimate by taking difference between our observations # and all others individually. # Moving to an approach that does not require the previous # timestep (or so much math...) # First take other information and fuse, using algorithm # as appropriate. pass def custom_fuse(self): vi = np.zeros(6,1) # Set correct vi. vote = self.on_vote() vi[vote] = 1 def on_message(self, client, userdata, message): try: message_dict = umsgpack.unpackb(message.payload) except: print("Incorrect message received") return if message_dict["type"] == "vote": # received a vote if self._taking_votes: self._votes[message_dict["client"]] = message_dict["vote"] elif message_dict["type"] == "connect": # voter connected to the swarm self._connected_voters.append(message_dict["client"]) elif message_dict["type"] == "disconnect": # Sent as the voter's will message self._connected_voters.remove(message_dict["client"]) def on_connect(self, client, userdata, flags, rc): print("Connected with result code " + str(rc)) self._client.subscribe(self._swarm) def collect_vote(self): vote_message = umsgpack.packb({"type": "vote", "client":self._client._client_id, "vote": self.on_vote()}) return vote_message def start_vote(self): pass