Running Spark on Alluxio

This guide describes how to run Apache Spark on Alluxio. HDFS is used as an example of a distributed under storage system. Note that Alluxio supports many other under storage systems in addition to HDFS, and enables frameworks like Spark to read data from or write data to any of them.

Compatibility

Alluxio works with Spark 1.1 or later out of the box.

Prerequisites

The prerequisites for this guide are a Java installation and a running Alluxio instance.

General Setup

  • Add the following lines to ${SPARK_HOME}/conf/spark-defaults.conf, updating the paths to refer to the Alluxio client jar in your Alluxio installation (a quick sanity check follows the example below).
spark.driver.extraClassPath /path/to/alluxio/client/alluxio-enterprise-1.8.0-client.jar
spark.executor.extraClassPath /path/to/alluxio/client/alluxio-enterprise-1.8.0-client.jar
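
As an optional sanity check, assuming you start spark-shell after making the change above, you can confirm from the Spark driver that the Alluxio client classes resolve:

> Class.forName("alluxio.hadoop.FileSystem")

If this throws a ClassNotFoundException, re-check the jar path configured in spark-defaults.conf.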

Additional Setup for HDFS 1.x

  • Create a new file ${SPARK_HOME}/conf/core-site.xml with the following content:
<configuration>
  <property>
    <name>fs.alluxio.impl</name>
    <value>alluxio.hadoop.FileSystem</value>
  </property>
</configuration>

Check Spark with Alluxio integration (Supports Spark 2.X)

Before running Spark on Alluxio, you might want to make sure that your Spark configuration has been set up correctly for integrating with Alluxio. The Spark integration checker can help you verify this.

When you have a running Spark cluster (or Spark standalone), you can run the following command in the Alluxio project directory:

$ integration/checker/bin/alluxio-checker.sh spark <spark master uri> [partition number]

Here, the partition number argument is optional. You can use -h to display help for the command. The command reports potential problems that might prevent you from running Spark on Alluxio.
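
For example, against a standalone Spark master (the host name sparkMasterHost below is a placeholder for your own master URI):

# spark://sparkMasterHost:7077 is a placeholder; substitute your actual Spark master URI
$ integration/checker/bin/alluxio-checker.sh spark spark://sparkMasterHost:7077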

Use Alluxio as Input and Output

This section shows how to use Alluxio as input and output sources for your Spark applications.

Use Data Already in Alluxio

First, we will copy some local data to the Alluxio file system. Put the file conf/alluxio-site.properties.template into Alluxio, assuming you are in the Alluxio project directory:

$ bin/alluxio fs copyFromLocal conf/alluxio-site.properties.template /alluxio-site.properties.template
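
You can optionally confirm that the file landed in Alluxio with the Alluxio shell before moving on:

$ bin/alluxio fs ls /

The listing should include /alluxio-site.properties.template.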

Run the following commands from spark-shell, assuming the Alluxio master is running on localhost. If the cluster is not local, replace localhost with the Alluxio master's hostname.

> val s = sc.textFile("alluxio://localhost:19998/alluxio-site.properties.template")
> val double = s.map(line => line + line)
> double.saveAsTextFile("alluxio://localhost:19998/alluxio-site.properties.template2")

Open your browser and check http://localhost:19999/browse. There should be an output directory alluxio-site.properties.template2, containing the contents of alluxio-site.properties.template with each line doubled.
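
If you prefer to verify from spark-shell rather than the browser, a minimal check (using the RDD s defined above) is to read the output back:

> val check = sc.textFile("alluxio://localhost:19998/alluxio-site.properties.template2")
> check.count()   // should equal s.count(), since each line was doubled in place rather than new lines added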

Use Data from HDFS

Alluxio transparently fetches data from the under storage system when given the exact path. Put the file conf/alluxio-site.properties.template into HDFS under the folder Alluxio is mounted to. The default mount location is /alluxio, meaning any files in HDFS within this folder are discoverable by Alluxio. You can modify this setting by changing the ALLUXIO_UNDERFS_ADDRESS property in conf/alluxio-env.sh on the server.

Assuming the namenode is running on localhost and you are using the default mount directory /alluxio:

$ hadoop fs -put -f conf/alluxio-site.properties.template hdfs://localhost:9000/alluxio/alluxio-site.properties.template

Note that Alluxio has no notion of the file yet; you can verify this by browsing the web UI. Run the following commands from spark-shell, assuming the Alluxio master is running on localhost:

> val s = sc.textFile("alluxio://localhost:19998/alluxio-site.properties.template")
> val double = s.map(line => line + line)
> double.saveAsTextFile("alluxio://localhost:19998/alluxio-site.properties.template2")

Open your browser and check http://localhost:19999/browse. There should be an output directory alluxio-site.properties.template2, containing the contents of alluxio-site.properties.template with each line doubled. Also, the alluxio-site.properties.template file itself now appears in the Alluxio file system space.

NOTE: Block caching on partial reads is enabled by default, but if you have turned that option off, it is possible that the alluxio-site.properties.template file is not in Alluxio storage (not in memory). This is because Alluxio only stores fully read blocks, and if the file is small, each Spark executor reads only a partial block. To avoid this behavior, specify the partition count in Spark; for this example we set it to 1, as there is only one block.

> val s = sc.textFile("alluxio://localhost:19998/alluxio-site.properties.template", 1)
> val double = s.map(line => line + line)
> double.saveAsTextFile("alluxio://localhost:19998/alluxio-site.properties.template2")
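
After this run you can check whether the file is now held in Alluxio storage; the Alluxio shell listing should report how much of the file is in Alluxio:

$ bin/alluxio fs ls /alluxio-site.properties.template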

Using Fault Tolerant Mode

When running Alluxio in fault tolerant mode with ZooKeeper, add the following lines to ${SPARK_HOME}/conf/spark-defaults.conf:

spark.driver.extraJavaOptions -Dalluxio.zookeeper.address=zookeeperHost1:2181,zookeeperHost2:2181 -Dalluxio.zookeeper.enabled=true
spark.executor.extraJavaOptions -Dalluxio.zookeeper.address=zookeeperHost1:2181,zookeeperHost2:2181 -Dalluxio.zookeeper.enabled=true

Alternatively you can add the properties to the previously created Hadoop configuration file ${SPARK_HOME}/conf/core-site.xml:

<configuration>
  <property>
    <name>alluxio.zookeeper.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>alluxio.zookeeper.address</name>
    <value>[zookeeper_hostname]:2181</value>
  </property>
</configuration>
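
You can also pass the same settings at launch time with --conf instead of editing configuration files; a minimal sketch (ZooKeeper addresses as in the example above):

$ spark-shell \
    --conf "spark.driver.extraJavaOptions=-Dalluxio.zookeeper.address=zookeeperHost1:2181,zookeeperHost2:2181 -Dalluxio.zookeeper.enabled=true" \
    --conf "spark.executor.extraJavaOptions=-Dalluxio.zookeeper.address=zookeeperHost1:2181,zookeeperHost2:2181 -Dalluxio.zookeeper.enabled=true"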

You can then access files in Alluxio by pointing to any Alluxio master:

> val s = sc.textFile("alluxio://standbyHost:19998/alluxio-site.properties.template")
> val double = s.map(line => line + line)
> double.saveAsTextFile("alluxio://activeHost:19998/alluxio-site.properties.template2")

Data Locality

If Spark task locality is ANY when it should be NODE_LOCAL, it is likely because Alluxio and Spark use different network address representations; one may be using hostnames while the other uses IP addresses. Refer to the related Spark JIRA ticket for more details, where you can find solutions from the Spark community.

To be consistent with HDFS, Alluxio represents network addresses by hostname. As a workaround, you can achieve data locality by explicitly specifying hostnames when launching Spark, using the script Spark provides. Start the Spark Worker on each slave node with its hostname:

$ ${SPARK_HOME}/sbin/start-slave.sh -h <slave-hostname> <spark master uri>

For example:

$ ${SPARK_HOME}/sbin/start-slave.sh -h simple30 spark://simple27:7077

You can also set SPARK_LOCAL_HOSTNAME in ${SPARK_HOME}/conf/spark-env.sh to achieve this. For example:

SPARK_LOCAL_HOSTNAME=simple30

Either way, the Spark Worker addresses become hostnames and the locality level becomes NODE_LOCAL, as shown in the Spark Web UI below.

[Spark Web UI screenshots: worker addresses shown as hostnames; locality level shown as NODE_LOCAL]

Running Spark on YARN

To maximize the data locality of your Spark jobs, use as many executors as possible, ideally at least one executor per node. As with all methods of Alluxio deployment, there should also be an Alluxio worker on every computation node.

When a Spark job is run on YARN, Spark launches its executors without taking data locality into account. Spark does, however, take data locality into account when distributing tasks to those executors. For example, if host1 contains blockA and a job using blockA is launched on the YARN cluster with --num-executors=1, Spark might place the only executor on host2 and have poor locality. However, if --num-executors=2 and executors are started on host1 and host2, Spark is smart enough to prioritize placing the task on host1.
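
For example, on a hypothetical four-node YARN cluster with an Alluxio worker on every node, you might request one executor per node when submitting a job (the executor counts, class name, and jar name are placeholders):

# --num-executors matches the number of nodes; class and jar names are placeholders
$ spark-submit --master yarn \
    --num-executors 4 \
    --executor-cores 2 \
    --class com.example.MyAlluxioJob \
    my-alluxio-job.jar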

Class alluxio.hadoop.FileSystem not found Issues with SparkSQL and Hive MetaStore

To run spark-shell with the Alluxio client, the Alluxio client jar must be added to the classpath of the Spark driver and Spark executors, as described earlier. However, SparkSQL may still fail to save tables to the Hive MetaStore when their location is in Alluxio, with an error message similar to the following:

org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.RuntimeException: java.lang.ClassNotFoundException: Class alluxio.hadoop.FileSystem not found)

The recommended solution is to configure spark.sql.hive.metastore.sharedPrefixes. In Spark 1.4.0 and later, Spark uses an isolated classloader to load Java classes for accessing the Hive MetaStore. However, the isolated classloader ignores certain packages and allows the main classloader to load “shared” classes (the Hadoop HDFS client is one of these “shared” classes). The Alluxio client should also be loaded by the main classloader; you can append the alluxio package to the configuration parameter spark.sql.hive.metastore.sharedPrefixes to instruct Spark to load Alluxio with the main classloader. For example, the parameter may be set to:

spark.sql.hive.metastore.sharedPrefixes=com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc,alluxio
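
One common place to set this is ${SPARK_HOME}/conf/spark-defaults.conf, using the same value:

spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc,alluxio

The property can also be passed on the command line with --conf when launching spark-shell or spark-submit.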

java.io.IOException: No FileSystem for scheme: alluxio Issue with Spark on YARN

If you use Spark on YARN with Alluxio and run into the exception java.io.IOException: No FileSystem for scheme: alluxio, please add the following content to ${SPARK_HOME}/conf/core-site.xml:

<configuration>
  <property>
    <name>fs.alluxio.impl</name>
    <value>alluxio.hadoop.FileSystem</value>
  </property>
</configuration>
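
If you would rather not maintain a separate core-site.xml, the same Hadoop property can usually be supplied through Spark's spark.hadoop.* passthrough; a minimal sketch in ${SPARK_HOME}/conf/spark-defaults.conf:

# spark.hadoop.* properties are copied into the Hadoop Configuration used by Spark
spark.hadoop.fs.alluxio.impl alluxio.hadoop.FileSystem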