Spark Writer

Overview

Spark Writer is a Spark-based distributed data importer for Nebula Graph. It converts data from multiple data sources into vertices and edges and imports them into the graph database in batches. Currently, the supported data sources are:

  • HDFS, including Parquet, JSON, ORC and CSV
  • HIVE

Spark Writer supports importing multiple tags and edges concurrently, and different tags and edges can be configured with different data sources.

Prerequisites

NOTE: To use Nebula Graph Spark Writer, please make sure you have:

  • Spark 2.0 or above
  • Hive 2.3 or above
  • Hadoop 2.0 or above

Get Spark Writer

From Source Code

git clone https://github.com/vesoft-inc/nebula.git
cd nebula/src/tools/spark-sstfile-generator
mvn compile package

Or you can download from OSS.

Download From Cloud Storage OSS

wget https://oss-cdn.nebula-graph.io/jar-packages/sst.generator-1.0.0-beta.jar

User Guide

This section includes the following steps:

  1. Create a graph space and its schema in Nebula Graph
  2. Write data files
  3. Write input source mapping file
  4. Import data

Create Graph Space

Please refer to the example graph in Quick Start.

NOTE: Please create a space and define the schema in Nebula Graph first, then use this tool to import data to Nebula Graph.
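If you are starting from scratch, the following nGQL statements are a minimal sketch that matches the example data used in this guide. The space name test comes from the mapping file below, and the player tag and follow edge with a likeness property are taken from the example data; adjust partition_num and replica_factor to your deployment, and refer to Quick Start for the authoritative schema.

# Minimal schema sketch matching the example data in this guide.
CREATE SPACE test (partition_num=10, replica_factor=1);
USE test;
CREATE TAG player(name string, age int);
CREATE EDGE follow(likeness int);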

Example Data

Vertices

A vertex data file consists of multiple rows. Each line in the file represents a vertex and its properties. In general, the first column is the ID of the vertex. This ID column is specified in the mapping file. The other columns are the properties of the vertex. Consider the following example in JSON format.

  • Player data
{"id":100,"name":"Tim Duncan","age":42}
{"id":101,"name":"Tony Parker","age":36}
{"id":102,"name":"LaMarcus Aldridge","age":33}

Edges

An edge data file consists of multiple rows. Each line in the file represents an edge and its properties. In general, the first column is the ID of the source vertex and the second column is the ID of the destination vertex. These ID columns are specified in the mapping file. The other columns are the properties of the edge. Consider the following example in JSON format.

Take the follow edge as an example:

  • Edge without rank
{"source":100,"target":101,"likeness":95}
{"source":101,"target":100,"likeness":95}
{"source":101,"target":102,"likeness":90}
  • Edge with rank
{"source":100,"target":101,"likeness":95,"ranking":2}
{"source":101,"target":100,"likeness":95,"ranking":1}
{"source":101,"target":102,"likeness":90,"ranking":3}

Spatial Data Geo

Spark Writer supports importing Geo data. Geo data contains latitude and longitude, and the data type is double.

{"latitude":30.2822095,"longitude":120.0298785,"target":0,"dp_poi_name":"0"}
{"latitude":30.2813834,"longitude":120.0208692,"target":1,"dp_poi_name":"1"}
{"latitude":30.2807347,"longitude":120.0181162,"target":2,"dp_poi_name":"2"}
{"latitude":30.2812694,"longitude":120.0164896,"target":3,"dp_poi_name":"3"}

Data Source Files

The data sources currently supported by Spark Writer are:

  • HDFS
  • HIVE

HDFS Files

HDFS supports the following file formats:

  • Parquet
  • JSON
  • CSV
  • ORC

Player data in Parquet format:

+---+---+-----------+
|age| id|       name|
+---+---+-----------+
| 42|100| Tim Duncan|
| 36|101|Tony Parker|
+---+---+-----------+

In JSON:

{"id":100,"name":"Tim Duncan","age":42}
{"id":101,"name":"Tony Parker","age":36}

In CSV:

age,id,name
42,100,Tim Duncan
36,101,Tony Parker

Database

Spark Writer supports databases as data sources; currently only HIVE is available.

The player table schema is as follows:

col_name    data_type    comment
id          int
name        string
age         int
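For reference, such a table could be created in HIVE with a statement like the following sketch. The database name basketball and table name player are hypothetical; use your own database and table names.

-- Hypothetical HIVE table matching the schema above.
CREATE TABLE IF NOT EXISTS basketball.player (
  id   INT,
  name STRING,
  age  INT
);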

Write Configuration Files

The configuration file consists of a Spark section, a Nebula Graph section, a tags mapping section, and an edges mapping section. Spark-related parameters are configured in the Spark section. The connection information for Nebula Graph, such as the username and password, is configured in the Nebula Graph section. Each tag or edge mapping describes the basic data source information for that tag or edge; a tag or edge mapping can correspond to multiple input sources, and different tags and edges can come from different data sources.

Example of a mapping file for the input source:

{
  # Spark related configurations.
  # See also: http://spark.apache.org/docs/latest/configuration.html
  spark: {
    app: {
      name: Spark Writer
    }

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    cores {
      max: 16
    }
  }

  # Nebula Graph related configurations.
  nebula: {
    # Query engine IP list  
    addresses: ["127.0.0.1:3699"]

    # Username and password to connect to Nebula Graph service
    user: user
    pswd: password

    # Graph space name for Nebula Graph
    space: test

    # The thrift connection timeout and retry times.
    # If no configurations are set, the default values are 3000 and 3 respectively.
    connection {
      timeout: 3000
      retry: 3
    }

    # The nGQL execution retry times.
    # If no configuration is set, the default value is 3.
    execution {
      retry: 3
    }
  }

  # Processing tags
  tags: [

    # Loading tag from HDFS and the data type is parquet.
    # The tag's name is tag_name_0.
    # field_0, field_1 and field_2 from HDFS's Parquet file are written into tag_name_0
    # and the vertex column is vertex_key_field.
    {
      name: tag_name_0
      type: parquet
      path: hdfs_path
      fields: {
        field_0: nebula_field_0,
        field_1: nebula_field_1,
        field_2: nebula_field_2
      }
      vertex: vertex_key_field
      batch : 16
    }

    # Similar to the preceding section.
    # Loading from Hive. The statement in ${exec} is executed to produce the data set.
    {
      name: tag_name_1
      type: hive
      exec: "select hive_field_0, hive_field_1, hive_field_2 from database.table"
      fields: {
        hive_field_0: nebula_field_0,
        hive_field_1: nebula_field_1,
        hive_field_2: nebula_field_2
      }
      vertex: vertex_id_field
    }
  ]

  # Processing edges
  edges: [
    # Loading edge from HDFS and data type is JSON.
    # The edge's name is edge_name_0.
    # field_0, field_1 and field_2 from HDFS's JSON file are written into edge_name_0
    # The source column is source_field, target column is target_field and ranking column is ranking_field.
    {
      name: edge_name_0
      type: json
      path: hdfs_path
      fields: {
        field_0: nebula_field_0,
        field_1: nebula_field_1,
        field_2: nebula_field_2
      }
      source:  source_field
      target:  target_field
      ranking: ranking_field
    }

    # Loading from Hive. The statement in ${exec} is executed to produce the data set.
    # Ranking is optional.
    {
      name: edge_name_1
      type: hive
      exec: "select hive_field_0, hive_field_1, hive_field_2 from database.table"
      fields: {
        hive_field_0: nebula_field_0,
        hive_field_1: nebula_field_1,
        hive_field_2: nebula_field_2
      }
      source:  source_id_field
      target:  target_id_field
    }
  ]
}

Spark Properties

The following table gives some example properties, all of which can be found in Spark Available Properties.

Field                      | Default      | Required | Description
spark.app.name             | Spark Writer | No       | The name of your application
spark.driver.cores         | 1            | No       | Number of cores to use for the driver process, only in cluster mode
spark.driver.maxResultSize | 1G           | No       | Limit of total size of serialized results of all partitions for each Spark action (e.g. collect) in bytes. It must be at least 1M, or 0 for unlimited
spark.cores.max            | (not set)    | No       | When running on a standalone deploy cluster or a Mesos cluster in "coarse-grained" sharing mode, the maximum amount of CPU cores to request for the application from across the cluster (not from each machine). If not set, the default will be spark.deploy.defaultCores on Spark's standalone cluster manager, or infinite (all available cores) on Mesos

Nebula Graph Configuration

Field                     | Default Value | Required | Description
nebula.addresses          | /             | Yes      | Query engine IP list, separated with commas
nebula.user               | /             | Yes      | User name; the default value is user
nebula.pswd               | /             | Yes      | Password; the default password for user is password
nebula.space              | /             | Yes      | The space to import data into; the space name is test in this document
nebula.connection.timeout | 3000          | No       | Thrift connection timeout
nebula.connection.retry   | 3             | No       | Thrift connection retry times
nebula.execution.retry    | 3             | No       | nGQL execution retry times

Mapping of Tags and Edges

The options for tag mapping and edge mapping are very similar. The shared options are described first, followed by the options unique to tag mapping and edge mapping.

  • Shared Options
    • type is a case-insensitive required field that specifies the data type of the input source. Currently Parquet, JSON, ORC and CSV are supported
    • path applies to the HDFS data source and specifies the absolute path of an HDFS file or directory. It is a required field when the data source is HDFS
    • exec applies to the HIVE data source and specifies the query to execute. It is a required field when the data source is HIVE
    • fields is a required field that maps the columns of the data source to the properties of the tag / edge
  • Unique Options for Tag Mapping
    • vertex is a required field that specifies a column as the vertex ID column
  • Unique Options for Edge Mapping
    • source is a required field that specifies a column in the input source as the source vertex ID column
    • target is a required field that specifies a column as the destination vertex ID column
    • ranking is an optional field that specifies a column as the edge ranking column when the inserted edge has a ranking value

Data Source Mapping

  • HDFS Parquet Files
    • type specifies the input source type and is a case-insensitive required field; set it to parquet
    • path specifies the HDFS file or directory and is a required field that must be an absolute path
  • HDFS JSON Files
    • type specifies the input source type and is a case-insensitive required field; set it to json
    • path specifies the HDFS file or directory and is a required field that must be an absolute path
  • HDFS ORC Files
    • type specifies the input source type and is a case-insensitive required field; set it to orc
    • path specifies the HDFS file or directory and is a required field that must be an absolute path
  • HDFS CSV Files (see the sketch after this list)
    • type specifies the input source type and is a case-insensitive required field; set it to csv
    • path specifies the HDFS file or directory and is a required field that must be an absolute path
  • HIVE
    • type specifies the input source type and is a case-insensitive required field; set it to hive
    • exec is a required field that specifies the HIVE query to execute
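
For example, a tag mapping that loads the player CSV file shown earlier might look like the following sketch. It only uses options already described in this guide; the tag name, the HDFS path, and the assumption that the CSV header row provides the column names are illustrative, not prescriptive.

  {
    # Hypothetical mapping for the player CSV example (sketch only).
    name: player
    type: csv
    path: "/data/player.csv"      # absolute HDFS path (example value)
    fields: {
      # CSV column -> Nebula Graph property
      name: name,
      age: age
    }
    vertex: id                    # column used as the vertex ID
    batch: 16
  }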

Import Data

Import data with the following command:

bin/spark-submit \
 --class com.vesoft.nebula.tools.generator.v2.SparkClientGenerator \
 --master ${MASTER-URL} \
 ${SPARK_WRITER_JAR_PACKAGE} -c conf/test.conf -h -d

Parameter descriptions:

Parameter       | Required | Default | Description                                                                  | Example
--class         | Yes      | /       | Specifies the main class of the program                                      |
--master        | Yes      | /       | Specifies the master URL of the Spark cluster. Refer to master URLs for details | e.g. spark://23.195.26.187:7077
-c / --config   | Yes      | /       | The path of the configuration file                                           |
-h / --hive     | No       | false   | Specifies whether Hive is supported                                          |
-d / --directly | No       | false   | True for direct insertion; false for SST import (TODO)                      |
-D / --dry      | No       | false   | Checks whether the configuration file is correct                             |
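
For example, a submission that uses the prebuilt package downloaded earlier might look like this (the master URL and file paths are placeholders; replace them with your own values):

bin/spark-submit \
 --class com.vesoft.nebula.tools.generator.v2.SparkClientGenerator \
 --master spark://23.195.26.187:7077 \
 sst.generator-1.0.0-beta.jar -c conf/test.conf -h -d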

Performance

It takes about four minutes (roughly 400,000 rows per second) to import 100 million rows (each row contains three fields and each batch contains 64 rows) into a three-node cluster (56 cores, 250 GB memory, 10 GbE network, and SSDs per node).