Import data from Kafka
This topic provides a simple guide to importing data stored in Kafka into Nebula Graph using Exchange.
Environment
This example was run on macOS with the following environment:
- Hardware specifications:
  - CPU: 1.7 GHz Quad-Core Intel Core i7
  - Memory: 16 GB
- Spark: 2.4.7, standalone
- Nebula Graph: 2.5.1 (deployed with Docker Compose)
Prerequisites
Before importing data, confirm the following:
- Nebula Graph has been installed and deployed, and you have the following information:
  - IP addresses and ports of the Graph and Meta services.
  - User name and password with Nebula Graph write permission.
- Spark has been installed.
- You know the Schema created in Nebula Graph, including Tag and Edge type names, properties, and more.
- The Kafka service has been installed and started.
Steps
Step 1: Create the Schema in Nebula Graph
Analyze the data to create a Schema in Nebula Graph by following these steps:
- Identify the Schema elements. The Schema elements in Nebula Graph are shown in the following table.

  | Element | Name | Property |
  | :--- | :--- | :--- |
  | Tag | player | name string, age int |
  | Tag | team | name string |
  | Edge Type | follow | degree int |
  | Edge Type | serve | start_year int, end_year int |

- Create a graph space basketballplayer in Nebula Graph and create the Schema as shown below.
## create graph space
nebula> CREATE SPACE basketballplayer \
        (partition_num = 10, \
        replica_factor = 1, \
        vid_type = FIXED_STRING(30));

## use the graph space basketballplayer
nebula> USE basketballplayer;

## create Tag player
nebula> CREATE TAG player(name string, age int);

## create Tag team
nebula> CREATE TAG team(name string);

## create Edge type follow
nebula> CREATE EDGE follow(degree int);

## create Edge type serve
nebula> CREATE EDGE serve(start_year int, end_year int);
For more information, see Quick start workflow.
Step 2: Modify configuration file
Note
If some data is stored in Kafka's value field, you need to modify the source code to get the value from Kafka, parse it with the from_json function, and return the result as a DataFrame.
After Exchange is compiled, copy the conf file target/classes/application.conf and set the Kafka data source configuration in the copy. In this example, the copied file is called kafka_application.conf. For details on each configuration item, see Parameters in the configuration file.
{
  # Spark configuration
  spark: {
    app: {
      name: Nebula Exchange 2.5.1
    }
    driver: {
      cores: 1
      maxResultSize: 1G
    }
    cores {
      max: 16
    }
  }
  # Nebula Graph configuration
  nebula: {
    address:{
      # Specify the IP addresses and ports for Graph and all Meta services.
      # If there are multiple addresses, the format is "ip1:port","ip2:port","ip3:port".
      # Addresses are separated by commas.
      graph:["127.0.0.1:9669"]
      meta:["127.0.0.1:9559"]
    }
    # The account entered must have write permission for the Nebula Graph space.
    user: root
    pswd: nebula
    # Fill in the name of the graph space you want to write data to in Nebula Graph.
    space: basketballplayer
    connection {
      timeout: 3000
      retry: 3
    }
    execution {
      retry: 3
    }
    error: {
      max: 32
      output: /tmp/errors
    }
    rate: {
      limit: 1024
      timeout: 1000
    }
  }
  # Processing vertex
  tags: [
    # Set information about Tag player.
    {
      name: player
      type: {
        # Specify the data source type. Set it to kafka.
        source: kafka
        # Specify how to import the data into Nebula Graph: client or SST.
        sink: client
      }
      # Kafka server address.
      service: "127.0.0.1:9092"
      # Message category.
      topic: "topic_name1"
      # Kafka data has fixed field names: key, value, topic, partition, offset, timestamp, timestampType.
      # After Spark reads the data as a DataFrame, separate multiple fields with commas.
      # Specify the field names in fields. For example, key maps to name in Nebula Graph and value maps to age, as shown below.
      fields: [key,value]
      nebula.fields: [name,age]
      # Specify a column of data in the table as the source of vertex VID in Nebula Graph.
      # The field used here, key, also appears in fields above, so key is used as both the VID and the property name.
      vertex:{
        field:key
      }
      # Number of pieces of data written to Nebula Graph in a single batch.
      batch: 10
      # Number of Spark partitions
      partition: 10
      # Read message interval. Unit: second.
      interval.seconds: 10
    }
    # Set information about Tag team.
    {
      name: team
      type: {
        source: kafka
        sink: client
      }
      service: "127.0.0.1:9092"
      topic: "topic_name2"
      fields: [key]
      nebula.fields: [name]
      vertex:{
        field:key
      }
      batch: 10
      partition: 10
      interval.seconds: 10
    }
  ]
  # Processing edge
  edges: [
    # Set information about Edge Type follow
    {
      # The corresponding Edge Type name in Nebula Graph.
      name: follow
      type: {
        # Specify the data source type. Set it to kafka.
        source: kafka
        # Specify how to import the data into Nebula Graph: client or SST.
        sink: client
      }
      # Kafka server address.
      service: "127.0.0.1:9092"
      # Message category.
      topic: "topic_name3"
      # Kafka data has fixed field names: key, value, topic, partition, offset, timestamp, timestampType.
      # After Spark reads the data as a DataFrame, separate multiple fields with commas.
      # Specify the field names in fields. For example, key maps to degree in Nebula Graph, as shown below.
      fields: [key]
      nebula.fields: [degree]
      # In source, use a column in the topic as the source of the edge's starting vertex.
      source:{
        field:timestamp
      }
      # In target, use a column in the topic as the source of the edge's destination vertex.
      target:{
        field:offset
      }
      # Number of pieces of data written to Nebula Graph in a single batch.
      batch: 10
      # Number of Spark partitions
      partition: 10
      # Read message interval. Unit: second.
      interval.seconds: 10
    }
    # Set information about Edge Type serve
    {
      name: serve
      type: {
        source: kafka
        sink: client
      }
      service: "127.0.0.1:9092"
      topic: "topic_name4"
      fields: [timestamp,offset]
      nebula.fields: [start_year,end_year]
      source:{
        field:key
      }
      target:{
        field:value
      }
      batch: 10
      partition: 10
      interval.seconds: 10
    }
  ]
}
Step 3: Import data into Nebula Graph
Run the following command to import Kafka data into Nebula Graph. For a description of the parameters, see Options for import.
${SPARK_HOME}/bin/spark-submit --master "local" --class com.vesoft.nebula.exchange.Exchange <nebula-exchange-2.5.1.jar_path> -c <kafka_application.conf_path>
Note
You can obtain the JAR package in two ways: compile it yourself, or download the compiled .jar file directly.
Example:
${SPARK_HOME}/bin/spark-submit --master "local" --class com.vesoft.nebula.exchange.Exchange /root/nebula-spark-utils/nebula-exchange/target/nebula-exchange-2.5.1.jar -c /root/nebula-spark-utils/nebula-exchange/target/classes/kafka_application.conf
You can search for batchSuccess.<tag_name/edge_name> in the command output to check the number of successes. For example, batchSuccess.follow: 300.
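One way to run that search is to save the command output to a file and filter it with grep. The sketch below is only an illustration: it assumes the output was captured to a file (for example, by appending `2>&1 | tee exchange.log` to the spark-submit command; that file name is hypothetical), and it writes a stand-in sample in the batchSuccess.follow: 300 form instead of using real output.

```shell
# Stand-in for captured spark-submit output. In practice, point log_file at
# your own capture; the two lines written here are illustrative sample data.
log_file="$(mktemp)"
printf '%s\n' 'unrelated output line' 'batchSuccess.follow: 300' > "$log_file"

# List every batchSuccess counter present in the output.
grep -o 'batchSuccess\.[A-Za-z0-9_]*: [0-9]*' "$log_file"

# Extract the number of successes for one Tag or Edge type (here: follow).
follow_count="$(grep -o 'batchSuccess\.follow: [0-9]*' "$log_file" | awk '{print $2}')"
echo "follow successes: ${follow_count}"

# Remove the temporary sample file.
rm -f "$log_file"
```

Replace follow in the second pattern with any Tag or Edge type name from the configuration file to check its counter.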
Step 4: (optional) Validate data
You can verify that the data has been imported by running a query in a Nebula Graph client (for example, Nebula Graph Studio). For example:
GO FROM "player100" OVER follow;
You can also run the SHOW STATS command to view statistics.
Step 5: (optional) Rebuild indexes in Nebula Graph
With the data imported, you can recreate and rebuild indexes in Nebula Graph. For details, see Index overview.