This document gives an introduction to the storage design of the graph database Nebula Graph.
The storage layer of Nebula Graph is composed of two parts: the Meta Service, which stores the metadata, and the Storage Service, which stores the data. The two services run in two independent processes. Their data directories and deployments are separate, but their architectures are almost the same.
Fig. 1 The Architecture of Storage Service
As shown in Fig. 1, there are three layers in the Storage Service. The bottom layer is the local storage engine, which provides key-value operations such as get, put, scan, and delete on local data. The related interfaces are in KVStore/KVEngine.h, and users can develop their own local store plugins based on their needs. Currently, Nebula Graph provides a storage engine based on RocksDB.
Above the local storage engine is the consensus layer, which implements Multi Raft Group. Each partition corresponds to a Raft group and is the unit of data sharding. Currently, Nebula Graph uses hash to shard data. When creating a space, users need to specify the number of partitions. Once set, the number of partitions cannot be changed, so it should be chosen large enough to meet future scale-out needs.
Above the consensus layer is the storage interface, which defines a set of graph-related APIs. These API requests are translated into a set of kv operations on the corresponding partition. It is this layer that makes the storage service a real graph storage; otherwise, it would be just a kv store. Nebula Graph does not run the kv store as an independent service because a graph query involves a lot of computation that depends on the schema, which does not exist in the kv layer. This architecture makes it easier to push computation down to the storage layer.
Schema & Partition
As a graph database, Nebula Graph stores vertices, edges, and their properties. Efficient filtering and projection are critical for graph exploration.
Nebula Graph uses tags to indicate a vertex type. One vertex can have multiple types (and therefore multiple tags), and each tag defines its own properties. In the kv store, we use vertex_ID + Tag_ID together as a key, and the corresponding value is the encoded properties. The format is shown in Fig. 2:
Fig. 2 Vertex Key Format
Type: one byte, used to indicate the key type, e.g., data, index, system, etc.
Part ID: three bytes, used to indicate the (sharding) partition ID. It is designed so that data migration/balance operations can prefix-scan all the data in a partition.
Vertex ID: eight bytes, used to indicate the vertex ID. Two vertices with an identical vertex ID are considered the same vertex.
Tag ID: four bytes, used to indicate the tag's (encoded) ID.
Timestamp: eight bytes, not visible to users. Reserved for multiversion concurrency control (MVCC).
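The field widths listed above can be turned into a small packing routine. The following is only a minimal sketch: the byte order, the numeric code for the "data" key type, and the function names are assumptions made for illustration, since the text fixes the field widths but not these details.

```python
import struct

# Assumption: little-endian packing; the text only specifies field widths.
KEY_TYPE_DATA = 0x01  # hypothetical code for the "data" key type


def encode_vertex_key(part_id: int, vertex_id: int, tag_id: int, timestamp: int) -> bytes:
    """Pack Type(1) + Part ID(3) + Vertex ID(8) + Tag ID(4) + Timestamp(8)."""
    if not 0 <= part_id < 1 << 24:
        raise ValueError("part_id must fit in three bytes")
    head = bytes([KEY_TYPE_DATA]) + part_id.to_bytes(3, "little")
    # "<qiq" = signed 8-byte, signed 4-byte, signed 8-byte, no padding.
    return head + struct.pack("<qiq", vertex_id, tag_id, timestamp)


def decode_vertex_key(key: bytes) -> dict:
    """Inverse of encode_vertex_key."""
    vertex_id, tag_id, timestamp = struct.unpack("<qiq", key[4:])
    return {
        "type": key[0],
        "part_id": int.from_bytes(key[1:4], "little"),
        "vertex_id": vertex_id,
        "tag_id": tag_id,
        "timestamp": timestamp,
    }
```

Because Type and Part ID come first, all keys of one partition share the same four-byte prefix, which is what makes a per-partition prefix scan possible.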
Each edge in Nebula Graph is modeled and stored as two independent key-values. One, namely the out-edge, is stored in the same partition as the source vertex. The other, namely the in-edge, is stored in the same partition as the destination vertex. So, in general, the out-key and the in-key are in different partitions.
Between two vertices, multiple edges of the same type are allowed, as are edges of different types. For example, by defining an edge type for money transfers, user A can transfer money to user B at two different timestamps. Thus a field, namely rank, is added to the key to distinguish which transfer record is being referred to. The edge key format is shown in Fig. 3:
Fig. 3 Edge Key Format
Type: one byte, used to indicate key type. E.g., data, index, system, etc.
Part ID: three bytes. The same as in Fig. 2.
Vertex ID: eight bytes, used to indicate the source vertex ID of an out-edge (Fig. 4), and the destination vertex ID of an in-edge (Fig. 5). See below.
Edge Type: four bytes, used to indicate the (encoded) edge type ID. A positive number means that this key is an out-edge, and a negative number indicates that this is an in-edge.
Rank: eight bytes, used to distinguish multiple edges of the same type. E.g., it can store the transaction time, transaction amount, or edge weight.
Timestamp: eight bytes. The same as in Fig. 2.
If Edge Type is positive, the corresponding edge key format is shown in Fig. 4; otherwise, the corresponding edge key format is shown in Fig. 5.
Fig. 4 Out-key format
Fig. 5 In-key format
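To illustrate how one logical edge becomes an out-key and an in-key, here is a hedged sketch. The byte order, the key-type code, the modulo sharding helper, and in particular the trailing "other endpoint" vertex ID are assumptions for illustration; the field list above names only Type, Part ID, Vertex ID, Edge Type, Rank, and Timestamp.

```python
import struct

KEY_TYPE_DATA = 0x01  # hypothetical code for the "data" key type


def _edge_key(part_id, vertex_id, edge_type, rank, other_vertex_id, timestamp):
    # Type(1) + Part ID(3) + Vertex ID(8) + Edge Type(4) + Rank(8)
    # + other endpoint(8, assumed field) + Timestamp(8), little-endian (assumed).
    head = bytes([KEY_TYPE_DATA]) + part_id.to_bytes(3, "little")
    return head + struct.pack("<qiqqq", vertex_id, edge_type, rank,
                              other_vertex_id, timestamp)


def edge_keys(src, dst, edge_type, rank, timestamp, num_parts):
    """Return (out_key, in_key) for one logical edge src -> dst."""
    if edge_type <= 0:
        raise ValueError("edge_type must be a positive ID")

    def part_of(vid):
        return vid % num_parts  # assumed sharding rule

    # The out-key lives with the source vertex and keeps the positive type.
    out_key = _edge_key(part_of(src), src, edge_type, rank, dst, timestamp)
    # The in-key lives with the destination vertex and negates the type.
    in_key = _edge_key(part_of(dst), dst, -edge_type, rank, src, timestamp)
    return out_key, in_key
```

With this layout, a reader of either key can tell the edge direction from the sign of the Edge Type field alone.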
Besides the key part above, the value part holds the encoded properties (of a vertex or an edge). As a strongly typed database, Nebula Graph gets the schema information from the Meta Service before encoding/decoding. Multi-version schemas are also supported so that a schema can be altered.
Nebula Graph shards data through a modulo operation on the vertex ID. All the out-keys, in-keys, and tag IDs of a vertex are placed in the same partition. This improves query efficiency because they can be read with local rather than remote file access. Breadth-First-Search (BFS) expansion starting from a given vertex is a very common ad-hoc graph exploration, and during BFS, filtering on edge/vertex properties is time-consuming. Nebula Graph keeps this operation efficient by storing the properties of a vertex and its edges near each other. It is worth noting that most graph database vendors run their benchmarks with
Graph 500 or
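The modulo sharding described above can be sketched in a few lines. The function names and the 0-based partition numbering are assumptions for illustration only.

```python
from collections import defaultdict


def partition_of(vertex_id: int, num_parts: int) -> int:
    """Map a vertex ID to a partition by modulo (0-based here by assumption)."""
    return vertex_id % num_parts


def group_by_partition(vertex_ids, num_parts):
    """Group vertex IDs so that one request can be sent per partition."""
    groups = defaultdict(list)
    for vid in vertex_ids:
        groups[partition_of(vid, num_parts)].append(vid)
    return dict(groups)
```

For example, with ten partitions, `group_by_partition([1, 11, 2, 13], 10)` places vertices 1 and 11 in the same partition, so a BFS step over both can be served by a single partition.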
Nebula Graph writes its own kv store to meet the following needs:
- High performance: a pure, high-performance key-value store.
- Provided as a library: as a strongly typed database, Nebula Graph depends heavily on the performance of the storage layer.
- Strong data consistency: Nebula Graph is a distributed system.
- Written in C++: most of our developers are C++ programmers.
For users who are not sensitive to performance or are unwilling to migrate data from other storage systems, such as HBase or MySQL, Nebula Graph also provides a plugin mechanism over the kv store to replace its default RocksDB engine. Currently, the HBase plugin has not been released yet.
As RocksDB is the local storage engine, Nebula Graph can manage multiple hard disks to take full advantage of parallel I/O. All a user needs to do is configure multiple data directories.
Nebula Graph manages the distributed kv store with the Meta Service. All the partition distribution and cluster machine status can be found in the Meta Service. Users can enter commands in the console to add or remove machines, and to generate and execute a balance plan in the Meta Service.
Nebula Graph writes its own Write-Ahead-Log (WAL) module to replace the default one in RocksDB, since the WAL is also used for the (distributed system's) Raft consensus. Each partition has its own WAL, so after a (crash and) reboot, the partition can catch up on its own data, and there is no need to split a WAL between several partitions.
Besides, Nebula Graph defines a special category of log, namely the Command Operation Log, to conduct some command operations. These logs are very short, carry no real data, and are only used to inform all replicas to execute certain command operations through the Raft protocol. What's more, since the logs are serialized in the Raft protocol, Nebula Graph also provides another class, namely the Atomic Operation Log, to conduct atomic operations between the replicas of a partition. E.g., compare-and-set (CAS) and read-modify-write operations are atomic per partition in Nebula Graph.
A Nebula Graph cluster can have multiple individual graph spaces. Each space has its own number of partitions and replicas. Different spaces are physically isolated from each other in the same cluster. Besides, the spaces can also have very different storage engines and sharding strategies. E.g., one space can use HBase as its storage backend with alphabetical range sharding, while another space uses the default RocksDB with hash sharding, and both spaces run in the same Nebula Graph cluster.
This part gives some details on how the raft protocol is implemented in Nebula Graph.
Multi Raft Group
According to the Raft requirements, log IDs must be in sequential order. Therefore, almost all Raft implementations use Multi Raft Group to increase concurrency, and the number of partitions determines how many operations can be executed simultaneously. But you cannot simply add too many partitions to the system, because this has side effects. Each Raft group stores a lot of state information and (as mentioned earlier) has its own WAL file. Thus, the more partitions, the larger the footprint. Also, if the workload is low, batch operations cannot benefit from parallelism. E.g., consider a system with ten thousand partitions that receives about ten thousand write requests every second. On average, each partition receives only one write request per second. So from the client side, it is a 10k batch write, but from the partition side, it is a single write.
There are two key challenges in implementing Multi Raft Group. The first is how to share the transport layer. Because each Raft group sends messages to its corresponding peers, the connection costs will be very high if the transport layer cannot be shared. The second is how to design the multi-threading model. Raft groups share the same thread pool to avoid starting too many threads and incurring a high context-switch cost.
For each partition, it is necessary to batch multiple operations together to improve throughput, because the WAL is written serially. In general, there is nothing special about batching, but Nebula Graph designs some special types of WAL based on per-partition serialization, which brings some challenges.
For example, Nebula Graph uses the WAL to implement lock-free CAS operations, and a CAS operation is not executed until all previous WAL entries have been committed. So if a batch contains logs with CAS operations, we need to divide the batch into several smaller (sub)groups and make sure these (sub)groups are executed in sequential order.
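One possible way to divide such a batch is sketched below. This is an illustrative policy, not necessarily the one used in the Nebula Graph source: the `LogEntry` type, the `is_cas` flag, and the rule that every CAS log starts a new (sub)group are all assumptions.

```python
from dataclasses import dataclass


@dataclass
class LogEntry:
    payload: str
    is_cas: bool = False  # hypothetical flag marking an atomic (CAS) log


def split_batch(batch):
    """Split a batch so that every CAS log starts a new sub-batch.

    Sub-batches are meant to be appended and committed one after another,
    so a CAS log only runs once everything before it has been committed.
    """
    groups, current = [], []
    for entry in batch:
        if entry.is_cas and current:
            # Close the running group before the CAS log begins its own.
            groups.append(current)
            current = []
        current.append(entry)
    if current:
        groups.append(current)
    return groups
```

A batch `a, b, CAS(c), d, CAS(e)` would be split into `[a, b]`, `[c, d]`, and `[e]`, preserving the original order across groups.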
When a new machine is added to a cluster, it has to catch up on data for quite a long time, and accidents may happen during this process. If the machine directly joined the Raft group as a follower, it would dramatically reduce the availability of the entire cluster. Nebula Graph therefore introduces the learner role, which is implemented with the command WAL mentioned above. When a leader is writing the WAL and meets an add learner command, it adds the newcomer to its peer list and marks it as a learner. Logs are sent to all the peers, both the followers and the learner, but the learner cannot vote in a leader election.
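The effect of the learner role on replication and voting can be sketched as follows. The class and method names are hypothetical and only model the behavior described above: learners receive logs but are not counted toward the quorum.

```python
from dataclasses import dataclass, field


@dataclass
class Peer:
    addr: str
    is_learner: bool = False


@dataclass
class RaftGroup:
    leader: str
    peers: list = field(default_factory=list)  # excludes the leader itself

    def add_learner(self, addr):
        """Apply an 'add learner' command log: track the peer, mark it as a learner."""
        self.peers.append(Peer(addr, is_learner=True))

    def replication_targets(self):
        """Logs are sent to followers and learners alike."""
        return [p.addr for p in self.peers]

    def quorum(self):
        """Majority of voting members (leader plus followers); learners do not count."""
        voters = 1 + sum(1 for p in self.peers if not p.is_learner)
        return voters // 2 + 1
```

In a three-member group the quorum is two, and it stays at two after a learner is added, so a slow or failing learner does not reduce availability.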
Transferring leadership is extremely important during a data balance operation. When migrating a partition from one machine to another, Nebula Graph first checks whether the source replica is the leader. If so, another follower should be elected as the leader before the migration; otherwise, the cluster service is affected because the leader is being migrated. After the migration is done, a BALANCE LEADER command is invoked so that the workload on each machine is balanced.
When transferring leadership, it is worth noting the timing of when the leader abandons its leadership and when the followers start a leader election. When a transfer leadership command is committed, the leader, from its own view, loses the leadership. A follower, on receiving this command, starts a new leader election. These two operations must go through the same process as a normal Raft leader election. Otherwise, corner cases can occur that are very hard to test.
To avoid split-brain, an intermediate state is required when the members of a Raft group change. In this state, the majority of the old group and the majority of the new group always overlap. This overlap prevents either group from making decisions unilaterally. This is the joint consensus described in the Raft thesis. To make it even simpler, Diego Ongaro suggests in his doctoral thesis adding or removing only one peer at a time to ensure that the majorities overlap. Nebula Graph's implementation also uses this approach, except that the way members are added or removed is different. For details, please refer to addPeer/removePeer in the Raft Part source code.
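The overlap argument for single-member changes can be checked by brute force on small groups. The helper names below are hypothetical; the sketch simply enumerates every majority of the old and new membership and tests whether each pair intersects.

```python
from itertools import combinations


def majorities(members):
    """All subsets that form a strict majority of `members`."""
    need = len(members) // 2 + 1
    return [set(c)
            for k in range(need, len(members) + 1)
            for c in combinations(members, k)]


def always_overlap(old, new):
    """True if every majority of `old` intersects every majority of `new`."""
    return all(a & b for a in majorities(old) for b in majorities(new))
```

Adding or removing one member of a three-member group keeps the majorities overlapping, whereas adding two members at once does not: `{a, b}` is a majority of the old group and `{c, d, e}` is a majority of the new five-member group, and the two are disjoint.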
Taking a snapshot is a common operation in daily DBA work. But snapshots introduce extra challenges when combined with the Raft protocol, and the combination is very error-prone.
E.g., what if the leader loses its leadership in an election while sending a snapshot command? In this situation, a follower may have received only half of the snapshot log. Should we clean up and roll back? Because multiple partitions share a single storage, cleaning up the data is cumbersome. In addition, the snapshot process starts heavy writes to disk. To avoid slowing down frontend reads and writes, we do not want the snapshot process to share the same I/O thread pool with normal Raft logs. Besides, a snapshot also requires a large footprint, which is critical for online service performance.
The Interfaces of Storage Service layer are:
Insert vertex/edge: insert a vertex or edge and its properties.
getNeighbors: get the in-edges or out-edges of a set of vertices and return the edges and their properties. Condition filtering is also supported.
getProps: get the properties of a vertex or an edge.
Graph semantics interfaces are translated into kv operations in this layer as well. In order to improve the performance, concurrent operations are also implemented in this layer.
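As a rough illustration of how a graph call such as getNeighbors might map onto kv operations, the sketch below runs a prefix scan over an in-memory ordered store. Everything here is assumed for illustration: the `SortedKV` stand-in, the big-endian prefix layout, the dictionary-valued properties, and the `keep` filter callback. The toy store holds edge keys only.

```python
import bisect
import struct


def prefix(part_id, vertex_id):
    # Assumed layout: Type(1) + Part ID(3) + Vertex ID(8), big-endian.
    return b"\x01" + part_id.to_bytes(3, "big") + struct.pack(">q", vertex_id)


class SortedKV:
    """Tiny in-memory stand-in for an ordered key-value engine."""

    def __init__(self):
        self._keys, self._vals = [], {}

    def put(self, key, value):
        if key not in self._vals:
            bisect.insort(self._keys, key)
        self._vals[key] = value

    def prefix_scan(self, pfx):
        # Seek to the first key >= prefix, then walk while the prefix matches.
        i = bisect.bisect_left(self._keys, pfx)
        while i < len(self._keys) and self._keys[i].startswith(pfx):
            yield self._keys[i], self._vals[self._keys[i]]
            i += 1


def get_neighbors(kv, part_id, vertex_id, keep=lambda props: True):
    """Return properties of edges stored under `vertex_id`, filtered by `keep`."""
    return [v for _, v in kv.prefix_scan(prefix(part_id, vertex_id)) if keep(v)]
```

Applying the filter inside the scan loop, rather than after returning all edges to the caller, is the kind of computation pushdown that a schema-aware storage layer makes possible.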
Nebula Graph wraps a set of meta-related interfaces on top of the kv store interface (as mentioned earlier). The Meta Service supports CRUD operations on schema, cluster administration, and user privileges. The Meta Service can be deployed on a single host, but it is recommended to deploy it on multiple hosts with three or five replicas to get better availability and fault tolerance.