from pyflink.table import EnvironmentSettings, TableEnvironment
import ibis
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.get_config().set("parallelism.default", "1")
connection = ibis.flink.connect(table_env)
A real-life use case: fraud detection
Imagine you’re a data scientist who works at a large bank. You have been tasked with one of the most challenging problems in banking today: identifying fraudulent transactions. The bank receives transaction details from its credit card customers in a Kafka topic, which include information about the transaction date and time, transaction amount, transaction location, merchant, category of purchase, and so on. Given the nature of the data, you want to use Apache Flink for its stream processing capabilities and to develop machine learning features that can be used to identify fraud.
Prerequisites
- Docker Compose: This tutorial uses Docker Compose to manage an Apache Kafka environment (including sample data generation) and a Flink cluster (for remote execution). You can download and install Docker Compose from the official website.
- JDK 11 release: Flink requires Java 11.
- Python 3.9 or 3.10.
- Follow the setup tutorial to install the Flink backend for Ibis.
- Clone the example repository.
Spinning up the services using Docker Compose
From your project directory, run docker compose up -d to create Kafka topics, generate sample data, and launch a Flink cluster in the background.
Running docker compose up with the -d flag runs it in detached mode, where containers run in the background. While this frees up the terminal for you to run other commands, it also hides error messages.
Depending on whether the container images are already available locally, setting up the containers may take anywhere from 10 seconds to a minute. If it’s your first time running this command, it’s best to run it in the foreground so that you can monitor the progress of setup.
This should set up a transaction topic in the Kafka cluster that contains messages that look like the following:
{ "trans_date_trans_time": "2012-02-23 00:10:01", "cc_num":
4428780000000000000, "merchant": "fraud_Olson, Becker and Koch", "category":
"gas_transport", "amt": 82.55, "first": "Richard", "last": "Waters", "zipcode":
"53186", "dob": "1/2/46", "trans_num": "dbf31d83eebdfe96d2fa213df2043586",
"is_fraud": 0, "user_id": 7109464218691269943 }
Do not proceed to the next section until messages are flowing into the transaction topic!
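As a quick sanity check on the message shape, the sample record above can be parsed with Python's standard library. This is only a sketch: the variable names are illustrative, and the timestamp format string is inferred from the sample message rather than from any documented contract.

```python
import json
from datetime import datetime

# Sample message copied from the transaction topic output shown above.
raw_message = (
    '{"trans_date_trans_time": "2012-02-23 00:10:01", '
    '"cc_num": 4428780000000000000, '
    '"merchant": "fraud_Olson, Becker and Koch", '
    '"category": "gas_transport", "amt": 82.55, '
    '"first": "Richard", "last": "Waters", "zipcode": "53186", '
    '"dob": "1/2/46", "trans_num": "dbf31d83eebdfe96d2fa213df2043586", '
    '"is_fraud": 0, "user_id": 7109464218691269943}'
)

record = json.loads(raw_message)

# Assumption: timestamps follow the "YYYY-MM-DD HH:MM:SS" layout seen in the sample.
event_time = datetime.strptime(record["trans_date_trans_time"], "%Y-%m-%d %H:%M:%S")

print(sorted(record))
print(event_time.isoformat(), record["amt"])
```

Note that zipcode and dob arrive as strings, which is why a schema for this topic would likely treat them as string columns.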
Connect to a Flink environment session
We can connect to a Flink environment session by creating a pyflink.table.TableEnvironment and passing it to the Flink backend’s connect method. For this tutorial, we are going to use Flink in streaming mode.
The Kafka connector isn’t part of the binary distribution. In order to connect to a Kafka source/sink, we need to download the JAR file and manually add it into the classpath:
!wget -N https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.0.2-1.18/flink-sql-connector-kafka-3.0.2-1.18.jar
connection.raw_sql("ADD JAR './flink-sql-connector-kafka-3.0.2-1.18.jar'")
Now that we’ve set up the Flink table environment, we’re ready to connect to data!
Connect to a data source
In order to experiment with the data in the Kafka topic and create transformations on top of it, we need to first define and connect to the data source.
While we’re dealing with a continuous stream of data here, Flink and Ibis abstract differences in the underlying implementation between tables and streams, so that, conceptually, we can simply treat our Kafka topic as a table.
To connect to our transaction Kafka topic, we need to provide a table name, the schema of the data, and the connector configuration. The schema of the data must contain a subset of the fields in the actual Kafka topic. Because this is a streaming job, we also want to define a watermark strategy for the data source by specifying the timestamp column (time_col) and a time duration during which late events are accepted (allowed_delay). (If you are not already familiar with these concepts, you can check out Flink’s documentation for more details.) Note that Flink requires the timestamp column to be of data type TIMESTAMP(3).
import ibis
import ibis.expr.datatypes as dt
import ibis.expr.schema as sch
source_schema = sch.Schema(
    {
        "user_id": dt.int64,
        "trans_date_trans_time": dt.timestamp(scale=3),
        "cc_num": dt.int64,
        "amt": dt.float64,
        "trans_num": dt.str,
        "merchant": dt.str,
        "category": dt.str,
        "is_fraud": dt.int32,
        "first": dt.str,
        "last": dt.str,
        "dob": dt.str,
        "zipcode": dt.str,
    }
)
# Configure the source table with Kafka connector properties.
source_configs = {
    "connector": "kafka",
    "topic": "transaction",
    "properties.bootstrap.servers": "localhost:9092",
    "properties.group.id": "consumer_group_0",
    "scan.startup.mode": "earliest-offset",
    "format": "json",
}
# Create the source table using the defined schema, Kafka connector properties,
# and set watermarking for real-time processing with a 15-second allowed
# lateness.
source_table = connection.create_table(
    "transaction",
    schema=source_schema,
    tbl_properties=source_configs,
    watermark=ibis.watermark(
        time_col="trans_date_trans_time", allowed_delay=ibis.interval(seconds=15)
    ),
)
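The watermark settings can be easier to reason about with a toy model. The sketch below is plain Python, not Flink or Ibis code. It assumes a simplified rule in which the watermark trails the largest event time seen so far by allowed_delay, and an event counts as late if its timestamp is at or before the watermark when it arrives. Flink generates watermarks periodically and its real behavior is more involved; classify_events and the sample timestamps are hypothetical.

```python
from datetime import datetime, timedelta

# Mirrors the 15-second allowed_delay used for the source table.
ALLOWED_DELAY = timedelta(seconds=15)


def classify_events(event_times, allowed_delay=ALLOWED_DELAY):
    """Label each arriving event as "on-time" or "late".

    Simplified model: watermark = max event time seen so far - allowed_delay.
    An event is late if its timestamp is at or before the current watermark.
    """
    max_seen = None
    labels = []
    for ts in event_times:
        watermark = max_seen - allowed_delay if max_seen is not None else None
        is_late = watermark is not None and ts <= watermark
        labels.append((ts, "late" if is_late else "on-time"))
        if max_seen is None or ts > max_seen:
            max_seen = ts
    return labels


base = datetime(2012, 2, 23, 0, 10, 0)
# Events listed in arrival order; the last two arrive out of order.
arrivals = [
    base,
    base + timedelta(seconds=20),
    base + timedelta(seconds=10),  # 10s behind the max, within the 15s delay
    base + timedelta(seconds=2),   # 18s behind the max, outside the 15s delay
]
labels = classify_events(arrivals)
for ts, label in labels:
    print(ts.time(), label)
```

In this model, a larger allowed_delay tolerates more disorder at the cost of waiting longer before time-based results can be finalized.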
We’re ready to write some transformations!
Create transformations
Which signs could indicate credit card fraud? Oftentimes, we’re looking for abnormalities in user behavior, for example, an excessively large transaction amount or unusually frequent transactions during a short period of time. Based on this, the average transaction amount and the total transaction count over the past six hours may be useful features. Let’s write out each of these using the Ibis API:
user_trans_amt_last_360m_agg = source_table[
    source_table.user_id,
    # Calculate the average transaction amount over the past six hours
    source_table.amt.mean()
    .over(
        ibis.window(
            group_by=source_table.user_id,
            order_by=source_table.trans_date_trans_time,
            range=(-ibis.interval(minutes=360), 0),
        )
    )
    .name("user_mean_trans_amt_last_360min"),
    # Calculate the total transaction count over the past six hours
    source_table.amt.count()
    .over(
        ibis.window(
            group_by=source_table.user_id,
            order_by=source_table.trans_date_trans_time,
            range=(-ibis.interval(minutes=360), 0),
        )
    )
    .name("user_trans_count_last_360min"),
    source_table.trans_date_trans_time,
]
over() creates an over aggregation in Flink, which computes an aggregated value for every input row. More specifically, this means that an aggregation result is computed and emitted for every new record flowing into the upstream Kafka topic.
The issue with over aggregation is that, if there is no new transaction for a specific user during a time window, there would be no aggregation result written to the sink. In other words, the user would never show up in the result table if they never made a transaction.
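To make the per-row behavior concrete, here is a small pure-Python sketch of what a range-based over aggregation computes. It is not how Flink executes the query; over_aggregate and the sample rows are hypothetical, and the window bounds are assumed to be inclusive on both ends.

```python
from datetime import datetime, timedelta


def over_aggregate(rows, lookback=timedelta(minutes=360)):
    """Emit one (user_id, event_time, mean_amt, count) tuple per input row.

    For each row, the aggregate covers that user's rows whose event time
    falls in [row_time - lookback, row_time].
    """
    results = []
    for user_id, event_time, _ in rows:
        in_range = [
            amt
            for uid, ts, amt in rows
            if uid == user_id and event_time - lookback <= ts <= event_time
        ]
        results.append(
            (user_id, event_time, sum(in_range) / len(in_range), len(in_range))
        )
    return results


day = datetime(2012, 1, 28)
rows = [
    (1, day + timedelta(hours=0), 10.0),
    (1, day + timedelta(hours=3), 30.0),
    (2, day + timedelta(hours=4), 5.0),
    (1, day + timedelta(hours=7), 50.0),  # the 00:00 row has aged out by now
]
results = over_aggregate(rows)
for row in results:
    print(row)
```

The output has exactly as many rows as the input, and a user who makes no further transactions produces no further output.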
Alternatively, we can compute aggregations using Flink’s windowing table-valued functions. This allows more flexibility in defining windows and when results are computed and emitted into the sink. There are three types of windowing TVFs available in Flink: tumble, hop, and cumulate. Let’s define the same features with tumble windows:
windowed_stream = source_table.window_by(
    time_col=source_table.trans_date_trans_time,
).tumble(window_size=ibis.interval(minutes=360))
user_trans_amt_last_360m_agg_windowed_stream = windowed_stream.group_by(
    ["window_start", "window_end", "user_id"]
).agg(
    user_mean_trans_amt_last_360min=windowed_stream.amt.mean(),
    user_trans_count_last_360min=windowed_stream.amt.count(),
)
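For intuition about how a tumble window assigns rows to fixed, non-overlapping intervals, the following pure-Python sketch buckets events into 360-minute windows and aggregates per window and user. It assumes windows are aligned to the Unix epoch with no offset; tumble_window, tumble_aggregate, and the sample rows are hypothetical and only illustrate the idea.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumble_window(event_time, size):
    """Return (window_start, window_end) of the fixed window containing event_time.

    Assumption: windows are aligned to the Unix epoch with no offset.
    """
    offset = (event_time - EPOCH) % size
    start = event_time - offset
    return start, start + size


def tumble_aggregate(rows, size=timedelta(minutes=360)):
    """Group (user_id, event_time, amt) rows by window and user, then aggregate."""
    buckets = defaultdict(list)
    for user_id, event_time, amt in rows:
        start, end = tumble_window(event_time, size)
        buckets[(start, end, user_id)].append(amt)
    return {key: (sum(amts) / len(amts), len(amts)) for key, amts in buckets.items()}


rows = [
    (1, datetime(2012, 1, 28, 1, 0, 0), 10.0),
    (1, datetime(2012, 1, 28, 5, 30, 0), 30.0),
    (1, datetime(2012, 1, 28, 6, 14, 23), 50.0),
    (2, datetime(2012, 1, 28, 7, 0, 0), 5.0),
]
aggregated = tumble_aggregate(rows)
for (start, end, user_id), (mean_amt, count) in sorted(aggregated.items()):
    print(start, end, user_id, mean_amt, count)
```

Unlike the over aggregation, each key here yields a single result per window rather than one result per input row.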
Connect to a data sink
We’re creating streaming jobs to continuously process upstream data, which could be infinite. Therefore, we want to have the job continuously running and write results into a data sink. Here, we’re simply going to write results into a separate Kafka topic named user_trans_amt_last_360min for convenient downstream processing.
We can define a data sink in virtually the same way as we defined our data source:
sink_schema = sch.Schema(
    {
        "user_id": dt.int64,
        "user_mean_trans_amt_last_360min": dt.float64,
        "user_trans_count_last_360min": dt.int64,
        "trans_date_trans_time": dt.timestamp(scale=3),
    }
)
# Configure the sink table with Kafka connector properties for writing results.
sink_configs = {
    "connector": "kafka",
    "topic": "user_trans_amt_last_360min",
    "properties.bootstrap.servers": "localhost:9092",
    "format": "json",
}
sink_table = connection.create_table(
    "user_trans_amt_last_360min",
    schema=sink_schema,
    tbl_properties=sink_configs,
)
The last step is to connect the pieces and actually write our query results into the sink table that we just created:
connection.insert("user_trans_amt_last_360min", user_trans_amt_last_360m_agg)
<pyflink.table.table_result.TableResult at 0x16ba87820>
This step is exactly the same for windowing TVFs:
sink_schema = sch.Schema(
    {
        "window_start": dt.timestamp(scale=3),
        "window_end": dt.timestamp(scale=3),
        "user_id": dt.int64,
        "user_mean_trans_amt_last_360min": dt.float64,
        "user_trans_count_last_360min": dt.int64,
    }
)
# Configure the sink table with Kafka connector properties for writing results.
sink_configs = {
    "connector": "kafka",
    "topic": "user_trans_amt_last_360min_windowed",
    "properties.bootstrap.servers": "localhost:9092",
    "format": "json",
}
sink_table = connection.create_table(
    "user_trans_amt_last_360min_windowed",
    schema=sink_schema,
    tbl_properties=sink_configs,
)
connection.insert(
    "user_trans_amt_last_360min_windowed", user_trans_amt_last_360m_agg_windowed_stream
)
<pyflink.table.table_result.TableResult at 0x16bab58d0>
Expected output
Now, if everything is working correctly, you should expect to see results being streamed into the Kafka topic!
You can inspect the Kafka topic using the Python Kafka client, if you have it installed, or with the Kafka console consumer:
from kafka import KafkaConsumer
consumer = KafkaConsumer(
    "user_trans_amt_last_360min"
)  # or "user_trans_amt_last_360min_windowed"
for _, msg in zip(range(10), consumer):
    print(msg)
ConsumerRecord(topic='user_trans_amt_last_360min', partition=0, offset=45959, timestamp=1711571328876, timestamp_type=0, key=None, value=b'{"user_id":6405112720489830090,"user_mean_trans_amt_last_360min":64.675,"user_trans_count_last_360min":2,"trans_date_trans_time":"2012-01-28 06:14:23"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=151, serialized_header_size=-1)
ConsumerRecord(topic='user_trans_amt_last_360min', partition=0, offset=45960, timestamp=1711571328876, timestamp_type=0, key=None, value=b'{"user_id":-4272782897993296960,"user_mean_trans_amt_last_360min":61.34000000000002,"user_trans_count_last_360min":2,"trans_date_trans_time":"2012-01-28 06:16:24"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=163, serialized_header_size=-1)
ConsumerRecord(topic='user_trans_amt_last_360min', partition=0, offset=45961, timestamp=1711571328876, timestamp_type=0, key=None, value=b'{"user_id":-8404995308417450558,"user_mean_trans_amt_last_360min":47.17000000000002,"user_trans_count_last_360min":1,"trans_date_trans_time":"2012-01-28 06:17:39"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=163, serialized_header_size=-1)
ConsumerRecord(topic='user_trans_amt_last_360min', partition=0, offset=45962, timestamp=1711571328876, timestamp_type=0, key=None, value=b'{"user_id":-6232014680614930868,"user_mean_trans_amt_last_360min":131.91,"user_trans_count_last_360min":1,"trans_date_trans_time":"2012-01-28 06:21:00"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=152, serialized_header_size=-1)
ConsumerRecord(topic='user_trans_amt_last_360min', partition=0, offset=45963, timestamp=1711571328876, timestamp_type=0, key=None, value=b'{"user_id":1091996638939120426,"user_mean_trans_amt_last_360min":39.93000000000001,"user_trans_count_last_360min":2,"trans_date_trans_time":"2012-01-28 06:21:06"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=162, serialized_header_size=-1)
ConsumerRecord(topic='user_trans_amt_last_360min', partition=0, offset=45964, timestamp=1711571328876, timestamp_type=0, key=None, value=b'{"user_id":8490370068944420319,"user_mean_trans_amt_last_360min":179.13999999999996,"user_trans_count_last_360min":1,"trans_date_trans_time":"2012-01-28 06:21:12"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=163, serialized_header_size=-1)
ConsumerRecord(topic='user_trans_amt_last_360min', partition=0, offset=45965, timestamp=1711571328876, timestamp_type=0, key=None, value=b'{"user_id":969103286379660111,"user_mean_trans_amt_last_360min":85.7200000000001,"user_trans_count_last_360min":1,"trans_date_trans_time":"2012-01-28 06:22:26"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=160, serialized_header_size=-1)
ConsumerRecord(topic='user_trans_amt_last_360min', partition=0, offset=45966, timestamp=1711571328877, timestamp_type=0, key=None, value=b'{"user_id":-8414965219550676411,"user_mean_trans_amt_last_360min":6.029999999999972,"user_trans_count_last_360min":1,"trans_date_trans_time":"2012-01-28 06:23:31"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=163, serialized_header_size=-1)
ConsumerRecord(topic='user_trans_amt_last_360min', partition=0, offset=45967, timestamp=1711571328877, timestamp_type=0, key=None, value=b'{"user_id":-2480376382981778282,"user_mean_trans_amt_last_360min":8.270000000000007,"user_trans_count_last_360min":1,"trans_date_trans_time":"2012-01-28 06:27:12"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=163, serialized_header_size=-1)
ConsumerRecord(topic='user_trans_amt_last_360min', partition=0, offset=45968, timestamp=1711571328877, timestamp_type=0, key=None, value=b'{"user_id":-5007797165712520135,"user_mean_trans_amt_last_360min":199.33,"user_trans_count_last_360min":1,"trans_date_trans_time":"2012-01-28 06:28:10"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=152, serialized_header_size=-1)
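Each record's value field holds the JSON payload as raw bytes. A minimal sketch of decoding one, using the value copied from the first record above (the variable names are illustrative):

```python
import json

# Value bytes copied from the first ConsumerRecord shown above.
value = (
    b'{"user_id":6405112720489830090,'
    b'"user_mean_trans_amt_last_360min":64.675,'
    b'"user_trans_count_last_360min":2,'
    b'"trans_date_trans_time":"2012-01-28 06:14:23"}'
)

# Decode the bytes and parse the JSON payload into a dict of features.
features = json.loads(value.decode("utf-8"))

print(features["user_id"], features["user_mean_trans_amt_last_360min"])
```

With the Python Kafka client, the same decoding could likely be applied to every message by passing a value_deserializer function when constructing the KafkaConsumer.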
Next steps
Woohoo, great job! Now that you’ve connected to Flink and learned the basics, you can query your own data. See the rest of the Ibis documentation or the Flink documentation to learn more. If you run into a problem, you can open an issue!