Import data from Neo4j¶
This topic provides an example of how to use Exchange to import data stored in Neo4j into NebulaGraph.
Implementation method¶
Exchange uses Neo4j Driver 4.0.1 to read Neo4j data. Before a batch export, you need to write, in the configuration file, the Cypher statements to be executed for each label and relationship type, and set the number of Spark partitions. Exchange uses these settings to read the data in parallel, which improves data export performance.
When Exchange reads Neo4j data, it needs to do the following:
- The Reader in Exchange replaces the content following the RETURN keyword of the Cypher statement in the exec part of the configuration file with COUNT(*), and executes this statement to get the total amount of data. It then calculates the starting offset and size of each partition based on the number of Spark partitions.
- (Optional) If the user has configured the check_point_path directory, the Reader reads the files in the directory. In the transferring state, the Reader calculates the offset and size that each Spark partition should have.
- In each Spark partition, the Reader in Exchange adds different SKIP and LIMIT clauses to the Cypher statement and calls the Neo4j Driver for parallel execution to distribute data to different Spark partitions.
- The Reader finally processes the returned data into a DataFrame.
At this point, Exchange has finished exporting the Neo4j data. The data is then written in parallel to the NebulaGraph database.
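The counting and partitioning steps described above can be sketched in a few lines of Python. This is only an illustration of the idea, not the Exchange source code: the function names are hypothetical, and the way the remainder is spread over partitions is an assumption.

```python
import re


def count_statement(exec_stmt):
    """Replace everything after the last RETURN keyword with COUNT(*)."""
    matches = list(re.finditer(r"\breturn\b", exec_stmt, flags=re.IGNORECASE))
    if not matches:
        raise ValueError("statement has no RETURN clause")
    head = exec_stmt[: matches[-1].end()]
    return f"{head} COUNT(*)"


def partition_ranges(total, partitions):
    """Split `total` rows into one (offset, size) pair per partition."""
    base, extra = divmod(total, partitions)
    ranges = []
    offset = 0
    for i in range(partitions):
        # Assumption: the first `extra` partitions take one additional row.
        size = base + (1 if i < extra else 0)
        ranges.append((offset, size))
        offset += size
    return ranges


def partition_statements(exec_stmt, total, partitions):
    """Append a different SKIP/LIMIT pair to the statement for each partition."""
    return [
        f"{exec_stmt} SKIP {offset} LIMIT {size}"
        for offset, size in partition_ranges(total, partitions)
    ]


if __name__ == "__main__":
    stmt = "match (n:player) return n.id as id, n.age as age, n.name as name"
    print(count_statement(stmt))
    for s in partition_statements(stmt, total=10, partitions=3):
        print(s)
```

Each generated statement reads a disjoint window of the result, which is why the statement in the configuration file must not already contain SKIP or LIMIT.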
The whole process is illustrated below.
Data set¶
This topic takes the basketballplayer dataset as an example.
Environment¶
This example is done on macOS. Here is the environment configuration information:
- Hardware specifications:
  - CPU: Intel(R) Xeon(R) CPU E5-2697 v3 @ 2.60GHz
  - CPU cores: 14
  - Memory: 251 GB
- Spark: Stand-alone, 2.4.6 pre-built for Hadoop 2.7
- Neo4j: 3.5.20 Community Edition
- NebulaGraph: 2.6.2. Deploy NebulaGraph with Docker Compose.
Prerequisites¶
Before importing data, you need to confirm the following information:
- NebulaGraph has been installed and deployed with the following information:
  - IP addresses and ports of Graph and Meta services.
  - The user name and password with NebulaGraph write permission.
- Spark has been installed.
- Learn about the Schema created in NebulaGraph, including names and properties of Tags and Edge types, and more.
Steps¶
Step 1: Create the Schema in NebulaGraph¶
Analyze the data to create a Schema in NebulaGraph by following these steps:
- Identify the Schema elements. The Schema elements in NebulaGraph are shown in the following table.

| Element | Name | Property |
| --- | --- | --- |
| Tag | player | name string, age int |
| Tag | team | name string |
| Edge Type | follow | degree int |
| Edge Type | serve | start_year int, end_year int |
- Create a graph space basketballplayer in NebulaGraph and create a Schema as shown below.

## Create a graph space
nebula> CREATE SPACE basketballplayer \
        (partition_num = 10, \
        replica_factor = 1, \
        vid_type = FIXED_STRING(30));

## Use the graph space basketballplayer
nebula> USE basketballplayer;

## Create the Tag player
nebula> CREATE TAG player(name string, age int);

## Create the Tag team
nebula> CREATE TAG team(name string);

## Create the Edge type follow
nebula> CREATE EDGE follow(degree int);

## Create the Edge type serve
nebula> CREATE EDGE serve(start_year int, end_year int);
For more information, see Quick start workflow.
Step 2: Configure source data¶
To speed up the export of Neo4j data, create indexes for the corresponding properties in the Neo4j database. For more information, refer to the Neo4j manual.
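As a rough illustration, the helper below builds index-creation statements in the Neo4j 3.5 syntax (CREATE INDEX ON :Label(property)), matching the Neo4j version listed in the environment. The helper name is hypothetical, and indexing the id property of player and team is an assumption based on the queries used later in this topic. Newer Neo4j versions use a different index syntax, so check the manual for your version.

```python
def index_statements(label_properties):
    """Build one Neo4j 3.5-style CREATE INDEX statement per (label, property)."""
    return [f"CREATE INDEX ON :{label}({prop})" for label, prop in label_properties]


if __name__ == "__main__":
    # Assumption: the id property is what the export queries read and order by.
    for stmt in index_statements([("player", "id"), ("team", "id")]):
        print(stmt)
```

The printed statements can then be run in a Neo4j client such as cypher-shell or Neo4j Browser.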
Step 3: Modify configuration files¶
After Exchange is compiled, copy the conf file target/classes/application.conf and set the Neo4j data source configuration in the copy. In this example, the copied file is called neo4j_application.conf. For details on each configuration item, see Parameters in the configuration file.
{
  # Spark configuration
  spark: {
    app: {
      name: Nebula Exchange 2.6.1
    }
    driver: {
      cores: 1
      maxResultSize: 1G
    }
    executor: {
      memory: 1G
    }
    cores: {
      max: 16
    }
  }

  # NebulaGraph configuration
  nebula: {
    address: {
      graph: ["127.0.0.1:9669"]
      meta: ["127.0.0.1:9559"]
    }
    user: root
    pswd: nebula
    space: basketballplayer
    connection {
      timeout: 3000
      retry: 3
    }
    execution {
      retry: 3
    }
    error: {
      max: 32
      output: /tmp/errors
    }
    rate: {
      limit: 1024
      timeout: 1000
    }
  }

  # Processing vertexes
  tags: [
    # Set the information about the Tag player
    {
      name: player
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://192.168.*.*:7687"
      user: neo4j
      password: neo4j
      database: neo4j
      exec: "match (n:player) return n.id as id, n.age as age, n.name as name"
      fields: [age,name]
      nebula.fields: [age,name]
      vertex: {
        field: id
      }
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
    }
    # Set the information about the Tag Team
    {
      name: team
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://192.168.*.*:7687"
      user: neo4j
      password: neo4j
      database: neo4j
      exec: "match (n:team) return n.id as id,n.name as name"
      fields: [name]
      nebula.fields: [name]
      vertex: {
        field: id
      }
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
    }
  ]

  # Processing edges
  edges: [
    # Set the information about the Edge Type follow
    {
      name: follow
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://192.168.*.*:7687"
      user: neo4j
      password: neo4j
      database: neo4j
      exec: "match (a:player)-[r:follow]->(b:player) return a.id as src, b.id as dst, r.degree as degree order by id(r)"
      fields: [degree]
      nebula.fields: [degree]
      source: {
        field: src
      }
      target: {
        field: dst
      }
      #ranking: rank
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
    }
    # Set the information about the Edge Type serve
    {
      name: serve
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://192.168.*.*:7687"
      user: neo4j
      password: neo4j
      database: neo4j
      exec: "match (a:player)-[r:serve]->(b:team) return a.id as src, b.id as dst, r.start_year as start_year, r.end_year as end_year order by id(r)"
      fields: [start_year,end_year]
      nebula.fields: [start_year,end_year]
      source: {
        field: src
      }
      target: {
        field: dst
      }
      #ranking: rank
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
    }
  ]
}
Exec configuration¶
When configuring the tags.exec or edges.exec parameter, you need to fill in the Cypher query. To prevent data loss during import, it is strongly recommended to include an ORDER BY clause in the Cypher query. Without a stable order, the SKIP and LIMIT windows that Exchange executes on different partitions may overlap or miss rows. To improve import efficiency, order by indexed properties where possible. If there is no index, users can observe the default order and select appropriate properties for ordering. If no pattern can be found in the default order, users can order by the ID of the vertex or relationship and set partition to a small value to reduce the ordering pressure on Neo4j.
Note
Using the ORDER BY clause lengthens the data import time.
Exchange needs to execute Cypher statements with different SKIP and LIMIT clauses on different Spark partitions, so the Cypher statements in tags.exec and edges.exec cannot include SKIP or LIMIT clauses.
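The two rules above (include ORDER BY, exclude SKIP and LIMIT) can be checked before running an import. The following is a minimal sketch with a hypothetical helper, check_exec, that is not part of Exchange. It uses naive keyword matching, so a property or alias literally named skip or limit would be flagged as well.

```python
import re


def check_exec(stmt):
    """Return a list of problems found in a tags.exec or edges.exec statement."""
    problems = []
    # SKIP and LIMIT are added by Exchange per partition, so they must be absent.
    if re.search(r"\b(skip|limit)\b", stmt, flags=re.IGNORECASE):
        problems.append("remove SKIP/LIMIT: Exchange adds its own per partition")
    # ORDER BY gives every partition the same row order to slice.
    if not re.search(r"\border\s+by\b", stmt, flags=re.IGNORECASE):
        problems.append("add ORDER BY so that all partitions see the same order")
    return problems


if __name__ == "__main__":
    stmt = "match (n:player) return n.id as id, n.age as age, n.name as name"
    for problem in check_exec(stmt):
        print(problem)
```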
tags.vertex or edges.vertex configuration¶
NebulaGraph uses the ID as the unique primary key when creating vertexes and edges, and overwrites the data under that key if it already exists. So, if a Neo4j property value is used as the NebulaGraph ID and the value is duplicated in Neo4j, duplicate IDs are generated. Only one of the corresponding records is stored in NebulaGraph, and the others are overwritten. Because the import process writes data to NebulaGraph concurrently, the record that is finally saved is not guaranteed to be the latest data in Neo4j.
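One way to catch this before importing is to look for repeated ID values in the rows returned by the exec query. The sketch below assumes the rows are already available as dictionaries; the helper name and the sample rows are hypothetical and only illustrate the check.

```python
from collections import Counter


def duplicate_ids(rows, id_field="id"):
    """Return {id_value: occurrences} for every ID that appears more than once."""
    counts = Counter(row[id_field] for row in rows)
    return {vid: n for vid, n in counts.items() if n > 1}


if __name__ == "__main__":
    # Hypothetical sample rows, shaped like the result of the player query.
    rows = [
        {"id": "player100", "age": 42},
        {"id": "player101", "age": 36},
        {"id": "player100", "age": 43},
    ]
    print(duplicate_ids(rows))
```

Any ID reported here would end up as a single vertex in NebulaGraph, with no guarantee about which of the conflicting rows is kept.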
check_point_path configuration¶
If breakpoint transfers are enabled, to avoid data loss, the state of the database should not change between the breakpoint and the resumed transfer. For example, data cannot be added or deleted, and the partition quantity configuration should not be changed.
Step 4: Import data into NebulaGraph¶
Run the following command to import Neo4j data into NebulaGraph. For a description of the parameters, see Options for import.
${SPARK_HOME}/bin/spark-submit --master "local" --class com.vesoft.nebula.exchange.Exchange <nebula-exchange-2.6.1.jar_path> -c <neo4j_application.conf_path>
Note
JAR packages are available in two ways: compile them yourself, or download the compiled .jar file directly.
For example:
${SPARK_HOME}/bin/spark-submit --master "local" --class com.vesoft.nebula.exchange.Exchange /root/nebula-exchange/nebula-exchange/target/nebula-exchange-2.6.1.jar -c /root/nebula-exchange/nebula-exchange/target/classes/neo4j_application.conf
You can search for batchSuccess.<tag_name/edge_name> in the command output to check the number of successes. For example, batchSuccess.follow: 300.
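If the command output is saved to a file or captured as text, the counters can be collected with a short script. This sketch assumes that each counter appears in the form batchSuccess.<name>: <number>, as in the example above. The function name is hypothetical, and if a counter is printed more than once, the last value wins.

```python
import re


def batch_success_counts(output):
    """Map each tag or edge name to the batchSuccess number found in the output."""
    pattern = re.compile(r"batchSuccess\.(\w+):\s*(\d+)")
    return {name: int(count) for name, count in pattern.findall(output)}


if __name__ == "__main__":
    # Hypothetical captured output, using the line format shown above.
    sample = "batchSuccess.follow: 300"
    print(batch_success_counts(sample))
```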
Step 5: (optional) Validate data¶
Users can verify that data has been imported by executing a query in the NebulaGraph client (for example, NebulaGraph Studio). For example:
GO FROM "player100" OVER follow;
Users can also run the SHOW STATS command to view statistics.
Step 6: (optional) Rebuild indexes in NebulaGraph¶
With the data imported, users can recreate and rebuild indexes in NebulaGraph. For details, see Index overview.