Using one Python dataframe API to take the billion row challenge with DuckDB, Polars, and DataFusion

Author

Cody

Published

January 22, 2024

Overview

This is an implementation of the One Billion Row Challenge:

Let’s kick off 2024 true coder style—​I’m excited to announce the One Billion Row Challenge (1BRC), running from Jan 1 until Jan 31.

Your mission, should you decide to accept it, is deceptively simple: write a Java program for retrieving temperature measurement values from a text file and calculating the min, mean, and max temperature per weather station. There’s just one caveat: the file has 1,000,000,000 rows!

I haven’t written Java since dropping a computer science course my second year of college that forced us to do functional programming exclusively in Java. However, I’ll gladly take the challenge in Python using Ibis! In fact, I did something like this (generating a billion rows with 26 columns of random numbers and doing basic aggregations) to test out DuckDB and Polars.

In this post, we’ll demonstrate how Ibis provides a single Python dataframe API to take the billion row challenge with DuckDB, Polars, and DataFusion.

Setup

We need to generate the data from the challenge. First, clone the repo:

gh repo clone gunnarmorling/1brc

Then change into the Python directory and run the generation script with the number of rows you want to generate:

cd 1brc/src/main/python
python create_measurements.py 1_000_000_000

This will generate a file called measurements.txt in the data directory at the root of the repo. It is 15GB on disk:

(venv) cody@voda 1brc % du -h 1brc/data/*
 15G    1brc/data/measurements.txt
808K    1brc/data/weather_stations.csv

And consists of one billion rows with two columns separated by a semicolon:

(venv) cody@voda 1brc % head 1brc/data/measurements.txt
Kusugal;-67.2
Ipil;-88.6
Sohna;-31.2
Lubuagan;-2.3
Szentes;29.2
Sylvan Lake;-70.7
Ambato;-35.2
Berkine;97.0
Wernau;73.4
Kennewick;-19.9
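
To make the format concrete, here is a minimal sketch that parses a few of the lines above with Python's standard csv module. The name parse_measurements is only for illustration and is not part of the challenge repo or of Ibis; at a billion rows you would hand this work to one of the backends used later in the post.

```python
import csv
import io

# A few lines copied from the head output above.
SAMPLE = """Kusugal;-67.2
Ipil;-88.6
Sylvan Lake;-70.7
"""


def parse_measurements(lines):
    """Yield (station, temperature) pairs from semicolon-separated lines."""
    reader = csv.reader(lines, delimiter=";")
    for station, temperature in reader:
        yield station, float(temperature)


rows = list(parse_measurements(io.StringIO(SAMPLE)))
print(rows)
```

Each row has no header and exactly two fields, which is why every backend below needs to be told the delimiter, the missing header, and the column names.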

Also, you’ll need to install Ibis with the three backends we’ll use:

pip install 'ibis-framework[duckdb,polars,datafusion]'

Understanding Ibis

Ibis provides a standard dataframe API decoupled from the execution engine. It compiles Ibis expressions to an intermediate representation (often SQL) that different backends can execute.

This allows us to write a single Ibis expression to complete the challenge with many different execution engine backends.

Warning

While Ibis does its best to abstract away the differences between backends, some areas, such as data input and output, cannot be fully unified. For example, the read_csv functions of the various backends (in their SQL and Python forms) take different parameters. We’ll handle that in this post with a separate kwargs dictionary for each backend.

In general, besides creating a connection and data input/output, the Ibis API is the same across backends.

Completing the challenge thrice

We’ll use three great options for local backends – DuckDB, Polars, and DataFusion – to complete the challenge.

Setup

Before we get started, we’ll make some imports, turn on interactive mode, and define a kwargs dictionary for each backend’s read_csv function:

import ibis
import polars as pl
import pyarrow as pa

ibis.options.interactive = True

duckdb_kwargs = {
    "delim": ";",
    "header": False,
    "columns": {"station": "VARCHAR", "temperature": "DOUBLE"},
}

polars_kwargs = {
    "separator": ";",
    "has_header": False,
    "new_columns": ["station", "temperature"],
    "schema": {"station": pl.Utf8, "temperature": pl.Float64},
}

datafusion_kwargs = {
    "delimiter": ";",
    "has_header": False,
    "schema": pa.schema(
        [
            (
                "station",
                pa.string(),
            ),
            (
                "temperature",
                pa.float64(),
            ),
        ]
    ),
    "file_extension": ".txt",
}
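
The three dictionaries express the same two ideas (a semicolon delimiter and no header row) under different keyword names. As a rough sketch of that mapping, the hypothetical helper below translates neutral option names into the backend-specific keywords used above. OPTION_NAMES and backend_read_kwargs are illustrative names, not part of Ibis, and the schema arguments are left out because each backend expects its own type objects.

```python
# Hypothetical lookup table: neutral option name -> backend-specific keyword.
# The keyword names are taken from the kwargs dictionaries in this post.
OPTION_NAMES = {
    "duckdb": {"delimiter": "delim", "has_header": "header"},
    "polars": {"delimiter": "separator", "has_header": "has_header"},
    "datafusion": {"delimiter": "delimiter", "has_header": "has_header"},
}


def backend_read_kwargs(backend, **options):
    """Rename neutral options to the keywords a given backend expects."""
    names = OPTION_NAMES[backend]
    return {names[key]: value for key, value in options.items()}


for backend in OPTION_NAMES:
    print(backend, backend_read_kwargs(backend, delimiter=";", has_header=False))
```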

Let’s define a function to run the same code with each backend to complete the challenge:

def run_challenge(t):
    res = (
        t.group_by(ibis._.station)
        .agg(
            min_temp=ibis._.temperature.min(),
            mean_temp=ibis._.temperature.mean(),
            max_temp=ibis._.temperature.max(),
        )
        .order_by(ibis._.station.desc())
    )
    return res
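
For intuition about what run_challenge asks the backend to compute, here is a small pure-Python reference of the same aggregation. It is a sketch for checking results on tiny samples, not how any of the backends execute the query, and reference_challenge is a name made up for this example. Note that Python sorts strings by code point, which may not match a backend's ordering for every station name.

```python
def reference_challenge(rows):
    """Return {station: (min, mean, max)}, ordered by station name descending."""
    stats = {}  # station -> [min, max, running sum, count]
    for station, temp in rows:
        entry = stats.get(station)
        if entry is None:
            stats[station] = [temp, temp, temp, 1]
        else:
            entry[0] = min(entry[0], temp)
            entry[1] = max(entry[1], temp)
            entry[2] += temp
            entry[3] += 1
    # Mirror order_by(station.desc()) by sorting station names in reverse.
    return {
        station: (low, total / count, high)
        for station, (low, high, total, count) in sorted(stats.items(), reverse=True)
    }


sample = [("Ipil", -88.6), ("Sohna", -31.2), ("Ipil", 10.0), ("Sohna", 29.2)]
result = reference_challenge(sample)
for station, (low, mean, high) in result.items():
    print(station, low, round(mean, 1), high)
```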

Completing the challenge

Let’s complete the challenge with each backend.

Note

The results are the same across backends but look suspicious: every station shown has a minimum of -99.9 and a maximum of 99.9. The repository notes that the Python generation code is “unofficial”, so it may have some problems. Given this is a contrived example of generated data, I’m not going to worry about it.

The point is that we can easily complete the challenge with the same code across many backends, letting them worry about the details of execution. For this reason, I’m also not providing execution times. Try it out yourself!
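
One plausible explanation for the suspicious results (an assumption, not something confirmed against the generation script) is that every temperature is drawn uniformly from the same range regardless of station. The sketch below simulates that hypothetical behavior for a single station; with enough rows, the minimum and maximum land on the range limits and the mean drifts toward zero.

```python
import random


def simulate_station(n_rows, seed=0):
    """Simulate one station, ASSUMING temperatures are uniform in [-99.9, 99.9]."""
    rng = random.Random(seed)
    temps = [round(rng.uniform(-99.9, 99.9), 1) for _ in range(n_rows)]
    return min(temps), sum(temps) / len(temps), max(temps)


low, mean, high = simulate_station(200_000)
print(low, round(mean, 6), high)
```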

First, let’s set the backend to DuckDB and pick its kwargs dictionary:

ibis.set_backend("duckdb")  # redundant, since DuckDB is the default backend
kwargs = duckdb_kwargs

Next, we’ll read in the data and take a look at the table:

t = ibis.read_csv("1brc/data/measurements.txt", **kwargs)
t.limit(3)
┏━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ station     ┃ temperature ┃
┡━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ string      │ float64     │
├─────────────┼─────────────┤
│ Lívingston  │       -21.0 │
│ Annūr       │       -33.4 │
│ Beni Douala │        16.5 │
└─────────────┴─────────────┘

Then let’s confirm it’s a billion rows:

f"{t.count().to_pandas():,}"
'1,000,000,000'

Finally, we’ll compute the min, mean, and max temperature per weather station:

res = run_challenge(t)
res
┏━━━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━┓
┃ station        ┃ min_temp ┃ mean_temp ┃ max_temp ┃
┡━━━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━┩
│ string         │ float64  │ float64   │ float64  │
├────────────────┼──────────┼───────────┼──────────┤
│ ’s-Gravendeel  │    -99.9 │  0.112188 │     99.9 │
│ ’Aïn el Hammam │    -99.9 │ -0.225289 │     99.9 │
│ ’Aïn Roua      │    -99.9 │ -0.198241 │     99.9 │
│ ‘Ibrī          │    -99.9 │  0.009499 │     99.9 │
│ ‘Ayn al ‘Arab  │    -99.9 │  0.124730 │     99.9 │
│ ‘Akko          │    -99.9 │ -0.087184 │     99.9 │
│ ‘Afrīn         │    -99.9 │ -0.013322 │     99.9 │
│ Ấp Tân Ngãi    │    -99.9 │  0.344089 │     99.9 │
│ Ẕefat          │    -99.9 │  0.017767 │     99.9 │
│ Ḩīsh           │    -99.9 │  0.018804 │     99.9 │
│ …              │        … │         … │        … │
└────────────────┴──────────┴───────────┴──────────┘

Next, let’s set the backend to Polars and pick its kwargs dictionary:

ibis.set_backend("polars")  # set Polars as the default backend
kwargs = polars_kwargs

Next, we’ll read in the data and take a look at the table:

t = ibis.read_csv("1brc/data/measurements.txt", **kwargs)
t.limit(3)
┏━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ station     ┃ temperature ┃
┡━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ string      │ float64     │
├─────────────┼─────────────┤
│ Lívingston  │       -21.0 │
│ Annūr       │       -33.4 │
│ Beni Douala │        16.5 │
└─────────────┴─────────────┘

Then let’s confirm it’s a billion rows:

f"{t.count().to_pandas():,}"
'1,000,000,000'

Finally, we’ll compute the min, mean, and max temperature per weather station:

res = run_challenge(t)
res
┏━━━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━┓
┃ station        ┃ min_temp ┃ mean_temp ┃ max_temp ┃
┡━━━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━┩
│ string         │ float64  │ float64   │ float64  │
├────────────────┼──────────┼───────────┼──────────┤
│ ’s-Gravendeel  │    -99.9 │  0.112188 │     99.9 │
│ ’Aïn el Hammam │    -99.9 │ -0.225289 │     99.9 │
│ ’Aïn Roua      │    -99.9 │ -0.198241 │     99.9 │
│ ‘Ibrī          │    -99.9 │  0.009499 │     99.9 │
│ ‘Ayn al ‘Arab  │    -99.9 │  0.124730 │     99.9 │
│ ‘Akko          │    -99.9 │ -0.087184 │     99.9 │
│ ‘Afrīn         │    -99.9 │ -0.013322 │     99.9 │
│ Ấp Tân Ngãi    │    -99.9 │  0.344089 │     99.9 │
│ Ẕefat          │    -99.9 │  0.017767 │     99.9 │
│ Ḩīsh           │    -99.9 │  0.018804 │     99.9 │
│ …              │        … │         … │        … │
└────────────────┴──────────┴───────────┴──────────┘

Now let’s set the backend to DataFusion and pick its kwargs dictionary:

ibis.set_backend("datafusion")  # set DataFusion as the default backend
kwargs = datafusion_kwargs

Next, we’ll read in the data and take a look at the table:

t = ibis.read_csv("1brc/data/measurements.txt", **kwargs)
t.limit(3)
┏━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ station     ┃ temperature ┃
┡━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ string      │ float64     │
├─────────────┼─────────────┤
│ Lívingston  │       -21.0 │
│ Annūr       │       -33.4 │
│ Beni Douala │        16.5 │
└─────────────┴─────────────┘

Then let’s confirm it’s a billion rows:

f"{t.count().to_pandas():,}"
'1,000,000,000'

Finally, we’ll compute the min, mean, and max temperature per weather station:

res = run_challenge(t)
res
┏━━━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━┓
┃ station        ┃ min_temp ┃ mean_temp ┃ max_temp ┃
┡━━━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━┩
│ string         │ float64  │ float64   │ float64  │
├────────────────┼──────────┼───────────┼──────────┤
│ ’s-Gravendeel  │    -99.9 │  0.112188 │     99.9 │
│ ’Aïn el Hammam │    -99.9 │ -0.225289 │     99.9 │
│ ’Aïn Roua      │    -99.9 │ -0.198241 │     99.9 │
│ ‘Ibrī          │    -99.9 │  0.009499 │     99.9 │
│ ‘Ayn al ‘Arab  │    -99.9 │  0.124730 │     99.9 │
│ ‘Akko          │    -99.9 │ -0.087184 │     99.9 │
│ ‘Afrīn         │    -99.9 │ -0.013322 │     99.9 │
│ Ấp Tân Ngãi    │    -99.9 │  0.344089 │     99.9 │
│ Ẕefat          │    -99.9 │  0.017767 │     99.9 │
│ Ḩīsh           │    -99.9 │  0.018804 │     99.9 │
│ …              │        … │         … │        … │
└────────────────┴──────────┴───────────┴──────────┘

Conclusion

While the One Billion Row Challenge isn’t a great benchmark, it’s a fun way to demonstrate how Ibis lets the same Python dataframe code run on DuckDB, Polars, and DataFusion. Feel free to try it out with other backends!

Happy coding!

Bonus: more billion row data generation

While we’re here, I’ll share the code I’ve used in the past to generate a billion rows of random data:

import ibis

con = ibis.connect("duckdb://data.ddb")

ROWS = 1_000_000_000

sql_str = ""
sql_str += "select\n"
for c in list(map(chr, range(ord("a"), ord("z") + 1))):
    sql_str += f"  random() as {c},\n"
sql_str += f"from generate_series(1, {ROWS})"

t = con.sql(sql_str)
con.create_table("billion", t, overwrite=True)

Nowadays I’d convert that to an Ibis expression:

Note

This is a slightly different result with a monotonic index column, but I prefer it anyway. You could drop that column or adjust the expression.

import ibis

con = ibis.connect("duckdb://data.ddb")

ROWS = 1_000_000_000

t = (
    ibis.range(ROWS)
    .unnest()
    .name("index")
    .as_table()
    .mutate(**{c: ibis.random() for c in list(map(chr, range(ord("a"), ord("z") + 1)))})
)
con.create_table("billion", t, overwrite=True)

But if you do need to construct a programmatic SQL string, it’s cool that you can!
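
If you go the string route, one variation worth considering is building the select list with str.join. The loop above leaves a trailing comma after the last column, which DuckDB tolerates but many other SQL engines reject. The helper name random_columns_sql is made up for this sketch, and the resulting string is only built and inspected here, not executed.

```python
import string


def random_columns_sql(rows, columns=string.ascii_lowercase):
    """Build a SELECT with one random() column per name and no trailing comma."""
    select_list = ",\n".join(f"  random() as {c}" for c in columns)
    return f"select\n{select_list}\nfrom generate_series(1, {rows})"


sql_str = random_columns_sql(1_000_000_000)
print("\n".join(sql_str.splitlines()[:3]))
```

The result can be passed to con.sql(sql_str) exactly like the hand-built string.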
