Enrich data at scale using Parallel’s native DuckDB integration with batch processing
This integration is ideal for data engineers and analysts who work with DuckDB and need to enrich data with web intelligence directly in their SQL or Python workflows.

Parallel provides a native DuckDB integration with two approaches: batch processing for efficiency and SQL UDFs for flexibility.
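The batch approach is built around enrich_table(). As a minimal sketch of what a call might look like (the import path mirrors register_parallel_functions below, but the input_columns and output_columns keyword names are assumptions, not confirmed API):

```python
import duckdb

# Assumed import path, mirroring register_parallel_functions below.
from parallel_web_tools.integrations.duckdb import enrich_table

conn = duckdb.connect()
conn.execute("CREATE TABLE companies AS SELECT 'Google' AS name")

# Hypothetical keyword arguments: the real signature may differ.
result = enrich_table(
    conn,
    "companies",
    input_columns={"company_name": "name"},        # map enrichment inputs to table columns
    output_columns=["CEO name", "Founding year"],  # fields to research per row
    processor="lite-fast",
)
print(f"{result.success_count} rows enriched, {result.error_count} failed")
```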
The enrich_table() function returns an EnrichmentResult dataclass:
```python
@dataclass
class EnrichmentResult:
    relation: duckdb.DuckDBPyRelation  # Enriched data as DuckDB relation
    success_count: int                 # Number of successful rows
    error_count: int                   # Number of failed rows
    errors: list[dict]                 # Error details with row index
    elapsed_time: float                # Processing time in seconds
```
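Once a call returns, these fields can be inspected directly; continuing from the sketch above:

```python
# Summarize the run using the EnrichmentResult fields.
print(f"Enriched {result.success_count} rows in {result.elapsed_time:.1f}s")

# The relation is a normal DuckDB relation, e.g. convert it to pandas:
df = result.relation.fetchdf()
```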
For flexibility in SQL queries, you can register a parallel_enrich() function:
```python
import duckdb
import json

from parallel_web_tools.integrations.duckdb import register_parallel_functions

conn = duckdb.connect()
conn.execute("CREATE TABLE companies AS SELECT 'Google' as name")

# Register the UDF
register_parallel_functions(conn, processor="lite-fast")

# Use in SQL
results = conn.execute("""
    SELECT
        name,
        parallel_enrich(
            json_object('company_name', name),
            json_array('CEO name', 'Founding year')
        ) as enriched
    FROM companies
""").fetchall()

# Parse the JSON result
for name, enriched_json in results:
    data = json.loads(enriched_json)
    print(f"{name}: CEO = {data.get('ceo_name')}")
```

Note that the requested description 'CEO name' comes back under the snake_cased key ceo_name in the returned JSON.
The SQL UDF processes rows individually. For better performance with multiple rows, use batch processing with enrich_table().
Batch processing is significantly faster (4-5x or more) than the SQL UDF for multiple rows:
```python
# Recommended - processes all rows in parallel
result = enrich_table(conn, "companies", ...)

# Slower - one API call per row
conn.execute("SELECT *, parallel_enrich(...) FROM companies")
```
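To check the speedup on your own table, a rough timing harness might look like this (a sketch reusing the hypothetical enrich_table keywords from above):

```python
import time

# Time the batch path: one call, rows processed in parallel.
start = time.perf_counter()
result = enrich_table(
    conn,
    "companies",
    input_columns={"company_name": "name"},        # hypothetical keyword, see above
    output_columns=["CEO name", "Founding year"],
    processor="lite-fast",
)
print(f"batch: {time.perf_counter() - start:.2f}s")

# Time the UDF path: one API call per row.
start = time.perf_counter()
conn.execute("""
    SELECT name, parallel_enrich(
        json_object('company_name', name),
        json_array('CEO name', 'Founding year')
    ) FROM companies
""").fetchall()
print(f"udf:   {time.perf_counter() - start:.2f}s")
```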
Use specific descriptions
Be specific in your output column descriptions for better results:
```python
output_columns = [
    "CEO name (current CEO or equivalent leader)",
    "Founding year (YYYY format)",
    "Annual revenue (USD, most recent fiscal year)",
]
```
Handle errors gracefully
Errors don’t stop processing; partial results are returned:
```python
result = enrich_table(conn, ...)

if result.error_count > 0:
    print(f"Failed rows: {result.error_count}")
    for error in result.errors:
        print(f"  Row {error['row']}: {error['error']}")

# Errors appear as NULL in the result
df = result.relation.fetchdf()
successful = df[df['ceo_name'].notna()]
```
Cost management
- Use lite-fast for high-volume, basic enrichments
- Test with small batches before processing large tables
- Store results in permanent tables to avoid re-enriching (see the sketch below)
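Since EnrichmentResult.relation is an ordinary DuckDB relation, persisting results is one call. A minimal sketch, with an illustrative table name and result coming from an earlier enrich_table() call (note that with an in-memory connection the table only lives for the session; connect to a database file for persistence across runs):

```python
# Materialize the enriched relation as a permanent table so later runs
# query stored results instead of re-enriching (and re-paying for) them.
result.relation.create("companies_enriched")

# Subsequent queries read from the stored table directly:
conn.execute("SELECT * FROM companies_enriched").fetchdf()
```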