Task Group API
Batch process Tasks at scale with the Parallel Task Group API
The Parallel Task Group API enables you to batch process hundreds or thousands of Tasks efficiently. Instead of running Tasks one by one, you can organize them into groups, monitor their progress collectively, and retrieve results in bulk.
Additional features include:
- Monitor progress across all Tasks in a group
- Stream results as they become available
- Handle failures gracefully at the group level
Key Concepts
Task Groups
A Task Group is a container that organizes multiple task runs. Each group has:
- A unique taskgroup_id for identification
- A status indicating overall progress
- The ability to add new Tasks dynamically
Group Status
Track progress with real-time status updates:
- Total number of task runs
- Count of runs by status (queued, running, completed, failed)
- Whether the group is still active
- Human-readable status messages
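For example, a group partway through processing might report a status like the following (values and status_message wording are illustrative):

{
  "num_task_runs": 100,
  "task_run_status_counts": {
    "queued": 8,
    "running": 30,
    "completed": 60,
    "failed": 2
  },
  "is_active": true,
  "status_message": "8 queued, 30 running, 60 completed, 2 failed"
}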
Quick Start
1. Define Types and Task Structure
import asyncio
import typing

import parallel
import pydantic
from parallel.types import JsonSchemaParam, TaskRun, TaskSpecParam
from parallel.types.task_run_result import OutputTaskRunJsonOutput

# Define your input and output models
class CompanyInput(pydantic.BaseModel):
    company_name: str = pydantic.Field(description="Name of the company")
    company_website: str = pydantic.Field(description="Company website URL")

class CompanyOutput(pydantic.BaseModel):
    key_insights: list[str] = pydantic.Field(description="Key business insights")
    market_position: str = pydantic.Field(description="Market positioning analysis")

# Define Group API types (these will be added to the Parallel SDK in a future release)
class TaskRunInputParam(parallel.BaseModel):
    task_spec: TaskSpecParam | None = pydantic.Field(default=None)
    input: str | dict[str, str] = pydantic.Field(description="Input to the task")
    metadata: dict[str, str] | None = pydantic.Field(default=None)
    processor: str = pydantic.Field(description="Processor to use for the task")

class TaskGroupStatus(parallel.BaseModel):
    num_task_runs: int = pydantic.Field(description="Number of task runs in the group")
    task_run_status_counts: dict[str, int] = pydantic.Field(
        description="Number of task runs with each status"
    )
    is_active: bool = pydantic.Field(
        description="True if at least one run in the group is currently active"
    )
    status_message: str | None = pydantic.Field(
        description="Human-readable status message for the group"
    )

class TaskGroupRunRequest(parallel.BaseModel):
    default_task_spec: TaskSpecParam | None = pydantic.Field(default=None)
    inputs: list[TaskRunInputParam] = pydantic.Field(description="List of task runs to execute")

class TaskGroupResponse(parallel.BaseModel):
    taskgroup_id: str = pydantic.Field(description="ID of the group")
    status: TaskGroupStatus = pydantic.Field(description="Status of the group")

class TaskGroupRunResponse(parallel.BaseModel):
    status: TaskGroupStatus = pydantic.Field(description="Status of the group")
    run_ids: list[str] = pydantic.Field(description="IDs of the newly created runs")

class TaskRunEvent(parallel.BaseModel):
    type: typing.Literal["task_run"] = pydantic.Field(default="task_run")
    event_id: str = pydantic.Field(description="Cursor to resume the event stream")
    run: TaskRun = pydantic.Field(description="Task run object")
    input: TaskRunInputParam | None = pydantic.Field(default=None)
    output: OutputTaskRunJsonOutput | None = pydantic.Field(default=None)

class Error(parallel.BaseModel):
    ref_id: str = pydantic.Field(description="Reference ID for the error")
    message: str = pydantic.Field(description="Human-readable message")
    detail: dict[str, typing.Any] | None = pydantic.Field(default=None)

class ErrorResponse(parallel.BaseModel):
    type: typing.Literal["error"] = pydantic.Field(default="error")
    error: Error = pydantic.Field(description="Error")

# Create a reusable task specification
task_spec = TaskSpecParam(
    input_schema=JsonSchemaParam(json_schema=CompanyInput.model_json_schema()),
    output_schema=JsonSchemaParam(json_schema=CompanyOutput.model_json_schema()),
)
2. Create a Task Group
# Initialize the client
client = parallel.AsyncParallel(
    base_url="https://beta.parallel.ai",
    api_key="your-api-key",
)

# Create a new task group
group_response = await client.post(
    path="/v1beta/tasks/groups",
    cast_to=TaskGroupResponse,
    body={},
)
taskgroup_id = group_response.taskgroup_id
print(f"Created task group: {taskgroup_id}")
3. Add Tasks to the Group
# Prepare your inputs
companies = [
    {"company_name": "Acme Corp", "company_website": "https://acme.com"},
    {"company_name": "TechStart", "company_website": "https://techstart.io"},
    # ... more companies
]

# Create task run inputs
run_inputs = []
for company in companies:
    input_data = CompanyInput(
        company_name=company["company_name"],
        company_website=company["company_website"],
    )
    run_input = TaskRunInputParam(
        input=input_data.model_dump(),
        processor="pro",
    )
    run_inputs.append(run_input)

# Add runs to the group
run_request = TaskGroupRunRequest(
    default_task_spec=task_spec,
    inputs=run_inputs,
)
response = await client.post(
    path=f"/v1beta/tasks/groups/{taskgroup_id}/runs",
    cast_to=TaskGroupRunResponse,
    body=run_request.model_dump(),
)
print(f"Added {len(response.run_ids)} Tasks to group")
4. Monitor Progress
import asyncio

async def wait_for_completion(client: parallel.AsyncParallel, taskgroup_id: str) -> None:
    while True:
        response = await client.get(
            path=f"/v1beta/tasks/groups/{taskgroup_id}",
            cast_to=TaskGroupResponse,
        )
        status = response.status
        print(f"Status: {status.task_run_status_counts}")
        if not status.is_active:
            print("All tasks completed!")
            break
        await asyncio.sleep(10)

await wait_for_completion(client, taskgroup_id)
5. Retrieve Results
# Stream all results from the group
async def get_all_results(client: parallel.AsyncParallel, taskgroup_id: str):
    results = []
    path = f"/v1beta/tasks/groups/{taskgroup_id}/runs"
    path += "?include_input=true&include_output=true"
    result_stream = await client.get(
        path=path,
        cast_to=TaskRunEvent | ErrorResponse | None,
        stream=True,
        stream_cls=parallel.AsyncStream[TaskRunEvent | ErrorResponse],
    )
    async for event in result_stream:
        if isinstance(event, TaskRunEvent) and event.output:
            company_input = CompanyInput.model_validate(event.input.input)
            company_output = CompanyOutput.model_validate(event.output.content)
            results.append(
                {
                    "company": company_input.company_name,
                    "insights": company_output.key_insights,
                    "market_position": company_output.market_position,
                }
            )
    return results

results = await get_all_results(client, taskgroup_id)
print(f"Processed {len(results)} companies successfully")
Batch Processing Pattern
For large datasets, process Tasks in batches to optimize performance:
async def process_companies_in_batches(
    client: parallel.AsyncParallel,
    taskgroup_id: str,
    companies: list[dict[str, str]],
    batch_size: int = 500,
) -> None:
    total_created = 0
    for i in range(0, len(companies), batch_size):
        batch = companies[i : i + batch_size]

        # Create run inputs for this batch
        run_inputs = []
        for company in batch:
            input_data = CompanyInput(
                company_name=company["company_name"],
                company_website=company["company_website"],
            )
            run_inputs.append(
                TaskRunInputParam(input=input_data.model_dump(), processor="pro")
            )

        # Add batch to group
        run_request = TaskGroupRunRequest(
            default_task_spec=task_spec, inputs=run_inputs
        )
        response = await client.post(
            path=f"/v1beta/tasks/groups/{taskgroup_id}/runs",
            cast_to=TaskGroupRunResponse,
            body=run_request.model_dump(),
        )
        total_created += len(response.run_ids)
        print(f"Processed {i + len(batch)} companies. Created {total_created} Tasks.")
Error Handling
The Group API provides robust error handling:
async def process_with_error_handling(
    client: parallel.AsyncParallel, taskgroup_id: str
) -> tuple[list[TaskRunEvent], list[TaskRunEvent | ErrorResponse]]:
    successful_results: list[TaskRunEvent] = []
    failed_results: list[TaskRunEvent | ErrorResponse] = []
    path = f"/v1beta/tasks/groups/{taskgroup_id}/runs"
    path += "?include_input=true&include_output=true"
    result_stream = await client.get(
        path=path,
        cast_to=TaskRunEvent | ErrorResponse | None,
        stream=True,
        stream_cls=parallel.AsyncStream[TaskRunEvent | ErrorResponse],
    )
    async for event in result_stream:
        if isinstance(event, ErrorResponse):
            failed_results.append(event)
            continue
        try:
            # Validate that the result parses into the expected models
            CompanyInput.model_validate(event.input.input)
            CompanyOutput.model_validate(event.output.content)
            successful_results.append(event)
        except Exception as e:
            print(f"Validation error: {e}")
            failed_results.append(event)
    print(f"Success: {len(successful_results)}, Failed: {len(failed_results)}")
    return successful_results, failed_results
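The returned lists make it easy to report or retry failures. For example, to log the reference ID and message of each API-level error (validation failures are TaskRunEvent objects, hence the isinstance check):

successes, failures = await process_with_error_handling(client, taskgroup_id)
for failure in failures:
    if isinstance(failure, ErrorResponse):
        print(f"Error {failure.error.ref_id}: {failure.error.message}")
    else:
        print(f"Validation failure for run event {failure.event_id}")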
API Reference
Create Task Group
POST /v1beta/tasks/groups
Response:
{
  "taskgroup_id": "tgrp_abc123",
  "status": {
    "num_task_runs": 0,
    "task_run_status_counts": {},
    "is_active": false
  }
}
Add Runs to Group
POST /v1beta/tasks/groups/{taskgroup_id}/runs
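The request body mirrors the TaskGroupRunRequest model defined in the Quick Start. A minimal illustrative example:

{
  "inputs": [
    {
      "input": {
        "company_name": "Acme Corp",
        "company_website": "https://acme.com"
      },
      "processor": "pro"
    }
  ]
}

The response is a TaskGroupRunResponse: the updated group status plus the run_ids of the newly created runs.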
Get Group Status
GET /v1beta/tasks/groups/{taskgroup_id}
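The response is a TaskGroupResponse; an illustrative example for a finished group (values illustrative):

{
  "taskgroup_id": "tgrp_abc123",
  "status": {
    "num_task_runs": 3,
    "task_run_status_counts": {
      "completed": 3
    },
    "is_active": false,
    "status_message": "3 completed"
  }
}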
Stream Group Results
Task runs are returned in the order they were added to the group. Completed tasks include output, while incomplete tasks include run status and null output.
GET /v1beta/tasks/groups/{taskgroup_id}/runs?include_input=true&include_output=true
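Because incomplete runs stream with a null output, you can reuse the result-stream pattern from step 5 to collect runs that still need time. A minimal sketch, assuming the models and client from the Quick Start are in scope and that the TaskRun object exposes a run_id attribute:

# Sketch: collect the IDs of runs that have not completed yet.
# Assumption: TaskRun exposes a run_id attribute.
async def list_incomplete_runs(client: parallel.AsyncParallel, taskgroup_id: str) -> list[str]:
    pending: list[str] = []
    path = f"/v1beta/tasks/groups/{taskgroup_id}/runs?include_input=true&include_output=true"
    result_stream = await client.get(
        path=path,
        cast_to=TaskRunEvent | ErrorResponse | None,
        stream=True,
        stream_cls=parallel.AsyncStream[TaskRunEvent | ErrorResponse],
    )
    async for event in result_stream:
        # Incomplete runs arrive with output=None but a populated run object
        if isinstance(event, TaskRunEvent) and event.output is None:
            pending.append(event.run.run_id)
    return pending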
Stream Group Events
Group events include status updates and task results as they complete (not included in the sample code above).
GET /v1beta/tasks/groups/{taskgroup_id}/events
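A minimal sketch of consuming this endpoint with the same streaming pattern used for results. The status-update event shapes are not modeled in this guide, so handling only task-run events here is an assumption about the stream's contents:

# Sketch: follow the group's event stream as runs complete.
# Assumption: completed runs are delivered as TaskRunEvent objects;
# status-update events would need an additional model not defined here.
async def follow_group_events(client: parallel.AsyncParallel, taskgroup_id: str) -> None:
    event_stream = await client.get(
        path=f"/v1beta/tasks/groups/{taskgroup_id}/events",
        cast_to=TaskRunEvent | ErrorResponse | None,
        stream=True,
        stream_cls=parallel.AsyncStream[TaskRunEvent | ErrorResponse],
    )
    async for event in event_stream:
        if isinstance(event, TaskRunEvent):
            print(f"Task run event received (cursor: {event.event_id})")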
Complete Example
Here’s a complete Python script that demonstrates the full workflow, including all of the setup code above.
import asyncio
import typing

import parallel
import pydantic
from parallel.types import JsonSchemaParam, TaskRun, TaskSpecParam
from parallel.types.task_run_result import OutputTaskRunJsonOutput

# Define your input and output models
class CompanyInput(pydantic.BaseModel):
    company_name: str = pydantic.Field(description="Name of the company")
    company_website: str = pydantic.Field(description="Company website URL")

class CompanyOutput(pydantic.BaseModel):
    key_insights: list[str] = pydantic.Field(description="Key business insights")
    market_position: str = pydantic.Field(description="Market positioning analysis")

# Define Group API types (these will be added to the Parallel SDK in a future release)
class TaskRunInputParam(parallel.BaseModel):
    task_spec: TaskSpecParam | None = pydantic.Field(default=None)
    input: str | dict[str, str] = pydantic.Field(description="Input to the task")
    metadata: dict[str, str] | None = pydantic.Field(default=None)
    processor: str = pydantic.Field(description="Processor to use for the task")

class TaskGroupStatus(parallel.BaseModel):
    num_task_runs: int = pydantic.Field(description="Number of task runs in the group")
    task_run_status_counts: dict[str, int] = pydantic.Field(
        description="Number of task runs with each status"
    )
    is_active: bool = pydantic.Field(
        description="True if at least one run in the group is currently active"
    )
    status_message: str | None = pydantic.Field(
        description="Human-readable status message for the group"
    )

class TaskGroupRunRequest(parallel.BaseModel):
    default_task_spec: TaskSpecParam | None = pydantic.Field(default=None)
    inputs: list[TaskRunInputParam] = pydantic.Field(description="List of task runs to execute")

class TaskGroupResponse(parallel.BaseModel):
    taskgroup_id: str = pydantic.Field(description="ID of the group")
    status: TaskGroupStatus = pydantic.Field(description="Status of the group")

class TaskGroupRunResponse(parallel.BaseModel):
    status: TaskGroupStatus = pydantic.Field(description="Status of the group")
    run_ids: list[str] = pydantic.Field(description="IDs of the newly created runs")

class TaskRunEvent(parallel.BaseModel):
    type: typing.Literal["task_run"] = pydantic.Field(default="task_run")
    event_id: str = pydantic.Field(description="Cursor to resume the event stream")
    run: TaskRun = pydantic.Field(description="Task run object")
    input: TaskRunInputParam | None = pydantic.Field(default=None)
    output: OutputTaskRunJsonOutput | None = pydantic.Field(default=None)

class Error(parallel.BaseModel):
    ref_id: str = pydantic.Field(description="Reference ID for the error")
    message: str = pydantic.Field(description="Human-readable message")
    detail: dict[str, typing.Any] | None = pydantic.Field(default=None)

class ErrorResponse(parallel.BaseModel):
    type: typing.Literal["error"] = pydantic.Field(default="error")
    error: Error = pydantic.Field(description="Error")

# Create a reusable task specification
task_spec = TaskSpecParam(
    input_schema=JsonSchemaParam(json_schema=CompanyInput.model_json_schema()),
    output_schema=JsonSchemaParam(json_schema=CompanyOutput.model_json_schema()),
)

async def wait_for_completion(client: parallel.AsyncParallel, taskgroup_id: str) -> None:
    while True:
        response = await client.get(
            path=f"/v1beta/tasks/groups/{taskgroup_id}", cast_to=TaskGroupResponse
        )
        status = response.status
        print(f"Status: {status.task_run_status_counts}")
        if not status.is_active:
            print("All tasks completed!")
            break
        await asyncio.sleep(10)

async def get_all_results(client: parallel.AsyncParallel, taskgroup_id: str):
    results = []
    path = f"/v1beta/tasks/groups/{taskgroup_id}/runs"
    path += "?include_input=true&include_output=true"
    result_stream = await client.get(
        path=path,
        cast_to=TaskRunEvent | ErrorResponse | None,
        stream=True,
        stream_cls=parallel.AsyncStream[TaskRunEvent | ErrorResponse],
    )
    async for event in result_stream:
        if isinstance(event, TaskRunEvent) and event.output:
            company_input = CompanyInput.model_validate(event.input.input)
            company_output = CompanyOutput.model_validate(event.output.content)
            results.append(
                {
                    "company": company_input.company_name,
                    "insights": company_output.key_insights,
                    "market_position": company_output.market_position,
                }
            )
    return results

async def batch_company_research():
    client = parallel.AsyncParallel(
        base_url="https://beta.parallel.ai",
        api_key="your-api-key",
    )

    # Create task group
    group_response = await client.post(
        path="/v1beta/tasks/groups", cast_to=TaskGroupResponse, body={}
    )
    taskgroup_id = group_response.taskgroup_id
    print(f"Created taskgroup id {taskgroup_id}")

    # Define companies to research
    companies = [
        {"company_name": "Stripe", "company_website": "https://stripe.com"},
        {"company_name": "Shopify", "company_website": "https://shopify.com"},
        {"company_name": "Salesforce", "company_website": "https://salesforce.com"},
    ]

    # Add Tasks to group
    run_inputs = []
    for company in companies:
        input_data = CompanyInput(
            company_name=company["company_name"],
            company_website=company["company_website"],
        )
        run_inputs.append(
            TaskRunInputParam(input=input_data.model_dump(), processor="pro")
        )
    response = await client.post(
        path=f"/v1beta/tasks/groups/{taskgroup_id}/runs",
        cast_to=TaskGroupRunResponse,
        body=TaskGroupRunRequest(
            default_task_spec=task_spec, inputs=run_inputs
        ).model_dump(),
    )
    print(f"Added {len(response.run_ids)} runs to taskgroup {taskgroup_id}")

    # Wait for completion and get results
    await wait_for_completion(client, taskgroup_id)
    results = await get_all_results(client, taskgroup_id)
    print(f"Successfully processed {len(results)} companies")
    return results

# Run the batch job
results = asyncio.run(batch_company_research())