# picar/MyRaft/node.py
# 2020-02-21 21:08:50 +10:30
# 128 lines, 4.7 KiB, Python

from threading import Timer, Thread
import random
import uuid
import json
import time
from MyRaft.messagestrategy import MessageStrategy, GrpcMessageStrategy, NodeGrpcServer
import MyRaft.raft_pb2 as raft_pb2
class RaftNode:
    """A Raft node that delegates its behaviour to a current-state object.

    The node keeps the Raft bookkeeping fields (term, vote, log, commit
    indices), owns the election/heartbeat timer, and forwards messages
    between the message strategy and whichever state object is current
    (State pattern: https://en.wikipedia.org/wiki/State_pattern).
    """

    # The annotation is quoted so it is not evaluated at definition time.
    def __init__(self, message_strategy: "MessageStrategy", config):
        """Create the node and put it into the follower state.

        Args:
            message_strategy: Strategy used to send messages for the node.
            config: Mapping with a "raft" section that provides
                "min_election_timeout" and "varying_election_timeout".

        Raises:
            ValueError: If message_strategy is not a MessageStrategy instance.
        """
        # Validate first so a bad argument fails before any setup happens.
        if message_strategy is None or not isinstance(message_strategy, MessageStrategy):
            raise ValueError("message_strategy must be a MessageStrategy instance")

        # Imported locally rather than at module level -- presumably to avoid
        # a circular import with the state modules; confirm before moving it.
        import MyRaft.follower as follower

        # Do we need to know who the current leader is? For the purposes of
        # the cameras knowing (as the leader of raft is the leader of our
        # swarm) we should know this on each node. votedFor may work, as it is
        # who we last voted for, and therefore who we think is leader. We also
        # need this to redirect client requests to the leader.

        self._current_state = None
        self._timer = None

        # Persistent state
        self.currentTerm = 0
        self.votedFor = None
        self.log = []

        # Volatile state
        self.commitIndex = 0
        self.lastApplied = 0

        # NOTE(review): the original comment here read "We only need this for
        # candidates/leaders..."; it is unclear whether it meant this id or
        # leader-only state that is not implemented yet.
        self._id = str(uuid.uuid1())

        self._message_strategy = message_strategy

        self._cfg = config
        # NOTE: "_min_timout" is misspelled but kept as-is, because code
        # outside this class may read the attribute by this name.
        self._min_timout = self._cfg["raft"]["min_election_timeout"]
        self._vary_timeout = self._cfg["raft"]["varying_election_timeout"]
        self._heartbeat_timeout = self._min_timout // 2

        # Also need to check if we can load log from stable storage in case of
        # restart.

        # Created before the first state so the callback list already exists
        # if set_state() runs early (the state is said to start the timeout).
        self._state_changed = []

        # All nodes start as a follower. State starts the timeout always.
        self._current_state = follower.Follower(self)

    def add_state_change(self, on_change):
        """Add a callback for when the current state of the node changes.

        Args:
            on_change: Function called with no arguments after each change.
        """
        self._state_changed.append(on_change)

    def set_state(self, state):
        """Set the current state of the raft node and notify the callbacks.

        Args:
            state: New state of the node.
        """
        # Rebind in one assignment. Deleting the attribute first would leave
        # a window in which another thread (e.g. the timer thread running
        # timeout_elapsed) finds no `_current_state` attribute at all.
        self._current_state = state
        for cb in self._state_changed:
            cb()

    def timeout_elapsed(self):
        """Election or heartbeat timeout has elapsed; tell the current state."""
        print("Node timeout elapsed")
        self._current_state.heartbeat_elapsed()

    def set_timeout(self, min_timeout, vary_timeout):
        """Stop the old timer and restart it with a randomised interval.

        The interval is min_timeout plus a random integer in
        [0, vary_timeout]; threading.Timer interprets it as seconds.

        Args:
            min_timeout: The minimum time that can be used for the timer.
            vary_timeout: Inclusive upper bound of the random extra time
                added to the timer. It must be an integer, since it is
                passed to random.randint, and it has no default value.
        """
        if self._timer is not None:
            self._timer.cancel()
        randy = random.randint(0, vary_timeout)
        self._timer = Timer(min_timeout + randy, self.timeout_elapsed)
        self._timer.start()

    def send_RequestVote(self):
        """Send a RequestVote carrying this node's current term and id."""
        self._message_strategy.send_RequestVote(
            raft_pb2.RequestVote(term=self.currentTerm, candidateId=self._id)
        )

    def vote_requested(self, request):
        """Pass a received RequestVote to the current state; return its reply."""
        return self._current_state.rcv_RequestVote(request)

    def vote_received(self, voter):
        """Pass a received vote to the current state."""
        print("Node received vote")
        self._current_state.rcv_vote(voter)

    def send_AppendEntries(self, entry):
        """Send an AppendEntries for `entry`. Not implemented yet (no-op)."""
        pass

    def send_empty_AppendEntries(self):
        """Send an AppendEntries with no entries, only the term and leader id."""
        self._message_strategy.send_AppendEntries(
            raft_pb2.AppendEntries(term=self.currentTerm, leaderId=self._id)
        )

    def entries_response_received(self, entryResponse):
        """Pass a received AppendEntries response to the current state."""
        self._current_state.rcv_AppendEntriesResponse(entryResponse)

    def rcv_AppendEntries(self, entries, host):
        """Handle a received AppendEntries message.

        Args:
            entries: The received message; its `term` is compared with ours.
            host: Currently unused.

        Returns:
            A failed AppendEntriesResponse carrying our term when the sender's
            term is older than ours, otherwise whatever the current state
            returns for the message.
        """
        # Always let leader know if fallen behind.
        if entries.term < self.currentTerm:
            return raft_pb2.AppendEntriesResponse(term=self.currentTerm, success=False)
        return self._current_state.rcv_AppendEntries(entries)
class RaftGrpcNode(RaftNode):
    """Raft node that communicates over gRPC, configured from a JSON file."""

    def __init__(self, config):
        """Load the configuration, build the gRPC pieces and start the server.

        Args:
            config: Path of a JSON configuration file. Its
                messaging -> me -> port entry is passed to the gRPC server,
                and the parsed document is handed to the message strategy
                and to RaftNode.
        """
        with open(config) as config_file:
            settings = json.load(config_file)

        # The servicer has to exist before the base initialiser runs, because
        # the gRPC message strategy handed to RaftNode is built from it.
        self.servicer = NodeGrpcServer(self, settings["messaging"]["me"]["port"])
        RaftNode.__init__(self, GrpcMessageStrategy(self.servicer, settings), settings)

        # Run the server on its own thread (presumably start_server blocks).
        Thread(target=self.servicer.start_server).start()
        print("Servicer started")