API Overview

The Parallel Ingest API provides endpoints for creating intelligent task runs that can perform web research and data extraction. The API is built around a stateful architecture where task creation and result retrieval are separate operations.

Endpoints

Suggest Task

POST /beta/v1/suggest Generate a task specification based on user intent. This endpoint helps you create properly structured tasks by analyzing your requirements and suggesting appropriate schemas.

Request Parameters

ParameterTypeRequiredDescription
user_intentstringYesNatural language description of what you want to accomplish

Example Request

curl -X POST "https://api.parallel.ai/beta/v1/suggest" \
  -H "x-api-key: PARALLEL_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "user_intent": "Find the CEOs of tech companies"
  }'

Response Schema

FieldTypeDescription
input_schemaobjectJSON schema defining expected input structure
output_schemaobjectJSON schema defining expected output structure
inputsarraySample input data, if provided in the user intent
titlestringSuggested title for the task

Example Response

{
  "input_schema": {
    "type": "object",
    "properties": {
      "company_name": {
        "type": "string",
        "description": "Name of the company"
      }
    },
    "required": ["company_name"]
  },
  "output_schema": {
    "type": "object",
    "properties": {
      "ceo_name": {
        "type": "string",
        "description": "Current CEO of the company"
      },
      "appointed_date": {
        "type": "string",
        "description": "Date when the CEO was appointed"
      }
    }
  },
  "inputs": [],
  "title": "Find Company CEO Information"
}

Suggest Processor

POST /beta/v1/suggest-processor Enhance and optimize a task specification by suggesting the most appropriate processor and refining the schemas.

Request Parameters

ParameterTypeRequiredDescription
task_specobjectYesTask specification object to be processed
choose_processors_fromarrayYesList of processors to choose from. Available: [“lite”, “base”, “core”, “pro”, “ultra”]

Example Request

curl -X POST "https://api.parallel.ai/beta/v1/suggest-processor" \
  -H "x-api-key: PARALLEL_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "task_spec": {
      "input_schema": {
        "type": "object",
        "properties": {
          "company_name": {
            "type": "string"
          }
        }
      },
      "output_schema": {
        "type": "object",
        "properties": {
          "ceo_name": {
            "type": "string"
          }
        }
      }
    },
    "choose_processors_from": ["lite", "base", "core", "pro", "ultra"]
  }'

Response Schema

FieldTypeDescription
recommended_processorsarrayList of recommended processors in priority order. We recommend using the first element.
Returns an enhanced task specification with additional fields and optimizations.

Example Response

{
  "recommended_processors": ["pro"]
}

End-to-End Example

The following Python script demonstrates the complete workflow of the Ingest API, from task suggestion to result retrieval:
#!/usr/bin/env python3
"""
End-to-end test script for Parallel Ingest API

This script demonstrates the complete workflow:
1. Suggest a task based on user intent
2. Suggest a processor for the task
3. Create and run the task
4. Retrieve the results

Usage:
    python test_ingest_api.py

Make sure to set your PARALLEL_API_KEY environment variable or update the script directly.
"""

import os
import requests
import json
import time
from typing import Dict, Any, Optional

# Configuration
API_KEY = os.getenv("PARALLEL_API_KEY", "your-api-key-here")
BASE_URL = "https://api.parallel.ai"

class IngestAPITester:
    def __init__(self, api_key: str, base_url: str):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {
            "x-api-key": api_key,
            "Content-Type": "application/json"
        }

    def suggest_task(self, user_intent: str) -> Optional[Dict[str, Any]]:
        """Step 1: Suggest a task based on user intent"""
        print(f"🔍 Step 1: Suggesting task for intent: '{user_intent}'")

        url = f"{self.base_url}/beta/v1/suggest"
        data = {"user_intent": user_intent}

        try:
            response = requests.post(url, headers=self.headers, json=data)
            response.raise_for_status()

            result = response.json()
            print("✅ Task suggestion successful!")
            print(f"   Title: {result.get('title', 'N/A')}")
            print(f"   Input schema: {json.dumps(result.get('input_schema', {}), indent=2)}")
            print(f"   Output schema: {json.dumps(result.get('output_schema', {}), indent=2)}")
            print()

            return result

        except requests.exceptions.RequestException as e:
            print(f"❌ Error suggesting task: {e}")
            if hasattr(e, 'response') and e.response is not None:
                print(f"   Response: {e.response.text}")
            return None

    def suggest_processor(self, task_spec: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Step 2: Suggest a processor for the task"""
        print("🔧 Step 2: Suggesting processor for the task")

        url = f"{self.base_url}/beta/v1/suggest-processor"
        data = {
            "task_spec": task_spec,
            "choose_processors_from": ["lite", "core", "pro"]
        }

        try:
            response = requests.post(url, headers=self.headers, json=data)
            response.raise_for_status()

            result = response.json()
            print("✅ Processor suggestion successful!")

            # Extract the first recommended processor
            recommended_processors = result.get('recommended_processors', [])
            if recommended_processors:
                selected_processor = recommended_processors[0]
                print(f"   Recommended processors: {recommended_processors}")
                print(f"   Selected processor: {selected_processor}")
                result['selected_processor'] = selected_processor
            else:
                print("   ⚠️ No processors recommended, defaulting to 'core'")
                result['selected_processor'] = 'core'

            print(f"   Enhanced task spec received")
            print()

            return result

        except requests.exceptions.RequestException as e:
            print(f"❌ Error suggesting processor: {e}")
            if hasattr(e, 'response') and e.response is not None:
                print(f"   Response: {e.response.text}")
            return None

    def create_task_run(self, input_data: Any, processor: str = "core", task_spec: Optional[Dict] = None) -> Optional[str]:
        """Step 3: Create a task run"""
        print(f"🚀 Step 3: Creating task run with processor '{processor}'")

        url = f"{self.base_url}/v1/tasks/runs"
        data = {
            "input": input_data,
            "processor": processor
        }

        if task_spec:
            # Format the task_spec according to the documentation
            # Schemas need to be wrapped with type and json_schema fields
            formatted_task_spec = {}

            if "input_schema" in task_spec:
                formatted_task_spec["input_schema"] = {
                    "type": "json",
                    "json_schema": task_spec["input_schema"]
                }

            if "output_schema" in task_spec:
                formatted_task_spec["output_schema"] = {
                    "type": "json",
                    "json_schema": task_spec["output_schema"]
                }

            data["task_spec"] = formatted_task_spec

        try:
            response = requests.post(url, headers=self.headers, json=data)
            response.raise_for_status()

            result = response.json()
            run_id = result.get("run_id")
            status = result.get("status")

            print(f"✅ Task run created successfully!")
            print(f"   Run ID: {run_id}")
            print(f"   Status: {status}")
            print()

            return run_id

        except requests.exceptions.RequestException as e:
            print(f"❌ Error creating task run: {e}")
            if hasattr(e, 'response') and e.response is not None:
                print(f"   Response: {e.response.text}")
            return None

    def get_task_result(self, run_id: str, max_attempts: int = 30, wait_time: int = 10) -> Optional[Dict[str, Any]]:
        """Step 4: Get task results (with polling)"""
        print(f"📊 Step 4: Retrieving results for run {run_id}")

        url = f"{self.base_url}/v1/tasks/runs/{run_id}/result"
        headers = {"x-api-key": self.api_key}  # No Content-Type needed for GET

        for attempt in range(max_attempts):
            try:
                response = requests.get(url, headers=headers)

                if response.status_code == 200:
                    result = response.json()
                    status = result.get("run", {}).get("status")

                    if status == "completed":
                        print("✅ Task completed successfully!")
                        output = result.get("output", {})
                        print(f"   Content: {output.get('content', 'N/A')}")

                        # Show citations if available
                        citations = output.get("citations", [])
                        if citations:
                            print(f"   Citations: {len(citations)} sources")
                            for i, citation in enumerate(citations[:3], 1):  # Show first 3
                                print(f"     {i}. {citation}")

                        return result

                    elif status == "failed":
                        print("❌ Task failed!")
                        return result

                    else:
                        print(f"⏳ Task still {status}... (attempt {attempt + 1}/{max_attempts})")
                        time.sleep(wait_time)

                elif response.status_code == 404:
                    print(f"❌ Task run not found: {run_id}")
                    return None

                else:
                    response.raise_for_status()

            except requests.exceptions.RequestException as e:
                print(f"❌ Error getting task result: {e}")
                if hasattr(e, 'response') and e.response is not None:
                    print(f"   Response: {e.response.text}")
                return None

        print(f"⏰ Task did not complete within {max_attempts * wait_time} seconds")
        return None

    def run_end_to_end_test(self, user_intent: str, sample_input: Any):
        """Run the complete end-to-end test"""
        print("=" * 60)
        print("🧪 PARALLEL INGEST API - END-TO-END TEST")
        print("=" * 60)
        print()

        # Step 1: Suggest task
        task_suggestion = self.suggest_task(user_intent)
        if not task_suggestion:
            print("❌ Test failed at task suggestion step")
            return

        # Step 2: Suggest processor
        processor_suggestion = self.suggest_processor(task_suggestion)
        if not processor_suggestion:
            print("❌ Test failed at processor suggestion step")
            return

        # Step 3: Create task run
        selected_processor = processor_suggestion.get('selected_processor', 'core')
        run_id = self.create_task_run(
            input_data=sample_input,
            processor=selected_processor,
            task_spec=task_suggestion  # Use original task suggestion, not processor suggestion
        )
        if not run_id:
            print("❌ Test failed at task creation step")
            return

        # Step 4: Get results
        result = self.get_task_result(run_id)
        if result:
            print("🎉 End-to-end test completed successfully!")
        else:
            print("❌ Test failed at result retrieval step")


def main():
    """Main function to run the test"""

    # Check API key
    if API_KEY == "your-api-key-here":
        print("⚠️  Please set your PARALLEL_API_KEY environment variable or update the script")
        print("   Example: export PARALLEL_API_KEY=your_actual_api_key")
        return

    # Initialize tester
    tester = IngestAPITester(API_KEY, BASE_URL)

    # Test configuration
    user_intent = "Given company_name and company_website, find the CEO information for technology companies"

    # Use object input that matches the expected schema
    sample_input = {
        "company_name": "Google",
        "company_website": "https://www.google.com"
    }

    # Run the test
    tester.run_end_to_end_test(user_intent, sample_input)


if __name__ == "__main__":
    main()

Running the Example

PARALLEL_API_KEY="your_api_key_here" python3 ingest_script.py
This example demonstrates the complete workflow:
  1. Suggest Task: Generate a task specification from natural language intent
  2. Suggest Processor: Get processor recommendations and enhanced schemas
  3. Create Task Run: Submit the task for processing with proper schema formatting
  4. Get Results: Poll for completion and retrieve the final results
The script includes proper error handling, status polling, and demonstrates the correct format for task specifications required by the API.