Import data from Hive¶
This topic provides an example of how to use Exchange to import data stored in Hive into NebulaGraph.
Data set¶
This topic takes the basketballplayer dataset as an example.
In this example, the data set has been stored in Hive. All vertexes and edges are stored in the player, team, follow, and serve tables. The following shows the structure of each table.
scala> spark.sql("describe basketball.player").show
+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|playerid| string| null|
| age| bigint| null|
| name| string| null|
+--------+---------+-------+
scala> spark.sql("describe basketball.team").show
+----------+---------+-------+
| col_name|data_type|comment|
+----------+---------+-------+
| teamid| string| null|
| name| string| null|
+----------+---------+-------+
scala> spark.sql("describe basketball.follow").show
+----------+---------+-------+
| col_name|data_type|comment|
+----------+---------+-------+
|src_player| string| null|
|dst_player| string| null|
| degree| bigint| null|
+----------+---------+-------+
scala> spark.sql("describe basketball.serve").show
+----------+---------+-------+
| col_name|data_type|comment|
+----------+---------+-------+
| playerid| string| null|
| teamid| string| null|
|start_year| bigint| null|
| end_year| bigint| null|
+----------+---------+-------+
Note
The Hive data type bigint corresponds to the NebulaGraph int.
Environment¶
This example is done on macOS. Here is the environment configuration information:
- Hardware specifications:
  - CPU: 1.7 GHz Quad-Core Intel Core i7
  - Memory: 16 GB
- Spark: 2.4.7, stand-alone
- Hadoop: 2.9.2, pseudo-distributed deployment
- Hive: 2.3.7, Hive Metastore database is MySQL 8.0.22
- NebulaGraph: 3.8.0. Deploy NebulaGraph with Docker Compose.
Prerequisites¶
Before importing data, you need to confirm the following information:
- NebulaGraph has been installed and deployed with the following information:
  - IP addresses and ports of Graph and Meta services.
  - The user name and password with write permission to NebulaGraph.
- Spark has been installed.
- Learn about the Schema created in NebulaGraph, including names and properties of Tags and Edge types, and more.
- The Hive Metastore database (MySQL in this example) has been started.
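Before continuing, it can help to confirm that the Graph and Meta ports are reachable from the machine that will run Exchange. The following is a minimal sketch, not part of Exchange: the function names port_open and check_services are made up for illustration, the addresses are the ones used later in this example, and it relies on bash's /dev/tcp feature, so it needs bash rather than a plain POSIX shell.

```shell
#!/usr/bin/env bash
# Return 0 if a TCP connection to host:port can be opened.
# Uses bash's built-in /dev/tcp pseudo-device; the subshell closes the socket on exit.
port_open() {
  local host="$1" port="$2"
  (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null
}

# Print one status line per "host:port" argument.
check_services() {
  local addr
  for addr in "$@"; do
    if port_open "${addr%:*}" "${addr##*:}"; then
      echo "${addr} reachable"
    else
      echo "${addr} NOT reachable"
    fi
  done
}

# Addresses assumed from this example: Graph on 9669, Meta on 9559.
check_services 127.0.0.1:9669 127.0.0.1:9559
```

The check has no timeout of its own, so against a remote host that silently drops packets it may wait for the operating system's connect timeout. A "NOT reachable" line only says the port could not be opened; it does not say why.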
Steps¶
Step 1: Create the Schema in NebulaGraph¶
Analyze the data to create a Schema in NebulaGraph by following these steps:
- Identify the Schema elements. The Schema elements in NebulaGraph are shown in the following table.

  Element    Name    Property
  Tag        player  name string, age int
  Tag        team    name string
  Edge Type  follow  degree int
  Edge Type  serve   start_year int, end_year int
- Create a graph space basketballplayer in NebulaGraph and create a Schema as shown below.

  ## Create a graph space
  nebula> CREATE SPACE basketballplayer \
          (partition_num = 10, \
          replica_factor = 1, \
          vid_type = FIXED_STRING(30));

  ## Use the graph space basketballplayer
  nebula> USE basketballplayer;

  ## Create the Tag player
  nebula> CREATE TAG player(name string, age int);

  ## Create the Tag team
  nebula> CREATE TAG team(name string);

  ## Create the Edge type follow
  nebula> CREATE EDGE follow(degree int);

  ## Create the Edge type serve
  nebula> CREATE EDGE serve(start_year int, end_year int);
For more information, see Quick start workflow.
Step 2: Use Spark SQL to confirm Hive SQL statements¶
After the Spark-shell environment is started, run the following statements to ensure that Spark can read data in Hive.
scala> sql("select playerid, age, name from basketball.player").show
scala> sql("select teamid, name from basketball.team").show
scala> sql("select src_player, dst_player, degree from basketball.follow").show
scala> sql("select playerid, teamid, start_year, end_year from basketball.serve").show
The following is the result read from the table basketball.player.
+---------+----+-----------------+
| playerid| age| name|
+---------+----+-----------------+
|player100| 42| Tim Duncan|
|player101| 36| Tony Parker|
|player102| 33|LaMarcus Aldridge|
|player103| 32| Rudy Gay|
|player104| 32| Marco Belinelli|
+---------+----+-----------------+
...
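Because the graph space in Step 1 declares vid_type = FIXED_STRING(30), it may be worth checking that the columns later used as VIDs contain no NULL values and no values longer than 30 before importing. The sketch below is illustrative only: vid_check_sql is a hypothetical helper that just builds a query string, and the commented command assumes the spark-sql CLI shipped with Spark can read the same Hive metastore as the Spark shell. Spark's length function counts characters, while the FIXED_STRING limit should apply to the stored byte length, so the two can differ for non-ASCII IDs.

```shell
#!/usr/bin/env bash
# Build a query that counts rows whose VID column is NULL or longer than max_len.
# vid_check_sql is a hypothetical helper name; it only formats a string.
vid_check_sql() {
  local table="$1" column="$2" max_len="$3"
  printf 'select count(*) from %s where %s is null or length(%s) > %s' \
    "$table" "$column" "$column" "$max_len"
}

# Print the checks for the vertex VID columns used in this example
# (30 comes from FIXED_STRING(30) in Step 1).
vid_check_sql basketball.player playerid 30; echo
vid_check_sql basketball.team teamid 30; echo

# To run a check (assumes spark-sql is configured for the same Hive metastore):
#   "${SPARK_HOME}/bin/spark-sql" -e "$(vid_check_sql basketball.player playerid 30)"
```

The same query shape can be used for src_player and dst_player in basketball.follow and for playerid and teamid in basketball.serve. A count of 0 suggests that every row has a usable VID.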
Step 3: Modify configuration file¶
After Exchange is compiled, copy the configuration file target/classes/application.conf and edit the copy to configure the Hive data source. In this example, the copied file is called hive_application.conf. For details on each configuration item, see Parameters in the configuration file.
{
# Spark configuration
spark: {
app: {
name: NebulaGraph Exchange 3.8.0
}
driver: {
cores: 1
maxResultSize: 1G
}
cores: {
max: 16
}
}
# If Spark and Hive are deployed in different clusters, you need to configure the parameters for connecting to Hive. Otherwise, skip these configurations.
#hive: {
# waredir: "hdfs://NAMENODE_IP:9000/apps/svr/hive-xxx/warehouse/"
# connectionURL: "jdbc:mysql://your_ip:3306/hive_spark?characterEncoding=UTF-8"
# connectionDriverName: "com.mysql.jdbc.Driver"
# connectionUserName: "user"
# connectionPassword: "password"
#}
# NebulaGraph configuration
nebula: {
address:{
# Specify the IP addresses and ports for Graph and all Meta services.
# If there are multiple addresses, the format is "ip1:port","ip2:port","ip3:port".
# Addresses are separated by commas.
graph:["127.0.0.1:9669"]
# The address of any of the Meta services.
# If your NebulaGraph server is in a virtual network such as k8s, configure the leader address of Meta.
meta:["127.0.0.1:9559"]
}
# The account entered must have write permission for the NebulaGraph space.
user: root
pswd: nebula
# Whether to use a password encrypted with RSA.
# enableRSA: true
# The key used to encrypt the password using RSA.
# privateKey: ""
# Fill in the name of the graph space you want to write data to in the NebulaGraph.
space: basketballplayer
connection: {
timeout: 3000
retry: 3
}
execution: {
retry: 3
}
error: {
max: 32
output: /tmp/errors
}
rate: {
limit: 1024
timeout: 1000
}
}
# Processing vertexes
tags: [
# Set the information about the Tag player.
{
# The Tag name in NebulaGraph.
name: player
type: {
# Set the data source to Hive.
source: hive
# Specify how to import the data into NebulaGraph: Client or SST.
sink: client
}
# Set the SQL statement to read the data of player table in basketball database.
exec: "select playerid, age, name from basketball.player"
# Specify the column names in the player table in fields, and their corresponding values are specified as properties in the NebulaGraph.
# The sequence of fields and nebula.fields must correspond to each other.
# If multiple column names need to be specified, separate them by commas.
fields: [age,name]
nebula.fields: [age,name]
# Specify a column of data in the table as the source of vertex VID in the NebulaGraph.
vertex:{
field:playerid
# udf:{
# separator:"_"
# oldColNames:[field-0,field-1,field-2]
# newColName:new-field
# }
# Add the specified prefix to the VID. For example, if the VID is `12345`, adding the prefix `tag1` will result in `tag1_12345`. The underscore cannot be modified.
# prefix:"tag1"
# Performs hashing operations on VIDs of type string.
# policy:hash
}
# The filtering rule. The data that matches the filter rule is imported into NebulaGraph.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. defaults to INSERT.
#writeMode: INSERT
# Whether or not to delete the related incoming and outgoing edges of the vertices when performing a batch delete operation. This parameter takes effect when `writeMode` is `DELETE`.
#deleteEdge: false
# The number of data written to NebulaGraph in a single batch.
batch: 256
# The number of partitions to be created when the data is written to NebulaGraph.
partition: 32
}
# Set the information about the Tag team.
{
name: team
type: {
source: hive
sink: client
}
exec: "select teamid, name from basketball.team"
fields: [name]
nebula.fields: [name]
vertex: {
field: teamid
}
batch: 256
partition: 32
}
]
# Processing edges
edges: [
# Set the information about the Edge Type follow.
{
# The corresponding Edge Type name in NebulaGraph.
name: follow
type: {
# Set the data source to Hive.
source: hive
# Specify how to import the Edge type data into NebulaGraph: Client or SST.
sink: client
}
# Set the SQL statement to read the data of follow table in the basketball database.
exec: "select src_player, dst_player, degree from basketball.follow"
# Specify the column names in the follow table in Fields, and their corresponding values are specified as properties in the NebulaGraph.
# The sequence of fields and nebula.fields must correspond to each other.
# If multiple column names need to be specified, separate them by commas.
fields: [degree]
nebula.fields: [degree]
# In source, use a column in the follow table as the source of the edge's starting vertex.
# In target, use a column in the follow table as the source of the edge's destination vertex.
source: {
field: src_player
# udf:{
# separator:"_"
# oldColNames:[field-0,field-1,field-2]
# newColName:new-field
# }
# Add the specified prefix to the VID. For example, if the VID is `12345`, adding the prefix `tag1` will result in `tag1_12345`. The underscore cannot be modified.
# prefix:"tag1"
# Performs hashing operations on VIDs of type string.
# policy:hash
}
target: {
field: dst_player
# udf:{
# separator:"_"
# oldColNames:[field-0,field-1,field-2]
# newColName:new-field
# }
# Add the specified prefix to the VID. For example, if the VID is `12345`, adding the prefix `tag1` will result in `tag1_12345`. The underscore cannot be modified.
# prefix:"tag1"
# Performs hashing operations on VIDs of type string.
# policy:hash
}
# (Optional) Specify a column as the source of the rank.
#ranking: rank
# The filtering rule. The data that matches the filter rule is imported into NebulaGraph.
# filter: "name='Tom'"
# Batch operation types, including INSERT, UPDATE, and DELETE. defaults to INSERT.
#writeMode: INSERT
# The number of data written to NebulaGraph in a single batch.
batch: 256
# The number of partitions to be created when the data is written to NebulaGraph.
partition: 32
}
# Set the information about the Edge Type serve.
{
name: serve
type: {
source: hive
sink: client
}
exec: "select playerid, teamid, start_year, end_year from basketball.serve"
fields: [start_year,end_year]
nebula.fields: [start_year,end_year]
source: {
field: playerid
}
target: {
field: teamid
}
# (Optional) Specify a column as the source of the rank.
#ranking: rank
batch: 256
partition: 32
}
]
}
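The comments in the configuration state that fields and nebula.fields must correspond to each other. One way to catch an accidental mismatch is to compare the number of entries in each pair before submitting the job. The following is a rough sketch under stated assumptions: check_field_pairs is a hypothetical helper, it only understands the single-line fields: [a,b] layout used in this example, and it does not parse HOCON in general.

```shell
#!/usr/bin/env bash
# Compare the entry count of every "fields" list with the "nebula.fields" list that follows it.
# Returns 0 when all pairs match, 1 otherwise. Only handles single-line lists.
check_field_pairs() {
  awk '
    # Count the comma-separated entries between [ and ] on one line.
    function count_entries(line,    body, parts) {
      body = line
      sub(/^[^[]*\[/, "", body)
      sub(/\].*$/, "", body)
      gsub(/[ \t]/, "", body)
      if (body == "") return 0
      return split(body, parts, ",")
    }
    BEGIN { bad = 0 }
    /^[ \t]*#/ { next }    # skip comment lines
    /^[ \t]*fields:/ { src_count = count_entries($0); src_line = NR; next }
    /^[ \t]*nebula\.fields:/ {
      if (count_entries($0) != src_count) {
        printf "lines %d and %d: entry counts differ\n", src_line, NR
        bad = 1
      }
    }
    END { exit bad }
  ' "$1"
}

# Usage (the file name is the one chosen in this example):
#   check_field_pairs hive_application.conf && echo "field lists match"
```

The check compares counts only. It cannot tell whether the order of the entries is right, so the lists still need a visual review.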
Step 4: Import data into NebulaGraph¶
Run the following command to import Hive data into NebulaGraph. For a description of the parameters, see Options for import.
${SPARK_HOME}/bin/spark-submit --master "local" --class com.vesoft.nebula.exchange.Exchange <nebula-exchange.jar_path> -c <hive_application.conf_path> -h
Note
JAR packages are available in two ways: compile them yourself, or download the compiled .jar file directly.
For example:
${SPARK_HOME}/bin/spark-submit --master "local" --class com.vesoft.nebula.exchange.Exchange /root/nebula-exchange/nebula-exchange/target/nebula-exchange_spark_2.4-3.8.0.jar -c /root/nebula-exchange/nebula-exchange/target/classes/hive_application.conf -h
You can search for batchSuccess.<tag_name/edge_name> in the command output to check the number of successes. For example, batchSuccess.follow: 300.
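If the command output is saved to a file, the counters can be pulled out with grep instead of being searched for by hand. This is a minimal sketch: the function name show_batch_success and the file name exchange.log are hypothetical, and the pattern assumes the counters appear in the batchSuccess.<name>: <count> form shown above.

```shell
#!/usr/bin/env bash
# Print every batchSuccess counter found in a saved Exchange log, one per line.
# Assumption: counters look like "batchSuccess.follow: 300".
show_batch_success() {
  local log_file="$1"
  grep -oE 'batchSuccess\.[A-Za-z0-9_]+: *[0-9]+' "$log_file"
}

# Usage (exchange.log is a hypothetical file holding the spark-submit output,
# for example captured by appending "2>&1 | tee exchange.log" to the command):
#   show_batch_success exchange.log
```

The function returns a non-zero status when no counter is found, which makes it easy to use in a script that should stop if nothing was imported.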
Access HDFS data with Kerberos certification¶
When using Kerberos for security certification, you can access the HDFS data in one of the following ways.
- Configure the Kerberos configuration file in a command

  Configure --conf and --files in the command, for example:

  ${SPARK_HOME}/bin/spark-submit --master xxx --num-executors 2 --executor-cores 2 --executor-memory 1g \
  --conf "spark.driver.extraJavaOptions=-Djava.security.krb5.conf=./krb5.conf" \
  --conf "spark.executor.extraJavaOptions=-Djava.security.krb5.conf=./krb5.conf" \
  --files /local/path/to/xxx.keytab,/local/path/to/krb5.conf \
  --class com.vesoft.nebula.exchange.Exchange \
  exchange.jar -c xx.conf

  The file path in --conf can be configured in two ways as follows:

  - Configure the absolute path to the file. All YARN or Spark machines are required to have the corresponding file in the same path.
  - (Recommended in YARN mode) Configure the relative path to the file (e.g. ./krb5.conf). The resource files uploaded via --files are located in the working directory of the Java virtual machine or JAR.

  The files in --files must be stored on the machine where the spark-submit command is executed.

- Without commands

  Deploy Spark and Kerberos-certified Hadoop in the same cluster so that they share HDFS and YARN, and then add the configuration export HADOOP_HOME=<hadoop_home_path> to spark-env.sh in Spark.
Step 5: (optional) Validate data¶
Users can verify that data has been imported by executing a query in the NebulaGraph client (for example, NebulaGraph Studio). For example:
LOOKUP ON player YIELD id(vertex);
Users can also run the SHOW STATS command to view statistics.
Step 6: (optional) Rebuild indexes in NebulaGraph¶
With the data imported, users can recreate and rebuild indexes in NebulaGraph. For details, see Index overview.