Gather Results into a List
Combine fan-out processing results into a single list while preserving variables that have the same value across all messages (common ancestors).
QType YAML
variables:
  - id: processed_product
    type: text
  - id: all_processed
    type: list[text]
steps:
  - type: Collect
    id: aggregate
    inputs: [processed_product]
    outputs: [all_processed]
Explanation
- Collect: Gathers all input values from multiple messages into a single list output
- Common ancestors: Only variables that have the exact same value across ALL input messages are preserved in the output message
- Fan-out pattern: Typically used after Explode to reverse the fan-out and accumulate results
- Single output: Always produces exactly one output message containing the accumulated list
Understanding Common Ancestors
If you have these three messages flowing into Collect:
Message 1: {category: "Electronics", region: "US", product: "Phone", processed: "Processed: Phone"}
Message 2: {category: "Electronics", region: "US", product: "Laptop", processed: "Processed: Laptop"}
Message 3: {category: "Electronics", region: "US", product: "Tablet", processed: "Processed: Tablet"}
The Collect step will output:
{category: "Electronics", region: "US", all_processed: ["Processed: Phone", "Processed: Laptop", "Processed: Tablet"]}
Note that product is not preserved because its value differs across the messages, and the per-message processed values are gathered into all_processed. Only category and region, which are identical in all three messages, are carried over as common ancestors.
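The common-ancestor rule can be sketched in a few lines of Python. This is only an illustration of the behavior described here, not QType's implementation: the `collect` function and its parameters are hypothetical names, messages are modeled as plain dictionaries, and the empty-input behavior is an assumption.

```python
def collect(messages, input_key, output_key):
    """Hypothetical model of Collect: gather one variable, keep common ancestors."""
    if not messages:
        # Assumption: with no input messages, emit an empty list.
        return {output_key: []}

    first = messages[0]
    # A variable is a common ancestor only if every message carries it
    # with exactly the same value. The collected variable itself is excluded.
    common = {
        key: value
        for key, value in first.items()
        if key != input_key
        and all(key in m and m[key] == value for m in messages)
    }
    # Gather the collected variable in message order.
    common[output_key] = [m[input_key] for m in messages]
    return common


messages = [
    {"category": "Electronics", "region": "US", "product": "Phone", "processed": "Processed: Phone"},
    {"category": "Electronics", "region": "US", "product": "Laptop", "processed": "Processed: Laptop"},
    {"category": "Electronics", "region": "US", "product": "Tablet", "processed": "Processed: Tablet"},
]

result = collect(messages, input_key="processed", output_key="all_processed")
print(result)
```

In this sketch, `product` drops out because the three messages disagree on it, while `category` and `region` survive because every message agrees.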
Complete Example
id: collect_example
description: Aggregate fan-out results preserving only common ancestor variables
flows:
  - type: Flow
    id: main
    description: Process products by category, then collect results
    variables:
      - id: category
        type: text
      - id: region
        type: text
      - id: products
        type: list[text]
      - id: product
        type: text
      - id: processed_product
        type: text
      - id: all_processed
        type: list[text]
    inputs:
      - category
      - region
      - products
    outputs:
      - all_processed
    steps:
      # Explode creates multiple messages, each with:
      # - category (same for all)
      # - region (same for all)
      # - product (different for each)
      - type: Explode
        id: fan_out
        inputs: [products]
        outputs: [product]
      # Each message still has category, region, and its unique product
      - type: PromptTemplate
        id: process
        inputs: [product]
        outputs: [processed_product]
        template: "Processed: {product}"
      # Collect aggregates all processed_product values into a list
      # Only category and region are preserved as "common ancestors"
      # (same value across all messages)
      # The unique product variable is NOT preserved (different in each)
      - type: Collect
        id: aggregate
        inputs: [processed_product]
        outputs: [all_processed]
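To make the message shapes described in the comments concrete, here is a rough Python trace of the first two steps, showing what reaches Collect. It is a sketch under stated assumptions, not how QType runs the flow: `explode` and `apply_template` are hypothetical helpers, messages are plain dictionaries, and the `products` list is assumed not to be carried into the fanned-out messages, which is how the comments describe them.

```python
def explode(message, list_key, item_key):
    """Hypothetical model of Explode: one output message per list item."""
    # Assumption: the list itself is not carried forward, matching the
    # message contents listed in the flow's comments.
    base = {k: v for k, v in message.items() if k != list_key}
    return [{**base, item_key: item} for item in message[list_key]]


def apply_template(message, template, output_key):
    """Hypothetical model of PromptTemplate: fill the template from message variables."""
    return {**message, output_key: template.format(**message)}


flow_input = {
    "category": "Electronics",
    "region": "US",
    "products": ["Phone", "Laptop", "Tablet"],
}

# Step 1: fan out, one message per product.
fanned_out = explode(flow_input, list_key="products", item_key="product")

# Step 2: render the template for each message independently.
processed = [
    apply_template(m, "Processed: {product}", "processed_product")
    for m in fanned_out
]

for message in processed:
    print(message)
```

Each printed message carries the same `category` and `region` but its own `product` and `processed_product`, which is the input that the Collect step then folds back into a single message.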
Run the example:
qtype run examples/data_processing/collect_results.qtype.yaml \
-i '{"category": "Electronics", "region": "US", "products": ["Phone", "Laptop", "Tablet"]}'
Output: