Flink

https://nightlies.apache.org/flink/flink-docs-stable/

Install

Install Ibis together with the apache-flink package:

pip install ibis-framework apache-flink

And connect:

import ibis

con = ibis.flink.connect(table_env)
Pass an existing PyFlink TableEnvironment as table_env; the backend does not create one for you.

Connect

ibis.flink.connect

con = ibis.flink.connect(table_env=table_env)
Note

ibis.flink.connect is a thin wrapper around ibis.backends.flink.Backend.do_connect.

Note

The Flink backend does not create TableEnvironment objects; you must create a TableEnvironment yourself and pass it to ibis.flink.connect.

Connection Parameters

do_connect

do_connect(self, table_env)

Create a Flink Backend for use with Ibis.

Parameters
Name Type Description Default
table_env TableEnvironment A table environment. required
Examples
>>> import ibis
>>> from pyflink.table import EnvironmentSettings, TableEnvironment
>>> table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
>>> ibis.flink.connect(table_env)
<ibis.backends.flink.Backend object at 0x...>

flink.Backend

compile

compile(self, expr, params=None, pretty=False, **_)

Compile an Ibis expression to Flink.

connect

connect(self, *args, **kwargs)

Connect to the database.

Parameters

Name Type Description Default
*args Mandatory connection parameters, see the docstring of do_connect for details. ()
**kwargs Extra connection parameters, see the docstring of do_connect for details. {}

Notes

This creates a new backend instance with saved args and kwargs, then calls reconnect and finally returns the newly created and connected backend instance.

Returns

Name Type Description
BaseBackend An instance of the backend

create_database

create_database(self, name, db_properties=None, catalog=None, force=False)

Create a new database.

Parameters

Name Type Description Default
name str Name of the new database. required
db_properties dict Properties of the database. Accepts dictionary of key-value pairs (key1=val1, key2=val2, …). None
catalog str Name of the catalog in which the new database will be created. None
force bool If False, an exception is raised if the database already exists. False

create_table

create_table(self, name, obj=None, *, schema=None, database=None, catalog=None, tbl_properties=None, watermark=None, primary_key=None, temp=False, overwrite=False)

Create a new table in Flink.

In Flink, tables can be either virtual (VIEWS) or regular (TABLES). VIEWS can be created from an existing Table object, usually the result of a Table API or SQL query. TABLES describe external data, such as a file, database table, or message queue. In other words, TABLES refer explicitly to tables constructed directly from source/sink connectors.

When obj is an in-memory object (e.g., a pandas DataFrame), this function can currently create only a TEMPORARY VIEW. Passing an in-memory obj with temp=False raises an error.

Parameters

Name Type Description Default
name str Name of the new table. required
obj pd.DataFrame | pa.Table | ir.Table | None An Ibis table expression, pandas DataFrame, or PyArrow Table that will be used to extract the schema and the data of the new table. An optional schema can be used to override the schema. None
schema sch.Schema | None The schema for the new table. Required if obj is not provided. None
database str | None Name of the database where the table will be created, if not the default. None
catalog str | None Name of the catalog where the table will be created, if not the default. None
tbl_properties dict | None Table properties used to create a table source/sink. The properties are usually used to find and create the underlying connector. Accepts dictionary of key-value pairs (key1=val1, key2=val2, …). None
watermark Watermark | None Watermark strategy for the table, only applicable on sources. None
primary_key str | list[str] | None A single column or a list of columns to be marked as primary. Raises an error if the columns in primary_key are not a subset of the columns in schema. Primary keys must be non-nullable in Flink, so the columns indicated as primary key will be designated as non-nullable. None
temp bool Whether a table is temporary or not. False
overwrite bool Whether to clobber existing data. False

Returns

Name Type Description
Table The table that was created.
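
The primary_key rule above can be checked before calling create_table. The following is a minimal sketch, not the backend's own validation: check_primary_key, create_source_table, and column_names are hypothetical names introduced for illustration, and raising ValueError is an assumption (the documentation only says an error is raised). The con argument is assumed to be a connected Flink backend.

```python
def check_primary_key(primary_key, column_names):
    """Return primary_key as a list, raising if any key is not a known column."""
    # Accept either a single column name or a list of names, as documented.
    keys = [primary_key] if isinstance(primary_key, str) else list(primary_key)
    missing = [key for key in keys if key not in column_names]
    if missing:
        # Assumption: ValueError is used here only for illustration.
        raise ValueError(f"primary_key columns not in schema: {missing}")
    return keys


def create_source_table(con, name, schema, column_names, primary_key, tbl_properties):
    """Validate the key locally, then delegate to the documented create_table call."""
    check_primary_key(primary_key, column_names)
    return con.create_table(
        name,
        schema=schema,
        tbl_properties=tbl_properties,
        primary_key=primary_key,
    )
```

With a real connection, tbl_properties would carry the connector options, for example {"connector": "datagen"} for Flink's built-in data generator.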

create_view

create_view(self, name, obj, *, schema=None, database=None, catalog=None, force=False, temp=False, overwrite=False)

Create a new view from a dataframe or table.

When obj is an in-memory object (e.g., a pandas DataFrame), this function can currently create only a TEMPORARY VIEW. Passing an in-memory obj with temp=False raises an error.

Parameters

Name Type Description Default
name str Name of the new view. required
obj pd.DataFrame | ir.Table An Ibis table expression or pandas DataFrame that will be used to create the view. required
schema sch.Schema | None The schema for the new view. None
database str | None Name of the database where the view will be created, if not provided the database’s default is used. None
catalog str | None Name of the catalog where the view will be created, if not the default. None
force bool If False, an exception is raised if the view is already present. False
temp bool Whether the view is temporary or not. False
overwrite bool If True, remove the existing view, and create a new one. False

Returns

Name Type Description
Table The view that was created.
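
Because in-memory data can only back a temporary view, a thin wrapper can make that constraint explicit at the call site. This is a sketch under stated assumptions: create_memory_view is a hypothetical helper, con is assumed to be a connected Flink backend, and the early ValueError is this sketch's choice rather than the backend's documented error type.

```python
def create_memory_view(con, name, data, *, temp=True):
    """Create a temporary view from in-memory data such as a pandas DataFrame."""
    if not temp:
        # Fail early instead of waiting for the backend to reject the call.
        raise ValueError("in-memory data can only back a temporary view")
    return con.create_view(name, data, temp=True)
```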

disconnect

disconnect(self)

Close the connection to the backend.

drop_database

drop_database(self, name, catalog=None, force=False)

Drop the database named name.

Parameters

Name Type Description Default
name str Database to drop. required
catalog str Name of the catalog from which the database will be dropped. None
force bool If False, an exception is raised if the database does not exist. False

drop_table

drop_table(self, name, *, database=None, catalog=None, temp=False, force=False)

Drop a table.

Parameters

Name Type Description Default
name str Name of the table to drop. required
database str | None Name of the database where the table exists, if not the default. None
catalog str | None Name of the catalog where the table exists, if not the default. None
temp bool Whether the table is temporary or not. False
force bool If False, an exception is raised if the table does not exist. False

drop_view

drop_view(self, name, *, database=None, catalog=None, temp=False, force=False)

Drop a view.

Parameters

Name Type Description Default
name str Name of the view to drop. required
database str | None Name of the database where the view exists, if not the default. None
catalog str | None Name of the catalog where the view exists, if not the default. None
temp bool Whether the view is temporary or not. False
force bool If False, an exception is raised if the view does not exist. False
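
The force flag on drop_table and drop_view makes cleanup idempotent. A minimal sketch, assuming con is a connected Flink backend; drop_if_exists and its is_view argument are hypothetical names used only for illustration.

```python
def drop_if_exists(con, name, *, is_view=False, temp=False):
    """Drop a table or view without failing when it is already gone."""
    drop = con.drop_view if is_view else con.drop_table
    # force=True suppresses the "does not exist" exception described above.
    drop(name, temp=temp, force=True)
```

Calling it twice in a row for the same name should then be safe, which is convenient in test teardown code.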

execute

execute(self, expr, **kwargs)

Execute an expression.

from_connection

from_connection(cls, table_env)

Create a Flink Backend from an existing table environment.

Parameters

Name Type Description Default
table_env TableEnvironment A table environment. required

get_schema

get_schema(self, table_name, *, catalog=None, database=None)

Return a Schema object for the indicated table and database.

Parameters

Name Type Description Default
table_name str Table name. required
catalog str Catalog name. None
database str Database name. None

Returns

Name Type Description
sch.Schema Ibis schema

has_operation

has_operation(cls, operation)

Return whether the backend implements support for operation.

Parameters

Name Type Description Default
operation type[ops.Value] A class corresponding to an operation. required

Returns

Name Type Description
bool Whether the backend implements the operation.

Examples

>>> import ibis
>>> import ibis.expr.operations as ops
>>> ibis.sqlite.has_operation(ops.ArrayIndex)
False
>>> ibis.postgres.has_operation(ops.ArrayIndex)
True
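
The same check can be applied to several operations at once, for example to decide up front which parts of a pipeline a backend can run. This is a small sketch: supported_operations is a hypothetical helper, and it relies only on the has_operation method documented above.

```python
def supported_operations(backend, operations):
    """Return the subset of operations that the backend reports as implemented."""
    return [op for op in operations if backend.has_operation(op)]
```

With Ibis installed, a call such as supported_operations(ibis.flink, [ops.ArrayIndex]) would query the Flink backend in the same way the examples above query SQLite and PostgreSQL.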

insert

insert(self, table_name, obj, database=None, catalog=None, overwrite=False)

Insert data into a table.

Parameters

Name Type Description Default
table_name str The name of the table to insert data into. required
obj pa.Table | pd.DataFrame | ir.Table | list | dict The source data or expression to insert. required
database str | None Name of the attached database that the table is located in. None
catalog str | None Name of the attached catalog that the table is located in. None
overwrite bool If True then replace existing contents of table. False

Returns

Name Type Description
TableResult The table result.

Raises

Name Type Description
ValueError If the type of obj isn’t supported

list_databases

list_databases(self, like=None)

List existing databases in the current connection.

Ibis does not use the word schema to refer to database hierarchy.

A collection of tables is referred to as a database. A collection of databases is referred to as a catalog.

These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.

Parameters

Name Type Description Default
like str | None A pattern in Python’s regex format to filter returned database names. None
catalog str | None The catalog to list databases from. If None, the current catalog is searched. None

Returns

Name Type Description
list[str] The database names that exist in the current connection, that match the like pattern if provided.

list_tables

list_tables(self, like=None, *, database=None, catalog=None, temp=False)

Return the list of table/view names.

Return the list of table/view names in the database and catalog. If database/catalog are not specified, their default values will be used. Temporary tables can only be listed for the default database and catalog, hence database and catalog are ignored if temp is True.

Parameters

Name Type Description Default
like str A pattern in Python’s regex format. None
temp bool Whether to list temporary tables or permanent tables. False
database str The database to list tables of, if not the current one. None
catalog str The catalog to list tables of, if not the current one. None

Returns

Name Type Description
list[str] The list of the table/view names that match the pattern like.
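
To see how a Python regex pattern selects names, the sketch below filters a list of names with the standard re module. It is an illustration only, not the backend's implementation: filter_names is a hypothetical helper, and matching anywhere in the name (re.search semantics) is an assumption about how like is applied.

```python
import re


def filter_names(names, like=None):
    """Return the sorted names that match the regex pattern, or all names if like is None."""
    if like is None:
        return sorted(names)
    pattern = re.compile(like)
    # Assumption: the pattern may match anywhere in the name.
    return sorted(name for name in names if pattern.search(name))


tables = ["orders", "orders_2023", "customers", "tmp_orders"]
print(filter_names(tables, r"^orders"))  # ['orders', 'orders_2023']
print(filter_names(tables, r"orders"))   # ['orders', 'orders_2023', 'tmp_orders']
```

Anchoring the pattern with ^ or $ is the usual way to avoid unintended partial matches.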

list_views

list_views(self, like=None, temp=False)

Return the list of view names.


Parameters

Name Type Description Default
like str A pattern in Python’s regex format. None
temp bool Whether to list temporary views or permanent views. False

Returns

Name Type Description
list[str] The list of the view names that match the pattern like.

raw_sql

raw_sql(self, query)

read_csv

read_csv(self, path, schema=None, table_name=None)

Register a CSV file as a table in the current database.

Parameters

Name Type Description Default
path str | Path The data source. required
schema sch.Schema | None The schema for the new table. None
table_name str | None An optional name to use for the created table. This defaults to a sequentially generated name. None

Returns

Name Type Description
ir.Table The just-registered table

read_delta

read_delta(self, source, table_name=None, **kwargs)

Register a Delta Lake table in the current database.

Parameters

Name Type Description Default
source str | Path The data source. Must be a directory containing a Delta Lake table. required
table_name str | None An optional name to use for the created table. This defaults to a sequentially generated name. None
**kwargs Any Additional keyword arguments passed to the underlying backend or library. {}

Returns

Name Type Description
ir.Table The just-registered table.

read_json

read_json(self, path, schema=None, table_name=None)

Register a JSON file as a table in the current database.

Parameters

Name Type Description Default
path str | Path The data source. required
schema sch.Schema | None The schema for the new table. None
table_name str | None An optional name to use for the created table. This defaults to a sequentially generated name. None

Returns

Name Type Description
ir.Table The just-registered table

read_parquet

read_parquet(self, path, schema=None, table_name=None)

Register a Parquet file as a table in the current database.

Parameters

Name Type Description Default
path str | Path The data source. required
schema sch.Schema | None The schema for the new table. None
table_name str | None An optional name to use for the created table. This defaults to a sequentially generated name. None

Returns

Name Type Description
ir.Table The just-registered table

reconnect

reconnect(self)

Reconnect to the database already configured with connect.

register_options

register_options(cls)

Register custom backend options.

rename_table

rename_table(self, old_name, new_name, force=True)

Rename an existing table.

Parameters

Name Type Description Default
old_name str The old name of the table. required
new_name str The new name of the table. required
force bool If False, an exception is raised if the table does not exist. True

sql

sql(self, query, schema=None, dialect=None)

table

table(self, name, database=None, catalog=None)

Return a table expression from a table or view in the database.

Parameters

Name Type Description Default
name str Table name. required
database str | None Database in which the table resides. None
catalog str | None Catalog in which the table resides. None

Returns

Name Type Description
Table Table named name from database

to_csv

to_csv(self, expr, path, *, params=None, **kwargs)

Write the results of executing the given expression to a CSV file.

This method is eager and will execute the associated expression immediately.

Parameters

Name Type Description Default
expr ir.Table The ibis expression to execute and persist to CSV. required
path str | Path The destination. A string or Path to the CSV file. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
kwargs Any Additional keyword arguments passed to pyarrow.csv.CSVWriter {}

to_delta

to_delta(self, expr, path, *, params=None, **kwargs)

Write the results of executing the given expression to a Delta Lake table.

This method is eager and will execute the associated expression immediately.

Parameters

Name Type Description Default
expr ir.Table The ibis expression to execute and persist to Delta Lake table. required
path str | Path The destination. A string or Path to the Delta Lake table. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
kwargs Any Additional keyword arguments passed to deltalake.writer.write_deltalake method {}

to_pandas

to_pandas(self, expr, *, params=None, limit=None, **kwargs)

Execute an Ibis expression and return a pandas DataFrame, Series, or scalar.

Note

This method is a wrapper around execute.

Parameters

Name Type Description Default
expr ir.Expr Ibis expression to execute. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
limit int | str | None An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. None
kwargs Any Keyword arguments {}

to_pandas_batches

to_pandas_batches(self, expr, *, params=None, limit=None, chunk_size=1000000, **kwargs)

Execute an Ibis expression and return an iterator of pandas DataFrames.

Parameters

Name Type Description Default
expr ir.Expr Ibis expression to execute. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
limit int | str | None An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. None
chunk_size int Maximum number of rows in each returned DataFrame batch. This may have no effect depending on the backend. 1000000
kwargs Any Keyword arguments {}

Returns

Name Type Description
Iterator[pd.DataFrame] An iterator of pandas DataFrames.
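
Iterating over batches keeps only one chunk in memory at a time. A minimal sketch, assuming con is a connected Flink backend and expr is an Ibis table expression; count_rows_in_batches is a hypothetical helper, and as noted above chunk_size may have no effect depending on the backend.

```python
def count_rows_in_batches(con, expr, chunk_size=100_000):
    """Count result rows by consuming the batch iterator one DataFrame at a time."""
    total = 0
    for batch in con.to_pandas_batches(expr, chunk_size=chunk_size):
        # len() of a pandas DataFrame is its number of rows.
        total += len(batch)
    return total
```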

to_parquet

to_parquet(self, expr, path, *, params=None, **kwargs)

Write the results of executing the given expression to a parquet file.

This method is eager and will execute the associated expression immediately.

Parameters

Name Type Description Default
expr ir.Table The ibis expression to execute and persist to parquet. required
path str | Path The destination. A string or Path to the parquet file. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
**kwargs Any Additional keyword arguments passed to pyarrow.parquet.ParquetWriter {}

to_parquet_dir

to_parquet_dir(self, expr, directory, *, params=None, **kwargs)

Write the results of executing the given expression to a parquet file in a directory.

This method is eager and will execute the associated expression immediately.

Parameters

Name Type Description Default
expr ir.Table The ibis expression to execute and persist to parquet. required
directory str | Path The destination. A string or Path to the directory where the parquet file will be written. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
**kwargs Any Additional keyword arguments passed to pyarrow.dataset.write_dataset {}

to_polars

to_polars(self, expr, *, params=None, limit=None, **kwargs)

Execute expression and return results as a polars DataFrame.

This method is eager and will execute the associated expression immediately.

Parameters

Name Type Description Default
expr ir.Expr Ibis expression to export to polars. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
limit int | str | None An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. None
kwargs Any Keyword arguments {}

Returns

Name Type Description
dataframe A polars DataFrame holding the results of the executed expression.

to_pyarrow

to_pyarrow(self, expr, *, params=None, limit=None, **kwargs)

Execute expression and return results as a pyarrow table.

This method is eager and will execute the associated expression immediately.

Parameters

Name Type Description Default
expr ir.Expr Ibis expression to export to pyarrow required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
limit int | str | None An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. None
kwargs Any Keyword arguments {}

Returns

Name Type Description
Table A pyarrow table holding the results of the executed expression.

to_pyarrow_batches

to_pyarrow_batches(self, expr, *, params=None, chunk_size=None, limit=None, **kwargs)

Execute expression and return an iterator of pyarrow record batches.

This method is eager and will execute the associated expression immediately.

Parameters

Name Type Description Default
expr ir.Expr Ibis expression to export to pyarrow required
limit int | str | None An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. None
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
chunk_size int Maximum number of rows in each returned record batch. 1000000

Returns

Name Type Description
RecordBatchReader Collection of pyarrow RecordBatches.

to_torch

to_torch(self, expr, *, params=None, limit=None, **kwargs)

Execute an expression and return results as a dictionary of torch tensors.

Parameters

Name Type Description Default
expr ir.Expr Ibis expression to execute. required
params Mapping[ir.Scalar, Any] | None Parameters to substitute into the expression. None
limit int | str | None An integer to effect a specific row limit. A value of None means no limit. None
kwargs Any Keyword arguments passed into the backend’s to_torch implementation. {}

Returns

Name Type Description
dict[str, torch.Tensor] A dictionary of torch tensors, keyed by column name.

truncate_table

truncate_table(self, name, database=None)

Delete all rows from a table.

Ibis does not use the word schema to refer to database hierarchy.

A collection of tables is referred to as a database. A collection of databases is referred to as a catalog. These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.

Parameters

Name Type Description Default
name str Table name required
database str | None Name of the attached database that the table is located in. For backends that support multi-level table hierarchies, you can pass in a dotted string path like "catalog.database" or a tuple of strings like ("catalog", "database"). None
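
The two accepted forms of a multi-level database path, a dotted string or a tuple of strings, describe the same (catalog, database) pair. The sketch below normalizes both forms; split_database_path is a hypothetical helper written for illustration and is not how Ibis parses the argument internally.

```python
def split_database_path(database):
    """Return (catalog, database) from None, "db", "catalog.db", or a tuple of strings."""
    if database is None:
        return (None, None)
    # A dotted string and a tuple are treated as equivalent spellings.
    parts = tuple(database.split(".")) if isinstance(database, str) else tuple(database)
    if len(parts) == 1:
        return (None, parts[0])
    if len(parts) == 2:
        return parts
    raise ValueError(f"expected at most two levels, got {len(parts)}: {database!r}")


print(split_database_path("my_catalog.my_db"))       # ('my_catalog', 'my_db')
print(split_database_path(("my_catalog", "my_db")))  # ('my_catalog', 'my_db')
```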