
PySpark

Install

Install ibis and dependencies for the PySpark backend:

pip install 'ibis-framework[pyspark]'
conda install -c conda-forge ibis-pyspark
mamba install -c conda-forge ibis-pyspark

Connect

API

Create a client by passing a SparkSession instance to ibis.pyspark.connect.

See ibis.backends.pyspark.Backend.do_connect for connection parameter information.

ibis.pyspark.connect is a thin wrapper around ibis.backends.pyspark.Backend.do_connect.

Connection Parameters

do_connect(session)

Create a PySpark Backend for use with Ibis.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| session | SparkSession | A SparkSession instance | required |

Examples:

>>> import ibis
>>> from pyspark.sql import SparkSession
>>> session = SparkSession.builder.getOrCreate()
>>> ibis.pyspark.connect(session)
<ibis.backends.pyspark.Backend at 0x...>

Backend API

Backend

Bases: BaseSQLBackend

Classes

Options

Bases: ibis.config.Config

PySpark options.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| treat_nan_as_null | bool | Treat NaNs in floating-point expressions as NULL. |
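A sketch of enabling this option at runtime. The attribute path `ibis.options.pyspark.treat_nan_as_null` is assumed from the Options class documented above and has not been verified against a specific ibis version:

```python
import ibis

# Treat NaN values in floating point columns as NULL
# (option path assumed from the Options class above).
ibis.options.pyspark.treat_nan_as_null = True
```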

Functions

close()

Close Spark connection and drop any temporary objects.

compile(expr, timecontext=None, params=None, *args, **kwargs)

Compile an ibis expression to a PySpark DataFrame object.

compute_stats(name, database=None, noscan=False)

Issue a COMPUTE STATISTICS command for a given table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Table name | required |
| database | str \| None | Database name | None |
| noscan | bool | If True, collect only basic statistics for the table (number of rows, size in bytes). | False |
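This corresponds to Spark SQL's ANALYZE TABLE statement; a sketch of the two forms (the table name `ops.events` is illustrative):

```sql
-- noscan=False: scan the table and collect full statistics
ANALYZE TABLE ops.events COMPUTE STATISTICS;

-- noscan=True: collect only basic statistics, without a full scan
ANALYZE TABLE ops.events COMPUTE STATISTICS NOSCAN;
```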
create_database(name, path=None, force=False)

Create a new Spark database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Database name | required |
| path | str \| Path \| None | Path where to store the database data; otherwise uses the Spark default | None |
| force | bool | Whether to append IF NOT EXISTS to the database creation SQL | False |
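The parameters map onto Spark SQL roughly as follows (the database name and path are illustrative):

```sql
-- force=True adds IF NOT EXISTS; path becomes a LOCATION clause
CREATE DATABASE IF NOT EXISTS analytics LOCATION '/warehouse/analytics';
```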
create_table(name, obj=None, *, schema=None, database=None, temp=None, overwrite=False, format='parquet')

Create a new table in Spark.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Name of the new table. | required |
| obj | ir.Table \| pd.DataFrame \| None | If passed, creates the table from the results of a SELECT statement | None |
| schema | sch.Schema \| None | Mutually exclusive with obj; creates an empty table with a schema | None |
| database | str \| None | Database name | None |
| temp | bool \| None | Whether the new table is temporary | None |
| overwrite | bool | If True, overwrite existing data | False |
| format | str | Format of the table on disk | 'parquet' |

Returns:

| Type | Description |
| --- | --- |
| Table | The newly created table. |

Examples:

>>> con.create_table('new_table_name', table_expr)
create_view(name, obj, *, database=None, overwrite=False)

Create a Spark view from a table expression.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | View name | required |
| obj | ir.Table | Expression to use for the view | required |
| database | str \| None | Database name | None |
| overwrite | bool | Replace an existing view of the same name if it exists | False |

Returns:

| Type | Description |
| --- | --- |
| Table | The created view |
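In Spark SQL terms, this builds a CREATE VIEW statement from the compiled expression; a sketch of the kind of statement involved (view name and query are illustrative, and the exact view flavor depends on the backend's implementation):

```sql
-- overwrite=True corresponds to CREATE OR REPLACE
CREATE OR REPLACE VIEW active_users AS
SELECT * FROM users WHERE active = TRUE;
```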

drop_database(name, force=False)

Drop a Spark database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Database name | required |
| force | bool | If False, Spark throws an exception if the database is not empty or does not exist | False |
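Roughly speaking, force=True maps onto Spark SQL's IF EXISTS and CASCADE clauses (database name illustrative):

```sql
-- force=True: ignore a missing database and drop a non-empty one
DROP DATABASE IF EXISTS analytics CASCADE;
```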
drop_table(name, *, database=None, force=False)

Drop a table.

drop_table_or_view(name, *, database=None, force=False)

Drop a Spark table or view.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Table or view name | required |
| database | str \| None | Database name | None |
| force | bool | If False, Spark may throw an exception if the table does not exist | False |

Examples:

>>> table = 'my_table'
>>> db = 'operations'
>>> con.drop_table_or_view(table, database=db, force=True)
drop_view(name, *, database=None, force=False)

Drop a view.

execute(expr, **kwargs)

Execute an expression.

insert(table_name, obj=None, database=None, overwrite=False, values=None, validate=True)

Insert data into an existing table.

Examples:

>>> table = 'my_table'
>>> con.insert(table, table_expr)

Completely overwrite contents:

>>> con.insert(table, table_expr, overwrite=True)
read_csv(source_list, table_name=None, **kwargs)

Register a CSV file as a table in the current database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| source_list | str \| list[str] \| tuple[str] | The data source(s). May be a path to a file or directory of CSV files, or an iterable of CSV files. | required |
| table_name | str \| None | An optional name to use for the created table. Defaults to a sequentially generated name. | None |
| kwargs | Any | Additional keyword arguments passed to the PySpark loading function: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.csv.html | {} |

Returns:

| Type | Description |
| --- | --- |
| ir.Table | The just-registered table |
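A usage sketch, assuming an existing connection `con` and an illustrative file path; the header keyword is forwarded to PySpark's DataFrameReader.csv:

```python
# Register a CSV file under an explicit table name (path is illustrative).
trips = con.read_csv("data/trips.csv", table_name="trips", header=True)
```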

read_parquet(source, table_name=None, **kwargs)

Register a parquet file as a table in the current database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| source | str \| Path | The data source. May be a path to a file or directory of parquet files. | required |
| table_name | str \| None | An optional name to use for the created table. Defaults to a sequentially generated name. | None |
| kwargs | Any | Additional keyword arguments passed to PySpark: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.parquet.html | {} |

Returns:

| Type | Description |
| --- | --- |
| ir.Table | The just-registered table |

register(source, table_name=None, **kwargs)

Register a data source as a table in the current database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| source | str \| Path \| Any | The data source(s). May be a path to a file or directory of parquet/CSV files, or an iterable of CSV files. | required |
| table_name | str \| None | An optional name to use for the created table. Defaults to a sequentially generated name. | None |
| kwargs | Any | Additional keyword arguments passed to PySpark loading functions for CSV or parquet. | {} |

Returns:

| Type | Description |
| --- | --- |
| ir.Table | The just-registered table |
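register infers which loader to use from the source's file extension and then delegates to read_csv or read_parquet. A pure-Python sketch of that dispatch (the helper name is illustrative, not the actual ibis implementation):

```python
from pathlib import Path

def infer_loader(source: str) -> str:
    """Pick a reader method name based on the file suffix (illustrative)."""
    suffix = Path(source).suffix.lower()
    if suffix == ".csv":
        return "read_csv"
    if suffix == ".parquet":
        return "read_parquet"
    raise ValueError(f"unsupported source format: {source}")

print(infer_loader("data/events.parquet"))  # read_parquet
print(infer_loader("data/trips.csv"))  # read_csv
```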

table(name, database=None)

Return a table expression from a table or view in the database.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Table name | required |
| database | str \| None | Database in which the table resides | None |

Returns:

| Type | Description |
| --- | --- |
| Table | The table named name from database |

truncate_table(name, database=None)

Delete all rows from an existing table.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| name | str | Table name | required |
| database | str \| None | Database name | None |
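This corresponds to Spark SQL's TRUNCATE TABLE statement (table name illustrative):

```sql
-- Remove all rows while keeping the table definition
TRUNCATE TABLE ops.events;
```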

Last update: March 1, 2022