Monday, December 14, 2015

Getting close to Apache Flink, albeit in a sluggish manner - 1

Of late, I have begun to read about Apache Flink. Apache Flink (just Flink hereafter) is an ‘open source platform for distributed stream and batch data processing’, to quote from the homepage. What has caught my interest is Flink’s idea that the ability to operate on units of data as they stream in gives one the flexibility to decide what constitutes a batch: a count of events, or the events collected during a pre-specified period of time. The system doesn’t dictate the way batches are formed and operated upon. This tenet of units giving rise to sums sounds more intuitive to me.


I am not an expert on distributed stream processing - let alone Flink - but I am quite curious to know more about the subject. That has led me to play around with it. This blog is more of a journal of my continuing attempts to study the behaviour of Flink’s Window APIs. The details of Flink are all available here.

Premise

So, the premise is this: from a source, pieces of data (each a datum, to be grammatically correct :-) ) are arriving continuously. We want to be able to delicately place our application on this stream, in such a manner that we observe the flow without having to stall it. As we observe, we pick the data we are interested in (make copies of it, to be semantically accurate), _transform_ it into a shape which is more conducive to the way we want to process it, run various computations on it, and store or push the result thus obtained.

Importance of Time and Count

We need to take note of the fact that time and count have an inseparable association with the kind of processing we have outlined above. The pieces of data referred to earlier form a continuous stream while, to be able to carry out meaningful computations, we need a discretized view of the same. Time and count give us the conceptual planks on which we can base the mechanism of discretization. At a busy crossing in a bustling city, if we are waiting for the green signal to go red so that we can cross safely, and the green signal takes interminably long, we may want to be a little more inquisitive and count how many cars pass in either direction before the light changes from green to red, every time. That count gives us something to reason about and with - we can even take this up with the city council - not the endless ‘stream’ of cars that we observe indifferently.

In another example, bank officials may want to find out during which hours of a Sunday customers withdraw money from a specific ATM the most. So, they may want to observe the usage stream of that ATM, and collect only the withdrawal transactions taking place in every hour of the day (the number of such transactions may, and most probably will, vary). Depending on what they see about the peak usage hours, they may decide when to refill the ATM with banknotes. This being a Sunday, they may want to reduce the cost of hiring cash management and security services.


Back to Flink

To begin processing, we need to create an environment. Because I run my experiments on my laptop, I create a local environment. A local environment allows us to use a single JVM for running the application. Such an arrangement facilitates local debugging too; just like any other Java application, it can be run from inside our favourite IDE. Even though it is somewhat patronizingly given the appellation of ‘local’, this environment actually brings up the full Flink runtime, including a JobManager and a TaskManager, which are essential when Flink runs in a true clustered environment (most likely, in production).


import org.apache.flink.streaming.api.scala._
val env = StreamExecutionEnvironment.createLocalEnvironment(1)


The code above creates a local environment with a parallelism of 1. In effect, only one thread is used inside the environment for execution. If no such instruction is given, Flink uses as many threads as there are cores on the machine (4, in my case).
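As an aside, a small sketch of the alternatives as I understand them (the value names here are mine, purely for illustration):

val envDefault = StreamExecutionEnvironment.createLocalEnvironment()   // parallelism defaults to the number of cores
val envSingle  = StreamExecutionEnvironment.createLocalEnvironment(1)  // a single thread of execution
envSingle.setParallelism(1)    // parallelism can also be (re)set on the environment itself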


Now that we have the environment created, we can begin to process a stream of data and discover how to do that the Flink way.


We are going to use a set of data available as a CSV file. I have taken this set of data from this pubnub page, which offers these readings as a simulated IoT stream (thanks, https://twitter.com/ambertch). Though my (greenhorn’s) Flink application subscribes to this stream of data without much problem, in this blog we are referring to a subset of that data, stored as a CSV file. That way, it is easier to explain what the application does. The columns of the file are:


sensor_uuid,radiation_level,photosensor,humidity,timestamp,ambient_temperature


A sample row looks like this:
"probe-b0196da6",198,794.87,76.7054,1448028167,17.50


We want to read the records of this file and feed them to Flink as if a stream of records were arriving:


val readings  = readIncomingReadings(env,"./sampleIOTTiny.csv")


Because we want to hold the data in a nicely formed POJO of ours, we create a case class and tuck the fields of a record into it:


case class IncomingDataUnit(
  sensorUUID: String, radiationLevel: Int, photoSensor: Float,
  humidity: Float, timeStamp: Long, ambientTemperature: Float)
  extends Serializable

This mapping sits inside the helper readIncomingReadings used above:

def readIncomingReadings(env: StreamExecutionEnvironment, inputPath: String): DataStream[IncomingDataUnit] =
  env.readTextFile(inputPath).map(datum => {
     val fields = datum.split(",")
     IncomingDataUnit(
       fields(0),              // sensorUUID
       fields(1).toInt,        // radiationLevel
       fields(2).toFloat,      // photoSensor
       fields(3).toFloat,      // humidity
       fields(4).toLong,       // timeStamp
       fields(5).toFloat       // ambientTemperature
     )
   })
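As an aside - and this is purely an assumption of mine, not something the repository does - if the file ever carried a header row or a malformed line, a flatMap over a Try would keep the job from failing on it:

import scala.util.Try

env.readTextFile(inputPath).flatMap { datum =>
  Try {
    val fields = datum.split(",")
    IncomingDataUnit(
      fields(0), fields(1).toInt, fields(2).toFloat,
      fields(3).toFloat, fields(4).toLong, fields(5).toFloat)
  }.toOption.toList   // lines that fail to parse are silently dropped
}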


Let us begin by observing the ambient temperatures that are arriving. We want to find out the maximum ambient temperature for every 4 readings. To accomplish this, we do the following:


val readings =
     readIncomingReadings(env,"./sampleIOTTiny.csv")
     .map(e => (e.sensorUUID,e.ambientTemperature))
     .countWindowAll(4)
     .max(1)


The function readIncomingReadings generates a stream of IncomingDataUnits. We extract a tuple of only the two fields we are interested in. The important point is that even after we map, we still get a stream. The streaminess is not obliterated by the mapping action. In fact, Flink allows us to transform a stream of one type into a stream of another, as and when we need to do so. We will see more use of such transformations as we go further.
Now, we want to begin to observe these tuples, and to do that we create a window which positions itself on the stream and keeps observing the tuples passing across. In Flink’s terms, we assign a window to the stream. The function countWindowAll(4) above creates and assigns the window to the stream.
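To make the ‘we still get a stream’ point concrete, here is my reading of the types at each step (the type names are my understanding of Flink’s Scala streaming API, so treat them as an approximation):

// readIncomingReadings(env, "./sampleIOTTiny.csv")  : DataStream[IncomingDataUnit]
// .map(e => (e.sensorUUID, e.ambientTemperature))   : DataStream[(String, Float)]
// .countWindowAll(4)                                : AllWindowedStream[(String, Float), GlobalWindow]
// .max(1)                                           : DataStream[(String, Float)]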


It is rather obvious that just by observing the tuples wafting by, one achieves nothing. We need to take some action to extract some meaning out of them. Let us say that we intend to find out the highest ambient temperature amongst every 4 tuples that arrive and pass by: get the highest temperature from the first 4 tuples, and use it; get the highest from the next 4 tuples, and use it; get the highest from the next 4 tuples, and use it - and this continues as long as the tuples arrive. Get the idea?


To get this done, while assigning a window, we leave it with the instruction that whenever it has observed 4 consecutive tuples, it should take an action: find the maximum amongst them. The last two lines of the code snippet above do just that. A window is assigned by calling countWindowAll(4) - this tells the window to temporarily hold at most 4 tuples inside it before it takes action. Once 4 tuples are in, the maximum temperature amongst them is calculated. In the tuple, the temperature is the 2nd field; hence, the parameter to max() is 1 (zero-based indexing and all that). When the program runs, it generates the following output:


(probe-f076c2b0,29.37)
(probe-6c75cfbe,30.02)
(probe-f4ef109e,32.28)
(probe-987f2cb6,29.43)
(probe-c027fdc9,31.05)


The datafile ‘sampleIOTTiny.csv’ is available in the code repository. One can check its contents to interpret the output better.
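For completeness: the output above appears because I attach a simple sink to the stream and then ask the environment to run the job; a minimal sketch of what I do (the job name string below is just my own label):

readings.print()                                      // write each windowed maximum to stdout
env.execute("maxAmbientTemperaturePerFourReadings")   // nothing actually runs until execute() is called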


Easy-peasy, right?


This handy and terse function countWindowAll(n) comes with Flink. In fact, Flink comes with a chest of such powerful yet semantically simple functions that take care of a large number of common use-cases. We will meet some of them later (including in future blogs that I intend to write on the subject).
To be a little more specific about the behaviour of countWindowAll(), let us take a second look at what it does. It lets one tuple in and checks if the count of tuples has reached 4. If not, it waits for the next tuple to arrive. Upon its arrival, it lets this one in too and checks the count again. It continues to do so till the count is 4. When the count is 4, it hands over the tuples to the max() function and simply cleanses itself of the tuples it has let in so far. With a clean slate, it then begins to wait for the next tuple to arrive. This act of automatic self-cleansing ensures that the window doesn’t see the same tuple twice. This is what we expect when determining the highest temperature of every 4 readings: each quartet of readings must be disjoint from the others. In Flink’s world, such windows are given the epithet ‘Tumbling Windows’ because, after a certain condition is reached, the window tumbles its contents into oblivion.


Let us see whether we can derive a different interpretation of the highest temperature from the readings. To keep matters simple, let us assume that the following is the series of temperatures that arrive (the flow of tuples is from right to left; we are observing them at the leftmost point):


<observing here> 12,13,18,19,30,11,14,14,25,28,28,11 (← arriving from here)


If we follow the countWindowAll() approach above, this is what the window sees (‘|’ is only for easier visualization of the quartets):


12, 13, 18, 19 | 30, 11, 14, 14 | 25, 28, 28, 11 |


So, the maximum temperature is calculated as 19, 30 and 28. This is useful if we want to find only the highest that occurred in each compartment. It may not be so useful, however, if we want to know that 30 remains the highest for a number of consecutive readings. To do that, we need to check a rolling count of 4 readings. The window should see tuples in this manner:


12, 13, 18, 19 | 13, 18, 19, 30 | 18, 19, 30, 11 | 19, 30, 11, 14 | 30, 11, 14, 14 | ….


After it has seen 4 tuples, the window should ease out the tuple that came in the earliest, then let in the next tuple that arrives, forming the next quartet. It should then calculate the maximum temperature again. Effectively, we need a sliding window of 4 tuples. Can Flink help us accomplish this?
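Before answering that, here is a tiny plain-Scala sketch (ordinary collections, no Flink involved) that makes the two ways of grouping concrete:

val temps = List(12, 13, 18, 19, 30, 11, 14, 14, 25, 28, 28, 11)
temps.grouped(4).map(_.max).toList   // disjoint quartets : List(19, 30, 28)
temps.sliding(4).map(_.max).toList   // rolling quartets  : List(19, 30, 30, 30, 30, 25, 28, 28, 28)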


It turns out that the aforementioned function countWindowAll takes two parameters in an overloaded form: countWindowAll(m,n). The first parameter is the maximum number of tuples to be held in the window before the computation is triggered, as we have seen earlier. The second is the slide: after how many fresh tuples the window is evaluated again, each time easing out the oldest tuples so that only the latest 4 remain. To be able to see the tuples as shown above, we should use countWindowAll() this way. Note that this snippet uses maxBy(1) rather than max(1): maxBy(1) emits the entire tuple that carries the maximum temperature, whereas max(1) only guarantees the maximum value in field 1 (the other fields may come from a different tuple):


val readings =
     readIncomingReadings(env,"./sampleIOTTiny.csv")
     .map(e => (e.sensorUUID,e.ambientTemperature))
     .countWindowAll(4,1)
     .maxBy(1)


When run, the program produces the following output:


(probe-f076c2b0,29.37)
(probe-f076c2b0,29.37)
(probe-f076c2b0,29.37)
(probe-f076c2b0,29.37)
(probe-6c75cfbe,30.02)
(probe-6c75cfbe,30.02)
(probe-6c75cfbe,30.02)
(probe-6c75cfbe,30.02)
(probe-400c5cdf,27.18)
(probe-400c5cdf,27.18)
(probe-42a9ddca,32.28)
(probe-42a9ddca,32.28)
(probe-42a9ddca,32.28)
(probe-42a9ddca,32.28)
(probe-6dd6fdc4,29.43)
(probe-6dd6fdc4,29.43)
(probe-6dd6fdc4,29.43)
(probe-6dd6fdc4,29.43)
(probe-960906ca,31.05)
(probe-960906ca,31.05)


Neat, no?
We have just scratched the surface of what Flink enables us to do with streamed data. In the blogs that follow, I will share with you more about my journey along the streams, with Flink as friend, philosopher and guide.


I have used Flink 0.10.0 for the code shown above. The code is available here!


Many thanks to the fantastic folks who are not only behind Flink but also in front of it, tirelessly answering my newbie questions in the Flink User Group and prodding me to understand the concepts. A special thanks to Fabian Hueske (@fhueske): his recent blog on Flink's handling of windows is a must-read.
