Stream-batch unification through Ibis

blog
flink
risingwave
streaming
Author

Chloe He

Published

February 26, 2024

One of my focuses in the past 10 months has been to implement the Flink backend for Ibis. I was working with Apache Flink and building a feature engineering tool, and we stumbled upon Ibis as we attempted to build our own translation layer that could turn user declarations into relation trees, then optimize and deploy the query plan, all while maintaining the underlying infrastructure for the user. We considered and prototyped with a number of tools and eventually chose Ibis. It had already established a position in the batch world and had support for 10+ of the most popular batch engines (at the time). We loved the idea of decoupling the user-facing interface from the execution engine, so that users can swap out the execution engine depending on their needs, without having to rewrite code. And, of course, it was open-source. It was everything we dreamed of.

A few months later, we started introducing Apache Flink as the first streaming backend into Ibis. We saw so much more that Ibis can do when it steps outside of batch.

Ibis 8.0 marks the official launch of the first streaming backends in Ibis (Apache Flink and RisingWave). This is a very significant milestone in Ibis development.

You may be wondering: what does this mean? Why is this such a big deal? I will be answering these questions in this blog post.

Ibis combines stream and batch into a single framework beyond version 8.0

Today, Ibis provides support for 20+ backends including Dask, DuckDB, PostgreSQL, PySpark, Snowflake, and others. However - before the introduction of Flink and RisingWave backends - all of the supported backends derive from a batch paradigm (aside from Spark, which does offer support for stream processing, albeit using micro-batches underneath the hood).

This means that Ibis is an extremely valuable tool, but it was limited to batch workloads. In the case of streaming workloads, where systems are designed with unbounded data in mind, the batch-oriented Ibis fell short. To deal with an infinite data stream, streaming data systems operate with unique concepts such as “event time”, “processing time”, “watermark”, etc. All of these were missing from Ibis.

At the same time, streaming systems (Spark Streaming, Apache Flink, RisingWave, etc) have been gaining popularity. It drove the development of more mature technologies as well as new approaches to close the gap between batch and streaming worlds. Flink SQL, for example, was born as a part of such effort and, through allowing users to write streaming engines in a SQL-like manner, have been vastly successful in that regard. The success of Flink SQL both validates the potential of stream and batch unification and inspires the community to push for better standards, a vision that Ibis is at a unique and valuable position to help build.

Why is batch-stream unification significant?

Firstly, large companies that have both batch and streaming workloads often deploy Lambda architecture. In a Lambda infrastructure, batch and streaming pipelines are separate, which requires two codebases to be set up and maintained. If you’re a platform engineer, you have probably found yourself trying to duplicate batch workloads “in streaming code” and vice versa. If you have backfilled a streaming pipeline due to a bug and needed to reimplement the logic on a batch pipeline, you know how painful that all is :(

LinkedIn successfully reduced processing time by 94% and resource utilization by 50% after switching from a Lambda architecture to unified batch and streaming pipelines. A unified system also massively increased engineer productivity because they no longer needed to develop and maintain separate codebases for different environments. Uber, Alibaba, and Lyft have also adopted similar solutions.

Secondly, in the world of machine learning, it’s common for data scientists to develop locally and experiment with a sampled, batch dataset in Python. If the results look promising, the features and models would then be deployed into production. Oftentimes, there is a code handover in this process, and a dedicated team of developers would be responsible for rewriting the logic for production, as a streaming workload.

In both cases, there is a huge amount of technical overhead. If there is a streamlined architecture, using a unified API, much of this overhead can be avoided. As a platform engineer, you no longer need to worry about maintaining two separate architectures and codebases. As a data scientist or a machine learning engineer, you can write one single workload that can execute both on batch and streaming backends. Wouldn’t that be amazing?

Ibis unifies batch and streaming

Enter Ibis. Ibis unifies batch and streaming with a single API. It decouples the dataframe API from backend execution, so that the logic for defining data transformations is unaffected by implementation discrepancies across backend engines. There is also an ongoing effort to further increase interoperability across different languages and systems via a standard query plan intermediate representation (IR), using a library called Substrait.

What does this actually look like? For example, Ibis allows users to define window aggregations using the over() method. When executed on the Flink backend, this translates into Flink’s over aggregation query and outputs an aggregated value for every input row over a range of ordered rows. On streaming data, aggregation results are continuously computed and written into data sinks (e.g., Kafka, Redis) as records are received at and consumed from the upstream data source (e.g., Kafka, Change Data Capture). In pandas, the conceptual analog is windowing operation. Results are computed by looking back the length of the window from the current observation, but can be computed all at once because batch data is static.

Another great example is deduplication. In Flink SQL, this looks something like this:

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1

In a database like Postgres, this could be as simple as

SELECT DISTINCT t0.`string_col`, t0.`int_col`
FROM functional_alltypes t0

And in pandas, you would use the method drop_duplicates():

df.drop_duplicates()
Note

We’re working on supporting deduplication via distinct() in Flink backend and this feature should be available soon!

These underlying discrepancies are abstracted in such a way that you, as an Ibis user, will no longer find yourself struggling with bugs that are the result of subtleties across different engines and dialects. Need to rewrite your batch workload as a streaming one or vice versa? Rest assured, Ibis has you covered!

See it in action

Now, let’s walk through a code example together to see how simple this experience is!

Note

Prerequisites for running this example:

Note

This example is a hypothetical scenario and we will be using simulated data.

First, spin up the Docker containers by running docker compose up kafka init-kafka data-generator. This will set up a mocked Kafka source that contains records that look like the following:

{
    "createTime": "2023-09-20 22:19:02.224",
    "orderId": 1695248388,
    "payAmount": 88694.71922270155,
    "payPlatform": 0,
    "provinceId": 6,
}

This is a streaming data source. Commonly, to experiment with the data, we would extract a chunk of the data and load it in batch:

from kafka import KafkaConsumer

consumer = KafkaConsumer("payment_msg", auto_offset_reset="earliest")
rows = []
for _, msg in zip(range(100), consumer):
    rows.append(msg)

This is a tabular dataset and we can convert it into a pandas DataFrame:

import json

import pandas as pd

df = pd.DataFrame([json.loads(row.value) for row in rows])
df["createTime"] = pd.to_datetime(df["createTime"])
df
createTime orderId payAmount payPlatform provinceId
0 2024-02-24 07:15:40.896 1708758941 85955.211753 0 3
1 2024-02-24 07:15:41.402 1708758942 70006.918133 0 3
2 2024-02-24 07:15:41.905 1708758943 4352.885704 0 3
3 2024-02-24 07:15:42.405 1708758944 20850.068597 0 2
4 2024-02-24 07:15:42.909 1708758945 72829.020076 0 4
... ... ... ... ... ...
95 2024-02-24 07:16:28.636 1708759036 70241.647749 0 5
96 2024-02-24 07:16:29.139 1708759037 89075.951048 1 2
97 2024-02-24 07:16:29.653 1708759038 49521.531528 0 5
98 2024-02-24 07:16:30.154 1708759039 9171.094251 0 2
99 2024-02-24 07:16:30.655 1708759040 62199.631597 0 3

100 rows × 5 columns

We can connect to this DataFrame in Ibis in a local execution backend:

import ibis

con = ibis.get_backend()
con.create_table("payments", df)
DatabaseTable: payments
  createTime  timestamp(6)
  orderId     int64
  payAmount   float64
  payPlatform int64
  provinceId  int64
Note

The default execution engine for Ibis is DuckDB.

This is a series of records of order transactions. At Company Potclay, we have just deployed a new ad campaign, which is A/B tested by province, and we’re interested in the effectiveness of this ad campaign by monitoring data distribution shift over time. A crucial feature is the total transaction amount over the past minute, stratified by province. We would like to first experiment writing this feature on a smaller set of batch data. After we make sure that the logic looks correct and handles all edge cases appropriately, we want to deploy this as a streaming workload.

Ibis allows us to write transformations on top of so-called abstract or unbound tables (i.e., tables that are not bound to an actual data source). This separation between transformation logic and the underlying data and execution is one of the things that makes Ibis so powerful. It’s similar to dependency injection, but in this case the data is the dependency and is injected at runtime.

To write transformations on top of an unbound table, we need to first define an ibis.table() with a schema. Here is how we would write all of this in Ibis code:

import ibis.expr.schema as sch
import ibis.expr.datatypes as dt
from ibis import _

schema = sch.Schema(
    {
        "createTime": dt.timestamp(scale=3),
        "orderId": dt.int64,
        "payAmount": dt.float64,
        "payPlatform": dt.int32,
        "provinceId": dt.int32,
    }
)
unbound_table = ibis.table(schema, name="payments")
unbound_agged = unbound_table[
    "provinceId",
    _.payAmount.sum()
    .over(range=(-ibis.interval(seconds=10), 0), order_by=_.createTime)
    .name("pay_amount"),
]
unbound_agged
r0 := UnboundTable: payments
  createTime  timestamp(3)
  orderId     int64
  payAmount   float64
  payPlatform int32
  provinceId  int32

Project[r0]
  provinceId: r0.provinceId
  pay_amount: WindowFunction(func=Sum(r0.payAmount), frame=RangeWindowFrame(table=r0, start=WindowBoundary(value=10
s, preceding=True), end=WindowBoundary(Cast(0, to=interval('s'))), order_by=[asc r0.createTime]))

Carrying out the computations using the local execution backend that we connected to above is as simple as:

con.to_pandas(unbound_agged)
provinceId pay_amount
0 3 8.595521e+04
1 3 1.559621e+05
2 3 1.603150e+05
3 2 1.811651e+05
4 4 2.539941e+05
... ... ...
95 5 1.270405e+06
96 2 1.269567e+06
97 5 1.277924e+06
98 2 1.204362e+06
99 3 1.202042e+06

100 rows × 2 columns

Note

DuckDB is much faster than pandas, and using Ibis you don’t need to write SQL for it!

For local experimentation purposes, this DataFrame only consists of 100 rows, so doing this in memory is easy.

The outputs look correct and we didn’t run into any errors. We are now ready to deploy this as a streaming job in Flink!

First, let’s set up the Flink environment and connect to this Kafka source:

Note

Kafka connector is not part of the binary distribution, so we need to download and link it for cluster execution explicitly:

!wget -N https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.common import Configuration

source_schema = sch.Schema(
    {
        "createTime": dt.timestamp(scale=3),
        "orderId": dt.int64,
        "payAmount": dt.float64,
        "payPlatform": dt.int32,
        "provinceId": dt.int32,
    }
)

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

table_config = table_env.get_config()
config = Configuration()
config.set_string("parallelism.default", "1")
table_config.add_configuration(config)

connection = ibis.flink.connect(table_env)

# add the JAR downloaded above
connection.raw_sql("ADD JAR 'flink-sql-connector-kafka-1.17.1.jar'")

source_configs = {
    "connector": "kafka",
    "topic": "payment_msg",
    "properties.bootstrap.servers": "localhost:9092",
    "properties.group.id": "test_3",
    "scan.startup.mode": "earliest-offset",
    "format": "json",
}

connection.create_table(
    "payments",
    schema=source_schema,
    tbl_properties=source_configs,
    watermark=ibis.watermark(
        time_col="createTime", allowed_delay=ibis.interval(seconds=15)
    ),
)
DatabaseTable: payments
  createTime  timestamp(3)
  orderId     int64
  payAmount   float64
  payPlatform int32
  provinceId  int32

How would we write this in Flink SQL? Ibis makes this extremely easy by exposing a compile() API:

sql = connection.compile(unbound_agged)
print(sql)
SELECT
  `t0`.`provinceId`,
  SUM(`t0`.`payAmount`) OVER (ORDER BY `t0`.`createTime` ASC NULLS LAST RANGE BETWEEN INTERVAL '10' SECOND(2) preceding AND CURRENT ROW) AS `pay_amount`
FROM `payments` AS `t0`

Before we can execute this query, we need to first define a data sink where the results can be written:

sink_schema = sch.Schema(
    {
        "province_id": dt.int32,
        "pay_amount": dt.float64,
    }
)

kafka_sink_configs = {
    "connector": "kafka",
    "topic": "sink",
    "properties.bootstrap.servers": "localhost:9092",
    "format": "json",
}

connection.create_table(
    "kafka_sink", schema=sink_schema, tbl_properties=kafka_sink_configs
)
DatabaseTable: kafka_sink
  province_id int32
  pay_amount  float64

Now, let’s write the results into this sink. Note that we can directly reuse the transformation logic that we wrote above for the local execution backend!!

connection.insert("kafka_sink", unbound_agged)
<pyflink.table.table_result.TableResult at 0x15eea5900>
Tip

You can examine the results either using the Kafka console consumer CLI or the kafka-python library.

How easy was it to define both batch and streaming workloads using Ibis? Without Ibis, you would have needed to write a pandas/DuckDB workload and then convert it into Flink SQL manually.

Concluding thoughts

With the introduction of the first streaming backends, Ibis is now both a batch and a streaming Python DataFrame API and we’re excited about what’s to come next. We hope that Ibis can close the gap between batch and streaming in such a way that we no longer talk about the two separately, but, rather, as two parts of the same paradigm. Streaming naturally lends itself to batch: batch is technically just a special case of streaming, where the unbounded data flow stops at some point.

Of course, this is only the beginning. There are still technical challenges to be solved (e.g., backfill, window computations over large windows, GPU acceleration), and we’ll definitely have more exciting updates to share with the community soon!

Check out the new Apache Flink and RisingWave backends and let us know what you think!

Back to top