@ThreadSafe public class RaftJournalSystem extends AbstractJournalSystem
Unlike the Ratis framework, Alluxio updates state machine state *before* writing to the journal. This lets us avoid journaling operations which do not result in state modification. To make this work in the Ratis framework, we allow RPCs to modify state directly, then write an entry to Ratis afterwards. Once the entry is journaled, Ratis will attempt to apply the journal entry to each master. The entry has already been applied on the primary, so we treat all journal entries applied to the primary as no-ops to avoid double-application.
There are two cases to worry about: (1) incorrectly ignoring an entry and (2) incorrectly applying an entry.
This could happen if a server thinks it is the primary, and ignores a journal entry served from the real primary. To prevent this, primaries wait for a quiet period before serving requests. During this time, the previous primary will go through at least two election cycles without successfully sending heartbeats to the majority of the cluster. If the old primary successfully sent a heartbeat to a node which elected the new primary, the old primary would realize it isn't primary and step down. Therefore, the old primary will step down and no longer ignore requests by the time the new primary begins sending entries.
Entries can never be double-applied to a primary's state because as long as it is the primary, it will ignore all entries, and once it becomes secondary, it will completely reset its state and rejoin the cluster.
The way we apply journal entries to the primary makes it tricky to perform primary state snapshots. Normally Ratis would decide when it wants a snapshot, but with the pre-apply protocol we may be in the middle of modifying state when the snapshot would happen. To manage this, we declare an AtomicBoolean field which decides whether it will be allowed to take snapshots. Normally, snapshots are prohibited on the primary. However, we don't want the primary's log to grow unbounded, so we allow a snapshot to be taken once a day at a user-configured time. To support this, all state changes must first acquire a read lock, and snapshotting requires the corresponding write lock. Once we have the write lock for all state machines, we enable snapshots in Copycat through our AtomicBoolean, then wait for any snapshot to complete.
JournalSystem.Builder, JournalSystem.Mode
Modifier and Type | Field and Description |
---|---|
static org.apache.ratis.protocol.RaftGroupId |
RAFT_GROUP_ID |
static UUID |
RAFT_GROUP_UUID |
Modifier and Type | Method and Description |
---|---|
void |
addQuorumServer(NetAddress serverNetAddress)
Adds a server to the quorum.
|
CatchupFuture |
catchup(Map<String,Long> journalSequenceNumbers)
Initiates a catching up of journals to given sequences.
|
void |
checkpoint()
Creates a checkpoint in the primary master journal system.
|
static RaftJournalSystem |
create(RaftJournalConfiguration conf)
Creates and initializes a raft journal system.
|
Journal |
createJournal(Master master)
Creates a journal for the given state machine.
|
void |
format()
Formats the journal system.
|
void |
gainPrimacy()
Transitions the journal to primary mode.
|
Map<String,Long> |
getCurrentSequenceNumbers()
Used to get the current state from a leader journal system.
|
Map<ServiceType,GrpcService> |
getJournalServices() |
org.apache.ratis.protocol.RaftPeerId |
getLocalPeerId() |
PrimarySelector |
getPrimarySelector() |
List<QuorumServerInfo> |
getQuorumServerInfoList()
Used to get information of internal RAFT quorum.
|
boolean |
isEmpty()
Returns whether the journal is formatted and has not had any entries written to it yet.
|
boolean |
isFormatted() |
boolean |
isLeader() |
boolean |
isSnapshotAllowed() |
boolean |
isSuspended() |
void |
losePrimacy()
Transitions the journal to secondary mode.
|
void |
notifyLeadershipStateChanged(boolean isLeader)
Notifies the journal that the leadership state has changed.
|
void |
removeQuorumServer(NetAddress serverNetAddress)
Removes from RAFT quorum, a server with given address.
|
void |
resume()
Resumes applying for all journals.
|
CompletableFuture<org.apache.ratis.protocol.RaftClientReply> |
sendMessageAsync(org.apache.ratis.protocol.RaftPeerId server,
org.apache.ratis.protocol.Message message)
Sends a message to a raft server asynchronously.
|
void |
startInternal()
Starts the journal system.
|
void |
stopInternal()
Stops the journal system.
|
void |
suspend(Runnable interruptCallback)
Suspends applying for all journals.
|
addJournalSink, getJournalSinks, removeJournalSink, start, stop
public static final UUID RAFT_GROUP_UUID
public static final org.apache.ratis.protocol.RaftGroupId RAFT_GROUP_ID
public static RaftJournalSystem create(RaftJournalConfiguration conf)
conf
- raft journal configurationpublic org.apache.ratis.protocol.RaftPeerId getLocalPeerId()
public Journal createJournal(Master master)
JournalSystem
Journaled#processJournalEntry(JournalEntry)
and
Journaled.resetState()
to keep the state machine's state in sync with
the entries written to the journal.master
- the master to create the journal forJournal
public void gainPrimacy()
JournalSystem
public void losePrimacy()
JournalSystem
public Map<String,Long> getCurrentSequenceNumbers()
JournalSystem
public void suspend(Runnable interruptCallback) throws IOException
JournalSystem
interruptCallback
- the callback function to be invoked when the suspension is interruptedIOException
public void resume() throws IOException
JournalSystem
IOException
public boolean isSuspended()
public CatchupFuture catchup(Map<String,Long> journalSequenceNumbers)
JournalSystem
journalSequenceNumbers
- sequence to advance per each journalpublic void checkpoint() throws IOException
JournalSystem
IOException
public Map<ServiceType,GrpcService> getJournalServices()
public void startInternal() throws InterruptedException, IOException
AbstractJournalSystem
startInternal
in class AbstractJournalSystem
InterruptedException
IOException
public void stopInternal() throws InterruptedException, IOException
AbstractJournalSystem
stopInternal
in class AbstractJournalSystem
InterruptedException
IOException
public List<QuorumServerInfo> getQuorumServerInfoList() throws IOException
IOException
public CompletableFuture<org.apache.ratis.protocol.RaftClientReply> sendMessageAsync(org.apache.ratis.protocol.RaftPeerId server, org.apache.ratis.protocol.Message message)
server
- the raft peer id of the target servermessage
- the message to sendpublic boolean isLeader()
true
if this journal system is the leaderpublic void removeQuorumServer(NetAddress serverNetAddress) throws IOException
serverNetAddress
- address of the server to remove from the quorumIOException
public void addQuorumServer(NetAddress serverNetAddress) throws IOException
serverNetAddress
- the address of the serverIOException
- if error occurred while performing the operationpublic boolean isEmpty()
JournalSystem
public boolean isFormatted()
public void format() throws IOException
JournalSystem
IOException
public PrimarySelector getPrimarySelector()
public boolean isSnapshotAllowed()
public void notifyLeadershipStateChanged(boolean isLeader)
isLeader
- whether the local server is teh current leaderCopyright © 2023. All Rights Reserved.