public class S3NettyHandler extends Object
Modifier and Type | Field and Description |
---|---|
static Pattern |
BUCKET_ADJACENT_DOTS_DASHES_PATTERN |
static Pattern |
BUCKET_INVALID_SUFFIX_PATTERN |
static Pattern |
BUCKET_INVALIDATION_PREFIX_PATTERN |
static boolean |
BUCKET_NAMING_RESTRICTION_ENABLED |
static com.google.common.cache.Cache<String,Boolean> |
BUCKET_PATH_CACHE |
static int |
BUCKET_PATH_CACHE_SIZE |
static Pattern |
BUCKET_PATH_PATTERN |
static Pattern |
BUCKET_VALID_NAME_PATTERN |
AsyncUserAccessAuditLogWriter |
mAsyncAuditLogWriter |
static Pattern |
OBJECT_PATH_PATTERN |
static WritePType |
S3_WRITE_TYPE |
Constructor and Description |
---|
S3NettyHandler(String bucket,
String object,
io.netty.handler.codec.http.HttpRequest request,
io.netty.channel.ChannelHandlerContext ctx,
FileSystem fileSystem,
DoraWorker doraWorker,
AsyncUserAccessAuditLogWriter asyncAuditLogWriter)
Constructs an instance of
S3NettyHandler . |
Modifier and Type | Method and Description |
---|---|
boolean |
addContent(io.netty.handler.codec.http.HttpContent content)
Adds content to the content queue.
|
static void |
checkPathIsAlluxioDirectory(FileSystem fs,
String bucketPath,
S3AuditContext auditContext)
Check if a path in alluxio is a directory.
|
S3AuditContext |
createAuditContext(String command,
String user,
String bucket,
String object)
Creates a
S3AuditContext instance. |
static S3NettyHandler |
createHandler(io.netty.channel.ChannelHandlerContext context,
io.netty.handler.codec.http.HttpRequest request,
FileSystem fileSystem,
DoraWorker doraWorker,
AsyncUserAccessAuditLogWriter asyncAuditLogWriter)
Create a S3Handler based on the incoming Request.
|
void |
doAuthentication()
Do S3 request authentication.
|
void |
extractAMZHeaders()
Utility function to help extract x-amz- headers from request.
|
String |
getBucket()
Get the bucket name of this request.
|
io.netty.channel.ChannelHandlerContext |
getContext()
Get the channel context of this request.
|
DoraWorker |
getDoraWorker()
Get dora worker object.
|
FileSystem |
getFileSystemForUser(String user) |
FileTransferType |
getFileTransferType()
Get the FileTransferType of the netty server.
|
FileSystem |
getFsClient()
Get system user FileSystem object.
|
String |
getHeader(String headerName)
get specified HTTP header value of this request.
|
String |
getHeaderOrDefault(String headerName,
String defaultHeaderValue)
get specified HTTP header with a default if not exist.
|
String |
getHttpMethod()
get HTTP verb of this request.
|
io.netty.handler.codec.http.HttpContent |
getLatestContent()
get HTTP content of this request.
|
String |
getObject()
Get the object name of this request.
|
String |
getQueryParameter(String queryParam)
retrieve given query parameter value.
|
io.netty.handler.codec.http.HttpRequest |
getRequest()
get HTTP request.
|
io.netty.handler.codec.http.HttpResponse |
getResponse()
get HTTP response.
|
S3NettyBaseTask |
getS3Task()
get S3Task of this S3Handler.
|
com.google.common.base.Stopwatch |
getStopwatch()
Get Stopwatch object used for recording this request's latency.
|
AlluxioURI |
getUfsPath(AlluxioURI objectPath)
Gets UFS full path from Alluxio path.
|
String |
getUser()
Get the user name of this request.
|
void |
init()
Initialize the S3Handler object in preparation for handling the request.
|
static void |
logAccess(io.netty.handler.codec.http.HttpRequest request,
io.netty.handler.codec.http.HttpResponse response,
com.google.common.base.Stopwatch stopWatch,
S3NettyBaseTask.OpType opType)
Log the access of every single http request.
|
BlockReader |
openBlock(String ufsFullPath,
long offset,
long length)
Gets a
BlockReader according the ufs full path, offset and length. |
void |
processHttpResponse(io.netty.handler.codec.http.HttpResponse response)
Writes HttpResponse into context channel, After writes context channel will close.
|
void |
processHttpResponse(io.netty.handler.codec.http.HttpResponse response,
boolean closeAfterWrite)
Writes HttpResponse into context channel.
|
void |
processMappedResponse(BlockReader blockReader,
long objectSize)
Writes data into netty channel by copying through ByteBuf.
|
void |
processTransferResponse(DataBuffer packet)
Writes a
DataBuffer into netty channel. |
void |
rejectUnsupportedResources()
Reject unsupported request from the given subresources from request.
|
boolean |
remainContent() |
static void |
setEntityTag(FileSystem fs,
AlluxioURI objectUri,
String entityTag)
This helper method is used to set the ETag xAttr on an object.
|
void |
setResponse(io.netty.handler.codec.http.HttpResponse response)
set FullHttpResponse for this request.
|
void |
setS3Task(S3NettyBaseTask task)
set S3Task for this S3Handler.
|
void |
setStopwatch(com.google.common.base.Stopwatch stopwatch)
Set the Stopwatch object used for recording this request's latency.
|
public AsyncUserAccessAuditLogWriter mAsyncAuditLogWriter
public static final Pattern BUCKET_PATH_PATTERN
public static final Pattern OBJECT_PATH_PATTERN
public static final boolean BUCKET_NAMING_RESTRICTION_ENABLED
public static final WritePType S3_WRITE_TYPE
public static final Pattern BUCKET_ADJACENT_DOTS_DASHES_PATTERN
public static final Pattern BUCKET_INVALIDATION_PREFIX_PATTERN
public static final Pattern BUCKET_INVALID_SUFFIX_PATTERN
public static final Pattern BUCKET_VALID_NAME_PATTERN
public static final int BUCKET_PATH_CACHE_SIZE
public S3NettyHandler(String bucket, String object, io.netty.handler.codec.http.HttpRequest request, io.netty.channel.ChannelHandlerContext ctx, FileSystem fileSystem, DoraWorker doraWorker, AsyncUserAccessAuditLogWriter asyncAuditLogWriter)
S3NettyHandler
.bucket
- object
- request
- ctx
- fileSystem
- doraWorker
- asyncAuditLogWriter
- public static S3NettyHandler createHandler(io.netty.channel.ChannelHandlerContext context, io.netty.handler.codec.http.HttpRequest request, FileSystem fileSystem, DoraWorker doraWorker, AsyncUserAccessAuditLogWriter asyncAuditLogWriter) throws Exception
context
- request
- fileSystem
- doraWorker
- asyncAuditLogWriter
- Exception
public void init() throws Exception
Exception
public void extractAMZHeaders()
public void rejectUnsupportedResources() throws S3Exception
S3Exception
public void doAuthentication() throws Exception
Exception
public S3AuditContext createAuditContext(String command, String user, @Nullable String bucket, @Nullable String object)
S3AuditContext
instance.command
- the command to be logged by this S3AuditContext
user
- user namebucket
- bucket nameobject
- object nameS3AuditContext
instancepublic void processHttpResponse(io.netty.handler.codec.http.HttpResponse response)
response
- HttpResponse objectpublic void processHttpResponse(io.netty.handler.codec.http.HttpResponse response, boolean closeAfterWrite)
response
- HttpResponse objectcloseAfterWrite
- if true, After writes context channel will closepublic void processTransferResponse(DataBuffer packet)
DataBuffer
into netty channel. It supports zero copy through ByteBuf and
FileRegion.packet
- DataBuffer packetpublic void processMappedResponse(BlockReader blockReader, long objectSize) throws IOException
blockReader
- reader instanceobjectSize
- size of the objectIOException
public BlockReader openBlock(String ufsFullPath, long offset, long length) throws IOException, AccessControlException
BlockReader
according the ufs full path, offset and length.ufsFullPath
- UFS full pathoffset
- the offset of this readinglength
- the length of this readingIOException
AccessControlException
public void setS3Task(S3NettyBaseTask task)
task
- public void setResponse(io.netty.handler.codec.http.HttpResponse response)
response
- public String getUser()
public String getBucket()
public String getObject()
public io.netty.channel.ChannelHandlerContext getContext()
public FileTransferType getFileTransferType()
public S3NettyBaseTask getS3Task()
public FileSystem getFsClient()
public DoraWorker getDoraWorker()
public io.netty.handler.codec.http.HttpRequest getRequest()
public io.netty.handler.codec.http.HttpResponse getResponse()
public io.netty.handler.codec.http.HttpContent getLatestContent() throws InterruptedException
InterruptedException
public boolean remainContent()
public boolean addContent(io.netty.handler.codec.http.HttpContent content)
content
- public String getHttpMethod()
public String getHeader(String headerName)
headerName
- public String getHeaderOrDefault(String headerName, String defaultHeaderValue)
headerName
- defaultHeaderValue
- @Nullable public String getQueryParameter(String queryParam)
queryParam
- public com.google.common.base.Stopwatch getStopwatch()
public void setStopwatch(com.google.common.base.Stopwatch stopwatch)
stopwatch
- public FileSystem getFileSystemForUser(String user)
user
- the Subject
name of the filesystem userFileSystem
with the subject set to the provided userpublic AlluxioURI getUfsPath(AlluxioURI objectPath) throws S3Exception
objectPath
- the Alluxio pathS3Exception
public static void checkPathIsAlluxioDirectory(FileSystem fs, String bucketPath, @Nullable S3AuditContext auditContext) throws S3Exception
fs
- instance of FileSystem
bucketPath
- bucket complete pathauditContext
- the audit context for exceptionS3Exception
public static void setEntityTag(FileSystem fs, AlluxioURI objectUri, String entityTag) throws IOException, AlluxioException
fs
- The FileSystem
used to make the gRPC requestobjectUri
- The AlluxioURI
for the object to updateentityTag
- The entity tag of the object (MD5 checksum of the object contents)IOException
AlluxioException
public static void logAccess(io.netty.handler.codec.http.HttpRequest request, io.netty.handler.codec.http.HttpResponse response, com.google.common.base.Stopwatch stopWatch, S3NettyBaseTask.OpType opType)
request
- response
- stopWatch
- opType
- Copyright © 2023. All Rights Reserved.