Using Apache Flink with Java

Gitesh Dhore 02 Sep, 2022

This article was published as a part of the Data Science Blogathon.

Introduction

Apache Flink is a big data framework that allows programmers to process huge amounts of data in a very efficient and scalable way. This article will introduce some basic API concepts and standard data transformations available in the Apache Flink Java API. The fluent style of this API makes it easy to work with Flink’s central construct – a distributed collection. First, we’ll look at Flink’s DataSet API transformations and use them to implement a word-counting program. Then we’ll take a brief look at Flink’s DataStream API, which allows you to process event streams in real time.

Apache Flink (image source: https://en.wikipedia.org/wiki/Apache_Flink)

 

Maven Dependencies

To get started, we’ll need to add Maven dependencies for the flink-java and flink-test-utils libraries:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-test-utils_2.10</artifactId>
    <version>1.2.0</version>
    <scope>test</scope>
</dependency>

 

Basic API Concepts

When working with Flink, we need to know a few things about its API:
  • Various data transformation functions are available, including filtering, mapping, joining, grouping, and aggregation.
  • A sink operation in Flink triggers the execution of a stream to produce the desired result of the program, such as saving the result to the file system or printing it to standard output.
  • Flink transformations are lazy, meaning they are not executed until a sink operation is invoked.
The API has two modes of operation, i.e., batch and real-time. If you’re dealing with a bounded data source that can be processed in batch mode, you’ll use the DataSet API. To process unbounded data streams in real time, you must use the DataStream API.
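As a quick, minimal sketch of this lazy behavior (assuming only the flink-java dependency added above), the filter() call below only records a transformation; nothing is executed until the collect() sink is invoked:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4);
// This only builds the execution plan; no data is processed yet.
DataSet<Integer> evenNumbers = numbers.filter(n -> n % 2 == 0);
// collect() is the sink operation that triggers the actual execution.
List<Integer> result = evenNumbers.collect(); // [2, 4]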

DataSet API Transformation

The entry point to a Flink program is an instance of the ExecutionEnvironment class — this defines the context in which the program is executed.
Let’s create an ExecutionEnvironment to start processing:
ExecutionEnvironment env
  = ExecutionEnvironment.getExecutionEnvironment();
Note that running the application on the local machine will do the processing on the local JVM. If you wanted to start processing on a cluster of machines, you would need to install Apache Flink and configure the ExecutionEnvironment accordingly.
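As a rough illustration (the host, port, and jar path below are hypothetical placeholders, not from the original article), a remote environment pointing at a cluster can be created like this:
// Connects to the cluster's JobManager; 6123 is the default RPC port.
ExecutionEnvironment remoteEnv = ExecutionEnvironment.createRemoteEnvironment(
  "jobmanager-host", 6123, "path/to/your-job.jar");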
Create a dataset
We need to supply data to our program to perform data transformations.
DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);
You can create a DataSet from multiple sources, such as Apache Kafka, CSV, a file, or virtually any other data source.
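For example, reading from a text or CSV file looks roughly like this (the file paths are hypothetical):
// Each line of the text file becomes one String element.
DataSet<String> textLines = env.readTextFile("file:///path/to/input.txt");
// CSV columns are parsed into the declared tuple field types.
DataSet<Tuple2<Integer, String>> csvRows = env
  .readCsvFile("file:///path/to/input.csv")
  .types(Integer.class, String.class);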
Filter and reduce
Let’s say you want to filter out numbers above a certain threshold and then sum them all. You can use filter() and reduce() transformations to achieve this:
int threshold = 30;
List<Integer> collect = amounts
  .filter(a -> a > threshold)
  .reduce((integer, t1) -> integer + t1)
  .collect();
assertThat(collect.get(0)).isEqualTo(90);
Note that the collect() method is the sink operation that initiates the actual data transformations.
Map
Let’s say you have a DataSet of Person objects:
private static class Person {
    private int age;
    private String name;
    // standard constructors/getters/setters
}
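The examples below construct people as new Person(23, "Tom"), so the constructor is assumed to take the age first and the name second; a minimal sketch of that assumed constructor:
// Assumed constructor: age first, then name, matching new Person(23, "Tom").
private Person(int age, String name) {
    this.age = age;
    this.name = name;
}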
Next, we create a DataSet from these objects:
DataSet<Person> personDataSource = env.fromCollection(
  Arrays.asList(
    new Person(23, "Tom"),
    new Person(75, "Michael")));
Suppose you want to extract only the age field from each object in the collection. You can retrieve specific fields of the Person class using the map() transformation:
List<Integer> ages = personDataSource
  .map(p -> p.age)
  .collect();
assertThat(ages).hasSize(2);
assertThat(ages).contains(23, 75);
 

Join

When you have two datasets, you might want to join them on some id field. You can use the join() transformation for this.
Let’s create a collection of transactions and user addresses:
Tuple3<Integer, String, String> address
  = new Tuple3<>(1, "5th Avenue", "London");
DataSet<Tuple3<Integer, String, String>> addresses
  = env.fromElements(address);

Tuple2<Integer, String> firstTransaction
  = new Tuple2<>(1, "Transaction_1");
DataSet<Tuple2<Integer, String>> transactions
  = env.fromElements(firstTransaction, new Tuple2<>(12, "Transaction_2"));

The first field in both tuples is of type Integer and is the id field on which we want to join the two datasets.

To perform the actual join logic, we need to implement the KeySelector interface for the address and transaction datasets:
private static class IdKeySelectorTransaction
  implements KeySelector<Tuple2<Integer, String>, Integer> {
    @Override
    public Integer getKey(Tuple2<Integer, String> value) {
        return value.f0;
    }
}

private static class IdKeySelectorAddress
  implements KeySelector<Tuple3<Integer, String, String>, Integer> {
    @Override
    public Integer getKey(Tuple3<Integer, String, String> value) {
        return value.f0;
    }
}
Each selector returns only the field on which the join is to be performed.
Unfortunately, it’s not possible to use lambda expressions here because Flink needs generic type information.
Next, let’s implement the join logic using these selectors:
List<Tuple2<Tuple2<Integer, String>, Tuple3<Integer, String, String>>>
  joined = transactions.join(addresses)
  .where(new IdKeySelectorTransaction())
  .equalTo(new IdKeySelectorAddress())
  .collect();
assertThat(joined).hasSize(1);
assertThat(joined).contains(new Tuple2<>(firstTransaction, address));
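As an aside (not part of the original example), when both datasets consist of tuples you can also join by field position, which avoids writing KeySelector classes altogether; a minimal sketch:
// where(0)/equalTo(0) refer to the first field (the id) of each tuple.
List<Tuple2<Tuple2<Integer, String>, Tuple3<Integer, String, String>>> joinedByPosition
  = transactions.join(addresses)
    .where(0)
    .equalTo(0)
    .collect();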
 

Sort

Let’s say you have the following Tuple2 collection:
Tuple2<Integer, String> secondPerson = new Tuple2<>(4, "Tom");
Tuple2<Integer, String> thirdPerson = new Tuple2<>(5, "Scott");
Tuple2<Integer, String> fourthPerson = new Tuple2<>(200, "Michael");
Tuple2<Integer, String> firstPerson = new Tuple2<>(1, "Jack");
DataSet<Tuple2<Integer, String>> transactions = env.fromElements(
  fourthPerson, secondPerson, thirdPerson, firstPerson);
If you want to sort this collection by the first field of the tuple, you can use the sortPartition() transformation:
List<Tuple2<Integer, String>> sorted = transactions
  .sortPartition(new IdKeySelectorTransaction(), Order.ASCENDING)
  .collect();
assertThat(sorted)
  .containsExactly(firstPerson, secondPerson, thirdPerson, fourthPerson);
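Note that sortPartition() orders the data within each partition. As a rough sketch (not from the original article), on a small dataset you can force a single partition to obtain a globally sorted result:
// Sorting by field position 0 with parallelism 1 yields one globally sorted partition.
List<Tuple2<Integer, String>> globallySorted = transactions
  .sortPartition(0, Order.ASCENDING)
  .setParallelism(1)
  .collect();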

 

Word Count

The word count problem is one that is commonly used to demonstrate the capabilities of big data processing frameworks. The basic solution involves counting the occurrences of words in the text input.
As the first step in our solution, we create a LineSplitter class that splits our input into tokens (words), emitting a Tuple2 key-value pair for each token. In each of these tuples, the key is a word found in the text, and the value is the integer one (1).
This class implements the FlatMapFunction interface, which takes a string as input and creates a Tuple2:
public class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        Stream.of(value.toLowerCase().split("\\W+"))
          .filter(t -> t.length() > 0)
          .forEach(token -> out.collect(new Tuple2<>(token, 1)));
    }
}
We call the collect() method on the Collector class to push the data forward in the processing pipeline.
Our next and final step is to group the tuples by their first elements (words) and then perform a sum on the second element to produce the number of occurrences of the words:
public static DataSet<Tuple2<String, Integer>> startWordCount(
  ExecutionEnvironment env, List<String> lines) throws Exception {
    DataSet<String> text = env.fromCollection(lines);
    return text.flatMap(new LineSplitter())
      .groupBy(0)
      .aggregate(Aggregations.SUM, 1);
}
We use three types of Flink transformations: flatMap(), groupBy(), and aggregate().
Let’s write a test to confirm that the word count implementation works as expected:
List<String> lines = Arrays.asList(
  "This is the first sentence",
  "This is the second one-word sentence");
DataSet<Tuple2<String, Integer>> result = WordCount.startWordCount(env, lines);
List<Tuple2<String, Integer>> collect = result.collect();
assertThat(collect).containsExactlyInAnyOrder(
  new Tuple2<>("this", 2), new Tuple2<>("is", 2), new Tuple2<>("the", 2),
  new Tuple2<>("sentence", 2), new Tuple2<>("first", 1), new Tuple2<>("second", 1),
  new Tuple2<>("one", 1), new Tuple2<>("word", 1));

 

DataStream API

Creating a data stream
Apache Flink also supports event stream processing through the DataStream API. If we want to start consuming events, we must first use the StreamExecutionEnvironment class:
StreamExecutionEnvironment executionEnvironment
 = StreamExecutionEnvironment.getExecutionEnvironment();
Next, we can create a stream of events using the executionEnvironment from various sources. It could be some message bus like Apache Kafka, but in this example, we simply create a stream from a few string elements:
DataStream<String> dataStream = executionEnvironment.fromElements(
  "This is the first sentence",
  "This is the second one-word sentence");
We can apply transformations to each element of the DataStream as in a normal DataSet class:
SingleOutputStreamOperator<String> upperCase = dataStream.map(String::toUpperCase);
To trigger the execution, we need to call a sink operation like print(), which just prints the result of the transformations to standard output, followed by the execute() method in the StreamExecutionEnvironment class:
upperCase.print();
executionEnvironment.execute();
It produces the following output:
1> THIS IS THE FIRST SENTENCE
2> THIS IS THE SECOND ONE-WORD SENTENCE
 

Events window

When processing a real-time stream of events, it may sometimes be necessary to group events and apply some calculation to a window of those events.
Suppose we have a stream of events, where each event is a pair consisting of an event number and a timestamp of when the event was sent to our system, and suppose we can tolerate events that arrive out of order, but only if they are no more than twenty seconds late.
In this example, we first create a stream simulating two events that are several minutes apart and define a timestamp extractor that specifies our lateness threshold:
SingleOutputStreamOperator<Tuple2<Integer, Long>> windowed
  = executionEnvironment.fromElements(
  new Tuple2<>(16, ZonedDateTime.now().plusMinutes(25).toInstant().getEpochSecond()),
  new Tuple2<>(15, ZonedDateTime.now().plusMinutes(2).toInstant().getEpochSecond()))
  .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor
      <Tuple2<Integer, Long>>(Time.seconds(20)) {
        @Override
        public long extractTimestamp(Tuple2<Integer, Long> element) {
          return element.f1 * 1000;
        }
    });
Next, we define a window operation to group our events into five-second windows and apply a transformation to those events:
SingleOutputStreamOperator<Tuple2<Integer, Long>> reduced = windowed
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
  .maxBy(0, true);
reduced.print();
It retrieves the maximum element of each five-second window, so it prints:
1> (15,1491221519)
We don’t see the second event because it arrived later than the specified delay threshold.
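If dropping such late events is not acceptable, a window can be configured to keep accepting them for a while longer; a hedged sketch (the 30-second value is just illustrative and not from the original example):
// allowedLateness() keeps window state around so late elements can still be processed.
SingleOutputStreamOperator<Tuple2<Integer, Long>> reducedWithLateness = windowed
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
  .allowedLateness(Time.seconds(30))
  .maxBy(0, true);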

Conclusion

In this article, we introduced the Apache Flink framework and looked at some of the transformations that come with its API.
  • We implemented a word count program using Flink’s fluent and functional DataSet API. We then looked at the DataStream API and implemented a simple real-time transformation on an event stream.
  • The implementation of all these examples and code snippets can be found on GitHub – this is a Maven project, so it should be easy to import and run as is.
  • Flink transformations are lazy and are not executed until a sink operation is invoked. The API has two modes of operation: batch and real-time. Bounded data sources that can be processed in batch mode use the DataSet API, while unbounded real-time streams use the DataStream API.

The media shown in this article is not owned by Analytics Vidhya and is used at the Author’s discretion. 

