Import data from CSV files

This topic provides an example of how to use Exchange to import data stored in HDFS or in local CSV files into NebulaGraph.

Data set

This topic takes the basketballplayer dataset as an example.

Environment

This example is done on macOS. The environment is configured as follows:

  • Hardware specifications:
    • CPU: 1.7 GHz Quad-Core Intel Core i7
    • Memory: 16 GB
  • Spark: 2.4.7, stand-alone
  • Hadoop: 2.9.2, pseudo-distributed deployment

Prerequisites

Before importing data, you need to confirm the following information:

  • NebulaGraph has been installed and deployed with the following information:

    • IP addresses and ports of Graph and Meta services.
    • The user name and password with write permission to NebulaGraph.
  • Spark has been installed.
  • You are familiar with the Schema created in NebulaGraph, including the names and properties of Tags and Edge types.
  • If files are stored in HDFS, ensure that the Hadoop service is running normally.
  • If files are stored locally and NebulaGraph is deployed as a cluster, place the files in the same local directory on every machine in the cluster.

Steps

Step 1: Create the Schema in NebulaGraph

Analyze the data to create a Schema in NebulaGraph by following these steps:

  1. Identify the Schema elements. The Schema elements in NebulaGraph are shown in the following table.

    Element      Name      Property
    Tag          player    name string, age int
    Tag          team      name string
    Edge Type    follow    degree int
    Edge Type    serve     start_year int, end_year int

  2. Create a graph space named basketballplayer in NebulaGraph and create the Schema as shown below.

    ## Create a graph space.
    nebula> CREATE SPACE basketballplayer \
            (partition_num = 10, \
            replica_factor = 1, \
            vid_type = FIXED_STRING(30));
    
    ## Use the graph space basketballplayer.
    nebula> USE basketballplayer;
    
    ## Create the Tag player.
    nebula> CREATE TAG player(name string, age int);
    
    ## Create the Tag team.
    nebula> CREATE TAG team(name string);
    
    ## Create the Edge type follow.
    nebula> CREATE EDGE follow(degree int);
    
    ## Create the Edge type serve.
    nebula> CREATE EDGE serve(start_year int, end_year int);
    

For more information, see Quick start workflow.

Step 2: Process CSV files

Complete the following tasks:

  1. Process the CSV files so that they meet the Schema requirements.

    Note

    Exchange supports importing CSV files with or without headers.

  2. Obtain the storage path of the CSV files.
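
A quick pre-check can catch rows that would not fit the Schema before the import is started. The following is only a minimal sketch, not part of Exchange. It assumes a headerless player file whose columns are the VID, the age, and the name, which is the order implied by the fields and nebula.fields mapping in Step 3. It also assumes that the FIXED_STRING(30) limit from Step 1 is counted in bytes. The function name check_player_rows and the sample rows are hypothetical.

```python
import csv
import io

MAX_VID_BYTES = 30  # assumption: mirrors vid_type = FIXED_STRING(30) from Step 1


def check_player_rows(lines):
    """Return (line_number, reason) pairs for rows that do not fit the player Tag."""
    problems = []
    for line_no, row in enumerate(csv.reader(lines), start=1):
        if len(row) != 3:
            problems.append((line_no, "expected 3 columns"))
            continue
        vid, age, _name = row
        if len(vid.encode("utf-8")) > MAX_VID_BYTES:
            problems.append((line_no, "VID longer than 30 bytes"))
        try:
            int(age)  # the age property is declared as int
        except ValueError:
            problems.append((line_no, "age is not an integer"))
    return problems


# Hypothetical sample rows: one valid, one with a bad age, one with a missing column.
sample = io.StringIO(
    "player100,42,Tim Duncan\n"
    "player101,thirty-six,Tony Parker\n"
    "player102,33\n"
)
print(check_player_rows(sample))
# prints [(2, 'age is not an integer'), (3, 'expected 3 columns')]
```

For a local file, the same function can be given an open file object instead of the in-memory sample.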

Step 3: Modify configuration files

After Exchange is compiled, copy the configuration file target/classes/application.conf and edit the copy to configure the CSV data source. In this example, the copied file is called csv_application.conf. For details on each configuration item, see Parameters in the configuration file.

{
  # Spark configuration
  spark: {
    app: {
      name: NebulaGraph Exchange 3.8.0
    }
    driver: {
      cores: 1
      maxResultSize: 1G
    }
    executor: {
      memory: 1G
    }

    cores: {
      max: 16
    }
  }

  # NebulaGraph configuration
  nebula: {
    address:{
      # Specify the IP addresses and ports for Graph and Meta services.
      # If there are multiple addresses, the format is "ip1:port","ip2:port","ip3:port".
      # Addresses are separated by commas.
      graph:["127.0.0.1:9669"]
      # the address of any of the meta services.
      # if your NebulaGraph server is in virtual network like k8s, please config the leader address of meta.
      meta:["127.0.0.1:9559"]
    }

    # The account entered must have write permission for the NebulaGraph space.
    user: root
    pswd: nebula
    # Whether to use a password encrypted with RSA.
    # enableRSA: true
    # The key used to encrypt the password using RSA.
    # privateKey: ""

    # Fill in the name of the graph space you want to write data to in the NebulaGraph.
    space: basketballplayer
    connection: {
      timeout: 3000
      retry: 3
    }
    execution: {
      retry: 3
    }
    error: {
      max: 32
      output: /tmp/errors
    }
    rate: {
      limit: 1024
      timeout: 1000
    }
  }

  # Processing vertexes
  tags: [
    # Set the information about the Tag player.
    {
      # Specify the Tag name defined in NebulaGraph.
      name: player
      type: {
        # Specify the data source file format to CSV.
        source: csv

        # Specify how to import the data into NebulaGraph: Client or SST.
        sink: client
      }

      # Specify the path to the CSV file.
      # If the file is stored in HDFS, use double quotation marks to enclose the file path, starting with hdfs://. For example: "hdfs://ip:port/xx/xx".
      # If the file is stored locally, use double quotation marks to enclose the file path, starting with file://. For example: "file:///tmp/xx.csv".
      path: "hdfs://192.168.*.*:9000/data/vertex_player.csv"

      # If the CSV file does not have a header, use [_c0, _c1, _c2, ..., _cn] to represent its header and indicate the columns as the source of the property values.
      # If the CSV file has headers, use the actual column names.
      fields: [_c1, _c2]

      # Specify the NebulaGraph property names that the columns listed in fields are written to.
      # The sequence of fields and nebula.fields must correspond to each other.
      nebula.fields: [age, name]

      # Specify the column that is used as the source of the vertex VID in NebulaGraph.
      # The value of vertex must be the same as the column names in the above fields or csv.fields.
      # Currently, NebulaGraph master supports only string or integer VIDs.
      vertex: {
        field: _c0
        # udf: {
        #   separator: "_"
        #   oldColNames: [field-0,field-1,field-2]
        #   newColName: new-field
        # }
        # Add the specified prefix to the VID. For example, if the VID is `12345`, adding the prefix `tag1` will result in `tag1_12345`. The underscore cannot be modified.
        # prefix: "tag1"
        # Performs hashing operations on VIDs of type string.
        # policy: hash
      }

      # The delimiter specified. The default value is comma.
      separator: ","

      # If the CSV file has a header, set the header to true.
      # If the CSV file does not have a header, set the header to false. The default value is false.
      header: false

      # The filtering rule. The data that matches the filter rule is imported into NebulaGraph.
      # filter: "name='Tom'"

      # The batch operation type: INSERT, UPDATE, or DELETE. Defaults to INSERT.
      #writeMode: INSERT

      # Whether or not to delete the related incoming and outgoing edges of the vertices when performing a batch delete operation. This parameter takes effect when `writeMode` is `DELETE`.
      #deleteEdge: false

      # The number of records written to NebulaGraph in a single batch.
      batch: 256

      # The number of partitions to be created when the data is written to NebulaGraph.
      partition: 32
    }

    # Set the information about the Tag team.
    {
      name: team
      type: {
        source: csv
        sink: client
      }
      path: "hdfs://192.168.*.*:9000/data/vertex_team.csv"
      fields: [_c1]
      nebula.fields: [name]
      vertex: {
        field:_c0
      }
      separator: ","
      header: false
      batch: 256
      partition: 32
    }


    # If more vertexes need to be added, refer to the previous configuration to add them.
  ]
  # Processing edges
  edges: [
    # Set the information about the Edge Type follow.
    {
      # Specify the Edge Type name defined in NebulaGraph.
      name: follow
      type: {
        # Specify the data source file format to CSV.
        source: csv

        # Specify how to import the data into NebulaGraph: Client or SST.
        sink: client
      }

      # Specify the path to the CSV file.
      # If the file is stored in HDFS, use double quotation marks to enclose the file path, starting with hdfs://. For example: "hdfs://ip:port/xx/xx".
      # If the file is stored locally, use double quotation marks to enclose the file path, starting with file://. For example: "file:///tmp/xx.csv".
      path: "hdfs://192.168.*.*:9000/data/edge_follow.csv"

      # If the CSV file does not have a header, use [_c0, _c1, _c2, ..., _cn] to represent its header and indicate the columns as the source of the property values.
      # If the CSV file has headers, use the actual column names.
      fields: [_c2]

      # Specify the NebulaGraph property names that the columns listed in fields are written to.
      # The sequence of fields and nebula.fields must correspond to each other.
      nebula.fields: [degree]

      # Specify the columns that are used as the VIDs of the source and destination vertexes.
      # The value of vertex must be the same as the column names in the above fields or csv.fields.
      # Currently, NebulaGraph master supports only string or integer VIDs.
      source: {
        field: _c0
        # udf: {
        #   separator: "_"
        #   oldColNames: [field-0,field-1,field-2]
        #   newColName: new-field
        # }
        # Add the specified prefix to the VID. For example, if the VID is `12345`, adding the prefix `tag1` will result in `tag1_12345`. The underscore cannot be modified.
        # prefix: "tag1"
        # Performs hashing operations on VIDs of type string.
        # policy: hash
      }
      target: {
        field: _c1
        # udf: {
        #   separator: "_"
        #   oldColNames: [field-0,field-1,field-2]
        #   newColName: new-field
        # }
        # Add the specified prefix to the VID. For example, if the VID is `12345`, adding the prefix `tag1` will result in `tag1_12345`. The underscore cannot be modified.
        # prefix: "tag1"
        # Performs hashing operations on VIDs of type string.
        # policy: hash
      }

      # The delimiter specified. The default value is comma.
      separator: ","

      # Specify a column as the source of the rank (optional).

      #ranking: rank

      # If the CSV file has a header, set the header to true.
      # If the CSV file does not have a header, set the header to false. The default value is false.
      header: false

      # The filtering rule. The data that matches the filter rule is imported into NebulaGraph.
      # filter: "name='Tom'"

      # The batch operation type: INSERT, UPDATE, or DELETE. Defaults to INSERT.
      #writeMode: INSERT

      # The number of records written to NebulaGraph in a single batch.
      batch: 256

      # The number of partitions to be created when the data is written to NebulaGraph.
      partition: 32
    }

    # Set the information about the Edge Type serve.
    {
      name: serve
      type: {
        source: csv
        sink: client
      }
      path: "hdfs://192.168.*.*:9000/data/edge_serve.csv"
      fields: [_c2,_c3]
      nebula.fields: [start_year, end_year]
      source: {
        field: _c0
      }
      target: {
        field: _c1
      }
      separator: ","
      header: false
      batch: 256
      partition: 32
    }

    # If more edges need to be added, refer to the previous configuration to add them.
  ]
}
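
The positional relationship between fields, nebula.fields, and the VID column in the configuration above can be illustrated with a short sketch. This is not how Exchange is implemented; it only shows, under the assumption of a headerless file whose columns are named _c0, _c1, and so on, which CSV column ends up in which property. The function name map_row and the sample row are hypothetical.

```python
def map_row(row, fields, nebula_fields, vid_field):
    """Map one headerless CSV row to (vid, {property: value}) by position."""
    if len(fields) != len(nebula_fields):
        raise ValueError("fields and nebula.fields must have the same length")

    def column(name):
        # Headerless columns are named _c0, _c1, ..., so _cN is index N.
        return row[int(name[2:])]

    # The i-th entry of fields feeds the i-th entry of nebula.fields.
    props = {prop: column(src) for src, prop in zip(fields, nebula_fields)}
    return column(vid_field), props


# Hypothetical row, laid out to match the player Tag configuration above.
vid, props = map_row(
    ["player100", "42", "Tim Duncan"],
    fields=["_c1", "_c2"],
    nebula_fields=["age", "name"],
    vid_field="_c0",
)
print(vid, props)
# prints player100 {'age': '42', 'name': 'Tim Duncan'}
```

If the two lists were swapped relative to each other, for example fields: [_c1, _c2] with nebula.fields: [name, age], the age column would be written to the name property, which is why the order of the two lists must match.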

Step 4: Import data into NebulaGraph

Run the following command to import CSV data into NebulaGraph. For descriptions of the parameters, see Options for import.

${SPARK_HOME}/bin/spark-submit --master "local" --class com.vesoft.nebula.exchange.Exchange <nebula-exchange.jar_path> -c <csv_application.conf_path> 

Note

You can obtain the JAR package in two ways: compile it yourself, or download the compiled .jar file directly.

For example:

${SPARK_HOME}/bin/spark-submit  --master "local" --class com.vesoft.nebula.exchange.Exchange  /root/nebula-exchange/nebula-exchange/target/nebula-exchange_spark_2.4-3.8.0.jar  -c /root/nebula-exchange/nebula-exchange/target/classes/csv_application.conf

You can search for batchSuccess.<tag_name/edge_name> in the command output to check the number of successes. For example, batchSuccess.follow: 300.
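
When the command output is long, the counters can be collected with a small script instead of searching by eye. The sketch below assumes that each counter appears in the form shown in the example above (batchSuccess.follow: 300) and that the output has been captured as text, for example by redirecting it to a file. The function name batch_success_counts and the surrounding sample line are hypothetical.

```python
import re

# Matches counters of the form "batchSuccess.<tag_name/edge_name>: <number>".
COUNTER = re.compile(r"batchSuccess\.(\w+):\s*(\d+)")


def batch_success_counts(output):
    """Return {tag_or_edge_name: count} for every batchSuccess counter found."""
    return {name: int(count) for name, count in COUNTER.findall(output)}


# Hypothetical captured output: one unrelated line plus the counter from the example.
sample_output = "some other log line\nbatchSuccess.follow: 300\n"
print(batch_success_counts(sample_output))
# prints {'follow': 300}
```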

Access HDFS data with Kerberos authentication

When Kerberos is used for authentication, you can access the HDFS data in one of the following ways.

  • Configure the Kerberos configuration file in a command

    Configure --conf and --files in the command, for example:

    ${SPARK_HOME}/bin/spark-submit --master xxx  --num-executors 2 --executor-cores 2 --executor-memory 1g \
    --conf "spark.driver.extraJavaOptions=-Djava.security.krb5.conf=./krb5.conf" \
    --conf "spark.executor.extraJavaOptions=-Djava.security.krb5.conf=./krb5.conf" \
    --files /local/path/to/xxx.keytab,/local/path/to/krb5.conf \
    --class  com.vesoft.nebula.exchange.Exchange  \
    exchange.jar -c xx.conf
    

    The file path in --conf can be configured in two ways as follows:

    • Configure the absolute path to the file. All YARN or Spark machines are required to have the corresponding file in the same path.
    • (Recommended in YARN mode) Configure the relative path to the file (e.g. ./krb5.conf). The resource files uploaded via --files are located in the working directory of the Java virtual machine or JAR.

    The files in --files must be stored on the machine where the spark-submit command is executed.

  • Without commands

    Deploy Spark and the Kerberos-certified Hadoop in the same cluster so that they share HDFS and YARN, and then add the configuration export HADOOP_HOME=<hadoop_home_path> to spark-env.sh in Spark.

Step 5: (optional) Validate data

Users can verify that data has been imported by executing a query in the NebulaGraph client (for example, NebulaGraph Studio). For example:

LOOKUP ON player YIELD id(vertex);

Users can also run the SHOW STATS command to view statistics.
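
To have a number to compare the statistics against, the expected vertex count for a Tag can be derived from the source file. The sketch below is only an illustration and assumes a headerless CSV file with the VID in the first column. It counts distinct VIDs rather than rows, because a vertex inserted more than once with the same VID is stored only once. The function name distinct_vids and the sample rows are hypothetical.

```python
import csv


def distinct_vids(lines, vid_index=0):
    """Count distinct values in the VID column of a headerless CSV."""
    return len({row[vid_index] for row in csv.reader(lines) if row})


# Hypothetical rows: player100 appears twice and is counted once.
rows = [
    "player100,42,Tim Duncan",
    "player101,36,Tony Parker",
    "player100,42,Tim Duncan",
]
print(distinct_vids(rows))
# prints 2
```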

Step 6: (optional) Rebuild indexes in NebulaGraph

With the data imported, users can recreate and rebuild indexes in NebulaGraph. For details, see Index overview.


Last update: April 26, 2024