from threading import Timer, Thread
import random
import uuid
import json
import time

from MyRaft.messagestrategy import MessageStrategy, GrpcMessageStrategy, NodeGrpcServer
import MyRaft.raft_pb2 as raft_pb2


class RaftNode:
    """A single Raft node that delegates behaviour to its current state object.

    The node owns the persistent/volatile Raft bookkeeping, the election /
    heartbeat timer and the message strategy used to talk to peers. All
    role-specific behaviour is forwarded to ``self._current_state``
    (State Pattern).
    """

    def __init__(self, message_strategy: MessageStrategy, config):
        """Create the node and put it into the follower state.

        message_strategy -- Strategy used to send messages for the node.
        config -- Mapping that provides ``config["raft"]["min_election_timeout"]``
                  and ``config["raft"]["varying_election_timeout"]``.

        Raises ValueError if message_strategy is not a MessageStrategy.
        """
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import with
        # MyRaft.follower -- confirm before moving it.
        import MyRaft.follower as follower

        # Do we need to know who the current leader is? For the purposes of
        # the cameras knowing (as the leader of raft is the leader of our
        # swarm) we should know this on each node. votedFor may work, as it is
        # who we last voted for, and therefore who we think is leader. We also
        # need this to redirect client requests to the leader.
        self._current_state = None
        self._timer = None
        self._message_strategy = None

        # Persistent state
        self.currentTerm = 0
        self.votedFor = None
        self.log = []

        # Volatile state
        self.commitIndex = 0
        self.lastApplied = 0

        # We only need this for candidates/leaders...
        self._id = str(uuid.uuid1())

        # isinstance() is already False for None, so one check covers both.
        if not isinstance(message_strategy, MessageStrategy):
            raise ValueError(
                "message_strategy must be a MessageStrategy instance, got {}".format(
                    type(message_strategy).__name__
                )
            )
        self._message_strategy = message_strategy

        self._cfg = config
        self._min_timout = self._cfg["raft"]["min_election_timeout"]
        self._vary_timeout = self._cfg["raft"]["varying_election_timeout"]
        # Heartbeats use half of the minimum election timeout.
        self._heartbeat_timeout = self._min_timout // 2

        # Also need to check if we can load log from stable storage in case of
        # restart.

        # The callback list must exist before the first state is created: the
        # state starts the timeout, and a timeout can lead to set_state(),
        # which iterates over this list.
        self._state_changed = []

        # All nodes start as a follower. State starts the timeout always.
        self._current_state = follower.Follower(self)

    def add_state_change(self, on_change):
        """Adds a callback for when the current state of the node changes.

        Args
            on_change: function to call (with no arguments) when the state
                changes.
        """
        self._state_changed.append(on_change)

    def set_state(self, state):
        """Sets the current state of the raft node and notifies listeners.

        state -- New state of the node.
        """
        # State Pattern: https://en.wikipedia.org/wiki/State_pattern
        # Rebind directly instead of deleting first, so the attribute is never
        # missing for code running on other threads (timer, servicer).
        self._current_state = state
        for cb in self._state_changed:
            cb()

    def timeout_elapsed(self):
        """Election or heartbeat timeout has elapsed."""
        print("Node timeout elapsed")
        self._current_state.heartbeat_elapsed()

    def set_timeout(self, min_timeout, vary_timeout):
        """Stops the old timer and restarts it to the specified time.

        min_timeout -- The minimum time that can be used for the timer.
        vary_timeout -- Upper bound (inclusive) of the random additional time
                        (0 - vary_timeout) added to the timer. Must be an
                        integer, as it is passed to random.randint.

        NOTE(review): threading.Timer interprets its interval in seconds;
        confirm the configured timeouts use that unit.
        """
        if self._timer is not None:
            self._timer.cancel()

        randy = random.randint(0, vary_timeout)
        self._timer = Timer(min_timeout + randy, self.timeout_elapsed)
        self._timer.start()

    def send_RequestVote(self):
        """Sends a RequestVote for the current term via the message strategy."""
        self._message_strategy.send_RequestVote(
            raft_pb2.RequestVote(term=self.currentTerm, candidateId=self._id)
        )

    def vote_requested(self, request):
        """Forwards a received RequestVote to the current state; returns its result."""
        return self._current_state.rcv_RequestVote(request)

    def vote_received(self, voter):
        """Forwards a received vote to the current state."""
        print("Node received vote")
        self._current_state.rcv_vote(voter)

    def send_AppendEntries(self, entry):
        """Not implemented yet; currently does nothing."""
        pass

    def send_empty_AppendEntries(self):
        """Sends an AppendEntries carrying only the term and this node's id."""
        self._message_strategy.send_AppendEntries(
            raft_pb2.AppendEntries(term=self.currentTerm, leaderId=self._id)
        )

    def entries_response_received(self, entryResponse):
        """Forwards an AppendEntries response to the current state."""
        self._current_state.rcv_AppendEntriesResponse(entryResponse)

    def rcv_AppendEntries(self, entries, host):
        """Handles a received AppendEntries message.

        entries -- The received message; its ``term`` is compared with ours.
        host -- Unused here; kept for the caller's interface.

        Returns an unsuccessful AppendEntriesResponse carrying our term when
        the sender's term is older, otherwise whatever the current state
        returns.
        """
        # Always let leader know if fallen behind.
        if entries.term < self.currentTerm:
            return raft_pb2.AppendEntriesResponse(term=self.currentTerm, success=False)
        return self._current_state.rcv_AppendEntries(entries)


class RaftGrpcNode(RaftNode):
    """RaftNode that communicates over gRPC and runs its own servicer thread."""

    def __init__(self, config):
        """Loads the JSON config, builds the gRPC servicer and starts it.

        config -- Path to a JSON configuration file. It must provide
                  ``["messaging"]["me"]["port"]`` in addition to the keys
                  RaftNode reads.
        """
        with open(config) as f:
            cfg = json.load(f)

        port = cfg["messaging"]["me"]["port"]
        # The servicer is created first because the message strategy needs it.
        self.servicer = NodeGrpcServer(self, port)
        super().__init__(GrpcMessageStrategy(self.servicer, cfg), cfg)

        servicer_thread = Thread(target=self.servicer.start_server)
        servicer_thread.start()
        print("Servicer started")