Almost finish decentralised voter.
This commit is contained in:
@@ -1,5 +1,8 @@
|
|||||||
import paho.mqtt.client as mqtt
|
import paho.mqtt.client as mqtt
|
||||||
import time
|
import time
|
||||||
|
import json
|
||||||
|
import umsgpack
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
class Voter:
|
class Voter:
|
||||||
'''
|
'''
|
||||||
@@ -17,26 +20,90 @@ class Voter:
|
|||||||
The original approach in the paper requires some previous training before sensing, so
|
The original approach in the paper requires some previous training before sensing, so
|
||||||
that there is a probability of a given action based upon the previous set of actions.
|
that there is a probability of a given action based upon the previous set of actions.
|
||||||
'''
|
'''
|
||||||
def __init__(self, on_vote):
|
_votes = {}
|
||||||
|
_connected_voters = []
|
||||||
|
_taking_votes = False
|
||||||
|
|
||||||
|
def __init__(self, on_vote, swarm_name):
|
||||||
'''
|
'''
|
||||||
on_vote: Callback to get the required vote to broadcast.
|
on_vote: Callback to get the required vote to broadcast.
|
||||||
'''
|
'''
|
||||||
|
# Load config file
|
||||||
|
cfg = None
|
||||||
|
with open('config.json') as json_config:
|
||||||
|
cfg = json.load(json_config)
|
||||||
|
self._cfg = cfg
|
||||||
self.on_vote = on_vote
|
self.on_vote = on_vote
|
||||||
self.client = mqtt.Client()
|
self._swarm = swarm_name
|
||||||
|
self._client = mqtt.Client()
|
||||||
|
self._client.on_message = self.on_message
|
||||||
|
self._client.on_connect = self.on_connect
|
||||||
|
self._client.connect(cfg["mqtt"]["host"], cfg["mqtt"]["port"], cfg["mqtt"]["timeout"])
|
||||||
|
self._client.loop_start()
|
||||||
|
|
||||||
def submit_vote(self):
|
def submit_vote(self):
|
||||||
# Publish to swarm where all other voters will receive a vote.
|
# Publish to swarm where all other voters will receive a vote.
|
||||||
|
self._client.publish(self._swarm, self.collect_vote)
|
||||||
|
self._taking_votes = True
|
||||||
|
time.sleep(self._cfg["mqtt"]["timeout"])
|
||||||
|
self._taking_votes = False
|
||||||
# Wait a certain amount of time for responses, then fuse the information.
|
# Wait a certain amount of time for responses, then fuse the information.
|
||||||
self.fuse_algorithm()
|
self.fuse_algorithm()
|
||||||
|
|
||||||
# Need the error and number of timestamps since voting started to finalise the consensus.
|
# Need the error and number of timestamps since voting started to finalise the consensus.
|
||||||
|
|
||||||
def fuse_algorithm(self):
|
def fuse_algorithm(self):
|
||||||
|
# First calculate vi -> the actual vote that is taken
|
||||||
|
# (Or the probability that the observation is a label for each)
|
||||||
|
# We're just going to be doing 1 for the detected and 0 for all others.
|
||||||
|
# vi = np.zeros(6,1)
|
||||||
|
# ANDvi = np.zeros(6,6)
|
||||||
|
|
||||||
|
# # Set diagonal of ANDvi to elements of vi.
|
||||||
|
# for i in np.size(vi):
|
||||||
|
# ANDvi[i,i] = vi[i]
|
||||||
|
|
||||||
|
# M is the probability of going from one state to the next, which
|
||||||
|
# is assumed to be uniform for our situation - someone is just as likely
|
||||||
|
# to raise 5 fingers from two or any other.
|
||||||
|
# And so a 6x6 matrix is generated with all same probability to show this.
|
||||||
|
# Remember they could be holding up no fingers...
|
||||||
|
# m = np.full((6,6), 0.2)
|
||||||
|
|
||||||
|
# Y1T = np.full((6,1),1)
|
||||||
|
|
||||||
|
# Moving to an approach that does not require the previous
|
||||||
|
# timestep (or so much math...)
|
||||||
|
# First take other information and fuse, using algorithm
|
||||||
|
# as appropriate.
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def on_message(self, client, userdata, message):
|
def on_message(self, client, userdata, message):
|
||||||
pass
|
try:
|
||||||
|
message_dict = umsgpack.unpackb(message.payload)
|
||||||
|
except:
|
||||||
|
print("Incorrect message received")
|
||||||
|
return
|
||||||
|
|
||||||
|
if message_dict["type"] == "vote":
|
||||||
|
# received a vote
|
||||||
|
if self._taking_votes:
|
||||||
|
self._votes[message_dict["client"]] = message_dict["vote"]
|
||||||
|
|
||||||
|
elif message_dict["type"] == "connect":
|
||||||
|
# voter connected to the swarm
|
||||||
|
self._connected_voters.append(message_dict["client"])
|
||||||
|
|
||||||
|
elif message_dict["type"] == "disconnect":
|
||||||
|
# Sent as the voter's will message
|
||||||
|
self._connected_voters.remove(message_dict["client"])
|
||||||
|
|
||||||
|
def on_connect(self, client, userdata, flags, rc):
|
||||||
|
print("Connected with result code " + str(rc))
|
||||||
|
self._client.subscribe(self._swarm)
|
||||||
|
|
||||||
|
def collect_vote(self):
|
||||||
|
vote_message = umsgpack.packb({"type": "vote",
|
||||||
|
"client":self._client._client_id, "vote": self.on_vote()})
|
||||||
|
return vote_message
|
||||||
Reference in New Issue
Block a user