Data Movement Operators

Slack Docker Pulls

Data movement operators, help users to smoothly transport data across different UFSs. When the operator works on the data, each operation are regarded as a “job”, and the jobs are handled through the backend module called “job scheduler” (here). The data movement operators are also the building blocks for Policy Driven Data Management Tool.

job scheduler

Job scheduler is the underlying module responsible for create and manage the background jobs. Although it is a backend module, there are some user facing aspects worth noting:

  • Users will be able to track these jobs’ progress from different CLIs (see details for each operator).
  • The configuration of each job will be logged in our logs. We will also output error logs for the errors occurred in the jobs.
  • For fault tolerance, when a master is down, the job scheduler will restart the old job from the beginning and find out where it left when the master dies. We currently don’t have any resume mechanism. Jobs will need to spend some time figuring out the delta.

operators

copy

The copy operator copies a file or directory in the Alluxio file system distributed across workers using the scheduler.

If copy is run on a directory, files in the directory will be recursively copied.

$ ./bin/alluxio job copy --src <src> --dst <dst> --submit [--check-content] [--partial-listing]

Options:

  • --check-content option specify whether to check content hash when copying files. When the flag is turned on, the content hash of the target file will be checked at the end of each file copy operation to ensure it matches the original. If the check fails after the configured retries, an error will be reported and the copied artifact for that file in the target directory will be deleted.
  • --partial-listing option specify whether to using batch listStatus API or traditional listStatus. This limits the memory usage and starts copy sooner for larger directory. But progress report cannot report on the total number of files because the whole directory is not listed yet.

After submit the command, you can check the status by running the following

$ ./bin/alluxio job copy --src <src> --dst <dst> --progress [--format TEXT|JSON] [--verbose]

And you would get the following output for a running job:

Progress for jobId 1c849041-ef26-4ed7-a932-2af5549754d7 copying path '/dir-99' to '/dir-100':
        Settings: "check-content: false"
        Job Submitted: 2023-06-30 12:30:45.0
        Job Id: 111111
        Job State: RUNNING
        Files qualified so far: 1, 826.38MB
        Files Failed: 0
        Files Skipped: 0
        Files Succeeded: 1
        Bytes Copied: 826.38MB
        Throughput: 1621.09KB/s
        Files failure rate: 0.00%

You would get the following output for a finished job:

Progress for jobId 1c849041-ef26-4ed7-a932-2af5549754d7 copying path '/dir-99' to '/dir-100':
        Settings: "check-content: false"
        Job Submitted: 2023-06-30 12:30:45.0
        Job Id: 111111
        Job State: SUCCEEDED, finished at 2023-06-30 12:45:50.1
        Files qualified: 1, 826.38MB
        Files Failed: 0
        Files Skipped: 0
        Files Succeeded: 1
        Bytes Copied: 826.38MB
        Throughput: 1621.09KB/s
        Files failure rate: 0.00%

Options:

  • --format option specify output format. TEXT as default
  • --verbose option output job details.

If you want to stop the command by running the following

$ ./bin/alluxio job copy --src <src> --dst <dst> --stop

And you would get the following output:

Copy job from '/dir-99' to '/dir-100' is successfully stopped.

Note:

  • When a job is terminated, the job status will show as STOPPED
  • Tasks that already started will not be terminated, and the list will not be reported to REST API
  • Alluxio scans file by batches, so Files qualified may not be the final count when it is in progress. When a job finishes, one can expect that Files Qualified = Files Failed + Files Skipped + Files Succeeded
  • Alluxio actually counts objects instead of files. Objects include files and directories.
  • If a new task for the same file gets kicked off before the previous task finishes, one of them will error out
  • The max time a user need to wait for a task to finish after STOPPED equal to the configured gRPC timeline time (this configuration(alluxio.user.network.rpc.keepalive.timeout) is global for the entire cluster)

Delta copy:

  • Copy now supports delta copy, which will skip copying files or directories that is already existed on the target side.

Limitations:

  • Currently, copy only supports S3, HDFS and GCS as UFSs.

move

The move operator copies a file or directory in the Alluxio file system distributed across workers using the scheduler and deletes the original file or directory.

If move is run on a directory, files in the directory will be recursively moved.

$ ./bin/alluxio job move --src <src> --dst <dst> --submit [--check-content] [--partial-listing]

Options:

  • --check-content option specify whether to check content hash when moving files. When the flag is turned on, the content hash of the target file will be checked at the end of each file copy operation to ensure it matches the original. If the check fails after the configured retries, an error will be reported and the copied artifact for that file in the target directory will be deleted.
  • --partial-listing option specify whether to using batch listStatus API or traditional listStatus. This limits the memory usage and starts move sooner for larger directory. But progress report cannot report on the total number of files because the whole directory is not listed yet.

After submit the command, you can check the status by running the following

$ ./bin/alluxio job move --src <src> --dst <dst> --progress [--format TEXT|JSON] [--verbose]

And you would get the following output for a running job:

Progress for jobId 1c849041-ef26-4ed7-a932-2af5549754d7 moving path '/dir-99' to '/dir-100':
        Settings: "check-content: false"
        Job Submitted: 2023-06-30 12:30:45.0
        Job Id: 111111
        Job State: RUNNING
        Files qualified so far: 1, 826.38MB
        Files Failed: 0
        Files Succeeded: 1
        Bytes Moved: 826.38MB
        Throughput: 1621.09KB/s
        Files failure rate: 0.00%

You would get the following output for a finished job:

Progress for jobId 1c849041-ef26-4ed7-a932-2af5549754d7 moving path '/dir-99' to '/dir-100':
        Settings: "check-content: false"
        Job Submitted: 2023-06-30 12:30:45.0
        Job Id: 111111
        Job State: SUCCEEDED, finished at 2023-06-30 12:45:50.1
        Files qualified: 1, 826.38MB
        Files Failed: 0
        Files Succeeded: 1
        Bytes Moved: 826.38MB
        Throughput: 1621.09KB/s
        Files failure rate: 0.00%
Options:
* `--format` option specify output format. TEXT as default
* `--verbose` option output job details.

If you want to stop the command by running the following
```console
$ ./bin/alluxio job move --src <src> --dst <dst> --stop

And you would get the following output:

Move job from '/dir-99' to '/dir-100' is successfully stopped.

Note:

  • When a job is terminated, the job status will show as STOPPED
  • Tasks that already started will not be terminated, and the list will not be reported to REST API
  • Alluxio scans file by batches, so Files qualified may not be the final count when it is in progress. When a job finishes, one can expect that Files Qualified = Files Failed + Files Skipped + Files Succeeded
  • Alluxio actually counts objects instead of files. Objects include files and directories.
  • If a new task for the same file gets kicked off before the previous task finishes, one of them will error out
  • The max time a user need to wait for a task to finish after STOPPED equal to the configured gRPC timeline time (this configuration(alluxio.user.network.rpc.keepalive.timeout) is global for the entire cluster)

Limitations:

  • Currently, move only supports S3, HDFS and GCS as UFSs.
  • Note that when moving files from HDFS, GCS(or similar file system) to another UFS, there might be concurrent issue with deleting directories, we would leave empty directory in the source directory even files get moved.

load

The load operator loads data/metadata from the under storage system into Alluxio storage. For example, load can be used to prefetch data for analytics jobs. If load is run on a directory, files in the directory will be recursively loaded.

$ ./bin/alluxio job load --path <path> --submit [--metadata-only]

Options:

  • --metadata-only option specify whether loading metadata only

After submit the command, you can check the status by running the following

$ ./bin/alluxio job load --path <path> --progress [--format TEXT|JSON] [--verbose]

And you would get the following output:

Progress for loading path '/dir-99':
        Settings:       bandwidth: unlimited    verify: false
        Job State: SUCCEEDED
        Files Processed: 1000
        Bytes Loaded: 125.00MB
        Throughput: 2509.80KB/s
        Block load failure rate: 0.00%
        Files Failed: 0

Options:

  • --format option specify output format. TEXT as default
  • --verbose option output job details.

If you want to stop the command by running the following

$ ./bin/alluxio job load --path <path> --stop