Materialize

Install
Install Ibis and dependencies for the Materialize backend:
pip install 'ibis-framework[materialize]'
And connect:
import ibis
con = ibis.materialize.connect()
Adjust connection parameters as needed.
Connect
ibis.materialize.connect
con = ibis.materialize.connect(
user="materialize",
password="password",
host="localhost",
port=6875,
database="materialize",
cluster="quickstart", # Optional: specify default cluster
)
ibis.materialize.connect is a thin wrapper around ibis.backends.materialize.Backend.do_connect.
Connection Parameters
do_connect
do_connect(self, host=None, user=None, password=None, port=6875, database=None, schema=None, autocommit=True, cluster=None, **kwargs)
Create an Ibis client connected to Materialize database.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | None | Hostname | None |
| user | str | None | Username | None |
| password | str | None | Password | None |
| port | int | Port number (default: 6875 for Materialize) | 6875 |
| database | str | None | Database to connect to | None |
| schema | str | None | Schema to use. If None, uses the default search_path. | None |
| autocommit | bool | Whether or not to autocommit (default: True) | True |
| cluster | str | None | Default cluster to use for queries. If None, uses Materialize’s default cluster. You can change clusters later with SET CLUSTER. | None |
| kwargs | Any | Additional keyword arguments to pass to the backend client connection. | {} |
Examples
>>> import os
>>> import ibis
>>> host = os.environ.get("IBIS_TEST_MATERIALIZE_HOST", "localhost")
>>> user = os.environ.get("IBIS_TEST_MATERIALIZE_USER", "materialize")
>>> password = os.environ.get("IBIS_TEST_MATERIALIZE_PASSWORD", "")
>>> database = os.environ.get(
... "IBIS_TEST_MATERIALIZE_DATABASE", "materialize"
... )
>>> con = ibis.materialize.connect(
... database=database, host=host, user=user, password=password
... )
>>> con.list_tables()
[...]
Connect with a specific cluster:
>>> con = ibis.materialize.connect(
... database="materialize", host="localhost", user="materialize", cluster="quickstart"
... )
ibis.connect URL format
In addition to ibis.materialize.connect, you can also connect to Materialize by passing a properly-formatted connection URL to ibis.connect:
con = ibis.connect(f"materialize://{user}:{password}@{host}:{port}/{database}")
materialize.Backend
alter_cluster
alter_cluster(self, name, /, *, rename_to=None, set_options=None, reset_options=None)
Alter a cluster’s configuration.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Cluster name to alter. | required |
| rename_to | str | None | New name for the cluster (for rename operations). | None |
| set_options | dict[str, Any] | None | Dictionary of options to set, e.g.: - ‘SIZE’: Cluster size (e.g., ‘25cc’, ‘50cc’). Use list_cluster_sizes() to discover available sizes. - ‘REPLICATION FACTOR’: Number of replicas - does not increase workload capacity (int) - ‘DISK’: Enable disk storage (bool) - ‘INTROSPECTION INTERVAL’: Collection interval (str like ‘1s’, ‘0’ to disable) - ‘INTROSPECTION DEBUGGING’: Enable debugging data (bool) | None |
| reset_options | list[str] | None | List of option names to reset to defaults: - ‘REPLICATION FACTOR’ - ‘INTROSPECTION INTERVAL’ - ‘INTROSPECTION DEBUGGING’ - ‘SCHEDULE’ | None |
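A minimal usage sketch based on the signature above (the cluster name and size are placeholder values):
>>> import ibis
>>> con = ibis.materialize.connect()
>>> con.alter_cluster("quickstart", set_options={"SIZE": "50cc"})  # resize
>>> con.alter_cluster("quickstart", reset_options=["INTROSPECTION INTERVAL"])
>>> con.alter_cluster("quickstart", rename_to="serving")  # rename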
alter_connection
alter_connection(self, name, /, *, set_options=None, reset_options=None, rotate_keys=False, database=None, schema=None)
Alter a connection’s configuration.
Modify connection parameters, reset them to defaults, or rotate SSH tunnel keys.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Connection name to alter. | required |
| set_options | dict[str, str | Any] | None | Dictionary of connection options to set. Options depend on connection type: - Kafka: ‘BROKER’, ‘SASL MECHANISMS’, etc. - Postgres: ‘HOST’, ‘PORT’, ‘DATABASE’, etc. - AWS: ‘REGION’, ‘ACCESS KEY ID’, etc. Values can be strings or SECRET() references. | None |
| reset_options | list[str] | None | List of option names to reset to defaults. | None |
| rotate_keys | bool | If True, rotate SSH tunnel key pairs. Only valid for SSH TUNNEL connections. Requires manual update of SSH bastion server keys. | False |
| database | str | None | Name of the database (catalog) where the connection exists. | None |
| schema | str | None | Name of the schema where the connection exists. | None |
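A hedged sketch of the parameters above (the connection names and broker address are hypothetical):
>>> import ibis
>>> con = ibis.materialize.connect()
>>> # Point an existing Kafka connection at a different broker
>>> con.alter_connection("kafka_conn", set_options={"BROKER": "broker2:9092"})
>>> # Rotate key pairs for an SSH tunnel connection
>>> con.alter_connection("ssh_conn", rotate_keys=True)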
alter_secret
alter_secret(self, name, /, value, *, database=None, schema=None)
Alter a secret’s value.
Updates the value of an existing secret. Future connections, sources, and sinks will use the new value immediately. Note that existing running sources/sinks may continue caching the old secret for some time.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Secret name to alter. | required |
| value | str | New secret value (will be converted to bytea). | required |
| database | str | None | Name of the database (catalog) where the secret exists. | None |
| schema | str | None | Name of the schema where the secret exists. | None |
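For illustration, a sketch using a hypothetical secret name:
>>> import ibis
>>> con = ibis.materialize.connect()
>>> con.alter_secret("kafka_password", "new-password-value")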
alter_sink
alter_sink(self, name, /, *, set_from, database=None, schema=None)
Alter a sink to read from a different upstream relation.
Allows cutting a sink over to a new upstream relation (table, view, or materialized view) without disrupting downstream consumers. Useful for blue/green deployments.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Sink name to alter. | required |
| set_from | str | Name of the new upstream relation (table/view/materialized view) to read from. The new relation must be compatible with the original sink definition. | required |
| database | str | None | Name of the database (catalog) where the sink exists. | None |
| schema | str | None | Name of the schema where the sink exists. | None |
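A sketch of a blue/green cutover, assuming a sink named orders_sink and a compatible new relation orders_v2 already exist:
>>> import ibis
>>> con = ibis.materialize.connect()
>>> con.alter_sink("orders_sink", set_from="orders_v2")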
alter_source
alter_source(self, name, /, *, add_subsources=None, database=None, schema=None)
Alter a source.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Source name to alter. | required |
| add_subsources | list[tuple[str, str]] | list[str] | None | Tables to add as subsources. Can be: - List of table names: [‘table1’, ‘table2’] - List of (table_name, subsource_name) tuples: [(‘table1’, ‘sub1’)] | None |
| database | str | None | Name of the database (catalog) where the source exists. | None |
| schema | str | None | Name of the schema where the source exists. | None |
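A sketch based on the parameters above (the source and table names are placeholders):
>>> import ibis
>>> con = ibis.materialize.connect()
>>> # Add subsources by table name
>>> con.alter_source("pg_source", add_subsources=["orders", "customers"])
>>> # Or provide explicit (table_name, subsource_name) pairs
>>> con.alter_source("pg_source", add_subsources=[("orders", "orders_sub")])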
begin
begin(self)
compile
compile(self, expr, /, *, limit=None, params=None, pretty=False)
Compile an expression to a SQL string.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Expr | An ibis expression to compile. | required |
| limit | int | None | An integer to effect a specific row limit. A value of None means no limit. | None |
| params | Mapping[ir.Expr, Any] | None | Mapping of scalar parameter expressions to value. | None |
| pretty | bool | Pretty print the SQL query during compilation. | False |
Returns
| Name | Type | Description |
|---|---|---|
| str | Compiled expression |
connect
connect(self, *args, **kwargs)
Connect to the database.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| *args | | Mandatory connection parameters, see the docstring of do_connect for details. | () |
| **kwargs | | Extra connection parameters, see the docstring of do_connect for details. | {} |
Notes
This creates a new backend instance with saved args and kwargs, then calls reconnect and finally returns the newly created and connected backend instance.
Returns
| Name | Type | Description |
|---|---|---|
| BaseBackend | An instance of the backend |
create_cluster
create_cluster(self, name, /, *, size=None, replication_factor=1, disk=False, introspection_interval=None, introspection_debugging=False, managed=True)
Create a cluster in Materialize.
Clusters provide resource isolation and computational resources for running queries, materialized views, indexes, sources, and sinks.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Cluster name to create. | required |
| size | str | None | Cluster size (e.g., ‘25cc’, ‘50cc’, ‘100cc’, ‘200cc’, etc.). Required for managed clusters. Use list_cluster_sizes() to discover available sizes in your Materialize instance. | None |
| replication_factor | int | Number of replicas (default: 1). Set to 0 to create an empty cluster. | 1 |
| disk | bool | Whether replicas should have disk storage (default: False). | False |
| introspection_interval | str | None | Introspection data collection interval (default: ‘1s’). Set to ‘0’ to disable introspection. | None |
| introspection_debugging | bool | Enable introspection debugging data (default: False). | False |
| managed | bool | Whether to create a managed cluster (default: True). Unmanaged clusters require manual replica management. | True |
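A minimal sketch; the cluster name and size are placeholders, and available sizes vary by instance (see list_cluster_sizes()):
>>> import ibis
>>> con = ibis.materialize.connect()
>>> con.create_cluster("analytics", size="50cc", replication_factor=2)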
create_connection
create_connection(self, name, /, *, connection_type, properties, database=None, schema=None, validate=True)
Create a connection in Materialize.
Connections store reusable connection configurations for sources and sinks. They enable secure credential management and connection reuse across multiple streaming objects.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Connection name to create. | required |
| connection_type | str | Type of connection: ‘KAFKA’, ‘POSTGRES’, ‘MYSQL’, ‘AWS’, ‘SSH TUNNEL’, ‘AWS PRIVATELINK’, ‘CONFLUENT SCHEMA REGISTRY’ | required |
| properties | dict[str, str | Any] | Connection-specific properties as key-value pairs, e.g.: - Kafka: {‘BROKER’: ‘localhost:9092’, ‘SASL MECHANISMS’: ‘PLAIN’, …} - Postgres: {‘HOST’: ‘localhost’, ‘PORT’: ‘5432’, ‘DATABASE’: ‘mydb’, …} - AWS: {‘REGION’: ‘us-east-1’, ‘ACCESS KEY ID’: SECRET(‘aws_key’), …} | required |
| database | str | None | Name of the database (catalog) where the connection will be created. | None |
| schema | str | None | Name of the schema where the connection will be created. | None |
| validate | bool | Whether to validate the connection (default: True). Set to False to create without validation. | True |
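A hedged sketch of creating a Kafka connection; the connection name and broker address are hypothetical:
>>> import ibis
>>> con = ibis.materialize.connect()
>>> con.create_connection(
...     "kafka_conn",
...     connection_type="KAFKA",
...     properties={"BROKER": "broker:9092"},
...     validate=False,  # skip validation since the broker here is a placeholder
... )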
create_database
create_database(self, name, /, *, catalog=None, force=False)
create_index
create_index(self, name, /, table, *, expressions=None, cluster=None, database=None, unique=False)
Create an index in Materialize.
In Materialize, indexes store query results in memory within a specific cluster, and keep these results incrementally updated as new data arrives. This ensures that indexed data remains fresh, reflecting the latest changes with minimal latency.
The primary use case for indexes is to accelerate direct queries issued via SELECT statements. By maintaining fresh, up-to-date results in memory, indexes can significantly optimize query performance, reducing both response time and compute load—especially for resource-intensive operations such as joins, aggregations, and repeated subqueries.
Because indexes are scoped to a single cluster, they are most useful for accelerating queries within that cluster. For results that must be shared across clusters or persisted to durable storage, consider using a materialized view, which also maintains fresh results but is accessible system-wide.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the index to create | required |
| table | str | Name of the table, view, or materialized view to index | required |
| expressions | list[str] | None | List of column names or SQL expressions to index. If None, creates a default index where Materialize automatically determines the best key columns. | None |
| cluster | str | None | Name of the cluster to maintain the index. If None, uses the active cluster. | None |
| database | str | None | Schema/database where the index should be created. If None, uses the current database. | None |
| unique | bool | Whether the index enforces uniqueness. This parameter is included for API compatibility but is always False for Materialize, as Materialize indexes do not enforce uniqueness constraints. | False |
Examples
>>> import ibis
>>> con = ibis.materialize.connect()
Create a default index (Materialize chooses key columns):
>>> con.create_index("orders_idx", "orders")
Create an index on a specific column:
>>> con.create_index(
...     "orders_customer_idx", "orders", expressions=["customer_id"]
... )
Create a multi-column index:
>>> con.create_index(
...     "orders_composite_idx", "orders", expressions=["customer_id", "order_date"]
... )
Create an index with an expression:
>>> con.create_index(
...     "customers_upper_idx", "customers", expressions=["upper(email)"]
... )
Create an index in a specific cluster:
>>> con.create_index("orders_idx", "orders", cluster="production")
Notes
- Default indexes let Materialize automatically choose the best columns
- Indexes consume memory proportional to the indexed data size
- Creating indexes on large datasets can take time
- Materialize indexes only support the ‘arrangement’ method internally
create_materialized_view
create_materialized_view(self, name, /, obj, *, database=None, schema=None, overwrite=False)
Create a materialized view.
A materialized view maintains fresh results by incrementally updating them as new data arrives. Materialized views are particularly useful when you need cross-cluster access to results or want to sink data to external systems like Kafka. When you create a materialized view, you specify a cluster responsible for maintaining it, but the results can be queried from any cluster. This allows you to separate the compute resources used for view maintenance from those used for serving queries.
If you do not need cross-cluster sharing, and you are primarily interested in fast query performance within a single cluster, you may prefer to create a view and index it. In Materialize, indexes on views also maintain results incrementally, but store them in memory, scoped to the cluster where the index was created. This approach offers lower latency for direct querying within that cluster.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Materialized view name to create. | required |
| obj | ibis.expr.types.Table | The select statement to materialize. | required |
| database | str | None | Name of the database (catalog) where the view will be created. | None |
| schema | str | None | Name of the schema where the view will be created. | None |
| overwrite | bool | Whether to overwrite the existing materialized view with the same name. Uses CREATE OR REPLACE syntax. | False |
Returns
| Name | Type | Description |
|---|---|---|
| Table | Table expression representing the materialized view |
Examples
>>> import ibis
>>> con = ibis.materialize.connect()
>>> table = con.table("orders")
>>> daily_summary = table.group_by("date").aggregate(
... total=table.amount.sum(), count=table.count()
... )
>>> mv = con.create_materialized_view("daily_orders", daily_summary)
create_secret
create_secret(self, name, /, value, *, database=None, schema=None)
Create a secret in Materialize.
Secrets store sensitive data like passwords, API keys, and certificates. They can be referenced in connections and other objects.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Secret name to create. | required |
| value | str | Secret value (plain text or base64 encoded). | required |
| database | str | None | Name of the database (catalog) where the secret will be created. | None |
| schema | str | None | Name of the schema where the secret will be created. | None |
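A minimal sketch with a placeholder secret name and value:
>>> import ibis
>>> con = ibis.materialize.connect()
>>> con.create_secret("kafka_password", "my-secret-value")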
create_sink
create_sink(self, name, /, *, sink_from=None, obj=None, connector=None, connection=None, properties=None, format_spec=None, envelope=None, key=None, database=None, schema=None)
Create a sink in Materialize.
Sinks allow you to stream data from Materialize to external systems.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Sink name to create. | required |
| sink_from | str | None | Name of the table/materialized view/source to sink from. Either sink_from or obj must be specified (RisingWave compatibility). | None |
| obj | ibis.expr.types.Table | None | Ibis table expression to sink from. Either sink_from or obj must be specified (RisingWave compatibility). | None |
| connector | str | None | Type of connector: ‘KAFKA’ (default if connection is provided). | None |
| connection | str | None | Name of the connection object (for Kafka). Must be created beforehand using CREATE CONNECTION. | None |
| properties | dict[str, str] | None | Connector-specific properties, e.g.: - Kafka: {‘TOPIC’: ‘events’} | None |
| format_spec | dict[str, str] | None | Format specifications. Can specify either: - Single format: {‘FORMAT’: ‘JSON’} - Key/Value formats: {‘KEY FORMAT’: ‘TEXT’, ‘VALUE FORMAT’: ‘JSON’} | None |
| envelope | str | None | Data envelope type: ‘UPSERT’ or ‘DEBEZIUM’ | None |
| key | list[str] | None | List of column names to use as the message key. Required for UPSERT envelope. | None |
| database | str | None | Name of the database (catalog) where the sink will be created. | None |
| schema | str | None | Name of the schema where the sink will be created. | None |
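A hedged sketch of sinking a materialized view to Kafka, assuming a connection named kafka_conn and a materialized view named daily_orders already exist:
>>> import ibis
>>> con = ibis.materialize.connect()
>>> con.create_sink(
...     "daily_orders_sink",
...     sink_from="daily_orders",
...     connection="kafka_conn",
...     properties={"TOPIC": "daily-orders"},
...     format_spec={"FORMAT": "JSON"},
...     envelope="UPSERT",
...     key=["date"],
... )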
create_source
create_source(self, name, /, *, source_schema=None, database=None, schema=None, connection=None, connector=None, properties=None, format_spec=None, envelope=None, include_properties=None, for_all_tables=False, for_schemas=None, for_tables=None)
Create a source in Materialize.
This method supports creating sources from various systems including: - Load generators (AUCTION, TPCH, MARKETING) - Kafka/Redpanda message brokers - PostgreSQL, MySQL, SQL Server (CDC) - Webhooks
The API is designed for compatibility with RisingWave’s create_source while supporting Materialize-specific features.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Source name to create. | required |
| source_schema | ibis.Schema | None | Ibis schema defining the structure of data from the source (columns and types). Required for some Kafka sources to specify the shape of incoming data. | None |
| database | str | None | Name of the database (catalog) where the source will be created. | None |
| schema | str | None | Name of the schema where the source will be created. | None |
| connection | str | None | Name of the connection object (for Kafka, Postgres, MySQL, etc.). Must be created beforehand using CREATE CONNECTION. | None |
| connector | str | None | Type of connector: ‘AUCTION’, ‘TPCH’, ‘KAFKA’, ‘POSTGRES’, etc. Load generator types (AUCTION, TPCH, MARKETING, etc.) are detected automatically. | None |
| properties | dict[str, str] | None | Connector-specific properties, e.g.: - Kafka: {‘TOPIC’: ‘my_topic’} - Postgres: {‘PUBLICATION’: ‘my_pub’} - Load Generator: {‘TICK INTERVAL’: ‘1s’, ‘SCALE FACTOR’: ‘0.01’} | None |
| format_spec | dict[str, str] | None | Format specifications, e.g.: {‘KEY FORMAT’: ‘JSON’, ‘VALUE FORMAT’: ‘JSON’} or {‘FORMAT’: ‘JSON’} for non-Kafka sources | None |
| envelope | str | None | Data envelope type: ‘NONE’, ‘UPSERT’, or ‘DEBEZIUM’ | None |
| include_properties | list[str] | None | List of metadata to include, e.g., [‘KEY’, ‘PARTITION’, ‘OFFSET’] | None |
| for_all_tables | bool | Create subsources for all tables (Postgres/MySQL) or all load generator tables | False |
| for_schemas | list[str] | None | List of schemas to create subsources for (Postgres/MySQL) | None |
| for_tables | list[tuple[str, str]] | None | List of (table_name, subsource_name) tuples | None |
Returns
| Name | Type | Description |
|---|---|---|
| Table | None | Table expression for the source. Returns None for multi-table sources (when for_all_tables=True). |
Examples
>>> import ibis
>>> con = ibis.materialize.connect()
>>> # Load generator
>>> auction = con.create_source(
... "my_auction", connector="AUCTION", properties={"TICK INTERVAL": "500ms"}
... )
>>> # Kafka source
>>> kafka_src = con.create_source(
... "kafka_data",
... connector="KAFKA",
... connection="kafka_conn",
... properties={"TOPIC": "my_topic"},
... format_spec={"FORMAT": "JSON"},
... envelope="UPSERT",
... )
>>> # PostgreSQL CDC
>>> pg_src = con.create_source(
... "pg_tables",
... connector="POSTGRES",
... connection="pg_conn",
... properties={"PUBLICATION": "mz_source"},
... for_all_tables=True,
... )
create_table
create_table(self, name, /, obj=None, *, schema=None, database=None, temp=False, overwrite=False)
Create a table in Materialize.
create_view
create_view(self, name, /, obj, *, database=None, overwrite=False)
Create a view from an Ibis expression.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of the view to create. | required |
| obj | ir.Table | The Ibis expression to create the view from. | required |
| database | str | None | The database that the view should be created in. | None |
| overwrite | bool | If True, replace an existing view with the same name. | False |
Returns
| Name | Type | Description |
|---|---|---|
| ir.Table | A table expression representing the view. |
disconnect
disconnect(self)
Disconnect from the backend.
drop_cluster
drop_cluster(self, name, /, *, force=False, cascade=False)
Drop a cluster.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Cluster name to drop. | required |
| force | bool | If False, an exception is raised if the cluster does not exist. | False |
| cascade | bool | If True, drop dependent objects (indexes, materialized views) as well. | False |
drop_connection
drop_connection(self, name, /, *, database=None, schema=None, force=False, cascade=False)
Drop a connection.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Connection name to drop. | required |
| database | str | None | Name of the database (catalog) where the connection exists, if not the default. | None |
| schema | str | None | Name of the schema where the connection exists, if not the default. | None |
| force | bool | If False, an exception is raised if the connection does not exist. | False |
| cascade | bool | If True, drop dependent objects (sources, sinks) as well. | False |
drop_database
drop_database(self, name, /, *, catalog=None, force=False, cascade=False)
drop_index
drop_index(self, name, /, *, database=None, force=False)
Drop an index from Materialize.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the index to drop | required |
| database | str | None | Schema/database where the index exists. If None, uses the current database. | None |
| force | bool | If True, does not raise an error if the index does not exist (uses IF EXISTS) | False |
drop_materialized_view
drop_materialized_view(self, name, /, *, database=None, schema=None, force=False, cascade=False)
Drop a materialized view.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Materialized view name to drop. | required |
| database | str | None | Name of the database (catalog) where the view exists, if not the default. | None |
| schema | str | None | Name of the schema where the view exists, if not the default. | None |
| force | bool | If False, an exception is raised if the view does not exist. | False |
| cascade | bool | If True, also drop dependent objects (views, indexes, etc.). | False |
drop_secret
drop_secret(self, name, /, *, database=None, schema=None, force=False)
Drop a secret.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Secret name to drop. | required |
| database | str | None | Name of the database (catalog) where the secret exists, if not the default. | None |
| schema | str | None | Name of the schema where the secret exists, if not the default. | None |
| force | bool | If False, an exception is raised if the secret does not exist. | False |
drop_sink
drop_sink(self, name, /, *, database=None, schema=None, force=False)
Drop a sink.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Sink name to drop. | required |
| database | str | None | Name of the database (catalog) where the sink exists, if not the default. | None |
| schema | str | None | Name of the schema where the sink exists, if not the default. | None |
| force | bool | If False, an exception is raised if the sink does not exist. | False |
drop_source
drop_source(self, name, /, *, database=None, schema=None, force=False, cascade=False)
Drop a source.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Source name to drop. | required |
| database | str | None | Name of the database (catalog) where the source exists, if not the default. | None |
| schema | str | None | Name of the schema where the source exists, if not the default. | None |
| force | bool | If False, an exception is raised if the source does not exist. | False |
| cascade | bool | If True, also drops dependent objects (views, materialized views). | False |
drop_table
drop_table(self, name, /, *, database=None, force=False)
drop_view
drop_view(self, name, /, *, database=None, force=False)
Drop a view from the backend.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of the view to drop. | required |
| database | str | None | The database that the view is located in. | None |
| force | bool | If True, do not raise an error if the view does not exist. | False |
execute
execute(self, expr, /, *, params=None, limit=None, **kwargs)
Execute an Ibis expression and return a pandas DataFrame, Series, or scalar.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Expr | Ibis expression to execute. | required |
| params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
| limit | int | str | None | An integer to effect a specific row limit. A value of None means no limit. The default is in ibis/config.py. | None |
| kwargs | Any | Keyword arguments | {} |
Returns
| Name | Type | Description |
|---|---|---|
| DataFrame | Series | scalar | The result of the expression execution. |
from_connection
from_connection(cls, con, /)
Create an Ibis client from an existing connection to a PostgreSQL database.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| con | psycopg.Connection | An existing connection to a PostgreSQL database. | required |
function
function(self, name, *, database=None)
get_schema
get_schema(self, name, *, catalog=None, database=None)
Get the schema for a table, view, or materialized view.
has_operation
has_operation(cls, operation, /)
Return whether the backend supports the given operation.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| operation | type[ops.Value] | Operation type, a Python class object. | required |
insert
insert(self, name, /, obj, *, database=None, overwrite=False)
Insert data into a table.
Ibis does not use the word schema to refer to database hierarchy.
A collection of tables is referred to as a database. A collection of databases is referred to as a catalog.
These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of the table to which data will be inserted | required |
| obj | pd.DataFrame | ir.Table | list | dict | The source data or expression to insert | required |
| database | str | None | Name of the attached database that the table is located in. For backends that support multi-level table hierarchies, you can pass in a dotted string path like "catalog.database" or a tuple of strings like ("catalog", "database"). | None |
| overwrite | bool | If True then replace existing contents of table | False |
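A brief sketch, assuming a table named orders already exists with compatible columns:
>>> import ibis
>>> import pandas as pd
>>> con = ibis.materialize.connect()
>>> df = pd.DataFrame({"id": [1, 2], "amount": [9.99, 19.99]})
>>> con.insert("orders", df)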
list_catalogs
list_catalogs(self, *, like=None)
list_cluster_sizes
list_cluster_sizes(self)
List available cluster replica sizes in Materialize.
Returns
| Name | Type | Description |
|---|---|---|
| list[str] | List of available cluster size names (e.g., ‘25cc’, ‘50cc’, ‘100cc’) |
list_clusters
list_clusters(self, *, like=None)
List clusters in Materialize.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| like | str | None | Pattern to filter cluster names (SQL LIKE syntax). | None |
Returns
| Name | Type | Description |
|---|---|---|
| list[str] | List of cluster names |
list_connections
list_connections(self, *, database=None, like=None)
List connections in Materialize.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| database | str | None | Database/schema to list connections from. If None, uses current database. | None |
| like | str | None | Pattern to filter connection names (SQL LIKE syntax). | None |
Returns
| Name | Type | Description |
|---|---|---|
| list[str] | List of connection names |
list_databases
list_databases(self, *, like=None, catalog=None)
list_indexes
list_indexes(self, *, table=None, database=None, cluster=None, like=None)
List indexes in Materialize.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| table | str | None | Filter indexes for a specific table, view, or materialized view | None |
| database | str | None | Filter indexes by schema/database name | None |
| cluster | str | None | Filter indexes by cluster name | None |
| like | str | None | Filter index names using SQL LIKE pattern (e.g., “orders%”) | None |
Returns
| Name | Type | Description |
|---|---|---|
| list[str] | List of index names matching the filters |
Examples
>>> import ibis
>>> con = ibis.materialize.connect()
List all indexes:
>>> con.list_indexes()
['orders_idx', 'customers_idx', ...]
List indexes on a specific table:
>>> con.list_indexes(table="orders")
['orders_idx', 'orders_customer_idx']
List indexes in a specific cluster:
>>> con.list_indexes(cluster="production")
['orders_idx', 'products_idx']
List indexes with a name pattern:
>>> con.list_indexes(like="orders%")
['orders_idx', 'orders_customer_idx', 'orders_composite_idx']
Combine filters:
>>> con.list_indexes(table="orders", cluster="production")
['orders_idx']
list_materialized_views
list_materialized_views(self, *, database=None, like=None)
List materialized views in Materialize.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| database | str | None | Database/schema to list materialized views from. If None, uses current database. | None |
| like | str | None | Pattern to filter materialized view names (SQL LIKE syntax). | None |
Returns
| Name | Type | Description |
|---|---|---|
| list[str] | List of materialized view names |
Examples
>>> import ibis
>>> con = ibis.materialize.connect()
>>> con.list_materialized_views()
['daily_orders', 'weekly_summary', 'user_stats']
>>> con.list_materialized_views(like="daily%")
['daily_orders']
list_secrets
list_secrets(self, *, database=None, like=None)
List secrets in Materialize.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| database | str | None | Database/schema to list secrets from. If None, uses current database. | None |
| like | str | None | Pattern to filter secret names (SQL LIKE syntax). | None |
Returns
| Name | Type | Description |
|---|---|---|
| list[str] | List of secret names |
list_sinks
list_sinks(self, *, database=None, like=None)
List sinks in Materialize.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| database | str | None | Database/schema to list sinks from. If None, uses current database. | None |
| like | str | None | Pattern to filter sink names (SQL LIKE syntax). | None |
Returns
| Name | Type | Description |
|---|---|---|
| list[str] | List of sink names |
list_sources
list_sources(self, *, database=None, like=None)
List sources in Materialize.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| database | str | None | Database/schema to list sources from. If None, uses current database. | None |
| like | str | None | Pattern to filter source names (SQL LIKE syntax). | None |
Returns
| Name | Type | Description |
|---|---|---|
| list[str] | List of source names |
Examples
>>> import ibis
>>> con = ibis.materialize.connect()
>>> con.list_sources()
['my_counter', 'auction_house', 'kafka_source']
>>> con.list_sources(like="auction%")
['auction_house']
list_tables
list_tables(self, *, like=None, database=None)
raw_sql
raw_sql(self, query, **kwargs)
read_csv
read_csv(self, path, /, *, table_name=None, **kwargs)
Register a CSV file as a table in the current backend.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | Path | The data source. A string or Path to the CSV file. | required |
| table_name | str | None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
| **kwargs | Any | Additional keyword arguments passed to the backend loading function. | {} |
Returns
| Name | Type | Description |
|---|---|---|
| ir.Table | The just-registered table |
read_delta
read_delta(self, path, /, *, table_name=None, **kwargs)
Register a Delta Lake table in the current database.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | Path | The data source. Must be a directory containing a Delta Lake table. | required |
| table_name | str | None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
| **kwargs | Any | Additional keyword arguments passed to the underlying backend or library. | {} |
Returns
| Name | Type | Description |
|---|---|---|
| ir.Table | The just-registered table. |
read_json
read_json(self, path, /, *, table_name=None, **kwargs)
Register a JSON file as a table in the current backend.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | Path | The data source. A string or Path to the JSON file. | required |
| table_name | str | None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
| **kwargs | Any | Additional keyword arguments passed to the backend loading function. | {} |
Returns
| Name | Type | Description |
|---|---|---|
| ir.Table | The just-registered table |
read_parquet
read_parquet(self, path, /, *, table_name=None, **kwargs)
Register a parquet file as a table in the current backend.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | Path | The data source. | required |
| table_name | str | None | An optional name to use for the created table. This defaults to a sequentially generated name. | None |
| **kwargs | Any | Additional keyword arguments passed to the backend loading function. | {} |
Returns
| Name | Type | Description |
|---|---|---|
| ir.Table | The just-registered table |
reconnect
reconnect(self)
Reconnect to the database already configured with connect.
register_options
register_options(cls)
Register custom backend options.
rename_table
rename_table(self, old_name, new_name)
Rename an existing table.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| old_name | str | The old name of the table. | required |
| new_name | str | The new name of the table. | required |
set_cluster
set_cluster(self, name)
Set the active cluster for this session.
This changes which cluster will be used for subsequent queries, materialized views, indexes, and other compute operations.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name of the cluster to switch to | required |
Examples
>>> import ibis
>>> con = ibis.materialize.connect()
>>> con.set_cluster("production")
>>> con.current_cluster
'production'
Switch clusters for different workloads:
>>> con.set_cluster("analytics")
>>> result = con.table("large_dataset").aggregate(...)
>>> con.set_cluster("quickstart")
See Also
current_cluster : Get the currently active cluster
list_clusters : List all available clusters
create_cluster : Create a new cluster
sql
sql(self, query, /, *, schema=None, dialect=None)
Create an Ibis table expression from a SQL query.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | A SQL query string | required |
| schema | IntoSchema | None | The schema of the query. If not provided, Ibis will try to infer the schema of the query. | None |
| dialect | str | None | The SQL dialect of the query. If not provided, the backend’s dialect is assumed. This argument can be useful when the query is written in a different dialect from the backend. | None |
Returns
| Name | Type | Description |
|---|---|---|
| ir.Table | The table expression representing the query |
subscribe
subscribe(self, obj, /, *, envelope=None, snapshot=True, as_of=None, up_to=None, progress=False, batch_size=1000, format='pandas')
Subscribe to real-time changes in a table, view, or materialized view.
SUBSCRIBE enables streaming change data capture (CDC) from Materialize relations. Unlike regular queries that return a snapshot, SUBSCRIBE continuously streams updates as they happen, making it ideal for:
- Real-time dashboards and monitoring
- Event-driven architectures and triggers
- Syncing data to external systems
- Live data pipelines
The stream continues indefinitely (unless up_to is specified) and delivers changes incrementally as pandas DataFrames.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| obj | str | ibis.expr.types.Table | Name of source/table/view/materialized view, or an Ibis table expression to subscribe to. | required |
| envelope | str | None | Output format: ‘UPSERT’ or ‘DEBEZIUM’. If None, uses default format with mz_diff column. | None |
| snapshot | bool | If True (default), emits the initial state before streaming changes. If False, only emits changes that occur after subscription starts. | True |
| as_of | int | None | Start streaming from this Materialize timestamp. | None |
| up_to | int | None | Stop streaming at this Materialize timestamp (for time-travel queries). | None |
| progress | bool | If True, emits progress updates in addition to data changes. | False |
| batch_size | int | Number of rows to fetch per batch (default: 1000). | 1000 |
| format | str | Output format for batches: ‘pandas’ (default), ‘arrow’, or ‘polars’. - ‘pandas’: Returns pandas DataFrames (familiar, feature-rich) - ‘arrow’: Returns PyArrow RecordBatches (efficient, zero-copy) - ‘polars’: Returns Polars DataFrames (fast, modern API) | 'pandas' |
Returns
| Name | Type | Description |
|---|---|---|
| Iterator[pd.DataFrame | pa.RecordBatch | pl.DataFrame] | Generator that yields batches of changes. Format depends on format parameter. Each batch includes: - mz_timestamp: Materialize’s logical timestamp for this change - mz_diff: Change type indicator: - 1 = row inserted (or new version after update) - -1 = row deleted (or old version before update) - 0 = progress message (only if progress=True) - All columns from the subscribed relation Important: Row updates appear as a delete (-1) followed by an insert (+1). Filter for mz_diff == 1 to see only current/new rows. |
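A usage sketch, assuming a relation named orders exists. With the default pandas format each batch carries mz_timestamp and mz_diff columns, so filtering on mz_diff == 1 keeps only new/current rows; the loop runs until interrupted (or until up_to is reached):
>>> import ibis
>>> con = ibis.materialize.connect()
>>> for batch in con.subscribe("orders", batch_size=500):
...     current = batch[batch["mz_diff"] == 1]  # drop retractions
...     print(len(current), "new/current rows at ts", batch["mz_timestamp"].max())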
table
table(self, name, /, *, database=None)
to_csv
to_csv(self, expr, /, path, *, params=None, **kwargs)
Write the results of executing the given expression to a CSV file.
This method is eager and will execute the associated expression immediately.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Table | The ibis expression to execute and persist to CSV. | required |
| path | str | Path | The data source. A string or Path to the CSV file. | required |
| params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
| kwargs | Any | Additional keyword arguments passed to pyarrow.csv.CSVWriter | {} |
to_delta
to_delta(self, expr, /, path, *, params=None, **kwargs)
Write the results of executing the given expression to a Delta Lake table.
This method is eager and will execute the associated expression immediately.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Table | The ibis expression to execute and persist to Delta Lake table. | required |
| path | str | Path | The data source. A string or Path to the Delta Lake table. | required |
| params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
| kwargs | Any | Additional keyword arguments passed to deltalake.writer.write_deltalake method | {} |
to_json
to_json(self, expr, /, path, **kwargs)
Write the results of expr to a json file of [{column -> value}, …] objects.
This method is eager and will execute the associated expression immediately.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Table | The ibis expression to execute and persist to JSON. | required |
| path | str | Path | The data source. A string or Path to the JSON file. | required |
| kwargs | Any | Additional, backend-specific keyword arguments. | {} |
to_pandas
to_pandas(self, expr, /, *, params=None, limit=None, **kwargs)
Execute an Ibis expression and return a pandas DataFrame, Series, or scalar.
This method is a wrapper around execute.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Expr | Ibis expression to execute. | required |
| params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
| limit | int | str | None | An integer to effect a specific row limit. A value of None means no limit. The default is in ibis/config.py. | None |
| kwargs | Any | Keyword arguments | {} |
to_pandas_batches
to_pandas_batches(self, expr, /, *, params=None, limit=None, chunk_size=1000000, **kwargs)
Execute an Ibis expression and return an iterator of pandas DataFrames.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Expr | Ibis expression to execute. | required |
| params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
| limit | int | str | None | An integer to effect a specific row limit. A value of None means no limit. The default is in ibis/config.py. | None |
| chunk_size | int | Maximum number of rows in each returned DataFrame batch. This may have no effect depending on the backend. | 1000000 |
| kwargs | Any | Keyword arguments | {} |
Returns
| Name | Type | Description |
|---|---|---|
| Iterator[pd.DataFrame] | An iterator of pandas DataFrames. |
to_parquet
to_parquet(self, expr, /, path, *, params=None, **kwargs)
Write the results of executing the given expression to a parquet file.
This method is eager and will execute the associated expression immediately.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Table | The ibis expression to execute and persist to parquet. | required |
| path | str | Path | The data source. A string or Path to the parquet file. | required |
| params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
| **kwargs | Any | Additional keyword arguments passed to pyarrow.parquet.ParquetWriter | {} |
to_parquet_dir
to_parquet_dir(self, expr, /, directory, *, params=None, **kwargs)
Write the results of executing the given expression to a parquet file in a directory.
This method is eager and will execute the associated expression immediately.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Table | The ibis expression to execute and persist to parquet. | required |
| directory | str | Path | The data source. A string or Path to the directory where the parquet file will be written. | required |
| params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
| **kwargs | Any | Additional keyword arguments passed to pyarrow.dataset.write_dataset | {} |
to_polars
to_polars(self, expr, /, *, params=None, limit=None, **kwargs)
Execute an expression and return the results as a polars DataFrame.
This method is eager and will execute the associated expression immediately.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Expr | Ibis expression to export to polars. | required |
| params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
| limit | int | str | None | An integer to effect a specific row limit. A value of None means no limit. The default is in ibis/config.py. | None |
| kwargs | Any | Keyword arguments | {} |
Returns
| Name | Type | Description |
|---|---|---|
| dataframe | A polars DataFrame holding the results of the executed expression. |
to_pyarrow
to_pyarrow(self, expr, /, *, params=None, limit=None, **kwargs)
Execute expression to a pyarrow object.
This method is eager and will execute the associated expression immediately.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Expr | Ibis expression to export to pyarrow | required |
| params | Mapping[ir.Scalar, Any] | None | Mapping of scalar parameter expressions to value. | None |
| limit | int | str | None | An integer to effect a specific row limit. A value of None means no limit. The default is in ibis/config.py. | None |
| kwargs | Any | Keyword arguments | {} |
Returns
| Name | Type | Description |
|---|---|---|
| result | If the passed expression is a Table, a pyarrow table is returned. If the passed expression is a Column, a pyarrow array is returned. If the passed expression is a Scalar, a pyarrow scalar is returned. |
to_pyarrow_batches
to_pyarrow_batches(self, expr, /, *, params=None, limit=None, chunk_size=1000000, **_)
to_torch
to_torch(self, expr, /, *, params=None, limit=None, **kwargs)
Execute an expression and return results as a dictionary of torch tensors.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| expr | ir.Expr | Ibis expression to execute. | required |
| params | Mapping[ir.Scalar, Any] | None | Parameters to substitute into the expression. | None |
| limit | int | str | None | An integer to effect a specific row limit. A value of None means no limit. | None |
| kwargs | Any | Keyword arguments passed into the backend’s to_torch implementation. | {} |
Returns
| Name | Type | Description |
|---|---|---|
| dict[str, torch.Tensor] | A dictionary of torch tensors, keyed by column name. |
truncate_table
truncate_table(self, name, /, *, database=None)
Delete all rows from a table.
Ibis does not use the word schema to refer to database hierarchy.
A collection of tables is referred to as a database. A collection of databases is referred to as a catalog.
These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Table name | required |
| database | str | tuple[str, str] | None | Name of the attached database that the table is located in. For backends that support multi-level table hierarchies, you can pass in a dotted string path like "catalog.database" or a tuple of strings like ("catalog", "database"). | None |
upsert
upsert(self, name, /, obj, on, *, database=None)
Upsert data into a table.
Ibis does not use the word schema to refer to database hierarchy.
A collection of tables is referred to as a database. A collection of databases is referred to as a catalog.
These terms are mapped onto the corresponding features in each backend (where available), regardless of whether the backend itself uses the same terminology.
Parameters
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | The name of the table to which data will be upserted | required |
| obj | pd.DataFrame | ir.Table | list | dict | The source data or expression to upsert | required |
| on | str | Column name to join on | required |
| database | str | None | Name of the attached database that the table is located in. For backends that support multi-level table hierarchies, you can pass in a dotted string path like "catalog.database" or a tuple of strings like ("catalog", "database"). | None |
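A brief sketch, assuming an orders table keyed by an id column:
>>> import ibis
>>> import pandas as pd
>>> con = ibis.materialize.connect()
>>> df = pd.DataFrame({"id": [1, 2], "status": ["shipped", "pending"]})
>>> con.upsert("orders", df, on="id")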