RisingWave

Install
Install Ibis and dependencies for the RisingWave backend.

With pip, install the risingwave extra:

pip install 'ibis-framework[risingwave]'

With conda:

conda install -c conda-forge ibis-risingwave

With mamba:

mamba install -c conda-forge ibis-risingwave

Then connect, adjusting the connection parameters as needed:

import ibis

con = ibis.risingwave.connect()
Connect
ibis.risingwave.connect
con = ibis.risingwave.connect(
user="username",
password="password",
host="hostname",
port=4566,
database="database",
)

ibis.risingwave.connect is a thin wrapper around ibis.backends.risingwave.Backend.do_connect.
Connection Parameters
do_connect
do_connect(['self', 'host=None', 'user=None', 'password=None', 'port=5432', 'database=None', 'schema=None'])
Create an Ibis client connected to RisingWave database.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | None | Hostname | None |
| user | str | None | Username | None |
| password | str | None | Password | None |
| port | int | Port number | 5432 |
| database | str | None | Database to connect to | None |
| schema | str | None | RisingWave schema to use. If None, use the default search_path. | None |
Examples
>>> import os
>>> import ibis
>>> host = os.environ.get("IBIS_TEST_RISINGWAVE_HOST", "localhost")
>>> user = os.environ.get("IBIS_TEST_RISINGWAVE_USER", "root")
>>> password = os.environ.get("IBIS_TEST_RISINGWAVE_PASSWORD", "")
>>> database = os.environ.get("IBIS_TEST_RISINGWAVE_DATABASE", "dev")
>>> con = ibis.risingwave.connect(
... database=database,
... host=host,
... user=user,
... password=password,
... port=4566,
... )
>>> con.list_tables()
[...]
>>> t = con.table("functional_alltypes")
>>> t
DatabaseTable: functional_alltypes
id int32
bool_col boolean
tinyint_col int16
smallint_col int16
int_col int32
bigint_col int64
float_col float32
double_col float64
date_string_col string
string_col string
timestamp_col timestamp(6)
year int32
month int32

risingwave.Backend
begin
begin(['self'])
compile
compile(['self', 'expr', 'limit=None', 'params=None', 'pretty=False'])
Compile an Ibis expression to a SQL string.
connect
connect(['self', '*args', '**kwargs'])
Connect to the database.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| *args | | Mandatory connection parameters, see the docstring of do_connect for details. | () |
| **kwargs | | Extra connection parameters, see the docstring of do_connect for details. | {} |
Notes
This creates a new backend instance with saved args and kwargs, then calls reconnect and finally returns the newly created and connected backend instance.
Returns
| Name | Type | Description |
|---|---|---|
| BaseBackend | An instance of the backend |
create_database
create_database(['self', 'name', 'catalog=None', 'force=False'])
create_materialized_view
create_materialized_view(['self', 'name', 'obj', '*', 'database=None', 'overwrite=False'])
Create a materialized view. Materialized views can be accessed like a normal table.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Materialized view name to create. | required |
| obj | ir.Table | The select statement to materialize. | required |
| database | str | None | Name of the database where the view exists, if not the default | None |
| overwrite | bool | Whether to overwrite the existing materialized view with the same name | False |
Returns
| Name | Type | Description |
|---|---|---|
| Table | Table expression |
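As a hedged sketch (not part of the upstream docstring; the orders table and its column names are assumptions), any Ibis expression can be materialized and then queried like a regular table:

orders = con.table("orders")  # illustrative table name
daily_totals = orders.group_by("order_date").agg(total=orders.amount.sum())

# Materialize the aggregation; RisingWave keeps the view incrementally updated.
mv = con.create_materialized_view("daily_totals", daily_totals, overwrite=True)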
create_sink
create_sink(['self', 'name', 'sink_from=None', 'connector_properties=None', '*', 'obj=None', 'database=None', 'data_format=None', 'encode_format=None', 'encode_properties=None'])
Create a sink.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Sink name to create. | required |
| sink_from | str | None | The table or materialized view name to sink from. Only one of sink_from or obj can be provided. | None |
| connector_properties | dict | None | The properties of the sink connector, providing the connector settings to push to the downstream data sink. Refer to https://docs.risingwave.com/docs/current/data-delivery/ for the required properties of different data sinks. | None |
| obj | ir.Table | None | An Ibis table expression that will be used to extract the schema and the data of the new table. Only one of sink_from or obj can be provided. | None |
| database | str | None | Name of the database where the sink exists, if not the default. | None |
| data_format | str | None | The data format for the new sink, e.g., “PLAIN”. data_format and encode_format must be specified at the same time. | None |
| encode_format | str | None | The encode format for the new sink, e.g., “JSON”. data_format and encode_format must be specified at the same time. | None |
| encode_properties | dict | None | The properties of the encode format, providing information like the schema registry URL. Refer to https://docs.risingwave.com/docs/current/sql-create-source/ for more details. | None |
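For illustration only (the connector name, broker address, topic, and formats below are placeholder assumptions; consult the linked RisingWave docs for the properties your sink actually requires), a sink can push a table or materialized view downstream:

# Hypothetical Kafka sink fed from the daily_totals materialized view sketched above.
con.create_sink(
    "daily_totals_sink",
    sink_from="daily_totals",
    connector_properties={
        "connector": "kafka",
        "properties.bootstrap.server": "localhost:9092",
        "topic": "daily_totals",
    },
    data_format="PLAIN",
    encode_format="JSON",
)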
create_source
create_source(['self', 'name', 'schema', '*', 'database=None', 'connector_properties', 'data_format', 'encode_format', 'encode_properties=None'])
Create a source.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Source name to create. | required |
| schema | ibis.Schema | The schema for the new source. | required |
| database | str | None | Name of the database where the source exists, if not the default. | None |
| connector_properties | dict | The properties of the source connector, providing the connector settings to access the upstream data source. Refer to https://docs.risingwave.com/docs/current/data-ingestion/ for the required properties of different data sources. | required |
| data_format | str | The data format for the new source, e.g., “PLAIN”. data_format and encode_format must be specified at the same time. | required |
| encode_format | str | The encode format for the new source, e.g., “JSON”. data_format and encode_format must be specified at the same time. | required |
| encode_properties | dict | None | The properties of the encode format, providing information like the schema registry URL. Refer to https://docs.risingwave.com/docs/current/sql-create-source/ for more details. | None |
Returns
| Name | Type | Description |
|---|---|---|
| Table | Table expression |
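A hedged sketch of declaring a streaming source (the schema, topic, and Kafka connector settings are placeholders, not prescribed values):

import ibis

# Hypothetical Kafka-backed source; adjust connector properties to your deployment.
events = con.create_source(
    "events_source",
    ibis.schema({"event_id": "int64", "payload": "string"}),
    connector_properties={
        "connector": "kafka",
        "topic": "events",
        "properties.bootstrap.server": "localhost:9092",
    },
    data_format="PLAIN",
    encode_format="JSON",
)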
create_table
create_table(['self', 'name', 'obj=None', '*', 'schema=None', 'database=None', 'temp=False', 'overwrite=False', 'connector_properties=None', 'data_format=None', 'encode_format=None', 'encode_properties=None'])
Create a table in RisingWave.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the table to create | required |
| obj | ir.Table | pd.DataFrame | pa.Table | pl.DataFrame | pl.LazyFrame | None | The data with which to populate the table; optional, but at least one of obj or schema must be specified | None |
| schema | sch.SchemaLike | None | The schema of the table to create; optional, but at least one of obj or schema must be specified | None |
| database | str | None | The name of the database in which to create the table; if not passed, the current database is used. | None |
| temp | bool | Create a temporary table | False |
| overwrite | bool | If True, replace the table if it already exists, otherwise fail if the table exists | False |
| connector_properties | dict | None | The properties of the sink connector, providing the connector settings to push to the downstream data sink. Refer to https://docs.risingwave.com/docs/current/data-delivery/ for the required properties of different data sinks. | None |
| data_format | str | None | The data format for the new source, e.g., “PLAIN”. data_format and encode_format must be specified at the same time. | None |
| encode_format | str | None | The encode format for the new source, e.g., “JSON”. data_format and encode_format must be specified at the same time. | None |
| encode_properties | dict | None | The properties of the encode format, providing information like the schema registry URL. Refer to https://docs.risingwave.com/docs/current/sql-create-source/ for more details. | None |
Returns
| Name | Type | Description |
|---|---|---|
| Table | Table expression |
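A brief sketch (table and column names are assumptions): a table can be created from an explicit schema, or created and populated from an in-memory object such as a pandas DataFrame:

import ibis
import pandas as pd

# Create an empty table from a schema.
con.create_table("users", schema=ibis.schema({"id": "int64", "name": "string"}))

# Create (or replace) a table populated from a DataFrame.
df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
con.create_table("users", df, overwrite=True)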
create_view
create_view(['self', 'name', 'obj', '*', 'database=None', 'overwrite=False'])
disconnect
disconnect(['self'])
drop_database
drop_database(['self', 'name', 'catalog=None', 'force=False', 'cascade=False'])
drop_materialized_view
drop_materialized_view(['self', 'name', '*', 'database=None', 'force=False'])
Drop a materialized view.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Materialized view name to drop. | required |
| database | str | None | Name of the database where the view exists, if not the default. | None |
| force | bool | If False, an exception is raised if the view does not exist. | False |
drop_sink
drop_sink(['self', 'name', '*', 'database=None', 'force=False'])
Drop a Sink.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Sink name to drop. | required |
| database | str | None | Name of the database where the sink exists, if not the default. | None |
| force | bool | If False, an exception is raised if the sink does not exist. | False |
drop_source
drop_source(['self', 'name', '*', 'database=None', 'force=False'])
Drop a Source.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Source name to drop. | required |
| database | str | None | Name of the database where the source exists, if not the default. | None |
| force | bool | If False, an exception is raised if the source does not exist. | False |
drop_table
drop_table(['self', 'name', 'database=None', 'force=False'])
drop_view
drop_view(['self', 'name', '*', 'database=None', 'force=False'])
execute
execute(['self', 'expr', 'params=None', "limit='default'", '**kwargs'])
Execute an expression.
from_connection
from_connection(['cls', 'con'])
Create an Ibis client from an existing connection to a PostgreSQL database.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| con | psycopg2.extensions.connection | An existing connection to a PostgreSQL database. | required |
function
function(['self', 'name', '*', 'database=None'])
get_schema
get_schema(['self', 'name', '*', 'catalog=None', 'database=None'])
has_operation
has_operation(['cls', 'operation'])
insert
insert(['self', 'table_name', 'obj', 'database=None', 'overwrite=False'])
Insert data into a table.
Ibis does not use the word schema to refer to database hierarchy.
A collection of tables is referred to as a database. A collection of databases is referred to as a catalog.
These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| table_name | str | The name of the table into which data will be inserted | required |
| obj | pd.DataFrame | ir.Table | list | dict | The source data or expression to insert | required |
| database | str | None | Name of the attached database that the table is located in. For backends that support multi-level table hierarchies, you can pass in a dotted string path like "catalog.database" or a tuple of strings like ("catalog", "database"). | None |
| overwrite | bool | If True then replace existing contents of table | False |
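A minimal sketch (the users table and its columns are assumptions carried over from the create_table sketch above):

import pandas as pd

# Append rows from a DataFrame to an existing table.
new_rows = pd.DataFrame({"id": [3, 4], "name": ["c", "d"]})
con.insert("users", new_rows)

# Replace the table's contents instead of appending.
con.insert("users", new_rows, overwrite=True)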
list_catalogs
list_catalogs(['self', 'like=None'])
list_databases
list_databases(['self', '*', 'like=None', 'catalog=None'])
list_tables
list_tables(['self', 'like=None', 'database=None'])
List the tables in the database.
Ibis does not use the word schema to refer to database hierarchy.
A collection of tables is referred to as a database. A collection of databases is referred to as a catalog.
These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| like | str | None | A pattern to use for listing tables. | None |
| database | tuple[str, str] | str | None | Database to list tables from. Default behavior is to show tables in the current database. | None |
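For example (the pattern and database name are illustrative):

# All tables in the current database.
con.list_tables()

# Only tables whose names match a pattern, in a specific database.
con.list_tables(like="daily.*", database="dev")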
raw_sql
raw_sql(['self', 'query', '**kwargs'])
read_csv
read_csv(['self', 'path', 'table_name=None', '**kwargs'])
Register a CSV file as a table in the current backend.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | Path | The data source. A string or Path to the CSV file. | required |
| table_name | str | None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
| **kwargs | Any | Additional keyword arguments passed to the backend loading function. | {} |
Returns
| Name | Type | Description |
|---|---|---|
| ir.Table | The just-registered table |
read_delta
read_delta(['self', 'source', 'table_name=None', '**kwargs'])
Register a Delta Lake table in the current database.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| source | str | Path | The data source. Must be a directory containing a Delta Lake table. | required |
| table_name | str | None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
| **kwargs | Any | Additional keyword arguments passed to the underlying backend or library. | {} |
Returns
| Name | Type | Description |
|---|---|---|
| ir.Table | The just-registered table. |
read_json
read_json(['self', 'path', 'table_name=None', '**kwargs'])
Register a JSON file as a table in the current backend.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | Path | The data source. A string or Path to the JSON file. | required |
| table_name | str | None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
| **kwargs | Any | Additional keyword arguments passed to the backend loading function. | {} |
Returns
| Name | Type | Description |
|---|---|---|
| ir.Table | The just-registered table |
read_parquet
read_parquet(['self', 'path', 'table_name=None', '**kwargs'])
Register a parquet file as a table in the current backend.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | Path | The data source. | required |
| table_name | str | None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
| **kwargs | Any | Additional keyword arguments passed to the backend loading function. | {} |
Returns
| Name | Type | Description |
|---|---|---|
| ir.Table | The just-registered table |
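A hedged example (the local file path and table name are placeholders):

# Load a local parquet file and register it as a table on this connection.
events = con.read_parquet("/tmp/events.parquet", table_name="events")
print(events.count().execute())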
reconnect
reconnect(['self'])
Reconnect to the database already configured with connect.
register_options
register_options(['cls'])
Register custom backend options.
rename_table
rename_table(['self', 'old_name', 'new_name'])
Rename an existing table.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| old_name | str | The old name of the table. | required |
| new_name | str | The new name of the table. | required |
sql
sql(['self', 'query', 'schema=None', 'dialect=None'])
table
table(['self', 'name', 'database=None'])
Construct a table expression.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Table name | required |
| database | tuple[str, str] | str | None | Database name | None |
Returns
| Name | Type | Description |
|---|---|---|
| Table | Table expression |
to_csv
to_csv(['self', 'expr', 'path', '*', 'params=None', '**kwargs'])
Write the results of executing the given expression to a CSV file.
This method is eager and will execute the associated expression immediately.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Table | The ibis expression to execute and persist to CSV. | required |
| path | str | Path | The data source. A string or Path to the CSV file. | required |
| params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
| kwargs | Any | Additional keyword arguments passed to pyarrow.csv.CSVWriter | {} |
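A short sketch (the output path is a placeholder); the expression is executed eagerly and written to disk:

t = con.table("functional_alltypes")
# Write the first 1,000 rows to a local CSV file.
con.to_csv(t.limit(1000), "/tmp/alltypes.csv")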
to_delta
to_delta(['self', 'expr', 'path', '*', 'params=None', '**kwargs'])
Write the results of executing the given expression to a Delta Lake table.
This method is eager and will execute the associated expression immediately.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Table | The ibis expression to execute and persist to Delta Lake table. | required |
| path | str | Path | The data source. A string or Path to the Delta Lake table. | required |
| params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
| kwargs | Any | Additional keyword arguments passed to deltalake.writer.write_deltalake method | {} |
to_pandas
to_pandas(['self', 'expr', '*', 'params=None', 'limit=None', '**kwargs'])
Execute an Ibis expression and return a pandas DataFrame, Series, or scalar.
This method is a wrapper around execute.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Expr | Ibis expression to execute. | required |
| params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
| limit | int | str | None | An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. | None |
| kwargs | Any | Keyword arguments | {} |
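For example, using the functional_alltypes table from the connection example above:

t = con.table("functional_alltypes")
# Execute the expression and return at most 10 rows as a pandas DataFrame.
df = con.to_pandas(t, limit=10)
# A scalar expression returns a plain Python scalar.
n = con.to_pandas(t.count())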
to_pandas_batches
to_pandas_batches(['self', 'expr', '*', 'params=None', 'limit=None', 'chunk_size=1000000', '**kwargs'])
Execute an Ibis expression and return an iterator of pandas DataFrames.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Expr | Ibis expression to execute. | required |
| params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
| limit | int | str | None | An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. | None |
| chunk_size | int | Maximum number of rows in each returned DataFrame batch. This may have no effect depending on the backend. | 1000000 |
| kwargs | Any | Keyword arguments | {} |
Returns
| Name | Type | Description |
|---|---|---|
| Iterator[pd.DataFrame] | An iterator of pandas DataFrames. |
to_parquet
to_parquet(['self', 'expr', 'path', '*', 'params=None', '**kwargs'])
Write the results of executing the given expression to a parquet file.
This method is eager and will execute the associated expression immediately.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Table | The ibis expression to execute and persist to parquet. | required |
| path | str | Path | The data source. A string or Path to the parquet file. | required |
| params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
| **kwargs | Any | Additional keyword arguments passed to pyarrow.parquet.ParquetWriter | {} |
to_parquet_dir
to_parquet_dir(['self', 'expr', 'directory', '*', 'params=None', '**kwargs'])
Write the results of executing the given expression to a parquet file in a directory.
This method is eager and will execute the associated expression immediately.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Table | The ibis expression to execute and persist to parquet. | required |
| directory | str | Path | The data source. A string or Path to the directory where the parquet file will be written. | required |
| params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
| **kwargs | Any | Additional keyword arguments passed to pyarrow.dataset.write_dataset | {} |
to_polars
to_polars(['self', 'expr', '*', 'params=None', 'limit=None', '**kwargs'])
Execute an expression and return the results as a polars DataFrame.
This method is eager and will execute the associated expression immediately.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Expr | Ibis expression to export to polars. | required |
| params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
| limit | int | str | None | An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. | None |
| kwargs | Any | Keyword arguments | {} |
Returns
| Name | Type | Description |
|---|---|---|
| dataframe | A polars DataFrame holding the results of the executed expression. |
to_pyarrow
to_pyarrow(['self', 'expr', '*', 'params=None', 'limit=None', '**kwargs'])
Execute an expression and return the results as a pyarrow table.
This method is eager and will execute the associated expression immediately.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Expr | Ibis expression to export to pyarrow | required |
| params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
| limit | int | str | None | An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. | None |
| kwargs | Any | Keyword arguments | {} |
Returns
| Name | Type | Description |
|---|---|---|
| Table | A pyarrow table holding the results of the executed expression. |
to_pyarrow_batches
to_pyarrow_batches(['self', 'expr', '*', 'params=None', 'limit=None', 'chunk_size=1000000', '**_'])
Execute expression and return an iterator of pyarrow record batches.
This method is eager and will execute the associated expression immediately.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Expr | Ibis expression to export to pyarrow | required |
| limit | int | str | None | An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. | None |
| params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
| chunk_size | int | Maximum number of rows in each returned record batch. | 1000000 |
Returns
| Name | Type | Description |
|---|---|---|
| RecordBatchReader | Collection of pyarrow RecordBatches. |
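A hedged sketch of streaming results (the batch size is illustrative):

t = con.table("functional_alltypes")
# Iterate over pyarrow record batches of at most 10,000 rows each.
for batch in con.to_pyarrow_batches(t, chunk_size=10_000):
    print(batch.num_rows)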
to_torch
to_torch(['self', 'expr', '*', 'params=None', 'limit=None', '**kwargs'])
Execute an expression and return results as a dictionary of torch tensors.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Expr | Ibis expression to execute. | required |
| params | Mapping[ir.Scalar, Any] | None | Parameters to substitute into the expression. | None |
| limit | int | str | None | An integer to effect a specific row limit. A value of None means no limit. | None |
| kwargs | Any | Keyword arguments passed into the backend’s to_torch implementation. | {} |
Returns
| Name | Type | Description |
|---|---|---|
| dict[str, torch.Tensor] | A dictionary of torch tensors, keyed by column name. |
truncate_table
truncate_table(['self', 'name', 'database=None'])
Delete all rows from a table.
Ibis does not use the word schema to refer to database hierarchy.
A collection of tables is referred to as a database. A collection of databases is referred to as a catalog. These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Table name | required |
| database | str | None | Name of the attached database that the table is located in. For backends that support multi-level table hierarchies, you can pass in a dotted string path like "catalog.database" or a tuple of strings like ("catalog", "database"). | None |