import json
from concurrent import futures
import time
import multiprocessing as mp
import threading

import grpc
import zmq

import MyRaft.raft_pb2_grpc as raft_pb2_grpc
import MyRaft.raft_pb2 as raft_pb2


class MessageStrategy:
    """Interface for the transport a raft node uses to reach its neighbours.

    Every method here raises NotImplementedError; concrete strategies
    override the ones they support.
    """

    def __init__(self):
        pass

    def send_RequestVote(self, request):
        """Send a RequestVote message to the neighbours."""
        raise NotImplementedError

    def send_AppendEntries(self, request):
        """Send an AppendEntries message to the neighbours."""
        raise NotImplementedError

    def on_VoteReceived(self, future):
        """Handle the response to a RequestVote message."""
        raise NotImplementedError

    def on_EntriesResponse(self, future):
        """Handle the response to an AppendEntries message."""
        raise NotImplementedError

    def connect_channels(self):
        """Set up the connections used by the send methods."""
        raise NotImplementedError


class NodeGrpcServer(raft_pb2_grpc.RaftServicer):
    """Contains the gRPC server for the raft node."""

    def __init__(self, raftNode, port: int):
        """Create the gRPC server and bind it to an insecure port.

        Args:
            raftNode: Object that receives the incoming RPCs through its
                ``rcv_AppendEntries`` and ``vote_requested`` methods.
            port: Port the server listens on (all interfaces, ``[::]``).
        """
        self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        self.server.add_insecure_port('[::]:%d' % port)
        self._raft = raftNode

    def AppendEntriesRPC(self, request, context):
        """AppendEntries remote procedure call for raft.

        Args:
            request: The AppendEntries message sent by the leader
            context: RPC-related information and actions -> more here:
                https://grpc.io/grpc/python/grpc.html

        Returns:
            An AppendEntriesResponse message.
        """
        print("Received append entries rpc")
        # Leaving this here for now, just in case we need it later
        # (gets the client ip address):
        # str(context._rpc_event.call_details.host)
        return self._raft.rcv_AppendEntries(request)

    def RequestVoteRPC(self, request, context):
        """RequestVote remote procedure call for raft.

        Args:
            request: The RequestVote message sent by the remote node
            context: RPC-related information and actions -> more here:
                https://grpc.io/grpc/python/grpc.html

        Returns:
            A RequestVoteResponse message
        """
        print("Received request vote rpc")
        print(request)
        result = self._raft.vote_requested(request)
        print("Now returning our vote.")
        print(result)
        print(type(result))
        return result

    def start_server(self):
        """Register this servicer, start the server and block forever."""
        print("Starting servicer")
        raft_pb2_grpc.add_RaftServicer_to_server(self, self.server)
        self.server.start()
        # server.start() does not block, so keep the calling thread alive.
        while True:
            time.sleep(60*60)


class GrpcMessageStrategy(MessageStrategy):
    """This class uses gRPC to communicate between raft nodes."""

    # Only create the channels if we become a candidate or leader.
    # Also need to close the channels when we become the follower.
    def __init__(self, server: NodeGrpcServer, config):
        """Store the configuration; channels are created lazily.

        Args:
            server: The local NodeGrpcServer; responses are handed to its
                ``_raft`` object.
            config: Mapping with ``config['messaging']['neighbours']``, an
                iterable of entries that each have ``'ip'`` and ``'port'``.
        """
        # Also need to consider TLS/secure connection
        self._cfg = config
        self._neighbours = self._cfg['messaging']['neighbours']
        self._server = server
        self.message_callbacks = []
        self._futures = []
        self.channels = None

    def connect_channels(self):
        """Open one insecure gRPC channel per configured neighbour."""
        print("Creating channels")
        self.channels = [
            grpc.insecure_channel('%s:%d' % (n['ip'], n['port']))
            for n in self._neighbours
        ]

    def send_RequestVote(self, vote):
        """Asynchronously send ``vote`` to every neighbour.

        Each response is delivered to ``on_VoteReceived``. Sending is best
        effort: a failure on one channel is reported and the remaining
        channels are still tried.
        """
        print("Sending Request Vote")
        if self.channels is None:
            self.connect_channels()
        for channel in self.channels:
            print("In channel")
            try:
                stub = raft_pb2_grpc.RaftStub(channel)
                print("connected")
                future = stub.RequestVoteRPC.future(vote)
                future.add_done_callback(self.on_VoteReceived)
            except Exception as e:
                # Best effort: report the failure instead of hiding it, and
                # carry on with the other neighbours.
                print("Couldn't message.")
                print(e)

    def on_VoteReceived(self, future):
        """Forward a completed RequestVote response to the raft node.

        Args:
            future: The gRPC future returned by ``RequestVoteRPC.future``.
        """
        print("A vote was returned")
        try:
            vote = future.result()
        except (grpc.RpcError, grpc.FutureCancelledError) as e:
            # The RPC failed or was cancelled (e.g. neighbour unreachable);
            # there is no vote to hand to the node.
            print("Vote request failed.")
            print(e)
            return
        print("sending vote received back to node.")
        self._server._raft.vote_received(vote)

    def send_AppendEntries(self, entries):
        """Asynchronously send ``entries`` to every neighbour.

        Each response is delivered to ``on_EntriesResponse``.
        """
        # Channels are created lazily, exactly as in send_RequestVote, so
        # this also works when it is the first send on this strategy.
        if self.channels is None:
            self.connect_channels()
        for channel in self.channels:
            stub = raft_pb2_grpc.RaftStub(channel)
            future = stub.AppendEntriesRPC.future(entries)
            future.add_done_callback(self.on_EntriesResponse)

    def on_EntriesResponse(self, future):
        """Log that an AppendEntries response arrived."""
        # Pass to leader? Doesn't matter for now since we aren't using the
        # log yet.
        print("Received append entries response.")


class ZmqServer:
    """Receives messages from neighbouring nodes over a ZeroMQ SUB socket."""

    # Zmq subscribers can subscribe to multiple publishers. However,
    # subscribers are not thread safe - Radio-dish pattern aims to solve that.
    def __init__(self, config):
        """Create the ZeroMQ context and SUB socket.

        Args:
            config: Mapping with ``config["messaging"]["neighbours"]``, an
                iterable of entries that each have ``"ip"`` and ``"port"``.
        """
        self._cfg = config
        self.context = zmq.Context()
        self.socketSub = self.context.socket(zmq.SUB)
        self.started = True

    def connect_channels(self):
        """Connect the SUB socket to every configured neighbour."""
        # Also need to subscribe to other nodes...
        # NOTE(review): no SUBSCRIBE option is set on socketSub anywhere in
        # this class; ZeroMQ SUB sockets start with no subscriptions, so
        # confirm whether setsockopt(zmq.SUBSCRIBE, ...) is needed here.
        for n in self._cfg["messaging"]["neighbours"]:
            self.socketSub.connect("tcp://%s:%d" % (n["ip"], n["port"]))
        print("Neighbours are connected.")

    def start(self):
        """Start receiving on a new thread."""
        t = threading.Thread(target=self.start_receiving)
        t.start()

    def start_receiving(self):
        """Pass every received message to on_message until stopped."""
        # recv() blocks, so the flag is only re-checked after a message
        # has arrived.
        while self.started:
            self.on_message(self.socketSub.recv())

    def stop(self):
        """Ask the receive loop to exit after its current iteration."""
        self.started = False

    def on_message(self, message):
        """Deserialise an incoming message and try to dispatch it.

        Args:
            message: Object providing ``deserialise()``.
        """
        # NOTE(review): start_receiving passes the raw recv() payload here;
        # confirm that payload really provides deserialise().
        m = message.deserialise()
        try:
            m.leaderId
            # We have append entries
            # NOTE(review): self.context is the zmq.Context created in
            # __init__ - confirm the raft node was the intended receiver.
            self.context.rcv_AppendEntries(m)
            # Need to send back a message with our response. May be easier
            # to do this with a request reply mechanism, rather than publish
            # subscribe.
        except Exception:
            # Best effort: anything that does not look like this message
            # type is skipped. KeyboardInterrupt/SystemExit are no longer
            # swallowed.
            pass
        try:
            m.leaderId
            # We have request vote.
            # NOTE(review): this branch repeats the append-entries dispatch
            # above; confirm what the request-vote path should call.
            self.context.rcv_AppendEntries(m)
        except Exception:
            pass

    def on_RequestVote(self, message):
        """Placeholder hook; currently does nothing."""
        pass

    def on_AppendEntries(self, messages):
        """Placeholder hook; currently does nothing."""
        pass


class ZmqMessageStrategy(MessageStrategy):
    """This class uses ZeroMQ to communicate between raft nodes."""

    def __init__(self, config, vote_callback, entries_callback):
        """Store the configuration and callbacks; the socket is made lazily.

        Args:
            config: Mapping with ``config["messaging"]["me"]`` holding this
                node's ``"ip"`` and ``"port"``.
            vote_callback: Called with the message in ``on_VoteReceived``.
            entries_callback: Called with the message in
                ``on_EntriesResponse``.
        """
        self._cfg = config
        self._vote_callback = vote_callback
        self._entries_callback = entries_callback
        # Created by connect_channels(); None until then.
        self.context = None
        self.socketPub = None

    def connect_channels(self):
        """Create the publish socket and bind it to this node's address."""
        print("Creating publish socket.")
        me = self._cfg["messaging"]["me"]
        self.context = zmq.Context()
        # PUB rather than REQ: the receiving side (ZmqServer) connects SUB
        # sockets, and a REQ socket cannot send twice without a reply.
        self.socketPub = self.context.socket(zmq.PUB)
        self.socketPub.bind("tcp://%s:%d" % (me["ip"], me["port"]))

    def connect_nodes(self):
        """Backward-compatible name for connect_channels()."""
        self.connect_channels()

    def _publish(self, request):
        """Serialise ``request`` and send it on the publish socket."""
        if self.socketPub is None:
            # Lazy setup, mirroring GrpcMessageStrategy.send_RequestVote.
            # NOTE(review): subscribers that have not connected yet will
            # miss messages published immediately after binding.
            self.connect_channels()
        payload = request.serialize
        # ``serialize`` may be a method or a plain attribute; send the
        # serialised data, never the bound method object itself.
        if callable(payload):
            payload = payload()
        self.socketPub.send(payload)

    def send_RequestVote(self, request):
        """Publish a RequestVote message."""
        self._publish(request)

    def send_AppendEntries(self, request):
        """Publish an AppendEntries message."""
        self._publish(request)

    def on_VoteReceived(self, message):
        """Pass a vote response to the vote callback."""
        self._vote_callback(message)

    def on_EntriesResponse(self, message):
        """Pass an AppendEntries response to the entries callback."""
        self._entries_callback(message)