Nebula Spark Connector¶
Nebula Spark Connector is a Spark connector application for reading and writing NebulaGraph data in the standard Spark format. Nebula Spark Connector consists of two parts: Reader and Writer.
- Reader

  Provides a Spark SQL interface. This interface can be used to read NebulaGraph data. It reads the data of one vertex or edge type at a time and assembles the result into a Spark DataFrame.

- Writer

  Provides a Spark SQL interface. This interface can be used to write DataFrames into NebulaGraph row by row or in batches.
For more information, see Nebula Spark Connector.
Use cases¶
Nebula Spark Connector applies to the following scenarios:
- Migrate data between different NebulaGraph clusters.
- Migrate data between different graph spaces in the same NebulaGraph cluster.
- Migrate data between NebulaGraph and other data sources.
- Graph computing with Nebula Algorithm.
Benefits¶
The features of Nebula Spark Connector 3.0.0 are as follows:
- Supports multiple connection settings, such as timeout period, number of connection retries, number of execution retries, etc.
- Supports multiple settings for data writing, such as setting a column as the vertex ID, the starting vertex ID, the destination vertex ID, or as properties.
- Supports reading data without properties and reading data with all properties.
- Supports reading NebulaGraph data into VertexRDD and EdgeRDD, and supports non-Long vertex IDs.
- Unifies the extended data source of SparkSQL, and uses DataSourceV2 to extend NebulaGraph data.
- Three write modes, `insert`, `update`, and `delete`, are supported. The `insert` mode inserts (overwrites) data, the `update` mode only updates existing data, and the `delete` mode only deletes data.
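As a sketch of how a write mode is selected (assuming the connector's `WriteMode` enum and the `withWriteMode` builder method shown in the examples further down this page), a delete-mode vertex write might look like:

```scala
// Hedged sketch: delete the vertices whose IDs appear in the "id" column of df.
// NebulaConnectionConfig, WriteNebulaVertexConfig, and WriteMode are the
// connector classes used in the examples below.
val config = NebulaConnectionConfig
  .builder()
  .withMetaAddress("127.0.0.1:9559")
  .withGraphAddress("127.0.0.1:9669")
  .build()
val deleteVertexConfig = WriteNebulaVertexConfig
  .builder()
  .withSpace("test")
  .withTag("person")
  .withVidField("id")
  .withWriteMode(WriteMode.DELETE)
  .withBatch(1000)
  .build()
df.write.nebula(config, deleteVertexConfig).writeVertices()
```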
Release note¶
Get Nebula Spark Connector¶
Compile package¶
Note
Install Spark of version 2.4.x.
1. Clone the repository `nebula-spark-connector`.

   ```bash
   $ git clone -b v3.0.0 https://github.com/vesoft-inc/nebula-spark-connector.git
   ```

2. Make the `nebula-spark-connector` directory the current working directory.

   ```bash
   $ cd nebula-spark-connector/nebula-spark-connector
   ```

3. Compile the package.

   ```bash
   $ mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true
   ```

After compilation, a file similar to `nebula-spark-connector-3.0.0-SNAPSHOT.jar` is generated in the directory `nebula-spark-connector/nebula-spark-connector/target/`.
Download from the Maven remote repository¶
How to use¶
When using Nebula Spark Connector to read and write NebulaGraph data, you can refer to the following code.
```scala
// Read vertex and edge data from NebulaGraph.
spark.read.nebula().loadVerticesToDF()
spark.read.nebula().loadEdgesToDF()

// Write DataFrame data into NebulaGraph as vertices and edges.
dataframe.write.nebula().writeVertices()
dataframe.write.nebula().writeEdges()
```
`nebula()` receives two configuration parameters: the connection configuration and the read or write configuration.
Reading data from NebulaGraph¶
```scala
val config = NebulaConnectionConfig
  .builder()
  .withMetaAddress("127.0.0.1:9559")
  .withConenctionRetry(2)
  .withExecuteRetry(2)
  .withTimeout(6000)
  .build()

val nebulaReadVertexConfig: ReadNebulaConfig = ReadNebulaConfig
  .builder()
  .withSpace("test")
  .withLabel("person")
  .withNoColumn(false)
  .withReturnCols(List("birthday"))
  .withLimit(10)
  .withPartitionNum(10)
  .build()
val vertex = spark.read.nebula(config, nebulaReadVertexConfig).loadVerticesToDF()

val nebulaReadEdgeConfig: ReadNebulaConfig = ReadNebulaConfig
  .builder()
  .withSpace("test")
  .withLabel("knows")
  .withNoColumn(false)
  .withReturnCols(List("degree"))
  .withLimit(10)
  .withPartitionNum(10)
  .build()
val edge = spark.read.nebula(config, nebulaReadEdgeConfig).loadEdgesToDF()
```
- `NebulaConnectionConfig` is the configuration for connecting to NebulaGraph, as described below.

  | Parameter | Required | Description |
  | --- | --- | --- |
  | `withMetaAddress` | Yes | Specifies the IP addresses and ports of all Meta Services. Separate multiple addresses with commas. The format is `ip1:port1,ip2:port2,...`. `withGraphAddress` does not need to be configured when reading data. |
  | `withConnectionRetry` | No | The number of retries for the Nebula Java Client to connect to NebulaGraph. The default value is `1`. |
  | `withExecuteRetry` | No | The number of retries for the Nebula Java Client to execute query statements. The default value is `1`. |
  | `withTimeout` | No | The timeout of the Nebula Java Client request response. The default value is `6000`. Unit: ms. |
- `ReadNebulaConfig` is the configuration for reading NebulaGraph data, as described below.

  | Parameter | Required | Description |
  | --- | --- | --- |
  | `withSpace` | Yes | The NebulaGraph space name. |
  | `withLabel` | Yes | The Tag or Edge type name within the NebulaGraph space. |
  | `withNoColumn` | No | Whether to skip reading properties. The default value is `false`, meaning properties are read. If set to `true`, properties are not read and the `withReturnCols` configuration is invalid. |
  | `withReturnCols` | No | The set of properties to read for vertices or edges. The format is `List(property1,property2,...)`. The default value is `List()`, meaning all properties are read. |
  | `withLimit` | No | The number of data rows read from the server by the Nebula Java Storage Client at a time. The default value is `1000`. |
  | `withPartitionNum` | No | The number of Spark partitions used to read NebulaGraph data. The default value is `100`. This value should not exceed the number of partitions (`partition_num`) in the graph space. |
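For example (a sketch building on the read example above), to read every property of the `person` tag you can leave `withReturnCols` at its default empty list; the resulting DataFrame then contains the internal ID columns plus one column per property:

```scala
// Hedged sketch: read all properties of the "person" tag.
// An empty withReturnCols list means "read all properties" (the default).
val allPropsConfig: ReadNebulaConfig = ReadNebulaConfig
  .builder()
  .withSpace("test")
  .withLabel("person")
  .withNoColumn(false)
  .withReturnCols(List())
  .withPartitionNum(10)
  .build()
val personDF = spark.read.nebula(config, allPropsConfig).loadVerticesToDF()
personDF.printSchema()
```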
Write data into NebulaGraph¶
Note
The values of columns in a dataframe are automatically written to the NebulaGraph as property values.
```scala
val config = NebulaConnectionConfig
  .builder()
  .withMetaAddress("127.0.0.1:9559")
  .withGraphAddress("127.0.0.1:9669")
  .withConenctionRetry(2)
  .build()

val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
  .builder()
  .withSpace("test")
  .withTag("person")
  .withVidField("id")
  .withVidPolicy("hash")
  .withVidAsProp(true)
  .withUser("root")
  .withPasswd("nebula")
  .withBatch(1000)
  .build()
df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()

val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig
  .builder()
  .withSpace("test")
  .withEdge("friend")
  .withSrcIdField("src")
  .withSrcPolicy(null)
  .withDstIdField("dst")
  .withDstPolicy(null)
  .withRankField("degree")
  .withSrcAsProperty(true)
  .withDstAsProperty(true)
  .withRankAsProperty(true)
  .withUser("root")
  .withPasswd("nebula")
  .withBatch(1000)
  .build()
df.write.nebula(config, nebulaWriteEdgeConfig).writeEdges()
```
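The edge example above assumes that `df` contains columns named `src`, `dst`, and `degree`, matching `withSrcIdField`, `withDstIdField`, and `withRankField`. A hypothetical DataFrame with that shape could be built as:

```scala
// Hypothetical sample data; the column names must match the write configuration.
import spark.implicits._

val df = Seq(
  ("person1", "person2", 1L),
  ("person2", "person3", 2L)
).toDF("src", "dst", "degree")
```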
The default write mode is `insert`, which can be changed to `update` via the `withWriteMode` configuration:
```scala
val config = NebulaConnectionConfig
  .builder()
  .withMetaAddress("127.0.0.1:9559")
  .withGraphAddress("127.0.0.1:9669")
  .build()

val nebulaWriteVertexConfig = WriteNebulaVertexConfig
  .builder()
  .withSpace("test")
  .withTag("person")
  .withVidField("id")
  .withVidAsProp(true)
  .withBatch(1000)
  .withWriteMode(WriteMode.UPDATE)
  .build()
df.write.nebula(config, nebulaWriteVertexConfig).writeVertices()
```
- `NebulaConnectionConfig` is the configuration for connecting to NebulaGraph, as described below.

  | Parameter | Required | Description |
  | --- | --- | --- |
  | `withMetaAddress` | Yes | Specifies the IP addresses and ports of all Meta Services. Separate multiple addresses with commas. The format is `ip1:port1,ip2:port2,...`. |
  | `withGraphAddress` | Yes | Specifies the IP addresses and ports of Graph Services. Separate multiple addresses with commas. The format is `ip1:port1,ip2:port2,...`. |
  | `withConnectionRetry` | No | The number of retries for the Nebula Java Client to connect to NebulaGraph. The default value is `1`. |
- `WriteNebulaVertexConfig` is the configuration for writing vertices, as described below.

  | Parameter | Required | Description |
  | --- | --- | --- |
  | `withSpace` | Yes | The NebulaGraph space name. |
  | `withTag` | Yes | The Tag name to associate with the vertices being written. |
  | `withVidField` | Yes | The column in the DataFrame used as the vertex ID. |
  | `withVidPolicy` | No | The mapping function NebulaGraph applies when writing the vertex ID; only HASH is supported. No mapping is performed by default. |
  | `withVidAsProp` | No | Whether the vertex ID column in the DataFrame is also written as a property. The default value is `false`. If set to `true`, make sure the Tag has a property with the same name as `VidField`. |
  | `withUser` | No | The NebulaGraph user name. If authentication is disabled, the user name and password do not need to be configured. |
  | `withPasswd` | No | The password of the NebulaGraph user name. |
  | `withBatch` | Yes | The number of rows written at a time. The default value is `1000`. |
  | `withWriteMode` | No | The write mode. The optional values are `insert` and `update`. The default value is `insert`. |
- `WriteNebulaEdgeConfig` is the configuration for writing edges, as described below.

  | Parameter | Required | Description |
  | --- | --- | --- |
  | `withSpace` | Yes | The NebulaGraph space name. |
  | `withEdge` | Yes | The Edge type name to associate with the edges being written. |
  | `withSrcIdField` | Yes | The column in the DataFrame used as the starting vertex ID. |
  | `withSrcPolicy` | No | The mapping function NebulaGraph applies when writing the starting vertex ID; only HASH is supported. No mapping is performed by default. |
  | `withDstIdField` | Yes | The column in the DataFrame used as the destination vertex ID. |
  | `withDstPolicy` | No | The mapping function NebulaGraph applies when writing the destination vertex ID; only HASH is supported. No mapping is performed by default. |
  | `withRankField` | No | The column in the DataFrame used as the rank. The rank is not written by default. |
  | `withSrcAsProperty` | No | Whether the starting vertex column in the DataFrame is also written as a property. The default value is `false`. If set to `true`, make sure the Edge type has a property with the same name as `SrcIdField`. |
  | `withDstAsProperty` | No | Whether the destination vertex column in the DataFrame is also written as a property. The default value is `false`. If set to `true`, make sure the Edge type has a property with the same name as `DstIdField`. |
  | `withRankAsProperty` | No | Whether the rank column in the DataFrame is also written as a property. The default value is `false`. If set to `true`, make sure the Edge type has a property with the same name as `RankField`. |
  | `withUser` | No | The NebulaGraph user name. If authentication is disabled, the user name and password do not need to be configured. |
  | `withPasswd` | No | The password of the NebulaGraph user name. |
  | `withBatch` | Yes | The number of rows written at a time. The default value is `1000`. |
  | `withWriteMode` | No | The write mode. The optional values are `insert` and `update`. The default value is `insert`. |
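To try the snippets on this page interactively, the compiled connector jar can be passed to `spark-shell` (the jar path below is illustrative and follows the compile step above):

```shell
$ spark-shell --jars nebula-spark-connector/nebula-spark-connector/target/nebula-spark-connector-3.0.0-SNAPSHOT.jar
```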