@ThreadSafe public final class RaftJournalSystem extends AbstractJournalSystem
Unlike the Copycat framework, Alluxio updates state machine state *before* writing to the journal. This lets us avoid journaling operations which do not result in state modification. To make this work in the Copycat framework, we allow RPCs to modify state directly, then write an entry to Copycat afterwards. Once the entry is journaled, Copycat will attempt to apply the journal entry to each master. The entry has already been applied on the primary, so we treat all journal entries applied to the primary as no-ops to avoid double-application.
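A minimal sketch of the pre-apply protocol. It assumes a toy counter as the state machine and an in-memory list standing in for the Copycat log. The names `PreApplyStateMachine`, `addViaRpc` and `applyCommitted` are hypothetical and are not Alluxio or Copycat APIs. The RPC path modifies state first and journals only when something changed. The apply path treats every committed entry as a no-op while the node is primary.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy state machine illustrating "apply first, journal second". */
final class PreApplyStateMachine {
  private final List<Long> log = new ArrayList<>(); // stand-in for the replicated log
  private final boolean primary;
  private long counter; // the in-memory state being journaled

  PreApplyStateMachine(boolean primary) {
    this.primary = primary;
  }

  /** RPC path on the primary: modify state first, then journal only if state changed. */
  void addViaRpc(long delta) {
    if (delta == 0) {
      return; // no state modification, so nothing is journaled
    }
    counter += delta; // 1. apply to in-memory state
    log.add(delta);   // 2. write the entry to the journal afterwards
  }

  /** Called by the consensus layer for each committed entry. */
  void applyCommitted(long delta) {
    if (primary) {
      return; // already applied before journaling: treat as a no-op
    }
    counter += delta;
  }

  List<Long> log() { return log; }

  long counter() { return counter; }

  public static void main(String[] args) {
    PreApplyStateMachine primary = new PreApplyStateMachine(true);
    PreApplyStateMachine secondary = new PreApplyStateMachine(false);
    primary.addViaRpc(5);
    primary.addViaRpc(0); // not journaled
    for (long entry : primary.log()) {
      primary.applyCommitted(entry);   // ignored on the primary
      secondary.applyCommitted(entry); // applied on the secondary
    }
    System.out.println(primary.counter() + " " + secondary.counter()); // 5 5
  }
}
```

Skipping the no-op at the RPC layer is what keeps non-modifying operations out of the log entirely.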
There are two cases to worry about: (1) incorrectly ignoring an entry and (2) incorrectly applying an entry.
(1) An entry could be incorrectly ignored if a server thinks it is the primary and ignores a journal entry served from the real primary. To prevent this, a new primary waits for a quiet period before serving requests. During this time, the previous primary goes through at least two election cycles without successfully sending heartbeats to a majority of the cluster. If the old primary had successfully sent a heartbeat to a node that elected the new primary, the old primary would have realized it is no longer primary and stepped down. Therefore, by the time the new primary begins sending entries, the old primary has stepped down and no longer ignores entries.
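The quiet-period guard can be sketched as follows. The sketch assumes the quiet period is two election timeouts. The text only says the old primary goes through at least two election cycles, and the exact length Alluxio uses is not stated here. `QuietPeriodGate` and its methods are hypothetical. Timestamps are passed in explicitly so the logic is deterministic to test.

```java
import java.time.Duration;

/** Hypothetical gate that keeps a new primary from serving until a quiet period has passed. */
final class QuietPeriodGate {
  private final long quietPeriodNanos;
  private boolean primary;
  private long primarySinceNanos;

  QuietPeriodGate(Duration electionTimeout) {
    // Assumption: two election timeouts, matching "at least two election cycles".
    this.quietPeriodNanos = electionTimeout.multipliedBy(2).toNanos();
  }

  /** Records the moment this node was elected. */
  synchronized void gainPrimacy(long nowNanos) {
    primary = true;
    primarySinceNanos = nowNanos;
  }

  synchronized void losePrimacy() {
    primary = false;
  }

  /** True only once this node has been primary for the whole quiet period. */
  synchronized boolean canServe(long nowNanos) {
    return primary && nowNanos - primarySinceNanos >= quietPeriodNanos;
  }

  public static void main(String[] args) {
    QuietPeriodGate gate = new QuietPeriodGate(Duration.ofSeconds(5));
    long t0 = 1_000L;
    gate.gainPrimacy(t0);
    System.out.println(gate.canServe(t0 + Duration.ofSeconds(9).toNanos()));  // false
    System.out.println(gate.canServe(t0 + Duration.ofSeconds(10).toNanos())); // true
  }
}
```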
(2) Entries can never be double-applied to a primary's state. As long as a node is the primary, it ignores all entries. Once it becomes a secondary, it completely resets its state and rejoins the cluster.
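A small sketch of why demotion rules out double application. It assumes that resetting state and rejoining the cluster amounts to replaying the replicated log from the start. `ResettingReplica` and its methods are hypothetical names used for illustration.

```java
import java.util.List;

/** Hypothetical replica showing that a demoted primary cannot double-apply entries. */
final class ResettingReplica {
  private long counter;
  private boolean primary = true;

  /** Primary path: state is modified before the entry is journaled. */
  void preApply(long delta) { counter += delta; }

  /** Consensus path: ignored while primary, applied once secondary. */
  void applyCommitted(long delta) {
    if (!primary) {
      counter += delta;
    }
  }

  /** On demotion: discard all state, then rebuild it from the replicated log. */
  void becomeSecondary(List<Long> replicatedLog) {
    primary = false;
    counter = 0; // full reset, so nothing pre-applied survives
    for (long delta : replicatedLog) {
      applyCommitted(delta);
    }
  }

  long counter() { return counter; }

  public static void main(String[] args) {
    ResettingReplica r = new ResettingReplica();
    List<Long> log = List.of(2L, 3L);
    for (long d : log) {
      r.preApply(d);
      r.applyCommitted(d); // no-op while primary
    }
    r.becomeSecondary(log);
    System.out.println(r.counter()); // 5
  }
}
```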
The way we apply journal entries to the primary makes it tricky to snapshot the primary's state. Normally, Copycat decides when to take a snapshot. With the pre-apply protocol, however, we may be in the middle of modifying state when the snapshot happens. To manage this, we inject an AtomicBoolean into Copycat that determines whether Copycat is allowed to take snapshots. Snapshots are normally prohibited on the primary. However, we don't want the primary's log to grow unbounded, so we allow a snapshot to be taken once a day at a user-configured time. To support this, all state changes must first acquire a read lock, and snapshotting requires the corresponding write lock. Once we hold the write lock for all state machines, we enable snapshots in Copycat through the AtomicBoolean, then wait for any snapshot to complete.
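A minimal sketch of the snapshot gating, using `ReentrantReadWriteLock` and `AtomicBoolean` from `java.util.concurrent`. It assumes a single lock guarding one toy state machine, whereas the text describes taking the write lock for all state machines. A `LongSupplier` callback stands in for Copycat taking the snapshot. `SnapshotGate` and its methods are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.LongSupplier;

/** Hypothetical sketch of read-locked state changes and write-lock-gated snapshots. */
final class SnapshotGate {
  private final ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock();
  // Flag the consensus layer would consult before snapshotting; false by default on the primary.
  private final AtomicBoolean snapshotAllowed = new AtomicBoolean(false);
  private final AtomicLong state = new AtomicLong(); // toy state machine state

  /** Every state change holds the read lock, so changes can run concurrently with each other. */
  void modifyState(long delta) {
    stateLock.readLock().lock();
    try {
      state.addAndGet(delta);
    } finally {
      stateLock.readLock().unlock();
    }
  }

  boolean isSnapshotAllowed() {
    return snapshotAllowed.get();
  }

  long state() {
    return state.get();
  }

  /**
   * Scheduled snapshot: acquiring the write lock waits for in-flight changes and blocks new
   * ones; the flag stays enabled only until the snapshot callback returns.
   */
  long snapshotUnderWriteLock(LongSupplier takeSnapshot) {
    stateLock.writeLock().lock();
    try {
      snapshotAllowed.set(true);
      return takeSnapshot.getAsLong(); // stands in for waiting on the snapshot to complete
    } finally {
      snapshotAllowed.set(false);
      stateLock.writeLock().unlock();
    }
  }

  public static void main(String[] args) {
    SnapshotGate gate = new SnapshotGate();
    gate.modifyState(7);
    System.out.println(gate.isSnapshotAllowed()); // false
    long snapshot = gate.snapshotUnderWriteLock(() -> gate.isSnapshotAllowed() ? gate.state() : -1L);
    System.out.println(snapshot + " " + gate.isSnapshotAllowed()); // 7 false
  }
}
```

State changes take only the read lock, so they do not block each other during normal operation. Only the scheduled snapshot pays the cost of an exclusive lock.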
Nested classes/interfaces inherited from interface JournalSystem: JournalSystem.Builder, JournalSystem.Mode
Modifier and Type | Method and Description |
---|---|
CatchupFuture | catchup(Map<String,Long> journalSequenceNumbers) - Initiates catching up the journals to the given sequence numbers. |
void | checkpoint() - Creates a checkpoint in the primary master journal system. |
static RaftJournalSystem | create(RaftJournalConfiguration conf) - Creates and initializes a Raft journal system. |
Journal | createJournal(Master master) - Creates a journal for the given state machine. |
static io.atomix.catalyst.serializer.Serializer | createSerializer() |
void | format() - Formats the journal system. |
void | gainPrimacy() - Transitions the journal to primary mode. |
Map<String,Long> | getCurrentSequenceNumbers() - Returns the current sequence numbers per journal; used to get the current state from a leader journal system. |
PrimarySelector | getPrimarySelector() |
List<QuorumServerInfo> | getQuorumServerInfoList() - Returns information about the internal Raft quorum. |
boolean | isEmpty() - Returns whether the journal is formatted and has not had any entries written to it yet. |
boolean | isFormatted() |
boolean | isLeader() |
void | losePrimacy() - Transitions the journal to secondary mode. |
void | removeQuorumServer(NetAddress serverNetAddress) - Removes the server with the given address from the Raft quorum. |
void | resume() - Resumes applying entries for all journals. |
void | startInternal() - Starts the journal system. |
void | stopInternal() - Stops the journal system. |
void | suspend() - Suspends applying entries for all journals. |
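The table lists suspend(), catchup(Map<String,Long>), resume() and getCurrentSequenceNumbers() but does not describe how they interact. The following is a hypothetical, synchronous, in-memory model of one plausible reading. Entries delivered while suspended are buffered. catchup applies buffered entries up to a per-journal target. resume drains the rest. `SuspendableApplier` is not the Alluxio implementation, and the real catchup returns a CatchupFuture rather than completing synchronously.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of suspend/catchup/resume over a set of named journals. */
final class SuspendableApplier {
  /** Per-journal state: last applied sequence number plus entries buffered while suspended. */
  private static final class JournalState {
    long lastApplied = -1;
    final Deque<Long> buffered = new ArrayDeque<>();
  }

  private final Map<String, JournalState> journals = new HashMap<>();
  private boolean suspended;

  /** Delivers the entry with the given sequence number to one journal. */
  void deliver(String journal, long sequence) {
    JournalState js = journals.computeIfAbsent(journal, k -> new JournalState());
    if (suspended) {
      js.buffered.addLast(sequence); // hold the entry instead of applying it
    } else {
      js.lastApplied = sequence;
    }
  }

  void suspend() { suspended = true; }

  /** Applies buffered entries up to each journal's target; stays suspended afterwards. */
  void catchup(Map<String, Long> targets) {
    targets.forEach((journal, target) -> {
      JournalState js = journals.computeIfAbsent(journal, k -> new JournalState());
      while (!js.buffered.isEmpty() && js.buffered.peekFirst() <= target) {
        js.lastApplied = js.buffered.pollFirst();
      }
    });
  }

  /** Drains every buffer, then goes back to applying entries as they arrive. */
  void resume() {
    for (JournalState js : journals.values()) {
      while (!js.buffered.isEmpty()) {
        js.lastApplied = js.buffered.pollFirst();
      }
    }
    suspended = false;
  }

  /** Analogue of getCurrentSequenceNumbers(): last applied sequence number per journal. */
  Map<String, Long> currentSequenceNumbers() {
    Map<String, Long> result = new HashMap<>();
    journals.forEach((name, js) -> result.put(name, js.lastApplied));
    return result;
  }

  public static void main(String[] args) {
    SuspendableApplier standby = new SuspendableApplier();
    standby.deliver("journal-a", 0);
    standby.suspend();
    standby.deliver("journal-a", 1);
    standby.deliver("journal-a", 2);
    standby.catchup(Map.of("journal-a", 1L));
    System.out.println(standby.currentSequenceNumbers()); // {journal-a=1}
    standby.resume();
    System.out.println(standby.currentSequenceNumbers()); // {journal-a=2}
  }
}
```

In this model, a caller would pass a leader's getCurrentSequenceNumbers() result to catchup so that a suspended standby reaches a known state before resuming.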
Methods inherited from class AbstractJournalSystem: addJournalSink, getJournalSinks, removeJournalSink, start, stop
public static RaftJournalSystem create(RaftJournalConfiguration conf)
Parameters: conf - raft journal configuration
public static io.atomix.catalyst.serializer.Serializer createSerializer()
StateMachine
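public Journal createJournal(Master master)
Creates a journal for the given state machine. Journaled.processJournalEntry(JournalEntry) and Journaled.resetState() are used to keep the state machine's state in sync with the entries written to the journal.
Specified by: createJournal in interface JournalSystem
Parameters: master - the master to create the journal for
Returns: a new Journal for the given master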
public void gainPrimacy()
Specified by: gainPrimacy in interface JournalSystem
public void losePrimacy()
Specified by: losePrimacy in interface JournalSystem
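public Map<String,Long> getCurrentSequenceNumbers()
Specified by: getCurrentSequenceNumbers in interface JournalSystem
public void suspend() throws IOException
Specified by: suspend in interface JournalSystem
Throws: IOException
public void resume() throws IOException
Specified by: resume in interface JournalSystem
Throws: IOException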
public CatchupFuture catchup(Map<String,Long> journalSequenceNumbers)
Specified by: catchup in interface JournalSystem
Parameters: journalSequenceNumbers - the sequence number to advance each journal to
public void checkpoint() throws IOException
Specified by: checkpoint in interface JournalSystem
Throws: IOException
public void startInternal() throws InterruptedException, IOException
Specified by: startInternal in class AbstractJournalSystem
Throws: InterruptedException, IOException
public void stopInternal() throws InterruptedException, IOException
Specified by: stopInternal in class AbstractJournalSystem
Throws: InterruptedException, IOException
public List<QuorumServerInfo> getQuorumServerInfoList()
public boolean isLeader()
Returns: true if this journal system is the leader
public void removeQuorumServer(NetAddress serverNetAddress) throws IOException
Parameters: serverNetAddress - address of the server to remove from the quorum
Throws: IOException
public boolean isEmpty()
Specified by: isEmpty in interface JournalSystem
public boolean isFormatted()
public void format() throws IOException
Specified by: format in interface JournalSystem
Throws: IOException
public PrimarySelector getPrimarySelector()
Copyright © 2023. All Rights Reserved.