Read Data from SQL Databases¶
Query relational databases and process results row-by-row using the SQLSource step, which supports any database accessible via SQLAlchemy connection strings.
QType YAML¶
steps:
- type: SQLSource
id: load_reviews
connection: "sqlite:///data/reviews.db"
query: |
SELECT
review_id,
product_name,
rating,
review_text
FROM product_reviews
WHERE rating >= 4
ORDER BY review_id
outputs:
- review_id
- product_name
- rating
- review_text
Explanation¶
- SQLSource: Step type that executes SQL queries and emits one message per database row
- connection: SQLAlchemy-format connection string (e.g.,
sqlite:///path.db,postgresql://user:pass@host/db) - query: SQL query to execute; column names must match output variable IDs
- outputs: Variables to populate from query result columns (order must match SELECT clause)
- auth: Optional reference to AuthorizationProvider for database credentials
Complete Example¶
id: review_analysis_pipeline
description: |
Automated product review analysis pipeline demonstrating dataflow processing.
Reads reviews from SQLite database, analyzes sentiment with LLM, and writes
enriched results to a Parquet file.
models:
- type: Model
id: nova_lite
provider: aws-bedrock
model_id: amazon.nova-lite-v1:0
inference_params:
temperature: 0.7
max_tokens: 256
flows:
- type: Flow
id: analyze_reviews
description: Batch process product reviews with LLM sentiment analysis
variables:
- id: review_id
type: int
- id: product_name
type: text
- id: rating
type: int
- id: review_text
type: text
- id: analysis_prompt
type: text
- id: llm_analysis
type: text
- id: output_path
type: text
- id: result_file
type: text
inputs:
- output_path
outputs:
- result_file
steps:
# Step 1: Read reviews from SQLite database
# SQLSource emits one message per database row
- id: load_reviews
type: SQLSource
connection: "sqlite:///examples/data_processing/reviews.db"
query: |
SELECT
review_id,
product_name,
rating,
review_text
FROM product_reviews
ORDER BY review_id
inputs: []
outputs:
- review_id
- product_name
- rating
- review_text
# Step 2: Format analysis prompt for each review
# PromptTemplate creates structured prompts from review data
- id: create_prompt
type: PromptTemplate
template: |
Analyze this product review in 1-2 sentences. Include:
- Overall sentiment (positive/negative/mixed)
- 2-3 key themes or points
Product: {product_name}
Rating: {rating}/5
Review: {review_text}
inputs:
- product_name
- rating
- review_text
outputs:
- analysis_prompt
# Step 3: Analyze each review with LLM
# LLMInference processes each message through the language model
- id: analyze_sentiment
type: LLMInference
model: nova_lite
inputs:
- analysis_prompt
outputs:
- llm_analysis
# Step 4: Write enriched results to Parquet file
# FileWriter batches all messages and writes once
- id: write_results
type: FileWriter
path: output_path
inputs:
- review_id
- product_name
- rating
- review_text
- llm_analysis
- output_path
outputs:
- result_file