Skip to main content

API Overview

The Parallel Ingest API provides endpoints for creating intelligent task runs that can perform web research and data extraction. The API is built around a stateful architecture where task creation and result retrieval are separate operations.

Endpoints

Suggest Task

POST /v1beta/tasks/suggest Generate a task specification based on user intent. This endpoint helps you create properly structured tasks by analyzing your requirements and suggesting appropriate schemas.

Request Parameters

ParameterTypeRequiredDescription
user_intentstringYesNatural language description of what you want to accomplish
previous_taskSuggestedTaskSpecNoPrevious task specification to iterate upon and improve, or to restrict input columns to a predefined set (see example below)

Response Schema

Returns a SuggestedTaskSpec object with the following fields:
FieldTypeDescription
input_schemaobjectJSON schema defining expected input structure
output_schemaobjectJSON schema defining expected output structure
inputsarraySample input data, if provided in the user intent
titlestringSuggested title for the task

Example Request

  • cURL
  • Python
  • TypeScript
curl -X POST "https://api.parallel.ai/v1beta/tasks/suggest" \
  -H "x-api-key: PARALLEL_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "user_intent": "Find the CEOs of tech companies"
  }'
With previous task iteration:
curl -X POST "https://api.parallel.ai/v1beta/tasks/suggest" \
  -H "x-api-key: PARALLEL_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "user_intent": "I want to also include the company website and founding year in the output schema",
    "previous_task": {
      "input_schema": {
        "type": "object",
        "properties": {
          "company_name": {
            "type": "string",
            "description": "Name of the company"
          }
        },
        "required": ["company_name"]
      },
      "output_schema": {
        "type": "object",
        "properties": {
          "ceo_name": {
            "type": "string",
            "description": "Current CEO of the company"
          }
        }
      }
    }
  }'

Example Response

{
  "input_schema": {
    "type": "object",
    "properties": {
      "company_name": {
        "type": "string",
        "description": "Name of the company"
      }
    },
    "required": ["company_name"]
  },
  "output_schema": {
    "type": "object",
    "properties": {
      "ceo_name": {
        "type": "string",
        "description": "Current CEO of the company"
      },
      "appointed_date": {
        "type": "string",
        "description": "Date when the CEO was appointed"
      }
    }
  },
  "inputs": [],
  "title": "Find Company CEO Information"
}

Suggest Processor

POST /v1beta/tasks/suggest-processor Enhance and optimize a task specification by suggesting the most appropriate processor and refining the schemas.

Suggest Processor Request Parameters

ParameterTypeRequiredDescription
task_specobjectYesTask specification object to be processed
choose_processors_fromarrayYesList of processors to choose from. Available: [“lite”, “base”, “core”, “pro”, “ultra”]

Suggest Processor Example Request

  • cURL
  • Python
  • TypeScript
curl -X POST "https://api.parallel.ai/v1beta/tasks/suggest-processor" \
  -H "x-api-key: PARALLEL_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "task_spec": {
      "input_schema": {
        "type": "object",
        "properties": {
          "company_name": {
            "type": "string"
          }
        }
      },
      "output_schema": {
        "type": "object",
        "properties": {
          "ceo_name": {
            "type": "string"
          }
        }
      }
    },
    "choose_processors_from": ["lite", "base", "core", "pro", "ultra"]
  }'

Suggest Processor Response Schema

FieldTypeDescription
recommended_processorsarrayList of recommended processors in priority order. We recommend using the first element.
Returns an enhanced task specification with additional fields and optimizations.

Suggest Processor Example Response

{
  "recommended_processors": ["pro"]
}

Examples

Select Input Columns from a Predefined Set

Sometimes you have a specific dataset with fixed columns and need to create a task that works exclusively with those columns. The previous_task parameter allows you to constrain the API to generate task specifications that match your exact data structure. When to use this approach:
  • You have a fixed dataset schema that cannot be modified
  • You want to ensure the task only uses your specific input columns
  • You need to provide examples that match your exact data format
  • You want to prevent the API from suggesting additional input fields
The workflow:
  1. Define Your Schema: Specify exactly which columns you want to use as inputs with their descriptions
  2. Provide Sample Data: Include examples that match your exact data format
  3. Generate a SuggestedTaskSpec: Use the helper function to create a properly formatted SuggestedTaskSpec object
  4. Refine with API: Pass this as previous_task to get a refined task spec that respects your column constraints
The API will use your predefined input schema as a foundation and refine the output schema while preserving your input columns. This guarantees the final task specification integrates seamlessly with your existing dataset.
import requests
import json

if __name__ == "__main__":

    user_intent = "Find the CEO, investments, and customer details for the company"

    columns_with_descriptions = [
        ("company_id", "The unique identifier of the company to retrieve executive, investment, and customer details for."),
        ("company_name", "The name of the company to identify and gather detailed information about."),
        ("company_website", "The domain of the company's website to assist in identifying the correct organization."),
        ("industry", "The primary industry the company operates in."),
        ("employee_count", "The exact number of employees at the company.")
    ]

    examples = [
        {
            "company_id": "comp_001",
            "company_name": "Parallel AI",
            "company_website": "parallel.ai",
            "industry": "AI",
            "employee_count": "25"
        },
        {
            "company_id": "comp_002",
            "company_name": "Google",
            "company_website": "google.com",
            "industry": "Software",
            "employee_count": "125000"
        }
    ]

    def get_suggested_task_spec(columns_with_descriptions, examples, title):
        all_valid_columns = {
            column_name: {
                "type": "string",
                "description": description
            }
            for column_name, description in columns_with_descriptions
        }

        return {
            "input_schema": {
                "type": "object",
                "properties": all_valid_columns
            },
            "output_schema": {
                "type": "object",
                "properties": {
                    "answer": {
                        "type": "string",
                        "description": "answer to the question"
                    }
                },
                "required": ["answer"],
            },
            "inputs": examples,
            "title": title
        }


    suggested_task_spec = get_suggested_task_spec(
        columns_with_descriptions=columns_with_descriptions,
        examples=examples,
        title="Company executive, investments, and customer details"
    )

    url = "https://api.parallel.ai/v1beta/tasks/suggest"
    headers = {
        "x-api-key": "PARALLEL_API_KEY",
        "Content-Type": "application/json"
    }
    data = {
        "user_intent": f"{user_intent}. Improve output_schema to include more descriptive fields, and only keep input fields that are relevant to answering the question.",
        "previous_task": suggested_task_spec
    }

    response = requests.post(url, headers=headers, json=data)
    result = response.json()
    print(json.dumps(result, indent=2))

End-to-End Ingest to Task Execution

The following Python script demonstrates the complete workflow of the Ingest API, from task suggestion to result retrieval:
#!/usr/bin/env python3
"""
End-to-end test script for Parallel Ingest API

This script demonstrates the complete workflow:
1. Suggest a task based on user intent
2. Suggest a processor for the task
3. Create and run the task
4. Retrieve the results

Usage:
    python test_ingest_api.py

Make sure to set your PARALLEL_API_KEY environment variable or update the script directly.
"""

import os
import requests
import json
import time
from typing import Dict, Any, Optional

# Configuration
API_KEY = "PARALLEL_API_KEY"
BASE_URL = "https://api.parallel.ai"

class IngestAPITester:
    def __init__(self, api_key: str, base_url: str):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {
            "x-api-key": api_key,
            "Content-Type": "application/json"
        }

    def suggest_task(self, user_intent: str) -> Optional[Dict[str, Any]]:
        """Step 1: Suggest a task based on user intent"""
        print(f"🔍 Step 1: Suggesting task for intent: '{user_intent}'")

        url = f"{self.base_url}/v1beta/tasks/suggest"
        data = {"user_intent": user_intent}

        try:
            response = requests.post(url, headers=self.headers, json=data)
            response.raise_for_status()

            result = response.json()
            print("✅ Task suggestion successful!")
            print(f"   Title: {result.get('title', 'N/A')}")
            print(f"   Input schema: {json.dumps(result.get('input_schema', {}), indent=2)}")
            print(f"   Output schema: {json.dumps(result.get('output_schema', {}), indent=2)}")
            print()

            return result

        except requests.exceptions.RequestException as e:
            print(f"❌ Error suggesting task: {e}")
            if hasattr(e, 'response') and e.response is not None:
                print(f"   Response: {e.response.text}")
            return None

    def suggest_processor(self, task_spec: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Step 2: Suggest a processor for the task"""
        print("🔧 Step 2: Suggesting processor for the task")

        url = f"{self.base_url}/v1beta/tasks/suggest-processor"
        data = {
            "task_spec": task_spec,
            "choose_processors_from": ["lite", "core", "pro"]
        }

        try:
            response = requests.post(url, headers=self.headers, json=data)
            response.raise_for_status()

            result = response.json()
            print("✅ Processor suggestion successful!")

            # Extract the first recommended processor
            recommended_processors = result.get('recommended_processors', [])
            if recommended_processors:
                selected_processor = recommended_processors[0]
                print(f"   Recommended processors: {recommended_processors}")
                print(f"   Selected processor: {selected_processor}")
                result['selected_processor'] = selected_processor
            else:
                print("   ⚠️ No processors recommended, defaulting to 'core'")
                result['selected_processor'] = 'core'

            print(f"   Enhanced task spec received")
            print()

            return result

        except requests.exceptions.RequestException as e:
            print(f"❌ Error suggesting processor: {e}")
            if hasattr(e, 'response') and e.response is not None:
                print(f"   Response: {e.response.text}")
            return None

    def create_task_run(self, input_data: Any, processor: str = "core", task_spec: Optional[Dict] = None) -> Optional[str]:
        """Step 3: Create a task run"""
        print(f"🚀 Step 3: Creating task run with processor '{processor}'")

        url = f"{self.base_url}/v1/tasks/runs"
        data = {
            "input": input_data,
            "processor": processor
        }

        if task_spec:
            # Format the task_spec according to the documentation
            # Schemas need to be wrapped with type and json_schema fields
            formatted_task_spec = {}

            if "input_schema" in task_spec:
                formatted_task_spec["input_schema"] = {
                    "type": "json",
                    "json_schema": task_spec["input_schema"]
                }

            if "output_schema" in task_spec:
                formatted_task_spec["output_schema"] = {
                    "type": "json",
                    "json_schema": task_spec["output_schema"]
                }

            data["task_spec"] = formatted_task_spec

        try:
            response = requests.post(url, headers=self.headers, json=data)
            response.raise_for_status()

            result = response.json()
            run_id = result.get("run_id")
            status = result.get("status")

            print(f"✅ Task run created successfully!")
            print(f"   Run ID: {run_id}")
            print(f"   Status: {status}")
            print()

            return run_id

        except requests.exceptions.RequestException as e:
            print(f"❌ Error creating task run: {e}")
            if hasattr(e, 'response') and e.response is not None:
                print(f"   Response: {e.response.text}")
            return None

    def get_task_result(self, run_id: str, max_attempts: int = 30, wait_time: int = 10) -> Optional[Dict[str, Any]]:
        """Step 4: Get task results (with polling)"""
        print(f"📊 Step 4: Retrieving results for run {run_id}")

        url = f"{self.base_url}/v1/tasks/runs/{run_id}/result"
        headers = {"x-api-key": self.api_key}  # No Content-Type needed for GET

        for attempt in range(max_attempts):
            try:
                response = requests.get(url, headers=headers)

                if response.status_code == 200:
                    result = response.json()
                    status = result.get("run", {}).get("status")

                    if status == "completed":
                        print("✅ Task completed successfully!")
                        output = result.get("output", {})
                        print(f"   Content: {output.get('content', 'N/A')}")

                        # Show citations if available
                        citations = output.get("citations", [])
                        if citations:
                            print(f"   Citations: {len(citations)} sources")
                            for i, citation in enumerate(citations[:3], 1):  # Show first 3
                                print(f"     {i}. {citation}")

                        return result

                    elif status == "failed":
                        print("❌ Task failed!")
                        return result

                    else:
                        print(f"⏳ Task still {status}... (attempt {attempt + 1}/{max_attempts})")
                        time.sleep(wait_time)

                elif response.status_code == 404:
                    print(f"❌ Task run not found: {run_id}")
                    return None

                else:
                    response.raise_for_status()

            except requests.exceptions.RequestException as e:
                print(f"❌ Error getting task result: {e}")
                if hasattr(e, 'response') and e.response is not None:
                    print(f"   Response: {e.response.text}")
                return None

        print(f"⏰ Task did not complete within {max_attempts * wait_time} seconds")
        return None

    def run_end_to_end_test(self, user_intent: str, sample_input: Any):
        """Run the complete end-to-end test"""
        print("=" * 60)
        print("🧪 PARALLEL INGEST API - END-TO-END TEST")
        print("=" * 60)
        print()

        # Step 1: Suggest task
        task_suggestion = self.suggest_task(user_intent)
        if not task_suggestion:
            print("❌ Test failed at task suggestion step")
            return

        # Step 2: Suggest processor
        processor_suggestion = self.suggest_processor(task_suggestion)
        if not processor_suggestion:
            print("❌ Test failed at processor suggestion step")
            return

        # Step 3: Create task run
        selected_processor = processor_suggestion.get('selected_processor', 'core')
        run_id = self.create_task_run(
            input_data=sample_input,
            processor=selected_processor,
            task_spec=task_suggestion  # Use original task suggestion, not processor suggestion
        )
        if not run_id:
            print("❌ Test failed at task creation step")
            return

        # Step 4: Get results
        result = self.get_task_result(run_id)
        if result:
            print("🎉 End-to-end test completed successfully!")
        else:
            print("❌ Test failed at result retrieval step")


def main():
    """Main function to run the test"""

    # Check API key
    if API_KEY == "PARALLEL_API_KEY":
        print("⚠️  Please set your PARALLEL_API_KEY environment variable or update the script")
        print("   Example: export PARALLEL_API_KEY=your_actual_api_key")
        return

    # Initialize tester
    tester = IngestAPITester(API_KEY, BASE_URL)

    # Test configuration
    user_intent = "Given company_name and company_website, find the CEO information for technology companies"

    # Use object input that matches the expected schema
    sample_input = {
        "company_name": "Google",
        "company_website": "https://www.google.com"
    }

    # Run the test
    tester.run_end_to_end_test(user_intent, sample_input)


if __name__ == "__main__":
    main()
Running the Example
PARALLEL_API_KEY="PARALLEL_API_KEY" python3 ingest_script.py
This example demonstrates the complete workflow:
  1. Suggest Task: Generate a task specification from natural language intent
  2. Suggest Processor: Get processor recommendations and enhanced schemas
  3. Create Task Run: Submit the task for processing with proper schema formatting
  4. Get Results: Poll for completion and retrieve the final results
The script includes proper error handling, status polling, and demonstrates the correct format for task specifications required by the API.
I