SQLMesh simplifies managing RisingWave streaming SQL pipelines with declarative SQL, automated deployments, and safe schema changes for reliable real-time data streaming.
SQLMesh supports a wide range of use cases, from batch analytics to incremental transformations to stream processing. As more teams look to bridge traditional pipelines with real-time applications, we’re excited to launch integrations that make it easier. One of our newest additions is with RisingWave, an event stream processing platform that offers developers a unified, SQL-driven experience for real-time data ingestion, powerful but simplified stream processing, and low-latency serving. This post, originally authored by the RisingWave team, showcases how data teams can pair SQLMesh with RisingWave to build scalable, high-confidence streaming pipelines with built-in quality checks, safer testing, and version-controlled streaming queries.
If you’re already using SQLMesh and exploring streaming needs in your data stack, this cross-post is a good starting point to learn more about RisingWave. It gives a clear overview of how our tools complement one another, complete with a step-by-step implementation walkthrough. Read on for RisingWave’s deep dive into using SQLMesh as the control plane for streaming SQL workflows.
Streamlining Real-Time Pipelines: Managing RisingWave with SQLMesh
Demand for real-time insights is exploding. Businesses need to react instantly to changing conditions, personalize user experiences on the fly, and power operational dashboards with up-to-the-second data. RisingWave has emerged as a powerful force in this space, offering a PostgreSQL-compatible streaming database designed to handle complex SQL queries over streaming data via incrementally computed streaming queries. It's built for performance and ease of use for anyone familiar with SQL.
However, even with RisingWave simplifying the execution of streaming SQL, managing the development lifecycle of these streaming pipelines presents its own challenges, including how to:
- Version-control your streaming queries
- Safely test changes to a streaming query definition without impacting the live stream
- Manage dependencies between different materialized views (MVs)
- Ensure changes are rolled out consistently across environments
- Integrate data quality checks directly into your streaming transformations
Manually managing streaming queries, tracking dependencies, and coordinating updates is error-prone and quickly becomes unsustainable as pipelines grow. This is where SQLMesh steps in.
SQLMesh: The Control Plane for RisingWave Pipelines
SQLMesh is an open-source DataOps framework designed to manage the end-to-end lifecycle of all your data transformations, including the streaming queries powering your applications. Think of RisingWave as the highly efficient execution engine for stream processing, and SQLMesh as the intelligent control plane that orchestrates development, testing, and deployment.
Instead of writing ad-hoc DDL directly against RisingWave, you define your transformations declaratively within a SQLMesh project. Here’s how SQLMesh elevates your RisingWave workflow:
1. Declarative SQL as the Source of Truth: Define your RisingWave materialized views using standard SQL files within your SQLMesh project, marking them as materialized (`kind VIEW (materialized true)`). Your Git repository, not the database state, becomes the single source of truth for your pipeline logic.
2. Automated Lifecycle Management: SQLMesh understands your SQL definitions. When you run `sqlmesh plan`, it analyzes changes and dependencies, determining the necessary DDL (`CREATE MATERIALIZED VIEW`, `DROP`, etc.) to apply to RisingWave. Confirming the plan executes these commands safely and idempotently. No more manual DDL errors during deployment.
3. Safe Evolution with Environments: This is a game-changer. Want to test a change to an MV? Run `sqlmesh plan staging`. SQLMesh will deploy a separate instance of your streaming query (and its dependencies) into a dedicated schema within RisingWave (e.g., `sqlmesh_staging`), reading from the same underlying sources. You can validate the changes in isolation without touching your production streaming jobs. Once satisfied, you promote the changes to production (`sqlmesh plan prod` followed by applying the plan).
4. Integrated Testing and Audits: Define data quality tests (audits) directly within your SQLMesh project using SQL. SQLMesh can run these audits against your streaming queries (typically materialized views) in RisingWave after they've been created or updated, ensuring your streaming transformations meet expectations. This helps you catch data quality regressions before they impact downstream consumers; a sketch of an audit follows this list.
5. Full Lineage Visibility: SQLMesh automatically parses your SQL and builds a dependency graph (lineage) of your MVs. Need to know what upstream MVs feed into a critical dashboard MV? Or what downstream MVs will be affected by a change? SQLMesh provides this visibility, which is crucial for impact analysis and debugging.
6. CI/CD for Streaming Pipelines: Treat your RisingWave pipeline definitions like any other software artifact. Integrate `sqlmesh plan` (potentially with automated approval or checks) into your CI/CD workflows (GitHub Actions, GitLab CI, etc.) to automate testing and deployment, bringing DevOps best practices to your real-time data infrastructure.
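To make the audits from point 4 concrete, here is a minimal sketch of a reusable audit (the file path and audit name are illustrative, and it assumes the audited model exposes an `event_value` column). An audit's query selects violating rows, so an empty result means the audit passes:

```sql
-- audits/assert_positive_event_value.sql (illustrative path)
AUDIT (
  name assert_positive_event_value
);

-- Any rows returned here count as failures
SELECT *
FROM @this_model
WHERE event_value <= 0;
```

A model opts in by listing the audit in its MODEL block, e.g. `audits (assert_positive_event_value)`.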
Workflow Simplified
With SQLMesh and RisingWave, the development loop looks like this:
- Define/Modify: Write or update the SQL definition for a source, sink, or materialized view in your local SQLMesh project.
- Plan and Apply: Run `sqlmesh plan [environment]`. Review the proposed changes (DDL generation, dependencies impacted) for the target environment (e.g., dev, staging, prod), then apply the plan.
- Audit (Optional): Run `sqlmesh audit [environment]` to execute data quality tests against the deployed queries.
Demo: Aggregating Data in Real Time
Let's walk through a straightforward example using website click events to see how this works. Each event contains an `event_timestamp`, `event_type`, and `event_value`.
Our goal is to calculate a count and sum of `event_value` for each `event_type` over 5-minute tumbling windows. We'll then modify this logic to calculate an average value and a distinct event count, showcasing how SQLMesh handles the change.
Step 1: Set up RisingWave and Ingest Sample Data
First, we start RisingWave and manually insert some sample data into a base table.
- Start RisingWave via Docker.

docker run -it --pull=always -p 4566:4566 -p 5691:5691 risingwavelabs/risingwave:latest single_node

After you see `RisingWave standalone mode is ready.`, RisingWave has started.
- Connect via `psql` (or another Postgres-compatible SQL client) and prepare the data.

# Use localhost when connecting from your machine to the mapped port
psql -h localhost -p 4566 -d dev -U root

Inside `psql`, run the following:
-- Create the source table to hold incoming data
CREATE TABLE click_events (
  event_id INT PRIMARY KEY,
  event_type VARCHAR,
  event_value INT,
  event_timestamp TIMESTAMPTZ
);
-- Insert seven rows of sample data
INSERT INTO click_events (event_id, event_type, event_value, event_timestamp) VALUES
(1, 'page_view', 10, '2023-10-27 10:00:30+00'),
(2, 'add_to_cart', 150, '2023-10-27 10:01:15+00'),
(3, 'page_view', 12, '2023-10-27 10:02:05+00'),
(4, 'page_view', 11, '2023-10-27 10:05:40+00'),
(5, 'purchase', 250, '2023-10-27 10:06:20+00'),
(6, 'page_view', 15, '2023-10-27 10:11:00+00'),
(7, 'add_to_cart', 99, '2023-10-27 10:12:30+00');
-- Optional: Verify inserts
SELECT COUNT(*) FROM click_events;
-- Exit psql
\q
Step 2: Set up SQLMesh Project
Now, let's set up the SQLMesh environment to manage our RisingWave transformations.
- Create a project directory and Python virtual environment.
mkdir sqlmesh-risingwave-demo
cd sqlmesh-risingwave-demo
python3 -m venv venv
source venv/bin/activate # Adjust activation for your OS/shell
- Install the SQLMesh RisingWave adapter.
pip install "sqlmesh[risingwave]"
- Initialize the SQLMesh Project specifically for RisingWave.
sqlmesh init risingwave
This creates necessary folders and configuration files.
- Configure the connection in the `config.yaml` file (created by init). Ensure your `config.yaml` matches the following, updating it if needed.
gateways:
  risingwave: # Gateway name used by default
    connection:
      type: risingwave
      host: localhost
      user: root
      port: 4566 # Match the Docker port mapping
      database: dev

default_gateway: risingwave

model_defaults:
  dialect: risingwave
  # start: <YYYY-MM-DD> # Optional: default start date for backfills if needed
Step 3: Define and Deploy a Streaming Query
Let's create our first transformation model and deploy it.
- Create the SQLMesh model file `models/event_summary_tumbling.sql`.
```sql
-- models/event_summary_tumbling.sql
MODEL (
  name reporting.event_summary_tumbling,
  kind VIEW (materialized true), -- This makes it a RisingWave MV
  owner data_team,
  description 'Summarizes event counts and total value in 5-minute tumbling windows.'
);

SELECT
  window_start,
  window_end,
  event_type,
  COUNT(*) AS event_count,
  SUM(event_value) AS total_value
FROM TUMBLE(
  click_events, -- Read from the base table
  event_timestamp, -- Time column for windowing
  INTERVAL '5 minutes' -- Tumbling window size
)
GROUP BY
  window_start,
  window_end,
  event_type;
```
- Plan and apply the model using SQLMesh.

sqlmesh plan

SQLMesh will detect the new model (`reporting.event_summary_tumbling`) and show a plan to create it in the prod environment, whose physical objects land in a schema derived from the model's schema (e.g., `sqlmesh__reporting`). It will also detect the default models created by init; you can choose to apply changes only for your new model if desired, or apply all.
# Example Output Snippet:
# Summary of differences:
# Models:
# └── Added:
# └── reporting.event_summary_tumbling
# └── sqlmesh_example.full_model (if applying init models)
# └── ... (other init models)
#
# Apply - Apply the plan. [y/n]: y <-- Type 'y' and press Enter
SQLMesh executes the CREATE MATERIALIZED VIEW statement(s) in RisingWave.
- Verify the MV creation and content via `psql`.

The exact schema and MV name include a hash for versioning; we can use SHOW to find them. Since we use `reporting` as our model schema, the materialized view should be in the `sqlmesh__reporting` schema.

SHOW MATERIALIZED VIEWS FROM sqlmesh__reporting;
-- Result below
Name
-----------------------------------------------
reporting__event_summary_tumbling__3036268106
(1 row)
Querying this materialized view returns the aggregated 5-minute window results.
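For instance, a query along these lines (a sketch; substitute the exact versioned name returned by SHOW, as the hash suffix will differ in your deployment) returns one row per window and event type:

```sql
SELECT *
FROM sqlmesh__reporting."reporting__event_summary_tumbling__3036268106"
ORDER BY window_start, event_type;
```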
Step 4: Modify the MV Schema with SQLMesh
Now let’s change the aggregation logic and see how SQLMesh manages the update safely.
- Modify the model file `models/event_summary_tumbling.sql` to calculate the average value and distinct count.
-- models/event_summary_tumbling.sql
MODEL (
  name reporting.event_summary_tumbling, -- Keep the same name
  kind VIEW (materialized true),
  owner data_team,
  -- Update description to reflect the change
  description 'Calculates average event value and distinct event count in 5-minute tumbling windows.'
);

SELECT
  window_start,
  window_end,
  event_type,
  COUNT(DISTINCT event_id) AS distinct_event_count, -- Changed aggregation
  AVG(event_value) AS average_value -- Changed aggregation
FROM TUMBLE(
  click_events,
  event_timestamp,
  INTERVAL '5 minutes'
)
GROUP BY
  window_start,
  window_end,
  event_type;
- Run `sqlmesh plan` again to see the impact.

SQLMesh detects the change in the definition of `reporting.event_summary_tumbling`. Because the logic changed, it plans to:

a. Create a new version of the materialized view in RisingWave with a different physical name (a new hash suffix).

b. Leave the old version in place rather than dropping it immediately, allowing for validation or zero-downtime promotion strategies (though in this simple apply, the old one might eventually be cleaned up depending on settings).
- Verify the new MV version via `psql`.
-- List MVs again in the schema
SHOW MATERIALIZED VIEWS FROM sqlmesh__reporting; -- Replace schema if needed
-- You should now see TWO MVs related to event_summary_tumbling,
-- one with the old hash and one with a NEW hash.
-- Query the NEWER MV version (find its exact name)
SELECT * FROM sqlmesh__reporting."reporting__event_summary_tumbling__<new_hash>" ORDER BY window_start, event_type;
The results should show the `distinct_event_count` and `average_value` columns, reflecting the v2 logic. The original v1 MV still exists for the moment. This demonstrates SQLMesh's safe, versioned deployment approach.
Integrating Sink and Source Management
In our demo, we manually created the `click_events` table in RisingWave for simplicity. However, SQLMesh provides pre- and post-statements within model definitions, allowing you to embed DDL statements that run before or after the main model logic executes.
You could, for example, place a `CREATE SOURCE IF NOT EXISTS ...` statement in a pre-statement (that is, a SQL statement placed before the model's SELECT) and/or a `CREATE SINK IF NOT EXISTS ...` in a post-statement. This approach keeps the setup/teardown logic closer to the relevant transformation step within your SQLMesh project.
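As a minimal sketch (the model name, source name, and Kafka connector settings below are illustrative placeholders, not a tested configuration), a pre-statement creating a source would sit between the MODEL block and the SELECT:

```sql
MODEL (
  name reporting.click_stream_summary, -- illustrative model name
  kind VIEW (materialized true)
);

-- Pre-statement: statements before the model's SELECT run first
CREATE SOURCE IF NOT EXISTS raw_click_events (
  event_id INT,
  event_type VARCHAR,
  event_value INT,
  event_timestamp TIMESTAMPTZ
) WITH (
  connector = 'kafka', -- placeholder connector settings
  topic = 'click_events',
  properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

-- The model's main query reads from the source created above
SELECT
  event_type,
  COUNT(*) AS event_count
FROM raw_click_events
GROUP BY
  event_type;
```

A `CREATE SINK IF NOT EXISTS ...` placed after the SELECT would similarly run as a post-statement.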
Keep in mind that SQLMesh's core focus is managing the transformation logic (the SELECT statements) and dependencies across your entire pipeline (often multiple models). While pre- and post-statements offer integration points, complex CREATE SOURCE/SINK statements with intricate connector configurations might still be cleaner to handle outside the model definitions (e.g., in separate setup scripts or specialized infrastructure tools) to maintain clarity.
Conclusion: Streamlined and Reliable Streaming Pipelines
Escape the complexities of manual streaming pipeline management. SQLMesh provides the essential control plane—declarative definitions, version control, automated deployments, safe schema changes—for RisingWave's efficient real-time SQL execution. This combination delivers streamlined workflows, improved data quality, and a robust foundation for building and evolving your streaming applications with confidence.
Resources
- RisingWave Documentation: https://docs.risingwave.com
- SQLMesh Documentation: https://sqlmesh.readthedocs.io/en/stable/
- Join the RisingWave open-source community: https://go.risingwave.com/slack
- Join the SQLMesh open-source community: https://www.tobikodata.com/slack