A real-life use case: fraud detection

Imagine you’re a data scientist who works at a large bank. You have been tasked with one of the most challenging problems in banking today: identifying fraudulent transactions. The bank receives transaction details from its credit card customers in a Kafka topic, which include information about the transaction date and time, transaction amount, transaction location, merchant, category of purchase, and so on. Given the nature of the data, you want to use Apache Flink for its stream processing capabilities and to develop machine learning features that can be used to identify fraud.

Prerequisites

Spinning up the services using Docker Compose

From your project directory, run docker compose up -d to create Kafka topics, generate sample data, and launch a Flink cluster in the background.

Tip

Running docker compose up with the -d flag runs it in detached mode, where containers are run in the background. While this frees up the terminal for you to run other commands, it also hides error messages.

Depending on whether the container images are already available locally, setting up the containers may take anywhere from 10 seconds to a minute. If it’s your first time running this command, it’s best to run it in the foreground so that you can monitor the progress of setup.

This should set up a transaction topic in the Kafka cluster that contains messages that look like the following:

{ "trans_date_trans_time": "2012-02-23 00:10:01", "cc_num":
4428780000000000000, "merchant": "fraud_Olson, Becker and Koch", "category":
"gas_transport", "amt": 82.55, "first": "Richard", "last": "Waters", "zipcode":
"53186", "dob": "1/2/46", "trans_num": "dbf31d83eebdfe96d2fa213df2043586",
"is_fraud": 0, "user_id": 7109464218691269943 }
Warning

Do not proceed to the next section until messages are flowing into the transaction topic!

Connect to a data source

In order to experiment with the data in the Kafka topic and create transformations on top of it, we need to first define and connect to the data source.

While we’re dealing with a continuous stream of data here, Flink and Ibis abstract differences in the underlying implementation between tables and streams, so that, conceptually, we can simply treat our Kafka topic as a table.

To connect to our transaction Kafka topic, we need to provide a table name, schema of the data, and connector configurations. The schema of the data must contain a subset of the fields in the actual Kafka topic. Because this is a streaming job, we also want to define a watermark strategy for the data source by specifying the timestamp column (time_col) and a time duration during which late events are accepted (allowed_delay). (If you are not already familiar with these concepts, you can check out Flink’s documentation for more details.) Note that Flink requires the timestamp column to be of data type TIMESTAMP(3).

import ibis
import ibis.expr.datatypes as dt
import ibis.expr.schema as sch

source_schema = sch.Schema(
    {
        "user_id": dt.int64,
        "trans_date_trans_time": dt.timestamp(scale=3),
        "cc_num": dt.int64,
        "amt": dt.float64,
        "trans_num": dt.str,
        "merchant": dt.str,
        "category": dt.str,
        "is_fraud": dt.int32,
        "first": dt.str,
        "last": dt.str,
        "dob": dt.str,
        "zipcode": dt.str,
    }
)

# Configure the source table with Kafka connector properties.
source_configs = {
    "connector": "kafka",
    "topic": "transaction",
    "properties.bootstrap.servers": "localhost:9092",
    "properties.group.id": "consumer_group_0",
    "scan.startup.mode": "earliest-offset",
    "format": "json",
}

# Create the source table using the defined schema, Kafka connector properties,
# and set watermarking for real-time processing with a 15-second allowed
# lateness.
source_table = connection.create_table(
    "transaction",
    schema=source_schema,
    tbl_properties=source_configs,
    watermark=ibis.watermark(
        time_col="trans_date_trans_time", allowed_delay=ibis.interval(seconds=15)
    ),
)

We’re ready to write some transformations!

Create transformations

Which signs could be indicative of suspected fraud in a credit card? Oftentimes, we’re looking for abnormalities in user behaviors, for example, an excessively large transaction amount, unusually frequent transactions during a short period of time, etc. Based on this, the average transaction amount and the total transaction count over the past five hours may be useful features. Let’s write out each of these using Ibis API:

user_trans_amt_last_360m_agg = source_table[
    source_table.user_id,
    # Calculate the average transaction amount over the past six hours
    source_table.amt.mean()
    .over(
        ibis.window(
            group_by=source_table.user_id,
            order_by=source_table.trans_date_trans_time,
            range=(-ibis.interval(minutes=360), 0),
        )
    )
    .name("user_mean_trans_amt_last_360min"),
    # Calculate the total transaction count over the past six hours
    source_table.amt.count()
    .over(
        ibis.window(
            group_by=source_table.user_id,
            order_by=source_table.trans_date_trans_time,
            range=(-ibis.interval(minutes=360), 0),
        )
    )
    .name("user_trans_count_last_360min"),
    source_table.trans_date_trans_time,
]

over() creates an over aggregation in Flink, which computes an aggregated value for every input row. More specifically, this means that an aggregation result is computed and emitted for every new record flowing into the upstream Kafka topic.

The issue with over aggregation is that, if there is no new transaction for a specific user during a time window, there would be no aggregation result written to the sink. In other words, the user would never show up in the result table if they never made a transaction.

Alternatively, we can compute aggregations using Flink’s windowing table-valued functions. This allows more flexibility in defining windows and when results are computed and emitted into the sink. There are three types of windowing TVFs available in Flink: tumble, hop, and cumulate. Let’s define the same features with tumble windows:

windowed_stream = source_table.window_by(
    time_col=source_table.trans_date_trans_time,
).tumble(window_size=ibis.interval(minutes=360))

user_trans_amt_last_360m_agg_windowed_stream = windowed_stream.group_by(
    ["window_start", "window_end", "user_id"]
).agg(
    user_mean_trans_amt_last_360min=windowed_stream.amt.mean(),
    user_trans_count_last_360min=windowed_stream.amt.count(),
)

Connect to a data sink

We’re creating streaming jobs to continuously process upstream data, which could be infinite. Therefore, we want to have the job continuously running and write results into a data sink. Here, we’re simply going to write results into a separate Kafka topic named user_trans_amt_last_360min for convenient downstream processing.

We can define a data sink in virtually the same exact way in which we defined our data source:

sink_schema = sch.Schema(
    {
        "user_id": dt.int64,
        "user_mean_trans_amt_last_360min": dt.float64,
        "user_trans_count_last_360min": dt.int64,
        "trans_date_trans_time": dt.timestamp(scale=3),
    }
)

# Configure the sink table with Kafka connector properties for writing results.
sink_configs = {
    "connector": "kafka",
    "topic": "user_trans_amt_last_360min",
    "properties.bootstrap.servers": "localhost:9092",
    "format": "json",
}

sink_table = connection.create_table(
    "user_trans_amt_last_360min",
    schema=sink_schema,
    tbl_properties=sink_configs,
)

The last step is to connect the pieces and actually write our query results into the sink table that we had just created:

connection.insert("user_trans_amt_last_360min",
user_trans_amt_last_360m_agg)
<pyflink.table.table_result.TableResult at 0x15455a5f0>

This step is exactly the same for windowing TVFs:

sink_schema = sch.Schema(
    {
        "window_start": dt.timestamp(scale=3),
        "window_end": dt.timestamp(scale=3),
        "user_id": dt.int64,
        "user_mean_trans_amt_last_360min": dt.float64,
        "user_trans_count_last_360min": dt.int64,
    }
)

# Configure the sink table with Kafka connector properties for writing results.
sink_configs = {
    "connector": "kafka",
    "topic": "user_trans_amt_last_360min_windowed",
    "properties.bootstrap.servers": "localhost:9092",
    "format": "json",
}

sink_table = connection.create_table(
    "user_trans_amt_last_360min_windowed",
    schema=sink_schema,
    tbl_properties=sink_configs,
)

connection.insert(
    "user_trans_amt_last_360min_windowed", user_trans_amt_last_360m_agg_windowed_stream
)
<pyflink.table.table_result.TableResult at 0x1545adc00>

Expected output

Now, if everything is working correctly, you should expect to see results being streamed into the Kafka topic!

Tip

You can inspect the Kafka topic using the Python Kafka client if you have it installed or via console Kafka consumer:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "user_trans_amt_last_360min"
)  # or "user_trans_amt_last_360min_windowed"
for _, msg in zip(range(10), consumer):
    print(msg)
ConsumerRecord(topic='user_trans_amt_last_360min', partition=0, offset=48125, timestamp=1706914332495, timestamp_type=0, key=None, value=b'{"user_id":-312211619698468596,"user_mean_trans_amt_last_360min":1507.78,"user_trans_count_last_360min":1,"trans_date_trans_time":"2012-01-29 03:26:20"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=152, serialized_header_size=-1)
ConsumerRecord(topic='user_trans_amt_last_360min', partition=0, offset=48126, timestamp=1706914332495, timestamp_type=0, key=None, value=b'{"user_id":-1298429917834413049,"user_mean_trans_amt_last_360min":182.82,"user_trans_count_last_360min":1,"trans_date_trans_time":"2012-01-29 03:26:56"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=152, serialized_header_size=-1)
ConsumerRecord(topic='user_trans_amt_last_360min', partition=0, offset=48127, timestamp=1706914332495, timestamp_type=0, key=None, value=b'{"user_id":-4266863151675529758,"user_mean_trans_amt_last_360min":2.829999999999986,"user_trans_count_last_360min":1,"trans_date_trans_time":"2012-01-29 03:28:15"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=163, serialized_header_size=-1)
ConsumerRecord(topic='user_trans_amt_last_360min', partition=0, offset=48128, timestamp=1706914332495, timestamp_type=0, key=None, value=b'{"user_id":8211595333002534503,"user_mean_trans_amt_last_360min":3.49,"user_trans_count_last_360min":1,"trans_date_trans_time":"2012-01-29 03:28:42"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=149, serialized_header_size=-1)
ConsumerRecord(topic='user_trans_amt_last_360min', partition=0, offset=48129, timestamp=1706914332495, timestamp_type=0, key=None, value=b'{"user_id":-8800824711477190666,"user_mean_trans_amt_last_360min":70.93999999999998,"user_trans_count_last_360min":2,"trans_date_trans_time":"2012-01-29 03:28:51"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=163, serialized_header_size=-1)
ConsumerRecord(topic='user_trans_amt_last_360min', partition=0, offset=48130, timestamp=1706914332495, timestamp_type=0, key=None, value=b'{"user_id":-6588488932184900316,"user_mean_trans_amt_last_360min":22.05999999999996,"user_trans_count_last_360min":1,"trans_date_trans_time":"2012-01-29 03:29:12"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=163, serialized_header_size=-1)
ConsumerRecord(topic='user_trans_amt_last_360min', partition=0, offset=48131, timestamp=1706914332495, timestamp_type=0, key=None, value=b'{"user_id":-494959696861882363,"user_mean_trans_amt_last_360min":53.07000000000003,"user_trans_count_last_360min":1,"trans_date_trans_time":"2012-01-29 03:31:18"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=162, serialized_header_size=-1)
ConsumerRecord(topic='user_trans_amt_last_360min', partition=0, offset=48132, timestamp=1706914332495, timestamp_type=0, key=None, value=b'{"user_id":6819872428711972947,"user_mean_trans_amt_last_360min":31.635,"user_trans_count_last_360min":2,"trans_date_trans_time":"2012-01-29 03:32:20"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=151, serialized_header_size=-1)
ConsumerRecord(topic='user_trans_amt_last_360min', partition=0, offset=48133, timestamp=1706914332495, timestamp_type=0, key=None, value=b'{"user_id":5518566883052153333,"user_mean_trans_amt_last_360min":5.799999999999996,"user_trans_count_last_360min":2,"trans_date_trans_time":"2012-01-29 03:33:14"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=162, serialized_header_size=-1)
ConsumerRecord(topic='user_trans_amt_last_360min', partition=0, offset=48134, timestamp=1706914332495, timestamp_type=0, key=None, value=b'{"user_id":2295915180219832597,"user_mean_trans_amt_last_360min":31.93,"user_trans_count_last_360min":2,"trans_date_trans_time":"2012-01-29 03:33:21"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=150, serialized_header_size=-1)

Next steps

Woohoo, great job! Now that you’ve connected to Flink and learned the basics, you can query your own data. See the rest of the Ibis documentation or Flink documentation. You can open an issue if you run into one!

Back to top