Import data from SST files¶
This topic provides an example of how to generate SST (Sorted String Table) files from a data source, save them on HDFS, and then import them into NebulaGraph. The sample data source is a CSV file.
Precautions¶
- The SST file can be imported only in Linux.
- Default values of properties are not supported.
Background information¶
Exchange supports two data import modes:
- Import the data from the data source directly into NebulaGraph as nGQL statements.
- Generate the SST file from the data source, and use Console to import the SST file into NebulaGraph.
The following describes the scenarios, implementation methods, prerequisites, and steps for generating an SST file and importing data.
Scenarios¶
- Suitable for online services, because generating the SST file barely affects services (it only reads the Schema), and the import is fast.
Caution
Although the import is fast, write operations in the corresponding graph space are blocked during the import (about 10 seconds). Therefore, import data during off-peak hours.
- Suitable for scenarios with a large amount of source data, because the import is fast.
Implementation methods¶
NebulaGraph uses RocksDB as its underlying key-value storage engine. RocksDB is a disk-based storage engine that provides a series of APIs for creating and importing SST files, which helps import massive data quickly.
An SST file is an internal file that contains an arbitrarily long, ordered set of key-value pairs and stores large amounts of key-value data efficiently. SST files are generated mainly by the Reader, sstProcessor, and sstWriter of Exchange. The data processing steps are as follows:
- Reader reads data from the data source.
- sstProcessor generates the SST file based on the Schema information of NebulaGraph and uploads it to HDFS. For details about the format of the SST file, see Data Storage Format.
- sstWriter opens a file and inserts data. When generating SST files, keys must be written in order.
- After the SST file is generated, RocksDB imports the SST file into NebulaGraph using the IngestExternalFile() method. For example:
  IngestExternalFileOptions ifo;
  // Paths of the two SST files to import
  std::string file_path1 = "/home/usr/file1.sst";
  std::string file_path2 = "/home/usr/file2.sst";
  // Import two SST files
  Status s = db_->IngestExternalFile({file_path1, file_path2}, ifo);
  if (!s.ok()) {
    printf("Error while adding file %s and %s, Error %s\n",
           file_path1.c_str(), file_path2.c_str(), s.ToString().c_str());
    return 1;
  }
When the IngestExternalFile() method is called, RocksDB copies the file to the data directory by default and blocks RocksDB write operations. If the key range in the SST file overlaps the Memtable key range, RocksDB flushes the Memtable to the hard disk. After placing the SST file in an optimal location in the LSM tree, RocksDB assigns a global sequence number to the file and resumes write operations.
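The ordering requirement in the steps above can be illustrated with a small sketch. The following Python example is not Exchange or RocksDB code: `OrderedWriter` and `write_sorted` are hypothetical names that model a writer accepting keys only in strictly ascending byte order, which is why key-value pairs are sorted before they are written.

```python
from typing import Iterable, List, Tuple

KeyValue = Tuple[bytes, bytes]


class OrderedWriter:
    """Toy stand-in for an SST writer (hypothetical, for illustration only).

    It accepts keys only in strictly ascending byte order.
    """

    def __init__(self) -> None:
        self.entries: List[KeyValue] = []

    def put(self, key: bytes, value: bytes) -> None:
        # Reject any key that is not greater than the previously written key.
        if self.entries and key <= self.entries[-1][0]:
            raise ValueError(
                f"key {key!r} is not greater than previous key {self.entries[-1][0]!r}"
            )
        self.entries.append((key, value))


def write_sorted(pairs: Iterable[KeyValue]) -> List[KeyValue]:
    """Sort the pairs by key (byte order) and write them one by one."""
    writer = OrderedWriter()
    for key, value in sorted(pairs, key=lambda kv: kv[0]):
        writer.put(key, value)
    return writer.entries


if __name__ == "__main__":
    for key, value in write_sorted([(b"k2", b"v2"), (b"k1", b"v1")]):
        print(key, value)
```

In this sketch, duplicate keys are rejected as well, so the input would need to be deduplicated before writing.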
Data set¶
This topic takes the basketballplayer dataset as an example.
Environment¶
This example was run on macOS. The environment configuration is as follows:
- Hardware specifications:
- CPU: 1.7 GHz Quad-Core Intel Core i7
- Memory: 16 GB
- Spark: 2.4.7, stand-alone
- Hadoop: 2.9.2, pseudo-distributed deployment
- NebulaGraph: 3.6.0.
Prerequisites¶
Before importing data, you need to confirm the following information:
- NebulaGraph has been installed and deployed with the following information:
  - IP addresses and ports of Graph and Meta services.
  - The user name and password with write permission to NebulaGraph.
  - --ws_storage_http_port in the Meta service configuration file is the same as --ws_http_port in the Storage service configuration file. For example, 19779.
  - --ws_meta_http_port in the Graph service configuration file is the same as --ws_http_port in the Meta service configuration file. For example, 19559.
  - The information about the Schema, including names and properties of Tags and Edge types, and more.
- Spark has been installed.
- JDK 1.8 or later has been installed and the environment variable JAVA_HOME has been configured.
- The Hadoop service has been installed and started.
Note
- To generate SST files from other data sources, see the documentation of the corresponding data source and check the prerequisites.
- To generate SST files only, users do not need to install the Hadoop service on the machine where the Storage service is deployed.
- To delete the SST file after the ingest (data import) operation, add the configuration --move_files=true to the Storage service configuration file.
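The port requirements listed in the prerequisites could be checked with a short script before the import. The following Python sketch assumes that the service configuration files use one `--name=value` flag per line; `parse_flags` and `check_http_ports` are hypothetical helper names, and the configuration text is passed in as strings so that the example stays self-contained.

```python
from typing import Dict, List


def parse_flags(conf_text: str) -> Dict[str, str]:
    """Parse lines of the form --name=value; skip comments and blank lines."""
    flags: Dict[str, str] = {}
    for raw_line in conf_text.splitlines():
        line = raw_line.strip()
        if not line.startswith("--") or "=" not in line:
            continue
        name, value = line[2:].split("=", 1)
        flags[name.strip()] = value.strip()
    return flags


def check_http_ports(meta_conf: str, storage_conf: str, graph_conf: str) -> List[str]:
    """Return a list of problems; an empty list means the ports are consistent."""
    meta = parse_flags(meta_conf)
    storage = parse_flags(storage_conf)
    graph = parse_flags(graph_conf)
    pairs = [
        ("Meta --ws_storage_http_port", meta.get("ws_storage_http_port"),
         "Storage --ws_http_port", storage.get("ws_http_port")),
        ("Graph --ws_meta_http_port", graph.get("ws_meta_http_port"),
         "Meta --ws_http_port", meta.get("ws_http_port")),
    ]
    problems: List[str] = []
    for left_name, left, right_name, right in pairs:
        # A missing flag counts as a problem because it must be added manually.
        if left is None or right is None or left != right:
            problems.append(f"{left_name}={left} does not match {right_name}={right}")
    return problems


if __name__ == "__main__":
    meta_text = "--ws_http_port=19559\n--ws_storage_http_port=19779\n"
    storage_text = "--ws_http_port=19779\n"
    graph_text = "--ws_meta_http_port=19559\n"
    print(check_http_ports(meta_text, storage_text, graph_text) or "ports are consistent")
```

In practice, the three strings would be read from the Meta, Storage, and Graph service configuration files of the deployment.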
Steps¶
Step 1: Create the Schema in NebulaGraph¶
Analyze the data to create a Schema in NebulaGraph by following these steps:
- Identify the Schema elements. The Schema elements in NebulaGraph are shown in the following table.
| Element | Name | Property |
| --- | --- | --- |
| Tag | player | name string, age int |
| Tag | team | name string |
| Edge Type | follow | degree int |
| Edge Type | serve | start_year int, end_year int |
- Create a graph space basketballplayer in NebulaGraph and create a Schema as shown below.
  ## Create a graph space
  nebula> CREATE SPACE basketballplayer \
          (partition_num = 10, \
          replica_factor = 1, \
          vid_type = FIXED_STRING(30));
  ## Use the graph space basketballplayer
  nebula> USE basketballplayer;
  ## Create the Tag player
  nebula> CREATE TAG player(name string, age int);
  ## Create the Tag team
  nebula> CREATE TAG team(name string);
  ## Create the Edge type follow
  nebula> CREATE EDGE follow(degree int);
  ## Create the Edge type serve
  nebula> CREATE EDGE serve(start_year int, end_year int);
For more information, see Quick start workflow.
Step 2: Process CSV files¶
Confirm the following information:
- Process CSV files to meet Schema requirements.
Note
Exchange supports uploading CSV files with or without headers.
- Obtain the CSV file storage path.
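As a rough illustration of what "meeting Schema requirements" can mean for the player data, the following Python sketch checks each row of a headerless CSV file. It assumes the column layout used later in this example (column 0 is the VID, column 1 is age, column 2 is name) and the FIXED_STRING(30) VID type of the graph space. `check_player_rows` is a hypothetical helper, and the sample rows are illustrative only.

```python
import csv
import io
from typing import List, Tuple


def check_player_rows(csv_text: str, max_vid_bytes: int = 30) -> List[Tuple[int, str]]:
    """Return (line_number, problem) pairs for rows that do not fit the Schema."""
    problems: List[Tuple[int, str]] = []
    for line_no, row in enumerate(csv.reader(io.StringIO(csv_text)), start=1):
        if len(row) != 3:
            problems.append((line_no, f"expected 3 columns, got {len(row)}"))
            continue
        vid, age, _name = row
        # The VID must fit into FIXED_STRING(30), measured in bytes.
        if len(vid.encode("utf-8")) > max_vid_bytes:
            problems.append((line_no, f"VID longer than {max_vid_bytes} bytes: {vid!r}"))
        # The age property is declared as int in the Tag player.
        try:
            int(age)
        except ValueError:
            problems.append((line_no, f"age is not an integer: {age!r}"))
    return problems


if __name__ == "__main__":
    sample = "player100,42,Tim Duncan\nplayer101,abc,Tony Parker\n"
    for line_no, problem in check_player_rows(sample):
        print(f"line {line_no}: {problem}")
```

Similar checks could be written for the team, follow, and serve files by adjusting the expected columns and types.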
Step 3: Modify configuration files¶
After Exchange is compiled, copy the conf file target/classes/application.conf to set the SST data source configuration. In this example, the copied file is called sst_application.conf. For details on each configuration item, see Parameters in the configuration file.
{
# Spark configuration
spark: {
app: {
name: NebulaGraph Exchange 3.6.1
}
master:local
driver: {
cores: 1
maxResultSize: 1G
}
executor: {
memory:1G
}
cores:{
max: 16
}
}
# NebulaGraph configuration
nebula: {
address:{
graph:["192.8.168.XXX:9669"]
# the address of any of the meta services.
# if your NebulaGraph server is in virtual network like k8s, please config the leader address of meta.
meta:["192.8.168.XXX:9559"]
}
user: root
pswd: nebula
space: basketballplayer
# SST file configuration
path:{
# The local directory that temporarily stores generated SST files
local:"/tmp"
# The path for storing the SST file in the HDFS
remote:"/sst"
# The NameNode address of HDFS, for example, "hdfs://<ip/hostname>:<port>"
hdfs.namenode: "hdfs://*.*.*.*:9000"
}
# The connection parameters of clients
connection: {
# The timeout duration of socket connection and execution. Unit: milliseconds.
timeout: 30000
}
error: {
# The maximum number of failures that will exit the application.
max: 32
# Failed import jobs are logged in the output path.
output: /tmp/errors
}
# Use Google's RateLimiter to limit requests to NebulaGraph.
rate: {
# Steady throughput of RateLimiter.
limit: 1024
# Get the allowed timeout duration from RateLimiter. Unit: milliseconds.
timeout: 1000
}
}
# Processing vertices
tags: [
# Set the information about the Tag player.
{
# Specify the Tag name defined in NebulaGraph.
name: player
type: {
# Specify the data source file format to CSV.
source: csv
# Specify how to import the data into NebulaGraph: Client or SST.
sink: sst
}
# Specify the path to the CSV file.
# If the file is stored in HDFS, use double quotation marks to enclose the file path, starting with hdfs://. For example, "hdfs://<ip/hostname>:port/xx/xx.csv".
path: "hdfs://*.*.*.*:9000/dataset/vertex_player.csv"
# If the CSV file does not have a header, use [_c0, _c1, _c2, ..., _cn] to represent its header and indicate the columns as the source of the property values.
# If the CSV file has a header, use the actual column name.
fields: [_c1, _c2]
# Specify the property name defined in NebulaGraph.
# The sequence of fields and nebula.fields must correspond to each other.
nebula.fields: [age, name]
# Specify a column of data in the table as the source of VIDs in NebulaGraph.
# The value of vertex must be consistent with the column name in the above fields or csv.fields.
# Currently, NebulaGraph 3.6.0 supports only strings or integers of VID.
vertex: {
field:_c0
# Add the specified prefix to the VID. For example, if the VID is `12345`, adding the prefix `tag1` will result in `tag1_12345`. The underscore cannot be modified.
# prefix:"tag1"
# Performs hashing operations on VIDs of type string.
# policy:hash
}
# The delimiter specified. The default value is comma.
separator: ","
# If the CSV file has a header, set the header to true.
# If the CSV file does not have a header, set the header to false. The default value is false.
header: false
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
# The number of records written to NebulaGraph in a single batch.
batch: 256
# The number of partitions to be created when the data is written to NebulaGraph.
partition: 32
# Whether to repartition data based on the number of partitions of graph spaces in NebulaGraph when generating the SST file.
repartitionWithNebula: false
}
# Set the information about the Tag team.
{
name: team
type: {
source: csv
sink: sst
}
path: "hdfs://*.*.*.*:9000/dataset/vertex_team.csv"
fields: [_c1]
nebula.fields: [name]
vertex: {
field:_c0
}
separator: ","
header: false
batch: 256
partition: 32
repartitionWithNebula: false
}
# If more vertices need to be added, refer to the previous configuration to add them.
]
# Processing edges
edges: [
# Set the information about the Edge Type follow.
{
# The Edge Type name defined in NebulaGraph.
name: follow
type: {
# Specify the data source file format to CSV.
source: csv
# Specify how to import the data into NebulaGraph: Client or SST.
sink: sst
}
# Specify the path to the CSV file.
# If the file is stored in HDFS, use double quotation marks to enclose the file path, starting with hdfs://. For example, "hdfs://<ip/hostname>:port/xx/xx.csv".
path: "hdfs://*.*.*.*:9000/dataset/edge_follow.csv"
# If the CSV file does not have a header, use [_c0, _c1, _c2, ..., _cn] to represent its header and indicate the columns as the source of the property values.
# If the CSV file has a header, use the actual column name.
fields: [_c2]
# Specify the property name defined in NebulaGraph.
# The sequence of fields and nebula.fields must correspond to each other.
nebula.fields: [degree]
# Specify a column as the source for the source and destination vertices.
# The value of vertex must be consistent with the column name in the above fields or csv.fields.
# Currently, NebulaGraph 3.6.0 supports only strings or integers of VID.
source: {
field: _c0
# Add the specified prefix to the VID. For example, if the VID is `12345`, adding the prefix `tag1` will result in `tag1_12345`. The underscore cannot be modified.
# prefix:"tag1"
# Performs hashing operations on VIDs of type string.
# policy:hash
}
target: {
field: _c1
# Add the specified prefix to the VID. For example, if the VID is `12345`, adding the prefix `tag1` will result in `tag1_12345`. The underscore cannot be modified.
# prefix:"tag1"
# Performs hashing operations on VIDs of type string.
# policy:hash
}
# The delimiter specified. The default value is comma.
separator: ","
# (Optional) Specify a column as the source of the rank.
#ranking: rank
# If the CSV file has a header, set the header to true.
# If the CSV file does not have a header, set the header to false. The default value is false.
header: false
# Batch operation types, including INSERT, UPDATE, and DELETE. Defaults to INSERT.
#writeMode: INSERT
# The number of records written to NebulaGraph in a single batch.
batch: 256
# The number of partitions to be created when the data is written to NebulaGraph.
partition: 32
# Whether to repartition data based on the number of partitions of graph spaces in NebulaGraph when generating the SST file.
repartitionWithNebula: false
}
# Set the information about the Edge Type serve.
{
name: serve
type: {
source: csv
sink: sst
}
path: "hdfs://*.*.*.*:9000/dataset/edge_serve.csv"
fields: [_c2,_c3]
nebula.fields: [start_year, end_year]
source: {
field: _c0
}
target: {
field: _c1
}
separator: ","
header: false
batch: 256
partition: 32
repartitionWithNebula: false
}
# If more edges need to be added, refer to the previous configuration to add them.
]
}
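The comments in the configuration state that fields and nebula.fields must correspond to each other in order. The following Python sketch shows one way to check that pairing before submitting the job. It works on plain dictionaries that mirror the tags and edges entries above instead of parsing the configuration file, and `field_mapping` is a hypothetical helper name.

```python
from typing import Dict, List


def field_mapping(entry: Dict[str, object]) -> Dict[str, str]:
    """Pair each source column with its NebulaGraph property, in order."""
    fields: List[str] = list(entry["fields"])
    nebula_fields: List[str] = list(entry["nebula.fields"])
    if len(fields) != len(nebula_fields):
        raise ValueError(
            f"{entry['name']}: fields has {len(fields)} items "
            f"but nebula.fields has {len(nebula_fields)}"
        )
    return dict(zip(fields, nebula_fields))


if __name__ == "__main__":
    # Entries copied by hand from the example configuration.
    player = {"name": "player", "fields": ["_c1", "_c2"], "nebula.fields": ["age", "name"]}
    serve = {"name": "serve", "fields": ["_c2", "_c3"], "nebula.fields": ["start_year", "end_year"]}
    for entry in (player, serve):
        print(entry["name"], field_mapping(entry))
```

For the player entry, column _c1 is written to the age property and column _c2 to the name property, so the CSV columns must appear in that order.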
Step 4: Generate the SST file¶
Run the following command to generate the SST file from the CSV source file. For a description of the parameters, see Options for import.
${SPARK_HOME}/bin/spark-submit --master "local" --conf spark.sql.shuffle.partitions=<shuffle_concurrency> --class com.vesoft.nebula.exchange.Exchange <nebula-exchange-3.6.1.jar_path> -c <sst_application.conf_path>
Note
Generating SST files involves the shuffle operation of Spark. Therefore, add the spark.sql.shuffle.partitions configuration when you submit the command.
Note
JAR packages are available in two ways: compile them yourself, or download the compiled .jar file directly.
For example:
${SPARK_HOME}/bin/spark-submit --master "local" --conf spark.sql.shuffle.partitions=200 --class com.vesoft.nebula.exchange.Exchange /root/nebula-exchange/nebula-exchange/target/nebula-exchange-3.6.1.jar -c /root/nebula-exchange/nebula-exchange/target/classes/sst_application.conf
After the task is complete, you can view the generated SST file in the /sst directory (specified by the nebula.path.remote parameter) on HDFS.
Note
If you modify the Schema, such as rebuilding the graph space, modifying the Tag, or modifying the Edge type, you need to regenerate the SST file because the SST file verifies the space ID, Tag ID, and Edge ID.
Step 5: Import the SST file¶
Note
Confirm the following information before importing:
- Confirm that the Hadoop service has been deployed on all the machines where the Storage service is deployed, and that HADOOP_HOME and JAVA_HOME are configured.
- The --ws_storage_http_port in the Meta service configuration file (add it manually if it does not exist) is the same as the --ws_http_port in the Storage service configuration file. For example, both are 19779.
- The --ws_meta_http_port in the Graph service configuration file (add it manually if it does not exist) is the same as the --ws_http_port in the Meta service configuration file. For example, both are 19559.
Connect to the NebulaGraph database using the client tool and import the SST file as follows:
- Run the following command to select the graph space you created earlier.
  nebula> USE basketballplayer;
- Run the following command to download the SST file:
  nebula> SUBMIT JOB DOWNLOAD HDFS "hdfs://<hadoop_address>:<hadoop_port>/<sst_file_path>";
  For example:
  nebula> SUBMIT JOB DOWNLOAD HDFS "hdfs://*.*.*.*:9000/sst";
- Run the following command to import the SST file:
  nebula> SUBMIT JOB INGEST;
Note
- To download the SST file again, delete the download folder under the space ID directory in data/storage/nebula in the NebulaGraph installation path, and then download the SST file again. If the graph space has multiple replicas, the download folder must be deleted on all machines where the replicas are stored.
- If the import fails and the data needs to be re-imported, run SUBMIT JOB INGEST; again.
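The three statements above always run in the same order, so they could be generated from the values already present in the Exchange configuration. The following Python sketch only builds the statement strings and does not connect to NebulaGraph. `ingest_statements` is a hypothetical helper, and the NameNode address in the usage example is a placeholder.

```python
from typing import List


def ingest_statements(space: str, namenode: str, remote_path: str) -> List[str]:
    """Build the nGQL statements for downloading and ingesting SST files, in order."""
    # Join the NameNode address and the remote path with exactly one slash.
    hdfs_url = namenode.rstrip("/") + "/" + remote_path.strip("/")
    return [
        f"USE {space};",
        f'SUBMIT JOB DOWNLOAD HDFS "{hdfs_url}";',
        "SUBMIT JOB INGEST;",
    ]


if __name__ == "__main__":
    # The values correspond to nebula.space, nebula.path."hdfs.namenode",
    # and nebula.path.remote in the configuration file.
    for statement in ingest_statements("basketballplayer", "hdfs://127.0.0.1:9000", "/sst"):
        print(statement)
```

The printed statements can then be run one by one in the client, waiting for the download job to finish before submitting the ingest job.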
Step 6: (Optional) Validate data¶
Users can verify that data has been imported by executing a query in the NebulaGraph client (for example, NebulaGraph Studio). For example:
LOOKUP ON player YIELD id(vertex);
Users can also run the SHOW STATS command to view statistics.
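To have a number to compare the statistics against, the distinct VIDs in a source CSV file can be counted. The following Python sketch assumes that the VID is in column 0, as in the example configuration, and that the graph space held no data for the Tag before the import. `count_distinct_vids` is a hypothetical helper and the sample rows are illustrative only.

```python
import csv
import io


def count_distinct_vids(csv_text: str, vid_column: int = 0, has_header: bool = False) -> int:
    """Count the distinct values in the VID column of a CSV document."""
    rows = csv.reader(io.StringIO(csv_text))
    if has_header:
        next(rows, None)  # Skip the header row.
    # Rows with the same VID describe the same vertex, so count each VID once.
    return len({row[vid_column] for row in rows if row})


if __name__ == "__main__":
    sample = (
        "player100,42,Tim Duncan\n"
        "player101,36,Tony Parker\n"
        "player100,42,Tim Duncan\n"
    )
    print(count_distinct_vids(sample))
```

A mismatch between this count and the vertex count for the Tag suggests that some rows were not imported or that the source file contains unexpected rows.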
Step 7: (Conditional) Rebuild indexes in NebulaGraph¶
With the data imported, users can recreate and rebuild indexes in NebulaGraph. For details, see Index overview.