215 lines
6.9 KiB
Python
215 lines
6.9 KiB
Python
import json
|
|
from concurrent import futures
|
|
import time
|
|
import multiprocessing as mp
|
|
import threading
|
|
|
|
import grpc
|
|
import zmq
|
|
|
|
import MyRaft.raft_pb2_grpc as raft_pb2_grpc
|
|
import MyRaft.raft_pb2 as raft_pb2
|
|
|
|
class MessageStrategy:
    """Interface for the transport a raft node uses to talk to its peers.

    Concrete strategies (GrpcMessageStrategy and ZmqMessageStrategy in this
    module) override every method below. Calling a method that a subclass
    has not overridden raises NotImplementedError.
    """

    def __init__(self):
        pass

    def _unimplemented(self, method_name):
        """Build the error raised when a subclass did not override a method.

        Args:
            method_name: Name of the interface method that was called.

        Returns:
            A NotImplementedError naming the concrete class and the method.
        """
        return NotImplementedError(
            "%s does not implement %s" % (type(self).__name__, method_name)
        )

    def send_RequestVote(self, request):
        """Send a RequestVote message to the other nodes.

        Args:
            request: The RequestVote message to send.

        Raises:
            NotImplementedError: Always, until overridden by a subclass.
        """
        raise self._unimplemented("send_RequestVote")

    def send_AppendEntries(self, request):
        """Send an AppendEntries message to the other nodes.

        Args:
            request: The AppendEntries message to send.

        Raises:
            NotImplementedError: Always, until overridden by a subclass.
        """
        raise self._unimplemented("send_AppendEntries")

    def on_VoteReceived(self, future):
        """Handle the reply to a RequestVote message.

        Args:
            future: The reply; its concrete form depends on the transport.

        Raises:
            NotImplementedError: Always, until overridden by a subclass.
        """
        raise self._unimplemented("on_VoteReceived")

    def on_EntriesResponse(self, future):
        """Handle the reply to an AppendEntries message.

        Args:
            future: The reply; its concrete form depends on the transport.

        Raises:
            NotImplementedError: Always, until overridden by a subclass.
        """
        raise self._unimplemented("on_EntriesResponse")

    def connect_channels(self):
        """Open the connections used to reach the other nodes.

        Raises:
            NotImplementedError: Always, until overridden by a subclass.
        """
        raise self._unimplemented("connect_channels")
|
|
|
|
|
|
class NodeGrpcServer(raft_pb2_grpc.RaftServicer):
    """Contains the gRPC server for the raft node."""

    # start_server() parks its calling thread by sleeping in slices of this
    # length; the gRPC server itself runs on its own thread pool.
    _KEEPALIVE_SECONDS = 60 * 60

    def __init__(self, raftNode, port: int):
        """Create the gRPC server and bind it to the given port.

        Args:
            raftNode: The raft node that incoming RPCs are forwarded to. It
                must provide rcv_AppendEntries() and vote_requested().
            port: TCP port to listen on (all interfaces, insecure).
        """
        self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        self.server.add_insecure_port('[::]:%d' % port)
        self._raft = raftNode

    def AppendEntriesRPC(self, request, context):
        """AppendEntries remote procedure call for raft.

        Args:
            request: The AppendEntries message sent by the leader.
            context: RPC-related information and actions -> more here:
                https://grpc.io/grpc/python/grpc.html

        Returns:
            The value returned by the raft node's rcv_AppendEntries(), which
            is expected to be an AppendEntriesResponse message.
        """
        print("Received append entries rpc")
        # Leaving this here for now, just in case we need it later (gets the client ip address)
        # str(context._rpc_event.call_details.host)
        return self._raft.rcv_AppendEntries(request)

    def RequestVoteRPC(self, request, context):
        """RequestVote remote procedure call for raft.

        Args:
            request: The RequestVote message sent by the node asking for a
                vote (a candidate, in raft terms).
            context: RPC-related information and actions -> more here:
                https://grpc.io/grpc/python/grpc.html

        Returns:
            The value returned by the raft node's vote_requested(), which is
            expected to be a RequestVoteResponse message.
        """
        print("Received request vote rpc")
        print(request)
        result = self._raft.vote_requested(request)
        print("Now returning our vote.")
        print(result)
        print(type(result))
        return result

    def start_server(self):
        """Register this servicer, start the server and block forever.

        The call never returns normally. If the wait is interrupted (for
        example by KeyboardInterrupt) the gRPC server is stopped before the
        exception propagates to the caller.
        """
        print("Starting servicer")
        raft_pb2_grpc.add_RaftServicer_to_server(self, self.server)
        self.server.start()
        try:
            while True:
                time.sleep(self._KEEPALIVE_SECONDS)
        finally:
            # Only reachable through an exception: shut down immediately so
            # the listening port is released.
            self.server.stop(0)
|
|
|
|
|
|
class GrpcMessageStrategy(MessageStrategy):
    """This class uses gRPC to communicate between raft nodes."""

    # Only create the channels if we become a candidate or leader.
    # Also need to close the channels when we become the follower.

    def __init__(self, server: NodeGrpcServer, config):
        """Store the configuration; channels are opened lazily on first send.

        Args:
            server: The local NodeGrpcServer; vote replies are handed to its
                raft node.
            config: Configuration mapping. config['messaging']['neighbours']
                must be an iterable of mappings with 'ip' and 'port' keys.
        """
        # Also need to consider TLS/secure connection
        self._cfg = config
        self._neighbours = self._cfg['messaging']['neighbours']
        self._server = server
        self.message_callbacks = []
        self._futures = []
        self.channels = None

    def connect_channels(self):
        """Open one insecure gRPC channel per configured neighbour."""
        print("Creating channels")
        self.channels = [
            grpc.insecure_channel('%s:%d' % (n['ip'], n['port']))
            for n in self._neighbours
        ]

    def _open_channels(self):
        """Return the channel list, creating the channels on first use."""
        if self.channels is None:
            self.connect_channels()
        return self.channels

    def send_RequestVote(self, vote):
        """Send a RequestVote message to every neighbour without blocking.

        Each reply is delivered to on_VoteReceived. A failure to send to one
        neighbour is reported and does not stop the remaining sends.

        Args:
            vote: The RequestVote message to send.
        """
        print("Sending Request Vote")
        for channel in self._open_channels():
            print("In channel")
            try:
                stub = raft_pb2_grpc.RaftStub(channel)
                print("connected")
                future = stub.RequestVoteRPC.future(vote)
                future.add_done_callback(self.on_VoteReceived)
            except Exception as e:
                # Best effort: an unreachable peer must not prevent asking
                # the others, but the failure should be visible.
                print("Couldn't message.")
                print(e)

    def on_VoteReceived(self, future):
        """Pass a completed RequestVote reply on to the raft node.

        Args:
            future: The finished gRPC future for one RequestVoteRPC call.
        """
        print("A vote was returned")
        try:
            vote = future.result()
        except (grpc.RpcError, grpc.FutureCancelledError) as e:
            # The RPC failed or was cancelled (e.g. the peer is down), so
            # there is no vote to hand to the node.
            print("Couldn't message.")
            print(e)
            return
        print("sending vote received back to node.")
        self._server._raft.vote_received(vote)

    def send_AppendEntries(self, entries):
        """Send an AppendEntries message to every neighbour without blocking.

        Each reply is delivered to on_EntriesResponse.

        Args:
            entries: The AppendEntries message to send.
        """
        for channel in self._open_channels():
            stub = raft_pb2_grpc.RaftStub(channel)
            future = stub.AppendEntriesRPC.future(entries)
            future.add_done_callback(self.on_EntriesResponse)

    def on_EntriesResponse(self, future):
        """Note that an AppendEntries reply arrived; the reply is not inspected.

        Args:
            future: The finished gRPC future for one AppendEntriesRPC call.
        """
        # Pass to leader? Doesn't matter for now since we aren't using the
        # log yet.
        print("Received append entries response.")
|
|
|
|
|
|
class ZmqServer:
    """Receives raft messages from neighbouring nodes on a ZeroMQ SUB socket."""

    # Zmq subscribers can subscribe to multiple publishers. However,
    # subscribers are not thread safe - Radio-dish pattern aims to solve that.

    # Upper bound, in milliseconds, on how long the receive loop waits for a
    # message before re-checking whether stop() was called.
    _POLL_TIMEOUT_MS = 100

    def __init__(self, config):
        """Create the subscriber socket.

        Args:
            config: Configuration mapping. config["messaging"]["neighbours"]
                must be an iterable of mappings with "ip" and "port" keys.
        """
        self._cfg = config
        self.context = zmq.Context()
        self.socketSub = self.context.socket(zmq.SUB)
        # A SUB socket delivers nothing until it has a subscription; the
        # empty prefix subscribes to every message.
        self.socketSub.setsockopt(zmq.SUBSCRIBE, b"")
        self.started = True

    def connect_channels(self):
        """Connect the subscriber socket to every configured neighbour."""
        for n in self._cfg["messaging"]["neighbours"]:
            self.socketSub.connect("tcp://%s:%d" % (n["ip"], n["port"]))

        print("Neighbours are connected.")

    def start(self):
        """Run the receive loop on a new thread."""
        t = threading.Thread(target=self.start_receiving)
        t.start()

    def start_receiving(self):
        """Receive messages and hand them to on_message until stop() is called.

        The socket is polled with a timeout instead of blocking in recv(), so
        the loop notices stop() even when no messages arrive. A message that
        cannot be handled is reported and skipped rather than ending the loop.
        """
        while self.started:
            if not self.socketSub.poll(self._POLL_TIMEOUT_MS):
                continue
            frame = self.socketSub.recv()
            try:
                self.on_message(frame)
            except Exception as e:
                print("Couldn't handle message.")
                print(e)

    def stop(self):
        """Ask the receive loop to exit after its current poll."""
        self.started = False

    def on_message(self, message):
        """Decode a message and dispatch it to the matching handler.

        A decoded message that has a leaderId attribute is treated as
        AppendEntries; anything else is treated as RequestVote.

        Args:
            message: An object providing deserialise().
                NOTE(review): start_receiving passes the raw bytes returned by
                recv(), which have no deserialise() method - the wire format
                still has to be settled.
        """
        m = message.deserialise()
        if hasattr(m, "leaderId"):
            # We have append entries
            # Need to send back a message with our response. May be easier
            # to do this with a request reply mechanism, rather than publish
            # subscribe.
            self.on_AppendEntries(m)
        else:
            # We have request vote.
            self.on_RequestVote(m)

    def on_RequestVote(self, message):
        """Hook for a decoded RequestVote message; does nothing by default."""
        pass

    def on_AppendEntries(self, messages):
        """Hook for a decoded AppendEntries message; does nothing by default."""
        pass
|
|
|
|
class ZmqMessageStrategy(MessageStrategy):
    """Publishes raft messages to the other nodes over ZeroMQ."""

    def __init__(self, config, vote_callback, entries_callback):
        """Store the configuration and callbacks; the socket is created later.

        Args:
            config: Configuration mapping. config["messaging"]["me"] must be
                a mapping with "ip" and "port" keys to bind the socket to.
            vote_callback: Called with the message passed to on_VoteReceived.
            entries_callback: Called with the message passed to
                on_EntriesResponse.
        """
        self._cfg = config
        self._vote_callback = vote_callback
        self._entries_callback = entries_callback
        # Created by connect_nodes().
        self.context = None
        self.socketPub = None

    def connect_nodes(self):
        """Create the publish socket and bind it to this node's address."""
        print("Creating publish socket.")
        me = self._cfg["messaging"]["me"]
        self.context = zmq.Context()
        # PUB rather than REQ: a REQ socket refuses a second send() until a
        # reply has been received and cannot deliver to SUB sockets.
        self.socketPub = self.context.socket(zmq.PUB)
        self.socketPub.bind("tcp://%s:%d" % (me["ip"], me["port"]))

    def connect_channels(self):
        """Implement the MessageStrategy interface by calling connect_nodes()."""
        self.connect_nodes()

    def _publish(self, request):
        """Serialise a request and publish it, binding the socket if needed.

        Args:
            request: Object exposing `serialize`, either as a method
                returning the payload or as the payload itself.
        """
        if self.socketPub is None:
            # NOTE: subscribers that have not finished connecting to a
            # freshly bound PUB socket do not receive what it sends.
            self.connect_nodes()
        payload = request.serialize
        if callable(payload):
            # The socket needs the serialised data, not the bound method.
            payload = payload()
        self.socketPub.send(payload)

    def send_RequestVote(self, request):
        """Publish a RequestVote message.

        Args:
            request: The RequestVote message to send.
        """
        self._publish(request)

    def send_AppendEntries(self, request):
        """Publish an AppendEntries message.

        Args:
            request: The AppendEntries message to send.
        """
        self._publish(request)

    def on_VoteReceived(self, message):
        """Forward a vote reply to the vote callback.

        Args:
            message: The vote reply to forward.
        """
        self._vote_callback(message)

    def on_EntriesResponse(self, message):
        """Forward an AppendEntries reply to the entries callback.

        Args:
            message: The AppendEntries reply to forward.
        """
        self._entries_callback(message)