Gather Results into a List

Combine fan-out processing results into a single list while preserving variables that have the same value across all messages (common ancestors).

QType YAML

variables:
  - id: processed_product
    type: text
  - id: all_processed
    type: list[text]

steps:
  - type: Collect
    id: aggregate
    inputs: [processed_product]
    outputs: [all_processed]

Explanation

  • Collect: Gathers all input values from multiple messages into a single list output
  • Common ancestors: Only variables that have the exact same value across ALL input messages are preserved in the output message
  • Fan-out pattern: Typically used after Explode to reverse the fan-out and accumulate results
  • Single output: Always produces exactly one output message containing the accumulated list

Understanding Common Ancestors

If you have these three messages flowing into Collect:

Message 1: {category: "Electronics", region: "US", product: "Phone", processed: "Processed: Phone"}
Message 2: {category: "Electronics", region: "US", product: "Laptop", processed: "Processed: Laptop"}
Message 3: {category: "Electronics", region: "US", product: "Tablet", processed: "Processed: Tablet"}

The Collect step will output:

{category: "Electronics", region: "US", all_processed: ["Processed: Phone", "Processed: Laptop", "Processed: Tablet"]}

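Note that product is not preserved because it has different values across the messages. Only category and region (which are identical in all three messages) are included as common ancestors. The per-message processed values are not dropped; they are the collected input, so they appear together in the all_processed list.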
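The common-ancestor rule can be sketched in a few lines of plain Python. This is only an illustration of the behavior described above, not QType's implementation: the `collect` function, its parameters, and its handling of an empty input are all assumptions made for the example.

```python
from typing import Any


def collect(messages: list[dict[str, Any]], input_var: str, output_var: str) -> dict[str, Any]:
    """Hypothetical stand-in for a Collect step (not the QType API)."""
    if not messages:
        # Assumption: with no input messages, emit an empty list.
        return {output_var: []}

    first, rest = messages[0], messages[1:]
    # Keep a variable only if every message carries it with the same value.
    merged = {
        key: value
        for key, value in first.items()
        if key != input_var and all(key in m and m[key] == value for m in rest)
    }
    # Gather the collected variable from every message into one list.
    merged[output_var] = [m[input_var] for m in messages]
    return merged


messages = [
    {"category": "Electronics", "region": "US", "product": "Phone", "processed": "Processed: Phone"},
    {"category": "Electronics", "region": "US", "product": "Laptop", "processed": "Processed: Laptop"},
    {"category": "Electronics", "region": "US", "product": "Tablet", "processed": "Processed: Tablet"},
]

result = collect(messages, input_var="processed", output_var="all_processed")
print(result)
```

Here `product` fails the "same value in every message" check and is dropped, while `category` and `region` pass it and are carried into the single output message.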

Complete Example

id: collect_example
description: Aggregate fan-out results preserving only common ancestor variables

flows:
  - type: Flow
    id: main
    description: Process products by category, then collect results

    variables:
      - id: category
        type: text
      - id: region
        type: text
      - id: products
        type: list[text]
      - id: product
        type: text
      - id: processed_product
        type: text
      - id: all_processed
        type: list[text]

    inputs:
      - category
      - region
      - products

    outputs:
      - all_processed

    steps:
      # Explode creates multiple messages, each with:
      # - category (same for all)
      # - region (same for all)
      # - product (different for each)
      - type: Explode
        id: fan_out
        inputs: [products]
        outputs: [product]

      # Each message still has category, region, and its unique product
      - type: PromptTemplate
        id: process
        inputs: [product]
        outputs: [processed_product]
        template: "Processed: {product}"

      # Collect aggregates all processed_product values into a list
      # Only category and region are preserved as "common ancestors"
      # (same value across all messages)
      # The unique product variable is NOT preserved (different in each)
      - type: Collect
        id: aggregate
        inputs: [processed_product]
        outputs: [all_processed]

Run the example:

qtype run examples/data_processing/collect_results.qtype.yaml \
  -i '{"category": "Electronics", "region": "US", "products": ["Phone", "Laptop", "Tablet"]}'

Output:

all_processed: ['Processed: Phone', 'Processed: Laptop', 'Processed: Tablet']
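To see where that output comes from, the fan-out and fan-in can be traced by hand in plain Python. This is a rough sketch of the data flow only, not how QType executes a flow: `explode` and `apply_template` are hypothetical helpers, and dropping `products` from the exploded messages is an assumption based on the step comments in the example.

```python
from typing import Any

Message = dict[str, Any]


def explode(message: Message, list_var: str, item_var: str) -> list[Message]:
    """Hypothetical fan-out: one message per list item, other variables copied."""
    # Assumption: the exploded list itself is not carried forward.
    rest = {k: v for k, v in message.items() if k != list_var}
    return [{**rest, item_var: item} for item in message[list_var]]


def apply_template(message: Message, template: str, output_var: str) -> Message:
    """Hypothetical template step: fill placeholders from the message's variables."""
    return {**message, output_var: template.format(**message)}


flow_input = {"category": "Electronics", "region": "US", "products": ["Phone", "Laptop", "Tablet"]}

# Fan out: three messages, each with category, region, and one product.
fanned_out = explode(flow_input, "products", "product")

# Process each message independently.
processed = [apply_template(m, "Processed: {product}", "processed_product") for m in fanned_out]

# Fan in: gather the per-message results into one list.
all_processed = [m["processed_product"] for m in processed]

print(f"all_processed: {all_processed}")
```

Each intermediate message still carries `category` and `region`, which is why those two variables remain available on the single message that Collect emits.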

See Also