# NebulaGraph Flink Connector
NebulaGraph Flink Connector is a connector that helps Flink users quickly access NebulaGraph. It supports reading data from the NebulaGraph database and writing data from other external sources to the NebulaGraph database.
For more information, see NebulaGraph Flink Connector.
## Use cases
NebulaGraph Flink Connector applies to the following scenarios:
- Read data from NebulaGraph for analysis and computation.
- Write data back to NebulaGraph after analysis and computation.
- Migrate the data of NebulaGraph.
## Release note

## Version compatibility
The correspondence between the NebulaGraph Flink Connector version and the NebulaGraph core version is as follows.
| Flink Connector version | NebulaGraph version | 
|---|---|
| 3.0-SNAPSHOT | nightly | 
| 3.8.0 | 3.x.x | 
| 3.5.0 | 3.x.x | 
| 3.3.0 | 3.x.x | 
| 3.0.0 | 3.x.x | 
| 2.6.1 | 2.6.0, 2.6.1 | 
| 2.6.0 | 2.6.0, 2.6.1 | 
| 2.5.0 | 2.5.0, 2.5.1 | 
| 2.0.0 | 2.0.0, 2.0.1 | 
## Prerequisites
- Java 8 or later is installed.
- Flink 1.11.x is installed.
## Get NebulaGraph Flink Connector

### Configure Maven dependency
Add the following dependency to the Maven configuration file pom.xml to automatically obtain the Flink Connector.
```xml
<dependency>
    <groupId>com.vesoft</groupId>
    <artifactId>nebula-flink-connector</artifactId>
    <version>3.8.0</version>
</dependency>
```
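To use a SNAPSHOT version such as 3.0-SNAPSHOT from the compatibility table, note that snapshot artifacts are generally not published to Maven Central, so a snapshot repository must also be declared in `pom.xml`. The Sonatype OSS snapshots URL below is a common choice for `com.vesoft` snapshot artifacts, but verify it against the project's own instructions.

```xml
<repositories>
    <repository>
        <id>snapshots</id>
        <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
    </repository>
</repositories>
```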
### Compile and package
Follow the steps below to compile and package the Flink Connector.
- Clone the repository `nebula-flink-connector`.

  ```bash
  $ git clone -b release-3.8 https://github.com/vesoft-inc/nebula-flink-connector.git
  ```

- Enter the `nebula-flink-connector` directory.

- Compile and package.

  ```bash
  $ mvn clean package -Dmaven.test.skip=true
  ```

After compilation, a file similar to `nebula-flink-connector-3.8.0.jar` is generated in the `connector/target` directory.
## How to use
### Write data into NebulaGraph
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
        .setGraphAddress("127.0.0.1:9669")
        .setMetaAddress("127.0.0.1:9559")
        .build();
NebulaGraphConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions);
NebulaMetaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions);

VertexExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
        .setGraphSpace("flinkSink")
        .setTag("player")
        .setIdIndex(0)
        .setFields(Arrays.asList("name", "age"))
        .setPositions(Arrays.asList(1, 2))
        .setBatchSize(2)
        .build();

NebulaVertexBatchOutputFormat outputFormat = new NebulaVertexBatchOutputFormat(
        graphConnectionProvider, metaConnectionProvider, executionOptions);
NebulaSinkFunction<Row> nebulaSinkFunction = new NebulaSinkFunction<>(outputFormat);

// playerSource is the upstream DataStream of player records.
// Copy each incoming record field by field into a Flink Row.
DataStream<Row> dataStream = playerSource.map(row -> {
    Row record = new org.apache.flink.types.Row(row.size());
    for (int i = 0; i < row.size(); i++) {
        record.setField(i, row.get(i));
    }
    return record;
});
dataStream.addSink(nebulaSinkFunction);
env.execute("write nebula");
```
### Read data from NebulaGraph
```java
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
        .setMetaAddress("127.0.0.1:9559")
        .build();
NebulaStorageConnectionProvider storageConnectionProvider = new NebulaStorageConnectionProvider(nebulaClientOptions);

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

VertexExecutionOptions vertexExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
        .setGraphSpace("flinkSource")
        .setTag("person")
        .setNoColumn(false)
        .setFields(Arrays.asList())
        .setLimit(100)
        .build();

NebulaSourceFunction sourceFunction = new NebulaSourceFunction(storageConnectionProvider)
        .setExecutionOptions(vertexExecutionOptions);
DataStreamSource<BaseTableRow> dataStreamSource = env.addSource(sourceFunction);

// Convert each BaseTableRow read from NebulaGraph into a Flink Row, field by field.
dataStreamSource.map(row -> {
    List<ValueWrapper> values = row.getValues();
    Row record = new Row(15);
    record.setField(0, values.get(0).asLong());
    record.setField(1, values.get(1).asString());
    record.setField(2, values.get(2).asString());
    record.setField(3, values.get(3).asLong());
    record.setField(4, values.get(4).asLong());
    record.setField(5, values.get(5).asLong());
    record.setField(6, values.get(6).asLong());
    record.setField(7, values.get(7).asDate());
    record.setField(8, values.get(8).asDateTime().getUTCDateTimeStr());
    record.setField(9, values.get(9).asLong());
    record.setField(10, values.get(10).asBoolean());
    record.setField(11, values.get(11).asDouble());
    record.setField(12, values.get(12).asDouble());
    record.setField(13, values.get(13).asTime().getUTCTimeStr());
    record.setField(14, values.get(14).asGeography());
    return record;
}).print();
env.execute("NebulaStreamSource");
```
### Parameter descriptions
- `NebulaClientOptions` is the configuration for connecting to NebulaGraph, as described below.

  | Parameter | Type | Required | Description |
  |---|---|---|---|
  | `setGraphAddress` | String | Yes | The Graph service address of NebulaGraph. |
  | `setMetaAddress` | String | Yes | The Meta service address of NebulaGraph. |
- `VertexExecutionOptions` is the configuration for reading vertices from and writing vertices to NebulaGraph, as described below.

  | Parameter | Type | Required | Description |
  |---|---|---|---|
  | `setGraphSpace` | String | Yes | The graph space name. |
  | `setTag` | String | Yes | The tag name. |
  | `setIdIndex` | Int | Yes | The subscript of the stream data field that is used as the VID when writing data to NebulaGraph. |
  | `setFields` | List | Yes | A collection of the property names of a tag. It is used to write data to or read data from NebulaGraph. Make sure that `setNoColumn` is `false` when reading data; otherwise, the configuration is invalid. If this parameter is empty, all properties are read when reading data from NebulaGraph. |
  | `setPositions` | List | Yes | A collection of the subscripts of the stream data fields. It indicates that the corresponding field values are written to NebulaGraph as property values. This parameter needs to correspond to `setFields`. |
  | `setBatchSize` | Int | No | The maximum number of data records to write to NebulaGraph at a time. The default value is `2000`. |
  | `setNoColumn` | Boolean | No | If set to `true`, no properties are read when reading data. The default value is `false`. |
  | `setLimit` | Int | No | The maximum number of data records to pull at a time when reading data. The default value is `2000`. |
- `EdgeExecutionOptions` is the configuration for reading edges from and writing edges to NebulaGraph, as described below.

  | Parameter | Type | Required | Description |
  |---|---|---|---|
  | `setGraphSpace` | String | Yes | The graph space name. |
  | `setEdge` | String | Yes | The edge type name. |
  | `setSrcIndex` | Int | Yes | The subscript of the stream data field that is used as the VID of the source vertex when writing data to NebulaGraph. |
  | `setDstIndex` | Int | Yes | The subscript of the stream data field that is used as the VID of the destination vertex when writing data to NebulaGraph. |
  | `setRankIndex` | Int | Yes | The subscript of the stream data field that is used as the rank of the edge when writing data to NebulaGraph. |
  | `setFields` | List | Yes | A collection of the property names of an edge type. It is used to write data to or read data from NebulaGraph. Make sure that `setNoColumn` is `false` when reading data; otherwise, the configuration is invalid. If this parameter is empty, all properties are read when reading data from NebulaGraph. |
  | `setPositions` | List | Yes | A collection of the subscripts of the stream data fields. It indicates that the corresponding field values are written to NebulaGraph as property values. This parameter needs to correspond to `setFields`. |
  | `setBatchSize` | Int | No | The maximum number of data records to write to NebulaGraph at a time. The default value is `2000`. |
  | `setNoColumn` | Boolean | No | If set to `true`, no properties are read when reading data. The default value is `false`. |
  | `setLimit` | Int | No | The maximum number of data records to pull at a time when reading data. The default value is `2000`. |
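The interplay of `setIdIndex`, `setFields`, and `setPositions` can be made concrete with a small sketch. The code below is plain Java with no connector dependency; the class and method names are invented for this illustration, and the generated statement only demonstrates the mapping, not the connector's actual output.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class OptionMappingSketch {

    // Illustrates the option semantics: idIndex selects the record field used
    // as the VID, and the value at positions[i] is written to property fields[i].
    static String toInsertVertex(String tag, List<Object> record, int idIndex,
                                 List<String> fields, List<Integer> positions) {
        String props = String.join(", ", fields);
        String values = positions.stream()
                .map(p -> {
                    Object v = record.get(p);
                    return v instanceof String ? "\"" + v + "\"" : String.valueOf(v);
                })
                .collect(Collectors.joining(", "));
        return String.format("INSERT VERTEX %s(%s) VALUES \"%s\":(%s)",
                tag, props, record.get(idIndex), values);
    }

    public static void main(String[] args) {
        // Same record layout as the write example above: [vid, name, age]
        List<Object> record = Arrays.asList("player100", "Tim", 39);
        System.out.println(toInsertVertex("player", record,
                0, Arrays.asList("name", "age"), Arrays.asList(1, 2)));
        // INSERT VERTEX player(name, age) VALUES "player100":("Tim", 39)
    }
}
```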
### Example
- Create a graph space.

  ```java
  NebulaCatalog nebulaCatalog = NebulaCatalogUtils.createNebulaCatalog(
          "NebulaCatalog", "default", "root", "nebula",
          "127.0.0.1:9559", "127.0.0.1:9669");

  EnvironmentSettings settings = EnvironmentSettings.newInstance()
          .inStreamingMode()
          .build();
  TableEnvironment tableEnv = TableEnvironment.create(settings);

  tableEnv.registerCatalog(CATALOG_NAME, nebulaCatalog);
  tableEnv.useCatalog(CATALOG_NAME);

  String createDataBase = "CREATE DATABASE IF NOT EXISTS `db1`"
          + " COMMENT 'space 1'"
          + " WITH ("
          + " 'partition_num' = '100',"
          + " 'replica_factor' = '3',"
          + " 'vid_type' = 'FIXED_STRING(10)'"
          + ")";
  tableEnv.executeSql(createDataBase);
  ```
- Create a tag.

  ```java
  tableEnvironment.executeSql("CREATE TABLE `person` ("
          + " vid BIGINT,"
          + " col1 STRING,"
          + " col2 STRING,"
          + " col3 BIGINT,"
          + " col4 BIGINT,"
          + " col5 BIGINT,"
          + " col6 BIGINT,"
          + " col7 DATE,"
          + " col8 TIMESTAMP,"
          + " col9 BIGINT,"
          + " col10 BOOLEAN,"
          + " col11 DOUBLE,"
          + " col12 DOUBLE,"
          + " col13 TIME,"
          + " col14 STRING"
          + ") WITH ("
          + " 'connector' = 'nebula',"
          + " 'meta-address' = '127.0.0.1:9559',"
          + " 'graph-address' = '127.0.0.1:9669',"
          + " 'username' = 'root',"
          + " 'password' = 'nebula',"
          + " 'data-type' = 'vertex',"
          + " 'graph-space' = 'flink_test',"
          + " 'label-name' = 'person'"
          + ")"
  );
  ```
- Create an edge type.

  ```java
  tableEnvironment.executeSql("CREATE TABLE `friend` ("
          + " sid BIGINT,"
          + " did BIGINT,"
          + " rid BIGINT,"
          + " col1 STRING,"
          + " col2 STRING,"
          + " col3 BIGINT,"
          + " col4 BIGINT,"
          + " col5 BIGINT,"
          + " col6 BIGINT,"
          + " col7 DATE,"
          + " col8 TIMESTAMP,"
          + " col9 BIGINT,"
          + " col10 BOOLEAN,"
          + " col11 DOUBLE,"
          + " col12 DOUBLE,"
          + " col13 TIME,"
          + " col14 STRING"
          + ") WITH ("
          + " 'connector' = 'nebula',"
          + " 'meta-address' = '127.0.0.1:9559',"
          + " 'graph-address' = '127.0.0.1:9669',"
          + " 'username' = 'root',"
          + " 'password' = 'nebula',"
          + " 'graph-space' = 'flink_test',"
          + " 'label-name' = 'friend',"
          + " 'data-type'='edge',"
          + " 'src-id-index'='0',"
          + " 'dst-id-index'='1',"
          + " 'rank-id-index'='2'"
          + ")"
  );
  ```
- Query the data of an edge type and insert it into another edge type.

  ```java
  Table table = tableEnvironment.sqlQuery("SELECT * FROM `friend`");
  table.executeInsert("`friend_sink`").await();
  ```
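The last step assumes a sink table named `friend_sink` has already been registered; its definition is not shown in this document. A plausible registration would mirror the `friend` table above but point at a different edge type. Note that the edge type name `friend_sink` and the reuse of the `flink_test` space are assumptions for this sketch; adapt them to your own schema.

```sql
CREATE TABLE `friend_sink` (
    sid BIGINT,
    did BIGINT,
    rid BIGINT,
    col1 STRING,
    col2 STRING,
    col3 BIGINT,
    col4 BIGINT,
    col5 BIGINT,
    col6 BIGINT,
    col7 DATE,
    col8 TIMESTAMP,
    col9 BIGINT,
    col10 BOOLEAN,
    col11 DOUBLE,
    col12 DOUBLE,
    col13 TIME,
    col14 STRING
) WITH (
    'connector' = 'nebula',
    'meta-address' = '127.0.0.1:9559',
    'graph-address' = '127.0.0.1:9669',
    'username' = 'root',
    'password' = 'nebula',
    'graph-space' = 'flink_test',
    'label-name' = 'friend_sink',
    'data-type' = 'edge',
    'src-id-index' = '0',
    'dst-id-index' = '1',
    'rank-id-index' = '2'
)
```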