#!/usr/bin/env python3
"""
End-to-end test script for Parallel Ingest API
This script demonstrates the complete workflow:
1. Suggest a task based on user intent
2. Suggest a processor for the task
3. Create and run the task
4. Retrieve the results
Usage:
python test_ingest_api.py
Make sure to set your PARALLEL_API_KEY environment variable or update the script directly.
"""
import os
import requests
import json
import time
from typing import Dict, Any, Optional
# Configuration
# The API key is read from the environment. When PARALLEL_API_KEY is unset the
# placeholder string below is used instead, which marks the key as "not
# configured" rather than being a usable credential.
API_KEY = os.environ.get("PARALLEL_API_KEY", "your-api-key-here")
# Root of the API; endpoint paths are appended to this value.
BASE_URL = "https://api.parallel.ai"
class IngestAPITester:
    """Drive the Parallel Ingest API through a suggest -> run -> result workflow.

    Each step prints its progress to stdout and signals failure through its
    return value (``None``) instead of raising, so the steps can be chained
    with simple truthiness checks.
    """

    def __init__(self, api_key: str, base_url: str, timeout: float = 60.0):
        """Store the credentials and build the default request headers.

        Args:
            api_key: Value sent in the ``x-api-key`` header of every request.
            base_url: Root URL that endpoint paths are appended to.
            timeout: Seconds to wait on each HTTP request before giving up.
                Without a timeout ``requests`` would block indefinitely on a
                stalled connection.
        """
        self.api_key = api_key
        self.base_url = base_url
        self.timeout = timeout
        self.headers = {
            "x-api-key": api_key,
            "Content-Type": "application/json",
        }

    @staticmethod
    def _report_request_error(action: str, error: Exception) -> None:
        """Print a failed request, including the response body when there is one."""
        print(f"❌ Error {action}: {error}")
        response = getattr(error, "response", None)
        if response is not None:
            print(f" Response: {response.text}")

    @staticmethod
    def _format_task_spec(task_spec: Dict[str, Any]) -> Dict[str, Any]:
        """Wrap each schema present in *task_spec* with ``type``/``json_schema``."""
        return {
            key: {"type": "json", "json_schema": task_spec[key]}
            for key in ("input_schema", "output_schema")
            if key in task_spec
        }

    def _post_json(self, url: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        """POST *payload* as JSON and return the decoded JSON response body.

        Raises:
            requests.exceptions.RequestException: On connection problems,
                timeouts, or an HTTP error status.
        """
        response = requests.post(
            url, headers=self.headers, json=payload, timeout=self.timeout
        )
        response.raise_for_status()
        return response.json()

    def suggest_task(self, user_intent: str) -> Optional[Dict[str, Any]]:
        """Step 1: Suggest a task based on user intent.

        Args:
            user_intent: Free-text description of what the task should do.

        Returns:
            The decoded suggestion response, or ``None`` if the request failed.
        """
        print(f"🔍 Step 1: Suggesting task for intent: '{user_intent}'")
        url = f"{self.base_url}/beta/v1/suggest"
        try:
            result = self._post_json(url, {"user_intent": user_intent})
        except requests.exceptions.RequestException as e:
            self._report_request_error("suggesting task", e)
            return None

        print("✅ Task suggestion successful!")
        print(f" Title: {result.get('title', 'N/A')}")
        print(f" Input schema: {json.dumps(result.get('input_schema', {}), indent=2)}")
        print(f" Output schema: {json.dumps(result.get('output_schema', {}), indent=2)}")
        print()
        return result

    def suggest_processor(self, task_spec: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Step 2: Suggest a processor for the task.

        Args:
            task_spec: Task specification to send as ``task_spec``.

        Returns:
            The decoded response with an added ``selected_processor`` key (the
            first recommended processor, or ``'core'`` when none was
            recommended), or ``None`` if the request failed.
        """
        print("🔧 Step 2: Suggesting processor for the task")
        url = f"{self.base_url}/beta/v1/suggest-processor"
        payload = {
            "task_spec": task_spec,
            "choose_processors_from": ["lite", "core", "pro"],
        }
        try:
            result = self._post_json(url, payload)
        except requests.exceptions.RequestException as e:
            self._report_request_error("suggesting processor", e)
            return None

        print("✅ Processor suggestion successful!")
        # Use the first recommended processor; fall back to 'core' otherwise.
        recommended_processors = result.get('recommended_processors', [])
        if recommended_processors:
            selected_processor = recommended_processors[0]
            print(f" Recommended processors: {recommended_processors}")
            print(f" Selected processor: {selected_processor}")
            result['selected_processor'] = selected_processor
        else:
            print(" ⚠️ No processors recommended, defaulting to 'core'")
            result['selected_processor'] = 'core'
        print(" Enhanced task spec received")
        print()
        return result

    def create_task_run(self, input_data: Any, processor: str = "core", task_spec: Optional[Dict] = None) -> Optional[str]:
        """Step 3: Create a task run.

        Args:
            input_data: Value sent as the run's ``input``.
            processor: Processor name sent as ``processor``.
            task_spec: Optional dict whose ``input_schema`` / ``output_schema``
                entries are wrapped and sent as the run's ``task_spec``.

        Returns:
            The ``run_id`` from the response, or ``None`` if the request failed.
        """
        print(f"🚀 Step 3: Creating task run with processor '{processor}'")
        url = f"{self.base_url}/v1/tasks/runs"
        payload: Dict[str, Any] = {
            "input": input_data,
            "processor": processor,
        }
        if task_spec:
            # Schemas need to be wrapped with type and json_schema fields.
            payload["task_spec"] = self._format_task_spec(task_spec)

        try:
            result = self._post_json(url, payload)
        except requests.exceptions.RequestException as e:
            self._report_request_error("creating task run", e)
            return None

        run_id = result.get("run_id")
        status = result.get("status")
        print("✅ Task run created successfully!")
        print(f" Run ID: {run_id}")
        print(f" Status: {status}")
        print()
        return run_id

    @staticmethod
    def _print_completed_output(result: Dict[str, Any]) -> None:
        """Print the content and up to three citations of a completed run."""
        print("✅ Task completed successfully!")
        output = result.get("output") or {}
        print(f" Content: {output.get('content', 'N/A')}")
        citations = output.get("citations", [])
        if citations:
            print(f" Citations: {len(citations)} sources")
            for i, citation in enumerate(citations[:3], 1):  # Show first 3
                print(f" {i}. {citation}")

    def get_task_result(self, run_id: str, max_attempts: int = 30, wait_time: int = 10) -> Optional[Dict[str, Any]]:
        """Step 4: Get task results (with polling).

        Args:
            run_id: Identifier of the run to poll.
            max_attempts: Maximum number of polling requests to make.
            wait_time: Seconds to sleep between consecutive attempts.

        Returns:
            The decoded result once the run status is ``completed`` or
            ``failed``; ``None`` if the run was not found, a request error
            occurred, or the run did not finish within ``max_attempts``.
        """
        print(f"📊 Step 4: Retrieving results for run {run_id}")
        url = f"{self.base_url}/v1/tasks/runs/{run_id}/result"
        headers = {"x-api-key": self.api_key}  # No Content-Type needed for GET

        for attempt in range(1, max_attempts + 1):
            try:
                response = requests.get(url, headers=headers, timeout=self.timeout)
                if response.status_code == 404:
                    print(f"❌ Task run not found: {run_id}")
                    return None
                response.raise_for_status()
                if response.status_code == 200:
                    result = response.json()
                    status = (result.get("run") or {}).get("status")
                    if status == "completed":
                        self._print_completed_output(result)
                        return result
                    if status == "failed":
                        print("❌ Task failed!")
                        return result
                    print(f"⏳ Task still {status}... (attempt {attempt}/{max_attempts})")
                else:
                    # A non-200 success carries no result yet; treat it as
                    # pending and wait instead of re-polling immediately.
                    print(
                        f"⏳ Unexpected HTTP {response.status_code}, retrying... "
                        f"(attempt {attempt}/{max_attempts})"
                    )
            except requests.exceptions.Timeout:
                # A slow response is not fatal while polling; try again.
                print(f"⏳ Request timed out, retrying... (attempt {attempt}/{max_attempts})")
            except requests.exceptions.RequestException as e:
                self._report_request_error("getting task result", e)
                return None

            # No point in sleeping after the last attempt.
            if attempt < max_attempts:
                time.sleep(wait_time)

        print(f"⏰ Task did not complete within {max_attempts * wait_time} seconds")
        return None

    def run_end_to_end_test(self, user_intent: str, sample_input: Any):
        """Run the complete end-to-end test.

        Executes the four steps in order and stops at the first one that
        fails, printing which step failed.

        Args:
            user_intent: Intent text passed to the task-suggestion step.
            sample_input: Input passed to the task run.
        """
        print("=" * 60)
        print("🧪 PARALLEL INGEST API - END-TO-END TEST")
        print("=" * 60)
        print()

        # Step 1: Suggest task
        task_suggestion = self.suggest_task(user_intent)
        if not task_suggestion:
            print("❌ Test failed at task suggestion step")
            return

        # Step 2: Suggest processor
        processor_suggestion = self.suggest_processor(task_suggestion)
        if not processor_suggestion:
            print("❌ Test failed at processor suggestion step")
            return

        # Step 3: Create task run
        selected_processor = processor_suggestion.get('selected_processor', 'core')
        run_id = self.create_task_run(
            input_data=sample_input,
            processor=selected_processor,
            task_spec=task_suggestion,  # Use original task suggestion, not processor suggestion
        )
        if not run_id:
            print("❌ Test failed at task creation step")
            return

        # Step 4: Get results
        result = self.get_task_result(run_id)
        if not result:
            print("❌ Test failed at result retrieval step")
            return

        # get_task_result also returns the payload of a failed run, so the
        # run status must be checked before declaring success.
        run_status = (result.get("run") or {}).get("status")
        if run_status == "completed":
            print("🎉 End-to-end test completed successfully!")
        else:
            print(f"❌ Test failed: task run finished with status '{run_status}'")
def main():
    """Run the end-to-end test against the configured API with a sample input.

    Prints setup instructions and returns without contacting the API when no
    usable API key is configured.
    """
    # An unset variable yields the placeholder, and a variable set to an empty
    # string yields "". Neither can authenticate, so reject both up front.
    if not API_KEY or API_KEY == "your-api-key-here":
        print("⚠️ Please set your PARALLEL_API_KEY environment variable or update the script")
        print(" Example: export PARALLEL_API_KEY=your_actual_api_key")
        return

    # Initialize tester
    tester = IngestAPITester(API_KEY, BASE_URL)

    # Test configuration
    user_intent = "Given company_name and company_website, find the CEO information for technology companies"
    # Object input whose keys match the fields named in the intent
    sample_input = {
        "company_name": "Google",
        "company_website": "https://www.google.com",
    }

    # Run the test
    tester.run_end_to_end_test(user_intent, sample_input)


if __name__ == "__main__":
    main()