Skip to content

Import data from Neo4j

This topic provides an example of how to use Exchange to import Nebula Graph data stored in Neo4j.

Implementation method

Exchange uses Neo4j Driver 4.0.1 to read Neo4j data. Before batch export, you need to write Cypher statements that are automatically executed based on labels and relationship types and the number of Spark partitions in the configuration file to improve data export performance.

When Exchange reads Neo4j data, it needs to do the following:

  1. The Reader in Exchange replaces the statement following the Cypher RETURN statement in the exec part of the configuration file with COUNT(*), and executes this statement to get the total amount of data, then calculates the starting offset and size of each partition based on the number of Spark partitions.

  2. (optional) If the user has configured the check_point_path directory, Reader reads the files in the directory. In the continued state, Reader calculates the offset and size that each Spark partition should have.

  3. In each Spark partition, The Exchange Reader adds different SKIP and LIMIT statements to the Cypher statement and calls the Neo4j Driver for parallel execution to distribute data to different Spark partitions.

  4. The Reader finally processes the returned data into a DataFrame.

At this point, Exchange has finished exporting the Neo4j data. The data is then written in parallel to the Nebula Graph database.

The whole process is illustrated below.

Nebula Graph® Exchange exports data from the Neo4j database and imports it into the Nebula Graph database in parallel

Data set

This topic takes the basketballplayer dataset as an example.

Environment

This example is done on MacOS. Here is the environment configuration information:

  • Hardware specifications:
    • CPU: 1.7 GHz Quad-Core Intel Core i7
    • CPU cores: 14
    • memory: 16 GB
  • Spark: Stand-alone, 2.4.6 pre-build for Hadoop 2.7
  • Neo4j: 3.5.20 Community Edition

Prerequisites

Before importing data, you need to confirm the following information:

  • Nebula Graph has been installed and deployed with the following information:

    • IP address and port of Graph and Meta services.
    • User name and password with Nebula Graph write permission.
  • Spark has been installed.
  • Learn about the Schema created in Nebula Graph, including Tag and Edge type names, properties, and more.

Steps

Step 1: Create the Schema in Nebula Graph

Analyze the data to create a Schema in Nebula Graph by following these steps:

  1. Identify the Schema elements. The Schema elements in the Nebula Graph are shown in the following table.

    Element name property
    Tag player name string, age int
    Tag team name string
    Edge Type follow degree int
    Edge Type serve start_year int, end_year int
  2. Create a graph space basketballplayer in the Nebula Graph and create a Schema as shown below.

    ## create graph space
    nebula> CREATE SPACE basketballplayer \
            (partition_num = 10, \
            replica_factor = 1, \
            vid_type = FIXED_STRING(30));
    
    ## use the graph space basketballplayer
    nebula> USE basketballplayer;
    
    ## create Tag player
    nebula> CREATE TAG player(name string, age int);
    
    ## create Tag team
    nebula> CREATE TAG team(name string);
    
    ## create Edge type follow
    nebula> CREATE EDGE follow(degree int);
    
    ## create Edge type serve
    nebula> CREATE EDGE serve(start_year int, end_year int);
    

For more information, see Quick start workflow.

Step 2: Configuring Source Data

To speed up the export of Neo4j data, create indexes for the corresponding properties in the Neo4j database. For more information, refer to the Neo4j user manual.

Step 3: Modify configuration file

After Exchange is compiled, copy the conf file target/classes/application.conf settings Neo4j data source configuration. In this case, the copied file is called neo4j_application.conf. For details on each configuration item, see Parameters in the configuration file.

{
  # Spark configuration
  spark: {
    app: {
      name: Nebula Exchange 2.5.1
    }

    driver: {
      cores: 1
      maxResultSize: 1G
    }

    executor: {
        memory:1G
    }

    cores:{
      max: 16
    }
  }


  # Nebula Graph configuration
  nebula: {
    address:{
      graph:["127.0.0.1:9669"]
      meta:["127.0.0.1:9559"]
    }
    user: root
    pswd: nebula
    space: basketballplayer

    connection {
      timeout: 3000
      retry: 3
    }

    execution {
      retry: 3
    }

    error: {
      max: 32
      output: /tmp/errors
    }

    rate: {
      limit: 1024
      timeout: 1000
    }
  }

  # Processing vertex
  tags: [


    # Set information about Tag player.
    {
      name: player
      type: {
        # Specify the data source file format, set to Neo4j.
        source: neo4j
         # Specifies how to import the data into Nebula Graph: Client or SST.
        sink: client
      }
      # Neo4j Server IP address and port number.
      server: "bolt://192.168.*.*:7687"

      user: neo4j
      password:neo4j

      database:neo4j

      exec: "match (n:player) return n.id as id, n.age as age, n.name as name"

      fields: [age,name]
      nebula.fields: [age,name]
      vertex: {
        field:id
      }
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
   }
  # Set Tag Team information.
  {
      name: team
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://192.168.*.*:7687"
      user: neo4j
      password:neo4j
      database:neo4j
      exec: "match (n:team) return n.id as id,n.name as name"
      fields: [name]
      nebula.fields: [name]
      vertex: {
        field:id
      }
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
   }
  ]

  # Processing edge
  edges: [
    # Set information about Edge Type follow
    {
      name: follow
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://192.168.*.*:7687"
      user: neo4j
      password:neo4j
      database:neo4j
      exec: "match (a:player)-[r:follow]->(b:player) return a.id as src, b.id as dst, r.degree as degree  order by id(r)"
      fields: [degree]
      nebula.fields: [degree]
      source: {
        field: src
      }
      target: {
        field: dst
      }
      #ranking: rank
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
    }
   # Set information about Edge Type serve
   {
      name: serve
      type: {
        source: neo4j
        sink: client
      }
      server: "bolt://192.168.*.*:7687"
      user: neo4j
      password:neo4j
      database:neo4j
      exec: "match (a:player)-[r:serve]->(b:team) return a.id as src, b.id as dst, r.start_year as start_year, r.end_year as end_year  order by id(r)"
      fields: [start_year,end_year]
      nebula.fields: [start_year,end_year]
      source: {
        field: src
      }
      target: {
        field: dst
      }
      #ranking: rank
      partition: 10
      batch: 1000
      check_point_path: /tmp/test
    }
   ]
}

Exec Configuration Description

When configuring either the tags.exec or edges.exec parameters, you need to fill in the Cypher query. To prevent loss of data during import, it is strongly recommended to include ORDER BY clause in Cypher queries. Meanwhile, in order to improve data import efficiency, it is better to select indexed properties as sorting properties. If there is no index, users can also observe the default sort and select the appropriate properties for sorting to improve efficiency. If the default sort cannot be found, users can sort by the ID of the vertex or relationship and set the partition to a small value to reduce the sorting pressure of Neo4j.

Note

Using the ORDER BY clause lengthens the data import time.

Exchange needs to execute different SKIP and LIMIT Cypher statements on different Spark partitions, so SKIP and LIMIT clauses cannot be included in the Cypher statements corresponding to tags.exec and edges.exec.

tags.vertex or edges.vertex

Nebula Graph uses ID as the unique primary key when creating vertexes and edges, overwriting the data in that primary key if it already exists. So, if a Neo4j property value is given as the Nebula Graph'S ID and the value is duplicated in Neo4j, duplicate ids will result. One and only one of their corresponding data will be stored in the Nebula Graph, and the others will be overwritten. Because the data import process is concurrently writing data to Nebula Graph, the resulting saved data is not guaranteed to be the latest data in Neo4j.

check_point_path

If breakpoint continuation is enabled, to avoid data loss, the state of the database should not change between the breakpoint and the continuation, for example, data cannot be added or deleted, and the partition quantity configuration should not be changed.

Step 4: Import data into Nebula Graph

Run the following command to import Neo4j data into Nebula Graph. For a description of the parameters, see Options for import.

${SPARK_HOME}/bin/spark-submit --master "local" --class com.vesoft.nebula.exchange.Exchange <nebula-exchange-2.5.1.jar_path> -c <neo4j_application.conf_path> 

Note

JAR packages are available in two ways: compiled them yourself, or download the compiled .jar file directly.

Example:

${SPARK_HOME}/bin/spark-submit  --master "local" --class com.vesoft.nebula.exchange.Exchange  /root/nebula-spark-utils/nebula-exchange/target/nebula-exchange-2.5.1.jar  -c /root/nebula-spark-utils/nebula-exchange/target/classes/neo4j_application.conf

You can search for batchSuccess.<tag_name/edge_name> in the command output to check the number of successes. For example, batchSuccess.follow: 300.

Step 5: (optional) Validation data

Users can verify that data has been imported by executing a query in the Nebula Graph client (for example, Nebula Graph Studio). Such as:

GO FROM "player100" OVER follow;

Users can also run the SHOW STATS command to view statistics.

Step 6: (optional) Rebuild indexes in Nebula Graph

With the data imported, users can recreate and rebuild indexes in Nebula Graph. For details, see Index overview.


Last update: October 9, 2021
Back to top