
Exploring consensus via Python (24 Mar 2018)

Intro
How do you get different computers that are part of a larger cluster to agree on what the value of something is?
Suppose a client sends a request to server 1 to set the value of a key foo to the value bar. However, server 1 is part of a cluster of N servers (there are good reasons for having servers be part of a cluster, but we won't go into that).

[Figure: client request]

Thus the request, set foo=bar, needs to be propagated to all the servers. A naive approach would go something like this:
When server 1 gets the request, it also sends that request to server 2, which in turn sends the request to server 3 ...
However, a couple of problems arise with that approach:
- What happens if there is some network error anywhere in that chain of requests?
- Does the client have to wait for all those round-trips of requests to finish before it can get a response?
- What if a three-letter government agency intercepts the request and changes it from foo=bar to foo=baz?
- And so forth
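
To make the first problem concrete, here is a minimal sketch of the naive chain. The ChainServer class and its link_up flag are hypothetical names made up for this illustration; a real cluster would talk over the network rather than call methods directly.

```python
class ChainServer:
    """Hypothetical server that forwards every write to the next server in the chain."""

    def __init__(self, name, next_server=None, link_up=True):
        self.name = name
        self.next_server = next_server
        self.link_up = link_up  # False simulates a broken link to next_server
        self.data = {}

    def set(self, key, value):
        # store locally first, then pass the request down the chain
        self.data[key] = value
        if self.next_server is None:
            return
        if not self.link_up:
            raise ConnectionError(
                "{0} cannot reach {1}".format(self.name, self.next_server.name))
        self.next_server.set(key, value)


s3 = ChainServer("server3")
s2 = ChainServer("server2", next_server=s3, link_up=False)
s1 = ChainServer("server1", next_server=s2)

try:
    s1.set("foo", "bar")
except ConnectionError as err:
    print("write failed part-way:", err)

for server in (s1, s2, s3):
    print(server.name, server.data)
```

In this sketch server 1 and server 2 end up with foo=bar while server 3 never hears about it, so the cluster disagrees about the value of foo.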

The way we get computers to agree on the value of some data is through consensus.
There are a number of consensus protocols with varying degrees of usage in the real world. Here is a Wikipedia rabbit hole to get you started.
For the remainder of this post, we will explore one of those algorithms: CASPaxos.

CASPaxos
CASPaxos is a consensus algorithm by Denis Rystsov; the white paper is available on arXiv.
The paper is surprisingly simple and easy to comprehend.
In a nutshell, the algorithm goes like this:

A. Prepare phase:
[Figure: CASPaxos prepare]
1. The client sends a request to server 1. Server 1 generates a ballot number and sends that ballot number to all the other servers.
2. Each of the other servers returns a conflict (to server 1) if it has already seen a greater ballot number. Otherwise it persists the ballot number as a promise and returns a confirmation, either with an empty value (if it hasn't accepted any value yet) or with a tuple of an accepted value and its ballot number.
3. Server 1 waits for a majority of servers to respond with a confirmation.

B. Accept phase:
[Figure: CASPaxos accept]
4. Server 1 sends the earlier generated ballot number and the value (i.e. foo=bar) to all the other servers.
5. Each server returns a conflict if it has already seen a greater ballot number. Otherwise it marks the received ballot number and value as the accepted value and returns a confirmation.

C. End:
[Figure: CASPaxos end]
6. Server 1 waits for a majority of servers to respond with a confirmation.
7. Server 1 returns a response to the client.

Disclaimer: The description above is not 100% correct. I have left out a number of important details (e.g. in the paper, the client doesn't send a request like set foo=bar; instead it sends a change function). You should read the actual paper for full details. However, the description above will suffice for now.
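
To give a feel for what a change function might look like, here is a minimal sketch. The helper names set_key and compare_and_set are made up for this illustration and do not come from the paper, and using None for "nothing accepted yet" is an assumption of the sketch. The idea is that the proposer applies the function to the value it learned in the prepare phase and sends the result in the accept phase.

```python
def set_key(key, value):
    """Build a change function that sets key to value (hypothetical helper)."""
    def change(state):
        new_state = dict(state or {})  # None stands for "no value accepted yet"
        new_state[key] = value
        return new_state
    return change


def compare_and_set(key, expected, value):
    """Build a change function that only writes if key currently holds expected."""
    def change(state):
        current = dict(state or {})
        if current.get(key) != expected:
            return state  # precondition failed, leave the state unchanged
        current[key] = value
        return current
    return change


state = None
state = set_key("foo", "bar")(state)
print(state)

# succeeds, because foo currently holds "bar"
state = compare_and_set("foo", "bar", "baz")(state)
print(state)

# does nothing, because foo no longer holds "bar"
state = compare_and_set("foo", "bar", "qux")(state)
print(state)
```

A plain set foo=bar is then just the simplest possible change function, which is why the simplified description above is good enough for this post.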

Python
Let's explore how the algorithm achieves consensus, using Python to demonstrate it.
We are going to build a prototype of a distributed storage system, and we will use CASPaxos to achieve consensus among the various servers in our storage system.
First of all, the code for the proposer (server 1 in our CASPaxos description above):


import time
import random

class Proposer(object):
    def __init__(self, acceptors):
        if not isinstance(acceptors, list):
            raise ValueError(
                "acceptors ought to be a list of child classes of Acceptor object")
        self.acceptors = acceptors
        # since we need to have 2F+1 acceptors to tolerate F failures, then:
        # integer division, so that F stays a whole number on Python 3 as well
        self.F = (len(self.acceptors) - 1) // 2
        self.state = 0
        
    def receive(self, state):
        """
        Receives a request from the client. The request will be a dictionary like state={'foo': 'bar'}.
        """
        ballot_number = self.generate_ballot_number()
        self.send_prepare(ballot_number=ballot_number)
        result = self.send_accept(state, ballot_number)
        return result

    def generate_ballot_number(self, notLessThan=0):
        # we should never generate a random number that is equal to zero
        # since Acceptor.promise defaults to 0
        ballot_number = random.randint(notLessThan + 1, 100)
        return ballot_number

    def send_prepare(self, ballot_number):
        # list of tuples containing accepted (value, ballotNumOfAcceptedValue)
        confirmations = []
        for acceptor in self.acceptors:
            confirmation = acceptor.prepare(ballot_number=ballot_number)
            if confirmation[0] == "CONFLICT":
                # CONFLICT, do something
                pass
            else:
                confirmations.append(confirmation)

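        # Every acceptor has already answered by this point (the calls above are
        # synchronous), so sleeping and re-checking would loop forever.
        # Without F + 1 confirmations this round has failed.
        if len(confirmations) < self.F + 1:
            raise RuntimeError("prepare phase failed: fewer than F + 1 confirmations")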

        # An accepted ballot number of 0 means that acceptor has not accepted
        # anything yet (generated ballot numbers always start at 1). Checking
        # ballot numbers also works when the accepted values are dictionaries,
        # which cannot be summed.
        if all(i[1] == 0 for i in confirmations):
            # we are using 0 as PHI
            self.state = 0
        else:
            highest_confirmation = self.get_highest_confirmation(confirmations)
            self.state = highest_confirmation[0]

    def get_highest_confirmation(self, confirmations):
        # the confirmation whose accepted value carries the highest ballot number
        return max(confirmations, key=lambda i: i[1])

    def send_accept(self, state, ballot_number):
        self.state = state
        acceptations = []
        for acceptor in self.acceptors:
            acceptation = acceptor.accept(
                ballot_number=ballot_number, new_state=self.state)
            if acceptation[0] == "CONFLICT":
                # CONFLICT, do something
                pass
            else:
                acceptations.append(acceptation)

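        # Every acceptor has already answered by this point (the calls above are
        # synchronous), so sleeping and re-checking would loop forever.
        # Without F + 1 confirmations this round has failed.
        if len(acceptations) < self.F + 1:
            raise RuntimeError("accept phase failed: fewer than F + 1 confirmations")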

        # Returns the new state to the client.
        return self.state
                

And the code for the acceptors (the other servers in our CASPaxos description above):

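class Acceptor(object):
    def __init__(self, name):
        self.name = name
        # per-instance state, so that acceptors never share it by accident
        self.promise = 0  # ballot number
        self.accepted = (0, 0)  # (accepted value, ballot number of that value)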

    def prepare(self, ballot_number):
        if self.promise > ballot_number:
            return ("CONFLICT", "CONFLICT")

        # this ought to be flushed to disk
        self.promise = ballot_number
        return self.accepted

    def accept(self, ballot_number, new_state):
        if self.promise > ballot_number:
            return ("CONFLICT", "CONFLICT")
        elif self.accepted[1] > ballot_number:
            return ("CONFLICT", "CONFLICT")

        # these two ought to be flushed to disk
        # http://rystsov.info/2015/09/16/how-paxos-works.html
        self.promise = 0
        self.accepted = (new_state, ballot_number)
        return ("CONFIRM", "CONFIRM")
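
To see the conflict checks in action, here is a small self-contained sketch. MiniAcceptor is a hypothetical, trimmed stand-in for the Acceptor class, and the ballot numbers 5 and 9 are picked by hand rather than generated, purely so that the run is predictable.

```python
class MiniAcceptor:
    """Hypothetical, trimmed stand-in for the Acceptor class."""

    def __init__(self):
        self.promise = 0        # highest ballot number promised so far
        self.accepted = (0, 0)  # (value, ballot number of that value)

    def prepare(self, ballot_number):
        if self.promise > ballot_number:
            return ("CONFLICT", "CONFLICT")
        self.promise = ballot_number
        return self.accepted

    def accept(self, ballot_number, new_state):
        if self.promise > ballot_number or self.accepted[1] > ballot_number:
            return ("CONFLICT", "CONFLICT")
        self.promise = 0
        self.accepted = (new_state, ballot_number)
        return ("CONFIRM", "CONFIRM")


acceptor = MiniAcceptor()

# Proposer A prepares with ballot 5, then proposer B prepares with ballot 9.
first_prepare = acceptor.prepare(5)
second_prepare = acceptor.prepare(9)

# Proposer A is now stale: the acceptor has promised ballot 9,
# so an accept carrying ballot 5 is refused.
stale_accept = acceptor.accept(5, {"foo": "bar"})

# Proposer B's accept with ballot 9 goes through.
fresh_accept = acceptor.accept(9, {"foo": "baz"})

print(first_prepare, second_prepare)
print(stale_accept)
print(fresh_accept)
print(acceptor.accepted)
```

The acceptor ends up holding foo=baz with ballot 9. The older proposer gets a conflict back and would have to start over with a greater ballot number.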
                

We are going to use Python class instances to stand in for actual servers.
We will create five servers of type Acceptor, named a1 ... a5.
We will also create one more server of type Proposer.
We will have a client send a request to the proposer, and then the CASPaxos algorithm will take over.
At the end of it all, the client's request will have been persisted in all five servers.

a1 = Acceptor(name='a1')
a2 = Acceptor(name='a2')
a3 = Acceptor(name='a3')
a4 = Acceptor(name='a4')
a5 = Acceptor(name='a5')

acceptorsList = [a1, a2, a3, a4, a5]
p = Proposer(acceptors=acceptorsList)

response = p.receive(state={'foo': 'bar'})
print("response: {0}".format(response))

for acceptor in acceptorsList:
    print("value persisted by acceptor={0} is {1}".format(acceptor.name, acceptor.accepted[0]))
                

And the result:

response: {'foo': 'bar'}
value persisted by acceptor=a1 is {'foo': 'bar'}
value persisted by acceptor=a2 is {'foo': 'bar'}
value persisted by acceptor=a3 is {'foo': 'bar'}
value persisted by acceptor=a4 is {'foo': 'bar'}
value persisted by acceptor=a5 is {'foo': 'bar'}
                
The full code can be found on GitHub.
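
The comment in the Proposer about needing 2F+1 acceptors to tolerate F failures is worth a quick check. Below is a minimal sketch of that arithmetic; the function names max_failures, quorum_size and has_quorum are made up for this illustration. Note that F + 1 is a strict majority only when there are exactly 2F+1 acceptors, so the sketch computes the majority directly to also cover an even number of acceptors.

```python
def max_failures(num_acceptors):
    """Largest F such that num_acceptors >= 2F + 1."""
    return (num_acceptors - 1) // 2


def quorum_size(num_acceptors):
    """Smallest group that is a strict majority of the acceptors."""
    return num_acceptors // 2 + 1


def has_quorum(replies, num_acceptors):
    """replies holds one entry per acceptor: True for a confirmation,
    False for a conflict or for no answer at all."""
    confirmed = sum(1 for ok in replies if ok)
    return confirmed >= quorum_size(num_acceptors)


for n in (3, 4, 5):
    print("{0} acceptors: tolerates {1} failure(s), quorum is {2}".format(
        n, max_failures(n), quorum_size(n)))

# Five acceptors with two unreachable: a majority still confirms.
print(has_quorum([True, True, True, False, False], 5))

# Three unreachable: no majority, so the proposer must not report success.
print(has_quorum([True, True, False, False, False], 5))
```

With the five acceptors used in this post, F is 2 and the quorum is 3, so the cluster keeps working with two acceptors down but not with three.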

Conclusion
CASPaxos can help you achieve consensus in your distributed systems and make failure handling a bit easier.
The paper also has sections that cover other system administration aspects of distributed systems, such as adding and removing servers.
I invite you to go read the paper.
Its author, Denis Rystsov, is on Twitter as @rystsov, and I'm grateful to him for answering my queries whenever I called on him.

CASPaxos, however, won't help you when three-letter agencies decide to tamper with your requests.