128 lines
4.7 KiB
Python
128 lines
4.7 KiB
Python
from threading import Timer, Thread
|
|
import random
|
|
import uuid
|
|
import json
|
|
import time
|
|
|
|
from MyRaft.messagestrategy import MessageStrategy, GrpcMessageStrategy, NodeGrpcServer
|
|
import MyRaft.raft_pb2 as raft_pb2
|
|
|
|
class RaftNode:
    """A Raft participant whose behaviour is delegated to a state object.

    The node owns the Raft bookkeeping (term, vote, log, commit indices), a
    single restartable timer, and the message strategy used to talk to peers.
    Incoming events are forwarded to ``self._current_state`` (State Pattern),
    which is swapped with :meth:`set_state`.
    """

    def __init__(self, message_strategy: "MessageStrategy", config):
        """Create a node; every node starts out as a follower.

        Args:
            message_strategy: Strategy used to send messages for the node.
            config: Mapping providing ``config["raft"]["min_election_timeout"]``
                and ``config["raft"]["varying_election_timeout"]``.

        Raises:
            ValueError: If ``message_strategy`` is None or is not a
                ``MessageStrategy`` instance.
        """
        # Validate before touching any state so a bad argument fails fast
        # with a message that names the offending parameter.
        if message_strategy is None or not isinstance(message_strategy, MessageStrategy):
            raise ValueError(
                "message_strategy must be a MessageStrategy instance, got {}".format(
                    type(message_strategy).__name__
                )
            )

        # Imported inside the constructor rather than at module level --
        # presumably to avoid a circular import with the state modules
        # (TODO confirm).
        import MyRaft.follower as follower

        # Do we need to know who the current leader is? For the purposes of
        # the cameras knowing (as the leader of raft is the leader of our
        # swarm), we should know this on each node. votedFor may work, as it
        # is who we last voted for, and therefore who we think is leader. We
        # also need this to redirect client requests to the leader.

        self._current_state = None
        self._timer = None
        self._message_strategy = message_strategy

        # Callbacks fired by set_state(). This must exist before the initial
        # state object is constructed below, otherwise anything that reaches
        # set_state()/add_state_change() early raises AttributeError.
        self._state_changed = []

        # Persistent state
        self.currentTerm = 0
        self.votedFor = None
        self.log = []

        # Volatile state
        self.commitIndex = 0
        self.lastApplied = 0

        # Identifier sent as candidateId / leaderId in outgoing messages
        # (we only need this for candidates/leaders...).
        self._id = str(uuid.uuid1())

        self._cfg = config
        # NOTE: "_min_timout" (sic) is kept as-is because code outside this
        # class may read the attribute by that name.
        self._min_timout = self._cfg["raft"]["min_election_timeout"]
        self._vary_timeout = self._cfg["raft"]["varying_election_timeout"]
        self._heartbeat_timeout = self._min_timout // 2
        # Also need to check if we can load log from stable storage in case
        # of restart.

        # All nodes start as a follower. State starts the timeout always.
        self._current_state = follower.Follower(self)

    def add_state_change(self, on_change):
        """Add a callback for when the current state of the node changes.

        Args:
            on_change: Zero-argument callable invoked after each set_state().
        """
        self._state_changed.append(on_change)

    def set_state(self, state):
        """Set the current state of the raft node and notify listeners.

        Args:
            state: New state object of the node.
        """
        # State Pattern: https://en.wikipedia.org/wiki/State_pattern
        # Rebind in a single assignment (no prior `del`): the timer thread may
        # call timeout_elapsed() at any moment and must always find the
        # attribute present.
        self._current_state = state
        for callback in self._state_changed:
            callback()

    def timeout_elapsed(self):
        """Handle expiry of the election or heartbeat timer.

        Invoked by the Timer created in set_timeout(); forwards to the current
        state's ``heartbeat_elapsed``.
        """
        print("Node timeout elapsed")
        self._current_state.heartbeat_elapsed()

    def set_timeout(self, min_timeout, vary_timeout):
        """Stop the old timer (if any) and restart it with a randomised delay.

        Args:
            min_timeout: The minimum time that can be used for the timer.
            vary_timeout: Integer upper bound (inclusive) of the random extra
                time, drawn from ``0..vary_timeout``, added to ``min_timeout``.
                Both values are passed to ``threading.Timer`` and are
                therefore interpreted as seconds.
        """
        if self._timer is not None:
            self._timer.cancel()
        jitter = random.randint(0, vary_timeout)
        self._timer = Timer(min_timeout + jitter, self.timeout_elapsed)
        self._timer.start()

    def send_RequestVote(self):
        """Send a RequestVote carrying this node's current term and id."""
        self._message_strategy.send_RequestVote(
            raft_pb2.RequestVote(term=self.currentTerm, candidateId=self._id)
        )

    def vote_requested(self, request):
        """Forward an incoming vote request to the current state.

        Returns:
            Whatever the current state's ``rcv_RequestVote`` returns.
        """
        return self._current_state.rcv_RequestVote(request)

    def vote_received(self, voter):
        """Forward a received vote to the current state."""
        print("Node received vote")
        self._current_state.rcv_vote(voter)

    def send_AppendEntries(self, entry):
        """Send a log entry to peers. Not implemented yet (no-op)."""
        pass

    def send_empty_AppendEntries(self):
        """Send an AppendEntries with no entries, carrying term and leader id."""
        self._message_strategy.send_AppendEntries(
            raft_pb2.AppendEntries(term=self.currentTerm, leaderId=self._id)
        )

    def entries_response_received(self, entryResponse):
        """Forward an AppendEntries response to the current state."""
        self._current_state.rcv_AppendEntriesResponse(entryResponse)

    def rcv_AppendEntries(self, entries, host):
        """Handle an incoming AppendEntries message.

        Args:
            entries: The received message; its ``term`` is compared against
                this node's ``currentTerm``.
            host: Accepted for interface compatibility; not used here.

        Returns:
            An unsuccessful ``AppendEntriesResponse`` with this node's term
            when the sender's term is stale, otherwise whatever the current
            state's ``rcv_AppendEntries`` returns.
        """
        # Always let leader know if fallen behind.
        if entries.term < self.currentTerm:
            return raft_pb2.AppendEntriesResponse(term=self.currentTerm, success=False)
        return self._current_state.rcv_AppendEntries(entries)
|
|
|
|
class RaftGrpcNode(RaftNode):
    """RaftNode wired to gRPC messaging and configured from a JSON file."""

    def __init__(self, config):
        """Load the configuration, build the gRPC plumbing and start serving.

        Args:
            config: Path to a JSON configuration file. It must contain
                ``["messaging"]["me"]["port"]``; the parsed document is also
                handed to ``GrpcMessageStrategy`` and ``RaftNode.__init__``.
        """
        # JSON is read as UTF-8 explicitly instead of relying on the platform
        # default encoding, which differs between operating systems.
        with open(config, encoding="utf-8") as config_file:
            cfg = json.load(config_file)
        port = cfg["messaging"]["me"]["port"]

        # The servicer is created before the base initialiser runs because the
        # message strategy is built from it; it therefore receives a reference
        # to this not-yet-initialised node.
        self.servicer = NodeGrpcServer(self, port)
        super().__init__(GrpcMessageStrategy(self.servicer, cfg), cfg)

        # Run the server on its own thread so that construction returns.
        servicer_thread = Thread(target=self.servicer.start_server)
        servicer_thread.start()
        print("Servicer started")
|