LLM Processing Pipelines¶
Overview¶
This example builds an automated data processing pipeline that reads product reviews from a SQLite database and analyzes each review's sentiment with an LLM. It demonstrates QType's dataflow capabilities: database sources, parallel LLM processing, and streaming results without requiring batch operations.
Architecture¶
flowchart TD
subgraph APP ["📱 review_analysis_pipeline"]
direction TB
subgraph FLOW_0 ["🔄 analyze_reviews"]
direction TB
FLOW_0_START@{shape: circle, label: "▶️ Start"}
FLOW_0_S0@{shape: rect, label: "⚙️ load_reviews"}
FLOW_0_S1@{shape: doc, label: "📄 create_prompt"}
FLOW_0_S2@{shape: rounded, label: "✨ analyze_sentiment"}
FLOW_0_S3@{shape: rect, label: "⚙️ write_results"}
FLOW_0_S0 -->|product_name| FLOW_0_S1
FLOW_0_S0 -->|rating| FLOW_0_S1
FLOW_0_S0 -->|review_text| FLOW_0_S1
FLOW_0_S1 -->|analysis_prompt| FLOW_0_S2
FLOW_0_S0 -->|review_id| FLOW_0_S3
FLOW_0_S0 -->|product_name| FLOW_0_S3
FLOW_0_S0 -->|rating| FLOW_0_S3
FLOW_0_S0 -->|review_text| FLOW_0_S3
FLOW_0_S2 -->|llm_analysis| FLOW_0_S3
FLOW_0_START -->|output_path| FLOW_0_S3
end
subgraph RESOURCES ["🔧 Shared Resources"]
direction LR
MODEL_NOVA_LITE@{shape: rounded, label: "✨ nova_lite (aws-bedrock)" }
end
end
FLOW_0_S2 -.->|uses| MODEL_NOVA_LITE
%% Styling
classDef appBox fill:none,stroke:#495057,stroke-width:3px
classDef flowBox fill:#e1f5fe,stroke:#0277bd,stroke-width:2px
classDef llmNode fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
classDef modelNode fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
classDef authNode fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
classDef telemetryNode fill:#fce4ec,stroke:#c2185b,stroke-width:2px
classDef resourceBox fill:#f5f5f5,stroke:#616161,stroke-width:1px
class APP appBox
class FLOW_0 flowBox
class RESOURCES resourceBox
Complete Code¶
id: review_analysis_pipeline
description: |
Automated product review analysis pipeline demonstrating dataflow processing.
Reads reviews from SQLite database, analyzes sentiment with LLM, and writes
enriched results to a Parquet file.
models:
- type: Model
id: nova_lite
provider: aws-bedrock
model_id: amazon.nova-lite-v1:0
inference_params:
temperature: 0.7
max_tokens: 256
flows:
- type: Flow
id: analyze_reviews
description: Batch process product reviews with LLM sentiment analysis
variables:
- id: review_id
type: int
- id: product_name
type: text
- id: rating
type: int
- id: review_text
type: text
- id: analysis_prompt
type: text
- id: llm_analysis
type: text
- id: output_path
type: text
- id: result_file
type: text
inputs:
- output_path
outputs:
- result_file
steps:
# Step 1: Read reviews from SQLite database
# SQLSource emits one message per database row
- id: load_reviews
type: SQLSource
connection: "sqlite:///examples/data_processing/reviews.db"
query: |
SELECT
review_id,
product_name,
rating,
review_text
FROM product_reviews
ORDER BY review_id
inputs: []
outputs:
- review_id
- product_name
- rating
- review_text
# Step 2: Format analysis prompt for each review
# PromptTemplate creates structured prompts from review data
- id: create_prompt
type: PromptTemplate
template: |
Analyze this product review in 1-2 sentences. Include:
- Overall sentiment (positive/negative/mixed)
- 2-3 key themes or points
Product: {product_name}
Rating: {rating}/5
Review: {review_text}
inputs:
- product_name
- rating
- review_text
outputs:
- analysis_prompt
# Step 3: Analyze each review with LLM
# LLMInference processes each message through the language model
- id: analyze_sentiment
type: LLMInference
model: nova_lite
inputs:
- analysis_prompt
outputs:
- llm_analysis
# Step 4: Write enriched results to Parquet file
# FileWriter batches all messages and writes once
- id: write_results
type: FileWriter
path: output_path
inputs:
- review_id
- product_name
- rating
- review_text
- llm_analysis
- output_path
outputs:
- result_file
Key Features¶
- SQLSource Step: Database source that executes SQL queries using SQLAlchemy connection strings and emits one message per result row, enabling parallel processing of database records through downstream steps
- PromptTemplate Step: Template engine with curly-brace variable substitution ({product_name}, {rating}) that dynamically generates prompts from message variables for each review
- LLMInference Step: Processes each message independently through the language model with automatic parallelization, invoking AWS Bedrock inference for all reviews concurrently
- Multi-record Flow: Each database row becomes an independent FlowMessage flowing through the pipeline in parallel, carrying variables (review_id, product_name, rating, review_text) and accumulating new fields (llm_analysis) at each step
- Message Sink: The final step accumulates all records and writes them to an output file.
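To make the per-message dataflow concrete, here is a plain-Python sketch of the same pipeline semantics. The Bedrock call is replaced by a stub, and the helper names (analyze_sentiment_stub, messages, results) are illustrative, not QType APIs:

```python
import sqlite3

# Stand-in for the LLMInference step: in the real pipeline each prompt
# is sent to amazon.nova-lite-v1:0 on AWS Bedrock.
def analyze_sentiment_stub(prompt: str) -> str:
    return "mixed sentiment; themes: quality, reliability"

# Mirrors the PromptTemplate step's curly-brace substitution.
TEMPLATE = (
    "Analyze this product review in 1-2 sentences. Include:\n"
    "- Overall sentiment (positive/negative/mixed)\n"
    "- 2-3 key themes or points\n"
    "Product: {product_name}\n"
    "Rating: {rating}/5\n"
    "Review: {review_text}\n"
)

# SQLSource: one message (here, a dict of variables) per result row.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE product_reviews (review_id, product_name, rating, review_text)"
)
conn.execute(
    "INSERT INTO product_reviews VALUES (1, 'USB-C Hub', 2, 'Died in a week.')"
)
cursor = conn.execute(
    "SELECT review_id, product_name, rating, review_text "
    "FROM product_reviews ORDER BY review_id"
)
columns = [d[0] for d in cursor.description]
messages = [dict(zip(columns, row)) for row in cursor]
conn.close()

# Each message flows through the steps, accumulating new variables.
results = []
for msg in messages:
    msg["analysis_prompt"] = TEMPLATE.format(**msg)      # PromptTemplate
    msg["llm_analysis"] = analyze_sentiment_stub(msg["analysis_prompt"])  # LLMInference
    results.append(msg)                                   # sink collects all messages
```

In QType the loop body runs concurrently, one FlowMessage per row, which is why the dashboard shows per-step throughput rather than a single batch timing.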
Running the Example¶
Setup¶
First, create the sample database with product reviews. The setup produces a SQLite database containing 10 sample reviews covering various products and sentiments.
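The example's actual setup script isn't reproduced here. A minimal sketch of an equivalent script, using only the Python standard library, might look like the following (the table and column names come from the SQL query in the flow definition; the sample rows and the local reviews.db path are invented for illustration, where the real example expects examples/data_processing/reviews.db):

```python
import sqlite3

# Create the database expected by the pipeline's SQLSource step.
# The real example uses examples/data_processing/reviews.db.
conn = sqlite3.connect("reviews.db")
conn.execute("""
    CREATE TABLE IF NOT EXISTS product_reviews (
        review_id INTEGER PRIMARY KEY,
        product_name TEXT NOT NULL,
        rating INTEGER NOT NULL,
        review_text TEXT NOT NULL
    )
""")
rows = [
    (1, "Wireless Mouse", 5, "Great battery life and very responsive."),
    (2, "USB-C Hub", 2, "Stopped working after a week."),
    (3, "Mechanical Keyboard", 4, "Nice feel, but a bit loud for the office."),
]
conn.executemany(
    "INSERT OR REPLACE INTO product_reviews VALUES (?, ?, ?, ?)", rows
)
conn.commit()
conn.close()
```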
Run the Pipeline¶
Process all reviews and generate the analysis with real-time progress monitoring:
qtype run -i '{"output_path":"results.parquet"}' --progress examples/data_processing/dataflow_pipelines.qtype.yaml
The --progress flag displays a live dashboard showing:
- Message throughput for each step (msg/s)
- Success/error counts
- Processing duration with visual progress bars
Example output:
╭─────────────────────────────────────────────────────────────────────────────── Flow Progress ───────────────────────────────────────────────────────────────────────────────╮
│ │
│ Step load_reviews 1.6 msg/s ▁▁▁▁▃▃▃▃▅▅▅▅████████ ✔ 10 succeeded ✖ 0 errors ⟳ - hits ✗ - misses 0:00:06 │
│ Step create_prompt 1.6 msg/s ▁▁▁▁▃▃▃▃▅▅▅▅████████ ✔ 10 succeeded ✖ 0 errors ⟳ - hits ✗ - misses 0:00:06 │
│ Step analyze_sentiment 2.0 msg/s ▄▄▄▄▆▆▆▆▅▅▅▅███████▁ ✔ 10 succeeded ✖ 0 errors ⟳ - hits ✗ - misses 0:00:04 │
│ Step write_results - msg/s ✔ 1 succeeded ✖ 0 errors ⟳ - hits ✗ - misses 0:00:00 │
│ │
╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
Note that the dashboard shows 1 message for write_results and 10 for the other steps: it reports the number of messages emitted by each step, and write_results is a sink that collects all incoming messages before emitting a single result.
The final lines of the output report the file where the results were written:
2026-01-16 11:23:35,151 - INFO: ✅ Flow execution completed successfully
2026-01-16 11:23:35,151 - INFO: Processed 1 em
2026-01-16 11:23:35,152 - INFO:
Results:
result_file: results.parquet
Learn More¶
- Tutorial: Your First QType Application
- Example: Simple Chatbot