@ThreadSafe public class RaftJournalSystem extends AbstractJournalSystem
Unlike the Ratis framework, Alluxio updates state machine state *before* writing to the journal. This lets us avoid journaling operations which do not result in state modification. To make this work in the Ratis framework, we allow RPCs to modify state directly, then write an entry to Ratis afterwards. Once the entry is journaled, Ratis will attempt to apply the journal entry to each master. The entry has already been applied on the primary, so we treat all journal entries applied to the primary as no-ops to avoid double-application.
There are two cases to worry about: (1) incorrectly ignoring an entry and (2) incorrectly applying an entry.
This could happen if a server thinks it is the primary, and ignores a journal entry served from the real primary. To prevent this, primaries wait for a quiet period before serving requests. During this time, the previous primary will go through at least two election cycles without successfully sending heartbeats to the majority of the cluster. If the old primary successfully sent a heartbeat to a node which elected the new primary, the old primary would realize it isn't primary and step down. Therefore, the old primary will step down and no longer ignore requests by the time the new primary begins sending entries.
Entries can never be double-applied to a primary's state because as long as it is the primary, it will ignore all entries, and once it becomes standby, it will completely reset its state and rejoin the cluster.
The way we apply journal entries to the primary makes it tricky to perform primary state snapshots. Normally Ratis would decide when it wants a snapshot, but with the pre-apply protocol we may be in the middle of modifying state when the snapshot would happen. To manage this, we declare an AtomicBoolean field which decides whether it will be allowed to take snapshots. Normally, snapshots are prohibited on the primary. However, we don't want the primary's log to grow unbounded, so we allow a snapshot to be taken once a day at a user-configured time. To support this, all state changes must first acquire a read lock, and snapshotting requires the corresponding write lock. Once we have the write lock for all state machines, we enable snapshots in Ratis through our AtomicBoolean, then wait for any snapshot to complete.
JournalSystem.Builder, JournalSystem.Mode
Modifier and Type | Field and Description |
---|---|
static org.apache.ratis.protocol.RaftGroupId |
RAFT_GROUP_ID |
static UUID |
RAFT_GROUP_UUID |
Constructor and Description |
---|
RaftJournalSystem(URI path,
NetworkAddressUtils.ServiceType serviceType)
Creates a
RaftJournalSystem . |
Modifier and Type | Method and Description |
---|---|
void |
addQuorumServer(NetAddress serverNetAddress)
Adds a server to the quorum.
|
CatchupFuture |
catchup(Map<String,Long> journalSequenceNumbers)
Initiates a catching up of journals to given sequences.
|
void |
checkpoint(StateLockManager stateLockManager)
Creates a checkpoint in the primary master journal system.
|
Journal |
createJournal(Master master)
Creates a journal for the given state machine.
|
void |
format()
Formats the journal system.
|
void |
gainPrimacy()
Transitions the journal to primary mode.
|
org.apache.ratis.protocol.RaftGroup |
getCurrentGroup() |
Map<String,Long> |
getCurrentSequenceNumbers()
Used to get the current state from a leader journal system.
|
Map<ServiceType,GrpcService> |
getJournalServices() |
String |
getLeaderId()
Get the leader id.
|
protected int |
getLeaderIndex()
Gets leader index.
|
org.apache.ratis.protocol.RaftPeerId |
getLocalPeerId() |
PrimarySelector |
getPrimarySelector() |
List<QuorumServerInfo> |
getQuorumServerInfoList()
Used to get information of internal RAFT quorum.
|
int |
getRoleId()
Get the role index.
|
TransferLeaderMessage |
getTransferLeaderMessage(String transferId)
Gets exception message throwing when transfer leader.
|
boolean |
isEmpty()
Returns whether the journal is formatted and has not had any entries written to it yet.
|
boolean |
isFormatted() |
boolean |
isLeader() |
boolean |
isSuspended() |
void |
losePrimacy()
Transitions the journal to standby mode.
|
void |
notifyLeadershipStateChanged(boolean isLeader)
Notifies the journal that the leadership state has changed.
|
void |
removeQuorumServer(NetAddress serverNetAddress)
Removes from RAFT quorum, a server with given address.
|
void |
resetPriorities()
Resets RaftPeer priorities.
|
void |
resume()
Resumes applying for all journals.
|
CompletableFuture<org.apache.ratis.protocol.RaftClientReply> |
sendMessageAsync(org.apache.ratis.protocol.RaftPeerId server,
org.apache.ratis.protocol.Message message)
Sends a message to a raft server asynchronously.
|
CompletableFuture<org.apache.ratis.protocol.RaftClientReply> |
sendMessageAsync(org.apache.ratis.protocol.RaftPeerId server,
org.apache.ratis.protocol.Message message,
long timeoutMs)
Sends a message to a raft server asynchronously.
|
void |
startInternal()
Throws
AlluxioRuntimeException when it cannot start a RaftCluster and therefore
cannot join the quorum. |
void |
stopInternal()
Stops the journal system.
|
void |
suspend(Runnable interruptCallback)
Suspends applying for all journals.
|
String |
toString() |
String |
transferLeadership(NetAddress newLeaderNetAddress)
Transfers the leadership of the quorum to another server.
|
void |
updateGroup()
Updates raft group with the current values from raft server.
|
addJournalSink, getJournalSinks, registerMetrics, removeJournalSink, start, stop
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
waitForCatchup
public static final UUID RAFT_GROUP_UUID
public static final org.apache.ratis.protocol.RaftGroupId RAFT_GROUP_ID
public RaftJournalSystem(URI path, NetworkAddressUtils.ServiceType serviceType)
RaftJournalSystem
.path
- where the journal will be storedserviceType
- is either MASTER_RAFT or JOB_MASTER_RAFTpublic org.apache.ratis.protocol.RaftPeerId getLocalPeerId()
public org.apache.ratis.protocol.RaftGroup getCurrentGroup()
public Journal createJournal(Master master)
JournalSystem
Journaled#processJournalEntry(JournalEntry)
and
Journaled.resetState()
to keep the state machine's state in sync with
the entries written to the journal.master
- the master to create the journal forJournal
public void gainPrimacy()
JournalSystem
public void losePrimacy()
JournalSystem
public Map<String,Long> getCurrentSequenceNumbers()
JournalSystem
public void suspend(Runnable interruptCallback) throws IOException
JournalSystem
interruptCallback
- the callback function to be invoked when the suspension is interruptedIOException
public void resume() throws IOException
JournalSystem
IOException
public boolean isSuspended()
public CatchupFuture catchup(Map<String,Long> journalSequenceNumbers)
JournalSystem
journalSequenceNumbers
- sequence to advance per each journalpublic void checkpoint(StateLockManager stateLockManager) throws IOException
JournalSystem
stateLockManager
- used to prevent reads and writes while the journal system is
checkpointingIOException
public Map<ServiceType,GrpcService> getJournalServices()
public void startInternal()
AlluxioRuntimeException
when it cannot start a RaftCluster and therefore
cannot join the quorum.startInternal
in class AbstractJournalSystem
public void stopInternal()
AbstractJournalSystem
stopInternal
in class AbstractJournalSystem
public List<QuorumServerInfo> getQuorumServerInfoList() throws IOException
IOException
public CompletableFuture<org.apache.ratis.protocol.RaftClientReply> sendMessageAsync(org.apache.ratis.protocol.RaftPeerId server, org.apache.ratis.protocol.Message message)
server
- the raft peer id of the target servermessage
- the message to sendpublic CompletableFuture<org.apache.ratis.protocol.RaftClientReply> sendMessageAsync(org.apache.ratis.protocol.RaftPeerId server, org.apache.ratis.protocol.Message message, long timeoutMs)
server
- the raft peer id of the target servermessage
- the message to sendtimeoutMs
- the message timeout in millisecondspublic boolean isLeader()
true
if this journal system is the leaderpublic void removeQuorumServer(NetAddress serverNetAddress) throws IOException
serverNetAddress
- address of the server to remove from the quorumIOException
- raft exceptionpublic void resetPriorities() throws IOException
IOException
- raft exceptionpublic String transferLeadership(NetAddress newLeaderNetAddress)
newLeaderNetAddress
- the address of the serverpublic TransferLeaderMessage getTransferLeaderMessage(String transferId)
transferId
- the guid of transferLeader commandpublic void addQuorumServer(NetAddress serverNetAddress) throws IOException
serverNetAddress
- the address of the serverIOException
- if error occurred while performing the operationpublic boolean isEmpty()
JournalSystem
public boolean isFormatted()
public void format() throws IOException
JournalSystem
IOException
public PrimarySelector getPrimarySelector()
public void notifyLeadershipStateChanged(boolean isLeader)
isLeader
- whether the local server is teh current leaderpublic void updateGroup()
public int getRoleId()
RaftProtos.RaftPeerRole
.public String getLeaderId()
RaftProtos.RaftPeerRole
.protected int getLeaderIndex()
Copyright © 2023. All Rights Reserved.