From the preceding post in this series
As a stream of events enters a Flink-based application, we can apply a CountWindow transformation to it (Flink offers many such transformations; we will meet them as we go). CountWindow allows us to create a group of arriving events and use that group for some application-specific action (in the last blog, we found the maximum temperature in the group of events). As long as those events arrive, we continue to receive a series of maximum temperatures and, probably, take some further action on them.
To put it differently, we want to look at the events in terms of their count. We don’t care when the events enter our system or when they actually occur. It is quite possible that between the 2nd and 3rd event in a count-based window, the time lag is considerable. In other words, the time it takes for a window of count - say ‘5’ - to form may be arbitrary; yet we don’t really care. All we want is a collection of 5 events at our disposal before we proceed to make some meaning of them.
A case for time, rather than count
Referring to the case (from the preceding blog) where we want to know more about the cars passing by a particular street junction, we can make two types of observations: (1) for every 100 cars crossing the junction in either direction, how many times do the signals go RED, and (2) in every 5 minutes, how many cars pass through the junction in either direction. The first one is about a window of a fixed number of cars, ignoring how long we may have to wait for 100 cars to pass by (on a holiday in the office district, perhaps). The second one is about a window of a fixed duration, not caring about the number of cars that may pass by (it may be 0, 5 or 75). Flink gives us CountWindow for the former and TimeWindow for the latter. In this blog, I take a look at the TimeWindow feature of Flink.
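To make the contrast concrete, here is a minimal sketch in plain Scala (no Flink involved). The arrival times, the object name and both helper functions are made up for illustration; the only point is that the same events group differently depending on whether we cut by count or by time.

```scala
object CountVersusTime {
  // Hypothetical arrival times of cars at the junction, in seconds.
  val arrivals: List[Long] = List(5L, 40L, 70L, 310L, 320L, 900L)

  // Count-based: every `size` cars form a group, however long that takes.
  def countWindows(events: List[Long], size: Int): List[List[Long]] =
    events.grouped(size).toList

  // Time-based: every `width` seconds form a group, however many cars pass.
  // Each group is keyed by the start of its time span.
  def timeWindows(events: List[Long], width: Long): List[(Long, List[Long])] =
    events.groupBy(t => t - (t % width)).toList.sortBy(_._1)

  def main(args: Array[String]): Unit = {
    println(countWindows(arrivals, 2))
    println(timeWindows(arrivals, 300L))
  }
}
```

With groups of 2 cars, the second group spans from second 70 to second 310. With 5-minute (300-second) spans, the groups hold 3, 2 and 1 cars, and the span starting at second 600 simply does not show up because nothing passed during it.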
It is important to refer to the aspect of ‘discretization’ that I mentioned in the preceding blog. Arriving events form a continuous stream while, to be able to carry out meaningful computations, we need to have a discretized view of the same. Time and Count give us the conceptual planks on which we can base the mechanism of discretization. Rather obviously, while dealing with TimeWindows, Flink uses ‘Time’ as the basis of discretization.
Time, in the world of Flink’s windows
In the world of Flink, the ‘time’ that is associated with an ‘event’ has great significance. An event always has a timestamp. This timestamp helps Flink decide how to treat the event (which transformation can be applied to it). If we think a little about it, it is rather intuitive. In an endless flow of events - which are anonymous and opaque to Flink when they arrive - the only determinable aspect that Flink can associate with each of them is when it comes into being. If the source that generates the events already puts such a timestamp on every event, then Flink has less to do. In fact, there can be many situations where one needs to analyse the events based upon when they were generated and not when they are seen by Flink. As an alternative, we can ask Flink to associate a timestamp with every arriving event. Either way, every downstream operation can safely assume that every event carries a timestamp with it and use that during the computations.
- Processing time
This is the wall-clock time of the machine where the transformations are taking place. We don’t care when an event comes into being; we are only interested in exactly when the machine (running Flink’s transformations) is processing it. While this is perhaps the simplest notion to understand, it is also important to bear in mind that, in a distributed environment, it is not known in advance which machine is going to process an event and hence what timestamp will be associated with it.
- Event time
Event time is the time when the event comes into being, not when it enters Flink. Typically, such a time is known only to the source that generates the event and is responsible for attaching it to the event. While processing the event inside Flink, we can tap and use it. Because the event carries its own timestamp, it is possible to handle correctly an event that arrives out of order (later than events that were generated after it).
- Ingestion time
Ingestion time is what it indicates: the time at which the event is ingested into Flink. It is what the wall-clock reads at the point when an event is ready at the source and is about to enter Flink’s operators. Put differently, an ingestion timestamp is assigned by Flink just before the event begins its journey along Flink’s processing pipeline. It is more predictable than the Processing Time mentioned earlier and is interpreted correctly even when the processing is distributed. From the (fantastic) blog of Fabian Hueske (https://twitter.com/fhueske): ‘..is a hybrid of processing and event time.’
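The difference between event time and processing time is easiest to see with an out-of-order event. The following is a rough sketch in plain Scala, not Flink code: the Event class, the seenAt field (standing in for the wall-clock reading of the processing machine) and the bucket helper are all assumptions made for illustration.

```scala
object NotionsOfTime {
  // eventTime: stamped by the source.
  // seenAt: hypothetical wall-clock reading when the event is processed.
  // Both are in milliseconds.
  final case class Event(id: String, eventTime: Long, seenAt: Long)

  // "b" was generated before "c" but reaches us after it (out of order).
  val arrived: List[Event] = List(
    Event("a", 1L, 10L),
    Event("c", 4L, 11L),
    Event("b", 2L, 14L)
  )

  // Group event ids into fixed-width buckets, keyed by whichever
  // timestamp the caller chooses to look at.
  def bucket(events: List[Event], width: Long)(time: Event => Long): Map[Long, List[String]] =
    events.groupBy(e => time(e) / width).map { case (k, es) => (k, es.map(_.id)) }

  def main(args: Array[String]): Unit = {
    println(bucket(arrived, 3L)(_.eventTime)) // grouped by when events occurred
    println(bucket(arrived, 3L)(_.seenAt))    // grouped by when events were seen
  }
}
```

Grouped by eventTime, "a" and "b" end up together, as they should, even though "b" arrived late. Grouped by seenAt, "a" and "c" end up together simply because they happened to be processed close to each other.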
Armed with these three distinct notions of time offered by Flink, let’s proceed.
Back to Flink
We are using the sample dataset obtained from pubnub <link here> as we have done earlier. This time, we are going to consider the timestamp that is a part of the data that we read. Here’s a row from the file sampleIOT.csv :
probe-f076c2b0,201,842.53,75.5372,1448028160,29.37
The second field from the right is the timestamp: it indicates the time at which this reading event was generated by pubnub; the rightmost field is the reading for ambient temperature. We are going to instruct Flink to use this timestamp to determine which window the event belongs to. The notion of time that we are resorting to is Event Time from above.
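import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

// Local environment with a parallelism of 1
val env = StreamExecutionEnvironment.createLocalEnvironment(1)
// Windows are driven by the timestamp carried by each event
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val readings =
  readIncomingReadings(env,"./sampleIOT.csv")
    .map(e => (e.timeStamp,e.ambientTemperature))
    .assignAscendingTimestamps(pairOfReadings => pairOfReadings._1)
    .timeWindowAll(Time.milliseconds(3))
    .maxBy(1) // field 1 of the pair is the ambient temperature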
We are instructing Flink to interpret the incoming events’ timestamp as EventTime. Also, we are calling assignAscendingTimestamps() to indicate that the first field of the pair should be used as the timestamp and that these can be assumed to be in ascending order (the CSV file is duly sorted on the timeStamp field before being fed to Flink).
Then, we are creating a timeWindow which spans over a period of 3 milliseconds. So, Flink is taking in the records (events) that are coming in and keeping an eye on the timeStamp field of each of these. A record - whose timeStamp is within the first 3 milliseconds block - qualifies to be bunched with other such in that window. That bunch is then given to maxBy(), and the maximum ambient temperature amongst the readings in that bunch is picked up.
Let’s try and understand the way Flink creates the timeWindow. Here’s a portion of the data file:
1448028160,30.02
1448028160,25.92
1448028160,22.18
1448028161,16.18
1448028161,16.36
1448028161,19.19
1448028162,18.99
1448028162,27.62
1448028162,18.82
First window (of 3 milliseconds), the highest ambient temperature is 30.02
1448028163,20.44
1448028163,16.18
1448028163,21.57
1448028164,22.66
1448028164,27.83
1448028164,23.22
1448028165,19.69
1448028165,21.59
Second window (of 3 milliseconds), the highest ambient temperature is 27.83
And so on. When our small program is run using the actual data file (sampleIOT.csv), Flink generates the following output:
(1448028166,32.06)
(1448028160,30.02)
(1448028163,27.83)
For good measure, if we create a timeWindow of 1 millisecond, thus:
val readings =
  readIncomingReadings(env,"./sampleIOT.csv")
    .map(e => (e.timeStamp,e.ambientTemperature))
    .assignAscendingTimestamps(pairOfReadings => pairOfReadings._1)
    .timeWindowAll(Time.milliseconds(1))
    .maxBy(1)
Flink gives us this:
(1448028166,32.06)
(1448028162,27.62)
(1448028164,24.34)
(1448028167,26.37)
(1448028165,23.73)
(1448028163,27.83)
(1448028161,29.43)
(1448028160,30.02)
Effectively, we have been able to determine the highest ambient temperature for every 1-millisecond time span.
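The grouping itself can be imitated without Flink. The sketch below is plain Scala and rests on one assumption that the text above does not state: that a tumbling window starts at a multiple of its size, so an event with timestamp t lands in the window starting at t - (t % size). As input it uses only the eight per-timestamp maxima from the 1-millisecond output above rather than the full file, which is enough because the maximum of a window equals the maximum of the per-timestamp maxima inside it. The object and function names are mine.

```scala
object TumblingSketch {
  // Per-timestamp maxima, copied from the 1-millisecond output above.
  val readings: List[(Long, Double)] = List(
    (1448028160L, 30.02), (1448028161L, 29.43), (1448028162L, 27.62),
    (1448028163L, 27.83), (1448028164L, 24.34), (1448028165L, 23.73),
    (1448028166L, 32.06), (1448028167L, 26.37)
  )

  // Assumption: window starts are multiples of the window size.
  def windowStart(timestamp: Long, size: Long): Long =
    timestamp - (timestamp % size)

  // For each window, keep the pair with the highest temperature
  // (a stand-in for what maxBy(1) does on a window's contents).
  def tumblingMax(events: List[(Long, Double)], size: Long): List[(Long, Double)] =
    events
      .groupBy { case (ts, _) => windowStart(ts, size) }
      .toList.sortBy(_._1)
      .map { case (_, bucket) => bucket.maxBy(_._2) }

  def main(args: Array[String]): Unit = {
    tumblingMax(readings, 3L).foreach(println)
    tumblingMax(readings, 1L).foreach(println)
  }
}
```

Under that assumption the first 3-millisecond window starts at 1448028159 and therefore holds the readings stamped 1448028160 and 1448028161 only, which is a slightly different boundary from the one drawn in the illustration above. The three maxima come out the same either way: 30.02, 27.83 and 32.06. The sketch prints them sorted by window start, which is not the order in which Flink printed them.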
At this point, I have not been able to figure out how Flink decides the order in which the tuples are printed. Whenever I do, I will update the blog.
Sliding Time Window
The timeWindow that we have used here is a tumbling window. We recall from the earlier blog-post that once we apply our computation (here, maxBy() ) to its contents, a tumbling window gets rid of (tumbles) its contents, by consigning them to oblivion.
As one expects, there is a variant of timeWindow which behaves as a sliding window. We can put it to use, thus:
val readings =
  readIncomingReadings(env,"./sampleIOT.csv")
    .map(e => (e.timeStamp,e.ambientTemperature))
    .assignAscendingTimestamps(pairOfReadings => pairOfReadings._1)
    .timeWindowAll(Time.milliseconds(3),Time.milliseconds(1))
    .maxBy(1)
When our small program is run using the actual data file (sampleIOT.csv), Flink generates the following output:
(1448028166,32.06)
(1448028166,32.06)
(1448028160,30.02)
(1448028167,26.37)
(1448028161,29.43)
(1448028160,30.02)
(1448028163,27.83)
(1448028166,32.06)
(1448028163,27.83)
(1448028160,30.02)
To understand how Flink generates the output above, let us use a smaller dataset (not what sampleIOT.csv contains). We consider a series of pairs of values; each pair consists of (timeStamp in ms, temperature recorded in degrees):
(1,20.5),(1,19.05),(1,21.05),(2,11.05),(2,15.05),(3,9.05),(3,13.05),(4,28.05),(4,29.05),(5,26.05)
Remember that we are using a sliding timeWindow(3,1) along with the ‘notion of time’ applicable being that of eventTime and instructing Flink to use the timeStamp of the pair as an ascending eventTime.
timeWindow sliding this way ->
timeStamp == 1
(1,20.5),(1,19.05),(1,21.05)
Flink prints: 21.05
timeStamp == 1,2
(1,20.5),(1,19.05),(1,21.05),(2,11.05),(2,15.05)
Flink prints: 21.05
timeStamp == 1,2,3
(1,20.5),(1,19.05),(1,21.05),(2,11.05),(2,15.05),(3,9.05),(3,13.05)
First timeWindow created, Flink prints: 21.05
timeStamp == 2,3,4
(2,11.05),(2,15.05),(3,9.05),(3,13.05),(4,28.05),(4,29.05)
Second timeWindow created (after slide), Flink prints: 29.05
timeStamp == 3,4,5
(3,9.05),(3,13.05),(4,28.05),(4,29.05),(5,26.05)
Third timeWindow created (after slide), Flink prints: 29.05
timeStamp == 4,5
(4,28.05),(4,29.05),(5,26.05)
Fourth timeWindow created (after slide), Flink prints: 29.05
timeStamp == 5
(5,26.05)
Fifth and final timeWindow created (after slide), Flink prints: 26.05
This may not be the best representation of what really happens, but I hope that this provides sufficient visualization of the same! :-)
As we observe, in a sliding window, events are grouped according to their timestamps into spans of a fixed length - in this case, 3 milliseconds. Also, an event can belong to more than one window because the window slides progressively, by 1 millisecond in this case. In our small example dataset above, the pairs (2,*) and (3,*) each appear in three of the windows shown.
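The walk-through above can be reproduced with a few lines of plain Scala. This is only a sketch of the idea, not how Flink implements it: it assumes that window starts are multiples of the slide and that every window covers the half-open span from start to start + size, so each event is copied into every window whose span contains its timestamp. The object and function names are mine.

```scala
object SlidingSketch {
  // The small dataset used in the walk-through: (timeStamp in ms, temperature).
  val pairs: List[(Long, Double)] = List(
    (1L, 20.5), (1L, 19.05), (1L, 21.05), (2L, 11.05), (2L, 15.05),
    (3L, 9.05), (3L, 13.05), (4L, 28.05), (4L, 29.05), (5L, 26.05)
  )

  // Starts of all windows [start, start + size) that contain `timestamp`,
  // assuming starts are multiples of `slide` and timestamps are non-negative.
  def windowStarts(timestamp: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = timestamp - (timestamp % slide)
    (lastStart until (timestamp - size) by -slide).toList
  }

  // Copy every event into each window it belongs to, then take the
  // highest temperature per window, ordered by window start.
  def slidingMax(events: List[(Long, Double)], size: Long, slide: Long): List[(Long, Double)] =
    events
      .flatMap { case (ts, temp) => windowStarts(ts, size, slide).map(start => (start, temp)) }
      .groupBy(_._1)
      .toList.sortBy(_._1)
      .map { case (start, temps) => (start, temps.map(_._2).max) }

  def main(args: Array[String]): Unit =
    slidingMax(pairs, 3L, 1L).foreach(println)
}
```

For a size of 3 and a slide of 1, this yields seven windows (starting at -1, 0, 1, 2, 3, 4 and 5) with maxima 21.05, 21.05, 21.05, 29.05, 29.05, 29.05 and 26.05, the same sequence as in the walk-through. The two windows that start before the first timestamp explain why 21.05 is printed before the first full window is formed.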
I will share findings of further explorations in the world of Flink, in my upcoming blogs.
I have used Flink 0.10.0 for the code shown above. It can be accessed here and here.
Many thanks to the fantastic folks who are not only behind Flink but also in front of it, tirelessly answering my newbie questions in the Flink User Group and prodding me to understand the concepts.
"At this point, I have not been able to figure out, how Flink decides the the order in which tuples are printed"
This is due to the way watermarks flow through operators. By default, a watermark is generated every 200 (or 100) ms. In your toy example all windows are triggered together.
Yes, you are right. I should have edited it in time. I have been lazy. Thanks again.
I plan to take this up in a separate blog, later.