import ibis
import ibis.expr.datatypes as dt
from ibis import _, selectors as s, udf
# Interactive mode: expressions are executed and rendered as soon as they are
# displayed, which is what produces the table previews shown in this post.
ibis.options.interactive = True
In this blog post we show how to leverage ecosystem tools to build an end-to-end ML pipeline using Ibis, DuckDB and PyTorch.
Check out the live stream of this notebook below!
Let's get started!
Define a Function to Clean Inputs
Let's define a function to clean the data in a few different ways:
- Remove outliers (Z-score based)
- Remove negative trip distances and negative fare amounts
- Cast inputs to `float32`, since that's what PyTorch wants
We use a function here to ensure that we can run the same code on the test data set before prediction.
def clean_input(path):
    """Load a taxi-trip parquet file and return a cleaned two-column table.

    The returned Ibis expression keeps only ``fare_amount`` and
    ``trip_distance``, both cast to ``float32``, after dropping rows with a
    non-positive trip distance, a negative fare, or a Z-score whose absolute
    value exceeds 2 in either column.

    Parameters
    ----------
    path
        Location of the parquet file; passed straight to ``ibis.read_parquet``.
    """
    return (
        # load parquet
        ibis.read_parquet(path)
        # compute fare_amount_zscore and trip_distance_zscore
        .mutate(s.across(["fare_amount", "trip_distance"], dict(zscore=(_ - _.mean()) / _.std())))
        # filter out negative trip distance and bizarre transactions
        .filter([_.trip_distance > 0.0, _.fare_amount >= 0.0])
        # keep values that are within 2 standard deviations
        .filter(s.if_all(s.endswith("_zscore"), _.abs() <= 2))
        # drop columns that aren't necessary for further analysis
        .drop(s.endswith("_zscore"))
        # select the columns we care about
        .select(s.across(["fare_amount", "trip_distance"], _.cast("float32")))
    )
# Clean the January 2016 yellow-cab data; this is the training set.
training_data = clean_input(
    "https://storage.googleapis.com/ibis-tutorial-data/nyctaxi/yellow/yellow_tripdata_2016-01.parquet"
)
training_data
┏━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┓
┃ trip_distance ┃ fare_amount ┃
┡━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━┩
│ float32       │ float32     │
├───────────────┼─────────────┤
│          3.20 │        14.0 │
│          1.00 │         9.5 │
│          0.90 │         6.0 │
│          0.80 │         5.0 │
│          1.80 │        11.0 │
│          2.30 │        11.0 │
│         13.80 │        43.0 │
│          3.46 │        20.0 │
│          0.83 │         5.5 │
│          0.87 │         7.0 │
│             … │           … │
└───────────────┴─────────────┘
Execute the Query and Convert to Torch Tensors
New in Ibis 6.0 is the `to_torch` method, which executes a query and returns the results as a dictionary of `torch.Tensor`s keyed by column names.
We'll use that to get our input data for training.
import torch
# Execute the query; each column comes back as a tensor keyed by column name.
torch_training_data: dict[str, torch.Tensor] = training_data.to_torch()
torch_training_data
{'trip_distance': tensor([3.2000, 1.0000, 0.9000, ..., 5.6300, 0.7700, 1.2600]),
'fare_amount': tensor([14.0000, 9.5000, 6.0000, ..., 18.5000, 5.0000, 6.5000])}
Train the Model
Let's assume for now we don't have access to the model code. Maybe your co-worker wrote the model or it's part of an API that you don't control. Either way, it's a black box to us.
The API looks like this:
import pyarrow
class PredictCabFare:
    """Public surface of the fare model; method bodies are intentionally omitted."""

    def __init__(self, data: dict[str, torch.Tensor]) -> None:
        """Initialize the model with training data."""

    def train(self) -> None:
        """Train the model."""

    def __call__(self, input: pyarrow.ChunkedArray) -> pyarrow.Array:
        """Invoke the trained model on unseen input."""
from model import PredictCabFare
# Build the model from the training tensors, then fit it.
model = PredictCabFare(torch_training_data)
model.train()
Define an Ibis UDF that predicts fares
Now we get to the meaty part: defining an Ibis UDF (user-defined function) that invokes our model on unseen data!
from ibis.expr.operations import udf
@udf.scalar.pyarrow
def predict_fare(distance: dt.float64) -> dt.float32:
    """Predict fares for a batch of trip distances with the trained ``model``.

    Registered as a PyArrow-based scalar UDF; the argument is handed to the
    module-level ``model`` unchanged and its result is returned as-is.

    NOTE(review): the argument is annotated ``float64`` while the call site
    casts the column to ``float32`` -- confirm which dtype the model receives.
    """
    return model(distance)
Let's run our UDF
# Clean the February 2016 data the same way as the training set, take
# 10,000 rows, and attach the model's prediction as a new column.
prediction = (
    clean_input("https://storage.googleapis.com/ibis-tutorial-data/nyctaxi/yellow/yellow_tripdata_2016-02.parquet")
    .limit(10_000)
    .mutate(predicted_fare=lambda t: predict_fare(t.trip_distance.cast("float32")))
)
prediction
┏━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┓
┃ trip_distance ┃ fare_amount ┃ predicted_fare ┃
┡━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━┩
│ float32       │ float32     │ float32        │
├───────────────┼─────────────┼────────────────┤
│           9.1 │        27.0 │      29.085516 │
│           3.3 │        11.5 │      12.626410 │
│           0.5 │         4.0 │       4.680637 │
│           7.4 │        26.5 │      24.261295 │
│           1.6 │         7.5 │       7.802191 │
│           3.8 │        16.0 │      14.045299 │
│           1.1 │         6.0 │       6.383303 │
│           6.8 │        21.0 │      22.558630 │
│           2.9 │        12.0 │      11.491301 │
│           1.2 │         6.5 │       6.667080 │
│             … │           … │              … │
└───────────────┴─────────────┴────────────────┘
Prepare the Data for Plotting
Here we tidy up our data to make it easier to adjust plotting style based on the data.
In this case, we're interested in visually distinguishing the model's predicted fare amount from the actual fare amount, so we pivot the data into a longer form, which adds a string column `metric` that indicates the kind of fare a given row contains.
# Stack every column whose name contains "fare" into a (metric, fare) pair,
# giving one row per trip and fare kind.
pivoted_prediction = prediction.pivot_longer(
    s.contains("fare"),
    values_to="fare",
    names_to="metric",
)
pivoted_prediction
┏━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━┓
┃ trip_distance ┃ metric         ┃ fare      ┃
┡━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━┩
│ float32       │ string         │ float32   │
├───────────────┼────────────────┼───────────┤
│           9.1 │ fare_amount    │ 27.000000 │
│           9.1 │ predicted_fare │ 29.085516 │
│           3.3 │ fare_amount    │ 11.500000 │
│           3.3 │ predicted_fare │ 12.626410 │
│           0.5 │ fare_amount    │  4.000000 │
│           0.5 │ predicted_fare │  4.680637 │
│           7.4 │ fare_amount    │ 26.500000 │
│           7.4 │ predicted_fare │ 24.261295 │
│           1.6 │ fare_amount    │  7.500000 │
│           1.6 │ predicted_fare │  7.802191 │
│             … │ …              │         … │
└───────────────┴────────────────┴───────────┘
Plot the Results
There are a bunch of strange and interesting data points and observations that don't have an obvious explanation:
- There seem to be a good number of \$50-ish rides regardless of distance. What's going on there?
- What's going on with the extreme outliers? For instance, the 50 mile ride that only cost about \$60 or the 25 mile ride that cost about \$140.
from plotnine import aes, ggtitle, ggplot, geom_point, xlab, ylab
# Scatter plot of fare against trip distance, colored by whether the point
# is the actual fare or the model's prediction.
(
    ggplot(pivoted_prediction, aes(x="trip_distance", y="fare", color="metric"))
    + geom_point()
    + xlab("Trip Distance")
    + ylab("Fare")
    + ggtitle("Predicted Fare vs Actual Fare by Trip Distance")
)
<Figure Size: (640 x 480)>
Appendix: model.py
"""Linear regression model for predicting cab fares using PyTorch.
Adapted from https://gist.github.com/pdet/e8d38734232c08e6c15aba79b4eb8368#file-taxi_prediction_example-py.
"""
from __future__ import annotations
import pyarrow as pa
import torch
import tqdm
from torch import nn
class LinearRegression(nn.Module):
    """A single ``nn.Linear`` layer wrapped as a module."""

    def __init__(self, input_dim: int, output_dim: int) -> None:
        """Create the layer mapping ``input_dim`` features to ``output_dim`` outputs."""
        super().__init__()
        self.linear = nn.Linear(input_dim, output_dim)

    def forward(self, distances: torch.Tensor) -> torch.Tensor:
        """Apply the linear layer to ``distances`` and return the result."""
        return self.linear(distances)
class PredictCabFare:
    """Fit a one-input, one-output linear model of fare against trip distance.

    ``data`` is indexed with the keys ``"trip_distance"`` and
    ``"fare_amount"``; each value is reshaped to a single column before
    training.
    """

    def __init__(self, data, learning_rate: float = 0.01, epochs: int = 100) -> None:
        """Store the training data and hyperparameters and build the model.

        Parameters
        ----------
        data
            Mapping holding the ``"trip_distance"`` and ``"fare_amount"``
            training values.
        learning_rate
            Step size handed to the SGD optimizer.
        epochs
            Number of full passes over the data performed by ``train``.
        """
        # Define the input and output dimensions
        input_dim = 1
        output_dim = 1

        # Create a linear regression model instance
        self.data = data
        self.model = LinearRegression(input_dim, output_dim)
        self.learning_rate = learning_rate
        self.epochs = epochs

    def train(self):
        """Fit the model by minimizing mean squared error with SGD."""
        # Reshape both series to a single column
        distances = self.data["trip_distance"].reshape(-1, 1)
        fares = self.data["fare_amount"].reshape(-1, 1)

        # Define the loss function
        criterion = nn.MSELoss()

        # Define the optimizer
        optimizer = torch.optim.SGD(self.model.parameters(), lr=self.learning_rate)

        # Train the model
        for _ in tqdm.trange(self.epochs):
            # Forward pass
            y_pred = self.model(distances)

            # Compute loss
            loss = criterion(y_pred, fares)

            # Backward pass and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

    def predict(self, input):
        """Run the model on ``input`` with gradient tracking disabled."""
        with torch.no_grad():
            return self.model(input)

    def __call__(self, input: pa.ChunkedArray):
        """Predict fares for ``input`` and return them as a PyArrow array."""
        # Convert the input to numpy so it can be fed to the model
        #
        # .copy() to avoid the warning about undefined behavior from torch
        input = torch.from_numpy(input.to_numpy().copy())[:, None]
        # Flatten the column of predictions back to one dimension
        predicted = self.predict(input).ravel()
        return pa.array(predicted.numpy())