
In today’s fast-paced business environment, waiting for overnight batch processes to deliver insights is increasingly a competitive disadvantage. Marketing teams need to respond to campaign performance within minutes, not days. Operations teams require instant visibility into supply chain disruptions. Finance departments demand up-to-the-minute views of cash positions and transaction anomalies.
After implementing real-time analytics solutions for multiple enterprise clients, I’ve discovered that combining Snowflake’s streaming capabilities with Power BI creates a powerful yet manageable approach to solving these challenges. In this article, I’ll share a practical, end-to-end architecture for building near real-time reporting solutions that deliver insights within minutes of data creation—without overwhelming your engineering team or your cloud budget.
Before diving into the solution, let’s understand the common challenges that make real-time analytics difficult:
- Data Latency Gap: Traditional ETL processes run on schedules (hourly, daily), creating delays between data creation and availability for analysis
- Resource Intensity: Continuous processing typically requires significant compute resources
- Complexity: Traditional streaming architectures (Kafka, Flink, etc.) add substantial complexity to your data stack
- Maintenance Burden: Specialized streaming infrastructure requires specialized skills and ongoing maintenance
- Visualization Refresh Limitations: Most BI tools aren’t designed for true real-time data refreshing
The approach I’ll outline addresses these challenges by leveraging native Snowflake capabilities alongside Power BI’s refresh options to create a pragmatic, near real-time solution.
Here’s the high-level architecture we’ll implement:
- Source Systems → Generate transactional data (orders, events, etc.)
- Initial Load Layer → Snowflake table(s) that capture raw data
- Change Detection → Snowflake Streams that identify new/changed records
- Processing Pipeline → Snowflake Tasks that transform raw data and load it into analytics models
- Analytics Layer → Optimized Snowflake structures (tables/views) for reporting
- Visualization → Power BI with appropriate refresh strategy
The beauty of this approach is that it uses native cloud capabilities without requiring additional streaming platforms or infrastructure while delivering insights with just minutes of latency.
Let’s examine each component in detail.
The first step is getting your raw data into Snowflake. Depending on your source systems, you have several options:
For data that lands as files in cloud storage (for example, extracts or CDC exports from operational databases), Snowflake Snowpipe provides efficient continuous loading:
-- Create an external stage pointing to your cloud storage
CREATE OR REPLACE STAGE orders_stage
URL = 's3://my-company-bucket/orders/'
CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...');
-- Create a pipe to continuously load data
CREATE OR REPLACE PIPE orders_pipe AUTO_INGEST=TRUE AS
COPY INTO raw_orders
FROM @orders_stage
FILE_FORMAT = (TYPE = 'JSON');
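Once the pipe is in place, it's worth confirming that files are actually being picked up. A quick check, using the pipe and table names from the example above:
-- Verify the pipe is running and see whether any files are queued
SELECT SYSTEM$PIPE_STATUS('orders_pipe');
-- Review what Snowpipe has loaded into the raw table over the last hour
SELECT *
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'RAW_ORDERS',
  START_TIME => DATEADD(hour, -1, CURRENT_TIMESTAMP())
));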
For databases supporting Change Data Capture, you can implement CDC flows into Snowflake:
-- Create a table for raw CDC data
CREATE OR REPLACE TABLE raw_orders_cdc (
record_content VARIANT,
record_metadata VARIANT,
arrived_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
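Downstream steps usually want typed columns rather than the raw VARIANT payload. Here's a minimal sketch of a flattening view; the field names inside record_content and record_metadata are illustrative and will depend on your CDC tool:
-- Illustrative view that extracts typed columns from the CDC payload
CREATE OR REPLACE VIEW raw_orders_cdc_flattened AS
SELECT
  record_content:order_id::VARCHAR           AS order_id,
  record_content:customer_id::VARCHAR        AS customer_id,
  record_content:order_date::TIMESTAMP_NTZ   AS order_date,
  record_content:total_amount::DECIMAL(18,2) AS total_amount,
  record_content:status::VARCHAR             AS status,
  record_metadata:operation::VARCHAR         AS cdc_operation, -- naming depends on your CDC tool
  arrived_at
FROM raw_orders_cdc;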
For SaaS platforms or applications with APIs, build lightweight integration processes:
# Example Python function for API data loading
import json

import requests
import snowflake.connector

# API_TOKEN and the SF_* settings are assumed to come from your config or environment
def load_api_data_to_snowflake():
    # Fetch data from the API
    response = requests.get(
        "https://api.example.com/orders",
        headers={"Authorization": f"Bearer {API_TOKEN}"},
        timeout=30,
    )
    response.raise_for_status()
    data = response.json()

    # Connect to Snowflake
    conn = snowflake.connector.connect(
        user=SF_USER,
        password=SF_PASSWORD,
        account=SF_ACCOUNT,
        warehouse=SF_WAREHOUSE,
        database=SF_DATABASE,
        schema=SF_SCHEMA,
    )

    # Load data into the staging table; SELECT PARSE_JSON(...) stores each payload
    # as a true VARIANT rather than a JSON string
    try:
        cursor = conn.cursor()
        for record in data["orders"]:
            cursor.execute(
                "INSERT INTO raw_orders_api (record_content, source_system) "
                "SELECT PARSE_JSON(%s), %s",
                (json.dumps(record), "ordering-api"),
            )
    finally:
        conn.close()
Regardless of the ingestion method, the key is to establish a continuous flow of data into your Snowflake environment with minimal latency.
Once data is flowing into Snowflake, we need a mechanism to detect changes efficiently. This is where Snowflake Streams shine – they provide a powerful change tracking mechanism that identifies new, modified, and deleted records without complex coding or infrastructure.
-- Create a stream to track changes on the raw orders table
CREATE OR REPLACE STREAM orders_stream ON TABLE raw_orders;
This simple command creates a stream that captures all DML changes (inserts, updates, deletes) to the raw_orders table. The stream acts as a change log that we can query like any other table:
-- View changes captured by the stream
SELECT * FROM orders_stream;
The output includes:
- Metadata columns (METADATA$ACTION, METADATA$ISUPDATE, etc.) that describe the change
- All columns from the source table
- Only rows that have changed since the stream was created or last consumed
This gives us the foundation for incremental processing without having to implement complex change detection logic.
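For example, you can summarize what's currently waiting in the stream before it's consumed (a plain SELECT doesn't advance the stream's offset; only DML that reads from it does):
-- Summarize pending changes by type (this query does not consume the stream)
SELECT
  METADATA$ACTION,
  METADATA$ISUPDATE,
  COUNT(*) AS pending_rows
FROM orders_stream
GROUP BY METADATA$ACTION, METADATA$ISUPDATE;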
Now that we can detect changes, we need to process them continuously. Snowflake Tasks allow us to set up scheduled or triggered processing without external orchestration tools.
Let’s create a multi-step processing pipeline:
First, we’ll create a task to transform raw data into a cleaner, validated staging format:
-- Create the staging table
CREATE OR REPLACE TABLE orders_staging (
order_id VARCHAR(50),
customer_id VARCHAR(50),
order_date TIMESTAMP_NTZ,
total_amount DECIMAL(18,2),
status VARCHAR(20),
items VARIANT,
processed_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Create a task to process new records
CREATE OR REPLACE TASK process_raw_orders
WAREHOUSE = compute_wh
SCHEDULE = '1 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AS
INSERT INTO orders_staging (
order_id,
customer_id,
order_date,
total_amount,
status,
items
)
SELECT
r.order_id,
r.customer_id,
TRY_TO_TIMESTAMP(r.order_date) as order_date,
CAST(r.total_amount AS DECIMAL(18,2)) as total_amount,
r.status,
PARSE_JSON(r.items) as items
FROM orders_stream r
WHERE METADATA$ACTION = 'INSERT';
This task:
- Runs every minute but only if there’s data in the stream
- Transforms raw data into a properly typed staging format
- Processes only new records (INSERT actions)
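Once the task is running, you can confirm that it's firing (and skipping empty intervals) by querying the task history:
-- Review recent executions of the task (most recent first)
SELECT name, state, scheduled_time, completed_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  TASK_NAME => 'PROCESS_RAW_ORDERS',
  SCHEDULED_TIME_RANGE_START => DATEADD(hour, -1, CURRENT_TIMESTAMP())
))
ORDER BY scheduled_time DESC;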
Next, we’ll create a task to transform the staging data into analytics-ready models:
-- Create an analytics-ready fact table
CREATE OR REPLACE TABLE fact_orders (
order_key INT AUTOINCREMENT,
order_id VARCHAR(50),
customer_id VARCHAR(50),
order_date DATE,
order_timestamp TIMESTAMP_NTZ,
total_amount DECIMAL(18,2),
status VARCHAR(20),
item_count INT,
is_first_order BOOLEAN,
last_updated_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Create a dependent task to build the fact table
CREATE OR REPLACE TASK build_fact_orders
WAREHOUSE = compute_wh
AFTER process_raw_orders
AS
MERGE INTO fact_orders f
USING (
SELECT
s.order_id,
s.customer_id,
DATE(s.order_date) as order_date,
s.order_date as order_timestamp,
s.total_amount,
s.status,
ARRAY_SIZE(s.items) as item_count,
CASE WHEN c.first_order_date = DATE(s.order_date) THEN TRUE ELSE FALSE END as is_first_order
FROM orders_staging s
LEFT JOIN dim_customers c ON s.customer_id = c.customer_id
WHERE s.processed_at > DATEADD(minute, -5, CURRENT_TIMESTAMP())
) src
ON f.order_id = src.order_id
WHEN MATCHED THEN
UPDATE SET
f.status = src.status,
f.last_updated_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN
INSERT (
order_id,
customer_id,
order_date,
order_timestamp,
total_amount,
status,
item_count,
is_first_order
)
VALUES (
src.order_id,
src.customer_id,
src.order_date,
src.order_timestamp,
src.total_amount,
src.status,
src.item_count,
src.is_first_order
);
This second task:
- Runs automatically after the first task completes
- Transforms staging data into an analytics-optimized fact table
- Uses a MERGE operation to handle both new orders and status updates
- Joins with dimension tables to add business context (like whether this is a customer’s first order)
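The MERGE assumes a dim_customers dimension that carries each customer's first order date. Its exact shape depends on your model; a minimal illustrative version, along with one way to keep it current from the staging data, could look like this:
-- Illustrative customer dimension used by the join above
CREATE OR REPLACE TABLE dim_customers (
  customer_id      VARCHAR(50),
  first_order_date DATE,
  last_updated_at  TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- One way to keep first_order_date current from the staging data
MERGE INTO dim_customers c
USING (
  SELECT customer_id, MIN(DATE(order_date)) AS first_order_date
  FROM orders_staging
  GROUP BY customer_id
) src
ON c.customer_id = src.customer_id
WHEN MATCHED AND src.first_order_date < c.first_order_date THEN
  UPDATE SET c.first_order_date = src.first_order_date, c.last_updated_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN
  INSERT (customer_id, first_order_date) VALUES (src.customer_id, src.first_order_date);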
Finally, we’ll create pre-aggregated metrics to optimize dashboard performance:
-- Create an aggregated metrics table
CREATE OR REPLACE TABLE agg_daily_sales (
report_date DATE,
total_orders INT,
total_revenue DECIMAL(18,2),
new_customer_orders INT,
new_customer_revenue DECIMAL(18,2),
avg_order_value DECIMAL(18,2),
last_updated_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Create a task to update the aggregations
CREATE OR REPLACE TASK update_daily_aggregates
WAREHOUSE = compute_wh
AFTER build_fact_orders
AS
MERGE INTO agg_daily_sales a
USING (
SELECT
order_date as report_date,
COUNT(*) as total_orders,
SUM(total_amount) as total_revenue,
SUM(CASE WHEN is_first_order THEN 1 ELSE 0 END) as new_customer_orders,
SUM(CASE WHEN is_first_order THEN total_amount ELSE 0 END) as new_customer_revenue,
AVG(total_amount) as avg_order_value
FROM fact_orders
WHERE last_updated_at > DATEADD(minute, -5, CURRENT_TIMESTAMP())
GROUP BY order_date
) src
ON a.report_date = src.report_date
WHEN MATCHED THEN
UPDATE SET
a.total_orders = src.total_orders,
a.total_revenue = src.total_revenue,
a.new_customer_orders = src.new_customer_orders,
a.new_customer_revenue = src.new_customer_revenue,
a.avg_order_value = src.avg_order_value,
a.last_updated_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN
INSERT (
report_date,
total_orders,
total_revenue,
new_customer_orders,
new_customer_revenue,
avg_order_value
)
VALUES (
src.report_date,
src.total_orders,
src.total_revenue,
src.new_customer_orders,
src.new_customer_revenue,
src.avg_order_value
);
This final task:
- Creates pre-aggregated metrics that will make dashboards lightning-fast
- Automatically updates when the fact table changes
- Uses a MERGE operation to ensure we’re always updating the latest values
Once our tasks are created, we need to activate them:
-- Start the task chain
ALTER TASK update_daily_aggregates RESUME;
ALTER TASK build_fact_orders RESUME;
ALTER TASK process_raw_orders RESUME;
Note that we resume the tasks in reverse order of dependency, ensuring all dependent tasks are active before their parent tasks.
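If you'd rather not track the dependency order manually, Snowflake also offers a system function that resumes a root task together with all of its dependents:
-- Resume the root task and all of its dependent tasks in one call
SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('process_raw_orders');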
With our processing pipeline in place, we need to create analytics views that Power BI will connect to. These views should:
- Be optimized for reporting performance
- Include business-friendly naming and calculations
- Implement any necessary security constraints
-- Create a reporting view for sales dashboards
CREATE OR REPLACE SECURE VIEW reporting.vw_sales_dashboard AS
SELECT
a.report_date,
d.year_number,
d.quarter_number,
d.month_number,
d.month_name,
d.day_of_week,
d.is_weekend,
a.total_orders,
a.total_revenue,
a.new_customer_orders,
a.new_customer_revenue,
a.avg_order_value,
a.new_customer_revenue / NULLIF(a.total_revenue, 0) as new_customer_revenue_pct,
-- Add YTD calculations
SUM(a.total_revenue) OVER (
PARTITION BY d.year_number
ORDER BY a.report_date
ROWS UNBOUNDED PRECEDING
) as ytd_revenue,
-- Add rolling metrics
AVG(a.total_orders) OVER (
ORDER BY a.report_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) as rolling_7day_avg_orders,
-- Add growth calculations
LAG(a.total_revenue, 1) OVER (ORDER BY a.report_date) as prev_day_revenue,
LAG(a.total_revenue, 7) OVER (ORDER BY a.report_date) as prev_week_revenue,
(a.total_revenue / NULLIF(LAG(a.total_revenue, 1) OVER (ORDER BY a.report_date), 0)) - 1 as day_over_day_growth,
(a.total_revenue / NULLIF(LAG(a.total_revenue, 7) OVER (ORDER BY a.report_date), 0)) - 1 as week_over_week_growth
FROM agg_daily_sales a
JOIN dim_date d ON a.report_date = d.date_day
WHERE a.report_date >= DATEADD(day, -90, CURRENT_DATE());
This view:
- Joins with a date dimension to provide time intelligence
- Includes business KPIs and growth calculations
- Limits the date range to improve performance
- Uses window functions to calculate rolling metrics and growth rates
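The view also assumes a dim_date dimension keyed on date_day with standard calendar attributes. If you don't already have one, a simple generated version (illustrative, with column names matching the view above) might look like this:
-- Illustrative date dimension covering roughly the last three years plus one ahead
CREATE OR REPLACE TABLE dim_date AS
WITH days AS (
  SELECT ROW_NUMBER() OVER (ORDER BY SEQ4()) - 1 AS seq
  FROM TABLE(GENERATOR(ROWCOUNT => 1460))
)
SELECT
  DATEADD(day, seq, DATEADD(year, -3, CURRENT_DATE())) AS date_day,
  YEAR(date_day)                        AS year_number,
  QUARTER(date_day)                     AS quarter_number,
  MONTH(date_day)                       AS month_number,
  MONTHNAME(date_day)                   AS month_name,
  DAYOFWEEK(date_day)                   AS day_of_week,
  DAYNAME(date_day) IN ('Sat', 'Sun')   AS is_weekend
FROM days;
Most teams already maintain a richer date dimension; the point is simply that the reporting view expects these column names.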
Now that our data pipeline is continuously processing data and creating analytics-ready tables and views, let’s connect Power BI to visualize it.
Power BI offers two main connection methods for Snowflake:
- Import Mode: Data is imported into Power BI’s in-memory engine
- DirectQuery Mode: Queries run directly against Snowflake in real-time
For true real-time reporting, DirectQuery is the better choice:
// Example Power BI M query for a DirectQuery connection
let
    Source = Snowflake.Databases(
        "yourcompany.snowflakecomputing.com",
        "REPORTING_WH",
        [Role = "REPORTER"]
    ),
    ANALYTICS_DB_Database = Source{[Name = "ANALYTICS_DB"]}[Data],
    REPORTING_Schema = ANALYTICS_DB_Database{[Name = "REPORTING"]}[Data],
    VW_SALES_DASHBOARD_Table = REPORTING_Schema{[Name = "VW_SALES_DASHBOARD"]}[Data]
in
    VW_SALES_DASHBOARD_Table
To maximize real-time capabilities in Power BI, consider these options:
- DirectQuery with Automatic Page Refresh: For Premium/Embedded capacities, configure your report to refresh automatically:
- Go to “File” > “Options and settings” > “Options”
- Under “Current File” > “Data Load”, enable “Automatic page refresh”
- Set the refresh interval (e.g., every 1-5 minutes)
- Hybrid Mode: Mix DirectQuery for real-time metrics with Import mode for historical data:
- Use DirectQuery for “today’s” metrics
- Use Import mode for historical trends and comparisons
- Schedule regular refreshes for the imported data
- Power BI Streaming Datasets: For critical KPIs that need second-level refresh, use streaming datasets:
- Create a lightweight service that queries Snowflake and pushes to Power BI’s streaming API
- Display these KPIs in tiles that update in near-real-time
DirectQuery mode can be slower than Import mode, so optimize your implementation:
- Create aggregated views in Snowflake (as we did above)
- Limit visuals per page to reduce the number of queries
- Use the “Dual” storage mode for dimension tables
- Implement query reduction measures in Power BI:
- Turn off “Auto-detect relationships”
- Use “Edit interactions” to limit cross-filtering
- Implement report page tooltips instead of basic tooltips
Let’s see how this architecture works in practice with an e-commerce dashboard example:
The main dashboard shows today’s order metrics with automatic 5-minute refreshes:
- Today’s Orders: 1,247 (up 12% from same time yesterday)
- Today’s Revenue: $127,842 (up 8% from same time yesterday)
- Average Order Value: $102.52
- New Customer Orders: 318 (25.5% of total)
All these metrics update automatically every 5 minutes through the Snowflake pipeline we created.
A geographical view shows orders as they come in around the world, with the most recent highlighted:
- Uses DirectQuery to show the latest orders
- Sizes circles by order amount
- Colors indicate new vs. returning customers
Historical trends combine DirectQuery for today with Import mode for historical data:
- Today’s hourly trend (DirectQuery)
- Last 7 days daily trend (Import)
- Month-to-date comparison vs. previous month (Import)
This architecture balances real-time capabilities with reasonable costs:
In production implementations, I’ve consistently seen:
- Data Latency: 2-5 minutes from transaction to dashboard
- Query Performance: 1-3 seconds for most DirectQuery visuals
- Scalability: Successfully handling thousands of orders per minute
To keep costs under control:
- Right-size your Snowflake warehouse for tasks:
-- Example: Create a right-sized warehouse for the streaming tasks
CREATE WAREHOUSE stream_task_wh WITH
  WAREHOUSE_SIZE = 'X-SMALL'
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE;
-- Then assign it to your tasks (a task must be suspended before it can be altered)
ALTER TASK process_raw_orders SUSPEND;
ALTER TASK process_raw_orders SET WAREHOUSE = stream_task_wh;
ALTER TASK process_raw_orders RESUME;
- Optimize task frequency based on business needs:
- Critical operational dashboards: 1-5 minute intervals
- Management dashboards: 15-30 minute intervals
- Strategic dashboards: Hourly or daily
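Adjusting the cadence later is a one-line change per root task; just remember that a task must be suspended before its schedule can be altered:
-- Example: relax the pipeline to a 15-minute cadence for management dashboards
ALTER TASK process_raw_orders SUSPEND;
ALTER TASK process_raw_orders SET SCHEDULE = '15 MINUTE';
ALTER TASK process_raw_orders RESUME;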
- Implement multi-clustering warehouses for report query workloads:
CREATE WAREHOUSE reporting_wh WITH
  WAREHOUSE_SIZE = 'MEDIUM'
  MIN_CLUSTER_COUNT = 1
  MAX_CLUSTER_COUNT = 3
  SCALING_POLICY = 'STANDARD'
  AUTO_SUSPEND = 300
  AUTO_RESUME = TRUE;
- Use resource monitors to prevent runaway costs:
-- Create a resource monitor for the streaming process
CREATE RESOURCE MONITOR stream_monitor WITH
  CREDIT_QUOTA = 25
  FREQUENCY = MONTHLY
  START_TIMESTAMP = IMMEDIATELY
  TRIGGERS
    ON 80 PERCENT DO NOTIFY
    ON 100 PERCENT DO SUSPEND;
-- Apply it to the warehouse
ALTER WAREHOUSE stream_task_wh SET RESOURCE_MONITOR = stream_monitor;
Based on multiple implementations, here are the challenges you might face and how to address them:
Challenge: If a task fails, the entire pipeline stops processing.
Solution: Implement error handling and notifications:
-- Create a logging table
CREATE OR REPLACE TABLE task_log (
task_name VARCHAR,
status VARCHAR,
error_message VARCHAR,
row_count INT,
execution_time FLOAT,
logged_at TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);
-- Modify tasks to include error handling
CREATE OR REPLACE TASK process_raw_orders
WAREHOUSE = stream_task_wh
SCHEDULE = '1 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('orders_stream')
AS
DECLARE
  row_count INTEGER DEFAULT 0;
  start_time TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP();
  error_message VARCHAR DEFAULT NULL;
BEGIN
  -- Attempt the operation in a transaction
  BEGIN TRANSACTION;
  INSERT INTO orders_staging (
    order_id,
    customer_id,
    order_date,
    total_amount,
    status,
    items
  )
  SELECT
    r.order_id,
    r.customer_id,
    TRY_TO_TIMESTAMP(r.order_date) as order_date,
    CAST(r.total_amount AS DECIMAL(18,2)) as total_amount,
    r.status,
    PARSE_JSON(r.items) as items
  FROM orders_stream r
  WHERE METADATA$ACTION = 'INSERT';
  -- Capture the number of rows processed
  row_count := SQLROWCOUNT;
  -- Log successful execution
  INSERT INTO task_log (task_name, status, row_count, execution_time)
  VALUES (
    'process_raw_orders',
    'SUCCESS',
    :row_count,
    DATEDIFF('millisecond', :start_time, CURRENT_TIMESTAMP()) / 1000
  );
  COMMIT;
EXCEPTION
  WHEN OTHER THEN
    ROLLBACK;
    -- Capture the error details exposed by Snowflake Scripting
    error_message := SQLSTATE || ': ' || SQLERRM;
    -- Log the error
    INSERT INTO task_log (task_name, status, error_message, execution_time)
    VALUES (
      'process_raw_orders',
      'ERROR',
      :error_message,
      DATEDIFF('millisecond', :start_time, CURRENT_TIMESTAMP()) / 1000
    );
    -- Raise an alert via a notification integration (Snowflake alerts are another option)
    CALL SYSTEM$SEND_EMAIL(
      'snowflake_alerts',
      'admin@example.com',
      'Task Failure: process_raw_orders',
      'The task failed with error: ' || :error_message
    );
END;
Challenge: Sudden data volume spikes can overwhelm the pipeline.
Solution: Implement backpressure handling and scaling:
- Use larger warehouses for tasks during peak periods
- Implement queue monitoring to detect backlogs
- Create a separate high-throughput pipeline for peak events
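A lightweight way to monitor for backlogs is to watch how much unconsumed data is sitting in the stream and temporarily scale the task warehouse when it grows. A sketch, using the names from earlier:
-- Check the current backlog (reading a stream does not consume it)
SELECT COUNT(*) AS unprocessed_changes FROM orders_stream;
-- Temporarily scale up the task warehouse during a peak event
ALTER WAREHOUSE stream_task_wh SET WAREHOUSE_SIZE = 'SMALL';
-- Scale back down once the backlog clears
ALTER WAREHOUSE stream_task_wh SET WAREHOUSE_SIZE = 'X-SMALL';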
Challenge: As source systems change, schema drift can break the pipeline.
Solution: Implement schema evolution strategies:
- Use VARIANT types for initial ingestion to capture all fields
- Implement explicit schema validation steps
- Create alerting for unexpected schema changes
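Because the raw layer keeps the full payload in a VARIANT column, drift detection can be as simple as tracking which keys appear in incoming records. An illustrative check against the API staging table from earlier (the expected field list is an assumption for this example):
-- Flag any top-level keys that aren't in the expected field list
SELECT DISTINCT f.key AS unexpected_field
FROM raw_orders_api r,
     LATERAL FLATTEN(INPUT => r.record_content) f
WHERE f.key NOT IN ('order_id', 'customer_id', 'order_date', 'total_amount', 'status', 'items');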
Challenge: Standard Power BI has refresh limitations.
Solution: Implement a tiered approach:
- Use Power BI Premium for critical dashboards requiring frequent refresh
- For standard Pro licenses, create a “last updated” timestamp to make refresh limitations transparent
- Consider Power BI Embedded for applications requiring real-time analytics
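For the "last updated" timestamp mentioned above, a small view over the aggregate table is usually enough; surface it as a card in Power BI so users always know how fresh the data is:
-- Expose data freshness so the dashboard can display it
CREATE OR REPLACE VIEW reporting.vw_data_freshness AS
SELECT
  MAX(last_updated_at) AS last_refreshed_at,
  DATEDIFF('minute', MAX(last_updated_at), CURRENT_TIMESTAMP()) AS minutes_since_refresh
FROM agg_daily_sales;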
The architecture I’ve outlined provides a pragmatic approach to real-time analytics that balances several important factors:
- Business Value: Delivers insights within minutes of data creation
- Implementation Complexity: Uses native cloud tools without specialized streaming platforms
- Maintenance Overhead: Minimal ongoing maintenance compared to traditional streaming systems
- Cost Efficiency: Right-sized compute resources with appropriate auto-scaling and suspension
By combining Snowflake’s Streams and Tasks with Power BI’s visualization capabilities, you can create near real-time reporting solutions that provide the immediacy business users demand without the overwhelming complexity of traditional real-time architectures.
The key takeaway is that “near real-time” (2-5 minute latency) is often the sweet spot that delivers the most business value while maintaining reasonable implementation and operational complexity. For most use cases, this architecture provides an excellent balance of timeliness, maintainability, and cost.
What real-time analytics challenges is your organization facing? Have you implemented Snowflake Streams and Tasks for continuous data processing? Share your experiences in the comments below.
#Snowflake #PowerBI #RealTimeAnalytics #DataStreaming #SnowflakeTasks #SnowflakeStreams #DataEngineering #BusinessIntelligence #DataPipeline #NearRealTime #CloudAnalytics #DataWarehouse #ETL #DirectQuery #DataArchitecture #DataVisualization #PerformanceOptimization #SnowflakeSQL #DashboardDesign #SnowflakeTips