This integration is ideal for data engineers and analysts who work with DuckDB and need to enrich data with web intelligence directly in their SQL or Python workflows. Parallel provides a native DuckDB integration with two approaches: batch processing for efficiency, and SQL UDFs for flexibility.
A complete demo notebook is available for this integration.

Features

  • Batch Processing: Process all rows in parallel with a single API call (recommended)
  • SQL UDF: Use parallel_enrich() directly in SQL queries
  • Progress Callbacks: Track enrichment progress for large datasets
  • Permanent Tables: Optionally save results to a new table

Installation

pip install parallel-web-tools[duckdb]
Or with all dependencies:
pip install parallel-web-tools[all]
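
The integration authenticates with an API key; as noted in the parameters below, it falls back to the PARALLEL_API_KEY environment variable when no api_key argument is given. One way to set it from Python (the value shown is a placeholder):

import os

# Placeholder value; export PARALLEL_API_KEY in your shell instead if you prefer.
os.environ["PARALLEL_API_KEY"] = "your-parallel-api-key"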

Basic Usage - Batch Processing

Batch processing is the recommended approach for enriching multiple rows efficiently.
import duckdb
from parallel_web_tools.integrations.duckdb import enrich_table

# Create a connection and sample data
conn = duckdb.connect()
conn.execute("""
    CREATE TABLE companies AS SELECT * FROM (VALUES
        ('Google', 'google.com'),
        ('Microsoft', 'microsoft.com'),
        ('Apple', 'apple.com')
    ) AS t(name, website)
""")

# Enrich the table
result = enrich_table(
    conn,
    source_table="companies",
    input_columns={
        "company_name": "name",
        "website": "website",
    },
    output_columns=[
        "CEO name",
        "Founding year",
        "Headquarters city",
    ],
)

# Access results
print(result.relation.fetchdf())
print(f"Success: {result.success_count}, Errors: {result.error_count}")
Output:
| name      | website       | ceo_name      | founding_year | headquarters_city |
|-----------|---------------|---------------|---------------|-------------------|
| Google    | google.com    | Sundar Pichai | 1998          | Mountain View     |
| Microsoft | microsoft.com | Satya Nadella | 1975          | Redmond           |
| Apple     | apple.com     | Tim Cook      | 1976          | Cupertino         |

Function Parameters

| Parameter         | Type               | Default     | Description                                            |
|-------------------|--------------------|-------------|--------------------------------------------------------|
| conn              | DuckDBPyConnection | required    | DuckDB connection                                      |
| source_table      | str                | required    | Table name or SQL query                                |
| input_columns     | dict[str, str]     | required    | Mapping of input descriptions to column names          |
| output_columns    | list[str]          | required    | List of output column descriptions                     |
| result_table      | str or None        | None        | Optional permanent table to create                     |
| api_key           | str or None        | None        | API key (uses PARALLEL_API_KEY env var if not provided)|
| processor         | str                | "lite-fast" | Parallel processor to use                              |
| timeout           | int                | 600         | Timeout in seconds                                     |
| include_basis     | bool               | False       | Include citations in results                           |
| progress_callback | Callable           | None        | Callback for progress updates                          |
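
Several of the optional parameters can be passed together. The call below is a sketch that reuses the companies table from the basic example; the API key value is a placeholder:

result = enrich_table(
    conn,
    source_table="companies",
    input_columns={"company_name": "name", "website": "website"},
    output_columns=["CEO name", "Founding year"],
    timeout=300,                       # fail the job if it runs longer than 5 minutes
    api_key="your-parallel-api-key",   # optional; falls back to PARALLEL_API_KEY
)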

Return Value

The function returns an EnrichmentResult dataclass:
@dataclass
class EnrichmentResult:
    relation: duckdb.DuckDBPyRelation  # Enriched data as DuckDB relation
    success_count: int                  # Number of successful rows
    error_count: int                    # Number of failed rows
    errors: list[dict]                  # Error details with row index
    elapsed_time: float                 # Processing time in seconds
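
For example, building on the result object from the basic example above:

print(f"Enriched {result.success_count} rows in {result.elapsed_time:.1f}s")
if result.errors:
    # Each entry carries the failing row index and an error message.
    print(result.errors[0])
df = result.relation.fetchdf()  # materialize the relation as a pandas DataFrame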

Column Name Mapping

Output column descriptions are automatically converted to valid snake_case SQL identifiers:
| Description            | Column Name     |
|------------------------|-----------------|
| "CEO name"             | ceo_name        |
| "Founding year (YYYY)" | founding_year   |
| "Annual revenue [USD]" | annual_revenue  |

SQL Query as Source

You can pass a SQL query instead of a table name:
result = enrich_table(
    conn,
    source_table="""
        SELECT name, website
        FROM companies
        WHERE active = true
        LIMIT 100
    """,
    input_columns={"company_name": "name", "website": "website"},
    output_columns=["CEO name"],
)

Creating Permanent Tables

Save enriched results to a permanent table:
result = enrich_table(
    conn,
    source_table="companies",
    input_columns={"company_name": "name"},
    output_columns=["CEO name", "Founding year"],
    result_table="enriched_companies",
)

# Query the permanent table later
conn.execute("SELECT * FROM enriched_companies").fetchall()

Progress Tracking

Track progress for large enrichment jobs:
def on_progress(completed: int, total: int):
    print(f"Progress: {completed}/{total} ({100*completed/total:.0f}%)")

result = enrich_table(
    conn,
    source_table="companies",
    input_columns={"company_name": "name"},
    output_columns=["CEO name"],
    progress_callback=on_progress,
)
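
For long-running jobs, the same callback can drive a progress bar. The sketch below assumes tqdm is installed (it is not a dependency of the integration); pass the callback as progress_callback exactly as above:

from tqdm import tqdm

bar = None

def on_progress(completed: int, total: int):
    # Create the bar lazily, once the total row count is known.
    global bar
    if bar is None:
        bar = tqdm(total=total, desc="Enriching")
    bar.n = completed
    bar.refresh()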

SQL UDF Usage

For flexibility in SQL queries, you can register a parallel_enrich() function:
import duckdb
import json
from parallel_web_tools.integrations.duckdb import register_parallel_functions

conn = duckdb.connect()
conn.execute("CREATE TABLE companies AS SELECT 'Google' as name")

# Register the UDF
register_parallel_functions(conn, processor="lite-fast")

# Use in SQL
results = conn.execute("""
    SELECT
        name,
        parallel_enrich(
            json_object('company_name', name),
            json_array('CEO name', 'Founding year')
        ) as enriched
    FROM companies
""").fetchall()

# Parse the JSON result
for name, enriched_json in results:
    data = json.loads(enriched_json)
    print(f"{name}: CEO = {data.get('ceo_name')}")
The SQL UDF processes rows individually. For better performance with multiple rows, use batch processing with enrich_table().

Including Citations

Set include_basis=True to include supporting citations alongside each enriched value:
result = enrich_table(
    conn,
    source_table="companies",
    input_columns={"company_name": "name"},
    output_columns=["CEO name"],
    include_basis=True,
)

# Access citations in the _basis column
df = result.relation.fetchdf()
for _, row in df.iterrows():
    print(f"CEO: {row['ceo_name']}")
    print(f"Sources: {row['_basis']}")

Processor Selection

Choose a processor based on your speed vs thoroughness requirements. See Choose a Processor for detailed guidance and Pricing for cost information.
| Processor | Speed   | Best For                     |
|-----------|---------|------------------------------|
| lite-fast | Fastest | Basic metadata, high volume  |
| base-fast | Fast    | Standard enrichments         |
| core-fast | Medium  | Cross-referenced data        |
| pro-fast  | Slower  | Deep research                |
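
For example, to trade speed for more thoroughly cross-referenced answers, pass a different processor per call (a sketch based on the core-fast row above):

result = enrich_table(
    conn,
    source_table="companies",
    input_columns={"company_name": "name"},
    output_columns=["Annual revenue (USD, most recent fiscal year)"],
    processor="core-fast",  # slower, but cross-references sources
)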

Best Practices

Batch processing is significantly faster (4-5x or more) than the SQL UDF for multiple rows:
# Recommended - processes all rows in parallel
result = enrich_table(conn, "companies", ...)

# Slower - one API call per row
conn.execute("SELECT *, parallel_enrich(...) FROM companies")
Be specific in your output column descriptions for better results:
output_columns = [
    "CEO name (current CEO or equivalent leader)",
    "Founding year (YYYY format)",
    "Annual revenue (USD, most recent fiscal year)",
]
Errors don’t stop processing; partial results are returned:
result = enrich_table(conn, ...)

if result.error_count > 0:
    print(f"Failed rows: {result.error_count}")
    for error in result.errors:
        print(f"  Row {error['row']}: {error['error']}")

# Errors appear as NULL in the result
df = result.relation.fetchdf()
successful = df[df['ceo_name'].notna()]
Additional tips:
  • Use lite-fast for high-volume, basic enrichments
  • Test with small batches before processing large tables
  • Store results in permanent tables to avoid re-enriching