RisingWave

https://risingwave.com/

Install

Install Ibis and dependencies for the RisingWave backend:

Install with the risingwave extra using pip:

pip install 'ibis-framework[risingwave]'

And connect:

import ibis

con = ibis.risingwave.connect()
Adjust connection parameters as needed.

Install for RisingWave with conda:

conda install -c conda-forge ibis-risingwave

And connect:

import ibis

con = ibis.risingwave.connect()
Adjust connection parameters as needed.

Install for RisingWave with mamba:

mamba install -c conda-forge ibis-risingwave

And connect:

import ibis

con = ibis.risingwave.connect()
Adjust connection parameters as needed.

Connect

ibis.risingwave.connect

con = ibis.risingwave.connect(
    user="username",
    password="password",
    host="hostname",
    port=4566,
    database="database",
)
Note

ibis.risingwave.connect is a thin wrapper around ibis.backends.risingwave.Backend.do_connect.

Connection Parameters

do_connect

do_connect(self, host=None, user=None, password=None, port=5432, database=None, schema=None)

Create an Ibis client connected to a RisingWave database.

Parameters
Name Type Description Default
host str | None Hostname None
user str | None Username None
password str | None Password None
port int Port number 5432
database str | None Database to connect to None
schema str | None RisingWave schema to use. If None, use the default search_path. None
Examples
>>> import os
>>> import ibis
>>> host = os.environ.get("IBIS_TEST_RISINGWAVE_HOST", "localhost")
>>> user = os.environ.get("IBIS_TEST_RISINGWAVE_USER", "root")
>>> password = os.environ.get("IBIS_TEST_RISINGWAVE_PASSWORD", "")
>>> database = os.environ.get("IBIS_TEST_RISINGWAVE_DATABASE", "dev")
>>> con = ibis.risingwave.connect(
...     database=database,
...     host=host,
...     user=user,
...     password=password,
...     port=4566,
... )
>>> con.list_tables()
[...]
>>> t = con.table("functional_alltypes")
>>> t
DatabaseTable: functional_alltypes
  id              int32
  bool_col        boolean
  tinyint_col     int16
  smallint_col    int16
  int_col         int32
  bigint_col      int64
  float_col       float32
  double_col      float64
  date_string_col string
  string_col      string
  timestamp_col   timestamp(6)
  year            int32
  month           int32
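The connection arguments used in the example above can be gathered in one place. The following is a minimal sketch, not part of Ibis: the helper name `risingwave_connect_kwargs` and the `RISINGWAVE_*` environment variable names are assumptions made for illustration. The fallback values (`localhost`, `root`, an empty password, `dev`, and port 4566) mirror the ones in the example.

```python
import os


def risingwave_connect_kwargs(env=None):
    """Build keyword arguments for ibis.risingwave.connect.

    The RISINGWAVE_* variable names are hypothetical; rename them to
    match your deployment.
    """
    env = os.environ if env is None else env
    return {
        "host": env.get("RISINGWAVE_HOST", "localhost"),
        "user": env.get("RISINGWAVE_USER", "root"),
        "password": env.get("RISINGWAVE_PASSWORD", ""),
        "database": env.get("RISINGWAVE_DATABASE", "dev"),
        # do_connect defaults to port 5432, so the port is always passed explicitly.
        "port": int(env.get("RISINGWAVE_PORT", "4566")),
    }
```

With Ibis installed, the result can be passed straight through: `con = ibis.risingwave.connect(**risingwave_connect_kwargs())`.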

risingwave.Backend

begin

begin(self)

compile

compile(self, expr, limit=None, params=None, pretty=False)

Compile an Ibis expression to a SQL string.

connect

connect(self, *args, **kwargs)

Connect to the database.

Parameters

Name Type Description Default
*args Mandatory connection parameters, see the docstring of do_connect for details. ()
**kwargs Extra connection parameters, see the docstring of do_connect for details. {}

Notes

This creates a new backend instance with saved args and kwargs, then calls reconnect and finally returns the newly created and connected backend instance.

Returns

Name Type Description
BaseBackend An instance of the backend

create_database

create_database(self, name, catalog=None, force=False)

create_materialized_view

create_materialized_view(self, name, obj, *, database=None, overwrite=False)

Create a materialized view. Materialized views can be accessed like a normal table.

Parameters

Name Type Description Default
name str Name of the materialized view to create. required
obj ir.Table The select statement to materialize. required
database str | None Name of the database where the view exists, if not the default None
overwrite bool Whether to overwrite the existing materialized view with the same name False

Returns

Name Type Description
Table Table expression
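As a rough illustration of `create_materialized_view`, the sketch below builds an aggregation and materializes it. It assumes `con` is an already connected RisingWave backend; the function name, the `events` table, the `daily_counts` view, and the `day` column are all hypothetical.

```python
def materialize_daily_counts(con, source_table="events", view_name="daily_counts"):
    """Create a materialized view that counts rows per day.

    The table name, view name, and `day` column are hypothetical.
    """
    t = con.table(source_table)
    # Build the select statement to materialize as an Ibis expression.
    counts = t.group_by("day").aggregate(n=t.count())
    # overwrite=True replaces an existing materialized view with the same name.
    return con.create_materialized_view(view_name, counts, overwrite=True)
```

The returned table expression can then be queried like any other table, for example with `con.to_pandas(...)`.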

create_sink

create_sink(self, name, sink_from=None, connector_properties=None, *, obj=None, database=None, data_format=None, encode_format=None, encode_properties=None)

Create a sink.

Parameters

Name Type Description Default
name str Name of the sink to create. required
sink_from str | None The table or materialized view name to sink from. Only one of sink_from or obj can be provided. None
connector_properties dict | None The properties of the sink connector, providing the connector settings to push to the downstream data sink. Refer to https://docs.risingwave.com/docs/current/data-delivery/ for the required properties of different data sinks. None
obj ir.Table | None An Ibis table expression that will be used to extract the schema and the data of the new table. Only one of sink_from or obj can be provided. None
database str | None Name of the database where the sink exists, if not the default. None
data_format str | None The data format for the new sink, e.g., “PLAIN”. data_format and encode_format must be specified at the same time. None
encode_format str | None The encode format for the new sink, e.g., “JSON”. data_format and encode_format must be specified at the same time. None
encode_properties dict | None The properties of encode format, providing information like schema registry url. Refer to https://docs.risingwave.com/docs/current/sql-create-source/ for more details. None
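The argument rules in the table above (only one of `sink_from` or `obj`, and `data_format` paired with `encode_format`) can be checked before calling `create_sink`. The sketch below is an illustration under stated assumptions, not Ibis code: both helper names are hypothetical, it assumes one of `sink_from` or `obj` must be present, and the Kafka property names should be confirmed against the RisingWave data delivery documentation. The backend may perform its own validation as well.

```python
def check_sink_args(sink_from=None, obj=None, data_format=None, encode_format=None):
    """Check the argument rules documented for create_sink before calling it."""
    # Exactly one of sink_from or obj should be given.
    if (sink_from is None) == (obj is None):
        raise ValueError("provide exactly one of sink_from or obj")
    # data_format and encode_format must be specified together.
    if (data_format is None) != (encode_format is None):
        raise ValueError("data_format and encode_format must be given together")


def create_kafka_sink(con, name, sink_from, bootstrap_server, topic):
    """Create a sink that pushes a table or materialized view to Kafka."""
    check_sink_args(sink_from=sink_from, data_format="PLAIN", encode_format="JSON")
    # Property names are assumptions; confirm them in the RisingWave docs.
    connector_properties = {
        "connector": "kafka",
        "properties.bootstrap.server": bootstrap_server,
        "topic": topic,
    }
    con.create_sink(
        name,
        sink_from=sink_from,
        connector_properties=connector_properties,
        data_format="PLAIN",
        encode_format="JSON",
    )
```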

create_source

create_source(self, name, schema, *, database=None, connector_properties, data_format, encode_format, encode_properties=None)

Create a source.

Parameters

Name Type Description Default
name str Name of the source to create. required
schema ibis.Schema The schema for the new source. required
database str | None Name of the database where the source exists, if not the default. None
connector_properties dict The properties of the source connector, providing the connector settings to access the upstream data source. Refer to https://docs.risingwave.com/docs/current/data-ingestion/ for the required properties of different data sources. required
data_format str The data format for the new source, e.g., “PLAIN”. data_format and encode_format must be specified at the same time. required
encode_format str The encode format for the new source, e.g., “JSON”. data_format and encode_format must be specified at the same time. required
encode_properties dict | None The properties of encode format, providing information like schema registry url. Refer to https://docs.risingwave.com/docs/current/sql-create-source/ for more details. None

Returns

Name Type Description
Table Table expression
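A call to `create_source` might look like the sketch below. It assumes `con` is a connected RisingWave backend and that `schema` is an `ibis.Schema` built by the caller. The helper name is hypothetical, and the Kafka property names are assumptions to be confirmed against the RisingWave data ingestion documentation.

```python
def create_kafka_source(con, name, schema, bootstrap_server, topic):
    """Create a source that reads JSON messages from a Kafka topic.

    `schema` is an ibis.Schema, for example
    ibis.schema({"user_id": "int64", "action": "string"}).
    """
    # Property names are assumptions; confirm them in the RisingWave docs.
    connector_properties = {
        "connector": "kafka",
        "properties.bootstrap.server": bootstrap_server,
        "topic": topic,
        "scan.startup.mode": "earliest",
    }
    # connector_properties, data_format, and encode_format are required
    # keyword-only arguments in the signature above.
    return con.create_source(
        name,
        schema,
        connector_properties=connector_properties,
        data_format="PLAIN",
        encode_format="JSON",
    )
```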

create_table

create_table(self, name, obj=None, *, schema=None, database=None, temp=False, overwrite=False, connector_properties=None, data_format=None, encode_format=None, encode_properties=None)

Create a table in RisingWave.

Parameters

Name Type Description Default
name str Name of the table to create required
obj ir.Table | pd.DataFrame | pa.Table | pl.DataFrame | pl.LazyFrame | None The data with which to populate the table; optional, but at least one of obj or schema must be specified None
schema sch.SchemaLike | None The schema of the table to create; optional, but at least one of obj or schema must be specified None
database str | None The name of the database in which to create the table; if not passed, the current database is used. None
temp bool Create a temporary table False
overwrite bool If True, replace the table if it already exists, otherwise fail if the table exists False
connector_properties dict | None The properties of the sink connector, providing the connector settings to push to the downstream data sink. Refer to https://docs.risingwave.com/docs/current/data-delivery/ for the required properties of different data sinks. None
data_format str | None The data format for the new source, e.g., “PLAIN”. data_format and encode_format must be specified at the same time. None
encode_format str | None The encode format for the new source, e.g., “JSON”. data_format and encode_format must be specified at the same time. None
encode_properties dict | None The properties of encode format, providing information like schema registry url. Refer to https://docs.risingwave.com/docs/current/sql-create-source/ for more details. None

Returns

Name Type Description
Table Table expression
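The rule that at least one of `obj` or `schema` must be given can be illustrated with a small helper. This is a sketch only: the function name, table name, and columns are hypothetical, `rows` is assumed to be one of the accepted `obj` types (such as a pandas DataFrame), and passing a plain dict of column names to type strings assumes that form is accepted as `sch.SchemaLike`.

```python
def create_events_table(con, name="events", rows=None):
    """Create a table from in-memory data, or an empty one from a schema."""
    if rows is not None:
        # obj supplies both the column types and the initial contents.
        return con.create_table(name, obj=rows, overwrite=True)
    # With no data, the schema alone defines the table.
    schema = {"user_id": "int64", "action": "string"}
    return con.create_table(name, schema=schema, overwrite=True)
```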

create_view

create_view(self, name, obj, *, database=None, overwrite=False)

disconnect

disconnect(self)

drop_database

drop_database(self, name, catalog=None, force=False, cascade=False)

drop_materialized_view

drop_materialized_view(self, name, *, database=None, force=False)

Drop a materialized view.

Parameters

Name Type Description Default
name str Materialized view name to drop. required
database str | None Name of the database where the view exists, if not the default. None
force bool If False, an exception is raised if the view does not exist. False

drop_sink

drop_sink(self, name, *, database=None, force=False)

Drop a Sink.

Parameters

Name Type Description Default
name str Sink name to drop. required
database str | None Name of the database where the sink exists, if not the default. None
force bool If False, an exception is raised if the sink does not exist. False

drop_source

drop_source(self, name, *, database=None, force=False)

Drop a Source.

Parameters

Name Type Description Default
name str Source name to drop. required
database str | None Name of the database where the source exists, if not the default. None
force bool If False, an exception is raised if the source does not exist. False
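The three drop methods above can be combined into a teardown helper. The sketch below assumes a hypothetical pipeline in which a sink reads from a materialized view that in turn reads from a source, so dependents are dropped first. The function name is an assumption, and `force=True` is used so that a missing object does not raise.

```python
def drop_pipeline(con, sink, view, source):
    """Drop a sink, the materialized view it reads from, and the view's source."""
    # Drop dependents first: the sink reads from the view, the view from the source.
    con.drop_sink(sink, force=True)
    con.drop_materialized_view(view, force=True)
    con.drop_source(source, force=True)
```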

drop_table

drop_table(self, name, database=None, force=False)

drop_view

drop_view(self, name, *, database=None, force=False)

execute

execute(self, expr, params=None, limit='default', **kwargs)

Execute an expression.

from_connection

from_connection(cls, con)

Create an Ibis client from an existing connection to a PostgreSQL database.

Parameters

Name Type Description Default
con psycopg2.extensions.connection An existing connection to a PostgreSQL database. required

function

function(self, name, *, database=None)

get_schema

get_schema(self, name, *, catalog=None, database=None)

has_operation

has_operation(cls, operation)

insert

insert(self, table_name, obj, database=None, overwrite=False)

Insert data into a table.

Ibis does not use the word schema to refer to database hierarchy.

A collection of tables is referred to as a database. A collection of databases is referred to as a catalog.

These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.

Parameters

Name Type Description Default
table_name str The name of the table into which data will be inserted required
obj pd.DataFrame | ir.Table | list | dict The source data or expression to insert required
database str | None Name of the attached database that the table is located in. For backends that support multi-level table hierarchies, you can pass in a dotted string path like "catalog.database" or a tuple of strings like ("catalog", "database"). None
overwrite bool If True then replace existing contents of table False
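The two spellings of the `database` argument described above, a dotted string and a tuple of strings, name the same location. The sketch below only illustrates that equivalence; `database_as_tuple` is a hypothetical helper, not how Ibis parses the argument, and it does not handle quoted identifiers that contain dots.

```python
def database_as_tuple(database):
    """Normalize a `database` argument to a tuple of hierarchy levels."""
    if database is None:
        # No database given: the backend falls back to the current one.
        return ()
    if isinstance(database, str):
        # "catalog.database" -> ("catalog", "database")
        return tuple(database.split("."))
    return tuple(database)
```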

list_catalogs

list_catalogs(self, like=None)

list_databases

list_databases(self, *, like=None, catalog=None)

list_tables

list_tables(self, like=None, database=None)

List the tables in the database.

Ibis does not use the word schema to refer to database hierarchy.

A collection of tables is referred to as a database. A collection of databases is referred to as a catalog.

These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.

Parameters

Name Type Description Default
like str | None A pattern to use for listing tables. None
database tuple[str, str] | str | None Database to list tables from. Default behavior is to show tables in the current database. None

raw_sql

raw_sql(self, query, **kwargs)

read_csv

read_csv(self, path, table_name=None, **kwargs)

Register a CSV file as a table in the current backend.

Parameters

Name Type Description Default
path str | Path The data source. A string or Path to the CSV file. required
table_name str | None An optional name to use for the created table. This defaults to a sequentially generated name. None
**kwargs Any Additional keyword arguments passed to the backend loading function. {}

Returns

Name Type Description
ir.Table The just-registered table

read_delta

read_delta(self, source, table_name=None, **kwargs)

Register a Delta Lake table in the current database.

Parameters

Name Type Description Default
source str | Path The data source. Must be a directory containing a Delta Lake table. required
table_name str | None An optional name to use for the created table. This defaults to a sequentially generated name. None
**kwargs Any Additional keyword arguments passed to the underlying backend or library. {}

Returns

Name Type Description
ir.Table The just-registered table.

read_json

read_json(self, path, table_name=None, **kwargs)

Register a JSON file as a table in the current backend.

Parameters

Name Type Description Default
path str | Path The data source. A string or Path to the JSON file. required
table_name str | None An optional name to use for the created table. This defaults to a sequentially generated name. None
**kwargs Any Additional keyword arguments passed to the backend loading function. {}

Returns

Name Type Description
ir.Table The just-registered table

read_parquet

read_parquet(self, path, table_name=None, **kwargs)

Register a parquet file as a table in the current backend.

Parameters

Name Type Description Default
path str | Path The data source. required
table_name str | None An optional name to use for the created table. This defaults to a sequentially generated name. None
**kwargs Any Additional keyword arguments passed to the backend loading function. {}

Returns

Name Type Description
ir.Table The just-registered table

reconnect

reconnect(self)

Reconnect to the database already configured with connect.

register_options

register_options(cls)

Register custom backend options.

rename_table

rename_table(self, old_name, new_name)

Rename an existing table.

Parameters

Name Type Description Default
old_name str The old name of the table. required
new_name str The new name of the table. required

sql

sql(self, query, schema=None, dialect=None)

table

table(self, name, database=None)

Construct a table expression.

Parameters

Name Type Description Default
name str Table name required
database tuple[str, str] | str | None Database name None

Returns

Name Type Description
Table Table expression

to_csv

to_csv(self, expr, path, *, params=None, **kwargs)

Write the results of executing the given expression to a CSV file.

This method is eager and will execute the associated expression immediately.

Parameters

Name Type Description Default
expr ir.Table The ibis expression to execute and persist to CSV. required
path str | Path The data source. A string or Path to the CSV file. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
kwargs Any Additional keyword arguments passed to pyarrow.csv.CSVWriter {}

to_delta

to_delta(self, expr, path, *, params=None, **kwargs)

Write the results of executing the given expression to a Delta Lake table.

This method is eager and will execute the associated expression immediately.

Parameters

Name Type Description Default
expr ir.Table The ibis expression to execute and persist to Delta Lake table. required
path str | Path The data source. A string or Path to the Delta Lake table. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
kwargs Any Additional keyword arguments passed to deltalake.writer.write_deltalake method {}

to_pandas

to_pandas(self, expr, *, params=None, limit=None, **kwargs)

Execute an Ibis expression and return a pandas DataFrame, Series, or scalar.

Note

This method is a wrapper around execute.

Parameters

Name Type Description Default
expr ir.Expr Ibis expression to execute. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
limit int | str | None An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. None
kwargs Any Keyword arguments {}

to_pandas_batches

to_pandas_batches(self, expr, *, params=None, limit=None, chunk_size=1000000, **kwargs)

Execute an Ibis expression and return an iterator of pandas DataFrames.

Parameters

Name Type Description Default
expr ir.Expr Ibis expression to execute. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
limit int | str | None An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. None
chunk_size int Maximum number of rows in each returned DataFrame batch. This may have no effect depending on the backend. 1000000
kwargs Any Keyword arguments {}

Returns

Name Type Description
Iterator[pd.DataFrame] An iterator of pandas DataFrames.
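Iterating over the returned batches lets a caller process a large result one DataFrame at a time. The following sketch assumes `con` is a connected backend and `expr` is an Ibis expression; the function name and the `chunk_size` value are arbitrary choices for illustration.

```python
def count_rows_in_batches(con, expr, chunk_size=100_000):
    """Count result rows by iterating over batches."""
    total = 0
    # Each batch is a pandas DataFrame; chunk_size is an upper bound on its
    # row count, although some backends may ignore it.
    for batch in con.to_pandas_batches(expr, chunk_size=chunk_size):
        total += len(batch)
    return total
```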

to_parquet

to_parquet(self, expr, path, *, params=None, **kwargs)

Write the results of executing the given expression to a parquet file.

This method is eager and will execute the associated expression immediately.

Parameters

Name Type Description Default
expr ir.Table The ibis expression to execute and persist to parquet. required
path str | Path The data source. A string or Path to the parquet file. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
**kwargs Any Additional keyword arguments passed to pyarrow.parquet.ParquetWriter {}

to_parquet_dir

to_parquet_dir(self, expr, directory, *, params=None, **kwargs)

Write the results of executing the given expression to a parquet file in a directory.

This method is eager and will execute the associated expression immediately.

Parameters

Name Type Description Default
expr ir.Table The ibis expression to execute and persist to parquet. required
directory str | Path The data source. A string or Path to the directory where the parquet file will be written. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
**kwargs Any Additional keyword arguments passed to pyarrow.dataset.write_dataset {}

to_polars

to_polars(self, expr, *, params=None, limit=None, **kwargs)

Execute an expression and return the results as a polars DataFrame.

This method is eager and will execute the associated expression immediately.

Parameters

Name Type Description Default
expr ir.Expr Ibis expression to export to polars. required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
limit int | str | None An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. None
kwargs Any Keyword arguments {}

Returns

Name Type Description
dataframe A polars DataFrame holding the results of the executed expression.

to_pyarrow

to_pyarrow(self, expr, *, params=None, limit=None, **kwargs)

Execute an expression and return the results as a pyarrow table.

This method is eager and will execute the associated expression immediately.

Parameters

Name Type Description Default
expr ir.Expr Ibis expression to export to pyarrow required
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
limit int | str | None An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. None
kwargs Any Keyword arguments {}

Returns

Name Type Description
Table A pyarrow table holding the results of the executed expression.

to_pyarrow_batches

to_pyarrow_batches(self, expr, *, params=None, limit=None, chunk_size=1000000, **_)

Execute expression and return an iterator of pyarrow record batches.

This method is eager and will execute the associated expression immediately.

Parameters

Name Type Description Default
expr ir.Expr Ibis expression to export to pyarrow required
limit int | str | None An integer to effect a specific row limit. A value of None means “no limit”. The default is in ibis/config.py. None
params Mapping[ir.Scalar, Any] | None Mapping of scalar parameter expressions to value. None
chunk_size int Maximum number of rows in each returned record batch. 1000000

Returns

Name Type Description
RecordBatchReader Collection of pyarrow RecordBatches.

to_torch

to_torch(self, expr, *, params=None, limit=None, **kwargs)

Execute an expression and return results as a dictionary of torch tensors.

Parameters

Name Type Description Default
expr ir.Expr Ibis expression to execute. required
params Mapping[ir.Scalar, Any] | None Parameters to substitute into the expression. None
limit int | str | None An integer to effect a specific row limit. A value of None means no limit. None
kwargs Any Keyword arguments passed into the backend’s to_torch implementation. {}

Returns

Name Type Description
dict[str, torch.Tensor] A dictionary of torch tensors, keyed by column name.

truncate_table

truncate_table(self, name, database=None)

Delete all rows from a table.

Ibis does not use the word schema to refer to database hierarchy.

A collection of tables is referred to as a database. A collection of databases is referred to as a catalog. These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.

Parameters

Name Type Description Default
name str Table name required
database str | None Name of the attached database that the table is located in. For backends that support multi-level table hierarchies, you can pass in a dotted string path like "catalog.database" or a tuple of strings like ("catalog", "database"). None