From 1e0c5b575f2814bb947816ab39e468fb566a187d Mon Sep 17 00:00:00 2001 From: "DSTO\\pivatom" Date: Tue, 8 Jan 2019 12:40:54 +1030 Subject: [PATCH 01/20] Add class stubs for paxos version --- DecisionSystem/PaxosDecision/acceptor.py | 0 DecisionSystem/PaxosDecision/learner.py | 0 DecisionSystem/PaxosDecision/proposer.py | 0 3 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 DecisionSystem/PaxosDecision/acceptor.py create mode 100644 DecisionSystem/PaxosDecision/learner.py create mode 100644 DecisionSystem/PaxosDecision/proposer.py diff --git a/DecisionSystem/PaxosDecision/acceptor.py b/DecisionSystem/PaxosDecision/acceptor.py new file mode 100644 index 0000000..e69de29 diff --git a/DecisionSystem/PaxosDecision/learner.py b/DecisionSystem/PaxosDecision/learner.py new file mode 100644 index 0000000..e69de29 diff --git a/DecisionSystem/PaxosDecision/proposer.py b/DecisionSystem/PaxosDecision/proposer.py new file mode 100644 index 0000000..e69de29 From c12c57bfcfe50048a6945d13044d4c2a219df402 Mon Sep 17 00:00:00 2001 From: "DSTO\\pivatom" Date: Thu, 10 Jan 2019 14:54:06 +1030 Subject: [PATCH 02/20] Add centralised folder and initial implementations for decision system. --- .../CentralisedDecision/ballotvoter.py | 21 ++++++++ .../CentralisedDecision/commander.py | 53 +++++++++++++++++++ 2 files changed, 74 insertions(+) create mode 100644 DecisionSystem/CentralisedDecision/ballotvoter.py create mode 100644 DecisionSystem/CentralisedDecision/commander.py diff --git a/DecisionSystem/CentralisedDecision/ballotvoter.py b/DecisionSystem/CentralisedDecision/ballotvoter.py new file mode 100644 index 0000000..fd24914 --- /dev/null +++ b/DecisionSystem/CentralisedDecision/ballotvoter.py @@ -0,0 +1,21 @@ +import paho.mqtt.client as mqtt + +class BallotVoter: + def __init__(self): + self.client = mqtt.Client() + self.client.connect('172.16.43.2') + + def on_connect(self, client, userdata, flags, rc): + print("Connected with result code " + str(rc)) + if rc == 0: + global connected + connected = True + + self.client.subscribe('swarm1/voters', qos=1) + + def on_message(self, client, userdata, message): + + + def submit_vote(self): + self.client.publish('swarm1/') + \ No newline at end of file diff --git a/DecisionSystem/CentralisedDecision/commander.py b/DecisionSystem/CentralisedDecision/commander.py new file mode 100644 index 0000000..ac3d0f0 --- /dev/null +++ b/DecisionSystem/CentralisedDecision/commander.py @@ -0,0 +1,53 @@ +import Messaging.mqttsession as ms +import time +import umsgpack + +class Commander: + currentVote = None + + def __init__(self, timeout = 60): + ''' + Initial/default waiting time is 1 minute for votes to come in. + ''' + self.timeout = timeout + self._votes = [] + ms.Client() + + def on_message(client, userdata, message): + self._votes.append(umsgpack.unpackb(message.payload)) + + ms.client.subscribe("FakeSwarm/FirstTest", on_message) + + ms.client.loop_start() + + def make_decision(self): + votes = self._votes + dif_votes = {} + + for vote in votes: + # Get the average/max, etc. + if str(vote) in dif_votes: + dif_votes[str(vote)] = dif_votes[str(vote)] + 1 + else: + dif_votes[str(vote)] = 1 + + max_vote = None + max_vote_num = 0 + for vote in dif_votes.keys(): + if dif_votes[vote] > max_vote_num: + max_vote = vote + max_vote_num = dif_votes[vote] + + return max_vote + + def get_votes(self, topic, message, qos=0): + # Publish a message that votes are needed. + ms.client.publish(topic, message, qos) + time.sleep(self.timeout) + self.make_decision() + + def add_subscription(self, topic, callback=None, qos=0): + ms.client.subscribe(topic) + + if callback is not None: + ms.client.message_callback_add(topic, callback) \ No newline at end of file From 2abdf7bc1ebb0f943b3e74fd9eab5011e0e3af6a Mon Sep 17 00:00:00 2001 From: "DSTO\\pivatom" Date: Thu, 10 Jan 2019 14:54:53 +1030 Subject: [PATCH 03/20] Add files for Paxos decision system. --- DecisionSystem/PaxosDecision/acceptor.py | 26 +++++++++++++ DecisionSystem/PaxosDecision/learner.py | 2 + .../PaxosDecision/paxos_instance.py | 14 +++++++ .../PaxosDecision/paxosmessenger.py | 37 +++++++++++++++++++ DecisionSystem/PaxosDecision/proposal.py | 32 ++++++++++++++++ DecisionSystem/PaxosDecision/proposer.py | 3 ++ 6 files changed, 114 insertions(+) create mode 100644 DecisionSystem/PaxosDecision/paxos_instance.py create mode 100644 DecisionSystem/PaxosDecision/paxosmessenger.py create mode 100644 DecisionSystem/PaxosDecision/proposal.py diff --git a/DecisionSystem/PaxosDecision/acceptor.py b/DecisionSystem/PaxosDecision/acceptor.py index e69de29..a9cbe0e 100644 --- a/DecisionSystem/PaxosDecision/acceptor.py +++ b/DecisionSystem/PaxosDecision/acceptor.py @@ -0,0 +1,26 @@ +from proposer import Proposer +from paxosmessenger import Messenger + +class Acceptor(): + highest_proposal = None + highest_promise = None + messenger = None + + def __init__(self, messenger: Messenger): + self.messenger = messenger + + def on_message(self, client, userdata, message): + pass + + def prepare(self): + pass + + def accept_request(self, fromt_uid, proposal_id, value): + ''' + Called when an accept message is received from a proposer + ''' + if proposal_id >= self.highest_promise: + self.highest_promise = proposal_id + self.accepted_id = proposal_id + self.accepted_value = value + self.messenger.send_accepted(proposal_id, self.accepted_value) \ No newline at end of file diff --git a/DecisionSystem/PaxosDecision/learner.py b/DecisionSystem/PaxosDecision/learner.py index e69de29..37955b4 100644 --- a/DecisionSystem/PaxosDecision/learner.py +++ b/DecisionSystem/PaxosDecision/learner.py @@ -0,0 +1,2 @@ +class Learner(): + pass \ No newline at end of file diff --git a/DecisionSystem/PaxosDecision/paxos_instance.py b/DecisionSystem/PaxosDecision/paxos_instance.py new file mode 100644 index 0000000..41d8609 --- /dev/null +++ b/DecisionSystem/PaxosDecision/paxos_instance.py @@ -0,0 +1,14 @@ +''' +@author Michael Pivato + +Much thanks to Tom Cocagne by providing basic paxos code +which was the basis of this module and algorithm. +Check out the original at: https://github.com/cocagne/paxos/blob/master/paxos/essential.py +''' + +from learner import Learner +from acceptor import Acceptor +from proposer import Proposer + +class PaxosInstance(): + pass \ No newline at end of file diff --git a/DecisionSystem/PaxosDecision/paxosmessenger.py b/DecisionSystem/PaxosDecision/paxosmessenger.py new file mode 100644 index 0000000..47e38a6 --- /dev/null +++ b/DecisionSystem/PaxosDecision/paxosmessenger.py @@ -0,0 +1,37 @@ +from proposal import Proposal + +class Messenger(): + def send_prepare(self, proposal_id): + ''' + Broadcasts a Prepare message to all Acceptors + ''' + + NotImplementedError + + def send_promise(self, proposer_id, proposal_id, previous_proposal): + ''' + Sends a Promise message to the specified Proposer + ''' + + NotImplementedError + + def send_accept(self, proposal): + ''' + Broadcasts an Accept message to all Acceptors + ''' + + NotImplementedError + + def send_accepted(self, proposal): + ''' + Broadcasts an Accepted message to all Learners + ''' + + NotImplementedError + + def on_resolution(self, proposal_id, value): + ''' + Called when a resolution is reached + ''' + + NotImplementedError \ No newline at end of file diff --git a/DecisionSystem/PaxosDecision/proposal.py b/DecisionSystem/PaxosDecision/proposal.py new file mode 100644 index 0000000..ef1424a --- /dev/null +++ b/DecisionSystem/PaxosDecision/proposal.py @@ -0,0 +1,32 @@ +from Messaging.messaginginterface import Message +import umsgpack + +# Uses MessagePack for message transfer. + +class Proposal(Message): + ballot_number = None + vote = None + + def __init__(self): + pass + + def serialise(self) -> str: + return umsgpack.packb({"ballot": self.ballot_number, "vote": self.vote}) + + def deserialise(self, binary: bytearray): + ''' + Deserialises a given proposal message into a proposal object. + ''' + obj = umsgpack.unpackb(binary) + old_ballot = self.ballot_number + old_vote = self.vote + try: + self.ballot_number = obj['ballot'] + self.vote = obj['vote'] + except: + # Reset the values in case they have been changed. + self.ballot_number = old_ballot + self.vote = old_vote + print("Error: binary object is not a proposal") + + \ No newline at end of file diff --git a/DecisionSystem/PaxosDecision/proposer.py b/DecisionSystem/PaxosDecision/proposer.py index e69de29..2bb19b8 100644 --- a/DecisionSystem/PaxosDecision/proposer.py +++ b/DecisionSystem/PaxosDecision/proposer.py @@ -0,0 +1,3 @@ +class Proposer(): + def propose(self): + pass \ No newline at end of file From 5e7e79e9a980228eb4989b8346a291a51654c7da Mon Sep 17 00:00:00 2001 From: "DSTO\\pivatom" Date: Thu, 10 Jan 2019 14:55:06 +1030 Subject: [PATCH 04/20] Move commander file --- DecisionSystem/commander.py | 35 ----------------------------------- 1 file changed, 35 deletions(-) delete mode 100644 DecisionSystem/commander.py diff --git a/DecisionSystem/commander.py b/DecisionSystem/commander.py deleted file mode 100644 index c9258b8..0000000 --- a/DecisionSystem/commander.py +++ /dev/null @@ -1,35 +0,0 @@ -import Messaging.mqttsession as ms -import time -import umsgpack - -class Commander: - - def __init__(self, timeout = 60): - self.timeout = timeout - self._votes = [] - ms.Client() - - def on_message(client, userdata, message): - self._votes.append(umsgpack.unpackb(message.payload)) - - ms.client.subscribe("FakeSwarm/FirstTest", on_message) - - ms.client.loop_start() - - def make_decision(self): - votes = self._votes - - for vote in votes: - continue - - def get_votes(self, topic, message, qos=0): - # Publish a message that votes are needed. - ms.client.publish(topic, message, qos) - time.sleep(self.timeout) - self.make_decision() - - def add_subscription(self, topic, callback=None, qos=0): - ms.client.subscribe(topic) - - if callback is not None: - ms.client.message_callback_add(topic, callback) \ No newline at end of file From ccc252e5b93801ba5f34e15e297d192743356429 Mon Sep 17 00:00:00 2001 From: "DSTO\\pivatom" Date: Thu, 10 Jan 2019 14:55:31 +1030 Subject: [PATCH 05/20] Add methods for callbacks to the mqtt client. --- Messaging/mqttsession.py | 38 +++++++++++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/Messaging/mqttsession.py b/Messaging/mqttsession.py index b7366f2..7353eaf 100644 --- a/Messaging/mqttsession.py +++ b/Messaging/mqttsession.py @@ -1,11 +1,28 @@ import paho.mqtt.client as mqtt +""" +Wrapper module for paho mqtt library, providing a singleton instance of the client to be used. +Also adds some convenience functions such as having multiple connected callbacks, +and managing whether the client is still connected. +""" + + client = mqtt.Client() host = None -""" -Wrapper module for paho mqtt library, providing a singleton instance of the client to be used. -""" +connect_callbacks = [] +disconnect_callbacks = [] + +def on_connect(client, userdata, flags, rc): + print("Connected with result code " + str(rc)) + if rc == 0: + global connected + connected = True + + for callback in connect_callbacks: + callback() + + client.subscribe('hello/test', qos=1) # Arguably not needed, just want to make the client static, but here anyway. def connect(): @@ -16,6 +33,14 @@ def connect(): client.connect(host, port=1883, keepalive=60, bind_address="") client.loop_start() +def add_connect_callback(callback): + global connect_callbacks + connect_callbacks += callback + connectted = True + +def add_disconnect_callback(callback): + global + def disconnect(): global client if client is not None: @@ -24,6 +49,13 @@ def disconnect(): else: print("Error: Client is not initialised.") +def on_disconnect(client, userdata, rc): + if rc != 0: + print("Unexpected disconnection.") + + global connected + connected = False + def Client(): global client if client is None: From 49de393edb9a2c4a2ec5ba34c05b24e364382a0c Mon Sep 17 00:00:00 2001 From: "DSTO\\pivatom" Date: Fri, 11 Jan 2019 11:39:04 +1030 Subject: [PATCH 06/20] Add some comments for future details, as well as partial of on_message --- .../CentralisedDecision/ballotvoter.py | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/DecisionSystem/CentralisedDecision/ballotvoter.py b/DecisionSystem/CentralisedDecision/ballotvoter.py index fd24914..0394c35 100644 --- a/DecisionSystem/CentralisedDecision/ballotvoter.py +++ b/DecisionSystem/CentralisedDecision/ballotvoter.py @@ -1,20 +1,34 @@ import paho.mqtt.client as mqtt +import umsgpack class BallotVoter: def __init__(self): self.client = mqtt.Client() - self.client.connect('172.16.43.2') + # id is generated automatically. + self.client.connect('172.16.13.128') def on_connect(self, client, userdata, flags, rc): print("Connected with result code " + str(rc)) - if rc == 0: - global connected - connected = True self.client.subscribe('swarm1/voters', qos=1) + # Create the message to send that it is now part of the swarm. + + # Need to set a will as well to broadcast on unexpected disconnection, so commander + # knows it is no longer part of the set of voters. + # Leaving this until core centralised system is working. + #will_message = {"type": "UDisconnect"} + + # Send a connected message to let any commanders know that + # it is available. + self.client.publish("swarm/commander", "connect," + self.client._client_id) + def on_message(self, client, userdata, message): - + messageDict = umsgpack.unpackb(message) + if "type" in messageDict.keys: + # Ok message. + else: + # Bad message. def submit_vote(self): self.client.publish('swarm1/') From b61da848f4e4f0c0fb14066e276f37aa5347864f Mon Sep 17 00:00:00 2001 From: "DSTO\\pivatom" Date: Fri, 11 Jan 2019 11:39:18 +1030 Subject: [PATCH 07/20] Add coco classes --- COCO-classes.txt | 80 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) create mode 100644 COCO-classes.txt diff --git a/COCO-classes.txt b/COCO-classes.txt new file mode 100644 index 0000000..1f42c8e --- /dev/null +++ b/COCO-classes.txt @@ -0,0 +1,80 @@ +person +bicycle +car +motorcycle +airplane +bus +train +truck +boat +traffic light +fire hydrant +stop sign +parking meter +bench +bird +cat +dog +horse +sheep +cow +elephant +bear +zebra +giraffe +backpack +umbrella +handbag +tie +suitcase +frisbee +skis +snowboard +sports ball +kite +baseball bat +baseball glove +skateboard +surfboard +tennis racket +bottle +wine glass +cup +fork +knife +spoon +bowl +banana +apple +sandwich +orange +broccoli +carrot +hot dog +pizza +donut +cake +chair +couch +potted plant +bed +dining table +toilet +tv +laptop +mouse +remote +keyboard +cell phone +microwave +oven +toaster +sink +refrigerator +book +clock +vase +scissors +teddy bear +hair drier +toothbrush \ No newline at end of file From acc287d5727b8b75f5636ef9e18436f52a3076e2 Mon Sep 17 00:00:00 2001 From: "DSTO\\pivatom" Date: Fri, 11 Jan 2019 11:40:04 +1030 Subject: [PATCH 08/20] Remove use of external mqtt file, just use paho library --- .../CentralisedDecision/commander.py | 49 ++++++++++++++----- 1 file changed, 37 insertions(+), 12 deletions(-) diff --git a/DecisionSystem/CentralisedDecision/commander.py b/DecisionSystem/CentralisedDecision/commander.py index ac3d0f0..2265063 100644 --- a/DecisionSystem/CentralisedDecision/commander.py +++ b/DecisionSystem/CentralisedDecision/commander.py @@ -1,26 +1,31 @@ -import Messaging.mqttsession as ms +import paho.mqtt.client as mqtt import time import umsgpack class Commander: currentVote = None + # Stores voters that connect to maintain a majority. + # Voters who do not vote in latest round are removed. + connectedVoters = {} + votes = [] + taking_votes = False + def __init__(self, timeout = 60): ''' Initial/default waiting time is 1 minute for votes to come in. ''' - self.timeout = timeout - self._votes = [] - ms.Client() + self.timeout = timeout - def on_message(client, userdata, message): - self._votes.append(umsgpack.unpackb(message.payload)) - - ms.client.subscribe("FakeSwarm/FirstTest", on_message) + self.client = mqtt.Client() + self.client.connect('172.16.13.128') - ms.client.loop_start() + # Commander needs a will message too, for the decentralised version, so the + # voters know to pick a new commander. def make_decision(self): + # Could change this to be a strategy, for different implementations of making + # a decision on the votes. votes = self._votes dif_votes = {} @@ -40,9 +45,11 @@ class Commander: return max_vote - def get_votes(self, topic, message, qos=0): + def get_votes(): + message = {"type": "voteRequest"} + message_packed = umsgpack.packb(message) # Publish a message that votes are needed. - ms.client.publish(topic, message, qos) + ms.client.publish("swarm1/voters", message_packed, qos) time.sleep(self.timeout) self.make_decision() @@ -50,4 +57,22 @@ class Commander: ms.client.subscribe(topic) if callback is not None: - ms.client.message_callback_add(topic, callback) \ No newline at end of file + ms.client.message_callback_add(topic, callback) + + def on_message(self, client, userdata, message): + messageDict = umsgpack.unpackb() + + if "type" in messageDict.keys: + if messageDict["type"] == "connect": + # Voter just connected/reconnnected. + + elif messageDict["type"] == "vote": + # Voter is sending in their vote. + if messageDict[""] + + else: + # Not a message we should be using. + NotImplementedError + + def on_connect(client, userdata, flags, rc): + self.client.subscribe("swarm1/commander") \ No newline at end of file From 97fd356038b8881e02f899b51356abf981bd0c9e Mon Sep 17 00:00:00 2001 From: "DSTO\\pivatom" Date: Fri, 11 Jan 2019 11:40:27 +1030 Subject: [PATCH 09/20] Add mqtt messenger file for implementation of paxos messenger --- DecisionSystem/PaxosDecision/mqttmessenger.py | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 DecisionSystem/PaxosDecision/mqttmessenger.py diff --git a/DecisionSystem/PaxosDecision/mqttmessenger.py b/DecisionSystem/PaxosDecision/mqttmessenger.py new file mode 100644 index 0000000..fc47ba2 --- /dev/null +++ b/DecisionSystem/PaxosDecision/mqttmessenger.py @@ -0,0 +1,42 @@ +from DecisionSystem.PaxosDecision.paxosmessenger import Messenger +from + +class MqttMessenger(Messenger): + + def __init__(self): + + + def send_prepare(self, proposal_id): + ''' + Broadcasts a Prepare message to all Acceptors + ''' + + + + def send_promise(self, proposer_id, proposal_id, previous_proposal): + ''' + Sends a Promise message to the specified Proposer + ''' + + NotImplementedError + + def send_accept(self, proposal): + ''' + Broadcasts an Accept message to all Acceptors + ''' + + NotImplementedError + + def send_accepted(self, proposal): + ''' + Broadcasts an Accepted message to all Learners + ''' + + NotImplementedError + + def on_resolution(self, proposal_id, value): + ''' + Called when a resolution is reached + ''' + + NotImplementedError \ No newline at end of file From 8d92467a99d9a48f088b5408bde61e563aa842ba Mon Sep 17 00:00:00 2001 From: "DSTO\\pivatom" Date: Mon, 14 Jan 2019 08:35:44 +1030 Subject: [PATCH 10/20] Remove last traces of external mqtt abstraction Also subscribed to commander and changed name of vote request message. --- .../CentralisedDecision/commander.py | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/DecisionSystem/CentralisedDecision/commander.py b/DecisionSystem/CentralisedDecision/commander.py index 2265063..f97f0b4 100644 --- a/DecisionSystem/CentralisedDecision/commander.py +++ b/DecisionSystem/CentralisedDecision/commander.py @@ -20,11 +20,13 @@ class Commander: self.client = mqtt.Client() self.client.connect('172.16.13.128') + self.client.subscribe("swarm1/commander") # Commander needs a will message too, for the decentralised version, so the # voters know to pick a new commander. + # If using apache zookeeper this won't be needed. def make_decision(self): - # Could change this to be a strategy, for different implementations of making + # Should change this to be a strategy, for different implementations of making # a decision on the votes. votes = self._votes dif_votes = {} @@ -46,26 +48,28 @@ class Commander: return max_vote def get_votes(): - message = {"type": "voteRequest"} + message = {"type": "reqVote"} message_packed = umsgpack.packb(message) # Publish a message that votes are needed. - ms.client.publish("swarm1/voters", message_packed, qos) + self.client.publish("swarm1/voters", message_packed, qos) time.sleep(self.timeout) self.make_decision() - def add_subscription(self, topic, callback=None, qos=0): - ms.client.subscribe(topic) + # Shouldn't be needed anymore. + # def add_subscription(self, topic, callback=None, qos=0): + # self.client.subscribe(topic) - if callback is not None: - ms.client.message_callback_add(topic, callback) + # if callback is not None: + # selfclient.message_callback_add(topic, callback) def on_message(self, client, userdata, message): messageDict = umsgpack.unpackb() + if "type" in messageDict.keys: if messageDict["type"] == "connect": # Voter just connected/reconnnected. - + pass elif messageDict["type"] == "vote": # Voter is sending in their vote. if messageDict[""] From 31367b18930eadf1e833c88069afc2276a3924b4 Mon Sep 17 00:00:00 2001 From: "DSTO\\pivatom" Date: Mon, 14 Jan 2019 08:36:38 +1030 Subject: [PATCH 11/20] Add message check for having a vote requested --- DecisionSystem/CentralisedDecision/ballotvoter.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/DecisionSystem/CentralisedDecision/ballotvoter.py b/DecisionSystem/CentralisedDecision/ballotvoter.py index 0394c35..5c994cf 100644 --- a/DecisionSystem/CentralisedDecision/ballotvoter.py +++ b/DecisionSystem/CentralisedDecision/ballotvoter.py @@ -27,6 +27,8 @@ class BallotVoter: messageDict = umsgpack.unpackb(message) if "type" in messageDict.keys: # Ok message. + if messageDict["type"] == "reqVote": + submit_vote() else: # Bad message. From f101bf61bffe54f3ea340676cdd4ce9930286ef7 Mon Sep 17 00:00:00 2001 From: "DSTO\\pivatom" Date: Mon, 14 Jan 2019 08:55:36 +1030 Subject: [PATCH 12/20] Put custom on_vote message for voter to use their own method. This maintains cohesion so the voter class is just focussed on how to vote, not also on how to collect the votes, which could be by a means such as an image or video sensor. --- .../CentralisedDecision/ballotvoter.py | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/DecisionSystem/CentralisedDecision/ballotvoter.py b/DecisionSystem/CentralisedDecision/ballotvoter.py index 5c994cf..b2e1bff 100644 --- a/DecisionSystem/CentralisedDecision/ballotvoter.py +++ b/DecisionSystem/CentralisedDecision/ballotvoter.py @@ -2,10 +2,11 @@ import paho.mqtt.client as mqtt import umsgpack class BallotVoter: - def __init__(self): + def __init__(self, on_vote): self.client = mqtt.Client() # id is generated automatically. self.client.connect('172.16.13.128') + self.on_vote = on_vote def on_connect(self, client, userdata, flags, rc): print("Connected with result code " + str(rc)) @@ -19,19 +20,26 @@ class BallotVoter: # Leaving this until core centralised system is working. #will_message = {"type": "UDisconnect"} - # Send a connected message to let any commanders know that - # it is available. - self.client.publish("swarm/commander", "connect," + self.client._client_id) + self.send_connect() def on_message(self, client, userdata, message): messageDict = umsgpack.unpackb(message) if "type" in messageDict.keys: # Ok message. if messageDict["type"] == "reqVote": - submit_vote() + self.submit_vote() + if messageDict["type"] == "listening": + self.send_connect() else: # Bad message. + pass def submit_vote(self): - self.client.publish('swarm1/') - \ No newline at end of file + binary = umsgpack.packb("type": "vote", "client": self.client._client_id, "vote": self.on_vote()) + self.client.publish('swarm1/commander', binary) + + def send_connect(self): + # Send a connected message to let any commanders know that + # it is available. + binary = umsgpack.packb({"type": "connect", "id": self.client._client_id}) + self.client.publish("swarm/commander", binary) \ No newline at end of file From 6444d6d3a045d39ff0d9a2014092de29e713bf09 Mon Sep 17 00:00:00 2001 From: "DSTO\\pivatom" Date: Mon, 14 Jan 2019 08:56:34 +1030 Subject: [PATCH 13/20] Add message implementations and make private variables have underscore --- .../CentralisedDecision/commander.py | 24 ++++++++++--------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/DecisionSystem/CentralisedDecision/commander.py b/DecisionSystem/CentralisedDecision/commander.py index f97f0b4..7df9b01 100644 --- a/DecisionSystem/CentralisedDecision/commander.py +++ b/DecisionSystem/CentralisedDecision/commander.py @@ -7,9 +7,9 @@ class Commander: # Stores voters that connect to maintain a majority. # Voters who do not vote in latest round are removed. - connectedVoters = {} - votes = [] - taking_votes = False + _connectedVoters = [] + _votes = [] + _taking_votes = False def __init__(self, timeout = 60): ''' @@ -47,7 +47,7 @@ class Commander: return max_vote - def get_votes(): + def get_votes(self): message = {"type": "reqVote"} message_packed = umsgpack.packb(message) # Publish a message that votes are needed. @@ -64,19 +64,21 @@ class Commander: def on_message(self, client, userdata, message): messageDict = umsgpack.unpackb() - - if "type" in messageDict.keys: + # Need to consider that a malicious message may have a type with incorrect subtypes. if messageDict["type"] == "connect": # Voter just connected/reconnnected. - pass + if not messageDict["client"] in self._connectedVoters: + self._connectedVoters.append(messageDict["client"]) elif messageDict["type"] == "vote": # Voter is sending in their vote. - if messageDict[""] - + # Only add vote to list if the client has not already voted. + pass + elif messageDict["type"] == "disconnected": + self._connectedVoters.remove(messageDict["type"]) else: # Not a message we should be using. - NotImplementedError + pass - def on_connect(client, userdata, flags, rc): + def on_connect(self, client, userdata, flags, rc): self.client.subscribe("swarm1/commander") \ No newline at end of file From 6bdf03abfc0a850068389c7c0f5554407d18951a Mon Sep 17 00:00:00 2001 From: "DSTO\\pivatom" Date: Mon, 14 Jan 2019 09:02:26 +1030 Subject: [PATCH 14/20] Add implementation for taking votes --- DecisionSystem/CentralisedDecision/commander.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/DecisionSystem/CentralisedDecision/commander.py b/DecisionSystem/CentralisedDecision/commander.py index 7df9b01..8d8f1d4 100644 --- a/DecisionSystem/CentralisedDecision/commander.py +++ b/DecisionSystem/CentralisedDecision/commander.py @@ -8,7 +8,8 @@ class Commander: # Stores voters that connect to maintain a majority. # Voters who do not vote in latest round are removed. _connectedVoters = [] - _votes = [] + # Dict has format: {clientId: vote} + _votes = {} _taking_votes = False def __init__(self, timeout = 60): @@ -50,9 +51,11 @@ class Commander: def get_votes(self): message = {"type": "reqVote"} message_packed = umsgpack.packb(message) + self._taking_votes = True # Publish a message that votes are needed. self.client.publish("swarm1/voters", message_packed, qos) time.sleep(self.timeout) + self._taking_votes = False self.make_decision() # Shouldn't be needed anymore. @@ -72,8 +75,13 @@ class Commander: self._connectedVoters.append(messageDict["client"]) elif messageDict["type"] == "vote": # Voter is sending in their vote. - # Only add vote to list if the client has not already voted. - pass + if self._taking_votes: + # Commander must have requested their taking votes, and the timeout + # has not occurred. + # Only add vote to list if the client has not already voted. + if messageDict["client"] not in self._votes.keys: + self._votes[messageDict["client"]] = messageDict["vote"] + elif messageDict["type"] == "disconnected": self._connectedVoters.remove(messageDict["type"]) else: From 8db3f279fece1551442202b5c028539638013131 Mon Sep 17 00:00:00 2001 From: "DSTO\\pivatom" Date: Mon, 14 Jan 2019 09:03:04 +1030 Subject: [PATCH 15/20] Remove previous method relating to subscription additions --- DecisionSystem/CentralisedDecision/commander.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/DecisionSystem/CentralisedDecision/commander.py b/DecisionSystem/CentralisedDecision/commander.py index 8d8f1d4..2582273 100644 --- a/DecisionSystem/CentralisedDecision/commander.py +++ b/DecisionSystem/CentralisedDecision/commander.py @@ -58,13 +58,6 @@ class Commander: self._taking_votes = False self.make_decision() - # Shouldn't be needed anymore. - # def add_subscription(self, topic, callback=None, qos=0): - # self.client.subscribe(topic) - - # if callback is not None: - # selfclient.message_callback_add(topic, callback) - def on_message(self, client, userdata, message): messageDict = umsgpack.unpackb() if "type" in messageDict.keys: From 4f4f4d531c4d25a09dfeaf3883a85974ca141736 Mon Sep 17 00:00:00 2001 From: "DSTO\\pivatom" Date: Mon, 14 Jan 2019 09:03:32 +1030 Subject: [PATCH 16/20] Remove qos argument --- DecisionSystem/CentralisedDecision/commander.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/DecisionSystem/CentralisedDecision/commander.py b/DecisionSystem/CentralisedDecision/commander.py index 2582273..b394d4d 100644 --- a/DecisionSystem/CentralisedDecision/commander.py +++ b/DecisionSystem/CentralisedDecision/commander.py @@ -53,7 +53,7 @@ class Commander: message_packed = umsgpack.packb(message) self._taking_votes = True # Publish a message that votes are needed. - self.client.publish("swarm1/voters", message_packed, qos) + self.client.publish("swarm1/voters", message_packed) time.sleep(self.timeout) self._taking_votes = False self.make_decision() From e122e572b6dc3a758a87c90c98e8989db9c8a62c Mon Sep 17 00:00:00 2001 From: "DSTO\\pivatom" Date: Mon, 14 Jan 2019 09:14:41 +1030 Subject: [PATCH 17/20] Update voter interface --- DecisionSystem/voter.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/DecisionSystem/voter.py b/DecisionSystem/voter.py index 6b0da48..4c0fcbf 100644 --- a/DecisionSystem/voter.py +++ b/DecisionSystem/voter.py @@ -2,12 +2,8 @@ from Messaging.packmessage import PackMessage import umsgpack class Voter: - def __init__(self): - # self._client = - pass - - def submit_vote(self, vote_contents): + def submit_vote(self): raise NotImplementedError() - def request_vote(self): + def send_connected(self): raise NotImplementedError() \ No newline at end of file From 90d096a91831df94b62e7670261935b6da992402 Mon Sep 17 00:00:00 2001 From: "DSTO\\pivatom" Date: Mon, 14 Jan 2019 11:17:26 +1030 Subject: [PATCH 18/20] Add client sending the vote as a private variable. --- DecisionSystem/vote.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/DecisionSystem/vote.py b/DecisionSystem/vote.py index f5a5e47..28c34cf 100644 --- a/DecisionSystem/vote.py +++ b/DecisionSystem/vote.py @@ -1,8 +1,10 @@ from Messaging.packmessage import PackMessage class Vote(PackMessage): - def __init__(self, vote = None): + def __init__(self, client, vote = None): self._vote = vote + self._client = client + @property def vote(self): From 3c978b3ca3eb5f12f4c47b4c53a1924437171115 Mon Sep 17 00:00:00 2001 From: "DSTO\\pivatom" Date: Mon, 14 Jan 2019 11:19:27 +1030 Subject: [PATCH 19/20] Add method to get swarm participants for new leader, fix some comments --- DecisionSystem/CentralisedDecision/commander.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/DecisionSystem/CentralisedDecision/commander.py b/DecisionSystem/CentralisedDecision/commander.py index b394d4d..c78d15a 100644 --- a/DecisionSystem/CentralisedDecision/commander.py +++ b/DecisionSystem/CentralisedDecision/commander.py @@ -27,13 +27,13 @@ class Commander: # If using apache zookeeper this won't be needed. def make_decision(self): - # Should change this to be a strategy, for different implementations of making - # a decision on the votes. + # Should change this to follow strategy pattern, for different implementations of + # making a decision on the votes. votes = self._votes dif_votes = {} for vote in votes: - # Get the average/max, etc. + # Get the count of different votes. if str(vote) in dif_votes: dif_votes[str(vote)] = dif_votes[str(vote)] + 1 else: @@ -49,6 +49,7 @@ class Commander: return max_vote def get_votes(self): + # Should consider abstracting these messages to another class for portability. message = {"type": "reqVote"} message_packed = umsgpack.packb(message) self._taking_votes = True @@ -72,7 +73,7 @@ class Commander: # Commander must have requested their taking votes, and the timeout # has not occurred. # Only add vote to list if the client has not already voted. - if messageDict["client"] not in self._votes.keys: + if messageDict["client"] not in self._votes: self._votes[messageDict["client"]] = messageDict["vote"] elif messageDict["type"] == "disconnected": @@ -82,4 +83,8 @@ class Commander: pass def on_connect(self, client, userdata, flags, rc): - self.client.subscribe("swarm1/commander") \ No newline at end of file + self.client.subscribe("swarm1/commander") + + def get_participants(self): + + self.client.publish("swarm1/voters") \ No newline at end of file From 0b075b6edf5c0cc1e28ebb2f00b8e329e0a7563a Mon Sep 17 00:00:00 2001 From: "DSTO\\pivatom" Date: Mon, 14 Jan 2019 11:20:15 +1030 Subject: [PATCH 20/20] Add voter class for fusion implementation --- .../DecentralisedActivityFusion/voter.py | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 DecisionSystem/DecentralisedActivityFusion/voter.py diff --git a/DecisionSystem/DecentralisedActivityFusion/voter.py b/DecisionSystem/DecentralisedActivityFusion/voter.py new file mode 100644 index 0000000..ebf3593 --- /dev/null +++ b/DecisionSystem/DecentralisedActivityFusion/voter.py @@ -0,0 +1,26 @@ +class Voter: + ''' + This class acts to replicate sensor information with the network to come to a consensus + of an activity occurrance. This is based upon research by Song et al. available at: + https://ieeexplore.ieee.org/document/5484586 + + The main advantage of this approach, as apposed to techniques such as by using zookeeper + or consul, is it can be completely decentralised and so works without a central server, + or needing to elect a central server. Additionally, it does not require all nodes + to run a Zookeeper/Consul server instance, which were not designed for these constrained + combat environments, which will fail if half the nodes fail, and also use a lot of resources + for handling services not required by this task. + + The original approach in the paper requires some previous training before sensing, so + that there is a probability of a given action based upon the previous set of actions. + ''' + def __init__(self, on_vote): + ''' + on_vote: Callback to get the required vote to broadcast. + ''' + self.on_vote = on_vote + + def submit_vote(self): + pass + + def \ No newline at end of file