User Guide
- Prerequisites
- Getting Started
- Configuration for YARN Cluster Mode
- Configuration for Spark Standalone Mode
- Working with SQL Index
- Working with SQL Data Source Cache
- Run TPC-DS Benchmark
- Advanced Configuration
Prerequisites
SQL Index and Data Source Cache on Spark require a working Hadoop cluster with YARN and Spark. Running Spark on YARN requires a binary distribution of Spark that is built with YARN support. We provide a pre-built Spark-3.0.0 based on hadoop-2.7 with the NUMA patch applied to accelerate performance. If you use a different Hadoop version, you should build Spark from source.
Getting Started
Building
We provide a Conda package that automatically installs dependencies and builds the OAP jars. Follow the OAP-Installation-Guide; once the installation has finished, you can find the compiled OAP jars under $HOME/miniconda2/envs/oapenv/oap_jars.
If you’d like to build from source code, please refer to Developer Guide for the detailed steps.
Spark Configurations
Users usually test and run Spark SQL or Scala scripts in Spark Shell, which launches Spark applications on YARN in client mode. This section starts with Spark Shell and then introduces other usage scenarios.
Before you run $SPARK_HOME/bin/spark-shell, you need to configure Spark for integration. Add or update the following configurations in the Spark configuration file $SPARK_HOME/conf/spark-defaults.conf on your working node.
spark.sql.extensions org.apache.spark.sql.OapExtensions
# absolute path of the jar on your working node
spark.files $HOME/miniconda2/envs/oapenv/oap_jars/oap-cache-<version>-with-spark-<version>.jar,$HOME/miniconda2/envs/oapenv/oap_jars/oap-common-<version>-with-spark-<version>.jar
# relative path of the jar
spark.executor.extraClassPath ./oap-cache-<version>-with-spark-<version>.jar:./oap-common-<version>-with-spark-<version>.jar
# absolute path of the jar on your working node
spark.driver.extraClassPath $HOME/miniconda2/envs/oapenv/oap_jars/oap-cache-<version>-with-spark-<version>.jar:$HOME/miniconda2/envs/oapenv/oap_jars/oap-common-<version>-with-spark-<version>.jar
Verify Integration
After configuration, you can follow these steps to verify the OAP integration is working using Spark Shell.
1. Create a test data path on your HDFS, hdfs:///user/oap/ for example.
```
hadoop fs -mkdir /user/oap/
```
2. Launch Spark Shell using the following command on your working node.
```
$SPARK_HOME/bin/spark-shell
```
3. Execute the following commands in Spark Shell to test OAP integration.
```
spark.sql(s"""CREATE TABLE oap_test (a INT, b STRING)
       USING parquet
       OPTIONS (path 'hdfs:///user/oap/')""".stripMargin)
val data = (1 to 30000).map { i => (i, s"this is test $i") }.toDF().createOrReplaceTempView("t")
spark.sql("insert overwrite table oap_test select * from t")
spark.sql("create oindex index1 on oap_test (a)")
spark.sql("show oindex from oap_test").show()
```
This test creates an index for a table and then shows it. If there are no errors, the OAP .jar is working with the configuration. The picture below is an example of a successful run.
Configuration for YARN Cluster Mode
Spark Shell, Spark SQL CLI and Thrift Server run Spark applications in client mode, while the Spark Submit tool can run Spark applications in client or cluster mode, as decided by the --deploy-mode parameter. The Getting Started section has shown the configurations needed for client mode. If you are running the Spark Submit tool in cluster mode, you need to follow the configuration steps below instead.
Add the following OAP configuration settings to $SPARK_HOME/conf/spark-defaults.conf on your working node before running spark-submit in cluster mode.
spark.sql.extensions org.apache.spark.sql.OapExtensions
# absolute path on your working node
spark.files $HOME/miniconda2/envs/oapenv/oap_jars/oap-cache-<version>-with-spark-<version>.jar,$HOME/miniconda2/envs/oapenv/oap_jars/oap-common-<version>-with-spark-<version>.jar
# relative path
spark.executor.extraClassPath ./oap-cache-<version>-with-spark-<version>.jar:./oap-common-<version>-with-spark-<version>.jar
# relative path
spark.driver.extraClassPath ./oap-cache-<version>-with-spark-<version>.jar:./oap-common-<version>-with-spark-<version>.jar
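The client-mode and cluster-mode settings differ only in spark.driver.extraClassPath: an absolute path in client mode, a relative path in cluster mode. As a minimal sketch of that difference, the shell function below prints the four entries for either mode. The function name oap_conf and the version values in the example call are illustrative assumptions, not part of OAP; substitute the versions that appear in your jar file names.

```shell
# Print the OAP entries for spark-defaults.conf.
# Usage: oap_conf <client|cluster> <jar_dir> <oap_version> <spark_version>
# oap_conf is a hypothetical helper; jar names follow the pattern shown above.
oap_conf() {
  local mode="$1" jar_dir="$2"
  local cache="oap-cache-$3-with-spark-$4.jar"
  local common="oap-common-$3-with-spark-$4.jar"
  echo "spark.sql.extensions org.apache.spark.sql.OapExtensions"
  # spark.files always uses absolute paths on the working node.
  echo "spark.files ${jar_dir}/${cache},${jar_dir}/${common}"
  # Executors load the jars from their own working directory.
  echo "spark.executor.extraClassPath ./${cache}:./${common}"
  if [ "$mode" = "cluster" ]; then
    # Cluster mode: the driver runs inside the cluster, so use relative paths.
    echo "spark.driver.extraClassPath ./${cache}:./${common}"
  else
    # Client mode: the driver runs on the working node, so use absolute paths.
    echo "spark.driver.extraClassPath ${jar_dir}/${cache}:${jar_dir}/${common}"
  fi
}

# Example call with placeholder versions; use the ones in your jar file names.
oap_conf cluster "$HOME/miniconda2/envs/oapenv/oap_jars" 1.0.0 3.0.0
```

The output can be reviewed and then pasted into spark-defaults.conf; the sketch deliberately does not edit the file itself.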
Configuration for Spark Standalone Mode
In addition to running on the YARN cluster manager, Spark also provides a simple standalone deploy mode. If you are using Spark in Spark Standalone mode:
- Copy the OAP .jar to all the worker nodes.
- Add the following configuration settings to $SPARK_HOME/conf/spark-defaults.conf on the working node.
spark.sql.extensions org.apache.spark.sql.OapExtensions
# absolute path on worker nodes
spark.executor.extraClassPath $HOME/miniconda2/envs/oapenv/oap_jars/oap-cache-<version>-with-spark-<version>.jar:$HOME/miniconda2/envs/oapenv/oap_jars/oap-common-<version>-with-spark-<version>.jar
# absolute path on worker nodes
spark.driver.extraClassPath $HOME/miniconda2/envs/oapenv/oap_jars/oap-cache-<version>-with-spark-<version>.jar:$HOME/miniconda2/envs/oapenv/oap_jars/oap-common-<version>-with-spark-<version>.jar
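Copying the jars to every worker can be scripted. The sketch below is one possible approach, assuming passwordless SSH to each worker and the same jar directory on every node; the function name copy_oap_jars, the DRY_RUN variable, and the host names worker1 and worker2 are hypothetical. By default it only prints the scp commands so they can be reviewed first.

```shell
# Copy the OAP jars to the same directory on each worker node.
# Usage: copy_oap_jars <jar_dir> <host>...
# DRY_RUN defaults to 1, which prints the commands instead of running them.
copy_oap_jars() {
  local jar_dir="$1" host
  shift
  for host in "$@"; do
    if [ "${DRY_RUN:-1}" = "1" ]; then
      echo "scp ${jar_dir}/oap-*.jar ${host}:${jar_dir}/"
    else
      scp "${jar_dir}"/oap-*.jar "${host}:${jar_dir}/"
    fi
  done
}

# Example: show what would be copied to two hypothetical workers.
copy_oap_jars /home/oap/oap_jars worker1 worker2
```

Setting DRY_RUN=0 before the call runs the copies for real.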
Working with SQL Index
After a successful OAP integration, you can use OAP SQL DDL to manage table indexes. The DDL operations include index create, drop, refresh, and show. Test these functions using the following examples in Spark Shell.
> spark.sql(s"""CREATE TABLE oap_test (a INT, b STRING)
USING parquet
OPTIONS (path 'hdfs:///user/oap/')""".stripMargin)
> val data = (1 to 30000).map { i => (i, s"this is test $i") }.toDF().createOrReplaceTempView("t")
> spark.sql("insert overwrite table oap_test select * from t")
Index Creation
Use the CREATE OINDEX DDL command to create a B+ Tree index or bitmap index.
CREATE OINDEX index_name ON table_name (column_name) USING [BTREE, BITMAP]
The following example creates a B+ Tree index on column "a" of the oap_test table.
> spark.sql("create oindex index1 on oap_test (a)")
Use the SHOW OINDEX command to show all the indexes created on a specified table.
> spark.sql("show oindex from oap_test").show()
Use Index
Using an index in a query is transparent. When a SQL query has filter conditions on columns that can take advantage of an index to reduce the data scan, the index is automatically applied during Spark SQL execution. The following example automatically uses the underlying index created on column "a".
> spark.sql("SELECT * FROM oap_test WHERE a = 1").show()
Drop index
Use the DROP OINDEX command to drop a named index.
> spark.sql("drop oindex index1 on oap_test")
Working with SQL Data Source Cache
Data Source Cache provides input data cache functionality to the executor. To share cached data among different SQL queries, the queries must use the same executor processes. Do this by running your queries through the Spark ThriftServer as shown below. For cache media, we support both DRAM and Intel PMem, so you can cache data in DRAM or, if PMem is configured in your hardware, in Intel PMem.
Use DRAM Cache
- Make the following configuration changes in the Spark configuration file $SPARK_HOME/conf/spark-defaults.conf.
spark.memory.offHeap.enabled false
spark.oap.cache.strategy guava
spark.sql.oap.cache.memory.manager offheap
# according to the resource of cluster
spark.executor.memoryOverhead 50g
# equal to the size of executor.memoryOverhead
spark.executor.sql.oap.cache.offheap.memory.size 50g
# for parquet fileformat, enable binary cache
spark.sql.oap.parquet.binary.cache.enabled true
# for orc fileformat, enable binary cache
spark.sql.oap.orc.binary.cache.enabled true
NOTE: Set spark.executor.sql.oap.cache.offheap.memory.size according to the DRAM capacity available for caching data, and keep it equal to spark.executor.memoryOverhead.
- Launch Spark ThriftServer
Launch Spark Thrift Server, and use the Beeline command line tool to connect to the Thrift Server to execute DDL or DML operations. The data cache will automatically take effect for Parquet or ORC file sources.
The rest of this section shows how to do a quick verification of cache functionality. It reuses the database metastore created in the Working with SQL Index section, which creates the oap_test table definition. In production, Spark Thrift Server will have its own metastore database directory or metastore service, and you will use DDLs through Beeline to create your tables.
When you run spark-shell to create the oap_test table, metastore_db is created in the directory where you ran $SPARK_HOME/bin/spark-shell. Go to that directory and execute the following command to launch the Thrift JDBC server and run queries.
$SPARK_HOME/sbin/start-thriftserver.sh
- Use Beeline and connect to the Thrift JDBC server, replacing the hostname (mythriftserver) with your own Thrift Server hostname.
$SPARK_HOME/bin/beeline -u jdbc:hive2://<mythriftserver>:10000
After the connection is established, execute the following commands to check the metastore is initialized correctly.
```
SHOW databases;
USE default;
SHOW tables;
```
- Run queries on the table that will use the cache automatically. For example,
```
SELECT * FROM oap_test WHERE a = 1;
SELECT * FROM oap_test WHERE a = 2;
SELECT * FROM oap_test WHERE a = 3;
...
```
- Open the Spark History Web UI and go to the OAP tab page to verify the cache metrics. The following picture is an example.
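The DRAM cache configuration earlier in this section notes that spark.executor.sql.oap.cache.offheap.memory.size should equal spark.executor.memoryOverhead. As a rough sketch, the check below reads both values from a configuration file and compares them as plain strings. The function names conf_value and check_dram_cache_sizes are hypothetical helpers, and the string comparison is a simplifying assumption: equivalent sizes written in different units (for example 50g and 51200m) would be reported as a mismatch.

```shell
# Print the last value assigned to a key in a spark-defaults.conf style file.
# Usage: conf_value <file> <key>
conf_value() {
  # Comment lines start with "#", so their first field never equals the key.
  awk -v key="$2" '$1 == key { value = $2 } END { if (value != "") print value }' "$1"
}

# Succeed only if both DRAM cache sizes are set and textually identical.
# Usage: check_dram_cache_sizes <file>
check_dram_cache_sizes() {
  local overhead cache
  overhead="$(conf_value "$1" spark.executor.memoryOverhead)"
  cache="$(conf_value "$1" spark.executor.sql.oap.cache.offheap.memory.size)"
  if [ -n "$overhead" ] && [ "$overhead" = "$cache" ]; then
    echo "OK: both set to ${overhead}"
  else
    echo "MISMATCH: memoryOverhead='${overhead}' offheap.memory.size='${cache}'"
    return 1
  fi
}
```

A typical call would be check_dram_cache_sizes "$SPARK_HOME/conf/spark-defaults.conf" before starting the Thrift Server.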
Use PMem Cache
Prerequisites
The following are required to configure OAP to use PMem cache.
- PMem hardware is successfully deployed on each node in the cluster.
- Directories exposing PMem hardware on each socket. For example, on a two-socket system the mounted PMem directories should appear as /mnt/pmem0 and /mnt/pmem1. Correctly installed PMem must be formatted and mounted on every cluster worker node.
# use ipmctl command to show topology and dimm info of PMem
ipmctl show -topology
ipmctl show -dimm
# provision PMem in app direct mode
ipmctl create -goal PersistentMemoryType=AppDirect
# reboot system to make configuration take effect
reboot
# check capacity provisioned for app direct mode (AppDirectCapacity)
ipmctl show -memoryresources
# show the PMem region information
ipmctl show -region
# create namespace based on the region, multi namespaces can be created on a single region
ndctl create-namespace -m fsdax -r region0
ndctl create-namespace -m fsdax -r region1
# show the created namespaces
fdisk -l
# create and mount file system
echo y | mkfs.ext4 /dev/pmem0
echo y | mkfs.ext4 /dev/pmem1
mkdir -p /mnt/pmem0
mkdir -p /mnt/pmem1
mount -o dax /dev/pmem0 /mnt/pmem0
mount -o dax /dev/pmem1 /mnt/pmem1
In this case file systems are generated for 2 NUMA nodes, which can be checked with "numactl --hardware". For a different number of NUMA nodes, a corresponding number of namespaces should be created so that the file system paths map correctly to the NUMA nodes.
For more information you can refer to Quick Start Guide: Provision Intel® Optane™ DC Persistent Memory
- Make sure the Vmemcache library has been installed on every cluster worker node if the vmemcache strategy is chosen for PMem cache. If you have finished the OAP-Installation-Guide, the vmemcache library will have been installed automatically by Conda.
Or you can follow the build/install steps and make sure libvmemcache.so exists in the /lib64 directory on each worker node.
- Currently, Community Spark occasionally binds two executors to the same PMem path, so we recommend using our pre-built numa-patched Spark-3.0.0, which both improves performance and solves this problem.
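For a system with a different number of NUMA nodes, the namespace, format, and mount commands in the prerequisites above repeat once per node. The sketch below prints those commands for a given node count so they can be reviewed before running them as root. It assumes that region, device, and mount point indexes all match the NUMA node index (region0, /dev/pmem0, /mnt/pmem0, and so on), as in the two-node example; verify this against the output of ipmctl show -region on your hardware. The function name pmem_setup_cmds is hypothetical.

```shell
# Print the per-node PMem setup commands; nothing is executed.
# Usage: pmem_setup_cmds <number_of_numa_nodes>
pmem_setup_cmds() {
  local nodes="$1" i=0
  while [ "$i" -lt "$nodes" ]; do
    # Assumption: region<i> backs /dev/pmem<i>, mounted at /mnt/pmem<i>.
    echo "ndctl create-namespace -m fsdax -r region${i}"
    echo "echo y | mkfs.ext4 /dev/pmem${i}"
    echo "mkdir -p /mnt/pmem${i}"
    echo "mount -o dax /dev/pmem${i} /mnt/pmem${i}"
    i=$((i + 1))
  done
}

# Example: the two-node layout used in this guide.
pmem_setup_cmds 2
```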
Configure for NUMA
- Install numactl to bind the executor to the PMem device on the same NUMA node.
yum install numactl -y
- We strongly recommend you use numa-patched Spark to achieve better performance gain.
To enable numa-binding support, build Spark from source as described in enable-numa-binding-for-PMem-in-spark, or just download our pre-built numa-patched Spark-3.0.0.
Configure for PMem
Create persistent-memory.xml in $SPARK_HOME/conf/ if it doesn't exist. Use the following template and change the initialPath to your mounted paths for PMem devices.
<persistentMemoryPool>
<!--The numa id-->
<numanode id="0">
<!--The initial path for Intel Optane DC persistent memory-->
<initialPath>/mnt/pmem0</initialPath>
</numanode>
<numanode id="1">
<initialPath>/mnt/pmem1</initialPath>
</numanode>
</persistentMemoryPool>
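When there are more than two NUMA nodes, the template can be generated rather than edited by hand. The following is a minimal sketch that prints one numanode element per mounted path; it assumes NUMA ids are numbered from 0 in the order the paths are given, and the function name persistent_memory_xml is hypothetical.

```shell
# Print a persistent-memory.xml body with one <numanode> per mount path.
# Usage: persistent_memory_xml <path_for_node_0> <path_for_node_1> ...
persistent_memory_xml() {
  local id=0 path
  echo '<persistentMemoryPool>'
  for path in "$@"; do
    # Assumption: the Nth argument belongs to NUMA node N-1.
    printf '  <numanode id="%s">\n' "$id"
    printf '    <initialPath>%s</initialPath>\n' "$path"
    echo '  </numanode>'
    id=$((id + 1))
  done
  echo '</persistentMemoryPool>'
}

# Example: reproduce the two-node template shown above.
persistent_memory_xml /mnt/pmem0 /mnt/pmem1
```

Redirect the output to $SPARK_HOME/conf/persistent-memory.xml once it looks right.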
Configure to enable PMem cache
Make the following configuration changes in $SPARK_HOME/conf/spark-defaults.conf.
# 2x number of your worker nodes
spark.executor.instances 6
# enable numa
spark.yarn.numa.enabled true
# Enable OAP jar in Spark
spark.sql.extensions org.apache.spark.sql.OapExtensions
# absolute path of the jar on your working node, when in Yarn client mode
spark.files $HOME/miniconda2/envs/oapenv/oap_jars/oap-cache-<version>-with-spark-<version>.jar,$HOME/miniconda2/envs/oapenv/oap_jars/oap-common-<version>-with-spark-<version>.jar
# relative path of the jar, when in Yarn client mode
spark.executor.extraClassPath ./oap-cache-<version>-with-spark-<version>.jar:./oap-common-<version>-with-spark-<version>.jar
# absolute path of the jar on your working node, when in Yarn client mode
spark.driver.extraClassPath $HOME/miniconda2/envs/oapenv/oap_jars/oap-cache-<version>-with-spark-<version>.jar:$HOME/miniconda2/envs/oapenv/oap_jars/oap-common-<version>-with-spark-<version>.jar
# for parquet file format, enable binary cache
spark.sql.oap.parquet.binary.cache.enabled true
# for ORC file format, enable binary cache
spark.sql.oap.orc.binary.cache.enabled true
spark.oap.cache.strategy vmem
spark.executor.sql.oap.cache.persistent.memory.initial.size 256g
# according to your cluster
spark.executor.sql.oap.cache.guardian.memory.size 10g
The vmem cache strategy is based on libvmemcache (buffer based LRU cache), which provides a key-value store API. The configuration above enables vmemcache support in Data Source Cache. Two of its settings depend on your cluster:
- spark.executor.instances: We suggest setting the value to 2X the number of worker nodes when NUMA binding is enabled. Each worker node runs two executors; each executor is bound to one of the two sockets and accesses the corresponding PMem device on that socket.
- spark.executor.sql.oap.cache.persistent.memory.initial.size: Set it to the available PMem capacity to be used as data cache per executor.
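As a small worked example of the sizing rule for spark.executor.instances, the sketch below multiplies the number of worker nodes by the executors per node. The default of 2 executors per node comes from the two-socket guidance above; accepting a different per-node count is an assumption for systems with another socket count, and the function name executor_instances is hypothetical.

```shell
# Compute spark.executor.instances for a NUMA-bound cluster.
# Usage: executor_instances <worker_nodes> [executors_per_node]
executor_instances() {
  # Defaults to 2 executors per node, one per socket on a two-socket system.
  echo $(( $1 * ${2:-2} ))
}

# Example: 3 worker nodes give the value 6 used in the configuration above.
echo "spark.executor.instances $(executor_instances 3)"
```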
NOTE: If "PendingFiber Size" (on the Spark web UI OAP page) is large, or some tasks fail with a "cache guardian use too much memory" error, set spark.executor.sql.oap.cache.guardian.memory.size to a larger number, as the default size is 10GB. You can also increase spark.sql.oap.cache.guardian.free.thread.nums or decrease spark.sql.oap.cache.dispose.timeout.ms to free memory more quickly.
Verify PMem cache functionality
- After finishing configuration, restart Spark Thrift Server for the configuration changes to take effect. Start at step 2 of the Use DRAM Cache guide to verify that cache is working correctly.
- Verify NUMA binding status by confirming that keywords like numactl --cpubind=1 --membind=1 are contained in the executor launch command.
- Check PMem cache size by checking disk space with df -h. For the vmemcache strategy, disk usage will reach the initial cache size once the PMem cache is initialized and will not change during workload execution. For Guava/Noevict strategies, the command will show disk space usage increasing along with workload execution.
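The NUMA binding check can be done with a simple text filter. The sketch below reads text from standard input and prints each distinct numactl binding it finds; how you obtain the executor launch command (for example from container logs) depends on your cluster and is not covered here. The function name numa_bindings and the sample input line are made up for illustration.

```shell
# Print each distinct "numactl --cpubind=N --membind=N" found on stdin.
numa_bindings() {
  grep -o 'numactl --cpubind=[0-9][0-9]* --membind=[0-9][0-9]*' | sort -u
}

# Example with a made-up launch command line.
echo 'exec numactl --cpubind=1 --membind=1 java -server' | numa_bindings
```

On a two-socket node with binding enabled, you would expect to see one binding per socket across the executors of that node; empty output means no binding keywords were found in the input.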
Run TPC-DS Benchmark
This section provides instructions and tools for running TPC-DS queries to evaluate the cache performance of various configurations. The TPC-DS suite has many queries and we select 9 I/O intensive queries to simplify performance evaluation.
We provide a set of tool scripts, oap-benchmark-tool.zip, to simplify running the workload. If you are already familiar with TPC-DS data generation and running a TPC-DS tool suite, skip our tool and use the TPC-DS tool suite directly.
Prerequisites
- Python 2.7+ is required on the working node.
Prepare the Tool
- Download oap-benchmark-tool.zip and unzip it to a folder (for example, the oap-benchmark-tool folder) on your working node.
- Copy oap-benchmark-tool/tools/tpcds-kits to ALL worker nodes under the same folder (for example, /home/oap/tpcds-kits).
Generate TPC-DS Data
- Update the values for the following variables in oap-benchmark-tool/scripts/tool.conf based on your environment and needs.
  - SPARK_HOME: Point to the Spark home directory of your Spark setup.
  - TPCDS_KITS_DIR: The tpcds-kits directory you copied to the worker nodes in the preparation step above. For example, /home/oap/tpcds-kits
  - NAMENODE_ADDRESS: Your HDFS Namenode address in the format of host:port.
  - THRIFT_SERVER_ADDRESS: Your working node address on which you will run Thrift Server.
  - DATA_SCALE: The data scale to be generated in GB
  - DATA_FORMAT: The data file format. You can specify parquet or orc
For example:
export SPARK_HOME=/home/oap/spark-3.0.0
export TPCDS_KITS_DIR=/home/oap/tpcds-kits
export NAMENODE_ADDRESS=mynamenode:9000
export THRIFT_SERVER_ADDRESS=mythriftserver
export DATA_SCALE=1024
export DATA_FORMAT=parquet
- Start data generation.
In the root directory of this tool (oap-benchmark-tool), run scripts/run_gen_data.sh to start the data generation process.
cd oap-benchmark-tool
sh ./scripts/run_gen_data.sh
Once finished, the $scale data will be generated in the HDFS folder genData$scale, and a database called tpcds$scale will contain the TPC-DS tables.
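Because data generation can take a long time, it may be worth sanity-checking tool.conf before running run_gen_data.sh. The sketch below sources the file in a subshell and reports any of the six variables listed above that are empty, plus a DATA_FORMAT other than parquet or orc. The function name check_tool_conf is hypothetical and the checks are illustrative; the benchmark tool itself may validate more or less than this.

```shell
# Validate the variables that tool.conf is expected to export.
# Usage: check_tool_conf <path/to/tool.conf>
# The body runs in a subshell so sourced variables do not leak to the caller.
check_tool_conf() (
  . "$1" || exit 1
  status=0
  for name in SPARK_HOME TPCDS_KITS_DIR NAMENODE_ADDRESS \
              THRIFT_SERVER_ADDRESS DATA_SCALE DATA_FORMAT; do
    # Indirect lookup of the variable named by $name.
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "missing: $name"
      status=1
    fi
  done
  case "${DATA_FORMAT:-}" in
    parquet|orc) ;;
    *) echo "DATA_FORMAT must be parquet or orc"; status=1 ;;
  esac
  exit "$status"
)
```

For example, check_tool_conf oap-benchmark-tool/scripts/tool.conf prints nothing and returns success when all values are present.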
Start Spark Thrift Server
Start the Thrift Server in the tool root folder, which is the same folder where you ran the data generation scripts. Use either the PMem or DRAM script to start the Thrift Server.
Use PMem as Cache Media
Update the configuration values in scripts/spark_thrift_server_yarn_with_PMem.sh to reflect your environment.
Normally, you need to update the following configuration values to cache to PMem.
- --driver-memory
- --executor-memory
- --executor-cores
- --conf spark.oap.cache.strategy
- --conf spark.executor.sql.oap.cache.guardian.memory.size
- --conf spark.executor.sql.oap.cache.persistent.memory.initial.size
These settings will override the values specified in the Spark configuration file (spark-defaults.conf). After the configuration is done, you can execute the following command to start Thrift Server.
cd oap-benchmark-tool
sh ./scripts/spark_thrift_server_yarn_with_PMem.sh start
In this script, we use vmem as the cache strategy for the Parquet binary data cache.
Use DRAM as Cache Media
Update the configuration values in scripts/spark_thrift_server_yarn_with_DRAM.sh to reflect your environment. Normally, you need to update the following configuration values to cache to DRAM.
- --driver-memory
- --executor-memory
- --executor-cores
- --conf spark.executor.sql.oap.cache.offheap.memory.size
- --conf spark.executor.memoryOverhead
These settings will override the values specified in the Spark configuration file (spark-defaults.conf). After the configuration is done, you can execute the following command to start Thrift Server.
cd oap-benchmark-tool
sh ./scripts/spark_thrift_server_yarn_with_DRAM.sh start
Run Queries
Execute the following command to start running the queries.
cd oap-benchmark-tool
sh ./scripts/run_tpcds.sh
When all the queries are done, you will see the result.json file in the current directory.
Advanced Configuration
In addition to the vmem cache strategy, SQL Data Source Cache also supports 3 other cache strategies: guava, noevict and external cache.
- Index and Data Cache Separation
To optimize cache media utilization, SQL Data Source Cache supports separate caches for data and index, using the same or different cache media (DRAM and PMem).
- Cache Hot Tables
Data Source Cache also supports caching only specific tables, which are usually the hot tables in your workload.
- Column Vector Cache
The examples in this document use binary cache for the Parquet file format. If your cluster has abundant memory resources, you can choose ColumnVector data cache instead of binary cache for Parquet to save computation time.
- Large Scale and Heterogeneous Cluster Support
Introduce an external database to store cache locality info to support large-scale and heterogeneous clusters.
For more information and configuration details, please refer to Advanced Configuration.