@NotThreadSafe public abstract class ObjectLowLevelOutputStream extends OutputStream implements ContentHashable
We upload data in partitions. On write(), the data is persisted to a temporary file mFile on the local disk. When the amount of data in this temporary file, tracked by mPartitionOffset, reaches mPartitionSize, the file is submitted to the upload executor mExecutor, and we do not wait for the upload to finish. A new temp file is then created for subsequent writes and mPartitionOffset is reset to zero. This continues until all the data has been written to temp files.
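The rollover described above can be sketched as follows. This is a minimal, self-contained illustration and not the Alluxio implementation: the class name PartitionedTempFileWriter, the onPartitionFull callback (a stand-in for submitting the file to mExecutor), and the demo helper are all hypothetical.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

/** Hypothetical, simplified stand-in for the partitioning logic; not the Alluxio class. */
final class PartitionedTempFileWriter extends OutputStream {
  private final long partitionSize;              // plays the role of mPartitionSize
  private final Consumer<File> onPartitionFull;  // stand-in for handing the file to mExecutor
  private long partitionOffset;                  // plays the role of mPartitionOffset
  private File file;                             // plays the role of mFile
  private OutputStream localOut;                 // plays the role of mLocalOutputStream

  PartitionedTempFileWriter(long partitionSize, Consumer<File> onPartitionFull) {
    if (partitionSize <= 0) {
      throw new IllegalArgumentException("partitionSize must be positive");
    }
    this.partitionSize = partitionSize;
    this.onPartitionFull = onPartitionFull;
  }

  @Override
  public void write(int b) throws IOException {
    write(new byte[] {(byte) b}, 0, 1);
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    while (len > 0) {
      if (file == null) {
        // Start a fresh temp file for the next partition.
        file = File.createTempFile("part-", ".tmp");
        file.deleteOnExit();
        localOut = new FileOutputStream(file);
        partitionOffset = 0;
      }
      // Write only as much as still fits into the current partition.
      int n = (int) Math.min(len, partitionSize - partitionOffset);
      localOut.write(b, off, n);
      partitionOffset += n;
      off += n;
      len -= n;
      if (partitionOffset >= partitionSize) {
        rollOver();
      }
    }
  }

  /** Hands the full temp file off and resets the offset to zero. */
  private void rollOver() throws IOException {
    localOut.close();
    File full = file;
    file = null;
    localOut = null;
    partitionOffset = 0;
    onPartitionFull.accept(full);
  }

  long bufferedBytes() {
    return partitionOffset;
  }

  @Override
  public void close() throws IOException {
    if (localOut != null) {
      localOut.close();
      localOut = null;
    }
    if (file != null) {
      file.delete();
      file = null;
    }
  }

  /** Returns {number of full partitions handed off, bytes still buffered}. */
  static long[] demo(int totalBytes, long partitionSize) {
    long[] result = new long[2];
    try (PartitionedTempFileWriter w =
        new PartitionedTempFileWriter(partitionSize, full -> {
          result[0]++;
          full.delete();
        })) {
      w.write(new byte[totalBytes]);
      result[1] = w.bufferedBytes();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return result;
  }
}
```

For example, PartitionedTempFileWriter.demo(25, 10) hands off two full partitions and leaves 5 bytes buffered for a later flush or close.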
In flush(), we upload the buffered data if it is larger than 5MB and wait for all uploads to finish. Temp files are deleted after they have been uploaded successfully.
In close(), we upload the last part of the data (if any), wait for all uploads to finish, and complete the multipart upload.
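The flush() and close() rules can be read as two small decisions, sketched below. The class FlushCloseDecisions and the CloseAction enum are hypothetical. The threshold is assumed to be 5 * 1024 * 1024 bytes. The close-time branches for an empty stream and for a stream that never started a multipart upload are an assumption inferred from the abstract methods createEmptyObject and putObject; the description above does not state them.

```java
/** Hypothetical decision helpers; not part of the Alluxio API. */
final class FlushCloseDecisions {
  /** Assumed value of UPLOAD_THRESHOLD: 5MB taken as 5 * 1024 * 1024 bytes. */
  static final long UPLOAD_THRESHOLD = 5L * 1024 * 1024;

  enum CloseAction { CREATE_EMPTY_OBJECT, PUT_OBJECT, COMPLETE_MULTIPART_UPLOAD }

  /** flush() uploads the buffered temp file only if it is larger than the threshold. */
  static boolean shouldUploadOnFlush(long bufferedBytes) {
    return bufferedBytes > UPLOAD_THRESHOLD;
  }

  /**
   * Assumed close() behaviour: finish the multipart upload if one was started,
   * otherwise fall back to a single put (or an empty object when nothing was written).
   */
  static CloseAction onClose(boolean multipartStarted, long bufferedBytes) {
    if (multipartStarted) {
      return CloseAction.COMPLETE_MULTIPART_UPLOAD;
    }
    return bufferedBytes == 0 ? CloseAction.CREATE_EMPTY_OBJECT : CloseAction.PUT_OBJECT;
  }
}
```

The last part is exempt from the threshold, which is why close() can upload whatever is left in the temp file regardless of its size.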
close() itself will not be retried, but all multipart-upload-related operations (init, upload, complete, and abort) will be retried.
If an error occurs and we have no way to recover, we abort the multipart uploads.
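A retry wrapper around each multipart operation might look like the sketch below. It assumes a policy object whose attempt() method returns true while another try is allowed, and a Supplier that yields a fresh policy per operation (mirroring the Supplier-typed mRetryPolicy field). AttemptPolicy, IoCall, countingPolicy, callWithRetry, and demo are hypothetical names, not Alluxio APIs.

```java
import java.io.IOException;
import java.util.function.Supplier;

/** Hypothetical retry helper; a sketch, not the Alluxio retry implementation. */
final class RetrySketch {
  /** Minimal policy: attempt() returns true while another try is allowed. */
  interface AttemptPolicy {
    boolean attempt();
  }

  /** An operation such as init, upload, complete, or abort. */
  interface IoCall<T> {
    T call() throws IOException;
  }

  /** Supplies a fresh policy allowing at most maxAttempts tries per operation. */
  static Supplier<AttemptPolicy> countingPolicy(final int maxAttempts) {
    return () -> new AttemptPolicy() {
      private int used;

      @Override
      public boolean attempt() {
        return used++ < maxAttempts;
      }
    };
  }

  /** Runs op until it succeeds or the policy is exhausted. */
  static <T> T callWithRetry(Supplier<AttemptPolicy> policySupplier, IoCall<T> op)
      throws IOException {
    AttemptPolicy policy = policySupplier.get();
    IOException last = null;
    while (policy.attempt()) {
      try {
        return op.call();
      } catch (IOException e) {
        last = e; // remember the failure and try again if the policy allows it
      }
    }
    // No way to recover: the caller would abort the multipart upload here.
    throw new IOException("operation failed after all attempts", last);
  }

  /** Returns the number of calls made until success, or -1 if attempts ran out. */
  static int demo(int failuresBeforeSuccess, int maxAttempts) {
    int[] calls = {0};
    try {
      Integer result = RetrySketch.<Integer>callWithRetry(countingPolicy(maxAttempts), () -> {
        calls[0]++;
        if (calls[0] <= failuresBeforeSuccess) {
          throw new IOException("simulated failure");
        }
        return calls[0];
      });
      return result;
    } catch (IOException e) {
      return -1;
    }
  }
}
```

Wrapping the individual operations rather than close() keeps a retry from repeating work that has already succeeded, such as parts that were uploaded before the failing call.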
Some multipart uploads may not be completed or aborted normally and need periodic cleanup, which is enabled through PropertyKey.UNDERFS_CLEANUP_ENABLED. When a leader master starts or a cleanup interval is reached, all multipart uploads older than the clean age are cleaned up.
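The age-based selection can be illustrated with a small, self-contained sketch. It assumes each pending multipart upload is known by an upload id and an initiation time; StaleUploadSelector, selectStale, and demo are hypothetical names, and how the under file system actually lists and aborts uploads is not shown.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that picks multipart uploads older than the clean age. */
final class StaleUploadSelector {
  /** Returns the ids of uploads initiated strictly before (now - cleanAge). */
  static List<String> selectStale(
      Map<String, Instant> initiatedAtByUploadId, Instant now, Duration cleanAge) {
    Instant cutoff = now.minus(cleanAge);
    List<String> stale = new ArrayList<>();
    for (Map.Entry<String, Instant> e : initiatedAtByUploadId.entrySet()) {
      if (e.getValue().isBefore(cutoff)) {
        stale.add(e.getKey());
      }
    }
    return stale;
  }

  /** Example: with a 3-day clean age, only the 9-day-old upload is selected. */
  static List<String> demo() {
    Instant now = Instant.parse("2023-01-10T00:00:00Z");
    Map<String, Instant> uploads = new LinkedHashMap<>();
    uploads.put("old", Instant.parse("2023-01-01T00:00:00Z"));
    uploads.put("new", Instant.parse("2023-01-09T00:00:00Z"));
    return selectStale(uploads, now, Duration.ofDays(3));
  }
}
```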
Modifier and Type | Field and Description |
---|---|
protected static org.slf4j.Logger | LOG |
protected String | mBucketName - Bucket name of the object storage bucket. |
protected boolean | mClosed - Flag to indicate this stream has been closed, to ensure close is only done once. |
protected File | mFile - The local temp file that will be uploaded when it reaches the partition size or when flush() is called and this file is bigger than UPLOAD_THRESHOLD. |
protected MessageDigest | mHash - The MD5 hash of the file. |
protected String | mKey - Key of the file when it is uploaded to object storage. |
protected OutputStream | mLocalOutputStream - The output stream to the local temp file. |
protected long | mPartitionOffset - When the offset reaches the partition size, we upload the temp file. |
protected long | mPartitionSize - The maximum allowed size of a partition. |
protected java.util.function.Supplier<RetryPolicy> | mRetryPolicy - The retry policy of this multipart upload. |
protected byte[] | mSingleCharWrite - Pre-allocated byte buffer for writing single characters. |
protected List<String> | mTmpDirs |
protected static long | UPLOAD_THRESHOLD - Only parts bigger than 5MB can be uploaded through multipart upload, except the last part. |
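As an illustration of how an MD5 hash like mHash can be maintained while data is written, the sketch below wraps the target stream in a java.security.DigestOutputStream so the digest is updated on every write. Md5WhileWriting and base64Md5Of are hypothetical names, a ByteArrayOutputStream stands in for the temp file stream, and Base64 encoding of the digest is an assumption; the page does not say how the md5 string passed to putObject or uploadPartInternal is encoded.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/** Hypothetical helper showing an MD5 digest computed as a side effect of writing. */
final class Md5WhileWriting {
  static String base64Md5Of(byte[] data) {
    try {
      MessageDigest md5 = MessageDigest.getInstance("MD5");
      // Every byte written through the wrapper also updates the digest.
      try (OutputStream out = new DigestOutputStream(new ByteArrayOutputStream(), md5)) {
        out.write(data);
      }
      // Assumption: the digest is passed on as a Base64 string.
      return Base64.getEncoder().encodeToString(md5.digest());
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 is not available", e);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Computing the hash during the write avoids reading the temp file a second time before it is uploaded.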
Constructor and Description |
---|
ObjectLowLevelOutputStream(String bucketName, String key, com.google.common.util.concurrent.ListeningExecutorService executor, long streamingUploadPartitionSize, AlluxioConfiguration ufsConf) - Constructs a new stream for writing a file. |
Modifier and Type | Method and Description |
---|---|
protected void | abortMultiPartUpload() |
protected abstract void | abortMultiPartUploadInternal() |
void | close() |
protected abstract void | completeMultiPartUploadInternal() |
protected abstract void | createEmptyObject(String key) |
void | flush() |
int | getPartNumber() - Get the part number. |
protected abstract void | initMultiPartUploadInternal() |
protected abstract void | putObject(String key, File file, String md5) |
protected void | uploadPart() - Uploads part async. |
protected void | uploadPart(File file, int partNumber, boolean lastPart) |
protected abstract void | uploadPartInternal(File file, int partNumber, boolean isLastPart, String md5) |
protected void | waitForAllPartsUpload() |
void | write(byte[] b) |
void | write(byte[] b, int off, int len) |
void | write(int b) |
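The interplay between the asynchronous uploadPart() and waitForAllPartsUpload() can be sketched with the standard library alone. This is an illustration under stated assumptions, not the Alluxio code: it uses a plain ExecutorService instead of the Guava ListeningExecutorService, numbers parts from 1, and marks the upload as aborted when any part fails. AsyncPartUploads, PartUploader, and demo are hypothetical names.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of submitting parts asynchronously and waiting for them. */
final class AsyncPartUploads {
  /** Stand-in for uploadPartInternal. */
  interface PartUploader {
    void upload(int partNumber) throws IOException;
  }

  private final ExecutorService executor;
  private final PartUploader uploader;
  private final List<Future<?>> pending = new ArrayList<>();
  private int nextPartNumber = 1; // assumption: part numbers start at 1
  private boolean aborted;        // stand-in for calling abortMultiPartUpload()

  AsyncPartUploads(ExecutorService executor, PartUploader uploader) {
    this.executor = executor;
    this.uploader = uploader;
  }

  /** Submits the next part and returns immediately without waiting. */
  void uploadPart() {
    final int partNumber = nextPartNumber++;
    pending.add(executor.submit(() -> {
      uploader.upload(partNumber);
      return null;
    }));
  }

  /** Blocks until every submitted part has finished; fails if any part failed. */
  void waitForAllPartsUpload() throws IOException {
    try {
      for (Future<?> f : pending) {
        f.get();
      }
    } catch (ExecutionException e) {
      aborted = true;
      throw new IOException("part upload failed", e.getCause());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      aborted = true;
      throw new IOException("interrupted while waiting for part uploads", e);
    } finally {
      pending.clear();
    }
  }

  boolean isAborted() {
    return aborted;
  }

  /** Returns the number of uploaded parts, or -1 if waiting reported a failure. */
  static int demo(int parts, int failingPart) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    AtomicInteger uploaded = new AtomicInteger();
    AsyncPartUploads uploads = new AsyncPartUploads(pool, n -> {
      if (n == failingPart) {
        throw new IOException("simulated failure for part " + n);
      }
      uploaded.incrementAndGet();
    });
    try {
      for (int i = 0; i < parts; i++) {
        uploads.uploadPart();
      }
      uploads.waitForAllPartsUpload();
      return uploaded.get();
    } catch (IOException e) {
      return -1;
    } finally {
      pool.shutdownNow();
    }
  }
}
```

Calling AsyncPartUploads.demo(3, 0) uploads all three parts, since no part is numbered 0, while demo(3, 2) reports a failure because part 2 throws.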
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface ContentHashable: getContentHash
protected static final org.slf4j.Logger LOG
protected static final long UPLOAD_THRESHOLD
protected final String mBucketName
protected final String mKey
protected final java.util.function.Supplier<RetryPolicy> mRetryPolicy
protected final byte[] mSingleCharWrite
@Nullable protected MessageDigest mHash
protected boolean mClosed
protected long mPartitionOffset
protected final long mPartitionSize
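@Nullable protected File mFile
The local temp file that will be uploaded when it reaches the partition size or when flush() is called and this file is bigger than UPLOAD_THRESHOLD.
@Nullable protected OutputStream mLocalOutputStream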
public ObjectLowLevelOutputStream(String bucketName, String key, com.google.common.util.concurrent.ListeningExecutorService executor, long streamingUploadPartitionSize, AlluxioConfiguration ufsConf)
Parameters:
bucketName - the name of the bucket
key - the key of the file
streamingUploadPartitionSize - the size in bytes for partitions of streaming uploads
executor - executor
ufsConf - the object store under file system configuration
public void write(int b) throws IOException
Specified by: write in class OutputStream
Throws: IOException
public void write(byte[] b) throws IOException
Overrides: write in class OutputStream
Throws: IOException
public void write(byte[] b, int off, int len) throws IOException
Overrides: write in class OutputStream
Throws: IOException
public void flush() throws IOException
Specified by: flush in interface Flushable
Overrides: flush in class OutputStream
Throws: IOException
public void close() throws IOException
Specified by: close in interface Closeable
Specified by: close in interface AutoCloseable
Overrides: close in class OutputStream
Throws: IOException
protected void uploadPart() throws IOException
Throws: IOException
protected void uploadPart(File file, int partNumber, boolean lastPart)
protected void abortMultiPartUpload()
protected void waitForAllPartsUpload() throws IOException
Throws: IOException
public int getPartNumber()
protected abstract void uploadPartInternal(File file, int partNumber, boolean isLastPart, @Nullable String md5) throws IOException
Throws: IOException
protected abstract void initMultiPartUploadInternal() throws IOException
Throws: IOException
protected abstract void completeMultiPartUploadInternal() throws IOException
Throws: IOException
protected abstract void abortMultiPartUploadInternal() throws IOException
Throws: IOException
protected abstract void createEmptyObject(String key) throws IOException
Throws: IOException
protected abstract void putObject(String key, File file, @Nullable String md5) throws IOException
Throws: IOException
Copyright © 2023. All Rights Reserved.