Skip to content

LLM Processing Pipelines

Overview

An automated data processing pipeline that reads product reviews from a SQLite database and analyzes each review's sentiment using an LLM. This example demonstrates QType's dataflow capabilities with database sources, parallel LLM processing, and streaming results without requiring batch operations.

Architecture

flowchart TD
    subgraph APP ["📱 review_analysis_pipeline"]
        direction TB

    subgraph FLOW_0 ["🔄 analyze_reviews"]
        direction TB
        FLOW_0_START@{shape: circle, label: "▶️ Start"}
        FLOW_0_S0@{shape: rect, label: "⚙️ load_reviews"}
        FLOW_0_S1@{shape: doc, label: "📄 create_prompt"}
        FLOW_0_S2@{shape: rounded, label: "✨ analyze_sentiment"}
        FLOW_0_S3@{shape: rect, label: "⚙️ write_results"}
        FLOW_0_S0 -->|product_name| FLOW_0_S1
        FLOW_0_S0 -->|rating| FLOW_0_S1
        FLOW_0_S0 -->|review_text| FLOW_0_S1
        FLOW_0_S1 -->|analysis_prompt| FLOW_0_S2
        FLOW_0_S0 -->|review_id| FLOW_0_S3
        FLOW_0_S0 -->|product_name| FLOW_0_S3
        FLOW_0_S0 -->|rating| FLOW_0_S3
        FLOW_0_S0 -->|review_text| FLOW_0_S3
        FLOW_0_S2 -->|llm_analysis| FLOW_0_S3
        FLOW_0_START -->|output_path| FLOW_0_S3
    end

    subgraph RESOURCES ["🔧 Shared Resources"]
        direction LR
        MODEL_NOVA_LITE@{shape: rounded, label: "✨ nova_lite (aws-bedrock)" }
    end

    end

    FLOW_0_S2 -.->|uses| MODEL_NOVA_LITE

    %% Styling
    classDef appBox fill:none,stroke:#495057,stroke-width:3px
    classDef flowBox fill:#e1f5fe,stroke:#0277bd,stroke-width:2px
    classDef llmNode fill:#f3e5f5,stroke:#7b1fa2,stroke-width:2px
    classDef modelNode fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
    classDef authNode fill:#fff3e0,stroke:#ef6c00,stroke-width:2px
    classDef telemetryNode fill:#fce4ec,stroke:#c2185b,stroke-width:2px
    classDef resourceBox fill:#f5f5f5,stroke:#616161,stroke-width:1px

    class APP appBox
    class FLOW_0 flowBox
    class RESOURCES resourceBox
    class TELEMETRY telemetryNode

Complete Code

id: review_analysis_pipeline
description: |
  Automated product review analysis pipeline demonstrating dataflow processing.
  Reads reviews from SQLite database, analyzes sentiment with LLM, and writes
  enriched results to a Parquet file.

models:
  - type: Model
    id: nova_lite
    provider: aws-bedrock
    model_id: amazon.nova-lite-v1:0
    inference_params:
      temperature: 0.7
      max_tokens: 256

flows:
  - type: Flow
    id: analyze_reviews
    description: Batch process product reviews with LLM sentiment analysis

    variables:
      - id: review_id
        type: int
      - id: product_name
        type: text
      - id: rating
        type: int
      - id: review_text
        type: text
      - id: analysis_prompt
        type: text
      - id: llm_analysis
        type: text
      - id: output_path
        type: text
      - id: result_file
        type: text

    inputs:
      - output_path

    outputs:
      - result_file

    steps:
      # Step 1: Read reviews from SQLite database
      # SQLSource emits one message per database row
      - id: load_reviews
        type: SQLSource
        connection: "sqlite:///examples/data_processing/reviews.db"
        query: |
          SELECT
            review_id,
            product_name,
            rating,
            review_text
          FROM product_reviews
          ORDER BY review_id
        inputs: []
        outputs:
          - review_id
          - product_name
          - rating
          - review_text

      # Step 2: Format analysis prompt for each review
      # PromptTemplate creates structured prompts from review data
      - id: create_prompt
        type: PromptTemplate
        template: |
          Analyze this product review in 1-2 sentences. Include:
          - Overall sentiment (positive/negative/mixed)
          - 2-3 key themes or points

          Product: {product_name}
          Rating: {rating}/5
          Review: {review_text}
        inputs:
          - product_name
          - rating
          - review_text
        outputs:
          - analysis_prompt

      # Step 3: Analyze each review with LLM
      # LLMInference processes each message through the language model
      - id: analyze_sentiment
        type: LLMInference
        model: nova_lite
        inputs:
          - analysis_prompt
        outputs:
          - llm_analysis

      # Step 4: Write enriched results to Parquet file
      # FileWriter batches all messages and writes once
      - id: write_results
        type: FileWriter
        path: output_path
        inputs:
          - review_id
          - product_name
          - rating
          - review_text
          - llm_analysis
          - output_path
        outputs:
          - result_file

Key Features

  • SQLSource Step: Database source that executes SQL queries using SQLAlchemy connection strings and emits one message per result row, enabling parallel processing of database records through downstream steps
  • PromptTemplate Step: Template engine with curly-brace variable substitution ({product_name}, {rating}) that dynamically generates prompts from message variables for each review
  • LLMInference Step: Processes each message independently through the language model with automatic parallelization, invoking AWS Bedrock inference for all reviews concurrently
  • Multi-record Flow: Each database row becomes an independent FlowMessage flowing through the pipeline in parallel, carrying variables (review_id, product_name, rating, review_text) and accumulating new fields (llm_analysis) at each step
  • Message Sink: The final step accumulates all records and writes them to an output file.

Running the Example

Setup

First, create the sample database with product reviews:

python examples/data_processing/create_sample_db.py

This generates a SQLite database with 10 sample product reviews covering various products and sentiments.

Run the Pipeline

Process all reviews and generate the analysis with real-time progress monitoring:

qtype run -i '{"output_path":"results.parquet"}' --progress examples/data_processing/dataflow_pipelines.qtype.yaml

The --progress flag displays a live dashboard showing: - Message throughput for each step (msg/s) - Success/error counts - Processing duration with visual progress bars

Example output:

╭─────────────────────────────────────────────────────────────────────────────── Flow Progress ───────────────────────────────────────────────────────────────────────────────╮
│                                                                                                                                                                             │
│  Step load_reviews                      1.6 msg/s       ▁▁▁▁▃▃▃▃▅▅▅▅████████            ✔ 10 succeeded         ✖ 0 errors       ⟳ - hits      ✗ - misses      0:00:06       │
│  Step create_prompt                     1.6 msg/s       ▁▁▁▁▃▃▃▃▅▅▅▅████████            ✔ 10 succeeded         ✖ 0 errors       ⟳ - hits      ✗ - misses      0:00:06       │
│  Step analyze_sentiment                 2.0 msg/s       ▄▄▄▄▆▆▆▆▅▅▅▅███████▁            ✔ 10 succeeded         ✖ 0 errors       ⟳ - hits      ✗ - misses      0:00:04       │
│  Step write_results                    - msg/s                                          ✔ 1 succeeded          ✖ 0 errors       ⟳ - hits      ✗ - misses      0:00:00       │
│                                                                                                                                                                             │
╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯

You'll notice that the output shows 1 message for write_results and 10 for the others. That is because it is reporting the number of messages emitted from each step, and write_results is a sink that collects all messages.

The final message of the output will be the result file where the data are written:

2026-01-16 11:23:35,151 - INFO: ✅ Flow execution completed successfully
2026-01-16 11:23:35,151 - INFO: Processed 1 em
2026-01-16 11:23:35,152 - INFO: 
Results:
result_file: results.parquet

Learn More