Cassandra Internals: Bloom filters

In a previous post I explained SSTables and their role in the persistent storage of data in Cassandra.

Suppose the following query is received on a node (news_id is the primary key):

SELECT * FROM tblNewsItems WHERE news_id = '8bacfe891fa89bfab98d9e99f9a9';

How can the node determine whether this specific row exists in its local data? As we know, we have in-memory tables (memtables), but they act as a caching mechanism and are not the real storage. So a lookup in these in-memory structures is not enough, because they don't hold the whole range of data.

Another option is to read the SSTable and check whether it contains the key we want. Even with an efficient search algorithm like binary search, this is too expensive, because it involves a lot of disk reads, which does not scale for a system with a lot of data and a lot of read queries.

The solution is to use the Bloom filter data structure. This is an in-memory, probabilistic data structure which can tell you whether an element is a member of a set or not. Here the set is the Cassandra table (which is effectively a set of rows) and the element is the primary key we are looking for. So we have these features:

  1. It is in-memory, so it will be fast and efficient.
  2. It is probabilistic, meaning the result is not guaranteed to be correct. There is a high chance that the result is correct, and that is enough for us. This is the trade-off we pay for such an efficient in-memory data structure. As a result, if the Bloom filter says a row is inside a set, there is a small probability that this is wrong. But the other side of the result is guaranteed: if it says a row is NOT in the set, it definitely is not.

How a Bloom filter works

A Bloom filter is a bitmap containing n bits plus a set of hash functions which are expected to be independent. Let's assume we have a Bloom filter with 100 bits and 3 hash functions (f1, f2 and f3). In practice the size of a Bloom filter will be much larger (MBs or GBs). Each of the hash functions receives an arbitrary input (a row's primary key) and outputs a number between 1 and n. You can easily get such an output by combining a common hash function with the modulo operator.

There are two procedures: one for inserting data into a Bloom filter and one for checking whether something exists in the set or not. Both are simple and straightforward (a small code sketch follows the two lists below). To insert data x into the Bloom filter:

  1. Apply the hash functions to x, so we will have a=f1(x), b=f2(x) and c=f3(x).
  2. Set the bits numbered a, b and c in the Bloom filter to one (note that some of them may already be one, which doesn't matter).

To check whether data y exists in the Bloom filter:

  1. Apply the hash functions to y, so we will have d=f1(y), e=f2(y) and f=f3(y).
  2. Check the bits numbered d, e and f in the Bloom filter's bitmap. Are they all set to one? If so, the row probably exists in the set. If even one of those bits is zero, the row definitely does not exist in the set.
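
To make the two procedures above concrete, here is a minimal sketch in Python. It is only an illustration (Cassandra's actual implementation is part of its Java codebase); it uses a 100-bit bitmap and derives three hash positions from a single SHA-256 digest with the modulo operator, as described above:

import hashlib

class BloomFilter:
    """A minimal Bloom filter: an n-bit bitmap plus k hash positions."""

    def __init__(self, n_bits=100, n_hashes=3):
        self.n_bits = n_bits
        self.n_hashes = n_hashes
        self.bits = [0] * n_bits

    def _positions(self, key):
        # Derive k hash values from one digest, then map each into
        # the range [0, n_bits) with the modulo operator.
        digest = hashlib.sha256(key.encode()).digest()
        for i in range(self.n_hashes):
            chunk = int.from_bytes(digest[i * 8:(i + 1) * 8], "big")
            yield chunk % self.n_bits

    def add(self, key):
        # Insertion: set the bits at positions f1(x), f2(x), f3(x) to one.
        for pos in self._positions(key):
            self.bits[pos] = 1

    def might_contain(self, key):
        # Lookup: if any bit is zero the key is definitely absent;
        # if all are one the key is only *probably* present.
        return all(self.bits[pos] for pos in self._positions(key))

bf = BloomFilter()
bf.add('8bacfe891fa89bfab98d9e99f9a9')
print(bf.might_contain('8bacfe891fa89bfab98d9e99f9a9'))  # True
print(bf.might_contain('some-other-key'))                # almost certainly False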

How are false positives handled?

A false positive for a Bloom filter is a case where the filter indicates that a row exists in the table but it actually does not. Remember that this is a probabilistic data structure, so such cases can happen.

If there is a false positive, we won't know about it until we scan the SSTable. So if the Bloom filter indicates that a row exists in the table, Cassandra will go on to scan the SSTable looking for that specific row. If the row is not found after the scan completes, this is recorded as a false positive.

You can run the 'nodetool cfstats' command on a Cassandra node to view a lot of statistics about the node. Among those stats is a section for Bloom filters which shows you the memory consumed by the Bloom filter and the number of false positives.

 

Cassandra Consistency Levels

In this post, I will explain more about data consistency in Cassandra. In a non-distributed system, it rarely happens that some data is missing or corrupt. But in a large-scale distributed database system, this is the norm rather than the exception, so there should be mechanisms to detect, handle and fix these situations. Customizable consistency levels are one of those mechanisms.

Write Consistency

In the last post, I explained what happens when data is written in Cassandra. But how does Cassandra determine whether a write operation was successful or not?

Each write operation can specify the consistency level it needs. This is a number which determines how many replicas have to send a success reply to the coordinator for the whole operation to be considered successful.

If, for example, this is set to ONE with a replication factor of 3, the coordinator will ask three nodes to store the data and will return a success status upon receiving the first success response from any of those three nodes. In the worst case, this may mean that the other two nodes have failed to write the data. This is not something that happens normally, but in a large system, with a lot of nodes and a lot of data flying around, things can go wrong.

We can use the consistency level to tune the trade-off between performance (a lower consistency level means faster response times) and reliability (a higher consistency level means the write is acknowledged by more replicas, so it is less likely to be lost).
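
As a sketch of how this looks from a client's point of view, here is an example using the DataStax Python driver. The contact point, the keyspace name and the title column are assumptions made only for the illustration; the table and key come from the earlier example:

from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

cluster = Cluster(['127.0.0.1'])            # assumed contact point
session = cluster.connect('news_keyspace')  # hypothetical keyspace

# With ONE, the coordinator reports success as soon as the first of the
# three replicas (replication factor 3) acknowledges the write.
insert = SimpleStatement(
    "INSERT INTO tblNewsItems (news_id, title) VALUES (%s, %s)",
    consistency_level=ConsistencyLevel.ONE)
session.execute(insert, ('8bacfe891fa89bfab98d9e99f9a9', 'Some headline'))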

Read Consistency

Just as with write operations, we can specify a consistency level when reading data. When data is written in Cassandra, it is written to more than one node (refer to this and this post). Now, when we want to read the data back, how should we proceed? Which of those nodes should be contacted, and what if some of the contacted nodes don't return a response?

The read consistency level determines how the coordinator node should respond when some of the nodes don't reply to a read request or reply too late. For example, if the read consistency is set to ALL, the coordinator waits for a response from all replicas. This provides the highest level of reliability but the lowest performance. You can set it to TWO or THREE so the coordinator waits for the two or three nearest replicas to return a response.
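
Continuing the hypothetical snippet from the write section, a read consistency level is attached to the statement in the same way:

# With ALL, the coordinator waits for every replica before answering.
query = SimpleStatement(
    "SELECT * FROM tblNewsItems WHERE news_id = %s",
    consistency_level=ConsistencyLevel.ALL)
row = session.execute(query, ('8bacfe891fa89bfab98d9e99f9a9',)).one()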

Example

The figure above shows a cluster of 12 nodes with a replication factor of 3. This means each row of data is written to 3 nodes (R1, R2 and R3 in the figure). When a client asks the coordinator (node 10) to read data with a consistency level of ONE, the coordinator contacts the nearest replica (R1) for the data. In the background, it will make sure R2 and R3 also have the most recent data, and if not, a read repair operation is initiated.

Handling Inconsistency

If the coordinator gets different data from different replicas, which one should it pick? The answer is that Cassandra timestamps all data, so the coordinator can easily determine the most recent value, which is what gets returned to the client. After this happens, the coordinator also starts a read repair process in the background to make sure all replicas have up-to-date data.
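
As a rough illustration of that comparison (not Cassandra's actual code), imagine the coordinator holding one (value, timestamp) pair per replica and applying last-write-wins; the values and timestamps below are made up:

# Hypothetical replica responses: (value, write timestamp in microseconds)
responses = [
    ('old headline', 1700000000000000),
    ('new headline', 1700000005000000),
    ('old headline', 1700000000000000),
]

# The value with the highest timestamp wins and is returned to the client;
# replicas holding the older value are then fixed by read repair.
value, timestamp = max(responses, key=lambda r: r[1])
print(value)  # 'new headline'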

 

How Cassandra Writes Data – Part 2

This is the second part in a two-part series about the internals of Apache Cassandra for writing data. Click here to read the first part.

Flow of execution

The process begins with a client sending the coordinator a request containing an INSERT statement. Let's assume we have a table named table1 with the definition below:

CREATE TABLE table1 (id int, name text, PRIMARY KEY (id));

Further assume the cluster consists of five nodes, which we will call node1, ..., node5, and that the replication factor is 3. The cluster has a partitioner algorithm which, given the primary key of a row, outputs a big, effectively random number. We call this number the identifier of the row. We assume the identifier is a 128-bit number, which means it lies between 0 and 2^128-1 (the max value). Now consider this as the range (0, max). Upon cluster configuration, this range is divided into five equal sub-ranges (because the cluster has five nodes):

R1 = (0, max/5), R2 = (max/5 + 1, 2*max/5), ...

Each node will have its own range: R1 is assigned to node1, R2 to node2, and so on.

All nodes in the cluster know about other nodes and their corresponding range.

The coordinator receives this CQL statement:

INSERT INTO table1(id, name) VALUES (100, 'mahdi');

First, it applies the partitioner algorithm to '100' (the primary key). The result is a number. It then determines which range (Ri, i=1,...,5) the number lies in. Let's assume it lies within range R2, so node2 will receive the first copy of the row. But we have a replication factor of 3, which means the data needs to be stored on three different nodes. With the simple replication strategy, the additional replicas are placed on the nodes that follow the original receiver, which are node3 and node4 in our example. So the coordinator sends 3 requests, to node2, node3 and node4, asking them to store the values of the new row in their local data. (There is another, more complex strategy called NetworkTopologyStrategy; see here for more information.)

Note that node1 and node5 know nothing about the new row, but the cluster as a whole contains the new data, and you can later query it from any node in the cluster.
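
The sketch below mimics this flow in Python. It is an illustration of the scheme described above, not Cassandra's real partitioner or replication code (Cassandra uses Murmur3 tokens and virtual nodes by default); MD5 here is just a stand-in hash:

import hashlib

NODES = ['node1', 'node2', 'node3', 'node4', 'node5']
MAX_TOKEN = 2**128 - 1
REPLICATION_FACTOR = 3

def row_identifier(primary_key):
    # Stand-in for the partitioner: map the primary key to a 128-bit number.
    digest = hashlib.md5(str(primary_key).encode()).digest()
    return int.from_bytes(digest, 'big')

def replica_nodes(primary_key):
    token = row_identifier(primary_key)
    # Each node owns an equal slice of (0, MAX_TOKEN); find the slice the token falls in.
    range_size = MAX_TOKEN // len(NODES) + 1
    first = token // range_size
    # Simple strategy: the next RF-1 nodes on the ring store the other copies.
    return [NODES[(first + i) % len(NODES)] for i in range(REPLICATION_FACTOR)]

print(replica_nodes(100))  # three consecutive nodes, e.g. ['node2', 'node3', 'node4']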

SSTable, commit log and memtable

In a node which belongs to a Cassandra cluster, there are 3 components used to store data:

  • SSTable: The storage of the database on persistent disk (e.g. a hard disk). When data is written to this storage it is permanently persisted, but the problem is that writing to it is expensive. That's why we need the other two components.
  • Memtable: For each table in the database, there is a memory space allocated to store its data. It is extremely fast because it is in memory, but it is not reliable: if there is a problem on the node, all of its data is lost.
  • Commit log: A persistent, append-only file written to local storage which contains a copy of all the writes applied to the database. These writes can be replayed to rebuild data that has not yet reached an SSTable in case of a problem on the node.

When data is written to the database, it is appended to the commit log and applied to the memtable. After that, a success response is sent to the requester indicating that the write operation completed. Note that the data is not yet in an SSTable, but it is on persistent storage (the commit log), so it is safe. Periodically, the node flushes memtables to SSTables, which writes all accumulated updates to the SSTable (the final, permanent storage for the data).
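
The toy sketch below follows that flow. It only illustrates the idea, not Cassandra's actual storage engine; the file names and formats are made up:

import json

class StorageEngine:
    """Toy version of the write path described above: commit log + memtable + SSTable flush."""

    def __init__(self, commit_log_path='commitlog.txt'):
        self.memtable = {}                            # in-memory, lost if the node crashes
        self.commit_log = open(commit_log_path, 'a')  # durable, append-only

    def write(self, key, row):
        # 1. Append the mutation to the commit log so it survives a crash.
        self.commit_log.write(json.dumps({'key': key, 'row': row}) + '\n')
        self.commit_log.flush()
        # 2. Apply it to the memtable.
        self.memtable[key] = row
        # 3. Only now acknowledge success to the client.
        return 'OK'

    def flush_to_sstable(self, sstable_path='sstable-1.txt'):
        # Periodically the memtable is written out as a sorted, on-disk table.
        with open(sstable_path, 'w') as sstable:
            for key in sorted(self.memtable):
                sstable.write(json.dumps({'key': key, 'row': self.memtable[key]}) + '\n')
        self.memtable.clear()

engine = StorageEngine()
engine.write(100, {'id': 100, 'name': 'mahdi'})
engine.flush_to_sstable()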