From e13551f79891cd73c782524bd87597b958faa6f5 Mon Sep 17 00:00:00 2001 From: Piv <18462828+Piv200@users.noreply.github.com> Date: Fri, 21 Feb 2020 21:08:50 +1030 Subject: [PATCH] Add RAFT stuff --- MyRaft/Experiment/node1/config.json | 28 +++ MyRaft/Experiment/node2/config.json | 28 +++ MyRaft/Experiment/node3/config.json | 28 +++ MyRaft/candidate.py | 49 +++++ MyRaft/config.ini | 5 + MyRaft/config.json | 27 +++ MyRaft/follower.py | 39 ++++ MyRaft/leader.py | 24 +++ MyRaft/messages.py | 129 ++++++++++++ MyRaft/messagestrategy.py | 215 ++++++++++++++++++++ MyRaft/node.py | 127 ++++++++++++ MyRaft/protos/raft.proto | 35 ++++ MyRaft/raft_pb2.py | 296 ++++++++++++++++++++++++++++ MyRaft/raft_pb2_grpc.py | 63 ++++++ MyRaft/state.py | 33 ++++ MyRaft/test.py | 22 +++ MyRaft/voter.py | 8 + 17 files changed, 1156 insertions(+) create mode 100644 MyRaft/Experiment/node1/config.json create mode 100644 MyRaft/Experiment/node2/config.json create mode 100644 MyRaft/Experiment/node3/config.json create mode 100644 MyRaft/candidate.py create mode 100644 MyRaft/config.ini create mode 100644 MyRaft/config.json create mode 100644 MyRaft/follower.py create mode 100644 MyRaft/leader.py create mode 100644 MyRaft/messages.py create mode 100644 MyRaft/messagestrategy.py create mode 100644 MyRaft/node.py create mode 100644 MyRaft/protos/raft.proto create mode 100644 MyRaft/raft_pb2.py create mode 100644 MyRaft/raft_pb2_grpc.py create mode 100644 MyRaft/state.py create mode 100644 MyRaft/test.py create mode 100644 MyRaft/voter.py diff --git a/MyRaft/Experiment/node1/config.json b/MyRaft/Experiment/node1/config.json new file mode 100644 index 0000000..e399fe9 --- /dev/null +++ b/MyRaft/Experiment/node1/config.json @@ -0,0 +1,28 @@ +{ + "raft": + { + "min_election_timeout": 8, + "varying_election_timeout": 2, + "majority": 2 + }, + "messaging": + { + "me": + { + "ip": "127.0.0.1", + "port": 50051 + }, + "neighbours": + [ + { + "ip": "127.0.0.1", + "port": 50052 + }, + { + "ip": "127.0.0.1", + "port": 50053 + } + ] + } +} + diff --git a/MyRaft/Experiment/node2/config.json b/MyRaft/Experiment/node2/config.json new file mode 100644 index 0000000..143284b --- /dev/null +++ b/MyRaft/Experiment/node2/config.json @@ -0,0 +1,28 @@ +{ + "raft": + { + "min_election_timeout": 8, + "varying_election_timeout": 2, + "majority": 2 + }, + "messaging": + { + "me": + { + "ip": "127.0.0.1", + "port": 50052 + }, + "neighbours": + [ + { + "ip": "127.0.0.1", + "port": 50051 + }, + { + "ip": "127.0.0.1", + "port": 50053 + } + ] + } +} + diff --git a/MyRaft/Experiment/node3/config.json b/MyRaft/Experiment/node3/config.json new file mode 100644 index 0000000..70781fb --- /dev/null +++ b/MyRaft/Experiment/node3/config.json @@ -0,0 +1,28 @@ +{ + "raft": + { + "min_election_timeout": 8, + "varying_election_timeout": 2, + "majority": 3 + }, + "messaging": + { + "me": + { + "ip": "127.0.0.1", + "port": 50053 + }, + "neighbours": + [ + { + "ip": "127.0.0.1", + "port": 50052 + }, + { + "ip": "127.0.0.1", + "port": 50051 + } + ] + } +} + diff --git a/MyRaft/candidate.py b/MyRaft/candidate.py new file mode 100644 index 0000000..a025957 --- /dev/null +++ b/MyRaft/candidate.py @@ -0,0 +1,49 @@ +import MyRaft.state as state +import MyRaft.leader as leader +# import MyRaft.follower as follower +import MyRaft.node as node +import MyRaft.raft_pb2 as raft_pb2 + +class Candidate(state.State): + def __init__(self, context:node.RaftNode, majority = 2): + state.State.__init__(self, context) + print("We're a candidate!") + context.currentTerm += 1 + self._votes_received = [] # List of voters who have voted. + self._votes_received.append(self._context._id) + self._majority = majority + self._context.set_timeout(self._context._min_timout, self._context._vary_timeout) + print("Sending RequestVote to other nodes") + self._context.send_RequestVote() + + def rcv_vote(self, request): + print("Received Vote") + # Checks the term... + if not request.voteGranted: + print("They rejected us!") + if request.voterId not in self._votes_received: + print("Added a vote!") + self._votes_received.append(request.voterId) + if len(self._votes_received) >= self._majority: + self._context.set_state(leader.Leader(self._context)) + + def heartbeat_elapsed(self): + # Start a new election. + self._context.currentTerm += 1 + self._context.set_timeout(self._context._min_timout, self._context._vary_timeout) + print("Sending RequestVote to other nodes") + self._context.send_RequestVote() + + def rcv_AppendEntries(self, request): + if request.term >= self._context.currentTerm: + self._context.set_state(follower.Follower(self._context)) + + def rcv_RequestVote(self, request): + print("Received a vote request") + if request.term > self._context.currentTerm: + print("They're more important, going back to a follower") + self._context.set_state(follower.Follower(self._context)) + self._context.votedFor = request.candidateId + return raft_pb2.RequestVoteResponse(term = self._context.currentTerm, + voteGranted = True, + voterId = self._context._id) \ No newline at end of file diff --git a/MyRaft/config.ini b/MyRaft/config.ini new file mode 100644 index 0000000..da25e2c --- /dev/null +++ b/MyRaft/config.ini @@ -0,0 +1,5 @@ +[RAFT] +min_election_timeout = 100 +varying_election_timeout = 200 +heartbeat_timeout = 50 +majority = 3 \ No newline at end of file diff --git a/MyRaft/config.json b/MyRaft/config.json new file mode 100644 index 0000000..8fbe6eb --- /dev/null +++ b/MyRaft/config.json @@ -0,0 +1,27 @@ +{ + "raft": + { + "min_election_timeout": 8, + "varying_election_timeout": 2, + "majority": 2 + }, + "messaging": + { + "me": + { + "ip": "127.0.0.1", + "port": 50051 + }, + "neighbours": + [ + { + "ip": "127.0.0.1", + "port": 50052 + }, + { + "ip": "127.0.0.1", + "port": 50053 + } + ] + } +} \ No newline at end of file diff --git a/MyRaft/follower.py b/MyRaft/follower.py new file mode 100644 index 0000000..1742928 --- /dev/null +++ b/MyRaft/follower.py @@ -0,0 +1,39 @@ +import MyRaft.state as state +import MyRaft.candidate as candidate +import MyRaft.raft_pb2 as raft_pb2 + +class Follower(state.State): + def __init__(self, context): + state.State.__init__(self, context) + self._context.set_timeout(self._context._min_timout, self._context._vary_timeout) + + def heartbeat_elapsed(self): + print("Becoming a candidate") + self._context.set_state(candidate.Candidate(self._context)) + + def rcv_AppendEntries(self, request): + """Called when an append entries message is received""" + + self._context.set_timeout(self._context._min_timout, self._context._vary_timeout) + + def rcv_RequestVote(self, request): + print("Received a vote request") + # Ignoring log for now. + if request.term < self._context.currentTerm: + print("They're term is worse than ours.") + # If our current term is already the same, then we must have voted already. + return raft_pb2.RequestVoteResponse(term = self._context.currentTerm, voteGranted = False) + elif request.term == self._context.currentTerm and self._context.votedFor is not None: + return raft_pb2.RequestVoteResponse(term = self._context.currentTerm, voteGranted = False) + else: + print("We'll be voting for them!") + # We vote yes, so reset our timeout. + self._context.set_timeout(self._context._min_timout, self._context._vary_timeout) + self._context.currentTerm = request.term + print("setting candidate id") + self._context.votedFor = request.candidateId + print("Returning result.") + return raft_pb2.RequestVoteResponse(term = self._context.currentTerm, + voteGranted = True, + voterId = self._context._id) + diff --git a/MyRaft/leader.py b/MyRaft/leader.py new file mode 100644 index 0000000..defac60 --- /dev/null +++ b/MyRaft/leader.py @@ -0,0 +1,24 @@ +import MyRaft.state as state +import MyRaft.node as node + +class Leader(state.State): + """The leader class represents the leader state in the raft algorithm""" + def __init__(self, context: node.RaftNode): + state.State.__init__(self, context) + print("We're a leader!") + + # For indexes for each server to send. + self.nextIndex = [] + self.matchIndex = [] + + # Change our timeout. + self._context.set_timeout(self._context._heartbeat_timeout, 0) + # Send empty AppendEntries. + self._context.send_empty_AppendEntries() + + def heartbeat_elapsed(self): + print("Sending an append entries message") + self._context.send_empty_AppendEntries() + + # Don't forget to reset timer, otherwise they'll try run for leader. + self._context.set_timeout(self._context._heartbeat_timeout, 0) \ No newline at end of file diff --git a/MyRaft/messages.py b/MyRaft/messages.py new file mode 100644 index 0000000..836413e --- /dev/null +++ b/MyRaft/messages.py @@ -0,0 +1,129 @@ +""" This module holds the messages for raft to use. + +Message -- Base message class + +AppendEntries -- Message representing raft append entries. + +RequestVote -- Message representing raft request vote. + +RequestVoteReponse -- Message for responding to a request vote. + +Response -- Response to an append entries message. +""" +import umsgpack +from enum import Enum + +class Messages(Enum): + AppendEntries = 1 + RequestVote = 2 + RequestVoteResponse = 3 + AppendEntriesResponse = 4 + +class Message: + """The base class of all messages used in raft""" + _type = None + + def __init__(self, sender, data = {}, term = 0): + self._sender = sender + self._data = data + self._term = term + + @property + def sender(self): + return self._sender + + @property + def type(self): + return self._type + + def serialise(self): + """Serialises a Message object into a message pack byte array""" + raise NotImplementedError + + @staticmethod + def deserialise(message): + """Deserialises from a byte array into a Message object + + message -- Message to deserialise. + + Returns -- Deserialised message object, None if incorrect input message. + """ + m = None + try: + m = umsgpack.unpackb(m) + except: + print("Could not decode message") + return m + + m = Message('tbd') + raise NotImplementedError + + def __eq__(self, other): + if not isinstance(other, Message): + return False + + if other.type != self.type: + return False + + if other._data != self._data: + return False + + if other._sender != self._sender: + return False + + return True + +class AppendEntries(Message): + _type = "AppendEntries" + + def __init__(self, term, leaderId, prevLogIndex, prevLogTerm, leaderCommit, entries = None): + self._data["term"] = term + self._data["leaderId"] = leaderId + self._data["prevLogIndex"] = prevLogIndex + self._data["prevLogTerm"] = prevLogTerm + self._data["entries"] = entries + self._data["leaderCommit"] = leaderCommit # Leader's commit index. + + +class RequestVote(Message): + _type = "RequestVote" + + def __init__(self, term, candidate_id, last_log_index, last_log_term): + self._data["candidateId"] = candidate_id + self._data["lastLogIndex"] = last_log_index + self._data["lastLogTerm"] = last_log_term + + @property + def candidate_id(self): + return self._data["candidateId"] + + @property + def last_log_index(self): + return self._data["lastLogIndex"] + + @property + def last_log_term(self): + return self._data["lastLogTerm"] + + +class RequestVoteResponse(Message): + _type = "RequestVoteResponse" + + def __init__(self, term, vote_granted): + self._data["voteGranted"] = vote_granted + + @property + def vote_granted(self): + return self._data["voteGranted"] + +class Response(Message): + _type = "Response" + + def __init__(self, term, success): + self._data["success"] = success + + @property + def success(self): + return self._data["success"] + + \ No newline at end of file diff --git a/MyRaft/messagestrategy.py b/MyRaft/messagestrategy.py new file mode 100644 index 0000000..26f8ed2 --- /dev/null +++ b/MyRaft/messagestrategy.py @@ -0,0 +1,215 @@ +import json +from concurrent import futures +import time +import multiprocessing as mp +import threading + +import grpc +import zmq + +import MyRaft.raft_pb2_grpc as raft_pb2_grpc +import MyRaft.raft_pb2 as raft_pb2 + +class MessageStrategy: + def __init__(self): + pass + + def send_RequestVote(self, request): + raise NotImplementedError + + def send_AppendEntries(self, request): + raise NotImplementedError + + def on_VoteReceived(self, future): + raise NotImplementedError + + def on_EntriesResponse(self, future): + raise NotImplementedError + + def connect_channels(self): + raise NotImplementedError + + +class NodeGrpcServer(raft_pb2_grpc.RaftServicer): + """Contains the gRPC server for the raft node.""" + + def __init__(self, raftNode, port: int): + self.server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + self.server.add_insecure_port('[::]:%d' % port) + self._raft = raftNode + + def AppendEntriesRPC(self, request, context): + """AppendEntries remote procedural call for raft. + + Args: + request: The AppendEntries message sent by the leader + context: RPC-related information and actions -> more here: https://grpc.io/grpc/python/grpc.html + + Returns: + An AppendEntriesResponse message. + """ + print("Received append entries rpc") + # Leaving this here for now, just in case we need it later (gets the client ip address) + # str(context._rpc_event.call_details.host) + return self._raft.rcv_AppendEntries(request) + + def RequestVoteRPC(self, request, context): + """RequestVote remote procedural call for raft. + + Args: + request: The RequestVote message sent by the leader + context: RPC-related information and actions -> more here: https://grpc.io/grpc/python/grpc.html + + Returns: + A RequestVoteResponse message + """ + print("Received request vote rpc") + print(request) + result = self._raft.vote_requested(request) + print("Now returning our vote.") + print(result) + print(type(result)) + return result + + def start_server(self): + print("Starting servicer") + raft_pb2_grpc.add_RaftServicer_to_server(self, self.server) + self.server.start() + while True: + time.sleep(60*60) + + +class GrpcMessageStrategy(MessageStrategy): + """This class uses gRPC to communicate between raft nodes.""" + # Only create the channels if we become a candidate or leader. + # Also need to close the channels when we become the follower. + + def __init__(self, server: NodeGrpcServer, config): + # Also need to consider TLS/secure connection + self._cfg = config + self._neighbours = self._cfg['messaging']['neighbours'] + self._server = server + self.message_callbacks = [] + self._futures = [] + self.channels = None + + def connect_channels(self): + print("Creating channels") + self.channels = [] + for n in self._neighbours: + channel = grpc.insecure_channel('%s:%d' % (n['ip'], n['port'])) + self.channels.append(channel) + + def send_RequestVote(self, vote): + print("Sending Request Vote") + if self.channels is None: + self.connect_channels() + for channel in self.channels: + print("In channel") + try: + stub = raft_pb2_grpc.RaftStub(channel) + print("connected") + # vote = stub.RequestVoteRPC(vote) + future = stub.RequestVoteRPC.future(vote) + future.add_done_callback(self.on_VoteReceived) + # print("sending vote received back to node.") + # self._server._raft.vote_received(vote) + except Exception as e: + # print("Couldn't message.") + # print(e) + pass + + def on_VoteReceived(self, future): + print("A vote was returned") + print("sending vote received back to node.") + self._server._raft.vote_received(future.result()) + + def send_AppendEntries(self, entries): + for channel in self.channels: + stub = raft_pb2_grpc.RaftStub(channel) + future = stub.AppendEntriesRPC.future(entries) + future.add_done_callback(self.on_EntriesResponse) + + def on_EntriesResponse(self, future): + # Pass to leader? Doesn't matter for now since we aren't using the + # log yet. + print("Received append entries response.") + + +class ZmqServer: + # Zmq subscribers can subscribe to multiple publishers. However, + # subscribers are not thread safe - Radio-dish pattern aims to solve that. + def __init__(self, config): + self._cfg = config + self.context = zmq.Context() + self.socketSub = self.context.socket(zmq.SUB) + self.started = True + + def connect_channels(self): + # Also need to subscribe to other nodes... + for n in self._cfg["messaging"]["neighbours"]: + self.socketSub.connect("tcp://%s:%d" % (n["ip"], n["port"])) + + print("Neighbours are connected.") + + def start(self): + # Start receiving on a new thread. + t = threading.Thread(target=self.start_receiving) + t.start() + + def start_receiving(self): + while self.started: + self.on_message(self.socketSub.recv()) + + def stop(self): + self.started = False + + def on_message(self, message): + m = message.deserialise() + try: + a = m.leaderId + # We have append entries + a = self.context.rcv_AppendEntries(m) + # Need to send back a message with our response. May be easier + # to do this with a request reply mechanism, rather than publish + # subscribe. + except: + pass + + try: + a = m.leaderId + # We have request vote. + self.context.rcv_AppendEntries(m) + except: + pass + + def on_RequestVote(self, message): + pass + + def on_AppendEntries(self, messages): + pass + +class ZmqMessageStrategy(MessageStrategy): + + def __init__(self, config, vote_callback, entries_callback): + self._cfg = config + self._vote_callback = vote_callback + self._entries_callback = entries_callback + + def connect_nodes(self): + print("Creating publish socket.") + self.context = zmq.Context() + self.socketPub = self.context.socket(zmq.REQ) + self.socketPub.bind("tcp://%s:%d" % (self._cfg["messaging"]["me"]["ip"], self._cfg["messaging"]["me"]["port"])) + + def send_RequestVote(self, request): + self.socketPub.send(request.serialize) + + def send_AppendEntries(self, request): + self.socketPub.send(request.serialize) + + def on_VoteReceived(self, message): + self._vote_callback(message) + + def on_EntriesResponse(self, message): + self._entries_callback(message) \ No newline at end of file diff --git a/MyRaft/node.py b/MyRaft/node.py new file mode 100644 index 0000000..2253b9a --- /dev/null +++ b/MyRaft/node.py @@ -0,0 +1,127 @@ +from threading import Timer, Thread +import random +import uuid +import json +import time + +from MyRaft.messagestrategy import MessageStrategy, GrpcMessageStrategy, NodeGrpcServer +import MyRaft.raft_pb2 as raft_pb2 + +class RaftNode: + def __init__(self, message_strategy: MessageStrategy, config): + """ + message_strategy -- Strategy used to send messagesfor the node. + """ + import MyRaft.follower as follower + # Do we need to know who the current leader is? For the purposes of + # the cameras knowing, (as the leader of raft is the leader of out + # swarm) we should know this on each node. VotedFor may work, as it is + # who we last voted for, and therefore who we think is leader. We also need + # this to redirect client requests to the leader. + + self._current_state = None + self._timer = None + self._message_strategy = None + + # Persistent State + self.currentTerm = 0 + self.votedFor = None + self.log = [] + + # Volatile state + self.commitIndex = 0 + self.lastApplied = 0 + + # We only need this for candidates/leaders... + self._id = str(uuid.uuid1()) + if message_strategy is None or not isinstance(message_strategy, MessageStrategy): + raise ValueError(MessageStrategy) + + self._message_strategy = message_strategy + + self._cfg = config + self._min_timout = self._cfg["raft"]["min_election_timeout"] + self._vary_timeout = self._cfg["raft"]["varying_election_timeout"] + self._heartbeat_timeout = self._min_timout // 2 + # Also need to check if we can load log from stable storage in case of + # restart. + + # All nodes start as a follower. State starts the timeout always. + self._current_state = follower.Follower(self) + self._state_changed = [] + + def add_state_change(self, on_change): + """Adds a callback for when the current state of the node changes. + + Args + on_change: function to call when the state changes. + """ + self._state_changed.append(on_change) + + def set_state(self, state): + """Sets the current state of the raft node. + + state -- New state of the node. + """ + # State Pattern: https://en.wikipedia.org/wiki/State_pattern + del(self._current_state) + self._current_state = state + for cb in self._state_changed: + cb() + + def timeout_elapsed(self): + """Election or heartbeat timeout has elapsed.""" + print("Node timeout elapsed") + self._current_state.heartbeat_elapsed() + + def set_timeout(self, min_timeout, vary_timeout): + """Stops the old timer and restarts it to the specified time. + + min_timeout -- The minimum time that can be used for the timer. + vary_timout -- Default 200, the additional random varying time (0 - vary_timeout) to add to timer. + """ + if self._timer is not None: + self._timer.cancel() + randy = random.randint(0,vary_timeout) + self._timer = Timer(min_timeout + randy, self.timeout_elapsed) + self._timer.start() + + def send_RequestVote(self): + self._message_strategy.send_RequestVote(raft_pb2.RequestVote(term = self.currentTerm, + candidateId = self._id)) + + def vote_requested(self, request): + return self._current_state.rcv_RequestVote(request) + + def vote_received(self, voter): + print("Node received vote") + self._current_state.rcv_vote(voter) + + def send_AppendEntries(self, entry): + pass + + def send_empty_AppendEntries(self): + self._message_strategy.send_AppendEntries(raft_pb2.AppendEntries(term = self.currentTerm, + leaderId = self._id)) + + def entries_response_received(self, entryResponse): + self._current_state.rcv_AppendEntriesResponse(entryResponse) + + def rcv_AppendEntries(self, entries, host): + # Always let leader know if fallen behind. + if entries.term < self.currentTerm: + return raft_pb2.AppendEntriesResponse(term = self.currentTerm, success = False) + return self._current_state.rcv_AppendEntries(entries) + +class RaftGrpcNode(RaftNode): + + def __init__(self, config): + cfg = None + with open(config) as f: + cfg = json.load(f) + port = cfg["messaging"]["me"]["port"] + self.servicer = NodeGrpcServer(self, port) + RaftNode.__init__(self, GrpcMessageStrategy(self.servicer, cfg), cfg) + servicer_thread = Thread(target=self.servicer.start_server) + servicer_thread.start() + print("Servicer started") diff --git a/MyRaft/protos/raft.proto b/MyRaft/protos/raft.proto new file mode 100644 index 0000000..f8321b7 --- /dev/null +++ b/MyRaft/protos/raft.proto @@ -0,0 +1,35 @@ +syntax = "proto3"; + +package raft; + +service Raft{ + rpc AppendEntriesRPC(AppendEntries) returns (AppendEntriesResponse) {} + rpc RequestVoteRPC(RequestVote) returns (RequestVoteResponse) {} +} + +message AppendEntries{ + uint32 term = 1; + string leaderId = 2; + uint32 prevLogIndex = 3; + uint32 prevLogTerm = 4; + uint32 leaderCommit = 5; + repeated string entry = 6; +} + +message AppendEntriesResponse{ + uint32 term = 1; + bool success = 2; +} + +message RequestVote{ + uint32 term = 1; + string candidateId = 2; + uint32 lastLogIndex = 3; + uint32 lastLogTerm = 4; +} + +message RequestVoteResponse{ + uint32 term = 1; + bool voteGranted = 2; + string voterId = 3; +} \ No newline at end of file diff --git a/MyRaft/raft_pb2.py b/MyRaft/raft_pb2.py new file mode 100644 index 0000000..ad2f5a0 --- /dev/null +++ b/MyRaft/raft_pb2.py @@ -0,0 +1,296 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: raft.proto + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='raft.proto', + package='raft', + syntax='proto3', + serialized_options=None, + serialized_pb=_b('\n\nraft.proto\x12\x04raft\"\x7f\n\rAppendEntries\x12\x0c\n\x04term\x18\x01 \x01(\r\x12\x10\n\x08leaderId\x18\x02 \x01(\t\x12\x14\n\x0cprevLogIndex\x18\x03 \x01(\r\x12\x13\n\x0bprevLogTerm\x18\x04 \x01(\r\x12\x14\n\x0cleaderCommit\x18\x05 \x01(\r\x12\r\n\x05\x65ntry\x18\x06 \x03(\t\"6\n\x15\x41ppendEntriesResponse\x12\x0c\n\x04term\x18\x01 \x01(\r\x12\x0f\n\x07success\x18\x02 \x01(\x08\"[\n\x0bRequestVote\x12\x0c\n\x04term\x18\x01 \x01(\r\x12\x13\n\x0b\x63\x61ndidateId\x18\x02 \x01(\t\x12\x14\n\x0clastLogIndex\x18\x03 \x01(\r\x12\x13\n\x0blastLogTerm\x18\x04 \x01(\r\"I\n\x13RequestVoteResponse\x12\x0c\n\x04term\x18\x01 \x01(\r\x12\x13\n\x0bvoteGranted\x18\x02 \x01(\x08\x12\x0f\n\x07voterId\x18\x03 \x01(\t2\x90\x01\n\x04Raft\x12\x46\n\x10\x41ppendEntriesRPC\x12\x13.raft.AppendEntries\x1a\x1b.raft.AppendEntriesResponse\"\x00\x12@\n\x0eRequestVoteRPC\x12\x11.raft.RequestVote\x1a\x19.raft.RequestVoteResponse\"\x00\x62\x06proto3') +) + + + + +_APPENDENTRIES = _descriptor.Descriptor( + name='AppendEntries', + full_name='raft.AppendEntries', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='term', full_name='raft.AppendEntries.term', index=0, + number=1, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='leaderId', full_name='raft.AppendEntries.leaderId', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='prevLogIndex', full_name='raft.AppendEntries.prevLogIndex', index=2, + number=3, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='prevLogTerm', full_name='raft.AppendEntries.prevLogTerm', index=3, + number=4, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='leaderCommit', full_name='raft.AppendEntries.leaderCommit', index=4, + number=5, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='entry', full_name='raft.AppendEntries.entry', index=5, + number=6, type=9, cpp_type=9, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=20, + serialized_end=147, +) + + +_APPENDENTRIESRESPONSE = _descriptor.Descriptor( + name='AppendEntriesResponse', + full_name='raft.AppendEntriesResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='term', full_name='raft.AppendEntriesResponse.term', index=0, + number=1, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='success', full_name='raft.AppendEntriesResponse.success', index=1, + number=2, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=149, + serialized_end=203, +) + + +_REQUESTVOTE = _descriptor.Descriptor( + name='RequestVote', + full_name='raft.RequestVote', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='term', full_name='raft.RequestVote.term', index=0, + number=1, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='candidateId', full_name='raft.RequestVote.candidateId', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='lastLogIndex', full_name='raft.RequestVote.lastLogIndex', index=2, + number=3, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='lastLogTerm', full_name='raft.RequestVote.lastLogTerm', index=3, + number=4, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=205, + serialized_end=296, +) + + +_REQUESTVOTERESPONSE = _descriptor.Descriptor( + name='RequestVoteResponse', + full_name='raft.RequestVoteResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='term', full_name='raft.RequestVoteResponse.term', index=0, + number=1, type=13, cpp_type=3, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='voteGranted', full_name='raft.RequestVoteResponse.voteGranted', index=1, + number=2, type=8, cpp_type=7, label=1, + has_default_value=False, default_value=False, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='voterId', full_name='raft.RequestVoteResponse.voterId', index=2, + number=3, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=298, + serialized_end=371, +) + +DESCRIPTOR.message_types_by_name['AppendEntries'] = _APPENDENTRIES +DESCRIPTOR.message_types_by_name['AppendEntriesResponse'] = _APPENDENTRIESRESPONSE +DESCRIPTOR.message_types_by_name['RequestVote'] = _REQUESTVOTE +DESCRIPTOR.message_types_by_name['RequestVoteResponse'] = _REQUESTVOTERESPONSE +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +AppendEntries = _reflection.GeneratedProtocolMessageType('AppendEntries', (_message.Message,), dict( + DESCRIPTOR = _APPENDENTRIES, + __module__ = 'raft_pb2' + # @@protoc_insertion_point(class_scope:raft.AppendEntries) + )) +_sym_db.RegisterMessage(AppendEntries) + +AppendEntriesResponse = _reflection.GeneratedProtocolMessageType('AppendEntriesResponse', (_message.Message,), dict( + DESCRIPTOR = _APPENDENTRIESRESPONSE, + __module__ = 'raft_pb2' + # @@protoc_insertion_point(class_scope:raft.AppendEntriesResponse) + )) +_sym_db.RegisterMessage(AppendEntriesResponse) + +RequestVote = _reflection.GeneratedProtocolMessageType('RequestVote', (_message.Message,), dict( + DESCRIPTOR = _REQUESTVOTE, + __module__ = 'raft_pb2' + # @@protoc_insertion_point(class_scope:raft.RequestVote) + )) +_sym_db.RegisterMessage(RequestVote) + +RequestVoteResponse = _reflection.GeneratedProtocolMessageType('RequestVoteResponse', (_message.Message,), dict( + DESCRIPTOR = _REQUESTVOTERESPONSE, + __module__ = 'raft_pb2' + # @@protoc_insertion_point(class_scope:raft.RequestVoteResponse) + )) +_sym_db.RegisterMessage(RequestVoteResponse) + + + +_RAFT = _descriptor.ServiceDescriptor( + name='Raft', + full_name='raft.Raft', + file=DESCRIPTOR, + index=0, + serialized_options=None, + serialized_start=374, + serialized_end=518, + methods=[ + _descriptor.MethodDescriptor( + name='AppendEntriesRPC', + full_name='raft.Raft.AppendEntriesRPC', + index=0, + containing_service=None, + input_type=_APPENDENTRIES, + output_type=_APPENDENTRIESRESPONSE, + serialized_options=None, + ), + _descriptor.MethodDescriptor( + name='RequestVoteRPC', + full_name='raft.Raft.RequestVoteRPC', + index=1, + containing_service=None, + input_type=_REQUESTVOTE, + output_type=_REQUESTVOTERESPONSE, + serialized_options=None, + ), +]) +_sym_db.RegisterServiceDescriptor(_RAFT) + +DESCRIPTOR.services_by_name['Raft'] = _RAFT + +# @@protoc_insertion_point(module_scope) diff --git a/MyRaft/raft_pb2_grpc.py b/MyRaft/raft_pb2_grpc.py new file mode 100644 index 0000000..6dd6884 --- /dev/null +++ b/MyRaft/raft_pb2_grpc.py @@ -0,0 +1,63 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +import grpc + +import MyRaft.raft_pb2 as raft__pb2 + + +class RaftStub(object): + # missing associated documentation comment in .proto file + pass + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.AppendEntriesRPC = channel.unary_unary( + '/raft.Raft/AppendEntriesRPC', + request_serializer=raft__pb2.AppendEntries.SerializeToString, + response_deserializer=raft__pb2.AppendEntriesResponse.FromString, + ) + self.RequestVoteRPC = channel.unary_unary( + '/raft.Raft/RequestVoteRPC', + request_serializer=raft__pb2.RequestVote.SerializeToString, + response_deserializer=raft__pb2.RequestVoteResponse.FromString, + ) + + +class RaftServicer(object): + # missing associated documentation comment in .proto file + pass + + def AppendEntriesRPC(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def RequestVoteRPC(self, request, context): + # missing associated documentation comment in .proto file + pass + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_RaftServicer_to_server(servicer, server): + rpc_method_handlers = { + 'AppendEntriesRPC': grpc.unary_unary_rpc_method_handler( + servicer.AppendEntriesRPC, + request_deserializer=raft__pb2.AppendEntries.FromString, + response_serializer=raft__pb2.AppendEntriesResponse.SerializeToString, + ), + 'RequestVoteRPC': grpc.unary_unary_rpc_method_handler( + servicer.RequestVoteRPC, + request_deserializer=raft__pb2.RequestVote.FromString, + response_serializer=raft__pb2.RequestVoteResponse.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'raft.Raft', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) diff --git a/MyRaft/state.py b/MyRaft/state.py new file mode 100644 index 0000000..25287f9 --- /dev/null +++ b/MyRaft/state.py @@ -0,0 +1,33 @@ +"""This module contains the base state for all other raft states""" + +import MyRaft.node as node +import MyRaft.raft_pb2 as raft_pb2 + +class State: + """Base class to represent state of the system at any point in time. + + Default behaviour for all messaging methods is to check if term of + message is greater than node's term, and if so convert the current + node to a follower. + """ + + def __init__(self, context: node.RaftNode): + self._context = context + self._currentTerm = 0 + + def heartbeat_elapsed(self): + raise NotImplementedError + + def rcv_RequestVote(self, request): + raise NotImplementedError + + def rcv_AppendEntries(self, request): + raise NotImplementedError + + def rcv_vote(self, request): + raise NotImplementedError + + def rcv_AppendEntriesResponse(self, request): + pass + + \ No newline at end of file diff --git a/MyRaft/test.py b/MyRaft/test.py new file mode 100644 index 0000000..8ee46e8 --- /dev/null +++ b/MyRaft/test.py @@ -0,0 +1,22 @@ +import argparse +import os.path +import sys +from MyRaft.node import RaftGrpcNode + +# parser = argparse.ArgumentParser(description="Runs a raft node for leader election") +# parser.add_argument('-C', '--config', help='Path to config file.') + +# args = parser.parse_args() + +# if args.config: +# print("Getting config") +# if not os.path.isfile(args.config): +# print("Could not find configuration file, aborting") +# sys.exit(1) +# else: +# sys.exit(1) + +# print("Loading gRPC raft node") + +node = RaftGrpcNode('config.json') + diff --git a/MyRaft/voter.py b/MyRaft/voter.py new file mode 100644 index 0000000..70d64b2 --- /dev/null +++ b/MyRaft/voter.py @@ -0,0 +1,8 @@ +from MyRaft.state import State +from MyRaft.node import RaftNode + +class Voter(State): + def __init__(self, context: RaftNode): + State.__init__(self, context) + + \ No newline at end of file