Spark Writer¶
Overview¶
Spark Writer is a Spark-based distributed data importer for Nebula Graph. It converts data from multiple data sources into vertices and edges and imports them into the graph database in batches. Currently, the supported data sources are:
- HDFS, including Parquet, JSON, ORC and CSV
- HIVE
Spark Writer supports importing multiple tags and edges concurrently, and different tags and edges can be configured with different data sources.
Prerequisites¶
NOTE: To use Nebula Graph Spark Writer, please make sure you have:
- Spark 2.0 or above
- Hive 2.3 or above
- Hadoop 2.0 or above
Get Spark Writer¶
From Source Code¶
git clone https://github.com/vesoft-inc/nebula.git
cd nebula/src/tools/spark-sstfile-generator
mvn compile package
Or you can download from OSS.
Download From Cloud Storage OSS¶
wget https://oss-cdn.nebula-graph.io/jar-packages/sst.generator-1.2.1-beta.jar
User Guide¶
This section includes the following steps:
- Create a graph space and its schema in Nebula Graph
- Write data files
- Write input source mapping file
- Import data
Create Graph Space¶
Please refer to the example graph in Quick Start.
NOTE: Please create a space and define the schema in Nebula Graph first, then use this tool to import data to Nebula Graph.
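For the player and follow example used throughout this document, a minimal schema sketch might look like the following (the space name test matches the configuration example below; the partition and replica settings are illustrative assumptions):
-- A minimal sketch of the schema used in the examples below (settings are illustrative).
CREATE SPACE test(partition_num=10, replica_factor=1);
USE test;
CREATE TAG player(name string, age int);
CREATE EDGE follow(likeness int);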
Example Data¶
Vertices¶
A vertex data file consists of multiple rows. Each line in the file represents a vertex and its properties. In general, the first column is the ID of the vertex. This ID column is specified in the mapping file. The other columns are the properties of the vertex. Consider the following example in JSON format.
- Player data
{"id":100,"name":"Tim Duncan","age":42}
{"id":101,"name":"Tony Parker","age":36}
{"id":102,"name":"LaMarcus Aldridge","age":33}
Edges¶
An edge data file consists of multiple rows. Each line in the file represents an edge and its properties. In general, the first column is the ID of the source vertex and the second column is the ID of the destination vertex. These ID columns are specified in the mapping file. The other columns are the properties of the edge. Consider the following example in JSON format.
Take the follow edge as an example:
- Edge without rank
{"source":100,"target":101,"likeness":95}
{"source":101,"target":100,"likeness":95}
{"source":101,"target":102,"likeness":90}
- Edge with rank
{"source":100,"target":101,"likeness":95,"ranking":2}
{"source":101,"target":100,"likeness":95,"ranking":1}
{"source":101,"target":102,"likeness":90,"ranking":3}
Spatial Data Geo¶
Spark Writer supports importing geo data. Geo data contains latitude and longitude coordinates, and both are of the double data type.
{"latitude":30.2822095,"longitude":120.0298785,"target":0,"dp_poi_name":"0"}
{"latitude":30.2813834,"longitude":120.0208692,"target":1,"dp_poi_name":"1"}
{"latitude":30.2807347,"longitude":120.0181162,"target":2,"dp_poi_name":"2"}
{"latitude":30.2812694,"longitude":120.0164896,"target":3,"dp_poi_name":"3"}
Data Source Files¶
The data sources currently supported by Spark Writer are:
- HDFS
- HIVE
HDFS Files¶
Spark Writer supports the following file formats on HDFS:
- Parquet
- JSON
- CSV
- ORC
Player data in Parquet format:
+---+---+-----------+
|age| id|       name|
+---+---+-----------+
| 42|100| Tim Duncan|
| 36|101|Tony Parker|
+---+---+-----------+
In JSON:
{"id":100,"name":"Tim Duncan","age":42}
{"id":101,"name":"Tony Parker","age":36}
In CSV:
age,id,name
42,100,Tim Duncan
36,101,Tony Parker
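The HDFS files referenced by the path option in the mapping configuration must already exist on HDFS. A minimal sketch, assuming the JSON player data above is saved locally as player.json and uploaded to a hypothetical directory /nebula/player:
# Upload the local data file to HDFS (file name and directory are illustrative).
hdfs dfs -mkdir -p /nebula/player
hdfs dfs -put player.json /nebula/player/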
Database¶
Spark Writer supports using a database as the data source; currently only HIVE is supported.
The schema of the player table is as follows:
col_name | data_type | comment |
---|---|---|
id | int | |
name | string | |
age | int | |
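The exec option in the configuration below runs a query against such a table. A minimal sketch, assuming a hypothetical Hive database basketball that contains the player table above:
-- Hypothetical Hive table holding the player data (database and table names are assumptions).
CREATE TABLE IF NOT EXISTS basketball.player (id INT, name STRING, age INT);
-- A query of the form later referenced by the exec option:
SELECT id, name, age FROM basketball.player;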
Write Configuration Files¶
The configuration file consists of a Spark section, a Nebula Graph section, a tags mapping section, and an edges mapping section. Spark-related parameters are configured in the Spark section. The connection information for Nebula Graph, including the username and password, is configured in the Nebula Graph section. The tags/edges mapping sections describe the basic data source information for each tag or edge; a mapping section can contain multiple tag/edge input sources, and different tags/edges can come from different data sources.
Example of a mapping file for the input source:
{
  # Spark related configurations.
  # See also: http://spark.apache.org/docs/latest/configuration.html
  spark: {
    app: {
      name: Spark Writer
    }

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    cores {
      max: 16
    }
  }

  # Nebula Graph related configurations.
  nebula: {
    # Query engine IP list
    addresses: ["127.0.0.1:3699"]

    # Username and password to connect to the Nebula Graph service
    user: user
    pswd: password

    # Graph space name for Nebula Graph
    space: test

    # The thrift connection timeout and retry times.
    # If no configurations are set, the default values are 3000 and 3 respectively.
    connection {
      timeout: 3000
      retry: 3
    }

    # The nGQL execution retry times.
    # If no configuration is set, the default value is 3.
    execution {
      retry: 3
    }
  }

  # Processing tags
  tags: [
    # Loading a tag from HDFS; the data type is Parquet.
    # The tag's name is tag_name_0.
    # field_0, field_1 and field_2 from the HDFS Parquet file are written into tag_name_0,
    # and the vertex ID column is vertex_key_field.
    {
      name: tag_name_0
      type: parquet
      path: hdfs_path
      fields: {
        field_0: nebula_field_0,
        field_1: nebula_field_1,
        field_2: nebula_field_2
      }
      vertex: vertex_key_field
      batch: 16
    }

    # Similar to the preceding section.
    # Loading from Hive: the result set of the ${exec} query is used as the data set.
    {
      name: tag_name_1
      type: hive
      exec: "select hive_field_0, hive_field_1, hive_field_2 from database.table"
      fields: {
        hive_field_0: nebula_field_0,
        hive_field_1: nebula_field_1,
        hive_field_2: nebula_field_2
      }
      vertex: vertex_id_field
    }
  ]

  # Processing edges
  edges: [
    # Loading an edge from HDFS; the data type is JSON.
    # The edge's name is edge_name_0.
    # field_0, field_1 and field_2 from the HDFS JSON file are written into edge_name_0.
    # The source column is source_field, the target column is target_field and the ranking column is ranking_field.
    {
      name: edge_name_0
      type: json
      path: hdfs_path
      fields: {
        field_0: nebula_field_0,
        field_1: nebula_field_1,
        field_2: nebula_field_2
      }
      source: source_field
      target: target_field
      ranking: ranking_field
    }

    # Loading from Hive: the result set of the ${exec} query is used as the data set.
    # Ranking is optional.
    {
      name: edge_name_1
      type: hive
      exec: "select hive_field_0, hive_field_1, hive_field_2 from database.table"
      fields: {
        hive_field_0: nebula_field_0,
        hive_field_1: nebula_field_1,
        hive_field_2: nebula_field_2
      }
      source: source_id_field
      target: target_id_field
    }
  ]
}
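As a concrete illustration, a tags/edges mapping for the example player and follow JSON data shown earlier might look like the following sketch (the HDFS directories are assumptions, and the player tag and follow edge must already be defined in the schema):
tags: [
  {
    name: player
    type: json
    path: "/nebula/player"    # hypothetical HDFS directory containing the player JSON file
    fields: {
      name: name,
      age: age
    }
    vertex: id                # the id column is used as the vertex ID
    batch: 16
  }
]

edges: [
  {
    name: follow
    type: json
    path: "/nebula/follow"    # hypothetical HDFS directory containing the follow JSON file
    fields: {
      likeness: likeness
    }
    source: source            # the source column is used as the source vertex ID
    target: target            # the target column is used as the destination vertex ID
    ranking: ranking          # optional, only when the data contains a ranking column
  }
]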
Spark Properties¶
The following table gives some example properties, all of which can be found in Spark Available Properties.
Field | Default | Required | Description |
---|---|---|---|
spark.app.name | Spark Writer | No | The name of your application |
spark.driver.cores | 1 | No | Number of cores to use for the driver process, only in cluster mode. |
spark.driver.maxResultSize | 1G | No | Limit of total size of serialized results of all partitions for each Spark action (e.g. collect) in bytes. It must be at least 1M, or 0 for unlimited. |
spark.cores.max | (not set) | No | When running on a standalone deploy cluster or a Mesos cluster in "coarse-grained" sharing mode, the maximum amount of CPU cores to request for the application from across the cluster (not from each machine). If not set, the default will be spark.deploy.defaultCores on Spark's standalone cluster manager, or infinite (all available cores) on Mesos. |
Nebula Graph Configuration¶
Field | Default Value | Required | Description |
---|---|---|---|
nebula.addresses | / | yes | Query engine address list, separated by commas |
nebula.user | / | yes | User name; the default value is user |
nebula.pswd | / | yes | Password; the default password is password |
nebula.space | / | yes | The graph space to import data into; the space name is test in this document |
nebula.connection.timeout | 3000 | no | Thrift connection timeout in milliseconds |
nebula.connection.retry | 3 | no | Thrift connection retry times |
nebula.execution.retry | 3 | no | nGQL execution retry times |
Mapping of Tags and Edges¶
The options for tag mapping and edge mapping are very similar. The following describes the common options first, and then introduces the options unique to tag mapping and edge mapping.
- Common options
  - type is a case-insensitive required field that specifies the data type in the context. Currently, Parquet, JSON, ORC, and CSV are supported.
  - path applies to the HDFS data source and specifies the absolute path of the HDFS file or directory. It is a required field when the data source is HDFS.
  - exec applies to the Hive data source. It is a required field when the data source is HIVE.
  - fields is a required field that maps the columns of the data source to the properties of the tag/edge.
- Options unique to tag mapping
  - vertex is a required field that specifies a column as the vertex ID column.
- Options unique to edge mapping
  - source is a required field that specifies a column in the input source as the source vertex ID column.
  - target is a required field that specifies a column as the destination vertex ID column.
  - ranking is an optional field that specifies a column as the edge ranking column when the inserted edge has a ranking value.
Data Source Mapping¶
- HDFS Parquet Files
  - type specifies the input source type. It is a case-insensitive required field, and its value is parquet here.
  - path specifies the HDFS file directory. It is a required field and must be an absolute path.
- HDFS JSON Files
  - type specifies the input source type. It is a case-insensitive required field, and its value is json here.
  - path specifies the HDFS file directory. It is a required field and must be an absolute path.
- HDFS ORC Files
  - type specifies the input source type. It is a case-insensitive required field, and its value is orc here.
  - path specifies the HDFS file directory. It is a required field and must be an absolute path.
- HDFS CSV Files
  - type specifies the input source type. It is a case-insensitive required field, and its value is csv here.
  - path specifies the HDFS file directory. It is a required field and must be an absolute path.
- HIVE
  - type specifies the input source type. It is a case-insensitive required field, and its value is hive here.
  - exec is a required field that specifies the query to be executed in HIVE.
Import Data¶
Import data with the following command:
bin/spark-submit \
--class com.vesoft.nebula.tools.generator.v2.SparkClientGenerator \
--master ${MASTER-URL} \
${SPARK_WRITER_JAR_PACKAGE} -c conf/test.conf -h -d
Parameter descriptions:
Abbreviation | Required | Default | Description | Example |
---|---|---|---|---|
--class | yes | / | Specify the program's main class | |
--master | yes | / | Specify the master URL of the Spark cluster. Refer to master urls for details | e.g. spark://23.195.26.187:7077 |
-c / --config | yes | / | The path of the configuration file | |
-h / --hive | no | false | Specify whether Hive is supported | |
-d / --directly | no | false | True for console insertion; false for SST import (TODO) | |
-D / --dry | no | false | Check whether the configuration file is correct | |
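Putting these parameters together, a submission using the jar package downloaded from OSS might look like the following sketch (the master URL, jar location, and configuration path are illustrative assumptions):
# Example submission; adjust the master URL, jar path, and configuration file to your environment.
bin/spark-submit \
  --class com.vesoft.nebula.tools.generator.v2.SparkClientGenerator \
  --master spark://23.195.26.187:7077 \
  sst.generator-1.2.1-beta.jar -c conf/test.conf -h -d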
Performance¶
It takes about four minutes (roughly 400,000 rows per second) to import 100 million rows (each row contains three fields; each batch contains 64 rows) into a three-node cluster (56 cores, 250 GB memory, 10 GbE network, and SSDs per node).