
Read Data from SQL Databases

Query relational databases and process results row-by-row using the SQLSource step, which supports any database accessible via SQLAlchemy connection strings.

QType YAML

steps:
  - type: SQLSource
    id: load_reviews
    connection: "sqlite:///data/reviews.db"
    query: |
      SELECT
        review_id,
        product_name,
        rating,
        review_text
      FROM product_reviews
      WHERE rating >= 4
      ORDER BY review_id
    outputs:
      - review_id
      - product_name
      - rating
      - review_text
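As a rough mental model, the step above runs the query once and yields one message per row. Each value is stored under the output variable with the same name as its column. The sketch below assumes this behavior. It uses Python's built-in `sqlite3` module instead of SQLAlchemy so it runs without extra dependencies, and the rows are made up. `sql_source` is a hypothetical helper, not part of QType. It matches columns to outputs by name and drops extra columns, which is an assumption about SQLSource rather than documented behavior.

```python
import sqlite3
from typing import Iterator


def sql_source(conn: sqlite3.Connection, query: str, outputs: list[str]) -> Iterator[dict]:
    """Hypothetical stand-in for SQLSource (sqlite3, not SQLAlchemy): one dict per row."""
    cursor = conn.execute(query)
    # cursor.description holds one 7-tuple per result column; item 0 is the name.
    columns = [col[0] for col in cursor.description]
    missing = [name for name in outputs if name not in columns]
    if missing:
        raise ValueError(f"query does not return columns for outputs: {missing}")
    for row in cursor:
        record = dict(zip(columns, row))
        # Assumption: keep only the declared output variables, matched by name.
        yield {name: record[name] for name in outputs}


# Made-up rows in an in-memory table that mirrors product_reviews.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE product_reviews ("
    "review_id INTEGER, product_name TEXT, rating INTEGER, review_text TEXT)"
)
conn.executemany(
    "INSERT INTO product_reviews VALUES (?, ?, ?, ?)",
    [
        (1, "Kettle", 5, "Boils fast."),
        (2, "Toaster", 2, "Burns bread."),
        (3, "Blender", 4, "Smooth results."),
    ],
)

query = """
SELECT review_id, product_name, rating, review_text
FROM product_reviews
WHERE rating >= 4
ORDER BY review_id
"""
outputs = ["review_id", "product_name", "rating", "review_text"]

for message in sql_source(conn, query, outputs):
    print(message)
```

Using a generator means rows are handed downstream one at a time instead of being loaded into memory all at once.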

Explanation

  • SQLSource: Step type that executes SQL queries and emits one message per database row
  • connection: SQLAlchemy-format connection string (e.g., sqlite:///path.db, postgresql://user:pass@host/db)
  • query: SQL query to execute; column names must match output variable IDs
  • outputs: Variables to populate from query result columns (order must match SELECT clause)
  • auth: Optional reference to AuthorizationProvider for database credentials

Complete Example

id: review_analysis_pipeline
description: |
  Automated product review analysis pipeline demonstrating dataflow processing.
  Reads reviews from a SQLite database, analyzes sentiment with an LLM, and writes
  enriched results to a Parquet file.

models:
  - type: Model
    id: nova_lite
    provider: aws-bedrock
    model_id: amazon.nova-lite-v1:0
    inference_params:
      temperature: 0.7
      max_tokens: 256

flows:
  - type: Flow
    id: analyze_reviews
    description: Batch process product reviews with LLM sentiment analysis

    variables:
      - id: review_id
        type: int
      - id: product_name
        type: text
      - id: rating
        type: int
      - id: review_text
        type: text
      - id: analysis_prompt
        type: text
      - id: llm_analysis
        type: text
      - id: output_path
        type: text
      - id: result_file
        type: text

    inputs:
      - output_path

    outputs:
      - result_file

    steps:
      # Step 1: Read reviews from SQLite database
      # SQLSource emits one message per database row
      - id: load_reviews
        type: SQLSource
        connection: "sqlite:///examples/data_processing/reviews.db"
        query: |
          SELECT
            review_id,
            product_name,
            rating,
            review_text
          FROM product_reviews
          ORDER BY review_id
        inputs: []
        outputs:
          - review_id
          - product_name
          - rating
          - review_text

      # Step 2: Format analysis prompt for each review
      # PromptTemplate creates structured prompts from review data
      - id: create_prompt
        type: PromptTemplate
        template: |
          Analyze this product review in 1-2 sentences. Include:
          - Overall sentiment (positive/negative/mixed)
          - 2-3 key themes or points

          Product: {product_name}
          Rating: {rating}/5
          Review: {review_text}
        inputs:
          - product_name
          - rating
          - review_text
        outputs:
          - analysis_prompt

      # Step 3: Analyze each review with LLM
      # LLMInference processes each message through the language model
      - id: analyze_sentiment
        type: LLMInference
        model: nova_lite
        inputs:
          - analysis_prompt
        outputs:
          - llm_analysis

      # Step 4: Write enriched results to Parquet file
      # FileWriter batches all messages and writes once
      - id: write_results
        type: FileWriter
        path: output_path
        inputs:
          - review_id
          - product_name
          - rating
          - review_text
          - llm_analysis
          - output_path
        outputs:
          - result_file

See Also