Ibis versus X: Performance across the ecosystem part 1

Categories: blog, case study, ecosystem, performance

Author: Phillip Cloud

Published: December 6, 2023

TL;DR: Ibis has a lot of great backends. They’re all good at different things. For working with local data, it’s hard to beat DuckDB on feature set and performance.

Buckle up, it’s going to be a long one.

Motivation

Ibis maintainer Gil Forsyth recently wrote a post on our blog replicating another blog post but using Ibis instead of raw SQL.

I thought it would be interesting to see how other tools compare to this setup, so I decided I’d try to do the same workflow on the same machine using a few tools from across the ecosystem.

I chose two incumbents, pandas and Dask, to see how they compare to Ibis + DuckDB on this workload. In part 2 of this series I will compare two newer engines, Polars and DataFusion, to Ibis + DuckDB.

I’ve worked on both pandas and Dask in the past, but it’s been such a long time since I’ve used these tools for data analysis that I consider myself rather naive about how best to use them today.

Initially I was interested in API comparisons since usability is really where Ibis shines, but as I started to explore things, I was unable to complete my analysis in some cases due to running out of memory.

This is not a forum to trash the work of others.

I’m not interested in tearing down other tools.

Ibis has backends for each of these tools and it’s in everyone’s best interest that all of the tools discussed here work to their full potential.

I show each tool using its native API, in an attempt to compare ease-of-use out of the box and maximize each library’s ability to complete the workload.

Let’s dig in.

Setup

I ran all of the code in this blog post on a machine with these specs.

All OS caches were cleared before running this document with:

$ sudo sysctl -w vm.drop_caches=3

Clearing operating system caches does not represent a realistic usage scenario; it is a method for putting the tools here on more equal footing. When you’re in the thick of an analysis, you’re not going to artificially discard any OS optimizations.

Component Specification
CPU AMD EPYC 7B12 (64 threads)
RAM 94 GiB
Disk 1.5 TiB SSD
OS NixOS (Linux 6.1.69)

Soft constraints

I’ll introduce some soft UX constraints on the problem that I think help convey the perspective of someone who wants to get started quickly with a data set:

  1. I don’t want to get another computer to run this workload.
  2. I want to use the data as is, that is, without altering the files I already have.
  3. I’d like to run this computation with the default configuration. Ideally configuration isn’t required to complete this workload out of the box.

Library versions

Here are the versions I used to run this experiment at the time of writing.

Dependency Version
Python 3.10.13 (main, Aug 24 2023, 12:59:26) [GCC 12.3.0]
dask 2023.12.1
distributed 2023.12.1
duckdb 0.9.2
ibis 1a5a42081
pandas 2.1.4
pyarrow 14.0.2

Data

I used the Parquet files from the data set linked in the original post to run my experiment.

Here’s a summary of the data set’s file sizes:

$ du -h /data/pypi-parquet/*.parquet
1.8G    /data/pypi-parquet/index-12.parquet
1.7G    /data/pypi-parquet/index-10.parquet
1.9G    /data/pypi-parquet/index-2.parquet
1.9G    /data/pypi-parquet/index-0.parquet
1.8G    /data/pypi-parquet/index-5.parquet
1.7G    /data/pypi-parquet/index-13.parquet
1.7G    /data/pypi-parquet/index-9.parquet
1.8G    /data/pypi-parquet/index-6.parquet
1.7G    /data/pypi-parquet/index-7.parquet
1.7G    /data/pypi-parquet/index-8.parquet
800M    /data/pypi-parquet/index-14.parquet
1.8G    /data/pypi-parquet/index-4.parquet
1.8G    /data/pypi-parquet/index-11.parquet
1.9G    /data/pypi-parquet/index-3.parquet
1.9G    /data/pypi-parquet/index-1.parquet

Recapping the original Ibis post

Check out the original blog post if you haven’t already!

Here’s the Ibis + DuckDB code, along with a timed execution of the query:

from __future__ import annotations

import ibis
from ibis import _, udf


@udf.scalar.builtin
def flatten(x: list[list[str]]) -> list[str]:
    ...


expr = (
    ibis.read_parquet("/data/pypi-parquet/*.parquet")
    .filter(
        [
            _.path.re_search(
                r"\.(asm|c|cc|cpp|cxx|h|hpp|rs|[Ff][0-9]{0,2}(?:or)?|go)$"
            ),
            ~_.path.re_search(r"(^|/)test(|s|ing)"),
            ~_.path.contains("/site-packages/"),
        ]
    )
    .group_by(
        month=_.uploaded_on.truncate("M"),
        ext=_.path.re_extract(r"\.([a-z0-9]+)$", 1),
    )
    .aggregate(projects=_.project_name.collect().unique())
    .order_by(_.month.desc())
    .mutate(
        ext=_.ext.re_replace(r"cxx|cpp|cc|c|hpp|h", "C/C++")
        .re_replace("^f.*$", "Fortran")
        .replace("rs", "Rust")
        .replace("go", "Go")
        .replace("asm", "Assembly")
        .nullif(""),
    )
    .group_by(["month", "ext"])
    .aggregate(project_count=flatten(_.projects.collect()).unique().length())
    .dropna("ext")
    .order_by([_.month.desc(), _.project_count.desc()])
)
Notes on the code:

1. We’ve since implemented a flatten method on array expressions, so it’s no longer necessary to define a UDF here. I’ll leave this code unchanged for this post; this has no effect on the performance of the query. In both cases the generated code contains a DuckDB-native call to its flatten function.
2. This is a small change from the original query that adds a final sort key to make the results deterministic.
%time df = expr.to_pandas()
df
CPU times: user 19min 17s, sys: 1min 9s, total: 20min 26s
Wall time: 27 s
month ext project_count
0 2023-11-01 C/C++ 836
1 2023-11-01 Rust 190
2 2023-11-01 Fortran 48
3 2023-11-01 Go 33
4 2023-11-01 Assembly 10
... ... ... ...
794 2005-08-01 C/C++ 7
795 2005-07-01 C/C++ 4
796 2005-05-01 C/C++ 1
797 2005-04-01 C/C++ 1
798 2005-03-01 C/C++ 1

799 rows × 3 columns
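To make the path filters in the query above concrete, here’s a small sketch of what the extension and test-path regexes keep and drop. The sample paths are made up for illustration:

```python
import re

# The extension filter keeps C/C++, Rust, Fortran, Go, and assembly sources.
ext_re = re.compile(r"\.(asm|c|cc|cpp|cxx|h|hpp|rs|[Ff][0-9]{0,2}(?:or)?|go)$")
# The test filter drops anything with a test/tests/testing path component.
test_re = re.compile(r"(^|/)test(|s|ing)")

paths = ["pkg/src/main.c", "pkg/tests/test_main.c", "pkg/vendored/lib.rs", "pkg/README.md"]
for path in paths:
    keep = (
        ext_re.search(path) is not None
        and test_re.search(path) is None
        and "/site-packages/" not in path
    )
    print(path, keep)
```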

Let’s show peak memory usage in GB as reported by the resource module:

import resource

rss_kb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
rss_mb = rss_kb / 1e3
rss_gb = rss_mb / 1e3

print(round(rss_gb, 1), "GB")
8.6 GB

Pandas

Let’s try to replicate this workflow using pandas.

I started with this code:

import pandas as pd

df = pd.read_parquet("/data/pypi-parquet/*.parquet")
FileNotFoundError: [Errno 2] No such file or directory: '/data/pypi-parquet/*.parquet'

Looks like pandas doesn’t support globs. That’s fine; we can use the built-in glob module.

import glob

df = pd.read_parquet(glob.glob("/data/pypi-parquet/*.parquet"))

This eventually triggers the Linux OOM killer after a few minutes, so I can’t run this code to completion.

Let’s try again with just a single file. I’ll pick the smallest one, to avoid any potential issues with memory and give pandas the best possible shot.

import os

smallest_file = min(glob.glob("/data/pypi-parquet/*.parquet"), key=os.path.getsize)

The smallest file is 799 MiB on disk.

%time df = pd.read_parquet(smallest_file)
df
CPU times: user 25.6 s, sys: 12.6 s, total: 38.1 s
Wall time: 26.6 s
project_name project_version project_release uploaded_on path archive_path size hash skip_reason lines repository
0 zyte-spider-templates 0.1.0 zyte_spider_templates-0.1.0-py3-none-any.whl 2023-10-26 07:29:49.894 packages/zyte-spider-templates/zyte_spider_tem... zyte_spider_templates/spiders/ecommerce.py 5748 b'\xe0\xa6\x9bd\xc0+\xe0\xf8$J2\xb3\xf8\x8c\x9... 160 237
1 zyte-spider-templates 0.1.0 zyte_spider_templates-0.1.0-py3-none-any.whl 2023-10-26 07:29:49.894 packages/zyte-spider-templates/zyte_spider_tem... zyte_spider_templates/spiders/base.py 4160 b'\x1ck\xd46={\x7f`\xbe\xfaIg*&\x977T\xdb\x8fJ' 122 237
2 zyte-spider-templates 0.1.0 zyte_spider_templates-0.1.0-py3-none-any.whl 2023-10-26 07:29:49.894 packages/zyte-spider-templates/zyte_spider_tem... zyte_spider_templates/spiders/__init__.py 0 b'\xe6\x9d\xe2\x9b\xb2\xd1\xd6CK\x8b)\xaewZ\xd... empty 0 237
3 zyte-spider-templates 0.1.0 zyte_spider_templates-0.1.0-py3-none-any.whl 2023-10-26 07:29:49.894 packages/zyte-spider-templates/zyte_spider_tem... zyte_spider_templates/page_objects/product_nav... 3528 b'\xcd\xc9\xfc[\xda\xcf!\x94\x1b\x92\xffbJC\xf... 106 237
4 zyte-spider-templates 0.1.0 zyte_spider_templates-0.1.0-py3-none-any.whl 2023-10-26 07:29:49.894 packages/zyte-spider-templates/zyte_spider_tem... zyte_spider_templates/page_objects/__init__.py 75 b'r\xb9\xc1\xcf2\xa7\xdc?\xd1\xa8\xfcc+`\xf3\x... 1 237
... ... ... ... ... ... ... ... ... ... ... ...
35468828 1AH22CS174 1.81.11 1AH22CS174-1.81.11.tar.gz 2023-11-19 13:30:00.113 packages/1AH22CS174/1AH22CS174-1.81.11.tar.gz/... 1AH22CS174-1.81.11/1AH22CS174.egg-info/top_lev... 1 b"\x8b\x13x\x91y\x1f\xe9i'\xadx\xe6K\n\xad{\xd... 1 242
35468829 1AH22CS174 1.81.11 1AH22CS174-1.81.11.tar.gz 2023-11-19 13:30:00.113 packages/1AH22CS174/1AH22CS174-1.81.11.tar.gz/... 1AH22CS174-1.81.11/1AH22CS174.egg-info/require... 16 b"qG\xad\xc3:.'q\xde\xaa\xac\x91\x89\xf7S\xcb\... 2 242
35468830 1AH22CS174 1.81.11 1AH22CS174-1.81.11.tar.gz 2023-11-19 13:30:00.113 packages/1AH22CS174/1AH22CS174-1.81.11.tar.gz/... 1AH22CS174-1.81.11/1AH22CS174.egg-info/depende... 1 b"\x8b\x13x\x91y\x1f\xe9i'\xadx\xe6K\n\xad{\xd... 1 242
35468831 1AH22CS174 1.81.11 1AH22CS174-1.81.11.tar.gz 2023-11-19 13:30:00.113 packages/1AH22CS174/1AH22CS174-1.81.11.tar.gz/... 1AH22CS174-1.81.11/1AH22CS174.egg-info/SOURCES... 187 b'\xa2O$4|X\x15,\xb0\x9a\x07\xe6\x81[\x15\x1f|... 7 242
35468832 1AH22CS174 1.81.11 1AH22CS174-1.81.11.tar.gz 2023-11-19 13:30:00.113 packages/1AH22CS174/1AH22CS174-1.81.11.tar.gz/... 1AH22CS174-1.81.11/1AH22CS174.egg-info/PKG-INFO 509 b'\xee\xbe\xbaoh*\xacA\xb0\x8a}\xb5\x00\xcbpz\... 16 242

35468833 rows × 11 columns

Loading the smallest file from the dataset is already pretty close to the time it took Ibis and DuckDB to execute the entire query.

Let’s give pandas a leg up and tell it what columns to use to avoid reading in a bunch of data we’re not going to use.

We can determine what these columns are by inspecting the Ibis code above.

columns = ["path", "uploaded_on", "project_name"]

%time df = pd.read_parquet(smallest_file, columns=columns)
df
CPU times: user 13.3 s, sys: 7.21 s, total: 20.5 s
Wall time: 16.2 s
path uploaded_on project_name
0 packages/zyte-spider-templates/zyte_spider_tem... 2023-10-26 07:29:49.894 zyte-spider-templates
1 packages/zyte-spider-templates/zyte_spider_tem... 2023-10-26 07:29:49.894 zyte-spider-templates
2 packages/zyte-spider-templates/zyte_spider_tem... 2023-10-26 07:29:49.894 zyte-spider-templates
3 packages/zyte-spider-templates/zyte_spider_tem... 2023-10-26 07:29:49.894 zyte-spider-templates
4 packages/zyte-spider-templates/zyte_spider_tem... 2023-10-26 07:29:49.894 zyte-spider-templates
... ... ... ...
35468828 packages/1AH22CS174/1AH22CS174-1.81.11.tar.gz/... 2023-11-19 13:30:00.113 1AH22CS174
35468829 packages/1AH22CS174/1AH22CS174-1.81.11.tar.gz/... 2023-11-19 13:30:00.113 1AH22CS174
35468830 packages/1AH22CS174/1AH22CS174-1.81.11.tar.gz/... 2023-11-19 13:30:00.113 1AH22CS174
35468831 packages/1AH22CS174/1AH22CS174-1.81.11.tar.gz/... 2023-11-19 13:30:00.113 1AH22CS174
35468832 packages/1AH22CS174/1AH22CS174-1.81.11.tar.gz/... 2023-11-19 13:30:00.113 1AH22CS174

35468833 rows × 3 columns

Sweet, read times improved!

Let’s peek at the memory usage of the DataFrame.

print(round(df.memory_usage(deep=True).sum() / (1 << 30), 1), "GiB")
8.7 GiB

I still have plenty of space to do my analysis, nice!

First, filter the data:

%%time
df = df[
    (
        df.path.str.contains(r"\.(?:asm|c|cc|cpp|cxx|h|hpp|rs|[Ff][0-9]{0,2}(?:or)?|go)$")
        & ~df.path.str.contains(r"(?:^|/)test(?:|s|ing)|/site-packages/")
    )
]
df
Note: I altered the original query here to avoid creating an unnecessary intermediate Series object.
CPU times: user 2min 22s, sys: 288 ms, total: 2min 23s
Wall time: 2min 23s
path uploaded_on project_name
1462 packages/zipline-tej/zipline_tej-0.0.50-cp38-c... 2023-10-27 02:23:07.153 zipline-tej
1470 packages/zipline-tej/zipline_tej-0.0.50-cp38-c... 2023-10-27 02:23:07.153 zipline-tej
1477 packages/zipline-tej/zipline_tej-0.0.50-cp38-c... 2023-10-27 02:23:07.153 zipline-tej
1481 packages/zipline-tej/zipline_tej-0.0.50-cp38-c... 2023-10-27 02:23:07.153 zipline-tej
1485 packages/zipline-tej/zipline_tej-0.0.50-cp38-c... 2023-10-27 02:23:07.153 zipline-tej
... ... ... ...
35460320 packages/atomicshop/atomicshop-2.5.12-py3-none... 2023-11-19 14:29:22.109 atomicshop
35460515 packages/atomicshop/atomicshop-2.5.11-py3-none... 2023-11-19 11:58:09.589 atomicshop
35460710 packages/atomicshop/atomicshop-2.5.10-py3-none... 2023-11-19 11:48:16.980 atomicshop
35463761 packages/ai-flow-nightly/ai_flow_nightly-2023.... 2023-11-19 16:06:36.819 ai-flow-nightly
35464036 packages/ai-flow-nightly/ai_flow_nightly-2023.... 2023-11-19 16:06:33.327 ai-flow-nightly

7166291 rows × 3 columns

We’ve blown way past our Ibis + DuckDB latency budget.

Let’s keep going!

Next, group by and aggregate:

%%time
df = (
    df.groupby(
        [
            df.uploaded_on.dt.floor("M").rename("month"),
            df.path.str.extract(r"\.([a-z0-9]+)$", 0, expand=False).rename("ext"),
        ]
    )
    .agg({"project_name": lambda s: list(set(s))})
    .sort_index(level="month", ascending=False)
)
df
ValueError: <MonthEnd> is a non-fixed frequency

Here we hit the first API limitation, one that traces back to a long-standing pandas issue: we can’t truncate a timestamp column to month frequency.

Let’s try the solution recommended in that issue.
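Before applying it to the full frame, here’s the workaround on a tiny Series: round-tripping through a monthly Period snaps each timestamp to the start of its month.

```python
import pandas as pd

s = pd.Series(pd.to_datetime(["2023-10-26 07:29:49", "2023-11-19 13:30:00"]))

# dt.floor("M") raises because a month is a non-fixed frequency, but
# converting to a monthly Period and back truncates to month start.
month = s.dt.to_period("M").dt.to_timestamp()
print(month.tolist())  # [Timestamp('2023-10-01 00:00:00'), Timestamp('2023-11-01 00:00:00')]
```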

%%time
df = (
    df.groupby(
        [
            df.uploaded_on.dt.to_period("M").dt.to_timestamp().rename("month"),
            df.path.str.extract(r"\.([a-z0-9]+)$", 0, expand=False).rename("ext"),
        ]
    )
    .agg({"project_name": lambda s: list(set(s))})
    .rename(columns={"project_name": "projects"})
    .sort_index(level="month", ascending=False)
)
df
CPU times: user 8.48 s, sys: 189 ms, total: 8.67 s
Wall time: 8.67 s
projects
month ext
2023-11-01 rs [ast-grep-pyo3, dora-rs, qiskit-terra, kuzu, p...
hpp [mqt.ddsim, kuzu, tensorflow-cpu-aws, pycuda, ...
h [mqt.ddsim, cmeel-gmp, aesim.simba, nx-work-lo...
go [jina, ai-flow-nightly, tdewolff-minify, aws-i...
for [iricore]
f95 [scikit-digital-health, dioptas, easychem, PyG...
f90 [Clawpack, dropkick, sisl, c4p, iricore, fmodp...
f03 [mkl-include]
f [Clawpack, gnssrefl, mqt.ddsim, scikit-build-c...
cxx [mqt.ddsim, SimpleITK, ndicapi, cmeel-eigen, a...
cpp [mqt.ddsim, petljapub, perun-toolsuite, epipol...
cc [mqt.ddsim, stim, pytensor, tensorflow-cpu-aws...
c [mqt.ddsim, aesim.simba, albopictus, odoo13-ad...
asm [mqt.ddsim, pwntools, grpcio, cmeel-assimp, hr...
2023-10-01 rs [summa-embed, sqloxide, qiskit-terra, kuzu, st...
hpp [pydjinni, kuzu, tensorflow-cpu-aws, greenlet,...
h [greenlet, aesim.simba, confluent-kafka, Analy...
go [odoo14-addon-dms, micro-editor, jina, ai-flow...
f90 [petsc, macauff, wiptools, pypestutils, petsc4...
f [gnssrefl, LightSim2Grid, odoo14-addon-dms, od...
cxx [petsc, cars, solarwinds-apm, chalc, wxPython-...
cpp [python-ucto, akida, pydjinni, pylcs, ParmEd, ...
cc [duckdb, codecov-cli, odoo14-addon-dms, habana...
c [odoo14-addon-dms, kuzu, tensorflow-cpu-aws, c...
asm [chipsec, apache-flink-libraries, fibers-ddtes...

Sort the values, add a new column and do the final aggregation:

%%time
df = (
    df.reset_index()
    .assign(
        ext=lambda t: t.ext.str.replace(r"cxx|cpp|cc|c|hpp|h", "C/C++", regex=True)
        .str.replace("^f.*$", "Fortran", regex=True)
        .str.replace("rs", "Rust")
        .str.replace("go", "Go")
        .str.replace("asm", "Assembly")
        .replace("", None)
    )
    .groupby(["month", "ext"])
    .agg({"projects": lambda s: len(set(sum(s, [])))})
)
df
CPU times: user 5.36 ms, sys: 25 µs, total: 5.38 ms
Wall time: 5.28 ms
projects
month ext
2023-10-01 Assembly 14
C/C++ 484
Fortran 23
Go 25
Rust 99
2023-11-01 Assembly 10
C/C++ 836
Fortran 48
Go 33
Rust 190

Remember, all of the previous code is executing on a single file and still takes minutes to run.
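As an aside, the sum(s, []) idiom in the aggregation above rebuilds the accumulated list on every step, which is quadratic in the total number of elements. itertools.chain.from_iterable flattens in linear time and yields the same distinct count:

```python
from itertools import chain

lists = [["a", "b"], ["b", "c"], ["d"]]

# sum(lists, []) copies the growing result list at each step (quadratic);
# chain.from_iterable visits each element exactly once (linear).
flat = list(chain.from_iterable(lists))
print(len(set(flat)))  # 4
```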

Conclusion

If I only have pandas at my disposal, I don’t see how to avoid getting a bigger computer to run this query over the entire data set.

Rewriting the query to be fair

At this point I wondered whether this was a fair query to run with pandas.

After all, the downsides of pandas’ use of object arrays to hold nested data structures like lists are well-known.

The original query uses a lot of nested array types, which DuckDB handles very well, but here we ultimately throw away all of the arrays, so we don’t actually need them.

Additionally, I’m using lambda functions instead of taking advantage of pandas’ fast built-in methods like count, nunique and others.
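For example, the grouped distinct count written with a lambda is equivalent to the built-in nunique aggregation, which skips building a Python set per group. The toy frame below is illustrative:

```python
import pandas as pd

df = pd.DataFrame({"ext": ["c", "c", "rs"], "project_name": ["a", "b", "a"]})

# The built-in aggregation...
fast = df.groupby("ext").project_name.nunique()
# ...matches the hand-rolled lambda version.
slow = df.groupby("ext").project_name.agg(lambda s: len(set(s)))
print(fast.equals(slow))  # True
```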

Let’s see if we can alter the original query to give pandas a leg up.

A story of two GROUP BYs

Here’s the first Ibis expression:

from __future__ import annotations

import ibis
from ibis import _, udf


@udf.scalar.builtin
def flatten(x: list[list[str]]) -> list[str]:
    ...


expr = (
    ibis.read_parquet("/data/pypi-parquet/*.parquet")
    .filter(
        [
            _.path.re_search(
                r"\.(asm|c|cc|cpp|cxx|h|hpp|rs|[Ff][0-9]{0,2}(?:or)?|go)$"
            ),
            ~_.path.re_search(r"(^|/)test(|s|ing)"),
            ~_.path.contains("/site-packages/"),
        ]
    )
    .group_by(
        month=_.uploaded_on.truncate("M"),
        ext=_.path.re_extract(r"\.([a-z0-9]+)$", 1),
    )
    .aggregate(projects=_.project_name.collect().unique())
    .order_by(_.month.desc())
    .mutate(
        ext=_.ext.re_replace(r"cxx|cpp|cc|c|hpp|h", "C/C++")
        .re_replace("^f.*$", "Fortran")
        .replace("rs", "Rust")
        .replace("go", "Go")
        .replace("asm", "Assembly")
        .nullif(""),
    )
    .group_by(["month", "ext"])
    .aggregate(project_count=flatten(_.projects.collect()).unique().length())
    .dropna("ext")
    .order_by([_.month.desc(), _.project_count.desc()])
)

It looks like we can start simplifying by moving the logic in the mutate expression directly into the first group_by call.

Applying these changes:

--- step0.py    2023-12-12 05:20:01.712513949 -0500
+++ step1.py    2023-12-12 05:20:01.712513949 -0500
@@ -5,7 +5,7 @@
 
 
 @udf.scalar.builtin
-def flatten(x: list[list[str]]) -> list[str]:
+def flatten(x: list[list[str]]) -> list[str]:
     ...
 
 
@@ -22,20 +22,16 @@
     )
     .group_by(
         month=_.uploaded_on.truncate("M"),
-        ext=_.path.re_extract(r"\.([a-z0-9]+)$", 1),
-    )
-    .aggregate(projects=_.project_name.collect().unique())
-    .order_by(_.month.desc())
-    .mutate(
-        ext=_.ext.re_replace(r"cxx|cpp|cc|c|hpp|h", "C/C++")
+        ext=_.path.re_extract(r"\.([a-z0-9]+)$", 1)
+        .re_replace(r"cxx|cpp|cc|c|hpp|h", "C/C++")
         .re_replace("^f.*$", "Fortran")
         .replace("rs", "Rust")
         .replace("go", "Go")
         .replace("asm", "Assembly")
         .nullif(""),
     )
+    .aggregate(projects=_.project_name.collect().unique())
+    .order_by(_.month.desc())
     .group_by(["month", "ext"])
     .aggregate(project_count=flatten(_.projects.collect()).unique().length())
-    .dropna("ext")
-    .order_by([_.month.desc(), _.project_count.desc()])
 )

We get:

from __future__ import annotations

import ibis
from ibis import _, udf


@udf.scalar.builtin
def flatten(x: list[list[str]]) -> list[str]:
    ...


expr = (
    ibis.read_parquet("/data/pypi-parquet/*.parquet")
    .filter(
        [
            _.path.re_search(
                r"\.(asm|c|cc|cpp|cxx|h|hpp|rs|[Ff][0-9]{0,2}(?:or)?|go)$"
            ),
            ~_.path.re_search(r"(^|/)test(|s|ing)"),
            ~_.path.contains("/site-packages/"),
        ]
    )
    .group_by(
        month=_.uploaded_on.truncate("M"),
        ext=_.path.re_extract(r"\.([a-z0-9]+)$", 1)
        .re_replace(r"cxx|cpp|cc|c|hpp|h", "C/C++")
        .re_replace("^f.*$", "Fortran")
        .replace("rs", "Rust")
        .replace("go", "Go")
        .replace("asm", "Assembly")
        .nullif(""),
    )
    .aggregate(projects=_.project_name.collect().unique())
    .order_by(_.month.desc())
    .group_by(["month", "ext"])
    .aggregate(project_count=flatten(_.projects.collect()).unique().length())
)

Don’t sort unnecessarily

Notice the order_by call just before the second group_by call. Ordering before grouping is somewhat useless here; we should sort after we’ve reduced our data. Let’s move the ordering to the end of the query.

Applying these changes:

--- step1.py    2023-12-12 05:20:01.712513949 -0500
+++ step2.py    2023-12-12 05:20:01.712513949 -0500
@@ -31,7 +31,7 @@
         .nullif(""),
     )
     .aggregate(projects=_.project_name.collect().unique())
-    .order_by(_.month.desc())
     .group_by(["month", "ext"])
     .aggregate(project_count=flatten(_.projects.collect()).unique().length())
+    .order_by(_.month.desc())
 )

We get:

from __future__ import annotations

import ibis
from ibis import _, udf


@udf.scalar.builtin
def flatten(x: list[list[str]]) -> list[str]:
    ...


expr = (
    ibis.read_parquet("/data/pypi-parquet/*.parquet")
    .filter(
        [
            _.path.re_search(
                r"\.(asm|c|cc|cpp|cxx|h|hpp|rs|[Ff][0-9]{0,2}(?:or)?|go)$"
            ),
            ~_.path.re_search(r"(^|/)test(|s|ing)"),
            ~_.path.contains("/site-packages/"),
        ]
    )
    .group_by(
        month=_.uploaded_on.truncate("M"),
        ext=_.path.re_extract(r"\.([a-z0-9]+)$", 1)
        .re_replace(r"cxx|cpp|cc|c|hpp|h", "C/C++")
        .re_replace("^f.*$", "Fortran")
        .replace("rs", "Rust")
        .replace("go", "Go")
        .replace("asm", "Assembly")
        .nullif(""),
    )
    .aggregate(projects=_.project_name.collect().unique())
    .group_by(["month", "ext"])
    .aggregate(project_count=flatten(_.projects.collect()).unique().length())
    .order_by(_.month.desc())
)

Don’t repeat yourself

Notice that we are now:

  • grouping
  • aggregating
  • grouping again by the same keys
  • aggregating

This is less efficient than it needs to be. We are also flattening an array, computing its distinct values, and then computing its length.

We are computing the grouped number of distinct values, and we likely don’t need to collect values into an array to do that.

Let’s try using a COUNT(DISTINCT ...) query instead, to avoid wasting cycles collecting arrays.

We’ll remove the second group by and then call nunique() to get the final query.

Applying these changes:

--- step2.py    2023-12-12 05:20:01.712513949 -0500
+++ step3.py    2023-12-12 05:20:01.712513949 -0500
@@ -1,13 +1,7 @@
 from __future__ import annotations
 
 import ibis
-from ibis import _, udf
-
-
-@udf.scalar.builtin
-def flatten(x: list[list[str]]) -> list[str]:
-    ...
-
+from ibis import _
 
 expr = (
     ibis.read_parquet("/data/pypi-parquet/*.parquet")
@@ -30,8 +24,7 @@
         .replace("asm", "Assembly")
         .nullif(""),
     )
-    .aggregate(projects=_.project_name.collect().unique())
-    .group_by(["month", "ext"])
-    .aggregate(project_count=flatten(_.projects.collect()).unique().length())
-    .order_by(_.month.desc())
+    .aggregate(project_count=_.project_name.nunique())
+    .dropna("ext")
+    .order_by([_.month.desc(), _.project_count.desc()])
 )

We get:

from __future__ import annotations

import ibis
from ibis import _

expr = (
    ibis.read_parquet("/data/pypi-parquet/*.parquet")
    .filter(
        [
            _.path.re_search(
                r"\.(asm|c|cc|cpp|cxx|h|hpp|rs|[Ff][0-9]{0,2}(?:or)?|go)$"
            ),
            ~_.path.re_search(r"(^|/)test(|s|ing)"),
            ~_.path.contains("/site-packages/"),
        ]
    )
    .group_by(
        month=_.uploaded_on.truncate("M"),
        ext=_.path.re_extract(r"\.([a-z0-9]+)$", 1)
        .re_replace(r"cxx|cpp|cc|c|hpp|h", "C/C++")
        .re_replace("^f.*$", "Fortran")
        .replace("rs", "Rust")
        .replace("go", "Go")
        .replace("asm", "Assembly")
        .nullif(""),
    )
    .aggregate(project_count=_.project_name.nunique())
    .dropna("ext")
    .order_by([_.month.desc(), _.project_count.desc()])
)
Note: I added a second sort key (project_count) for deterministic output.

Let’s run it to make sure the results are as expected:

duckdb_results = %timeit -n1 -r1 -o expr.to_pandas()
29.9 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

It looks like the new query might be a bit slower even though we’re ostensibly doing less computation. Since we’re still pretty close to the original duration, let’s keep going.

Final pandas run with the new query

Rewriting the pandas code we get:

from __future__ import annotations

import glob
import os

import pandas as pd

df = pd.read_parquet(
    min(glob.glob("/data/pypi-parquet/*.parquet"), key=os.path.getsize),
    columns=["path", "uploaded_on", "project_name"],
)
df = df[
    df.path.str.contains(r"\.(?:asm|c|cc|cpp|cxx|h|hpp|rs|[Ff][0-9]{0,2}(?:or)?|go)$")
    & ~df.path.str.contains(r"(?:(?:^|/)test(?:|s|ing)|/site-packages/)")
]
print(
    df.assign(
        month=df.uploaded_on.dt.to_period("M").dt.to_timestamp(),
        ext=df.path.str.extract(r"\.([a-z0-9]+)$", 0)
        .iloc[:, 0]
        .str.replace(r"cxx|cpp|cc|c|hpp|h", "C/C++", regex=True)
        .str.replace("^f.*$", "Fortran", regex=True)
        .str.replace("rs", "Rust")
        .str.replace("go", "Go")
        .str.replace("asm", "Assembly"),
    )
    .groupby(["month", "ext"])
    .project_name.nunique()
    .rename("project_count")
    .reset_index()
    .sort_values(["month", "project_count"], ascending=False)
)

Running it we get:

pandas_results = %timeit -n1 -r1 -o %run pandas_impl.py
       month       ext  project_count
6 2023-11-01     C/C++            836
9 2023-11-01      Rust            190
7 2023-11-01   Fortran             48
8 2023-11-01        Go             33
5 2023-11-01  Assembly             10
1 2023-10-01     C/C++            484
4 2023-10-01      Rust             99
3 2023-10-01        Go             25
2 2023-10-01   Fortran             23
0 2023-10-01  Assembly             14
2min 58s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
Remember, this is the time it took pandas to run the query for a single file.

DuckDB runs the query over the entire dataset about 6x faster than that!

Let’s try a tool that nominally scales to our problem: Dask.

Dask

One really nice component of Dask is dask.dataframe.

Dask DataFrame implements a good chunk of the pandas API and can be a drop-in replacement for pandas.

I was happy to find that this was the case here.

My first attempt was somewhat naive and was effectively a one-line change from import pandas as pd to import dask.dataframe as pd.

This worked and the workload completed. However, after I talked to Dask expert and Ibis contributor Naty Clementi, she suggested I try a few things:

  • Use the distributed scheduler.
  • Ensure that pyarrow string arrays are used instead of NumPy object arrays. This required no changes to my Dask code because PyArrow strings have been the default since version 2023.7.1, hooray!
  • Explore some of the options to read_parquet. It turned out that without setting split_row_groups=True I ran out of memory.

Let’s look at the Dask implementation:

from __future__ import annotations

import logging

import dask.dataframe as dd
from dask.distributed import Client

if __name__ == "__main__":
    client = Client(silence_logs=logging.ERROR)
    df = dd.read_parquet(
        "/data/pypi-parquet/*.parquet",
        columns=["path", "uploaded_on", "project_name"],
        split_row_groups=True,
    )
    df = df[
        df.path.str.contains(
            r"\.(?:asm|c|cc|cpp|cxx|h|hpp|rs|[Ff][0-9]{0,2}(?:or)?|go)$"
        )
        & ~df.path.str.contains(r"(?:^|/)test(?:|s|ing)")
        & ~df.path.str.contains("/site-packages/")
    ]
    print(
        df.assign(
            month=df.uploaded_on.dt.to_period("M").dt.to_timestamp(),
            ext=df.path.str.extract(r"\.([a-z0-9]+)$", 0, expand=False)
            .str.replace(r"cxx|cpp|cc|c|hpp|h", "C/C++", regex=True)
            .str.replace("^f.*$", "Fortran", regex=True)
            .str.replace("rs", "Rust")
            .str.replace("go", "Go")
            .str.replace("asm", "Assembly"),
        )
        .groupby(["month", "ext"])
        .project_name.nunique()
        .rename("project_count")
        .compute()
        .reset_index()
        .sort_values(["month", "project_count"], ascending=False)
    )
    client.shutdown()

Let’s run the code:

dask_results = %timeit -n1 -r1 -o %run dask_impl.py
         month       ext  project_count
794 2023-11-01     C/C++            836
796 2023-11-01      Rust            190
797 2023-11-01   Fortran             48
795 2023-11-01        Go             33
798 2023-11-01  Assembly             10
..         ...       ...            ...
2   2005-08-01     C/C++              7
1   2005-07-01     C/C++              4
83  2005-05-01     C/C++              1
82  2005-04-01     C/C++              1
0   2005-03-01     C/C++              1

[799 rows x 3 columns]
52.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)

That’s a great improvement over pandas: we finished the workload, and our running time is pretty close to DuckDB’s.

Takeaways

Ibis + DuckDB is the only system tested that handles this workload well out of the box

  • Pandas couldn’t handle the workload due to memory constraints.
  • Dask required its recommended distributed scheduler to achieve maximum performance and still used a lot of memory.

Let’s recap the results with some numbers:

Numbers

Toolset             Data size   Duration    Throughput
Ibis + DuckDB       25,825 MiB  30 seconds  863 MiB/s
Dask + Distributed  25,825 MiB  53 seconds  492 MiB/s

With Ibis + DuckDB, I was able to write the query the way I wanted to without running out of memory, using the default configuration provided by Ibis.

I was able to run this computation around 193x faster than you can expect with pandas using this hardware setup.

In contrast, pandas ran out of memory on the full data set and needed some hand-holding even for a single file, while Dask completed the workload but still used quite a bit more memory than DuckDB.

Pandas is untenable for this workload

Pandas requires me to load everything into memory, and my machine doesn’t have enough memory to do that.

Given that Ibis + DuckDB runs this workload on my machine it doesn’t seem worth the effort to write any additional code to make pandas scale to the whole dataset.

Dask finishes in a similar amount of time as Ibis + DuckDB (within 2x)

Out of the box I had quite a bit of difficulty figuring out how to maximize performance and not run out of memory.

Please get in touch if you think my Dask code can be improved!

I know the Dask community is hard at work building dask-expr which might improve the performance of this workload when it lands.

Next steps

Please get in touch!

If you have ideas about how to speed up my use of the tools I’ve discussed here please get in touch by opening a GitHub discussion!

We would love it if more backends handled this workload!

Look out for part 2

In part 2 of this series we’ll explore how Polars and DataFusion perform on this query. Stay tuned!
