diff --git a/COCO-classes.txt b/COCO-classes.txt new file mode 100644 index 0000000..1f42c8e --- /dev/null +++ b/COCO-classes.txt @@ -0,0 +1,80 @@ +person +bicycle +car +motorcycle +airplane +bus +train +truck +boat +traffic light +fire hydrant +stop sign +parking meter +bench +bird +cat +dog +horse +sheep +cow +elephant +bear +zebra +giraffe +backpack +umbrella +handbag +tie +suitcase +frisbee +skis +snowboard +sports ball +kite +baseball bat +baseball glove +skateboard +surfboard +tennis racket +bottle +wine glass +cup +fork +knife +spoon +bowl +banana +apple +sandwich +orange +broccoli +carrot +hot dog +pizza +donut +cake +chair +couch +potted plant +bed +dining table +toilet +tv +laptop +mouse +remote +keyboard +cell phone +microwave +oven +toaster +sink +refrigerator +book +clock +vase +scissors +teddy bear +hair drier +toothbrush \ No newline at end of file diff --git a/DecisionSystem/CentralisedDecision/ballotvoter.py b/DecisionSystem/CentralisedDecision/ballotvoter.py new file mode 100644 index 0000000..b2e1bff --- /dev/null +++ b/DecisionSystem/CentralisedDecision/ballotvoter.py @@ -0,0 +1,45 @@ +import paho.mqtt.client as mqtt +import umsgpack + +class BallotVoter: + def __init__(self, on_vote): + self.client = mqtt.Client() + # id is generated automatically. + self.client.connect('172.16.13.128') + self.on_vote = on_vote + + def on_connect(self, client, userdata, flags, rc): + print("Connected with result code " + str(rc)) + + self.client.subscribe('swarm1/voters', qos=1) + + # Create the message to send that it is now part of the swarm. + + # Need to set a will as well to broadcast on unexpected disconnection, so commander + # knows it is no longer part of the set of voters. + # Leaving this until core centralised system is working. + #will_message = {"type": "UDisconnect"} + + self.send_connect() + + def on_message(self, client, userdata, message): + messageDict = umsgpack.unpackb(message) + if "type" in messageDict.keys: + # Ok message. + if messageDict["type"] == "reqVote": + self.submit_vote() + if messageDict["type"] == "listening": + self.send_connect() + else: + # Bad message. + pass + + def submit_vote(self): + binary = umsgpack.packb("type": "vote", "client": self.client._client_id, "vote": self.on_vote()) + self.client.publish('swarm1/commander', binary) + + def send_connect(self): + # Send a connected message to let any commanders know that + # it is available. + binary = umsgpack.packb({"type": "connect", "id": self.client._client_id}) + self.client.publish("swarm/commander", binary) \ No newline at end of file diff --git a/DecisionSystem/CentralisedDecision/commander.py b/DecisionSystem/CentralisedDecision/commander.py new file mode 100644 index 0000000..c78d15a --- /dev/null +++ b/DecisionSystem/CentralisedDecision/commander.py @@ -0,0 +1,90 @@ +import paho.mqtt.client as mqtt +import time +import umsgpack + +class Commander: + currentVote = None + + # Stores voters that connect to maintain a majority. + # Voters who do not vote in latest round are removed. + _connectedVoters = [] + # Dict has format: {clientId: vote} + _votes = {} + _taking_votes = False + + def __init__(self, timeout = 60): + ''' + Initial/default waiting time is 1 minute for votes to come in. + ''' + self.timeout = timeout + + self.client = mqtt.Client() + self.client.connect('172.16.13.128') + + self.client.subscribe("swarm1/commander") + # Commander needs a will message too, for the decentralised version, so the + # voters know to pick a new commander. + # If using apache zookeeper this won't be needed. + + def make_decision(self): + # Should change this to follow strategy pattern, for different implementations of + # making a decision on the votes. + votes = self._votes + dif_votes = {} + + for vote in votes: + # Get the count of different votes. + if str(vote) in dif_votes: + dif_votes[str(vote)] = dif_votes[str(vote)] + 1 + else: + dif_votes[str(vote)] = 1 + + max_vote = None + max_vote_num = 0 + for vote in dif_votes.keys(): + if dif_votes[vote] > max_vote_num: + max_vote = vote + max_vote_num = dif_votes[vote] + + return max_vote + + def get_votes(self): + # Should consider abstracting these messages to another class for portability. + message = {"type": "reqVote"} + message_packed = umsgpack.packb(message) + self._taking_votes = True + # Publish a message that votes are needed. + self.client.publish("swarm1/voters", message_packed) + time.sleep(self.timeout) + self._taking_votes = False + self.make_decision() + + def on_message(self, client, userdata, message): + messageDict = umsgpack.unpackb() + if "type" in messageDict.keys: + # Need to consider that a malicious message may have a type with incorrect subtypes. + if messageDict["type"] == "connect": + # Voter just connected/reconnnected. + if not messageDict["client"] in self._connectedVoters: + self._connectedVoters.append(messageDict["client"]) + elif messageDict["type"] == "vote": + # Voter is sending in their vote. + if self._taking_votes: + # Commander must have requested their taking votes, and the timeout + # has not occurred. + # Only add vote to list if the client has not already voted. + if messageDict["client"] not in self._votes: + self._votes[messageDict["client"]] = messageDict["vote"] + + elif messageDict["type"] == "disconnected": + self._connectedVoters.remove(messageDict["type"]) + else: + # Not a message we should be using. + pass + + def on_connect(self, client, userdata, flags, rc): + self.client.subscribe("swarm1/commander") + + def get_participants(self): + + self.client.publish("swarm1/voters") \ No newline at end of file diff --git a/DecisionSystem/DecentralisedActivityFusion/voter.py b/DecisionSystem/DecentralisedActivityFusion/voter.py new file mode 100644 index 0000000..ebf3593 --- /dev/null +++ b/DecisionSystem/DecentralisedActivityFusion/voter.py @@ -0,0 +1,26 @@ +class Voter: + ''' + This class acts to replicate sensor information with the network to come to a consensus + of an activity occurrance. This is based upon research by Song et al. available at: + https://ieeexplore.ieee.org/document/5484586 + + The main advantage of this approach, as apposed to techniques such as by using zookeeper + or consul, is it can be completely decentralised and so works without a central server, + or needing to elect a central server. Additionally, it does not require all nodes + to run a Zookeeper/Consul server instance, which were not designed for these constrained + combat environments, which will fail if half the nodes fail, and also use a lot of resources + for handling services not required by this task. + + The original approach in the paper requires some previous training before sensing, so + that there is a probability of a given action based upon the previous set of actions. + ''' + def __init__(self, on_vote): + ''' + on_vote: Callback to get the required vote to broadcast. + ''' + self.on_vote = on_vote + + def submit_vote(self): + pass + + def \ No newline at end of file diff --git a/DecisionSystem/PaxosDecision/acceptor.py b/DecisionSystem/PaxosDecision/acceptor.py new file mode 100644 index 0000000..a9cbe0e --- /dev/null +++ b/DecisionSystem/PaxosDecision/acceptor.py @@ -0,0 +1,26 @@ +from proposer import Proposer +from paxosmessenger import Messenger + +class Acceptor(): + highest_proposal = None + highest_promise = None + messenger = None + + def __init__(self, messenger: Messenger): + self.messenger = messenger + + def on_message(self, client, userdata, message): + pass + + def prepare(self): + pass + + def accept_request(self, fromt_uid, proposal_id, value): + ''' + Called when an accept message is received from a proposer + ''' + if proposal_id >= self.highest_promise: + self.highest_promise = proposal_id + self.accepted_id = proposal_id + self.accepted_value = value + self.messenger.send_accepted(proposal_id, self.accepted_value) \ No newline at end of file diff --git a/DecisionSystem/PaxosDecision/learner.py b/DecisionSystem/PaxosDecision/learner.py new file mode 100644 index 0000000..37955b4 --- /dev/null +++ b/DecisionSystem/PaxosDecision/learner.py @@ -0,0 +1,2 @@ +class Learner(): + pass \ No newline at end of file diff --git a/DecisionSystem/PaxosDecision/mqttmessenger.py b/DecisionSystem/PaxosDecision/mqttmessenger.py new file mode 100644 index 0000000..fc47ba2 --- /dev/null +++ b/DecisionSystem/PaxosDecision/mqttmessenger.py @@ -0,0 +1,42 @@ +from DecisionSystem.PaxosDecision.paxosmessenger import Messenger +from + +class MqttMessenger(Messenger): + + def __init__(self): + + + def send_prepare(self, proposal_id): + ''' + Broadcasts a Prepare message to all Acceptors + ''' + + + + def send_promise(self, proposer_id, proposal_id, previous_proposal): + ''' + Sends a Promise message to the specified Proposer + ''' + + NotImplementedError + + def send_accept(self, proposal): + ''' + Broadcasts an Accept message to all Acceptors + ''' + + NotImplementedError + + def send_accepted(self, proposal): + ''' + Broadcasts an Accepted message to all Learners + ''' + + NotImplementedError + + def on_resolution(self, proposal_id, value): + ''' + Called when a resolution is reached + ''' + + NotImplementedError \ No newline at end of file diff --git a/DecisionSystem/PaxosDecision/paxos_instance.py b/DecisionSystem/PaxosDecision/paxos_instance.py new file mode 100644 index 0000000..41d8609 --- /dev/null +++ b/DecisionSystem/PaxosDecision/paxos_instance.py @@ -0,0 +1,14 @@ +''' +@author Michael Pivato + +Much thanks to Tom Cocagne by providing basic paxos code +which was the basis of this module and algorithm. +Check out the original at: https://github.com/cocagne/paxos/blob/master/paxos/essential.py +''' + +from learner import Learner +from acceptor import Acceptor +from proposer import Proposer + +class PaxosInstance(): + pass \ No newline at end of file diff --git a/DecisionSystem/PaxosDecision/paxosmessenger.py b/DecisionSystem/PaxosDecision/paxosmessenger.py new file mode 100644 index 0000000..47e38a6 --- /dev/null +++ b/DecisionSystem/PaxosDecision/paxosmessenger.py @@ -0,0 +1,37 @@ +from proposal import Proposal + +class Messenger(): + def send_prepare(self, proposal_id): + ''' + Broadcasts a Prepare message to all Acceptors + ''' + + NotImplementedError + + def send_promise(self, proposer_id, proposal_id, previous_proposal): + ''' + Sends a Promise message to the specified Proposer + ''' + + NotImplementedError + + def send_accept(self, proposal): + ''' + Broadcasts an Accept message to all Acceptors + ''' + + NotImplementedError + + def send_accepted(self, proposal): + ''' + Broadcasts an Accepted message to all Learners + ''' + + NotImplementedError + + def on_resolution(self, proposal_id, value): + ''' + Called when a resolution is reached + ''' + + NotImplementedError \ No newline at end of file diff --git a/DecisionSystem/PaxosDecision/proposal.py b/DecisionSystem/PaxosDecision/proposal.py new file mode 100644 index 0000000..ef1424a --- /dev/null +++ b/DecisionSystem/PaxosDecision/proposal.py @@ -0,0 +1,32 @@ +from Messaging.messaginginterface import Message +import umsgpack + +# Uses MessagePack for message transfer. + +class Proposal(Message): + ballot_number = None + vote = None + + def __init__(self): + pass + + def serialise(self) -> str: + return umsgpack.packb({"ballot": self.ballot_number, "vote": self.vote}) + + def deserialise(self, binary: bytearray): + ''' + Deserialises a given proposal message into a proposal object. + ''' + obj = umsgpack.unpackb(binary) + old_ballot = self.ballot_number + old_vote = self.vote + try: + self.ballot_number = obj['ballot'] + self.vote = obj['vote'] + except: + # Reset the values in case they have been changed. + self.ballot_number = old_ballot + self.vote = old_vote + print("Error: binary object is not a proposal") + + \ No newline at end of file diff --git a/DecisionSystem/PaxosDecision/proposer.py b/DecisionSystem/PaxosDecision/proposer.py new file mode 100644 index 0000000..2bb19b8 --- /dev/null +++ b/DecisionSystem/PaxosDecision/proposer.py @@ -0,0 +1,3 @@ +class Proposer(): + def propose(self): + pass \ No newline at end of file diff --git a/DecisionSystem/commander.py b/DecisionSystem/commander.py deleted file mode 100644 index c9258b8..0000000 --- a/DecisionSystem/commander.py +++ /dev/null @@ -1,35 +0,0 @@ -import Messaging.mqttsession as ms -import time -import umsgpack - -class Commander: - - def __init__(self, timeout = 60): - self.timeout = timeout - self._votes = [] - ms.Client() - - def on_message(client, userdata, message): - self._votes.append(umsgpack.unpackb(message.payload)) - - ms.client.subscribe("FakeSwarm/FirstTest", on_message) - - ms.client.loop_start() - - def make_decision(self): - votes = self._votes - - for vote in votes: - continue - - def get_votes(self, topic, message, qos=0): - # Publish a message that votes are needed. - ms.client.publish(topic, message, qos) - time.sleep(self.timeout) - self.make_decision() - - def add_subscription(self, topic, callback=None, qos=0): - ms.client.subscribe(topic) - - if callback is not None: - ms.client.message_callback_add(topic, callback) \ No newline at end of file diff --git a/DecisionSystem/vote.py b/DecisionSystem/vote.py index f5a5e47..28c34cf 100644 --- a/DecisionSystem/vote.py +++ b/DecisionSystem/vote.py @@ -1,8 +1,10 @@ from Messaging.packmessage import PackMessage class Vote(PackMessage): - def __init__(self, vote = None): + def __init__(self, client, vote = None): self._vote = vote + self._client = client + @property def vote(self): diff --git a/DecisionSystem/voter.py b/DecisionSystem/voter.py index 6b0da48..4c0fcbf 100644 --- a/DecisionSystem/voter.py +++ b/DecisionSystem/voter.py @@ -2,12 +2,8 @@ from Messaging.packmessage import PackMessage import umsgpack class Voter: - def __init__(self): - # self._client = - pass - - def submit_vote(self, vote_contents): + def submit_vote(self): raise NotImplementedError() - def request_vote(self): + def send_connected(self): raise NotImplementedError() \ No newline at end of file diff --git a/Messaging/mqttsession.py b/Messaging/mqttsession.py index b7366f2..7353eaf 100644 --- a/Messaging/mqttsession.py +++ b/Messaging/mqttsession.py @@ -1,11 +1,28 @@ import paho.mqtt.client as mqtt +""" +Wrapper module for paho mqtt library, providing a singleton instance of the client to be used. +Also adds some convenience functions such as having multiple connected callbacks, +and managing whether the client is still connected. +""" + + client = mqtt.Client() host = None -""" -Wrapper module for paho mqtt library, providing a singleton instance of the client to be used. -""" +connect_callbacks = [] +disconnect_callbacks = [] + +def on_connect(client, userdata, flags, rc): + print("Connected with result code " + str(rc)) + if rc == 0: + global connected + connected = True + + for callback in connect_callbacks: + callback() + + client.subscribe('hello/test', qos=1) # Arguably not needed, just want to make the client static, but here anyway. def connect(): @@ -16,6 +33,14 @@ def connect(): client.connect(host, port=1883, keepalive=60, bind_address="") client.loop_start() +def add_connect_callback(callback): + global connect_callbacks + connect_callbacks += callback + connectted = True + +def add_disconnect_callback(callback): + global + def disconnect(): global client if client is not None: @@ -24,6 +49,13 @@ def disconnect(): else: print("Error: Client is not initialised.") +def on_disconnect(client, userdata, rc): + if rc != 0: + print("Unexpected disconnection.") + + global connected + connected = False + def Client(): global client if client is None: