Policy-Driven Data Management

Moving data from one storage system to another while keeping computation jobs running correctly is a common challenge for many Alluxio users. Traditionally, this is done by manually copying the data to the destination, waiting until all updates are complete, and finally removing the original copy. With Alluxio, this can be done seamlessly through Policy-Driven Data Management (PDDM).

For example, if a user wants to move data from hdfs://hdfs_cluster/data/ to s3://user_bucket/, they can create a migration policy to move data from HDFS to S3. Alluxio will enforce the policy and automatically move the data from HDFS to S3 in the background.

Moving Data with Policies

Files in different UFSs can be moved by setting policies. A policy can be added with the following command:

./bin/alluxio policy add --name <POLICY_NAME> --operation <OPERATION> --src <SOURCE_PATH> --dst <DESTINATION_PATH> --time <TIME_EXPRESSION> --filter <FILTER_CONDITION> --check-content

In the command above:

  • <POLICY_NAME> specifies the name of the policy. It must be a unique string containing only alphanumeric characters. If no name is specified, an automatically generated name is used.
  • <OPERATION> specifies how the data is affected on this storage location. It can be:
    • move to indicate the data should be moved from the source path to the destination path.
    • copy to indicate the data should be copied from the source path to the destination path.
  • <SOURCE_PATH> specifies a source Alluxio path where the policy will apply.
  • <DESTINATION_PATH> specifies a destination Alluxio path where the policy will apply.
  • <TIME_EXPRESSION> specifies a time expression in cron format that triggers the policy.
    • NOTE: Policies cannot be triggered more frequently than once every 30 seconds. Avoid setting an overly frequent schedule; otherwise the reported status may be unexpected. If the previous PDDM run is still in progress when the cron schedule fires again, the new run is skipped rather than triggered.
  • --check-content is a flag that specifies whether to verify content hashes when copying files. Enable it only when the source and destination content hashes can be compared using the same algorithm.
  • <FILTER_CONDITION> specifies a filter condition that determines when the policy is executed. A filter condition is specified in the format <conditionType>(startTime[, endTime]). There are three types of filter conditions:
    • unmodifiedFor: the policy will be executed when a file or directory has not been modified for a certain period of time.
    • dateFromFileNameOlderThan: the date is extracted from the file name based on a pattern specified by the user. The pattern must include YYYY (the year), MM (the month), and DD (the day of the month). For example, if the pattern is bbbbYYYYMMDD, then dates can be extracted from file names such as aaaa20230301, bbbb20230301, and aaaa20230301aaaa. An exception is thrown when a user tries to add a pattern without YYYY, MM, and DD. Specify the pattern with the --file_date_pattern flag when adding the policy. For example,
      ./bin/alluxio policy add --name ufsMigrate --operation copy --src /pddm/online/ --dst /pddm/test --time "0 0 12 * * *" --filter "dateFromFileNameOlderThan(2d)" --file_date_pattern YYYY-MM-DD
      
    • lastModifiedDate: the policy will be executed when the last modified date of a file or directory falls within a certain period. The date must be in the format YYYY/MM/DD; otherwise the policy is invalid.

    The time period can be specified using a single start time, or a start time and an end time. For each time value, use s, m, h, or d to indicate the time units seconds, minutes, hours, and days. For example,

    • unmodifiedFor(30m) sets a policy to execute after a file is not modified for 30 minutes.
    • dateFromFileNameOlderThan(2d) sets a policy to execute after the date from the file name is older than 2 days.
    • dateFromFileNameOlderThan(1d, 3d) sets a policy to execute after the date from the file name is older than 1 day, but younger than 3 days.

    The format for the lastModifiedDate filter is slightly different: the startTime and endTime must be specific dates. For example,

    • lastModifiedDate(2022/10/10) sets a policy to execute if a file is last modified after 2022/10/10, including 2022/10/10.
    • lastModifiedDate(2022/10/10, 2023/08/01) sets a policy to execute if a file is last modified after 2022/10/10 and before 2023/08/01, including 2022/10/10, but excluding 2023/08/01.
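
The filter semantics above can be sketched in Python. This is an illustration only — the pattern-to-regex translation, the boundary handling, and the function names are assumptions, not Alluxio's implementation:

```python
import re
from datetime import date, timedelta

def pattern_to_regex(pattern):
    """Translate a file-date pattern such as 'bbbbYYYYMMDD' into a regex.

    YYYY, MM, and DD become capture groups; every other character in the
    pattern is treated as a placeholder matching any single character.
    Raises ValueError if YYYY, MM, or DD is missing.
    """
    out, i, seen = [], 0, set()
    while i < len(pattern):
        if pattern.startswith("YYYY", i):
            out.append(r"(?P<y>\d{4})"); seen.add("YYYY"); i += 4
        elif pattern.startswith("MM", i):
            out.append(r"(?P<m>\d{2})"); seen.add("MM"); i += 2
        elif pattern.startswith("DD", i):
            out.append(r"(?P<d>\d{2})"); seen.add("DD"); i += 2
        else:
            out.append(".")  # placeholder character, e.g. the 'b's above
            i += 1
    if seen != {"YYYY", "MM", "DD"}:
        raise ValueError("pattern must contain YYYY, MM, and DD")
    return "".join(out)

def date_from_filename(name, pattern):
    """Extract the date from a file name, or return None if no match."""
    m = re.search(pattern_to_regex(pattern), name)
    if m is None:
        return None
    return date(int(m.group("y")), int(m.group("m")), int(m.group("d")))

def older_than(file_date, start, end=None, today=None):
    """dateFromFileNameOlderThan(start[, end]) check on an extracted date."""
    age = (today or date.today()) - file_date
    return age >= start and (end is None or age <= end)

def last_modified_in_range(mtime_date, start, end=None):
    """lastModifiedDate(start[, end]): start inclusive, end exclusive."""
    return mtime_date >= start and (end is None or mtime_date < end)
```

For instance, `date_from_filename("aaaa20230301", "bbbbYYYYMMDD")` extracts 2023/03/01, matching the pattern example in the list above.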

For example, if we want to move data from hdfs://local_hdfs/user_a/mydir to s3://mybucket/mydir, the command below adds a policy that runs daily and moves files once they have been unmodified for 2 days:

./bin/alluxio policy add --name ufsMigrate --operation move --src "hdfs://local_hdfs/user_a/mydir" --dst "s3://mybucket/mydir" --time "0 0 0 * * *" --filter "unmodifiedFor(2d)"

If we want to move data from HDFS to S3 that was last modified on or after 2022/10/10 (inclusive) and before 2023/08/01 (exclusive), checking every day at midnight:

./bin/alluxio policy add --name ufsMigrate --operation move --src "hdfs://local_hdfs/user_a/mydir" --dst "s3://mybucket/mydir" --time "0 0 0 * * *" --filter "lastModifiedDate(2022/10/10, 2023/08/01)"
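
The time expressions in these commands have six fields. The breakdown can be sketched as follows; the field order (seconds first) is inferred from the examples in this document, not a statement about Alluxio's parser:

```python
# Six-field cron expressions, seconds first (assumed field order).
FIELDS = ("second", "minute", "hour", "day-of-month", "month", "day-of-week")

def describe_cron(expr):
    """Label each field of a six-field cron expression."""
    parts = expr.split()
    if len(parts) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} fields, got {len(parts)}")
    return dict(zip(FIELDS, parts))

# "0 0 0 * * *" -> second 0, minute 0, hour 0: every day at midnight
```

Under this reading, "0 0 12 * * *" in the earlier example fires daily at noon, and "0 0 0 * * *" fires daily at midnight.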

After a policy is added, it will be executed in the background after all its requirements are met.
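
The --check-content verification described earlier amounts to comparing a content hash computed with the same algorithm on both sides. A minimal sketch of that idea — the helper name and default algorithm are hypothetical, not Alluxio's API:

```python
import hashlib

def same_content(src_bytes, dst_bytes, algo="md5"):
    """Compare content hashes of source and destination data.

    Only meaningful when both sides' hashes are produced by the SAME
    algorithm -- the precondition for enabling --check-content.
    """
    src_hash = hashlib.new(algo, src_bytes).hexdigest()
    dst_hash = hashlib.new(algo, dst_bytes).hexdigest()
    return src_hash == dst_hash
```

If the source and destination UFSs report hashes from different algorithms (for example, an HDFS checksum versus an S3 ETag), the comparison is meaningless, which is why the flag should stay off in that case.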

Managing Policies

To list all policies, use the following command:

./bin/alluxio policy list

To check the status of a specific policy, use the following command:

./bin/alluxio policy status <POLICY_NAME>

where <POLICY_NAME> should be the name of a policy listed by the policy list command. This command prints a summary of the policy execution status, as in the following examples:

Policy <POLICY_NAME>:

Last execution triggered at 11-03-2019 12:23:01:012
  PolicyExecutionId: 3
        Settings: "check-content: true"
        Job Submitted: 2019-11-03 12:23:01.0
        Job State: SUCCEEDED, finished at 2019-11-03 12:43:01.0
        Files qualified: 1000, 125.00MB
        Files Failed: 0
        Files Skipped: 0
        Files Succeeded: 1000
        Bytes Copied: 125.00MB
        Throughput: 2509.80KB/s
        Files failure rate: 0.00%

History since server started:
PolicyExecution 2 triggered at 11-02-2019 12:23:01:012
PolicyExecution 1 triggered at 11-01-2019 12:23:01:012

Policy <POLICY_NAME>:

Last execution triggered at 11-03-2019 12:23:01:012
  PolicyExecutionId: 3
        Settings: "check-content: true"
        Job Submitted: 2019-11-03 12:23:01.0
        Job State: Running
        Files qualified so far: 500, 60.00MB
        Files Failed: 0
        Files Skipped: 0
        Files Succeeded: 250
        Bytes Copied: 30.00MB
        Throughput: 2509.80KB/s
        Files failure rate: 0.00%

History since server started:
PolicyExecution 2 triggered at 11-02-2019 12:23:01:012
PolicyExecution 1 triggered at 11-01-2019 12:23:01:012
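
The derived fields in the status output (throughput and failure rate) can be computed from the raw counters. A sketch of the arithmetic, with assumed formulas rather than Alluxio's code:

```python
def throughput_kb_per_s(bytes_copied, elapsed_seconds):
    """Average copy throughput in KB/s over the execution window."""
    return bytes_copied / 1024 / elapsed_seconds

def failure_rate_percent(files_failed, files_qualified):
    """'Files failure rate' as a percentage of qualified files."""
    if files_qualified == 0:
        return 0.0
    return 100.0 * files_failed / files_qualified
```

In the first example above, 0 failed files out of 1000 qualified gives the reported 0.00% failure rate.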

To remove a policy so it is no longer executed, use the following command:

./bin/alluxio policy remove <POLICY_NAME>

You can also manually trigger execution of a policy with the following command:

./bin/alluxio policy trigger <POLICY_NAME>

Example: Running Alluxio Locally with PDDM

First, start the Alluxio servers locally:

./bin/alluxio journal format
./bin/alluxio process start local

This will start one Alluxio master and one Alluxio worker locally. You can see the master UI at http://localhost:19999.

Run a simple example program:

./bin/alluxio exec basicIOTests

If the test fails with permission errors, make sure that the current user (${USER}) has read/write access to the local directory mounted to Alluxio. By default, the login user is the current user of the host OS. To change the user, set the value of alluxio.security.login.username in conf/alluxio-site.properties to the desired username.
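
For example, to run as a hypothetical user alice (the username here is only a placeholder), the property would look like this in conf/alluxio-site.properties:

```properties
# conf/alluxio-site.properties
# 'alice' is a placeholder -- use the desired username
alluxio.security.login.username=alice
```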

After this succeeds, create two local directories and a test file. In this example we will use /tmp/ufs1 and /tmp/ufs2:

mkdir /tmp/ufs1
mkdir /tmp/ufs2
touch /tmp/ufs1/1

Set up a policy to move data from UFS /tmp/ufs1 to UFS /tmp/ufs2:

./bin/alluxio policy add --name ufsMigrate --operation move --src "/tmp/ufs1" --dst "/tmp/ufs2" --time "5 * * * * *" --filter "unmodifiedFor(1s)"

Wait a few seconds for the policy to execute (the cron expression fires at second 5 of every minute), then check the local directories again. The file should now have moved from /tmp/ufs1 to /tmp/ufs2.
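
Rather than checking by hand, a small polling helper can wait for the background move to finish. This helper is an illustration, not part of Alluxio; the paths are the ones used in this example:

```python
import os
import time

def wait_for_move(src_file, dst_file, timeout_s=120.0):
    """Poll until src_file is gone and dst_file exists (the move finished),
    or until the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if not os.path.exists(src_file) and os.path.exists(dst_file):
            return True
        time.sleep(1.0)
    return False

# e.g. wait_for_move("/tmp/ufs1/1", "/tmp/ufs2/1")
```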

Stop Alluxio by running:

./bin/alluxio process stop local

Deployment

Minimal resource requirement: 1 Master + 1 Worker

  • Master: 4 cores, 8G memory, 8G disk
  • Worker: 4 cores, 4G memory, 4G disk, good network connectivity to the UFS

Recommended resource requirement: add more workers if more UFS bandwidth is needed.

What happens if there is a fatal error on the Alluxio cluster? In the worst case, restart the workers and the master. Policies are persisted on the master, and policy execution resumes after the restart.