Change to use external messaging module

This commit is contained in:
Michael Pivato
2019-01-24 10:59:26 +10:30
parent 6de217a02f
commit 57db02a402
3 changed files with 174 additions and 38 deletions

View File

@@ -1,28 +1,27 @@
import paho.mqtt.client as mqtt
import json import json
from DecisionSystem.messages import ConnectSwarm, SubmitVote, Message, deserialise, RequestVote from DecisionSystem.messages import ConnectSwarm, SubmitVote, Message, deserialise, RequestVote
from multiprocessing import Pool
from messenger import Messenger
class BallotVoter: class BallotVoter:
def __init__(self, on_vote): def __init__(self, on_vote, messenger: Messenger):
# Load config file # Load config file
cfg = None cfg = None
with open('../config.json') as json_config: with open('config.json') as json_config:
cfg = json.load(json_config) cfg = json.load(json_config)
self._cfg = cfg self._cfg = cfg
self.client = mqtt.Client() self.messenger = messenger
self.client.on_connect = self.on_connect self.messenger.add_message_callback(self.on_message)
self.client.on_message = self.on_message self.messenger.add_connect(self.on_connect)
# id is generated automatically.
self.client.connect(cfg["mqtt"]["host"], cfg["mqtt"]["port"], cfg["mqtt"]["timeout"])
self.client.loop_start()
self.on_vote = on_vote self.on_vote = on_vote
def on_connect(self, client, userdata, flags, rc): def on_connect(self, rc):
print("Connected with result code " + str(rc)) print("Connected with result code " + str(rc))
self.client.subscribe('swarm1/voters', qos=1) # Should subscribes be handled by the messenger?
#self.client.subscribe('swarm1/voters', qos=1)
# Need to set a will as well to broadcast on unexpected disconnection, so commander # Need to set a will as well to broadcast on unexpected disconnection, so commander
# knows it is no longer part of the set of voters. # knows it is no longer part of the set of voters.
@@ -30,9 +29,9 @@ class BallotVoter:
#will_message = {"type": "UDisconnect"} #will_message = {"type": "UDisconnect"}
# Tell commander we are now connected. # Tell commander we are now connected.
# self.send_connect() self.send_connect()
def on_message(self, client, userdata, message): def on_message(self, message):
print("Message Received!") print("Message Received!")
messageD = deserialise(message.payload) messageD = deserialise(message.payload)
print("Message Type: " + messageD.type) print("Message Type: " + messageD.type)
@@ -51,14 +50,24 @@ class BallotVoter:
def submit_vote(self): def submit_vote(self):
v = self.on_vote() v = self.on_vote()
if v == None:
print('Could not vote on the frame')
return
print("Got Vote") print("Got Vote")
vote = SubmitVote(v, self.client._client_id) vote = SubmitVote(v, self.messenger.id)
print('created vote') print('created vote')
self.client.publish('swarm1/commander', vote.serialise()) self.messenger.broadcast_message(self.messenger.swarm, vote.serialise())
print('published vote') print('published vote')
def submit_vote_multicore(self):
'''
Uses multiple processes to work on multiple frames at the same time
to take an average for this voter, or if it could not find a hand
in one frame but can in another.
'''
pass
def send_connect(self): def send_connect(self):
# Send a connected message to let any commanders know that # Send a connected message to let any commanders know that
# it is available. # it is available.
self.client.publish("swarm1/commander", ConnectSwarm(self.client._client_id).serialise()) self.messenger.broadcast_message(self.messenger.swarm, ConnectSwarm(self.messenger.id).serialise())

View File

@@ -1,4 +1,3 @@
import paho.mqtt.client as mqtt
import time import time
from DecisionSystem.messages import Message, CommanderWill, RequestVote, GetSwarmParticipants, deserialise from DecisionSystem.messages import Message, CommanderWill, RequestVote, GetSwarmParticipants, deserialise
import json import json
@@ -14,24 +13,17 @@ class Commander:
_votes = {} _votes = {}
_taking_votes = False _taking_votes = False
def __init__(self, timeout = 60): def __init__(self, messenger, timeout = 60):
''' '''
Initial/default waiting time is 1 minute for votes to come in. Initial/default waiting time is 1 minute for votes to come in.
''' '''
self.timeout = timeout self.timeout = timeout
# Default configuration file for testing.
cfg = None
with open('config.json') as json_config:
cfg = json.load(json_config)
self._cfg = cfg
self.client = mqtt.Client() self._messenger = messenger
self.client.on_connect = self.on_connect self._messenger.add_connect(self.on_connect)
self.client.on_message = self.on_message self._messenger.add_message_callback(self.on_message)
self.client.on_disconnect = self.on_disconnect self._messenger.add_disconnect_callback(self.on_disconnect)
self.client.connect(cfg["mqtt"]["host"], int(cfg["mqtt"]["port"]), int(cfg["mqtt"]["timeout"])) print('Connecting')
# Don't forget to start the event loop!!!
self.client.loop_start()
def make_decision(self): def make_decision(self):
# Should change this to follow strategy pattern, for different implementations of # Should change this to follow strategy pattern, for different implementations of
@@ -65,13 +57,13 @@ class Commander:
self._taking_votes = True self._taking_votes = True
# Publish a message that votes are needed. # Publish a message that votes are needed.
print("Sending request message") print("Sending request message")
self.client.publish("swarm1/voters", RequestVote(self.client._client_id).serialise()) self._messenger.broadcast_message(self._messenger.swarm, RequestVote(self._messenger.id).serialise())
print("published message") print("published message")
time.sleep(self.timeout) time.sleep(self.timeout)
self._taking_votes = False self._taking_votes = False
return self.make_decision() return self.make_decision()
def on_message(self, client, userdata, message): def on_message(self, message):
print("Message Received") print("Message Received")
messageD = None messageD = None
try: try:
@@ -101,16 +93,18 @@ class Commander:
print("Voter disconnected :(") print("Voter disconnected :(")
self._connectedVoters.remove(messageD.sender) self._connectedVoters.remove(messageD.sender)
def on_connect(self, client, userdata, flags, rc): def on_connect(self, rc):
self.client.subscribe("swarm1/commander") # Subscribes now handled by the mqtt messenger, this is just here
# for convenience later.
pass
def get_participants(self): def get_participants(self):
self.client.publish("swarm1/voters", GetSwarmParticipants().serialise()) self._messenger.broadcast_message(self._messenger.swarm, GetSwarmParticipants().serialise())
# Commander needs a will message too, for the decentralised version, so the # Commander needs a will message too, for the decentralised version, so the
# voters know to pick a new commander. # voters know to pick a new commander.
# If using apache zookeeper this won't be needed. # If using apache zookeeper this won't be needed.
# That's the wrong method for setting a will. # That's the wrong method for setting a will.
# self.client.publish("swarm1/voters", CommanderWill(self.client._client_id).serialise()) # self.client.publish("swarm1/voters", CommanderWill(self.client._client_id).serialise())
def on_disconnect(self, client, userdata, rc): def on_disconnect(self, rc):
pass pass

View File

@@ -0,0 +1,133 @@
import paho.mqtt.client as mqtt
import json
class Messenger:
_connect_callbacks = []
_disconnect_callbacks = []
_message_callbacks = []
def __init__(self):
pass
def broadcast_message(self, topic, message):
'''
Broadcasts the specified message to the swarm based upon its topic(or group).
'''
raise NotImplementedError
def unicast_message(self, target, message):
'''
Broadcasts the specified message to the single target.
'''
raise NotImplementedError
def connect(self):
'''
Connect to the swarm.
'''
raise NotImplementedError
def disconnect(self):
'''
Disconnect from the swarm.
'''
raise NotImplementedError
def add_connect(self, connect):
'''
Adds a callback to do something else once we are connected.
'''
self._connect_callbacks.append(connect)
def on_connect(self, code = None):
'''
Called once the messenger connects to the swarm.
'''
for cb in self._connect_callbacks:
cb(code)
def on_disconnect(self, code = None):
'''
Called when the messenger is disconnected from the swarm.
'''
for cb in self._disconnect_callbacks:
cb(code)
def add_disconnect_callback(self, on_disconnect):
'''
Adds a callback for when the messenger is disconnected.
'''
self._disconnect_callbacks.append(on_disconnect)
def add_message_callback(self, on_message):
'''
Adds a callback
'''
self._message_callbacks.append(on_message)
def on_message(self, message):
'''
Called when the messenger receives a message.
'''
for cb in self._message_callbacks:
cb(message)
@property
def id(self):
'''
The id for this messenger that is being used in communication.
'''
raise NotImplementedError
@property
def swarm(self):
'''
Gets the name of the swarm this instance is a part of.
'''
raise NotImplementedError
class MqttMessenger(Messenger):
'''
A messenger that uses MQTT.
'''
def __init__(self):
with open('config.json') as json_config:
self._cfg = json.load(json_config)
self._client = mqtt.Client()
self._client.on_connect = self.on_connect
self._client.on_message = self.on_message
self._client.on_disconnect = self.on_disconnect
def on_message(self, client, userdata, message):
Messenger.on_message(self, message)
def on_connect(self, client, userdata, flags, rc):
# Subscribe to the swarm specified in the config.
self._client.subscribe(self._cfg['mqtt']['swarm'])
Messenger.on_connect(self, rc)
def on_disconnect(self, client, userdata, rc):
Messenger.on_disconnect(self, rc)
def broadcast_message(self, topic, message):
self._client.publish(topic, message, qos=1)
def connect(self):
self._client.connect(self._cfg['mqtt']['host'], \
int(self._cfg['mqtt']['port']), \
int(self._cfg['mqtt']['timeout']))
self._client.loop_start()
def disconnec(self):
self._client.disconnect()
@property
def id(self):
return self._client._client_id
@property
def swarm(self):
return self._cfg['mqtt']['swarm']