DataFusion
https://datafusion.apache.org/
Install
Install Ibis and dependencies for the Apache DataFusion backend:
Install with the Apache datafusion
extra:
pip install 'ibis-framework[datafusion]'
And connect:
import ibis
= ibis.datafusion.connect() con
- 1
- Adjust connection parameters as needed.
Install for Apache DataFusion:
conda install -c conda-forge ibis-datafusion
And connect:
import ibis
= ibis.datafusion.connect() con
- 1
- Adjust connection parameters as needed.
Install for Apache DataFusion:
mamba install -c conda-forge ibis-datafusion
And connect:
import ibis
= ibis.datafusion.connect() con
- 1
- Adjust connection parameters as needed.
Connect
ibis.datafusion.connect
= ibis.datafusion.connect() con
= ibis.datafusion.connect(
con ={"table1": "path/to/file.parquet", "table2": "path/to/file.csv"}
config )
ibis.datafusion.connect
is a thin wrapper around ibis.backends.datafusion.Backend.do_connect
.
Connection Parameters
do_connect
do_connect(['self', 'config=None'])
Create a DataFusion Backend
for use with Ibis.
Parameters
Name | Type | Description | Default |
---|---|---|---|
config | Mapping[str, str | Path] | SessionContext | None | Mapping of table names to files or a SessionContext instance. |
None |
Examples
>>> import ibis
>>> config = {
"astronauts": "ci/ibis-testing-data/parquet/astronauts.parquet",
... "diamonds": "ci/ibis-testing-data/csv/diamonds.csv",
...
... }>>> con = ibis.datafusion.connect(config)
>>> con.list_tables()
'astronauts', 'diamonds']
[>>> con.table("diamonds")
DatabaseTable: diamonds
carat float64
cut string
color string
clarity string
depth float64
table float64
price int64
x float64
y float64 z float64
datafusion.Backend
compile
compile(['self', 'expr', 'limit=None', 'params=None', 'pretty=False'])
Compile an Ibis expression to a SQL string.
connect
connect(['self', '*args', '**kwargs'])
Connect to the database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
*args | Mandatory connection parameters, see the docstring of do_connect for details. |
() |
|
**kwargs | Extra connection parameters, see the docstring of do_connect for details. |
{} |
Notes
This creates a new backend instance with saved args
and kwargs
, then calls reconnect
and finally returns the newly created and connected backend instance.
Returns
Name | Type | Description |
---|---|---|
BaseBackend | An instance of the backend |
create_catalog
create_catalog(['self', 'name', 'force=False'])
Create a new catalog.
schema
to refer to database hierarchy.
A collection of table
is referred to as a database
. A collection of database
is referred to as a catalog
.
These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of the new catalog. | required |
force | bool | If False , an exception is raised if the catalog already exists. |
False |
create_database
create_database(['self', 'name', 'catalog=None', 'force=False'])
Create a database named name
in catalog
.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of the database to create. | required |
catalog | str | None | Name of the catalog in which to create the database. If None , the current catalog is used. |
None |
force | bool | If False , an exception is raised if the database exists. |
False |
create_table
create_table(['self', 'name', 'obj=None', '*', 'schema=None', 'database=None', 'temp=False', 'overwrite=False'])
Create a table in DataFusion.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of the table to create | required |
obj | ir.Table | pd.DataFrame | pa.Table | pa.RecordBatchReader | pa.RecordBatch | pl.DataFrame | pl.LazyFrame | None | The data with which to populate the table; optional, but at least one of obj or schema must be specified |
None |
schema | sch.SchemaLike | None | The schema of the table to create; optional, but at least one of obj or schema must be specified |
None |
database | str | None | The name of the database in which to create the table; if not passed, the current database is used. | None |
temp | bool | Create a temporary table | False |
overwrite | bool | If True , replace the table if it already exists, otherwise fail if the table exists |
False |
create_view
create_view(['self', 'name', 'obj', '*', 'database=None', 'overwrite=False'])
Create a new view from an expression.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of the new view. | required |
obj | ir.Table | An Ibis table expression that will be used to create the view. | required |
database | str | None | Name of the database where the view will be created, if not provided the database’s default is used. | None |
overwrite | bool | Whether to clobber an existing view with the same name | False |
Returns
Name | Type | Description |
---|---|---|
Table | The view that was created. |
disconnect
disconnect(['self'])
Close the connection to the backend.
drop_catalog
drop_catalog(['self', 'name', 'force=False'])
Drop a catalog with name name
.
schema
to refer to database hierarchy.
A collection of table
is referred to as a database
. A collection of database
is referred to as a catalog
.
These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Catalog to drop. | required |
force | bool | If False , an exception is raised if the catalog does not exist. |
False |
drop_database
drop_database(['self', 'name', 'catalog=None', 'force=False'])
Drop the database with name
in catalog
.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of the schema to drop. | required |
catalog | str | None | Name of the catalog to drop the database from. If None , the current catalog is used. |
None |
force | bool | If False , an exception is raised if the database does not exist. |
False |
drop_table
drop_table(['self', 'name', 'database=None', 'force=False'])
Drop a table.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of the table to drop. | required |
database | str | None | Name of the database where the table exists, if not the default. | None |
force | bool | If False , an exception is raised if the table does not exist. |
False |
drop_view
drop_view(['self', 'name', '*', 'database=None', 'force=False'])
Drop a view.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of the view to drop. | required |
database | str | None | Name of the database where the view exists, if not the default. | None |
force | bool | If False , an exception is raised if the view does not exist. |
False |
execute
execute(['self', 'expr', '**kwargs'])
Execute an expression.
from_connection
from_connection(['cls', 'con'])
Create a DataFusion Backend
from an existing SessionContext
instance.
Parameters
Name | Type | Description | Default |
---|---|---|---|
con | SessionContext | A SessionContext instance. |
required |
get_schema
get_schema(['self', 'table_name', '*', 'catalog=None', 'database=None'])
has_operation
has_operation(['cls', 'operation'])
Return whether the backend implements support for operation
.
Parameters
Name | Type | Description | Default |
---|---|---|---|
operation | type[ops.Value] | A class corresponding to an operation. | required |
Returns
Name | Type | Description |
---|---|---|
bool | Whether the backend implements the operation. |
Examples
>>> import ibis
>>> import ibis.expr.operations as ops
>>> ibis.sqlite.has_operation(ops.ArrayIndex)
False
>>> ibis.postgres.has_operation(ops.ArrayIndex)
True
insert
insert(['self', 'table_name', 'obj', 'database=None', 'overwrite=False'])
Insert data into a table.
schema
to refer to database hierarchy.
A collection of table
is referred to as a database
. A collection of database
is referred to as a catalog
.
These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.
Parameters
Name | Type | Description | Default |
---|---|---|---|
table_name | str | The name of the table to which data needs will be inserted | required |
obj | pd.DataFrame | ir.Table | list | dict | The source data or expression to insert | required |
database | str | None | Name of the attached database that the table is located in. For backends that support multi-level table hierarchies, you can pass in a dotted string path like "catalog.database" or a tuple of strings like ("catalog", "database") . |
None |
overwrite | bool | If True then replace existing contents of table |
False |
list_catalogs
list_catalogs(['self', 'like=None'])
List existing catalogs in the current connection.
schema
to refer to database hierarchy.
A collection of table
is referred to as a database
. A collection of database
is referred to as a catalog
.
These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.
Parameters
Name | Type | Description | Default |
---|---|---|---|
like | str | None | A pattern in Python’s regex format to filter returned database names. | None |
Returns
Name | Type | Description |
---|---|---|
list[str] | The catalog names that exist in the current connection, that match the like pattern if provided. |
list_databases
list_databases(['self', 'like=None', 'catalog=None'])
List existing databases in the current connection.
schema
to refer to database hierarchy.
A collection of table
is referred to as a database
. A collection of database
is referred to as a catalog
.
These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.
Parameters
Name | Type | Description | Default |
---|---|---|---|
like | str | None | A pattern in Python’s regex format to filter returned database names. | None |
catalog | str | None | The catalog to list databases from. If None , the current catalog is searched. |
None |
Returns
Name | Type | Description |
---|---|---|
list[str] | The database names that exist in the current connection, that match the like pattern if provided. |
list_tables
list_tables(['self', 'like=None', 'database=None'])
Return the list of table names in the current database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
like | str | None | A pattern in Python’s regex format. | None |
database | str | None | Unused in the datafusion backend. | None |
Returns
Name | Type | Description |
---|---|---|
list[str] | The list of the table names that match the pattern like . |
raw_sql
raw_sql(['self', 'query'])
Execute a SQL string query
against the database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
query | str | sge.Expression | Raw SQL string | required |
kwargs | Backend specific query arguments | required |
read_csv
read_csv(['self', 'source_list', 'table_name=None', '**kwargs'])
Register a CSV file as a table in the current database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
source_list | str | Path | list[str | Path] | tuple[str | Path] | The data source. A string or Path to the CSV file. | required |
table_name | str | None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
**kwargs | Any | Additional keyword arguments passed to DataFusion loading function. | {} |
Returns
Name | Type | Description |
---|---|---|
ir.Table | The just-registered table |
read_delta
read_delta(['self', 'source_table', 'table_name=None', '**kwargs'])
Register a Delta Lake table as a table in the current database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
source_table | str | Path | The data source. Must be a directory containing a Delta Lake table. | required |
table_name | str | None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
**kwargs | Any | Additional keyword arguments passed to deltalake.DeltaTable. | {} |
Returns
Name | Type | Description |
---|---|---|
ir.Table | The just-registered table |
read_json
read_json(['self', 'path', 'table_name=None', '**kwargs'])
Register a JSON file as a table in the current backend.
Parameters
Name | Type | Description | Default |
---|---|---|---|
path | str | Path | The data source. A string or Path to the JSON file. | required |
table_name | str | None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
**kwargs | Any | Additional keyword arguments passed to the backend loading function. | {} |
Returns
Name | Type | Description |
---|---|---|
ir.Table | The just-registered table |
read_parquet
read_parquet(['self', 'path', 'table_name=None', '**kwargs'])
Register a parquet file as a table in the current database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
path | str | Path | The data source. | required |
table_name | str | None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
**kwargs | Any | Additional keyword arguments passed to DataFusion loading function. | {} |
Returns
Name | Type | Description |
---|---|---|
ir.Table | The just-registered table |
reconnect
reconnect(['self'])
Reconnect to the database already configured with connect.
register
register(['self', 'source', 'table_name=None', '**kwargs'])
register_options
register_options(['cls'])
Register custom backend options.
rename_table
rename_table(['self', 'old_name', 'new_name'])
Rename an existing table.
Parameters
Name | Type | Description | Default |
---|---|---|---|
old_name | str | The old name of the table. | required |
new_name | str | The new name of the table. | required |
sql
sql(['self', 'query', 'schema=None', 'dialect=None'])
table
table(['self', 'name', 'database=None'])
Construct a table expression.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Table name | required |
database | tuple[str, str] | str | None | Database name | None |
Returns
Name | Type | Description |
---|---|---|
Table | Table expression |
to_csv
to_csv(['self', 'expr', 'path', '*', 'params=None', '**kwargs'])
Write the results of executing the given expression to a CSV file.
This method is eager and will execute the associated expression immediately.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ir.Table | The ibis expression to execute and persist to CSV. | required |
path | str | Path | The data source. A string or Path to the CSV file. | required |
params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
kwargs | Any | Additional keyword arguments passed to pyarrow.csv.CSVWriter | {} |
https | required |
to_delta
to_delta(['self', 'expr', 'path', '*', 'params=None', '**kwargs'])
Write the results of executing the given expression to a Delta Lake table.
This method is eager and will execute the associated expression immediately.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ir.Table | The ibis expression to execute and persist to Delta Lake table. | required |
path | str | Path | The data source. A string or Path to the Delta Lake table. | required |
params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
kwargs | Any | Additional keyword arguments passed to deltalake.writer.write_deltalake method | {} |
to_pandas
to_pandas(['self', 'expr', '*', 'params=None', 'limit=None', '**kwargs'])
Execute an Ibis expression and return a pandas DataFrame
, Series
, or scalar.
This method is a wrapper around execute
.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ir.Expr | Ibis expression to execute. | required |
params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
limit | int | str | None | An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py . |
None |
kwargs | Any | Keyword arguments | {} |
to_pandas_batches
to_pandas_batches(['self', 'expr', '*', 'params=None', 'limit=None', 'chunk_size=1000000', '**kwargs'])
Execute an Ibis expression and return an iterator of pandas DataFrame
s.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ir.Expr | Ibis expression to execute. | required |
params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
limit | int | str | None | An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py . |
None |
chunk_size | int | Maximum number of rows in each returned DataFrame batch. This may have no effect depending on the backend. |
1000000 |
kwargs | Any | Keyword arguments | {} |
Returns
Name | Type | Description |
---|---|---|
Iterator[pd.DataFrame] | An iterator of pandas DataFrame s. |
to_parquet
to_parquet(['self', 'expr', 'path', '*', 'params=None', '**kwargs'])
Write the results of executing the given expression to a parquet file.
This method is eager and will execute the associated expression immediately.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ir.Table | The ibis expression to execute and persist to parquet. | required |
path | str | Path | The data source. A string or Path to the parquet file. | required |
params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
**kwargs | Any | Additional keyword arguments passed to pyarrow.parquet.ParquetWriter | {} |
https | required |
to_parquet_dir
to_parquet_dir(['self', 'expr', 'directory', '*', 'params=None', '**kwargs'])
Write the results of executing the given expression to a parquet file in a directory.
This method is eager and will execute the associated expression immediately.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ir.Table | The ibis expression to execute and persist to parquet. | required |
directory | str | Path | The data source. A string or Path to the directory where the parquet file will be written. | required |
params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
**kwargs | Any | Additional keyword arguments passed to pyarrow.dataset.write_dataset | {} |
https | required |
to_polars
to_polars(['self', 'expr', '*', 'params=None', 'limit=None', '**kwargs'])
Execute expression and return results in as a polars DataFrame.
This method is eager and will execute the associated expression immediately.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ir.Expr | Ibis expression to export to polars. | required |
params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
limit | int | str | None | An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py . |
None |
kwargs | Any | Keyword arguments | {} |
Returns
Name | Type | Description |
---|---|---|
dataframe | A polars DataFrame holding the results of the executed expression. |
to_pyarrow
to_pyarrow(['self', 'expr', '**kwargs'])
Execute expression and return results in as a pyarrow table.
This method is eager and will execute the associated expression immediately.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ir.Expr | Ibis expression to export to pyarrow | required |
params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
limit | int | str | None | An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py . |
None |
kwargs | Any | Keyword arguments | {} |
Returns
Name | Type | Description |
---|---|---|
Table | A pyarrow table holding the results of the executed expression. |
to_pyarrow_batches
to_pyarrow_batches(['self', 'expr', '*', 'chunk_size=1000000', '**kwargs'])
Execute expression and return an iterator of pyarrow record batches.
This method is eager and will execute the associated expression immediately.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ir.Expr | Ibis expression to export to pyarrow | required |
limit | int | str | None | An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py . |
None |
params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
chunk_size | int | Maximum number of rows in each returned record batch. | 1000000 |
Returns
Name | Type | Description |
---|---|---|
RecordBatchReader | Collection of pyarrow RecordBatch s. |
to_torch
to_torch(['self', 'expr', '*', 'params=None', 'limit=None', '**kwargs'])
Execute an expression and return results as a dictionary of torch tensors.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ir.Expr | Ibis expression to execute. | required |
params | Mapping[ir.Scalar, Any] | None | Parameters to substitute into the expression. | None |
limit | int | str | None | An integer to effect a specific row limit. A value of None means no limit. |
None |
kwargs | Any | Keyword arguments passed into the backend’s to_torch implementation. |
{} |
Returns
Name | Type | Description |
---|---|---|
dict[str, torch.Tensor] | A dictionary of torch tensors, keyed by column name. |
truncate_table
truncate_table(['self', 'name', 'database=None'])
Delete all rows from a table.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Table name | required |
database | str | None | Database name | None |