PySpark
https://spark.apache.org/docs/latest/api/python
Install
Install Ibis and dependencies for the PySpark backend:
Install with the pyspark extra:
pip install 'ibis-framework[pyspark]'
Or install with conda:
conda install -c conda-forge ibis-pyspark
Or install with mamba:
mamba install -c conda-forge ibis-pyspark
Then connect:
import ibis
con = ibis.pyspark.connect()
Adjust connection parameters as needed.
Connect
ibis.pyspark.connect
con = ibis.pyspark.connect(session=session)
ibis.pyspark.connect is a thin wrapper around ibis.backends.pyspark.Backend.do_connect.
The pyspark backend does not create SparkSession objects; you must create a SparkSession yourself and pass it to ibis.pyspark.connect.
Connection Parameters
do_connect
do_connect(self, session)
Create a PySpark Backend for use with Ibis.
Parameters
Name | Type | Description | Default
---|---|---|---
session | SparkSession | A SparkSession instance | required
Examples
>>> import ibis
>>> from pyspark.sql import SparkSession
>>> session = SparkSession.builder.getOrCreate()
>>> ibis.pyspark.connect(session)
<ibis.backends.pyspark.Backend at 0x...>
pyspark.Backend
close
close(self)
Close Spark connection and drop any temporary objects.
compile
compile(self, expr, timecontext=None, params=None, *args, **kwargs)
Compile an ibis expression to a PySpark DataFrame object.
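For example, a minimal sketch, assuming table_expr is an ibis table expression built on this connection; the result is a pyspark.sql.DataFrame:
>>> df = con.compile(table_expr)  # table_expr is a hypothetical expression
>>> df.show()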
compute_stats
compute_stats(self, name, database=None, noscan=False)
Issue a COMPUTE STATISTICS command for a given table.
Parameters
Name | Type | Description | Default
---|---|---|---
name | str | Table name | required
database | str or None | Database name | None
noscan | bool | If True, collect only basic statistics for the table (number of rows, size in bytes). | False
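A minimal usage sketch; my_table is a hypothetical table name:
>>> con.compute_stats("my_table")  # hypothetical table; full statistics
>>> con.compute_stats("my_table", noscan=True)  # row count and size only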
create_database
create_database(self, name, path=None, force=False)
Create a new Spark database.
Parameters
Name | Type | Description | Default
---|---|---|---
name | str | Database name | required
path | str, Path, or None | Path where to store the database data; otherwise uses the Spark default | None
force | bool | Whether to append IF NOT EXISTS to the database creation SQL | False
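A minimal sketch; the database name is hypothetical:
>>> con.create_database("analytics")  # hypothetical name
>>> con.create_database("analytics", force=True)  # appends IF NOT EXISTS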
create_table
create_table(self, name, obj=None, *, schema=None, database=None, temp=None, overwrite=False, format='parquet')
Create a new table in Spark.
Parameters
Name | Type | Description | Default
---|---|---|---
name | str | Name of the new table. | required
obj | ir.Table, pd.DataFrame, pa.Table, or None | If passed, creates table from SELECT statement results | None
schema | sch.Schema or None | Mutually exclusive with obj; creates an empty table with a schema | None
database | str or None | Database name | None
temp | bool or None | Whether the new table is temporary | None
overwrite | bool | If True, overwrite existing data | False
format | str | Format of the table on disk | 'parquet'
Returns
Type | Description
---|---
Table | The newly created table.
Examples
>>> con.create_table("new_table_name", table_expr)  # quartodoc: +SKIP
create_view
create_view(self, name, obj, *, database=None, overwrite=False)
Create a Spark view from a table expression.
Parameters
Name | Type | Description | Default
---|---|---|---
name | str | View name | required
obj | ir.Table | Expression to use for the view | required
database | str or None | Database name | None
overwrite | bool | Replace an existing view of the same name if it exists | False
Returns
Type | Description
---|---
Table | The created view
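A minimal sketch, assuming table_expr is an existing ibis table expression; the view name is hypothetical:
>>> view = con.create_view("my_view", table_expr, overwrite=True)  # hypothetical name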
drop_database
drop_database(self, name, force=False)
Drop a Spark database.
Parameters
Name | Type | Description | Default
---|---|---|---
name | str | Database name | required
force | bool | If False, Spark throws an exception if the database is not empty or does not exist | False
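For example, with a hypothetical database name:
>>> con.drop_database("analytics", force=True)  # no error if missing or non-empty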
drop_table
drop_table(self, name, *, database=None, force=False)
Drop a table.
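For example, with a hypothetical table name:
>>> con.drop_table("my_table", force=True)  # force=True tolerates a missing table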
drop_table_or_view
drop_table_or_view(self, name, *, database=None, force=False)
Drop a Spark table or view.
Parameters
Name | Type | Description | Default
---|---|---|---
name | str | Table or view name | required
database | str or None | Database name | None
force | bool | Whether the database may throw an exception if the table does not exist | False
Examples
>>> table = "my_table"
>>> db = "operations"
>>> con.drop_table_or_view(table, db, force=True)  # quartodoc: +SKIP
drop_view
drop_view(self, name, *, database=None, force=False)
Drop a view.
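For example, with a hypothetical view name:
>>> con.drop_view("my_view", force=True)  # hypothetical name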
execute
execute(self, expr, **kwargs)
Execute an expression.
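A minimal sketch; my_table is a hypothetical table name. execute materializes the result locally (e.g. as a pandas DataFrame):
>>> t = con.table("my_table")  # hypothetical name
>>> con.execute(t.limit(5))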
fetch_from_cursor
fetch_from_cursor(self, cursor, schema)
Fetch data from cursor.
get_schema
get_schema(self, table_name, database=None)
Return a Schema object for the indicated table and database.
Parameters
Name | Type | Description | Default
---|---|---|---
table_name | str | Table name. May be fully qualified | required
database | str or None | Spark does not have a database argument for its table() method, so this must be None | None
Returns
Type | Description
---|---
Schema | An ibis schema
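For example, with a hypothetical table name:
>>> schema = con.get_schema("my_table")  # hypothetical name
>>> schema.names  # tuple of column names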
has_operation
has_operation(cls, operation)
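has_operation reports whether the backend supports a given operation type. A sketch using the ibis operations module:
>>> import ibis.expr.operations as ops
>>> con.has_operation(ops.ExtractYear)  # returns a bool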
insert
insert(self, table_name, obj=None, database=None, overwrite=False, values=None, validate=True)
Insert data into an existing table.
Examples
>>> table = "my_table"
>>> con.insert(table, table_expr) # quartodoc: +SKIP
Completely overwrite contents
>>> con.insert(table, table_expr, overwrite=True)  # quartodoc: +SKIP
list_databases
list_databases(self, like=None)
List existing databases in the current connection.
Parameters
Name | Type | Description | Default
---|---|---|---
like | str or None | A pattern in Python’s regex format to filter returned database names. | None
Returns
Type | Description
---|---
list[str] | The database names that exist in the current connection, that match the like pattern if provided.
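For example, the pattern below is illustrative:
>>> con.list_databases()  # all databases
>>> con.list_databases(like="^dev_")  # only names starting with dev_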
list_tables
list_tables(self, like=None, database=None)
Return the list of table names in the current database.
For some backends, the tables may be files in a directory, or other equivalent entities in a SQL database.
Parameters
Name | Type | Description | Default
---|---|---|---
like | str or None | A pattern in Python’s regex format. | None
database | str or None | The database from which to list tables. If not provided, the current database is used. | None
Returns
Type | Description
---|---
list[str] | The list of table names that match the pattern like.
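For example, with illustrative names:
>>> con.list_tables()  # all tables in the current database
>>> con.list_tables(like="sales.*", database="analytics")  # hypothetical names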
raw_sql
raw_sql(self, query)
Execute a query string and return the cursor used for execution.
Use .sql instead
If your query is a SELECT statement, you should use the backend .sql method to avoid having to manually release the cursor returned from raw_sql.
To release a cursor, call the close method on the returned cursor object.
You can close the cursor by explicitly calling its close method:
cursor = con.raw_sql("SELECT ...")
cursor.close()
Or you can use a context manager:
with con.raw_sql("SELECT ...") as cursor:
...
Parameters
Name | Type | Description | Default
---|---|---|---
query | str | DDL or DML statement | required
Examples
>>> con = ibis.connect("duckdb://")
>>> with con.raw_sql("SELECT 1") as cursor:
...     result = cursor.fetchall()
>>> result
[(1,)]
>>> cursor.closed
True
read_csv
read_csv(self, source_list, table_name=None, **kwargs)
Register a CSV file as a table in the current database.
Parameters
Name | Type | Description | Default
---|---|---|---
source_list | str, list[str], or tuple[str] | The data source(s). May be a path to a file or directory of CSV files, or an iterable of CSV files. | required
table_name | str or None | An optional name to use for the created table. This defaults to a sequentially generated name. | None
kwargs | Any | Additional keyword arguments passed to the PySpark loading function: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.csv.html | {}
Returns
Type | Description
---|---
ir.Table | The just-registered table
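A minimal sketch; the path is hypothetical, and header is forwarded to DataFrameReader.csv:
>>> t = con.read_csv("data/events.csv", table_name="events", header=True)  # hypothetical path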
read_delta
read_delta(self, source, table_name=None, **kwargs)
Register a Delta Lake table as a table in the current database.
Parameters
Name | Type | Description | Default
---|---|---|---
source | str or Path | The path to the Delta Lake table. | required
table_name | str or None | An optional name to use for the created table. This defaults to a sequentially generated name. | None
kwargs | Any | Additional keyword arguments passed to PySpark: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.load.html | {}
Returns
Type | Description
---|---
ir.Table | The just-registered table
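A minimal sketch with a hypothetical path:
>>> t = con.read_delta("/data/delta/events", table_name="events")  # hypothetical path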
read_json
read_json(self, source_list, table_name=None, **kwargs)
Register a JSON file as a table in the current database.
Parameters
Name | Type | Description | Default
---|---|---|---
source_list | str or Sequence[str] | The data source(s). May be a path to a file or directory of JSON files, or an iterable of JSON files. | required
table_name | str or None | An optional name to use for the created table. This defaults to a sequentially generated name. | None
kwargs | Any | Additional keyword arguments passed to the PySpark loading function: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.json.html | {}
Returns
Type | Description
---|---
ir.Table | The just-registered table
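A minimal sketch with a hypothetical path:
>>> t = con.read_json("data/events.json", table_name="events_json")  # hypothetical path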
read_parquet
read_parquet(self, source, table_name=None, **kwargs)
Register a parquet file as a table in the current database.
Parameters
Name | Type | Description | Default
---|---|---|---
source | str or Path | The data source. May be a path to a file or directory of parquet files. | required
table_name | str or None | An optional name to use for the created table. This defaults to a sequentially generated name. | None
kwargs | Any | Additional keyword arguments passed to PySpark: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.parquet.html | {}
Returns
Type | Description
---|---
ir.Table | The just-registered table
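A minimal sketch with a hypothetical path:
>>> t = con.read_parquet("data/events.parquet", table_name="events")  # hypothetical path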
register
register(self, source, table_name=None, **kwargs)
Register a data source as a table in the current database.
Parameters
Name | Type | Description | Default
---|---|---|---
source | str, Path, or Any | The data source(s). May be a path to a file or directory of parquet/csv files, or an iterable of CSV files. | required
table_name | str or None | An optional name to use for the created table. This defaults to a sequentially generated name. | None
**kwargs | Any | Additional keyword arguments passed to PySpark loading functions for CSV or parquet. | {}
Returns
Type | Description
---|---
ir.Table | The just-registered table
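A minimal sketch; the path is hypothetical, and the loader is assumed to be chosen from the file extension:
>>> t = con.register("data/events.parquet", table_name="events")  # hypothetical path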
rename_table
rename_table(self, old_name, new_name)
Rename an existing table.
Parameters
Name | Type | Description | Default
---|---|---|---
old_name | str | The old name of the table. | required
new_name | str | The new name of the table. | required
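For example, with hypothetical names:
>>> con.rename_table("events_staging", "events")  # hypothetical names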
table
table(self, name, database=None)
Return a table expression from a table or view in the database.
Parameters
Name | Type | Description | Default
---|---|---|---
name | str | Table name | required
database | str or None | Database in which the table resides | None
Returns
Type | Description
---|---
Table | Table named name from database
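For example, with hypothetical names:
>>> t = con.table("events", database="analytics")  # hypothetical names
>>> t.columns  # inspect the resulting table expression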
to_delta
to_delta(self, expr, path, **kwargs)
Write the results of executing the given expression to a Delta Lake table.
This method is eager and will execute the associated expression immediately.
Parameters
Name | Type | Description | Default
---|---|---|---
expr | ir.Table | The ibis expression to execute and persist to a Delta Lake table. | required
path | str or Path | The data source. A string or Path to the Delta Lake table. | required
**kwargs | Any | PySpark Delta Lake table write arguments: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrameWriter.save.html | {}
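A minimal sketch; the path is hypothetical and mode is assumed to be forwarded to the PySpark writer:
>>> con.to_delta(table_expr, "/data/delta/events", mode="overwrite")  # hypothetical path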
truncate_table
truncate_table(self, name, database=None)
Delete all rows from an existing table.
Parameters
Name | Type | Description | Default
---|---|---|---
name | str | Table name | required
database | str or None | Database name | None
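For example, with hypothetical names:
>>> con.truncate_table("events", database="analytics")  # hypothetical names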