Flink
https://nightlies.apache.org/flink/flink-docs-stable/
Install
Install Ibis and dependencies for the Flink backend:
Install alongside the apache-flink
package:
pip install ibis-framework apache-flink
And connect:
import ibis
= ibis.flink.connect() con
- 1
- Adjust connection parameters as needed.
Connect
ibis.flink.connect
= ibis.flink.connect(table_env=table_env) con
ibis.flink.connect
is a thin wrapper around ibis.backends.flink.Backend.do_connect
.
The flink
backend does not create TableEnvironment
objects; you must create a TableEnvironment
and pass that to ibis.flink.connect
.
Connection Parameters
do_connect
do_connect(self, table_env)
Create a Flink Backend
for use with Ibis.
Parameters
Name | Type | Description | Default |
---|---|---|---|
table_env | TableEnvironment | A table environment. | required |
Examples
>>> import ibis
>>> from pyflink.table import EnvironmentSettings, TableEnvironment
>>> table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
>>> ibis.flink.connect(table_env)
<ibis.backends.flink.Backend object at 0x...>
flink.Backend
compile
compile(self, expr, params=None, pretty=False, **_)
Compile an Ibis expression to Flink.
connect
connect(self, *args, **kwargs)
Connect to the database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
*args | Mandatory connection parameters, see the docstring of do_connect for details. |
() |
|
**kwargs | Extra connection parameters, see the docstring of do_connect for details. |
{} |
Notes
This creates a new backend instance with saved args
and kwargs
, then calls reconnect
and finally returns the newly created and connected backend instance.
Returns
Name | Type | Description |
---|---|---|
BaseBackend | An instance of the backend |
create_database
create_database(self, name, db_properties=None, catalog=None, force=False)
Create a new database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of the new database. | required |
db_properties | dict | Properties of the database. Accepts dictionary of key-value pairs (key1=val1, key2=val2, …). | None |
catalog | str | Name of the catalog in which the new database will be created. | None |
force | bool | If False , an exception is raised if the database already exists. |
False |
create_table
create_table(self, name, obj=None, *, schema=None, database=None, catalog=None, tbl_properties=None, watermark=None, primary_key=None, temp=False, overwrite=False)
Create a new table in Flink.
In Flink, tables can be either virtual (VIEWS) or regular (TABLES). VIEWS can be created from an existing Table object, usually the result of a Table API or SQL query. TABLES describe external data, such as a file, database table, or message queue. In other words, TABLES refer explicitly to tables constructed directly from source/sink connectors.
When obj
is in-memory (e.g., Dataframe), currently this function can create only a TEMPORARY VIEW. If obj
is in-memory and temp
is False, it will raise an error.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of the new table. | required |
obj | pd.DataFrame | pa.Table | ir.Table | None | An Ibis table expression, pandas DataFrame, or PyArrow Table that will be used to extract the schema and the data of the new table. An optional schema can be used to override the schema. |
None |
schema | sch.Schema | None | The schema for the new table. Required if obj is not provided. |
None |
database | str | None | Name of the database where the table will be created, if not the default. | None |
catalog | str | None | Name of the catalog where the table will be created, if not the default. | None |
tbl_properties | dict | None | Table properties used to create a table source/sink. The properties are usually used to find and create the underlying connector. Accepts dictionary of key-value pairs (key1=val1, key2=val2, …). | None |
watermark | Watermark | None | Watermark strategy for the table, only applicable on sources. | None |
primary_key | str | list[str] | None | A single column or a list of columns to be marked as primary. Raises an error if the column(s) in primary_key is NOT a subset of the columns in schema . Primary keys must be non-nullable in Flink and the columns indicated as primary key will be designated as non-nullable. |
None |
temp | bool | Whether a table is temporary or not. | False |
overwrite | bool | Whether to clobber existing data. | False |
Returns
Name | Type | Description |
---|---|---|
Table | The table that was created. |
create_view
create_view(self, name, obj, *, schema=None, database=None, catalog=None, force=False, temp=False, overwrite=False)
Create a new view from a dataframe or table.
When obj
is in-memory (e.g., Dataframe), currently this function can create only a TEMPORARY VIEW. If obj
is in-memory and temp
is False, it will raise an error.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of the new view. | required |
obj | pd.DataFrame | ir.Table | An Ibis table expression that will be used to create the view. | required |
schema | sch.Schema | None | The schema for the new view. | None |
database | str | None | Name of the database where the view will be created, if not provided the database’s default is used. | None |
catalog | str | None | Name of the catalog where the table exists, if not the default. | None |
force | bool | If False , an exception is raised if the table is already present. |
False |
temp | bool | Whether the table is temporary or not. | False |
overwrite | bool | If True , remove the existing view, and create a new one. |
False |
Returns
Name | Type | Description |
---|---|---|
Table | The view that was created. |
disconnect
disconnect(self)
Close the connection to the backend.
drop_database
drop_database(self, name, catalog=None, force=False)
Drop a database with name name
.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Database to drop. | required |
catalog | str | Name of the catalog from which the database will be dropped. | None |
force | bool | If False , an exception is raised if the database does not exist. |
False |
drop_table
drop_table(self, name, *, database=None, catalog=None, temp=False, force=False)
Drop a table.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of the table to drop. | required |
database | str | None | Name of the database where the table exists, if not the default. | None |
catalog | str | None | Name of the catalog where the table exists, if not the default. | None |
temp | bool | Whether the table is temporary or not. | False |
force | bool | If False , an exception is raised if the table does not exist. |
False |
drop_view
drop_view(self, name, *, database=None, catalog=None, temp=False, force=False)
Drop a view.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Name of the view to drop. | required |
database | str | None | Name of the database where the view exists, if not the default. | None |
catalog | str | None | Name of the catalog where the view exists, if not the default. | None |
temp | bool | Whether the view is temporary or not. | False |
force | bool | If False , an exception is raised if the view does not exist. |
False |
execute
execute(self, expr, **kwargs)
Execute an expression.
from_connection
from_connection(cls, table_env)
Create a Flink Backend
from an existing table environment.
Parameters
Name | Type | Description | Default |
---|---|---|---|
table_env | TableEnvironment | A table environment. | required |
get_schema
get_schema(self, table_name, *, catalog=None, database=None)
Return a Schema object for the indicated table and database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
table_name | str | Table name. | required |
catalog | str | Catalog name. | None |
database | str | Database name. | None |
Returns
Name | Type | Description |
---|---|---|
sch.Schema | Ibis schema |
has_operation
has_operation(cls, operation)
Return whether the backend implements support for operation
.
Parameters
Name | Type | Description | Default |
---|---|---|---|
operation | type[ops.Value] | A class corresponding to an operation. | required |
Returns
Name | Type | Description |
---|---|---|
bool | Whether the backend implements the operation. |
Examples
>>> import ibis
>>> import ibis.expr.operations as ops
>>> ibis.sqlite.has_operation(ops.ArrayIndex)
False
>>> ibis.postgres.has_operation(ops.ArrayIndex)
True
insert
insert(self, table_name, obj, database=None, catalog=None, overwrite=False)
Insert data into a table.
Parameters
Name | Type | Description | Default |
---|---|---|---|
table_name | str | The name of the table to insert data into. | required |
obj | pa.Table | pd.DataFrame | ir.Table | list | dict | The source data or expression to insert. | required |
database | str | None | Name of the attached database that the table is located in. | None |
catalog | str | None | Name of the attached catalog that the table is located in. | None |
overwrite | bool | If True then replace existing contents of table. |
False |
Returns
Name | Type | Description |
---|---|---|
TableResult | The table result. |
Raises
Name | Type | Description |
---|---|---|
ValueError | If the type of obj isn’t supported |
list_databases
list_databases(self, like=None)
List existing databases in the current connection.
schema
to refer to database hierarchy.
A collection of table
is referred to as a database
. A collection of database
is referred to as a catalog
.
These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.
Parameters
Name | Type | Description | Default |
---|---|---|---|
like | str | None | A pattern in Python’s regex format to filter returned database names. | None |
catalog | str | None | The catalog to list databases from. If None , the current catalog is searched. |
None |
Returns
Name | Type | Description |
---|---|---|
list[str] | The database names that exist in the current connection, that match the like pattern if provided. |
list_tables
list_tables(self, like=None, *, database=None, catalog=None, temp=False)
Return the list of table/view names.
Return the list of table/view names in the database
and catalog
. If database
/catalog
are not specified, their default values will be used. Temporary tables can only be listed for the default database and catalog, hence database
and catalog
are ignored if temp
is True.
Parameters
Name | Type | Description | Default |
---|---|---|---|
like | str | A pattern in Python’s regex format. | None |
temp | bool | Whether to list temporary tables or permanent tables. | False |
database | str | The database to list tables of, if not the current one. | None |
catalog | str | The catalog to list tables of, if not the current one. | None |
Returns
Name | Type | Description |
---|---|---|
list[str] | The list of the table/view names that match the pattern like . |
list_views
list_views(self, like=None, temp=False)
Return the list of view names.
Return the list of view names.
Parameters
Name | Type | Description | Default |
---|---|---|---|
like | str | A pattern in Python’s regex format. | None |
temp | bool | Whether to list temporary views or permanent views. | False |
Returns
Name | Type | Description |
---|---|---|
list[str] | The list of the view names that match the pattern like . |
raw_sql
raw_sql(self, query)
read_csv
read_csv(self, path, schema=None, table_name=None)
Register a csv file as a table in the current database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
path | str | Path | The data source. | required |
schema | sch.Schema | None | The schema for the new table. | None |
table_name | str | None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
Returns
Name | Type | Description |
---|---|---|
ir.Table | The just-registered table |
read_delta
read_delta(self, source, table_name=None, **kwargs)
Register a Delta Lake table in the current database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
source | str | Path | The data source. Must be a directory containing a Delta Lake table. | required |
table_name | str | None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
**kwargs | Any | Additional keyword arguments passed to the underlying backend or library. | {} |
Returns
Name | Type | Description |
---|---|---|
ir.Table | The just-registered table. |
read_json
read_json(self, path, schema=None, table_name=None)
Register a json file as a table in the current database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
path | str | Path | The data source. | required |
schema | sch.Schema | None | The schema for the new table. | None |
table_name | str | None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
Returns
Name | Type | Description |
---|---|---|
ir.Table | The just-registered table |
read_parquet
read_parquet(self, path, schema=None, table_name=None)
Register a parquet file as a table in the current database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
path | str | Path | The data source. | required |
schema | sch.Schema | None | The schema for the new table. | None |
table_name | str | None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
Returns
Name | Type | Description |
---|---|---|
ir.Table | The just-registered table |
reconnect
reconnect(self)
Reconnect to the database already configured with connect.
register_options
register_options(cls)
Register custom backend options.
rename_table
rename_table(self, old_name, new_name, force=True)
Rename an existing table.
Parameters
Name | Type | Description | Default |
---|---|---|---|
old_name | str | The old name of the table. | required |
new_name | str | The new name of the table. | required |
force | bool | If False , an exception is raised if the table does not exist. |
True |
sql
sql(self, query, schema=None, dialect=None)
table
table(self, name, database=None, catalog=None)
Return a table expression from a table or view in the database.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Table name. | required |
database | str | None | Database in which the table resides. | None |
catalog | str | None | Catalog in which the table resides. | None |
Returns
Name | Type | Description |
---|---|---|
Table | Table named name from database |
to_csv
to_csv(self, expr, path, *, params=None, **kwargs)
Write the results of executing the given expression to a CSV file.
This method is eager and will execute the associated expression immediately.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ir.Table | The ibis expression to execute and persist to CSV. | required |
path | str | Path | The data source. A string or Path to the CSV file. | required |
params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
kwargs | Any | Additional keyword arguments passed to pyarrow.csv.CSVWriter | {} |
https | required |
to_delta
to_delta(self, expr, path, *, params=None, **kwargs)
Write the results of executing the given expression to a Delta Lake table.
This method is eager and will execute the associated expression immediately.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ir.Table | The ibis expression to execute and persist to Delta Lake table. | required |
path | str | Path | The data source. A string or Path to the Delta Lake table. | required |
params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
kwargs | Any | Additional keyword arguments passed to deltalake.writer.write_deltalake method | {} |
to_json
to_json(self, expr, path, **kwargs)
Write the results of expr
to a json file of [{column -> value}, …] objects.
This method is eager and will execute the associated expression immediately.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ir.Table | The ibis expression to execute and persist to Delta Lake table. | required |
path | str | Path | The data source. A string or Path to the Delta Lake table. | required |
kwargs | Any | Additional, backend-specifc keyword arguments. | {} |
to_pandas
to_pandas(self, expr, *, params=None, limit=None, **kwargs)
Execute an Ibis expression and return a pandas DataFrame
, Series
, or scalar.
This method is a wrapper around execute
.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ir.Expr | Ibis expression to execute. | required |
params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
limit | int | str | None | An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py . |
None |
kwargs | Any | Keyword arguments | {} |
to_pandas_batches
to_pandas_batches(self, expr, *, params=None, limit=None, chunk_size=1000000, **kwargs)
Execute an Ibis expression and return an iterator of pandas DataFrame
s.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ir.Expr | Ibis expression to execute. | required |
params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
limit | int | str | None | An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py . |
None |
chunk_size | int | Maximum number of rows in each returned DataFrame batch. This may have no effect depending on the backend. |
1000000 |
kwargs | Any | Keyword arguments | {} |
Returns
Name | Type | Description |
---|---|---|
Iterator[pd.DataFrame] | An iterator of pandas DataFrame s. |
to_parquet
to_parquet(self, expr, path, *, params=None, **kwargs)
Write the results of executing the given expression to a parquet file.
This method is eager and will execute the associated expression immediately.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ir.Table | The ibis expression to execute and persist to parquet. | required |
path | str | Path | The data source. A string or Path to the parquet file. | required |
params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
**kwargs | Any | Additional keyword arguments passed to pyarrow.parquet.ParquetWriter | {} |
https | required |
to_parquet_dir
to_parquet_dir(self, expr, directory, *, params=None, **kwargs)
Write the results of executing the given expression to a parquet file in a directory.
This method is eager and will execute the associated expression immediately.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ir.Table | The ibis expression to execute and persist to parquet. | required |
directory | str | Path | The data source. A string or Path to the directory where the parquet file will be written. | required |
params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
**kwargs | Any | Additional keyword arguments passed to pyarrow.dataset.write_dataset | {} |
https | required |
to_polars
to_polars(self, expr, *, params=None, limit=None, **kwargs)
Execute expression and return results in as a polars DataFrame.
This method is eager and will execute the associated expression immediately.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ir.Expr | Ibis expression to export to polars. | required |
params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
limit | int | str | None | An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py . |
None |
kwargs | Any | Keyword arguments | {} |
Returns
Name | Type | Description |
---|---|---|
dataframe | A polars DataFrame holding the results of the executed expression. |
to_pyarrow
to_pyarrow(self, expr, *, params=None, limit=None, **kwargs)
Execute expression and return results in as a pyarrow table.
This method is eager and will execute the associated expression immediately.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ir.Expr | Ibis expression to export to pyarrow | required |
params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
limit | int | str | None | An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py . |
None |
kwargs | Any | Keyword arguments | {} |
Returns
Name | Type | Description |
---|---|---|
Table | A pyarrow table holding the results of the executed expression. |
to_pyarrow_batches
to_pyarrow_batches(self, expr, *, params=None, chunk_size=None, limit=None, **kwargs)
Execute expression and return an iterator of pyarrow record batches.
This method is eager and will execute the associated expression immediately.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ir.Expr | Ibis expression to export to pyarrow | required |
limit | int | str | None | An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py . |
None |
params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
chunk_size | int | Maximum number of rows in each returned record batch. | 1000000 |
Returns
Name | Type | Description |
---|---|---|
RecordBatchReader | Collection of pyarrow RecordBatch s. |
to_torch
to_torch(self, expr, *, params=None, limit=None, **kwargs)
Execute an expression and return results as a dictionary of torch tensors.
Parameters
Name | Type | Description | Default |
---|---|---|---|
expr | ir.Expr | Ibis expression to execute. | required |
params | Mapping[ir.Scalar, Any] | None | Parameters to substitute into the expression. | None |
limit | int | str | None | An integer to effect a specific row limit. A value of None means no limit. |
None |
kwargs | Any | Keyword arguments passed into the backend’s to_torch implementation. |
{} |
Returns
Name | Type | Description |
---|---|---|
dict[str, torch.Tensor] | A dictionary of torch tensors, keyed by column name. |
truncate_table
truncate_table(self, name, database=None)
Delete all rows from a table.
schema
to refer to database hierarchy.
A collection of tables is referred to as a database
. A collection of database
is referred to as a catalog
. These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.
Parameters
Name | Type | Description | Default |
---|---|---|---|
name | str | Table name | required |
database | str | None | Name of the attached database that the table is located in. For backends that support multi-level table hierarchies, you can pass in a dotted string path like "catalog.database" or a tuple of strings like ("catalog", "database") . |
None |