Files
picar/pycar/MyRaft/messagestrategy.py

215 lines
6.9 KiB
Python

import json
from concurrent import futures
import time
import multiprocessing as mp
import threading
import grpc
import zmq
import MyRaft.raft_pb2_grpc as raft_pb2_grpc
import MyRaft.raft_pb2 as raft_pb2
class MessageStrategy:
    """Interface for the transport a raft node uses to talk to its peers.

    Every operation raises ``NotImplementedError`` here; concrete
    strategies are expected to override the ones they support.
    """

    def __init__(self):
        """Create the strategy; the base class keeps no state."""

    def connect_channels(self):
        """Set up the connections used to reach the other nodes."""
        raise NotImplementedError()

    def send_RequestVote(self, request):
        """Send a RequestVote message.

        Args:
            request: The message to send.
        """
        raise NotImplementedError()

    def on_VoteReceived(self, future):
        """Handle the reply to a previously sent RequestVote.

        Args:
            future: The reply, or a handle that yields it.
        """
        raise NotImplementedError()

    def send_AppendEntries(self, request):
        """Send an AppendEntries message.

        Args:
            request: The message to send.
        """
        raise NotImplementedError()

    def on_EntriesResponse(self, future):
        """Handle the reply to a previously sent AppendEntries.

        Args:
            future: The reply, or a handle that yields it.
        """
        raise NotImplementedError()
class NodeGrpcServer(raft_pb2_grpc.RaftServicer):
    """gRPC servicer that hands incoming raft RPCs to the local raft node."""

    def __init__(self, raftNode, port: int):
        """Build the gRPC server and bind it to ``port`` on all interfaces.

        Args:
            raftNode: Object providing ``rcv_AppendEntries`` and
                ``vote_requested``; each incoming RPC is forwarded to it.
            port: Port number the insecure listener is bound to.
        """
        self._raft = raftNode
        worker_pool = futures.ThreadPoolExecutor(max_workers=10)
        self.server = grpc.server(worker_pool)
        self.server.add_insecure_port('[::]:%d' % port)

    def AppendEntriesRPC(self, request, context):
        """Serve an AppendEntries call by delegating to the raft node.

        Args:
            request: The incoming AppendEntries message.
            context: gRPC call context (unused here); see
                https://grpc.io/grpc/python/grpc.html

        Returns:
            Whatever ``rcv_AppendEntries`` returns for ``request``
            (presumably an AppendEntriesResponse message).
        """
        print("Received append entries rpc")
        # Kept for reference: the caller's address should be reachable via
        # str(context._rpc_event.call_details.host)
        response = self._raft.rcv_AppendEntries(request)
        return response

    def RequestVoteRPC(self, request, context):
        """Serve a RequestVote call by delegating to the raft node.

        Args:
            request: The incoming RequestVote message.
            context: gRPC call context (unused here); see
                https://grpc.io/grpc/python/grpc.html

        Returns:
            Whatever ``vote_requested`` returns for ``request``
            (presumably a RequestVoteResponse message).
        """
        print("Received request vote rpc")
        print(request)
        vote = self._raft.vote_requested(request)
        # Debug trace of the reply and its type before it goes back out.
        for item in ("Now returning our vote.", vote, type(vote)):
            print(item)
        return vote

    def start_server(self):
        """Register this servicer, start the server and block forever.

        The gRPC server runs on its own worker threads, so the calling
        thread is parked in an endless sleep loop to keep it alive.
        """
        print("Starting servicer")
        raft_pb2_grpc.add_RaftServicer_to_server(self, self.server)
        self.server.start()
        one_hour = 60 * 60
        while True:
            time.sleep(one_hour)
class GrpcMessageStrategy(MessageStrategy):
    """This class uses gRPC to communicate between raft nodes.

    Outgoing RPCs are issued asynchronously: each send creates one future
    per neighbour and the matching ``on_*`` method runs as its done
    callback.
    """

    # Only create the channels if we become a candidate or leader.
    # Also need to close the channels when we become the follower.
    def __init__(self, server: NodeGrpcServer, config):
        """Store the configuration; channels are created lazily.

        Args:
            server: The local gRPC server; its ``_raft`` node receives the
                vote replies.
            config: Mapping with ``config['messaging']['neighbours']``, an
                iterable of entries that each have ``'ip'`` and ``'port'``.
        """
        # Also need to consider TLS/secure connection
        self._cfg = config
        self._neighbours = self._cfg['messaging']['neighbours']
        self._server = server
        self.message_callbacks = []
        self._futures = []
        self.channels = None

    def connect_channels(self):
        """Open one insecure gRPC channel per configured neighbour."""
        print("Creating channels")
        self.channels = [
            grpc.insecure_channel('%s:%d' % (n['ip'], n['port']))
            for n in self._neighbours
        ]

    def _ensure_channels(self):
        """Create the channels on first use if nobody connected them yet."""
        if self.channels is None:
            self.connect_channels()

    def send_RequestVote(self, vote):
        """Send ``vote`` to every neighbour without waiting for replies.

        Args:
            vote: The RequestVote message to send.
        """
        print("Sending Request Vote")
        self._ensure_channels()
        for channel in self.channels:
            print("In channel")
            try:
                stub = raft_pb2_grpc.RaftStub(channel)
                print("connected")
                future = stub.RequestVoteRPC.future(vote)
                future.add_done_callback(self.on_VoteReceived)
            except Exception as e:
                # Best effort: a failure for one neighbour must not stop
                # the request from reaching the others, but report it.
                print("Couldn't send RequestVote: %s" % e)

    def on_VoteReceived(self, future):
        """Done-callback for RequestVote: pass the reply to the raft node.

        Args:
            future: The completed gRPC future for one RequestVote call.
        """
        try:
            response = future.result()
        except (grpc.RpcError, grpc.FutureCancelledError) as e:
            # The RPC itself failed (e.g. the neighbour is unreachable).
            # There is no vote to deliver, so report it and stop here
            # instead of raising inside the gRPC callback.
            print("RequestVote RPC failed: %s" % e)
            return
        print("A vote was returned")
        print("sending vote received back to node.")
        self._server._raft.vote_received(response)

    def send_AppendEntries(self, entries):
        """Send ``entries`` to every neighbour without waiting for replies.

        Args:
            entries: The AppendEntries message to send.
        """
        # Same lazy connect as send_RequestVote; without it a call made
        # before connect_channels() would try to iterate over None.
        self._ensure_channels()
        for channel in self.channels:
            stub = raft_pb2_grpc.RaftStub(channel)
            future = stub.AppendEntriesRPC.future(entries)
            future.add_done_callback(self.on_EntriesResponse)

    def on_EntriesResponse(self, future):
        """Done-callback for AppendEntries; currently only logs.

        Args:
            future: The completed gRPC future for one AppendEntries call.
        """
        # Pass to leader? Doesn't matter for now since we aren't using the
        # log yet.
        print("Received append entries response.")
class ZmqServer:
    """Receive raft messages from neighbouring nodes over a ZeroMQ SUB socket.

    ``start`` launches a thread that polls the socket and hands every
    received message to ``on_message``, which dispatches it to the
    ``on_AppendEntries`` / ``on_RequestVote`` hooks.
    """

    # Zmq subscribers can subscribe to multiple publishers. However,
    # subscribers are not thread safe - Radio-dish pattern aims to solve that.

    # How long one poll waits (milliseconds) before re-checking ``started``.
    POLL_TIMEOUT_MS = 100

    def __init__(self, config):
        """Create the ZeroMQ context and the subscriber socket.

        Args:
            config: Mapping with ``config["messaging"]["neighbours"]``, an
                iterable of entries that each have ``"ip"`` and ``"port"``.
        """
        self._cfg = config
        self.context = zmq.Context()
        self.socketSub = self.context.socket(zmq.SUB)
        # A SUB socket discards everything until a subscription is set;
        # the empty prefix subscribes to all messages.
        self.socketSub.setsockopt(zmq.SUBSCRIBE, b"")
        self.started = True

    def connect_channels(self):
        """Connect the subscriber socket to every configured neighbour."""
        # Also need to subscribe to other nodes...
        for n in self._cfg["messaging"]["neighbours"]:
            self.socketSub.connect("tcp://%s:%d" % (n["ip"], n["port"]))
        print("Neighbours are connected.")

    def start(self):
        """Start receiving on a new thread."""
        t = threading.Thread(target=self.start_receiving)
        t.start()

    def start_receiving(self):
        """Receive and dispatch messages until ``stop`` is called."""
        while self.started:
            # Poll with a timeout instead of blocking in recv() so that a
            # stop() request is noticed even when no traffic arrives.
            if not self.socketSub.poll(self.POLL_TIMEOUT_MS):
                continue
            raw = self.socketSub.recv()
            try:
                self.on_message(raw)
            except Exception as exc:
                # One bad message must not kill the receive loop.
                print("Failed to handle message: %s" % exc)

    def stop(self):
        """Ask the receive loop to exit after its current poll."""
        self.started = False

    def on_message(self, message):
        """Decode ``message`` and route it to the matching hook.

        Args:
            message: Object exposing ``deserialise()``.
        """
        # NOTE(review): recv() returns raw bytes, which have no
        # deserialise() method; the wire format still needs to be decided
        # and decoded here - confirm against the sending side.
        m = message.deserialise()
        if hasattr(m, "leaderId"):
            # We have append entries
            # Need to send back a message with our response. May be easier
            # to do this with a request reply mechanism, rather than publish
            # subscribe.
            self.on_AppendEntries(m)
        else:
            # Presumably a request vote (the only other message kind this
            # class has a hook for) - TODO confirm.
            self.on_RequestVote(m)

    def on_RequestVote(self, message):
        """Hook for a received request-vote message; does nothing yet."""
        pass

    def on_AppendEntries(self, messages):
        """Hook for a received append-entries message; does nothing yet."""
        pass
class ZmqMessageStrategy(MessageStrategy):
    """Send raft messages to the other nodes over a ZeroMQ publish socket."""

    def __init__(self, config, vote_callback, entries_callback):
        """Store the configuration and the reply callbacks.

        Args:
            config: Mapping with ``config["messaging"]["me"]`` holding this
                node's ``"ip"`` and ``"port"``.
            vote_callback: Called with the message in ``on_VoteReceived``.
            entries_callback: Called with the message in
                ``on_EntriesResponse``.
        """
        self._cfg = config
        self._vote_callback = vote_callback
        self._entries_callback = entries_callback
        # Created by connect_nodes(); None until then.
        self.context = None
        self.socketPub = None

    def connect_nodes(self):
        """Create the publish socket and bind it to this node's address."""
        print("Creating publish socket.")
        me = self._cfg["messaging"]["me"]
        self.context = zmq.Context()
        # PUB rather than REQ: the receiving side in this file uses SUB
        # sockets, and a REQ socket would refuse a second send until a
        # reply had been received.
        self.socketPub = self.context.socket(zmq.PUB)
        self.socketPub.bind("tcp://%s:%d" % (me["ip"], me["port"]))

    def connect_channels(self):
        """MessageStrategy entry point; same as ``connect_nodes``."""
        self.connect_nodes()

    def _publish(self, request):
        """Serialise ``request`` and send it on the publish socket."""
        if self.socketPub is None:
            # Bind on first use rather than failing on a missing socket.
            # NOTE(review): subscribers that connect after this send will
            # miss it; prefer calling connect_nodes() ahead of time.
            self.connect_nodes()
        payload = request.serialize
        # ``serialize`` may be a method rather than a ready payload;
        # call it in that case so the payload, not the method, is sent.
        if callable(payload):
            payload = payload()
        self.socketPub.send(payload)

    def send_RequestVote(self, request):
        """Publish a RequestVote message.

        Args:
            request: Message object exposing ``serialize``.
        """
        self._publish(request)

    def send_AppendEntries(self, request):
        """Publish an AppendEntries message.

        Args:
            request: Message object exposing ``serialize``.
        """
        self._publish(request)

    def on_VoteReceived(self, message):
        """Forward a received vote to the vote callback."""
        self._vote_callback(message)

    def on_EntriesResponse(self, message):
        """Forward an append-entries response to the entries callback."""
        self._entries_callback(message)