Ibis goes real-time! Introducing the new Flink backend for Ibis

blog
flink
stream processing
Author

Deepyaman Datta

Published

February 12, 2024

Introduction

Ibis 8.0 marks the official release of the Apache Flink backend for Ibis. Ibis users can now manipulate data across streaming and batch contexts using the same interface. Flink is one of the most established stream-processing frameworks out there and a central part of the real-time data infrastructure at companies like DoorDash, LinkedIn, Netflix, and Uber. It is commonly applied in use cases such as fraud detection, anomaly detection, real-time recommendation, dynamic pricing, and online advertising. The Flink backend is also the first streaming backend Ibis supports. Follow along as we define and execute a simple streaming job using Ibis!

Installation prerequisites

Spinning up the services using Docker Compose

The ibis-project/ibis-flink-example GitHub repository includes the relevant Docker Compose configuration for this tutorial. Clone the repository, and run docker compose up from the cloned directory to create Kafka topics, generate sample data, and launch a Flink cluster:

git clone https://github.com/claypotai/ibis-flink-example.git
cd ibis-flink-example
docker compose up
Tip

If you don’t intend to try remote execution, you can start only the Kafka-related services with docker compose up kafka init-kafka data-generator.

After a few seconds, you should see messages indicating your Kafka environment is ready:

ibis-flink-example-init-kafka-1      | Successfully created the following topics:
ibis-flink-example-init-kafka-1      | payment_msg
ibis-flink-example-init-kafka-1      | sink
ibis-flink-example-init-kafka-1 exited with code 0
ibis-flink-example-data-generator-1  | Connected to Kafka
ibis-flink-example-data-generator-1  | Producing 20000 records to Kafka topic payment_msg

This example uses mock payments data. The payment_msg Kafka topic contains messages in the following format:

{
    "createTime": "2023-09-20 22:19:02.224",
    "orderId": 1695248388,
    "payAmount": 88694.71922270155,
    "payPlatform": 0,
    "provinceId": 6
}

In a separate terminal, we can explore what these messages look like:

from itertools import islice

from kafka import KafkaConsumer

consumer = KafkaConsumer("payment_msg")
for msg in islice(consumer, 3):
    print(msg)
ConsumerRecord(topic='payment_msg', partition=0, offset=1087, timestamp=1707754299796, timestamp_type=0, key=None, value=b'{"createTime": "2024-02-12 16:11:39.795", "orderId": 1707754842, "payAmount": 56028.520857965006, "payPlatform": 0, "provinceId": 3}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=132, serialized_header_size=-1)
ConsumerRecord(topic='payment_msg', partition=0, offset=1088, timestamp=1707754300298, timestamp_type=0, key=None, value=b'{"createTime": "2024-02-12 16:11:40.298", "orderId": 1707754843, "payAmount": 55081.020819104546, "payPlatform": 0, "provinceId": 4}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=132, serialized_header_size=-1)
ConsumerRecord(topic='payment_msg', partition=0, offset=1089, timestamp=1707754300800, timestamp_type=0, key=None, value=b'{"createTime": "2024-02-12 16:11:40.800", "orderId": 1707754844, "payAmount": 30998.71866136802, "payPlatform": 0, "provinceId": 4}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=131, serialized_header_size=-1)

Running the tutorial

This tutorial uses Ibis with the Flink backend to process the aforementioned payment messages. You can choose to either run it locally or submit a job to an already-running Flink cluster.

Local execution

The simpler option is to run the example using the Flink mini cluster.

Create a table environment

The table environment serves as the main entry point for interacting with the Flink runtime. The flink backend does not create TableEnvironment objects; you must create a TableEnvironment and pass that to ibis.flink.connect:

import ibis
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.get_config().set("parallelism.default", "1")

con = ibis.flink.connect(table_env)
1
write all the data to one file

Flink’s streaming connectors aren’t part of the binary distribution. Link the Kafka connector for cluster execution by adding the JAR file from the cloned repository. Ibis exposes the raw_sql method for situations like this, where you need to run arbitrary SQL that cannot be modeled as a table expression:

con.raw_sql("ADD JAR 'flink-sql-connector-kafka-3.0.2-1.18.jar'")

Create the source and sink tables

Use create_table to register tables. Notice the new top-level ibis.watermark API for specifying a watermark strategy.

source_schema = ibis.schema(
    {
        "createTime": "timestamp(3)",
        "orderId": "int64",
        "payAmount": "float64",
        "payPlatform": "int32",
        "provinceId": "int32",
    }
)

source_configs = {
    "connector": "kafka",
    "topic": "payment_msg",
    "properties.bootstrap.servers": "localhost:9092",
    "properties.group.id": "test_3",
    "scan.startup.mode": "earliest-offset",
    "format": "json",
}

t = con.create_table(
    "payment_msg",
    schema=source_schema,
    tbl_properties=source_configs,
    watermark=ibis.watermark(
        time_col="createTime", allowed_delay=ibis.interval(seconds=15)
    ),
)

sink_schema = ibis.schema(
    {
        "province_id": "int32",
        "pay_amount": "float64",
    }
)

sink_configs = {
    "connector": "kafka",
    "topic": "sink",
    "properties.bootstrap.servers": "localhost:9092",
    "format": "json",
}

con.create_table(
    "total_amount_by_province_id", schema=sink_schema, tbl_properties=sink_configs
)
1
create source Table
2
create sink Table
DatabaseTable: total_amount_by_province_id
  province_id int32
  pay_amount  float64

Perform calculations

Compute the total pay amount per province in the past 10 seconds (as of each message, for the province in the incoming message):

agged = t.select(
    province_id=t.provinceId,
    pay_amount=t.payAmount.sum().over(
        range=(-ibis.interval(seconds=10), 0),
        group_by=t.provinceId,
        order_by=t.createTime,
    ),
)

Finally, emit the query result to the sink table:

con.insert("total_amount_by_province_id", agged)
<pyflink.table.table_result.TableResult at 0x168d2f520>

Remote execution

You can also submit the example to the remote cluster started using Docker Compose. The window_aggregation.py file in the cloned repository contains the same steps that we performed for local execution. We will use the method described in the official Flink documentation.

Tip

You can find the ./bin/flink executable with the following command:

python -c'from pathlib import Path; import pyflink; print(Path(pyflink.__spec__.origin).parent / "bin" / "flink")'

My full command looks like this:

/opt/miniconda3/envs/ibis-dev/lib/python3.10/site-packages/pyflink/bin/flink run --jobmanager localhost:8081 --python window_aggregation.py

The command will exit after displaying a submission message:

Job has been submitted with JobID b816faaf5ef9126ea5b9b6a37012cf56

Viewing the results

Similar to how we viewed messages in the payment_msg topic, we can print results from the sink topic:

consumer = KafkaConsumer("sink")
for msg in islice(consumer, 10):
    print(msg)
ConsumerRecord(topic='sink', partition=0, offset=0, timestamp=1707754308001, timestamp_type=0, key=None, value=b'{"province_id":0,"pay_amount":34598.581794063946}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)
ConsumerRecord(topic='sink', partition=0, offset=1, timestamp=1707754308005, timestamp_type=0, key=None, value=b'{"province_id":3,"pay_amount":97439.80920527918}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)
ConsumerRecord(topic='sink', partition=0, offset=2, timestamp=1707754308006, timestamp_type=0, key=None, value=b'{"province_id":5,"pay_amount":94291.98939770168}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)
ConsumerRecord(topic='sink', partition=0, offset=3, timestamp=1707754308006, timestamp_type=0, key=None, value=b'{"province_id":2,"pay_amount":67249.40686490892}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)
ConsumerRecord(topic='sink', partition=0, offset=4, timestamp=1707754308006, timestamp_type=0, key=None, value=b'{"province_id":0,"pay_amount":75995.17413984003}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)
ConsumerRecord(topic='sink', partition=0, offset=5, timestamp=1707754308006, timestamp_type=0, key=None, value=b'{"province_id":6,"pay_amount":73913.82666977578}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)
ConsumerRecord(topic='sink', partition=0, offset=6, timestamp=1707754308006, timestamp_type=0, key=None, value=b'{"province_id":6,"pay_amount":110483.70544553592}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)
ConsumerRecord(topic='sink', partition=0, offset=7, timestamp=1707754308006, timestamp_type=0, key=None, value=b'{"province_id":3,"pay_amount":146010.17295813354}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)
ConsumerRecord(topic='sink', partition=0, offset=8, timestamp=1707754308006, timestamp_type=0, key=None, value=b'{"province_id":3,"pay_amount":200463.85497267012}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)
ConsumerRecord(topic='sink', partition=0, offset=9, timestamp=1707754308007, timestamp_type=0, key=None, value=b'{"province_id":3,"pay_amount":208752.74642677643}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)

Voilà! You’ve run your first streaming application using Ibis.

Shutting down the Compose environment

Press Ctrl+C to stop the Docker Compose containers. Once stopped, run docker compose down to remove the services created for this tutorial.

Back to top