Ibis @ LinkedIn

Portable Python DataFrames

Chloe He
Phillip Cloud

2024-04-24

Who

Phillip Cloud

  • Principal engineer at Voltron Data
  • Python analytics for 10+ years
  • Open source
  • Tech lead for Ibis

Chloe He

  • Founding engineer at Claypot → Senior engineer at Voltron Data
  • Infrastructure for real-time ML
  • Ibis streaming

What

Ibis is a Python library for:

  • Exploratory data analysis
  • General analytics
  • Data engineering
  • ML preprocessing
  • Building libraries on top of it (e.g., Google BigFrames)
  • Build your own … Ibis (??)

💡 From development to production with few rewrites

Examples

https://ibis-project.org/tutorials/getting_started

from ibis.interactive import *

penguins = ibis.examples.penguins.fetch()
penguins
┏━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━┓
┃ species ┃ island    ┃ bill_length_mm ┃ bill_depth_mm ┃ flipper_length_mm ┃ body_mass_g ┃ sex    ┃ year  ┃
┡━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━┩
│ string  │ string    │ float64        │ float64       │ int64             │ int64       │ string │ int64 │
├─────────┼───────────┼────────────────┼───────────────┼───────────────────┼─────────────┼────────┼───────┤
│ Adelie  │ Torgersen │           39.1 │          18.7 │               181 │        3750 │ male   │  2007 │
│ Adelie  │ Torgersen │           39.5 │          17.4 │               186 │        3800 │ female │  2007 │
│ Adelie  │ Torgersen │           40.3 │          18.0 │               195 │        3250 │ female │  2007 │
│ Adelie  │ Torgersen │           NULL │          NULL │              NULL │        NULL │ NULL   │  2007 │
│ Adelie  │ Torgersen │           36.7 │          19.3 │               193 │        3450 │ female │  2007 │
│ Adelie  │ Torgersen │           39.3 │          20.6 │               190 │        3650 │ male   │  2007 │
│ Adelie  │ Torgersen │           38.9 │          17.8 │               181 │        3625 │ female │  2007 │
│ Adelie  │ Torgersen │           39.2 │          19.6 │               195 │        4675 │ male   │  2007 │
│ Adelie  │ Torgersen │           34.1 │          18.1 │               193 │        3475 │ NULL   │  2007 │
│ Adelie  │ Torgersen │           42.0 │          20.2 │               190 │        4250 │ NULL   │  2007 │
│ …       │ …         │              … │             … │                 … │           … │ …      │     … │
└─────────┴───────────┴────────────────┴───────────────┴───────────────────┴─────────────┴────────┴───────┘
penguins.group_by("species", "island").agg(
    n=penguins.count(),
    avg_bill_mm=penguins.bill_length_mm.mean(),
    med_flipper_mm=penguins.flipper_length_mm.median()
)
┏━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┓
┃ species   ┃ island    ┃ n     ┃ avg_bill_mm ┃ med_flipper_mm ┃
┡━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━┩
│ string    │ string    │ int64 │ float64     │ float64        │
├───────────┼───────────┼───────┼─────────────┼────────────────┤
│ Adelie    │ Dream     │    56 │   38.501786 │          190.0 │
│ Gentoo    │ Biscoe    │   124 │   47.504878 │          216.0 │
│ Chinstrap │ Dream     │    68 │   48.833824 │          196.0 │
│ Adelie    │ Torgersen │    52 │   38.950980 │          191.0 │
│ Adelie    │ Biscoe    │    44 │   38.975000 │          189.5 │
└───────────┴───────────┴───────┴─────────────┴────────────────┘
cols = {
    c: penguins[c] - penguins[c].mean()
    for c in penguins.columns
    if penguins[c].type().is_numeric() and c != "year"
}
expr = penguins.group_by("species").mutate(**cols).head(5)
expr
┏━━━━━━━━━┳━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━┓
┃ species ┃ island ┃ bill_length_mm ┃ bill_depth_mm ┃ flipper_length_mm ┃ body_mass_g ┃ sex    ┃ year  ┃
┡━━━━━━━━━╇━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━┩
│ string  │ string │ float64        │ float64       │ float64           │ float64     │ string │ int64 │
├─────────┼────────┼────────────────┼───────────────┼───────────────────┼─────────────┼────────┼───────┤
│ Gentoo  │ Biscoe │      -1.404878 │     -1.782114 │         -6.186992 │  -576.01626 │ female │  2007 │
│ Gentoo  │ Biscoe │      -3.004878 │     -0.282114 │         -3.186992 │  -226.01626 │ female │  2009 │
│ Gentoo  │ Biscoe │       1.195122 │     -0.882114 │         -7.186992 │  -626.01626 │ female │  2007 │
│ Gentoo  │ Biscoe │       2.495122 │      0.217886 │          0.813008 │   623.98374 │ male   │  2007 │
│ Gentoo  │ Biscoe │       0.095122 │     -0.482114 │         -2.186992 │   323.98374 │ male   │  2007 │
└─────────┴────────┴────────────────┴───────────────┴───────────────────┴─────────────┴────────┴───────┘

Let’s talk about SQL

SQL

cols = {
    c: penguins[c] - penguins[c].mean()
    for c in penguins.columns
    if penguins[c].type().is_numeric() and c != "year"
}
expr = penguins.group_by("species").mutate(**cols).head(5)
ibis.to_sql(expr)
SELECT
  "t0"."species",
  "t0"."island",
  "t0"."bill_length_mm" - AVG("t0"."bill_length_mm") OVER (PARTITION BY "t0"."species" ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS "bill_length_mm",
  "t0"."bill_depth_mm" - AVG("t0"."bill_depth_mm") OVER (PARTITION BY "t0"."species" ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS "bill_depth_mm",
  "t0"."flipper_length_mm" - AVG("t0"."flipper_length_mm") OVER (PARTITION BY "t0"."species" ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS "flipper_length_mm",
  "t0"."body_mass_g" - AVG("t0"."body_mass_g") OVER (PARTITION BY "t0"."species" ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS "body_mass_g",
  "t0"."sex",
  "t0"."year"
FROM "penguins" AS "t0"
LIMIT 5
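
The generated SQL isn’t tied to one engine, either. A minimal sketch: ibis.to_sql takes a dialect argument, so the same expression compiles for other backends (“bigquery” here is just an illustrative target).

# Compile the same expression for a different engine.
print(ibis.to_sql(expr, dialect="bigquery"))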

Back to examples…

ibis-analytics

https://ibis-analytics.streamlit.app

Why?

DataFrame lore

  • DataFrames appear in the S programming language, which evolves into the R programming language.
  • pandas perfects the DataFrame in Python … or did it?
  • Dozens of Python DataFrame libraries appear and disappear…
  • pandas remains the de facto standard for Python DataFrames. It still doesn’t scale.
  • That leads to data scientists throwing code over the wall to engineers.
  • What if Ibis were a new standard?

Ibis origins

from “Apache Arrow and the ‘10 Things I Hate About pandas’” by Wes McKinney

…in 2015, I started the Ibis project…to create a pandas-friendly deferred expression system for static analysis and compilation [of] these types of [query planned, multicore execution] operations. Since an efficient multithreaded in-memory engine for pandas was not available when I started Ibis, I instead focused on building compilers for SQL engines (Impala, PostgreSQL, SQLite), similar to the R dplyr package. Phillip Cloud from the pandas core team has been actively working on Ibis with me for quite a long time.

Two world problem

What does Ibis solve?

SQL:

  • databases & tables
  • analytics
  • metrics
  • dashboards

Python:

  • files & DataFrames
  • data science
  • statistics
  • notebooks

Ibis bridges the gap.
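
As a minimal sketch of that bridge (assuming a DuckDB connection that already has a penguins table), you can start in SQL and finish in Python on the same expression:

import ibis

con = ibis.duckdb.connect()  # assumes penguins is registered in this database

# SQL world: start from a query...
t = con.sql("SELECT species, island, body_mass_g FROM penguins")

# ...Python world: keep composing, then pull results into a DataFrame.
expr = t.group_by("species").agg(avg_mass=t.body_mass_g.mean())
df = expr.to_pandas()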

Bridging the gap

import ibis

con = ibis.duckdb.connect()
penguins = con.table("penguins")
penguins.group_by("species", "island").agg(count=ibis._.count())

An embeddable, zero-dependency, C++ SQL database engine.

import ibis

con = ibis.datafusion.connect()
penguins = con.table("penguins")
penguins.group_by("species", "island").agg(count=ibis._.count())

A Rust SQL query engine.

import ibis

con = ibis.clickhouse.connect()
penguins = con.table("penguins")
penguins.group_by("species", "island").agg(count=ibis._.count())

A C++ column-oriented database management system.

import ibis

con = ibis.polars.connect()
penguins = con.table("penguins")
penguins.group_by("species", "island").agg(count=ibis._.count())

A Rust DataFrame library.

import ibis

con = ibis.bigquery.connect()
penguins = con.table("penguins")
penguins.group_by("species", "island").agg(count=ibis._.count())

A serverless, highly scalable, and cost-effective cloud data warehouse.

import ibis

con = ibis.snowflake.connect()
penguins = con.table("penguins")
penguins.group_by("species", "island").agg(count=ibis._.count())

A cloud data platform.

import ibis

con = ibis.oracle.connect()
penguins = con.table("penguins")
penguins.group_by("species", "island").agg(count=ibis._.count())

A relational database management system.

import ibis

con = ibis.pyspark.connect()
penguins = con.table("penguins")
penguins.group_by("species", "island").agg(count=ibis._.count())

A unified analytics engine for large-scale data processing.

import ibis

con = ibis.trino.connect()
penguins = con.table("penguins")
penguins.group_by("species", "island").agg(count=ibis._.count())

A distributed SQL query engine.

import ibis

con = ibis.flink.connect()
penguins = con.table("penguins")
penguins.group_by("species", "island").agg(count=ibis._.count())

A distributed streaming and batch SQL analytics engine.
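
The takeaway from this parade of backends: the expression code never changes, only the connection. A minimal sketch, assuming each backend is installed and has a penguins table:

import ibis

def species_counts(con):
    # Identical expression code regardless of the engine behind `con`.
    penguins = con.table("penguins")
    return penguins.group_by("species", "island").agg(count=ibis._.count())

# Swapping engines is a one-line change.
local_result = species_counts(ibis.duckdb.connect())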

How does Ibis work?

This is going to be very fast 🏃💨. I will happily answer questions about it 😂.

Engineering themes

  • Immutability
  • Type checking
  • Separation of concerns
  • Extensibility
  • Focus on end-user experience
  • Avoid common denominator trap
  • Scale up and down

Components: expressions

Expressions: interface

  • StringScalar, IntegerColumn, Table
  • .sum(), .split(), .join()
  • No knowledge of specific operation

Operations: implementation

  • Specific action: e.g., StringSplit
  • Inputs + output dtype, shape
  • Used for compiler dispatch

Other goodies

  • Type system
  • Pattern matching
  • Graph manipulation/traversal

Goal: separate API from implementation.
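
You can poke at the split interactively: every expression wraps an operation node, reachable via .op(). A small sketch, reusing the penguins table from earlier:

expr = penguins.bill_length_mm.mean()

print(type(expr))       # the expression side: a scalar expression (interface)
print(type(expr.op()))  # the operation side: a Mean node (implementation)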

Components: expressions

from ibis.expr.visualize import to_graph

expr = penguins.group_by("species").agg(
    avg_bill_mm=_.bill_length_mm.mean()
)

to_graph(expr)

Components: compiler

expr = penguins.group_by("species").agg(
    avg_bill_mm=_.bill_length_mm.mean()
)

graph BT
  classDef white color:white;

  %% graph definition
  DatabaseTable --> species
  DatabaseTable --> bill_length_mm
  bill_length_mm --> Mean
  species --> Aggregate
  Mean --> Aggregate

  %% style
  class DatabaseTable white;
  class species white;
  class bill_length_mm white;
  class Mean white;
  class Aggregate white;

graph BT
  classDef white color:white;

  DatabaseTable2[DatabaseTable] --> species2[species]
  species2 --> bill_length_mm2[bill_length_mm]
  bill_length_mm2 --> Mean2[Mean]
  Mean2 --> Aggregate2[Aggregate]

  %% style
  class DatabaseTable2 white;
  class species2 white;
  class bill_length_mm2 white;
  class Mean2 white;
  class Aggregate2 white;

Components: compiler

  • Rewrite operations
  • Compile bottom-up, storing intermediate outputs
  • Hand off the output to sqlglot (sketched after the SQL below)
SELECT
  "t0"."species",
  AVG("t0"."bill_length_mm") AS "avg_bill_mm"
FROM "penguins" AS "t0"
GROUP BY
  1
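
The sqlglot handoff is also what makes dialect translation cheap. A sketch using sqlglot directly; the duckdb → spark pair is just an example:

import sqlglot

sql = """
SELECT "t0"."species", AVG("t0"."bill_length_mm") AS "avg_bill_mm"
FROM "penguins" AS "t0"
GROUP BY 1
"""

# Transpile Ibis-generated SQL from one dialect to another.
print(sqlglot.transpile(sql, read="duckdb", write="spark")[0])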

Components: drivers

Drivers

  • We have SQL at this point
  • Send it to the database via DBAPI: cursor.execute(ibis_generated_sql) (sketched below)
  • (Heavily) massage the output
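
A minimal sketch of that driver step, using the stdlib sqlite3 DBAPI driver as a stand-in (the table and query are illustrative):

import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE penguins (species TEXT, island TEXT)")

# Conceptually what a backend does: run Ibis-generated SQL through
# a DBAPI cursor, then reshape the raw rows for the user.
ibis_generated_sql = "SELECT species, COUNT(*) FROM penguins GROUP BY 1"
cursor = con.execute(ibis_generated_sql)
rows = cursor.fetchall()  # raw driver output, before any "massaging"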

Ibis + Streaming

Growth of streaming

  • Over 70% of Fortune 500 companies have adopted Kafka
  • 54% of Databricks’ customers are using Spark Structured Streaming
  • The stream processing market is expected to grow at a compound annual growth rate (CAGR) of 21.5% from 2022 to 2028 (IDC)

Batch and streaming

graph LR
  subgraph " "
    direction LR
    A[data] --> B[batch processing] & C[stream processing] --> D[downstream]
  end

In the machine learning world…

graph TB
  proddata --> sampled
  model --> prodpipeline

  subgraph "local env"
    sampled[sampled data] --> local[local experimentation]
    local <--> iterate
    local --> model[finally, we have a production-ready model!]
  end

  subgraph "prod env"
    proddata[production data] --> prodpipeline[production pipelines]
  end

In the machine learning world…

graph TB
  proddata --> sampled
  model -- "code rewrite" --> prodpipeline

  linkStyle 1 color:white;

  subgraph "local env"
    sampled[sampled data] --> local[local experimentation]
    local <--> iterate
    local --> model[finally, we have a production-ready model!]
  end

  subgraph "prod env"
    proddata[production data] --> prodpipeline[production pipelines]
  end

A real-world example

pandas

return (
    clicks_df
    .groupby("user")
    .rolling("1h", on="click_time")["url"]
    .count()
)

Flink SQL

SELECT
  user,
  COUNT(url) OVER (
    PARTITION BY user
    ORDER BY click_time
    RANGE BETWEEN
      INTERVAL '1' HOUR PRECEDING
      AND CURRENT ROW
  ) AS one_hour_user_click_cnt
FROM clicks

Code rewrites

  • From batch to streaming
  • From local experimentation to production
  • Backfilling a streaming feature on a batch backing table

The solution…

Stream-batch unified API

  • Flink SQL
  • Spark DataFrame API

Stream-batch unification

pandas

return (
    clicks_df
    .groupby("user")
    .rolling("1h", on="click_time")["url"]
    .count()
)

Flink SQL

SELECT
  user,
  COUNT(url) OVER (
    PARTITION BY user
    ORDER BY click_time
    RANGE BETWEEN
      INTERVAL '1' HOUR PRECEDING
      AND CURRENT ROW
  ) AS one_hour_user_click_cnt
FROM clicks

Ibis

agged = clicks.select(
    _.user,
    one_hour_user_click_cnt=_.url.count().over(
        range=(-ibis.interval(hours=1), 0),
        group_by=_.user,
        order_by=_.click_time,
    ),
)
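
One expression can then target both worlds. A sketch, assuming both dialect names are available to ibis.to_sql:

# Compile the same expression for a batch engine...
print(ibis.to_sql(agged, dialect="duckdb"))

# ...and for a streaming engine.
print(ibis.to_sql(agged, dialect="flink"))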

But it’s hard…

  • Streaming is different
    • Time semantics
    • Long-running queries
    • Sources and sinks
  • Less established standards in streaming syntax

Ibis streaming today

  • Flink and RisingWave backends launched in Ibis 8.0
  • Introduction of watermarks, windowed aggregations, etc. in Ibis (sketched below)
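
For example, declaring event-time semantics when registering a streaming source with the Flink backend looks roughly like this (a sketch: the schema, connector properties, and allowed delay are all illustrative):

import ibis
import ibis.expr.datatypes as dt

con = ibis.flink.connect(table_env)  # assumes an existing pyflink TableEnvironment

clicks = con.create_table(
    "clicks",
    schema=ibis.schema(
        {"user": dt.string, "url": dt.string, "click_time": dt.Timestamp(scale=3)}
    ),
    # Event time + allowed out-of-orderness for streaming queries.
    watermark=ibis.watermark(
        time_col="click_time",
        allowed_delay=ibis.interval(seconds=15),
    ),
    tbl_properties={"connector": "kafka"},  # real configs need more properties
)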

What’s next?

  • Expand support for streaming operations and syntax
  • Continue iterating on a stream-batch unified API
  • More streaming backends (e.g., Spark Structured Streaming)

Towards composable data systems

Try it out now!

Install:

pip install 'ibis-framework[duckdb]'

Then run:

from ibis.interactive import *

t = ibis.examples.penguins.fetch()
t.head()
┏━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━┳━━━━━━━┓
┃ species ┃ island    ┃ bill_length_mm ┃ bill_depth_mm ┃ flipper_length_mm ┃ body_mass_g ┃ sex    ┃ year  ┃
┡━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━╇━━━━━━━┩
│ string  │ string    │ float64        │ float64       │ int64             │ int64       │ string │ int64 │
├─────────┼───────────┼────────────────┼───────────────┼───────────────────┼─────────────┼────────┼───────┤
│ Adelie  │ Torgersen │           39.1 │          18.7 │               181 │        3750 │ male   │  2007 │
│ Adelie  │ Torgersen │           39.5 │          17.4 │               186 │        3800 │ female │  2007 │
│ Adelie  │ Torgersen │           40.3 │          18.0 │               195 │        3250 │ female │  2007 │
│ Adelie  │ Torgersen │           NULL │          NULL │              NULL │        NULL │ NULL   │  2007 │
│ Adelie  │ Torgersen │           36.7 │          19.3 │               193 │        3450 │ female │  2007 │
└─────────┴───────────┴────────────────┴───────────────┴───────────────────┴─────────────┴────────┴───────┘

Questions?

Where to find us