@NotThreadSafe public final class MasterWorkerInfo extends Object
MasterWorkerInfo has the following groups of metadata:
1. Metadata like ID, address, etc., represented by a StaticWorkerMeta object. This group is thread safe, meaning no locking is required.
2. Worker last updated timestamp. This is thread safe, meaning no locking is required.
3. Worker register status. This is guarded by a StampedLock.asReadWriteLock().
4. Worker resource usage, represented by a WorkerUsageMeta object. This is guarded by a StampedLock.asReadWriteLock().
5. Worker block lists, including the present blocks and blocks to be removed from the worker. This is guarded by a StampedLock.asReadWriteLock().
When accessing certain fields in this object, external locking is required. As listed above, groups 1 and 2 are thread safe and do not require external locking; groups 3, 4, and 5 require external locking.
Locking can be done with lockWorkerMeta(EnumSet, boolean). This method returns a LockResource which can be managed by a try-finally block. Internally, WorkerMetaLock is used to manage the corresponding internal locks, acquiring them in a defined order and unlocking them properly on close.
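To make the ordering idea concrete, here is a minimal sketch of a sectioned lock in the spirit of WorkerMetaLock. It assumes one StampedLock per metadata group and uses hypothetical names (Section, SectionedLock, SectionLockResource) rather than Alluxio's actual classes. The requested sections are acquired in one fixed order (the enum declaration order, which is how EnumSet iterates) and released in reverse order when the returned resource is closed.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.StampedLock;

// Hypothetical stand-in for WorkerMetaLockSection; declaration order is the lock order.
enum Section { STATUS, USAGE, BLOCKS }

// Hypothetical stand-in for LockResource: close() throws no checked exception.
interface SectionLockResource extends AutoCloseable {
  @Override
  void close();
}

// Sketch only, assuming one StampedLock per group; this is not Alluxio's WorkerMetaLock.
final class SectionedLock {
  // One non-reentrant read/write lock per metadata group.
  private final Map<Section, ReadWriteLock> mLocks = new EnumMap<>(Section.class);

  SectionedLock() {
    for (Section s : Section.values()) {
      mLocks.put(s, new StampedLock().asReadWriteLock());
    }
  }

  // Acquires the requested sections in enum order; isShared applies to all of them.
  SectionLockResource lock(EnumSet<Section> sections, boolean isShared) {
    Deque<Lock> held = new ArrayDeque<>();
    for (Section s : sections) { // EnumSet iterates in declaration order
      ReadWriteLock rw = mLocks.get(s);
      Lock l = isShared ? rw.readLock() : rw.writeLock();
      l.lock();
      held.push(l); // the most recently acquired lock sits on top
    }
    // Popping from the top releases the locks in reverse order of acquisition.
    return () -> {
      while (!held.isEmpty()) {
        held.pop().unlock();
      }
    };
  }

  // Non-blocking probe: true if nobody holds the section in either mode.
  boolean isFree(Section s) {
    Lock w = mLocks.get(s).writeLock();
    if (w.tryLock()) {
      w.unlock();
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    SectionedLock meta = new SectionedLock();
    // Acquired as USAGE then BLOCKS, regardless of the argument order here.
    try (SectionLockResource r = meta.lock(EnumSet.of(Section.BLOCKS, Section.USAGE), false)) {
      System.out.println(meta.isFree(Section.USAGE) + " " + meta.isFree(Section.STATUS)); // false true
    }
    System.out.println(meta.isFree(Section.USAGE)); // true
  }
}
```

Using a single global order for every caller is what prevents two threads from each holding one section while waiting for the other's.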
If the fields are only read, shared locks should be acquired and released as below:

    try (LockResource r = lockWorkerMeta(
        EnumSet.of(WorkerMetaLockSection.USAGE), true)) {
      ...
    }

If the fields are updated, exclusive locks should be acquired and released as below:

    try (LockResource r = lockWorkerMeta(
        EnumSet.of(
            WorkerMetaLockSection.STATUS,
            WorkerMetaLockSection.USAGE,
            WorkerMetaLockSection.BLOCKS), false)) {
      ...
    }

The locks are internally StampedLock instances, which are NOT reentrant!
We chose StampedLock instead of ReentrantReadWriteLock because the latter does not allow the write lock to be acquired in one thread and later released in another thread. This is undesirable because, when the worker registers in a stream, the thread that handles the first message acquires the lock, while the thread that handles the completion signal is responsible for releasing it. In the current gRPC architecture, there is no way to guarantee that these two threads are the same.
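The cross-thread requirement can be checked directly with the JDK classes. The hypothetical helper below takes a write lock on the calling thread and then tries to release it on a second thread, standing in for the two gRPC threads described above. The StampedLock view permits this, while ReentrantReadWriteLock rejects the unlock with IllegalMonitorStateException because its write lock is owned by the acquiring thread. This is a sketch for illustration, not Alluxio code.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.StampedLock;

// Hypothetical demo class, not part of Alluxio.
final class CrossThreadUnlockDemo {
  // Locks on the calling thread (like the thread handling the 1st message), then tries
  // to unlock on another thread (like the thread handling the complete signal).
  static boolean unlockFromOtherThread(Lock writeLock) {
    writeLock.lock();
    AtomicBoolean released = new AtomicBoolean(false);
    Thread other = new Thread(() -> {
      try {
        writeLock.unlock();
        released.set(true);
      } catch (IllegalMonitorStateException e) {
        // ReentrantReadWriteLock: only the owning thread may release its write lock.
      }
    });
    other.start();
    try {
      other.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    if (!released.get()) {
      writeLock.unlock(); // still owned by this thread, so release it here
    }
    return released.get();
  }

  public static void main(String[] args) {
    Lock stamped = new StampedLock().asReadWriteLock().writeLock();
    Lock reentrant = new ReentrantReadWriteLock().writeLock();
    System.out.println("StampedLock view: " + unlockFromOtherThread(stamped)); // true
    System.out.println("ReentrantReadWriteLock: " + unlockFromOtherThread(reentrant)); // false
  }
}
```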
Because the locks are not reentrant, you must be extra careful NOT to acquire a lock while already holding it, because that will result in a deadlock! This is especially the case for write locks.
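Non-reentrancy can be observed without risking a hang by probing with tryLock() instead of lock(). In the hypothetical helper below (not part of Alluxio), a thread that already holds the StampedLock write view cannot take it again, whereas ReentrantReadWriteLock grants the nested acquisition. Calling lock() in place of tryLock() would make the StampedLock case block forever, which is the deadlock described above.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.StampedLock;

// Hypothetical demo class, not part of Alluxio.
final class NonReentrantDemo {
  // Returns true if the thread holding writeLock can acquire it a second time.
  static boolean canReacquireWrite(Lock writeLock) {
    writeLock.lock();
    try {
      // Non-blocking probe; lock() here would hang forever on a StampedLock.
      boolean nested = writeLock.tryLock();
      if (nested) {
        writeLock.unlock(); // undo the nested (reentrant) acquisition
      }
      return nested;
    } finally {
      writeLock.unlock();
    }
  }

  public static void main(String[] args) {
    System.out.println(canReacquireWrite(new StampedLock().asReadWriteLock().writeLock())); // false
    System.out.println(canReacquireWrite(new ReentrantReadWriteLock().writeLock())); // true
  }
}
```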
The current write lock holders include the following:
1. In DefaultBlockMaster, the methods related to worker register/heartbeat, and block removal/commit.
2. In WorkerRegisterContext, the write locks are held throughout the lifecycle.
3. In DefaultBlockMaster.LostWorkerDetectionHeartbeatExecutor#heartbeat().
Modifier and Type | Field and Description |
---|---|
boolean | mIsRegistered: If true, the worker is considered registered. |
Constructor and Description |
---|
MasterWorkerInfo(long id, WorkerNetAddress address): Creates a new instance of MasterWorkerInfo. |
Modifier and Type | Method and Description |
---|---|
void | addBlock(long blockId): Adds a block to the worker. |
void | addLostStorage(Map<String,StorageList> lostStorage): Adds new worker lost storage paths. |
void | addLostStorage(String tierAlias, String dirPath): Adds a new worker lost storage path. |
WorkerInfo | generateWorkerInfo(Set<GetWorkerReportOptions.WorkerInfoField> fieldRange, boolean isLiveWorker): Gets the selected field information for this worker. |
long | getAvailableBytes(): You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. |
int | getBlockCount(): Returns the block count of this worker. |
Set<Long> | getBlocks(): You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.BLOCKS specified. |
long | getCapacityBytes(): You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. |
Map<String,Long> | getFreeBytesOnTiers(): You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. |
long | getId(): No locking required. |
long | getLastUpdatedTimeMs(): No locking required. |
Map<String,List<String>> | getLostStorage(): You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. |
long | getStartTime(): No locking required. |
StorageTierAssoc | getStorageTierAssoc(): You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. |
int | getToRemoveBlockCount(): Returns the number of blocks that should be removed from the worker. |
Set<Long> | getToRemoveBlocks(): You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.BLOCKS specified. |
Map<String,Long> | getTotalBytesOnTiers(): You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. |
long | getUsedBytes() |
Map<String,Long> | getUsedBytesOnTiers(): You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. |
WorkerNetAddress | getWorkerAddress(): StaticWorkerMeta is thread safe so the value can be read without locking. |
boolean | hasLostStorage(): You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. |
boolean | isRegistered(): You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.STATUS specified. |
LockResource | lockWorkerMeta(EnumSet<WorkerMetaLockSection> lockTypes, boolean isShared): Locks the corresponding locks on the metadata groups. |
void | markAllBlocksToRemove(): Marks all the blocks on the worker to be removed. |
Set<Long> | register(StorageTierAssoc globalStorageTierAssoc, List<String> storageTierAliases, Map<String,Long> totalBytesOnTiers, Map<String,Long> usedBytesOnTiers, Set<Long> blocks): Marks the worker as registered, while updating all of its metadata. |
void | removeBlockFromWorkerMeta(long blockId): Removes a block from the worker. |
void | scheduleRemoveFromWorker(long blockId): Removes the block from the worker metadata and adds it to the to-remove list. |
String | toString() |
void | updateCapacityBytes(Map<String,Long> capacityBytesOnTiers): Sets the capacity of the worker in bytes. |
void | updateLastUpdatedTimeMs(): Updates the last updated time of the worker in ms. |
void | updateToRemovedBlock(boolean add, long blockId): Adds or removes a block from the to-be-removed blocks set of the worker. |
void | updateUsage(StorageTierAssoc globalStorageTierAssoc, List<String> storageTiers, Map<String,Long> totalBytesOnTiers, Map<String,Long> usedBytesOnTiers): Updates the worker storage usage. |
void | updateUsedBytes(Map<String,Long> usedBytesOnTiers): Sets the used space of the worker in bytes. |
void | updateUsedBytes(String tierAlias, long usedBytesOnTier): Sets the used space of the worker in bytes. |

public boolean mIsRegistered
If true, the worker is considered registered.

public MasterWorkerInfo(long id, WorkerNetAddress address)
Creates a new instance of MasterWorkerInfo.
Parameters:
id - the worker id to use
address - the worker address to use

public Set<Long> register(StorageTierAssoc globalStorageTierAssoc, List<String> storageTierAliases, Map<String,Long> totalBytesOnTiers, Map<String,Long> usedBytesOnTiers, Set<Long> blocks)
Marks the worker as registered, while updating all of its metadata. Exclusive locks on mStatusLock, mUsageLock and mBlockListLock are required. You should lock externally with lockWorkerMeta(EnumSet, boolean) with all three lock types specified:

    try (LockResource r = worker.lockWorkerMeta(
        EnumSet.of(
            WorkerMetaLockSection.STATUS,
            WorkerMetaLockSection.USAGE,
            WorkerMetaLockSection.BLOCKS), false)) {
      worker.register(...);
    }

Parameters:
globalStorageTierAssoc - global mapping between storage aliases and ordinal positions
storageTierAliases - list of storage tier aliases in order of their position in the hierarchy
totalBytesOnTiers - mapping from storage tier alias to total bytes
usedBytesOnTiers - mapping from storage tier alias to used bytes
blocks - set of block ids on this worker

public void addBlock(long blockId)
Adds a block to the worker. You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.BLOCKS specified. An exclusive lock is required.
Parameters:
blockId - the id of the block to be added

public void removeBlockFromWorkerMeta(long blockId)
Removes a block from the worker. You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.BLOCKS specified. An exclusive lock is required.
Parameters:
blockId - the id of the block to be removed

public void scheduleRemoveFromWorker(long blockId)
Removes the block from the worker metadata and adds it to the to-remove list. You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.BLOCKS specified. An exclusive lock is required.
Parameters:
blockId - the block ID

public void addLostStorage(String tierAlias, String dirPath)
Adds a new worker lost storage path. You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. An exclusive lock is required.
Parameters:
tierAlias - the tier alias
dirPath - the lost storage path

public void addLostStorage(Map<String,StorageList> lostStorage)
Adds new worker lost storage paths. You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. An exclusive lock is required.
Parameters:
lostStorage - the lost storage to add

public WorkerInfo generateWorkerInfo(Set<GetWorkerReportOptions.WorkerInfoField> fieldRange, boolean isLiveWorker)
Gets the selected field information for this worker. You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. A shared lock is required.
Parameters:
fieldRange - the client selected fields
isLiveWorker - whether the worker is live

public WorkerNetAddress getWorkerAddress()
StaticWorkerMeta is thread safe so the value can be read without locking.

public long getAvailableBytes()
You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. A shared lock is required.

public Set<Long> getBlocks()
You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.BLOCKS specified. A shared lock is required. This returns a copy so the lock can be released when this method returns.

public int getBlockCount()
Returns the block count of this worker.

public long getCapacityBytes()
You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. A shared lock is required.

public long getId()
No locking required.

public long getLastUpdatedTimeMs()
No locking required.

public Set<Long> getToRemoveBlocks()
You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.BLOCKS specified. A shared lock is required. This returns a copy so the lock can be released after this method returns.

public long getUsedBytes()

public StorageTierAssoc getStorageTierAssoc()
You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. A shared lock is required.

public Map<String,Long> getTotalBytesOnTiers()
You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. A shared lock is required.

public Map<String,Long> getUsedBytesOnTiers()
You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. A shared lock is required.

public long getStartTime()
No locking required.

public boolean isRegistered()
You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.STATUS specified. A shared lock is required.

public Map<String,Long> getFreeBytesOnTiers()
You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. A shared lock is required.

public Map<String,List<String>> getLostStorage()
You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. A shared lock is required. This returns a copy so the lock can be released after the map is returned.

public boolean hasLostStorage()
You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. A shared lock is required.

public void updateLastUpdatedTimeMs()
Updates the last updated time of the worker in ms.

public void updateToRemovedBlock(boolean add, long blockId)
Adds or removes a block from the to-be-removed blocks set of the worker. You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.BLOCKS specified. An exclusive lock is required.
Parameters:
add - true to add the block, false to remove it
blockId - the id of the block to be added or removed

public void updateCapacityBytes(Map<String,Long> capacityBytesOnTiers)
Sets the capacity of the worker in bytes. You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. An exclusive lock is required.
Parameters:
capacityBytesOnTiers - capacity in bytes of each storage tier

public void updateUsedBytes(Map<String,Long> usedBytesOnTiers)
Sets the used space of the worker in bytes. You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. An exclusive lock is required.
Parameters:
usedBytesOnTiers - used bytes on each storage tier

public void updateUsedBytes(String tierAlias, long usedBytesOnTier)
Sets the used space of the worker in bytes. You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. An exclusive lock is required.
Parameters:
tierAlias - alias of the storage tier
usedBytesOnTier - used bytes on that storage tier

public LockResource lockWorkerMeta(EnumSet<WorkerMetaLockSection> lockTypes, boolean isShared)
Locks the corresponding locks on the metadata groups. This returns a LockResource which can be managed by a try-finally block. See the javadoc for WorkerMetaLock for more details about the internals.
Parameters:
lockTypes - the locks to acquire
isShared - if true, the locks are acquired in shared mode; if false, the locking is exclusive
Returns:
a LockResource of the WorkerMetaLock

public int getToRemoveBlockCount()
Returns the number of blocks that should be removed from the worker.

public void updateUsage(StorageTierAssoc globalStorageTierAssoc, List<String> storageTiers, Map<String,Long> totalBytesOnTiers, Map<String,Long> usedBytesOnTiers)
Updates the worker storage usage. You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.USAGE specified. An exclusive lock is required.
Parameters:
globalStorageTierAssoc - storage tier setup from configuration
storageTiers - the storage tiers
totalBytesOnTiers - the capacity of each tier
usedBytesOnTiers - the current usage of each tier

public void markAllBlocksToRemove()
Marks all the blocks on the worker (mBlocks) to be removed. First, all blocks are marked to-be-removed; then, whenever a block list is seen in the register stream, those blocks are added to mBlocks and removed from mToRemoveBlocks. In this way, at the end of the stream, mToRemoveBlocks contains only the blocks that no longer exist on the worker.
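Ignoring locking, the mark-then-unmark bookkeeping described above can be modeled with two plain sets. The class and method names below (BlockReconciliation, onBlockList) are hypothetical, and the code is a simplified sketch rather than Alluxio's implementation; it only shows why whatever is left in mToRemoveBlocks after the stream is exactly the set of blocks the worker did not report.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, lock-free model of the reconciliation; not Alluxio's implementation.
final class BlockReconciliation {
  final Set<Long> mBlocks = new HashSet<>();
  final Set<Long> mToRemoveBlocks = new HashSet<>();

  // Start of the stream: every block the master knows about becomes a removal candidate.
  void markAllBlocksToRemove() {
    mToRemoveBlocks.addAll(mBlocks);
  }

  // Each block list seen in the stream: record those blocks and un-mark them.
  void onBlockList(List<Long> reportedBlocks) {
    for (long blockId : reportedBlocks) {
      mBlocks.add(blockId);
      mToRemoveBlocks.remove(blockId);
    }
  }

  public static void main(String[] args) {
    BlockReconciliation worker = new BlockReconciliation();
    worker.mBlocks.addAll(List.of(1L, 2L, 3L)); // blocks known before re-registration
    worker.markAllBlocksToRemove();
    worker.onBlockList(List.of(1L)); // first block list in the stream
    worker.onBlockList(List.of(3L, 4L)); // second block list; block 4 is new
    System.out.println(worker.mToRemoveBlocks); // [2]: no longer on the worker
  }
}
```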
You should lock externally with lockWorkerMeta(EnumSet, boolean) with WorkerMetaLockSection.BLOCKS specified. An exclusive lock is required.

Copyright © 2023. All Rights Reserved.