

Install Ibis and dependencies for the PySpark backend:

Install with the pyspark extra:

pip install 'ibis-framework[pyspark]'

And connect:

import ibis

con = ibis.pyspark.connect()
Adjust connection parameters as needed.

Install for PySpark:

conda install -c conda-forge ibis-pyspark


Install for PySpark:

mamba install -c conda-forge ibis-pyspark




con = ibis.pyspark.connect(session=session)

ibis.pyspark.connect is a thin wrapper around ibis.backends.pyspark.Backend.do_connect.

Connection Parameters


do_connect(self, session=None, mode='batch', **kwargs)

Create a PySpark Backend for use with Ibis.

Name Type Description Default
session SparkSession | None A SparkSession instance. None
mode ConnectionMode Can be either “batch” or “streaming”. If “batch”, every source, sink, and query executed within this connection will be interpreted as a batch workload. If “streaming”, every source, sink, and query executed within this connection will be interpreted as a streaming workload. 'batch'
kwargs Additional keyword arguments used to configure the SparkSession. {}
>>> import ibis
>>> from pyspark.sql import SparkSession
>>> session = SparkSession.builder.getOrCreate()
>>> ibis.pyspark.connect(session)
<ibis.backends.pyspark.Backend at 0x...>
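The `mode` parameter described above can be chosen at connection time. The following is a minimal sketch, not part of Ibis itself: `connect_pyspark` and `VALID_MODES` are hypothetical names, and the up-front validation is an added convenience rather than documented Ibis behavior.

```python
VALID_MODES = ("batch", "streaming")


def connect_pyspark(mode="batch", **spark_conf):
    """Return an Ibis PySpark backend bound to the active SparkSession.

    Hypothetical helper: checks `mode` before any Spark work starts.
    Extra keyword arguments are forwarded to `ibis.pyspark.connect`.
    """
    if mode not in VALID_MODES:
        raise ValueError(f"mode must be one of {VALID_MODES}, got {mode!r}")

    # Imported lazily so this module can be loaded where Spark is absent.
    import ibis
    from pyspark.sql import SparkSession

    session = SparkSession.builder.getOrCreate()
    return ibis.pyspark.connect(session, mode=mode, **spark_conf)
```

Calling `connect_pyspark(mode="streaming")` would then give a backend in which sources, sinks, and queries are treated as streaming workloads.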

ibis.connect URL format

In addition to ibis.pyspark.connect, you can also connect to PySpark by passing a properly-formatted PySpark connection URL to ibis.connect:

con = ibis.connect(f"pyspark://{warehouse-dir}?[2]")
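A URL of this shape can be assembled with the standard library. The sketch below is an illustration only: `build_pyspark_url` is a hypothetical helper, the Spark configuration keys shown in the usage note are assumptions, and it assumes that query parameters are forwarded to the SparkSession configuration.

```python
from urllib.parse import urlencode


def build_pyspark_url(warehouse_dir, spark_conf=None):
    """Build a pyspark:// URL for ibis.connect (hypothetical helper).

    `spark_conf` maps configuration keys to values; they become the
    query string. Square brackets are left unescaped for readability.
    """
    base = f"pyspark://{warehouse_dir}"
    if not spark_conf:
        return base
    return f"{base}?{urlencode(spark_conf, safe='[]')}"
```

For example, `build_pyspark_url("/tmp/warehouse", {"spark.master": "local[2]"})` produces `pyspark:///tmp/warehouse?spark.master=local[2]`, which could then be passed to `ibis.connect`.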



compile(self, expr, /, *, limit=None, params=None, pretty=False)

Compile an expression to a SQL string.


Name Type Description Default
expr ir.Expr An ibis expression to compile. required
limit str | int | None An integer to effect a specific row limit. A value of None means no limit. None
params Mapping[ir.Expr, Any] | None Mapping of scalar parameter expressions to value. None
pretty bool Pretty print the SQL query during compilation. False


Name Type Description
str Compiled expression


compute_stats(self, name, database=None, noscan=False)

Issue a COMPUTE STATISTICS command for a given table.


Name Type Description Default
name str Table name required
database str | None Database name None
noscan bool If True, collect only basic statistics for the table (number of rows, size in bytes). False


connect(self, *args, **kwargs)

Connect to the database.


Name Type Description Default
*args Mandatory connection parameters, see the docstring of do_connect for details. ()
**kwargs Extra connection parameters, see the docstring of do_connect for details. {}


This creates a new backend instance with saved args and kwargs, then calls reconnect and finally returns the newly created and connected backend instance.


Name Type Description
BaseBackend An instance of the backend


create_database(self, name, /, *, catalog=None, path=None, force=False)

Create a new Spark database.


Name Type Description Default
name str Database name required
catalog str | None Catalog to create database in (defaults to current_catalog) None
path str | Path | None Path where to store the database data; otherwise uses Spark default None
force bool Whether to append IF NOT EXISTS to the database creation SQL False


create_table(self, name, /, obj=None, *, schema=None, database=None, temp=None, overwrite=False, format='parquet', partition_by=None)

Create a new table in Spark.


Name Type Description Default
name str Name of the new table. required
obj ir.Table | pd.DataFrame | pa.Table | pl.DataFrame | pl.LazyFrame | None If passed, creates table from SELECT statement results None
schema sch.SchemaLike | None Mutually exclusive with obj, creates an empty table with a schema None
database str | None Database name. To specify a table in a separate catalog, you can pass in the catalog and database as a string "catalog.database", or as a tuple of strings ("catalog", "database"). None
temp bool | None Whether the new table is temporary (unsupported) None
overwrite bool If True, overwrite existing data False
format str Format of the table on disk 'parquet'
partition_by str | list[str] | None Name(s) of partitioning column(s) None


Name Type Description
Table The newly created table.


>>> con.create_table("new_table_name", table_expr)  # quartodoc: +SKIP
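The `format`, `partition_by`, and `overwrite` parameters can be combined in one call. A minimal sketch, assuming `con` is a connected PySpark backend and `obj` is a table expression or in-memory DataFrame; `create_partitioned_table` is a hypothetical wrapper name:

```python
def create_partitioned_table(con, name, obj, partition_cols):
    """Create or replace a Parquet table partitioned by `partition_cols`.

    Hypothetical wrapper around `con.create_table`; `partition_cols`
    may be a single column name or a list of names.
    """
    return con.create_table(
        name,
        obj,
        overwrite=True,  # replace existing data if the table exists
        format="parquet",
        partition_by=partition_cols,
    )
```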


create_view(self, name, /, obj, *, database=None, overwrite=False)

Create a temporary Spark view from a table expression.


Name Type Description Default
name str View name required
obj ir.Table Expression to use for the view required
database str | None Database name None
overwrite bool Replace an existing view of the same name if it exists False


Name Type Description
Table The created view



Disconnect from the backend.


drop_database(self, name, /, *, catalog=None, force=False)

Drop a Spark database.


Name Type Description Default
name str Database name required
catalog str | None Catalog containing database to drop (defaults to current_catalog) None
force bool If False, Spark throws exception if database is not empty or database does not exist False


drop_table(self, name, /, *, database=None, force=False)

Drop a table from the backend.


Name Type Description Default
name str The name of the table to drop required
database tuple[str, str] | str | None The database that the table is located in. None
force bool If True, do not raise an error if the table does not exist. False


drop_view(self, name, /, *, database=None, force=False)

Drop a view from the backend.


Name Type Description Default
name str The name of the view to drop. required
database str | None The database that the view is located in. None
force bool If True, do not raise an error if the view does not exist. False


execute(self, expr, /, *, params=None, limit=None, **kwargs)

Execute an expression.


from_connection(cls, session, /, *, mode='batch', **kwargs)

Create a PySpark Backend from an existing SparkSession instance.


Name Type Description Default
session SparkSession A SparkSession instance. required
mode ConnectionMode Can be either “batch” or “streaming”. If “batch”, every source, sink, and query executed within this connection will be interpreted as a batch workload. If “streaming”, every source, sink, and query executed within this connection will be interpreted as a streaming workload. 'batch'
kwargs Any Additional keyword arguments used to configure the SparkSession. {}


get_schema(self, table_name, *, catalog=None, database=None)

Return a Schema object for the indicated table and database.


Name Type Description Default
table_name str Table name. May be fully qualified required
catalog str | None Catalog to use None
database str | None Database to use. If None, the active database is used. None


Name Type Description
Schema An ibis schema


has_operation(cls, operation, /)

Return whether the backend supports the given operation.


Name Type Description Default
operation type[ops.Value] Operation type, a Python class object. required


insert(self, name, /, obj, *, database=None, overwrite=False)

Insert data into a table.

Ibis does not use the word schema to refer to database hierarchy.

A collection of tables is referred to as a database. A collection of databases is referred to as a catalog.

These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.


Name Type Description Default
name str The name of the table into which data will be inserted required
obj pd.DataFrame | ir.Table | list | dict The source data or expression to insert required
database str | None Name of the attached database that the table is located in. For backends that support multi-level table hierarchies, you can pass in a dotted string path like "catalog.database" or a tuple of strings like ("catalog", "database"). None
overwrite bool If True then replace existing contents of table False
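As the `database` description notes, a table in another catalog can be addressed with a `("catalog", "database")` tuple. A short sketch, assuming `con` is a connected backend; `append_rows` is a hypothetical helper name:

```python
def append_rows(con, table_name, rows, catalog, database):
    """Append `rows` to catalog.database.table_name.

    Hypothetical helper: `rows` is any object accepted by `con.insert`
    (for example a pandas DataFrame or an Ibis table expression).
    Existing contents are kept because `overwrite` is False.
    """
    return con.insert(
        table_name,
        rows,
        database=(catalog, database),
        overwrite=False,
    )
```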


list_catalogs(self, *, like=None)

List existing catalogs in the current connection.

Ibis does not use the word schema to refer to database hierarchy.

A collection of tables is referred to as a database. A collection of databases is referred to as a catalog.

These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.


Name Type Description Default
like str | None A pattern in Python’s regex format to filter returned catalog names. None


Name Type Description
list[str] The catalog names that exist in the current connection, that match the like pattern if provided.


list_databases(self, *, like=None, catalog=None)

List existing databases in the current connection.

Ibis does not use the word schema to refer to database hierarchy.

A collection of tables is referred to as a database. A collection of databases is referred to as a catalog.

These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.


Name Type Description Default
like str | None A pattern in Python’s regex format to filter returned database names. None
catalog str | None The catalog to list databases from. If None, the current catalog is searched. None


Name Type Description
list[str] The database names that exist in the current connection, that match the like pattern if provided.
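Because `like` is a Python regular expression rather than a SQL `LIKE` pattern, wildcards such as `%` are matched literally. The pure-Python sketch below imitates the filtering on a plain list of names; `filter_like` is a hypothetical stand-in, and matching anywhere in the name (search semantics) is an assumption about how the pattern is applied.

```python
import re


def filter_like(names, like=None):
    """Return the sorted names that match the regex `like`.

    Hypothetical stand-in for the backend's filtering: a name is kept
    when the pattern matches anywhere inside it.
    """
    if like is None:
        return sorted(names)
    pattern = re.compile(like)
    return sorted(name for name in names if pattern.search(name))


names = ["default", "sales_2023", "sales_2024", "tmp"]
print(filter_like(names, "^sales_"))  # anchored prefix match
print(filter_like(names, "sales_%"))  # SQL-style wildcard matches nothing
```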


list_tables(self, *, like=None, database=None)

List the tables in the database.


Name Type Description Default
like str | None A pattern to use for listing tables. None
database str | None Database to list tables from. Default behavior is to show tables in the current catalog and database. To specify a table in a separate catalog, you can pass in the catalog and database as a string "catalog.database", or as a tuple of strings ("catalog", "database"). None


raw_sql(self, query, **kwargs)


read_csv(self, paths, /, *, table_name=None, **kwargs)

Register a CSV file as a table in the current database.


Name Type Description Default
paths str | list[str] | tuple[str] The data source(s). May be a path to a file or directory of CSV files, or an iterable of CSV files. required
table_name str | None An optional name to use for the created table. This defaults to a randomly generated name. None
kwargs Any Additional keyword arguments passed to PySpark loading function. {}


Name Type Description
ir.Table The just-registered table
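Extra keyword arguments are handed to PySpark's CSV loader, so reader options can be set per call. A minimal sketch, assuming `con` is a connected backend; `load_semicolon_csv` is a hypothetical name, and `header`, `sep`, and `inferSchema` are PySpark CSV reader options rather than Ibis parameters:

```python
def load_semicolon_csv(con, path, table_name=None):
    """Register a semicolon-delimited CSV file with a header row.

    Hypothetical helper: the keyword arguments below are forwarded
    unchanged to PySpark's CSV loading function.
    """
    return con.read_csv(
        path,
        table_name=table_name,
        header=True,       # first line holds column names
        sep=";",           # field delimiter
        inferSchema=True,  # let Spark infer column types
    )
```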


read_csv_dir(self, path, /, *, table_name=None, watermark=None, **kwargs)

Register a CSV directory as a table in the current database.


Name Type Description Default
path str | Path The data source. required
table_name str | None An optional name to use for the created table. This defaults to a randomly generated name. None
watermark Watermark | None Watermark strategy for the table. None
kwargs Any Additional keyword arguments passed to PySpark loading function. {}


Name Type Description
ir.Table The just-registered table


read_delta(self, path, /, *, table_name=None, **kwargs)

Register a Delta Lake table as a table in the current database.


Name Type Description Default
path str | Path The path to the Delta Lake table. required
table_name str | None An optional name to use for the created table. This defaults to a randomly generated name. None
kwargs Any Additional keyword arguments passed to PySpark. {}


Name Type Description
ir.Table The just-registered table


read_json(self, paths, /, *, table_name=None, **kwargs)

Register a JSON file as a table in the current database.


Name Type Description Default
paths str | Sequence[str] The data source(s). May be a path to a file or directory of JSON files, or an iterable of JSON files. required
table_name str | None An optional name to use for the created table. This defaults to a randomly generated name. None
kwargs Any Additional keyword arguments passed to PySpark loading function. {}


Name Type Description
ir.Table The just-registered table


read_json_dir(self, path, /, *, table_name=None, watermark=None, **kwargs)

Register a directory of JSON files as a table in the current database.


Name Type Description Default
path str | Path The data source. A directory of JSON files. required
table_name str | None An optional name to use for the created table. This defaults to a randomly generated name. None
watermark Watermark | None Watermark strategy for the table. None
kwargs Any Additional keyword arguments passed to PySpark loading function. {}


Name Type Description
ir.Table The just-registered table


read_kafka(self, *, table_name=None, watermark=None, auto_parse=False, schema=None, options=None)

Register a Kafka topic as a table.


Name Type Description Default
table_name str | None An optional name to use for the created table. This defaults to a sequentially generated name. None
watermark Watermark | None Watermark strategy for the table. None
auto_parse bool Whether to parse Kafka messages automatically. If False, the source is read as binary keys and values. If True, the key is discarded and the value is parsed using the provided schema. False
schema sch.Schema | None Schema of the value of the Kafka messages. None
options Mapping[str, str] | None Additional arguments passed to PySpark as .option("key", "value"). None


Name Type Description
ir.Table The just-registered table
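The `options` mapping carries the Spark Kafka source settings. The sketch below is an illustration under assumptions: `kafka_source_options` and `read_topic` are hypothetical helpers, `con` is assumed to be a backend connected with `mode="streaming"`, and the option keys are those of Spark's Kafka source.

```python
def kafka_source_options(bootstrap_servers, topic, starting_offsets="latest"):
    """Build the options mapping for a Spark Kafka source."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": starting_offsets,
    }


def read_topic(con, bootstrap_servers, topic, schema=None):
    """Register a Kafka topic as a table (hypothetical helper).

    When a schema is supplied, message values are parsed with it;
    otherwise keys and values are read as binary.
    """
    return con.read_kafka(
        auto_parse=schema is not None,
        schema=schema,
        options=kafka_source_options(bootstrap_servers, topic),
    )
```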


read_parquet(self, path, /, *, table_name=None, **kwargs)

Register a parquet file as a table in the current database.


Name Type Description Default
path str | Path The data source. May be a path to a file or directory of parquet files. required
table_name str | None An optional name to use for the created table. This defaults to a randomly generated name. None
kwargs Any Additional keyword arguments passed to PySpark. {}


Name Type Description
ir.Table The just-registered table


read_parquet_dir(self, path, /, *, table_name=None, watermark=None, schema=None, **kwargs)

Register a directory of parquet files as a table in the current database.


Name Type Description Default
path str | Path The data source. A directory of parquet files. required
table_name str | None An optional name to use for the created table. This defaults to a randomly generated name. None
watermark Watermark | None Watermark strategy for the table. None
schema sch.Schema | None Schema of the parquet source. None
kwargs Any Additional keyword arguments passed to PySpark. {}


Name Type Description
ir.Table The just-registered table



Reconnect to the database already configured with connect.



Register custom backend options.


rename_table(self, old_name, new_name)

Rename an existing table.


Name Type Description Default
old_name str The old name of the table. required
new_name str The new name of the table. required


sql(self, query, /, *, schema=None, dialect=None)

Create an Ibis table expression from a SQL query.


Name Type Description Default
query str A SQL query string required
schema SchemaLike | None The schema of the query. If not provided, Ibis will try to infer the schema of the query. None
dialect str | None The SQL dialect of the query. If not provided, the backend’s dialect is assumed. This argument can be useful when the query is written in a different dialect from the backend. None


Name Type Description
ir.Table The table expression representing the query


table(self, name, /, *, database=None)

Construct a table expression.


Name Type Description Default
name str Table name required
database tuple[str, str] | str | None Database name None


Name Type Description
Table Table expression


to_csv(self, expr, /, path, *, params=None, **kwargs)

Write the results of executing the given expression to a CSV file.

This method is eager and will execute the associated expression immediately.


Name Type Description Default
expr ir.Table The ibis expression to execute and persist to CSV. required
path str | Path The data source. A string or Path to the CSV file. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
kwargs Any Additional keyword arguments passed to pyarrow.csv.CSVWriter {}


to_csv_dir(self, expr, /, path, *, params=None, limit=None, options=None)

Write the results of executing the given expression to a CSV directory.


Name Type Description Default
expr ir.Expr The ibis expression to execute and persist to CSV. required
path str | Path The data source. A string or Path to the CSV directory. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
limit int | str | None An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/ None
options Mapping[str, str] | None Additional keyword arguments passed to pyspark.sql.streaming.DataStreamWriter None


Name Type Description
StreamingQuery | None Returns a PySpark StreamingQuery object if in streaming mode, otherwise None


to_delta(self, expr, /, path, *, params=None, limit=None, **kwargs)

Write the results of executing the given expression to a Delta Lake table.

This method is eager and will execute the associated expression immediately.


Name Type Description Default
expr ir.Table The ibis expression to execute and persist to a Delta Lake table. required
path str | Path The data source. A string or Path to the Delta Lake table. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
limit int | str | None An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/ None
**kwargs Any Additional keyword arguments passed to pyspark.sql.DataFrameWriter. {}


to_json(self, expr, /, path, **kwargs)

Write the results of expr to a json file of [{column -> value}, …] objects.

This method is eager and will execute the associated expression immediately.


Name Type Description Default
expr ir.Table The ibis expression to execute and persist to a JSON file. required
path str | Path The data source. A string or Path to the JSON file. required
kwargs Any Additional, backend-specific keyword arguments. {}


to_kafka(self, expr, /, *, auto_format=False, options=None, params=None, limit='default')

Write the results of executing the given expression to a Kafka topic.

This method does not return outputs. Streaming queries are run continuously in the background.


Name Type Description Default
expr ir.Expr The ibis expression to execute and persist to a Kafka topic. required
auto_format bool Whether to format the Kafka messages before writing. If False, the output is written as-is. If True, the output is converted into JSON and written as the value of the Kafka messages. False
options Mapping[str, str] | None PySpark Kafka write arguments. None
params Mapping | None Mapping of scalar parameter expressions to value. None
limit str | None An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/ 'default'


Name Type Description
StreamingQuery A PySpark StreamingQuery object
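Writing to Kafka mirrors reading from it: the sink settings go in `options`. A minimal sketch under assumptions: `kafka_sink_options` and `stream_to_topic` are hypothetical helpers, `con` is assumed to be a streaming-mode backend, and the keys shown are Spark's Kafka sink options.

```python
def kafka_sink_options(bootstrap_servers, topic, checkpoint_dir):
    """Build the options mapping for a Spark Kafka sink."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "topic": topic,
        # Spark streaming writes track progress in a checkpoint directory.
        "checkpointLocation": checkpoint_dir,
    }


def stream_to_topic(con, expr, bootstrap_servers, topic, checkpoint_dir):
    """Start a streaming write of `expr` to Kafka (hypothetical helper).

    With auto_format=True each row is converted to JSON and written as
    the message value.
    """
    return con.to_kafka(
        expr,
        auto_format=True,
        options=kafka_sink_options(bootstrap_servers, topic, checkpoint_dir),
    )
```

The returned object can be kept so the background query can be monitored or stopped later.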


to_pandas(self, expr, /, *, params=None, limit=None, **kwargs)

Execute an Ibis expression and return a pandas DataFrame, Series, or scalar.


This method is a wrapper around execute.


Name Type Description Default
expr ir.Expr Ibis expression to execute. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
limit int | str | None An integer to effect a specific row limit. A value of None means no limit. The default is in ibis/ None
kwargs Any Keyword arguments {}


to_pandas_batches(self, expr, /, *, params=None, limit=None, chunk_size=1000000, **kwargs)

Execute an Ibis expression and return an iterator of pandas DataFrames.


Name Type Description Default
expr ir.Expr Ibis expression to execute. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
limit int | str | None An integer to effect a specific row limit. A value of None means no limit. The default is in ibis/ None
chunk_size int Maximum number of rows in each returned DataFrame batch. This may have no effect depending on the backend. 1000000
kwargs Any Keyword arguments {}


Name Type Description
Iterator[pd.DataFrame] An iterator of pandas DataFrames.
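Iterating over batches keeps only one chunk in memory at a time. The sketch below counts rows this way; `count_rows_in_batches` and `count_rows` are hypothetical helpers, and `con` is assumed to be a connected backend.

```python
def count_rows_in_batches(batches):
    """Sum the lengths of an iterable of batches (e.g. pandas DataFrames)."""
    total = 0
    for batch in batches:
        total += len(batch)
    return total


def count_rows(con, expr, chunk_size=100_000):
    """Count result rows without materializing them all at once."""
    batches = con.to_pandas_batches(expr, chunk_size=chunk_size)
    return count_rows_in_batches(batches)
```

As the parameter table notes, `chunk_size` may have no effect on some backends, so the number of batches should not be relied on.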


to_parquet(self, expr, /, path, *, params=None, limit=None, **kwargs)

Write the results of executing the given expression to a Parquet file.

This method is eager and will execute the associated expression immediately.


Name Type Description Default
expr ir.Table The ibis expression to execute and persist to a Parquet file. required
path str | Path The data source. A string or Path to the Parquet file. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
limit int | str | None An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/ None
**kwargs Any Additional keyword arguments passed to pyspark.sql.DataFrameWriter. {}


to_parquet_dir(self, expr, /, path, *, params=None, limit=None, options=None)

Write the results of executing the given expression to a parquet directory.


Name Type Description Default
expr ir.Expr The ibis expression to execute and persist to parquet. required
path str | Path The data source. A string or Path to the parquet directory. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
limit int | str | None An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/ None
options Mapping[str, str] | None Additional keyword arguments passed to pyspark.sql.streaming.DataStreamWriter None


Name Type Description
StreamingQuery | None Returns a PySpark StreamingQuery object if in streaming mode, otherwise None


to_polars(self, expr, /, *, params=None, limit=None, **kwargs)

Execute expression and return results as a polars DataFrame.

This method is eager and will execute the associated expression immediately.


Name Type Description Default
expr ir.Expr Ibis expression to export to polars. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
limit int | str | None An integer to effect a specific row limit. A value of None means no limit. The default is in ibis/ None
kwargs Any Keyword arguments {}


Name Type Description
dataframe A polars DataFrame holding the results of the executed expression.


to_pyarrow(self, expr, /, *, params=None, limit=None, **kwargs)

Execute expression and return results as a pyarrow table.

This method is eager and will execute the associated expression immediately.


Name Type Description Default
expr ir.Expr Ibis expression to export to pyarrow required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
limit int | str | None An integer to effect a specific row limit. A value of None means no limit. The default is in ibis/ None
kwargs Any Keyword arguments {}


Name Type Description
Table A pyarrow table holding the results of the executed expression.


to_pyarrow_batches(self, expr, /, *, params=None, limit=None, chunk_size=1000000, **kwargs)

Execute expression and return an iterator of PyArrow record batches.

This method is eager and will execute the associated expression immediately.


Name Type Description Default
expr ir.Expr Ibis expression to export to pyarrow required
limit int | str | None An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/ None
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
chunk_size int Maximum number of rows in each returned record batch. 1000000


Name Type Description
RecordBatchReader Collection of pyarrow RecordBatches.


to_torch(self, expr, /, *, params=None, limit=None, **kwargs)

Execute an expression and return results as a dictionary of torch tensors.


Name Type Description Default
expr ir.Expr Ibis expression to execute. required
params Mapping[ir.Scalar, Any] | None Parameters to substitute into the expression. None
limit int | str | None An integer to effect a specific row limit. A value of None means no limit. None
kwargs Any Keyword arguments passed into the backend’s to_torch implementation. {}


Name Type Description
dict[str, torch.Tensor] A dictionary of torch tensors, keyed by column name.


truncate_table(self, name, /, *, database=None)

Delete all rows from a table.

Ibis does not use the word schema to refer to database hierarchy.

A collection of tables is referred to as a database. A collection of databases is referred to as a catalog. These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.


Name Type Description Default
name str Table name required
database str | None Name of the attached database that the table is located in. For backends that support multi-level table hierarchies, you can pass in a dotted string path like "catalog.database" or a tuple of strings like ("catalog", "database"). None