PySpark¶
Install¶
Install ibis and dependencies for the PySpark backend with one of the following:

pip:
    pip install 'ibis-framework[pyspark]'

conda:
    conda install -c conda-forge ibis-pyspark

mamba:
    mamba install -c conda-forge ibis-pyspark
Connect¶
API¶
Create a client by passing a SparkSession to ibis.pyspark.connect.

See ibis.backends.pyspark.Backend.do_connect for connection parameter information.

ibis.pyspark.connect is a thin wrapper around ibis.backends.pyspark.Backend.do_connect.
Connection Parameters¶
do_connect(session)¶

Create a PySpark Backend for use with Ibis.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| session | SparkSession | A SparkSession instance | required |
Examples:
>>> import ibis
>>> from pyspark.sql import SparkSession
>>> session = SparkSession.builder.getOrCreate()
>>> ibis.pyspark.connect(session)
<ibis.backends.pyspark.Backend at 0x...>
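Once connected, the backend behaves like any other ibis backend. A minimal follow-up sketch, assuming a table already exists in the current Spark session (the table name below is hypothetical):

>>> con = ibis.pyspark.connect(session)
>>> con.list_tables()  # names of tables visible to the Spark session
>>> t = con.table("my_table")  # hypothetical table; returns a lazy ibis table expression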
Backend API¶
Backend¶

Bases: BaseSQLBackend

Classes¶

Options¶

Bases: ibis.config.Config

PySpark options.
Attributes:

| Name | Type | Description |
|---|---|---|
| treat_nan_as_null | bool | Treat NaNs in floating point expressions as NULL. |
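Backend options like this one are usually reachable through ibis.options at runtime. A minimal sketch, assuming the class is registered under ibis.options.pyspark, which is the common pattern for backend Options classes:

>>> import ibis
>>> con = ibis.pyspark.connect(session)  # loading the backend registers its options
>>> ibis.options.pyspark.treat_nan_as_null = True  # floating point NaNs now behave as NULL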
Functions¶

close()¶

Close Spark connection and drop any temporary objects.

compile(expr, timecontext=None, params=None, *args, **kwargs)¶

Compile an ibis expression to a PySpark DataFrame object.
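compile returns the Spark plan without executing it, which is handy for inspection. A minimal sketch, where my_table and column x are hypothetical:

>>> t = con.table("my_table")   # hypothetical table
>>> expr = t.filter(t.x > 0)    # hypothetical column x
>>> df = con.compile(expr)      # a pyspark.sql.DataFrame; nothing is executed yet
>>> df.explain()                # ask Spark to print its query plan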
compute_stats(name, database=None, noscan=False)¶

create_database(name, path=None, force=False)¶

create_table(name, obj=None, *, schema=None, database=None, temp=None, overwrite=False, format='parquet')¶

Create a new table in Spark.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the new table. | required |
| obj | ir.Table \| pd.DataFrame \| None | If passed, creates the table from this expression or DataFrame. | None |
| schema | sch.Schema \| None | Schema to use for the new table; mutually exclusive with obj. | None |
| database | str \| None | Database name | None |
| temp | bool \| None | Whether the new table is temporary | None |
| overwrite | bool | If True, replace an existing table with the same name. | False |
| format | str | Format of the table on disk | 'parquet' |
Returns:

| Type | Description |
|---|---|
| Table | The newly created table. |
Examples:
>>> con.create_table('new_table_name', table_expr)
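Since obj may also be a pandas DataFrame, a hedged variant of the example above creates a table from in-memory data and replaces any existing table of the same name (the table and column names are made up):

>>> import pandas as pd
>>> df = pd.DataFrame({"id": [1, 2], "value": [10.0, 20.0]})
>>> t = con.create_table("pandas_backed_table", df, overwrite=True)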
create_view(name, obj, *, database=None, overwrite=False)¶

Create a Spark view from a table expression.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | View name | required |
| obj | ir.Table | Expression to use for the view | required |
| database | str \| None | Database name | None |
| overwrite | bool | Replace an existing view of the same name if it exists | False |
Returns:

| Type | Description |
|---|---|
| Table | The created view |
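A short usage sketch, with a hypothetical source table; any table expression can back the view:

>>> t = con.table("my_table")  # hypothetical source table
>>> v = con.create_view("recent_rows", t.limit(100), overwrite=True)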
drop_database(name, force=False)¶

drop_table(name, *, database=None, force=False)¶

Drop a table.

drop_table_or_view(name, *, database=None, force=False)¶

Drop a Spark table or view.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Table or view name | required |
| database | str \| None | Database name | None |
| force | bool | If False, the database may raise an exception when the table does not exist. | False |
Examples:
>>> table = 'my_table'
>>> db = 'operations'
>>> con.drop_table_or_view(table, database=db, force=True)
drop_view(name, *, database=None, force=False)¶

Drop a view.
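A minimal sketch, continuing the view created in the create_view example above; force is assumed to behave like the drop_table force flag:

>>> con.drop_view("recent_rows", force=True)  # force=True: don't error if the view is missing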
execute(expr, **kwargs)¶

Execute an expression.
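Executing a table expression materializes it client-side, typically as a pandas DataFrame. A minimal sketch with a hypothetical table:

>>> t = con.table("my_table")  # hypothetical table
>>> con.execute(t.limit(5))    # first five rows, typically as a pandas DataFrame

The same result is available as t.limit(5).execute(), which routes through the expression's own backend.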
insert(table_name, obj=None, database=None, overwrite=False, values=None, validate=True)¶

Insert data into an existing table.
Examples:
>>> table = 'my_table'
>>> con.insert(table, table_expr)
Completely overwrite the table's contents:
>>> con.insert(table, table_expr, overwrite=True)
read_csv(source_list, table_name=None, **kwargs)¶

Register a CSV file as a table in the current database.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| source_list | str \| list[str] \| tuple[str] | The data source(s). May be a path to a file or directory of CSV files, or an iterable of CSV files. | required |
| table_name | str \| None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
| kwargs | Any | Additional keyword arguments passed to the PySpark loading function: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.csv.html | {} |
Returns:

| Type | Description |
|---|---|
| ir.Table | The just-registered table |
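A hedged usage sketch; the path is illustrative, and header/inferSchema are ordinary pyspark.sql.DataFrameReader.csv options forwarded through kwargs:

>>> t = con.read_csv("data/events.csv", table_name="events", header=True, inferSchema=True)
>>> t.columns  # column names taken from the CSV header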
read_parquet(source, table_name=None, **kwargs)¶

Register a parquet file as a table in the current database.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| source | str \| Path | The data source. May be a path to a file or directory of parquet files. | required |
| table_name | str \| None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
| kwargs | Any | Additional keyword arguments passed to PySpark: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrameReader.parquet.html | {} |
Returns:

| Type | Description |
|---|---|
| ir.Table | The just-registered table |
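A hedged usage sketch with an illustrative path; the schema is picked up from the parquet metadata:

>>> t = con.read_parquet("data/events.parquet", table_name="events")
>>> t.schema()  # schema read from the parquet files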
register(source, table_name=None, **kwargs)¶

Register a data source as a table in the current database.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| source | str \| Path \| Any | The data source(s). May be a path to a file or directory of parquet/csv files, or an iterable of CSV files. | required |
| table_name | str \| None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
| **kwargs | Any | Additional keyword arguments passed to PySpark loading functions for CSV or parquet. | {} |
Returns:

| Type | Description |
|---|---|
| ir.Table | The just-registered table |
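register accepts CSV and parquet sources and routes them to the corresponding reader. A hedged sketch with illustrative paths:

>>> csv_table = con.register("data/events.csv", table_name="events_csv")
>>> parquet_table = con.register("data/events.parquet")  # table_name defaults to a generated name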