How do you count google.com hits? How do you store the “likes” of popular users? In this article we consider solutions to these tasks using CRDTs (Conflict-free Replicated Data Types), as well as the more general case: how to synchronize replicas in a multi-leader distributed system.
1. Introduction:
We have been using applications such as calendars or Evernote for a long time. They have something in common: all of them allow, in any combination,
- working offline
- access from different devices
- several people modifying the same data
The task the developers of those systems have to solve is how to ensure “smooth” data synchronization in such cases. Ideally, user interaction should not be required.
In the previous article we considered one approach to a similar task: Operational Transformation. This time we are going to consider another approach, which introduces a special set of base data types that resolve merge conflicts by themselves.
2. Strong eventual consistency:
A lot of research on eventual consistency has been published recently. In my opinion, the current trend is to shift from strong consistency to other consistency variations, to investigate which consistency model fits a particular system or situation best, and to rethink the current definitions.
That leads to some inconsistency in terminology: for example, some researchers describe eventual consistency with a special property while other authors have already created a definition for that particular case.
The authors of one paper question the current definition of eventual consistency: according to it, a system that always returns “42” to all requests is fine, because it is eventually consistent.
I will use the following terminology, which does not affect the correctness of either this article or the original papers (please note these are not definitions!):
- Strong consistency (SC): all write operations are applied strictly sequentially, and a read request on any replica returns the same, last written result. Real-time consensus (with all its consequences) is required to resolve conflicts. Tolerates up to n/2 - 1 nodes being down.
- Eventual consistency (EC): apply updates locally, then propagate them. A read on some replicas can return an obsolete state. In case of conflicts, roll back or otherwise decide what to do. That means we still need consensus, but not in real time.
- Strong eventual consistency (SEC): EC plus a recipe that lets replicas resolve conflicts automatically. Therefore we do not require consensus. Tolerates up to n-1 nodes being down.
A quick note: if we relax the consistency requirement of the CAP theorem from SC to SEC, then all three properties can be satisfied.
So, we are happy to sacrifice SC and we want a set of base data types for our unstable and often partitioned distributed system. Also, we want those data types to resolve conflicts for us so we don’t need to interact with a user or query an arbiter node.
3. “Likes and hits” problems:
Of course, there are several ways to solve these problems, but CRDTs offer a simple and elegant solution.
Count google.com hits:
Google.com serves approximately 150000 requests per second from all over the Earth. Clearly, we need to update the counter asynchronously. Queues will help, but not entirely: if we exposed an API to read the counter’s value, we would also need replication, otherwise read requests could bring the system down.
If we already do the replication — can we avoid queues?
Count a user’s likes:
The problem is similar to the previous one, but this time we need to count unique hits.
4. Terminology:
It is advised to be familiar with the following terminology:
- Idempotence: the result doesn’t change if you apply the same operation several times.
  Examples:
  - GET request
  - f(x) = x + 0
- Commutative property: f(x, y) = f(y, x)
- Partial order: a relation that is reflexive, transitive and antisymmetric
- Semilattice: a partially ordered set in which every pair of elements has a least upper bound
- Version vector
A vector whose size equals the number of nodes. Each node increments its own vector element on certain predefined events. During synchronization, these vectors are sent together with the payload, and by comparing them element by element it can be decided which node has newer or obsolete values. This defines a partial order rather than a total one: if neither vector is greater than or equal to the other in every element, the updates are concurrent.
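A minimal sketch of how two version vectors can be compared. The dictionary representation and the names `VersionVector` and `compare` are assumptions made for illustration. Note that two vectors may also turn out to be concurrent, with neither one newer than the other.

```python
from typing import Dict

# Hypothetical representation: node id -> that node's event counter.
VersionVector = Dict[str, int]


def compare(a: VersionVector, b: VersionVector) -> str:
    """Classify a relative to b: 'equal', 'before', 'after' or 'concurrent'."""
    nodes = a.keys() | b.keys()
    a_le_b = all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
    b_le_a = all(b.get(n, 0) <= a.get(n, 0) for n in nodes)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"      # a is obsolete compared to b
    if b_le_a:
        return "after"       # a is newer than b
    return "concurrent"      # neither side has seen all of the other's events


print(compare({"n1": 1, "n2": 0}, {"n1": 1, "n2": 2}))  # before
print(compare({"n1": 2, "n2": 0}, {"n1": 1, "n2": 2}))  # concurrent
```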
5. Synchronisation models:
State-based:
Also called a passive synchronisation, forms Convergent Replicated Data Type — CvRDT.
Used in file systems such as NFS, AFS and Coda, and in key-value stores such as Riak and Dynamo.
In this case, replicas propagate changes by sending the full state of the objects. A merge() function must be defined to merge incoming changes with the current state.
The following requirements have to be satisfied to ensure that replicas converge:
- Data type (or states on replicas) forms a semilattice
- merge() function produces a least upper bound
- Replicas form a connected graph
Example:
- Data type: set of natural numbers N
- Minimal element: 0
- merge(x, y) = max(x, y)
These requirements give us a commutative, associative and idempotent merge() function, which is also monotonically increasing on the given data type.
That guarantees all replicas will eventually converge and allows us not to worry about a transmission protocol — we can lose propagation updates, we can send them several times or even send them in any order.
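The max() example can be checked with a small sketch. The helper `final_states` is hypothetical and only simulates an unreliable transport: it delivers the same updates in every possible order, each of them twice, and collects the resulting states. The natural numbers are assumed to start at 0.

```python
import itertools
from typing import Iterable, Set


def merge(x: int, y: int) -> int:
    """Least upper bound of two natural numbers."""
    return max(x, y)


def final_states(updates: Iterable[int]) -> Set[int]:
    """Deliver the updates in every possible order, each one twice."""
    results = set()
    for order in itertools.permutations(updates):
        state = 0  # assumed minimal element of the natural numbers
        for update in order + order:  # every update is delivered twice
            state = merge(state, update)
        results.add(state)
    return results


print(final_states([3, 7, 5]))  # {7}
```

Whatever the order and however many duplicates arrive, every replica ends up in the same state.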
Operation-based:
Also called an active synchronisation, forms Commutative Replicated Data Type — CmRDT.
Used in cooperative systems such as Bayou, Rover, IceCube and Telex.
In this case a replica propagates changes by sending the operation to all other replicas. When a change is made on a replica, it:
- Executes generate() method which returns an effector() function to be called on other replicas. In other words, effector() is a closure to modify state on other replicas.
- Applies effector() to the local state.
- Propagates effector() to all other replicas
The following requirements have to be satisfied to ensure that replicas converge:
- A reliable transmission protocol
- If effector() is delivered in causal order, then concurrent effector() calls have to commute, OR
- If effector() is delivered without respecting causal order, then all effector() calls have to commute
- effector() has to be idempotent if it can be delivered more than once.
Some implementations rely on reliable publish-subscribe systems (Kafka) as part of the delivery.
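The generate()/effector() cycle can be sketched as follows. This is an illustration under stated assumptions, not a reference implementation: the `Replica` class, the unique operation id used to make the effector idempotent, and the loop standing in for a reliable broadcast are all hypothetical.

```python
import uuid
from typing import Callable, List, Set


class Replica:
    def __init__(self) -> None:
        self.counter = 0
        self.applied: Set[str] = set()  # ids of effectors already applied


def generate_inc() -> Callable[[Replica], None]:
    """generate(): build the effector() for one inc() operation."""
    op_id = uuid.uuid4().hex  # unique id, so duplicates can be detected

    def effector(replica: Replica) -> None:
        if op_id in replica.applied:  # already delivered: do nothing
            return
        replica.applied.add(op_id)
        replica.counter += 1

    return effector


def inc(source: Replica, others: List[Replica]) -> Callable[[Replica], None]:
    effector = generate_inc()
    effector(source)          # apply to the local state
    for replica in others:    # stand-in for a reliable broadcast
        effector(replica)
    return effector


a, b = Replica(), Replica()
effector = inc(a, [b])
effector(b)  # a duplicate delivery is ignored
print(a.counter, b.counter)  # 1 1
```

Increments commute, so this effector does not depend on delivery order; the id check covers the case where it is delivered more than once.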
Delta-based:
Looking at state-based and op-based synchronisation, it is easy to notice that it doesn’t make sense to transmit an object’s whole state if a change affects only a part of it. Also, if updates modify the same state (as with counters), we can periodically send a single aggregated state.
Delta synchronisation combines the state-based and op-based approaches and propagates so-called delta-mutators, which carry only the changes made since the last synchronisation. The very first synchronisation requires sending the full state; however, some implementations take the state of the remote replica into account to reduce the amount of data sent.
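A minimal sketch of a delta-mutator, assuming the state is a map from replica id to a grow-only count (the same layout the G-Counter in section 6.2.1 uses). The names `merge` and `inc_delta` are hypothetical. The delta contains only the changed entry and is merged with the same function as a full state.

```python
from typing import Dict

State = Dict[str, int]  # replica id -> number of increments seen from it


def merge(a: State, b: State) -> State:
    """Element-wise maximum; works for full states and for deltas alike."""
    return {k: max(a.get(k, 0), b.get(k, 0)) for k in a.keys() | b.keys()}


def inc_delta(state: State, replica_id: str) -> State:
    """Delta-mutator: return only the entry that the increment changes."""
    return {replica_id: state.get(replica_id, 0) + 1}


local = {"A": 5, "B": 2}
delta = inc_delta(local, "A")      # only {"A": 6} goes over the wire
local = merge(local, delta)
remote = merge({"A": 5, "B": 2}, delta)
print(delta, local == remote)      # {'A': 6} True
```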
If delays are allowed, then compressing the op-based log could be the next optimization.
Pure operation-based:
In op-based synchronization there is a delay needed to create an effector(). In some systems such delays are unacceptable. In this case an update must be propagated immediately, at the cost of a more complex protocol and more space for metadata.
Typical usages:
- If updates in your system must be propagated immediately, state-based synchronization is a poor choice because sending the whole state costs more. Delta-based would be a better choice, although in this particular case the difference from state-based won’t be that large.
- When you need to synchronize a replica after a failure, state-based or delta-based synchronization is the right choice. If you have to use op-based synchronization, there are two options:
  - Replay all changes missed since the failure
  - Take a full copy of one of the replicas and apply all missed operations
- As mentioned earlier, op-based synchronization requires effector() to be delivered exactly once to each replica. This requirement can be relaxed by requiring effector() to be idempotent. In practice, it is much easier to implement the former than the latter.
The relation between Op-based and State-based:
Op-based and state-based synchronization can emulate each other while preserving the CRDT requirements. So from here on we will not pay much attention to implementation details, except for some interesting cases.
Now it’s time to talk about CRDT!
6. CRDT: Counter
An integer value with two operations: inc() and dec(). Let’s consider some implementations for op-based and state-based synchronisation:
6.1 Op-based counter:
Quite obvious: we just need to propagate the updates. An example of inc():
generate() { return function effector(counter) { return counter + 1 } }
6.2 State-based counter:
This one is tricky, as it is not obvious how to implement the merge() function. Several variations have been proposed to address that:
6.2.1 Increment only counter, G-Counter:
Let’s use a vector whose size equals the number of replicas (like a version vector); each replica increments its own vector element (indexed by replica id) in the inc() operation. The merge() function takes the maximum of the corresponding vector elements, and the sum of all vector elements is the value of the counter.
Also, G-Set can be used (see below)
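A minimal state-based G-Counter sketch. It assumes a dictionary keyed by replica id instead of a fixed-size vector, so that replicas do not have to be known in advance; the class and method names are illustrative.

```python
from typing import Dict


class GCounter:
    def __init__(self, replica_id: str) -> None:
        self.replica_id = replica_id
        self.counts: Dict[str, int] = {}  # replica id -> its own increments

    def inc(self, n: int = 1) -> None:
        if n < 0:
            raise ValueError("a G-Counter can only grow")
        self.counts[self.replica_id] = self.counts.get(self.replica_id, 0) + n

    def value(self) -> int:
        return sum(self.counts.values())

    def merge(self, other: "GCounter") -> None:
        # Element-wise maximum of the two vectors.
        for rid, count in other.counts.items():
            self.counts[rid] = max(self.counts.get(rid, 0), count)


a, b = GCounter("a"), GCounter("b")
a.inc()
a.inc()
b.inc()
a.merge(b)
b.merge(a)
a.merge(b)  # merging again changes nothing
print(a.value(), b.value())  # 3 3
```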
Usages:
- Hits/likes counter (sic!)
6.2.2 PN-counter (Increment + Decrement):
Uses two G-Counters — one for increments, another one for decrements.
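A sketch of the same idea in code, with the two grow-only vectors kept as plain dictionaries so the block stays self-contained. All names are illustrative assumptions.

```python
from typing import Dict


def _merge_max(a: Dict[str, int], b: Dict[str, int]) -> Dict[str, int]:
    return {k: max(a.get(k, 0), b.get(k, 0)) for k in a.keys() | b.keys()}


class PNCounter:
    def __init__(self, replica_id: str) -> None:
        self.replica_id = replica_id
        self.incs: Dict[str, int] = {}  # grow-only vector of increments
        self.decs: Dict[str, int] = {}  # grow-only vector of decrements

    def inc(self) -> None:
        self.incs[self.replica_id] = self.incs.get(self.replica_id, 0) + 1

    def dec(self) -> None:
        self.decs[self.replica_id] = self.decs.get(self.replica_id, 0) + 1

    def value(self) -> int:
        return sum(self.incs.values()) - sum(self.decs.values())

    def merge(self, other: "PNCounter") -> None:
        self.incs = _merge_max(self.incs, other.incs)
        self.decs = _merge_max(self.decs, other.decs)


a, b = PNCounter("a"), PNCounter("b")
a.inc()
a.inc()
b.dec()
a.merge(b)
b.merge(a)
print(a.value(), b.value())  # 1 1
```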
Usages:
- Number of logged-in users in a p2p network (Skype)
6.2.3 Non-negative counter:
Unfortunately, there is no simple implementation so far. Suggest your ideas in the comments to discuss.
7. CRDT: Register
A memory cell with two operations — assign() and value(). The issue is with the assign() operations — they do not commute. There are two approaches to solve this issue:
7.1 LWW-Register
Introduce a total order by generating unique ids (timestamps) on each operation.
Example: state-based, updates via tuples (value, id):
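A minimal sketch of such a register. It assumes the id is a (timestamp, replica id) pair, which makes ids unique as long as a replica never reuses a timestamp; the class name and the way timestamps are supplied are assumptions for illustration.

```python
from dataclasses import dataclass
from typing import Any, Tuple


@dataclass(frozen=True)
class LWWRegister:
    value: Any = None
    stamp: Tuple[int, str] = (0, "")  # (timestamp, replica id): a total order

    def assign(self, value: Any, timestamp: int, replica_id: str) -> "LWWRegister":
        return LWWRegister(value, (timestamp, replica_id))

    def merge(self, other: "LWWRegister") -> "LWWRegister":
        # Keep the tuple with the greater id; the replica id breaks ties.
        return self if self.stamp >= other.stamp else other


a = LWWRegister().assign("draft", 1, "A")
b = LWWRegister().assign("final", 2, "B")
print(a.merge(b).value, b.merge(a).value)  # final final
```

The losing write is silently discarded, which is the price of the total order.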
Usages:
- Columns in Cassandra
- NFS — a whole file or a part of it
7.2 Multi-Value Register, MV-Register:
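The approach is similar to the G-Counter: store a set of (value, version vector) pairs. The value of the MV-Register is the set of all stored values. The merge() function compares version vectors element by element and drops every pair whose vector is dominated by another pair’s vector, so only concurrent values remain.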
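A sketch of one possible MV-Register, assuming that merge() keeps every (value, version vector) pair whose vector is not dominated by another pair. The names `MVRegister` and `dominates` and the dictionary-based vectors are illustrative.

```python
from typing import Any, Dict, List, Set, Tuple

VV = Dict[str, int]


def dominates(a: VV, b: VV) -> bool:
    """True if a >= b in every element and a > b in at least one."""
    nodes = a.keys() | b.keys()
    return (all(a.get(n, 0) >= b.get(n, 0) for n in nodes)
            and any(a.get(n, 0) > b.get(n, 0) for n in nodes))


class MVRegister:
    def __init__(self, replica_id: str) -> None:
        self.replica_id = replica_id
        self.entries: List[Tuple[Any, VV]] = []

    def assign(self, value: Any) -> None:
        # The new vector covers everything seen so far, plus one own event.
        vv: VV = {}
        for _, seen in self.entries:
            for node, count in seen.items():
                vv[node] = max(vv.get(node, 0), count)
        vv[self.replica_id] = vv.get(self.replica_id, 0) + 1
        self.entries = [(value, vv)]

    def values(self) -> Set[Any]:
        return {value for value, _ in self.entries}

    def merge(self, other: "MVRegister") -> None:
        combined = self.entries + [e for e in other.entries if e not in self.entries]
        # Drop every pair that some other pair has already overwritten.
        self.entries = [(v, vv) for v, vv in combined
                        if not any(dominates(o, vv) for _, o in combined)]


a, b = MVRegister("A"), MVRegister("B")
a.assign("x")
b.assign("y")          # concurrent with "x"
a.merge(b)
print(sorted(a.values()))  # ['x', 'y']
a.assign("z")          # overwrites both values it has seen
b.merge(a)
print(sorted(b.values()))  # ['z']
```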
Usages:
- Amazon shopping basket. It has a well-known bug where an item re-appears in the basket after deletion. The reason is that an MV-Register doesn’t behave like a set even though it stores a set of values (see below). Amazon indeed doesn’t treat that as a bug, as it actually increases sales.
- Riak. In the more general case we let the application decide which value is the current one (please note that we are not talking about conflicts!)
[Figure: Amazon shopping basket bug]
8. CRDT: Set
A set has two operations that do not commute, add() and rmv(). It is a base type for containers, maps, graphs, etc.
Let’s consider a naive set implementation where add() and rmv() are executed sequentially as they arrive. (First, we have concurrent add() operations on the 1st and the 2nd replicas, then rmv() arrives on the 1st replica.)
[Figure: Naive implementation]
As you can see replicas diverged after synchronisation. Let’s consider possible correct set implementations:
8.1 Grow-Only Set, G-Set:
A very simple solution: disallow the rmv() operation altogether. add() operations commute, and the merge() function is just a set union.
8.2 2P-Set:
Allows rmv() operation but you can’t re-add an element after its deletion. An additional G-set can be used to track removed elements (also called tombstone set).
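A minimal state-based 2P-Set sketch built from two grow-only sets. The class and attribute names are illustrative assumptions.

```python
from typing import Hashable, Set


class TwoPSet:
    def __init__(self) -> None:
        self.added: Set[Hashable] = set()    # grow-only set of added elements
        self.removed: Set[Hashable] = set()  # grow-only tombstone set

    def add(self, element: Hashable) -> None:
        self.added.add(element)

    def rmv(self, element: Hashable) -> None:
        if element in self.added:  # only elements that were added can be removed
            self.removed.add(element)

    def lookup(self, element: Hashable) -> bool:
        return element in self.added and element not in self.removed

    def merge(self, other: "TwoPSet") -> None:
        self.added |= other.added
        self.removed |= other.removed


s = TwoPSet()
s.add("x")
s.rmv("x")
s.add("x")  # has no visible effect: the tombstone wins forever
print(s.lookup("x"))  # False
```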
8.3 LWW-element Set:
The idea is to introduce a total order on set operations, for example by generating timestamps. We need two sets: an add-set and a remove-set.
add() adds (element, unique_id()) to the add-set, rmv() adds the same kind of pair to the remove-set. lookup() checks where the element’s latest id is greater: in the add-set or in the remove-set.
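A sketch of an LWW-element Set under two assumptions: timestamps are passed in by the caller and are unique, and only the latest timestamp per element is stored (which gives the same lookup() result as storing every pair). If an add and a remove ever did share a timestamp, this version would let the remove win.

```python
from typing import Dict, Hashable


class LWWElementSet:
    def __init__(self) -> None:
        self.add_set: Dict[Hashable, int] = {}  # element -> latest add timestamp
        self.rmv_set: Dict[Hashable, int] = {}  # element -> latest rmv timestamp

    def add(self, element: Hashable, timestamp: int) -> None:
        self.add_set[element] = max(timestamp, self.add_set.get(element, timestamp))

    def rmv(self, element: Hashable, timestamp: int) -> None:
        self.rmv_set[element] = max(timestamp, self.rmv_set.get(element, timestamp))

    def lookup(self, element: Hashable) -> bool:
        if element not in self.add_set:
            return False
        if element not in self.rmv_set:
            return True
        return self.add_set[element] > self.rmv_set[element]

    def merge(self, other: "LWWElementSet") -> None:
        for element, ts in other.add_set.items():
            self.add(element, ts)
        for element, ts in other.rmv_set.items():
            self.rmv(element, ts)


s = LWWElementSet()
s.add("x", 1)
s.rmv("x", 2)
print(s.lookup("x"))  # False
s.add("x", 3)         # unlike in a 2P-Set, re-adding works
print(s.lookup("x"))  # True
```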
8.4 PN-Set:
One more way to order set operations is to keep a counter per element: increase it on add(), decrease it on rmv(). An element is considered to be in the set if and only if its counter is positive.
Notice an interesting side-effect: the element did not appear after it had been added on the 3rd replica.
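A sketch of an op-based PN-Set showing one way such a side-effect can arise. The scenario is an assumption chosen for illustration: two replicas remove the same element concurrently while a third one adds it again.

```python
from collections import defaultdict
from typing import Dict, Hashable, List, Tuple

Op = Tuple[Hashable, int]  # (element, +1 for add() or -1 for rmv())


class PNSet:
    def __init__(self) -> None:
        self.counts: Dict[Hashable, int] = defaultdict(int)

    def apply(self, op: Op) -> None:
        element, delta = op
        self.counts[element] += delta

    def lookup(self, element: Hashable) -> bool:
        return self.counts.get(element, 0) > 0


def add(element: Hashable) -> Op:
    return (element, +1)


def rmv(element: Hashable) -> Op:
    return (element, -1)


# Three replicas that already contain "x" (counter 1 everywhere).
replicas = [PNSet(), PNSet(), PNSet()]
for replica in replicas:
    replica.apply(add("x"))

# Replicas 1 and 2 remove "x" concurrently, replica 3 adds it once more.
ops: List[Op] = [rmv("x"), rmv("x"), add("x")]
for replica in replicas:
    for op in ops:  # additions commute, so the delivery order does not matter
        replica.apply(op)

print([replica.lookup("x") for replica in replicas])  # [False, False, False]
```

The counter ends at 1 - 1 - 1 + 1 = 0, so the replicas converge, but the last add() is not visible anywhere.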
8.5 Observe-Remove Set, OR-Set, Add-Win Set:
In this data type add() has priority over rmv(). A possible implementation: attach a unique tag to each newly added element. Then rmv() sends all tags it has seen for that element to the other replicas, which delete exactly those tags and keep any others.
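A minimal op-based sketch of the tag idea. It assumes causal delivery (a rmv() reaches a replica only after the add() operations it observed), and the `prepare_*`/`apply` split and the tuple-shaped operations are illustrative choices.

```python
import uuid
from collections import defaultdict
from typing import Dict, Hashable, Set, Tuple

Op = Tuple[str, Hashable, Set[str]]  # (kind, element, tags)


class ORSet:
    def __init__(self) -> None:
        self.tags: Dict[Hashable, Set[str]] = defaultdict(set)

    def prepare_add(self, element: Hashable) -> Op:
        return ("add", element, {uuid.uuid4().hex})  # fresh unique tag

    def prepare_rmv(self, element: Hashable) -> Op:
        # Only the tags observed locally at this moment will be removed.
        return ("rmv", element, set(self.tags.get(element, ())))

    def apply(self, op: Op) -> None:
        kind, element, tags = op
        if kind == "add":
            self.tags[element] |= tags
        else:
            self.tags[element] -= tags  # unseen tags are kept

    def lookup(self, element: Hashable) -> bool:
        return bool(self.tags.get(element))


a, b = ORSet(), ORSet()
first = a.prepare_add("x")
a.apply(first)
b.apply(first)

removal = a.prepare_rmv("x")    # concurrent with ...
addition = b.prepare_add("x")   # ... a new add on the other replica
for op in (removal, addition):
    a.apply(op)
for op in (addition, removal):
    b.apply(op)
print(a.lookup("x"), b.lookup("x"))  # True True
```

The concurrent add() survives because its tag was never seen by the rmv().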
8.6 Remove-win Set:
Same as above but rmv() has priority over add()
9. CRDT: Graph
The graph type is based on the set type. Here we have the following problem: if there are two concurrent addEdge(u, v) and removeVertex(u) operations — what should we do? There are 3 possible strategies:
- removeVertex() has priority, all incident edges are removed
- addEdge() has priority, all removed vertices are re-added
- Delay removeVertex() execution until all concurrent addEdge() operations have been executed.
The first one is the easiest to implement: we can just use two 2P-Sets, one for vertices and one for edges. The resulting data type is called a 2P2P-Graph.
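A simplified state-based sketch of a 2P2P-Graph, with each 2P-Set represented by an added set and a tombstone set. The names are illustrative, and preconditions that a full implementation would add (for example on removing a vertex that still has edges) are left out. An edge is visible only while both of its endpoints are, which is what gives removeVertex() priority.

```python
from typing import Hashable, Set, Tuple

Edge = Tuple[Hashable, Hashable]


class TwoPTwoPGraph:
    def __init__(self) -> None:
        self.v_added: Set[Hashable] = set()
        self.v_removed: Set[Hashable] = set()
        self.e_added: Set[Edge] = set()
        self.e_removed: Set[Edge] = set()

    def has_vertex(self, v: Hashable) -> bool:
        return v in self.v_added and v not in self.v_removed

    def has_edge(self, u: Hashable, v: Hashable) -> bool:
        return (self.has_vertex(u) and self.has_vertex(v)
                and (u, v) in self.e_added and (u, v) not in self.e_removed)

    def add_vertex(self, v: Hashable) -> None:
        self.v_added.add(v)

    def remove_vertex(self, v: Hashable) -> None:
        if self.has_vertex(v):
            self.v_removed.add(v)

    def add_edge(self, u: Hashable, v: Hashable) -> None:
        if self.has_vertex(u) and self.has_vertex(v):
            self.e_added.add((u, v))

    def remove_edge(self, u: Hashable, v: Hashable) -> None:
        if self.has_edge(u, v):
            self.e_removed.add((u, v))

    def merge(self, other: "TwoPTwoPGraph") -> None:
        self.v_added |= other.v_added
        self.v_removed |= other.v_removed
        self.e_added |= other.e_added
        self.e_removed |= other.e_removed


a, b = TwoPTwoPGraph(), TwoPTwoPGraph()
for g in (a, b):
    g.add_vertex("u")
    g.add_vertex("v")
a.add_edge("u", "v")   # concurrent with ...
b.remove_vertex("u")   # ... the removal of one endpoint
a.merge(b)
b.merge(a)
print(a.has_edge("u", "v"), b.has_edge("u", "v"))  # False False
```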
10. CRDT: Map
10.1 Map of literals:
With maps we have two problems to solve:
- How to deal with concurrent put() operations? By analogy with registers, we can use LWW or MV semantics.
- How to deal with concurrent put()/rmv() operations? By analogy with sets, we can use put-wins, rmv-wins or last-put-wins semantics.
10.2 Map of CRDTs:
A more interesting case, as it allows nesting other CRDT types. The important note here is that the map data type does not deal with concurrent changes of its values; that must be done by the nested CRDT itself.
10.2.1 Remove-as-recursive-reset map:
In this data type rmv(k) operation “resets” the value of the CRDT object under given key k. For example, for a counter that would be zero value.
Consider an example: there is a shared shopping cart. One user adds more flour, while another concurrently does a checkout (which deletes all elements). After synchronisation we have one “unit” of flour, which seems reasonable.
10.2.2 Remove-wins map:
In this case rmv() has priority over add()
Example: player Alice has 10 coins and a hammer in an online game. Then two concurrent operations happen: on replica A she finds a nail, and on replica B Alice is removed (with deletion of all her items). Because rmv() wins, after synchronisation Alice and all her items, including the nail, are gone.
10.2.3 Update-wins map:
add() has priority over rmv(), more precisely — add() cancels all previous concurrent rmv().
Consider an example: player Alice in an online game is removed on replica A because of inactivity, while at the same time she performs some activity on replica B. Clearly, the rmv() operation must be cancelled.
Please note an interesting case: imagine we have two replicas A and B, and they store a set by the key k. If A removes the key k and B removes all elements from the set then, in the end, we’ll have an empty set under key k on both replicas.
One more important observation: we can’t just cancel all previous rmv() operations. With such an approach the synchronised state could end up the same as the initial one, which is an incorrect result.
11. CRDT: List
The problem with this type is that element indices will differ between replicas after local update operations. To address this issue the Operational Transformation approach is used: the original index must be taken into account when applying received update operations.
12. Riak
As an example, let’s look at which CRDTs are used in Riak:
- Counter: PN-Counter
- Set: OR-Set
- Map: Update-wins Map of CRDTs
- (Boolean) Flag: OR-Set with no more than 1 element
- Register: tuples of (value, timestamp)
13. Usages of CRDT
Wikipedia has good examples
14. References
- Key-CRDT Stores
- A Conflict-Free Replicated JSON Datatype
- A comprehensive study of Convergent and Commutative Replicated Data Types
- Convergent and Commutative Replicated Data Type
- Conflict-free replicated data type
- A Bluffers Guide to CRDTs in Riak
- CRDTs: An UPDATE (or just a PUT)
- Conflict-free Replicated Data Types: An Overview
- Strong Eventual Consistency and Conflict-free Replicated Data Types
- Eventually-Consistent Data Structures