Saturday, April 8, 2017

Akka's CircuitBreaker: how to incorporate in your application, by passing messages and handling futures

Background

These days, it is quite common to build an application, which makes use of various other services, to augment its own service. In fact, the value of such a service lies in this aggregation: an aggregation, if and when thoughtfully done - and its result is brought to the users - lifts the usefulness of the application by a few notches.
So, in a typical case of such aggregation, a call to our application give rise to a number of calls to services external to itself. Let's assume that all such calls follow the REST style, over HTTP. Our  application makes a series of calls (may be concurrently) , waits for them to return, processes/transforms the values that are returned and possibly, performs an aggregation. Regular stuff, nothing surprising here.
What if one or more of these external calls fail (unreachable host, as an example)?
The call from our application times out eventually, and our fallback logic takes over; perhaps, a default or an exception-indicating value is returned. Again, this is regular stuff too. We all encounter such cases and take requisite actions. This is certainly not a blocker, by itself.
However, if our application is widely popular (why shouldn't it be?), then it is quite likely to be the target of numerous simultaneous calls from its users; perhaps the result of a sudden surge. All these incoming calls are going to result in outbound calls to the external services. If an external service is unavailable, each of these incoming calls (call chains, one may say) is going to waste some time waiting for the responses.

The problem at hand

The point is this: if 2 or 3 successive incoming calls find an outbound call failing, then it is reasonable to assume that a persistent problem exists at the external service's endpoint. Therefore, it is to our application's (and to the users of our application, by extension) benefit that we instruct the subsequent incoming calls to bypass this.  This decision may result in disappointing (or irritating) a few users with missing pieces of information but overall throughput of our application is not compromised.
A Circuit Breaker is a handy pattern to apply to put this approach in practice.

Experts’ take on CircuitBreaker

I came to learn about the CircuitBreaker pattern, from Martin Fowler's blog on the subject: https://martinfowler.com/bliki/CircuitBreaker.html This still is a very good point to start, in my opinion.
Wikipedia has a detailed treatise on it. I also found this blog, to be quite informative:  

CircuitBreaker in Akka

This blog is about the support that Akka has, for this particular pattern. From whatever I have seen so far, a good example of how a CircuitBreaker works in Akka is hard to come by. Akka's own documentation on the subject, is woefully short of what one looks for. Here's my attempt to fill in the void. Let's jump into that.
Akka provides a CircuitBreaker. If an actor calls a service using a CircuitBreaker, then successful and failed calls are tracked. If a certain number of successive calls fails, CircuitBreaker prevents any further request - opens the circuit as it were - to the service, till a certain amount of time elapses. The first request after this duration is allowed to call the service. If this is successful, the CircuitBreaker opens the circuit temporarily; we say the circuit is half-open now. If the next request is successful, then the CircuitBreaker assumes that the external service is now available and the circuit is closed; if the next request fails again, the circuit becomes  open again.
To demonstrate this behaviour, let us implement a small application. It makes use of the information about leading soccer clubs, available at http://clubinfo.com. If we place a call to this REST api, to retrieve information about a soccer club we form a HTTP GET request:
where 7 represents a particular club (in this case, Hamburger SV)
The response is a JSON string, which looks like this:
Selection_283.png

We employ the following actors to implement the flow:
  • Requestor
  • SoccerClubInfoGetter
  • SysAdminConsole and
  • CallWastePreventor
Various actors and messages which our sample application employs

The code is here: https://github.com/nsengupta/Akka-CircuitBreaker-Demo
Requestor is our point of entry; anybody looking for the details of a club, must send a message to this actor. Requestor plays the role of a mediator; it calls either SoccerClubInfoGetter or CallWastePreventor. Let me explain.
SoccerClubInfoGetter encapsulates the actual call made to the external REST endpoint, asynchronously:

CompletableFuture.supplyAsync(
new Supplier<ClubDetailsFromXternalSource>() {
        @Override
         public ClubDetailsFromXternalSource get() {
String s = Http.get(clubInfoAskedFor).text();
return (
new   
                         InteractionProtocol.ClubDetailsFromXternalSource(s,originalSender)
                        );
                    }
}),
getContext().system().dispatcher()

I am using a simple REST endpoint accessing tool, named javalite (Http.get()) Any such tool can be used here.
SoccerClubInfoGetter simply pipes the response to javalite's (Http.get()) call above (available as a JSON):
pipe (
CompletableFuture.supplyAsync(
               // As shown above
           )
        ).to(getSender()
    );

In our case, Requestor is the recipient of the piped response above, because it is the sender.
In order to emulate a failing call to the external REST endpoint, we feign a longish sleep, so that the caller decides to give up and return, convinced that the external service is unavailable. Because all clubs are identified by a non-zero integer, we take such a step when the identifier of the club is passed as zero!
else { // Emulating a failed call to the external service
Thread.sleep(2000);
getSender().tell(
  new InteractionProtocol.UnavailableClubDetails("timed out"),
  getSelf()
      }

Take a look at SoccerClubInfoGetter to understand how is it enacting the role of a ‘failure generator’, if you will.
Requestor relies on the SoccerClubInfoGetterActor for getting the job done. If SoccerClubInfoGetterActor sleeps for 2 seconds, Requestor's ask() times out (ASK_TIMEOUT value is set to 1 second).

We convert the Object it returns, to a ClubDetailsFromXternalSource message using a Function object. If ask() fails, we absorb the Exception and return a TimedOutClubDetails message. In both of these cases, the Requestor feeds the message to itself, by calling a getSelf().
Requestor's receive() function is equipped with the logic of handling either of the two messages mentioned in the paragraph above:

Requestor remembers the ActorRef of the actor who has sought the club's information in the first place, which it uses to provide the appropriate response. In case, the ask() has timed out, the response is a piece of helpful information ("Service unresponsive, try again later"). In both the cases, we are returning a String type; so, the compiler is happy too.
Take a look at RESTDriver, to understand the behaviour of Requestor and SoccerClubInfoGetter.

Failure is emulated

Let's revisit the following code snippet from SoccerClubInfoGetter:
It is an emulation of an unpredictable situation: the call to the external API may or may not fail; we don't know. To bring in an element of determinism, we are targeting a clubID of zero to force the logic follow this path. By making it sleep, we are delaying the thread long enough to cause the ask() from Requestor to time out. In a real-life situation, this determinism is absent; so, it is possible that a series of calls to the external API fails after timing out. This is a waste for sure. Our application's throughput is adversely affected and users are unhappy.

A CircuitBreaker comes as an aid

A CircuitBreaker helps in a situation like this. When we instantiate a CircuitBreaker, we give it a number that represents the tolerance level of failures and a duration for which it remains . If we set it to 3, then we are telling the CircuitBreaker that
  1. If 3 successive calls through you fail, consider that target of these calls is in trouble.
  2. Cause the circuit to open and leave it at that for the finite duration you are initialized with
  3. Any call that reaches you during this duration, send it back immediately, citing a failure
  4. Once the duration elapses, be ready the for next call and let it go through.
  5. If this call fails, go back to step 2 above.
To execute this cycle predictably and flawlessly, a CircuitBreaker follows a series of well-defined State Transitions. This page from Akka's documentation provides a clear illustration.
If you follow the diagram, you can see that the step '4' above, leaves the story, tad incomplete. What actually happens is that at the step 4, the CircuitBreaker causes the circuit to open in half. If the very next call (step '5' above) is successful, then the CircuitBreaker causes the circuit to close; otherwise, it causes the circuit to be open again (step '2' again).
In our application, we implement a CircuitBreaker in the CallWastePreventor actor (code here).
The key aspect of CallWastePreventor's receive() function is a block, which is - unsurprisingly - very similar to SoccerClubInfoGetter:

workingCallable parameter to the circuit breaker is an instance of JDK 8's Callable. As the  pipe() call expects a CompletionStage, this Callable has to produce a CompletionStage. Moreover, as the pipe() call has to finally pipe a message of the type ClubDetailsFromXternalSource, the Callable has to produce a CompletionStage of ClubDetailsFromXternalSource:

nonWorkingCallable is initialized in an equivalent manner.
In its role as the mediator, the Requestor makes no difference between a SoccerClubInfoGetter and a CallWastePreventor. The mechanism it resorts to is the same: it asks whichever ActorRef it is injected with, (the construction parameter circuitBreakerJeeves) for the information about a particular club and deals with the response it eventually receives.
To get an idea of the way the application works, take a look at RESTDriver (uses SoccerClubInfoGetter) and CBDriver (uses CallWastePreventor).

CircuitBreaker’s Transition handlers

A CircuitBreaker provides a facility to register callback functions, namely onOpen, onHalfOpen and onClose, to mark each of the state-transitions mentioned earlier. We employ a SysAdminConsole actor; this actor is notified every time the CircuitBreaker's state changes. Using a specific message called AdminQueryMessage, we get to know what is the current state, the CircuitBreaker is in, at the moment. We use this to our advantage while testing the application. Take a look at CircuitBreakerTest (in the usual location of src/test/java).

Importance of durations

It is a common (and important) knowledge to realize the importance of duration in the behaviour of an Actor-based application. In our demonstrative application, it assumes even greater significance, because the basis of the emulation of failure in it, is occurrence of a Time-Out. For the sake of simplicity (and, some laziness too, I admit), some of these durations are initialized using hard-coded values. As a result, other components, whose behaviour depends on the aforementioned durations, also use hard-coded values. This understanding will help, when you go through CircuitBreakerTest, CBDriver and RESTDriver.

Let me know if you find this article helpful, or if you think something specific is missing, whose presence would have helped in grasping the behaviour of Akka’s CircuitBreaker.