Batch process Tasks at scale with the Parallel Task Group API
Creating a Task Group returns a taskgroup_id for identification.

import asyncio
import typing
import parallel
import pydantic
from parallel.types import JsonSchemaParam, TaskRun, TaskSpecParam
from parallel.types.task_run_result import OutputTaskRunJsonOutput
# Define your input and output models
class CompanyInput(pydantic.BaseModel):
company_name: str = pydantic.Field(description="Name of the company")
company_website: str = pydantic.Field(description="Company website URL")
class CompanyOutput(pydantic.BaseModel):
key_insights: list[str] = pydantic.Field(description="Key business insights")
market_position: str = pydantic.Field(description="Market positioning analysis")
# Define Group API types (these will be added to the Parallel SDK in a future release)
class TaskRunInputParam(parallel.BaseModel):
task_spec: TaskSpecParam | None = pydantic.Field(default=None)
input: str | dict[str, str] = pydantic.Field(description="Input to the task")
metadata: dict[str, str] | None = pydantic.Field(default=None)
processor: str = pydantic.Field(description="Processor to use for the task")
class TaskGroupStatus(parallel.BaseModel):
num_task_runs: int = pydantic.Field(description="Number of task runs in the group")
task_run_status_counts: dict[str, int] = pydantic.Field(
description="Number of task runs with each status"
)
is_active: bool = pydantic.Field(
description="True if at least one run in the group is currently active"
)
status_message: str | None = pydantic.Field(
description="Human-readable status message for the group"
)
class TaskGroupRunRequest(parallel.BaseModel):
default_task_spec: TaskSpecParam | None = pydantic.Field(default=None)
inputs: list[TaskRunInputParam] = pydantic.Field(description="List of task runs to execute")
class TaskGroupResponse(parallel.BaseModel):
taskgroup_id: str = pydantic.Field(description="ID of the group")
status: TaskGroupStatus = pydantic.Field(description="Status of the group")
class TaskGroupRunResponse(parallel.BaseModel):
status: TaskGroupStatus = pydantic.Field(description="Status of the group")
run_ids: list[str] = pydantic.Field(description="IDs of the newly created runs")
class TaskRunEvent(parallel.BaseModel):
type: typing.Literal["task_run"] = pydantic.Field(default="task_run")
event_id: str = pydantic.Field(description="Cursor to resume the event stream")
run: TaskRun = pydantic.Field(description="Task run object")
input: TaskRunInputParam | None = pydantic.Field(default=None)
output: OutputTaskRunJsonOutput | None = pydantic.Field(default=None)
class Error(parallel.BaseModel):
ref_id: str = pydantic.Field(description="Reference ID for the error")
message: str = pydantic.Field(description="Human-readable message")
detail: dict[str, typing.Any] | None = pydantic.Field(default=None)
class ErrorResponse(parallel.BaseModel):
type: typing.Literal["error"] = pydantic.Field(default="error")
error: Error = pydantic.Field(description="Error")
# Create reusable task specification
task_spec = TaskSpecParam(
input_schema=JsonSchemaParam(json_schema=CompanyInput.model_json_schema()),
output_schema=JsonSchemaParam(json_schema=CompanyOutput.model_json_schema()),
)
# Create a task group
curl --request POST \
--url https://api.parallel.ai/v1beta/tasks/groups \
--header 'Content-Type: application/json' \
--header 'x-api-key: ${X_API_KEY}' \
--data '{}'
# Add runs to the group
curl --request POST \
--url https://api.parallel.ai/v1beta/tasks/groups/${TASKGROUP_ID}/runs \
--header 'Content-Type: application/json' \
--header 'x-api-key: ${X_API_KEY}' \
--data '{
"inputs": [
{
"task_spec": {
"output_schema": "The founding date of the entity in the format MM-YYYY"
},
"input": "United Nations",
"processor": "core"
}
]
}'
# Get status of the group
curl --request GET \
--url https://api.parallel.ai/v1beta/tasks/groups/${TASKGROUP_ID} \
--header 'x-api-key: ${X_API_KEY}'
# Get status of all runs in the group
curl --request GET \
--no-buffer \
--url https://api.parallel.ai/v1beta/tasks/groups/${TASKGROUP_ID}/runs \
--header 'x-api-key: ${X_API_KEY}'
# Stream events for the group
curl --request GET \
--no-buffer \
--url https://api.parallel.ai/v1beta/tasks/groups/${TASKGROUP_ID}/events \
--header 'x-api-key: ${X_API_KEY}'
async def process_companies_in_batches(
client: parallel.AsyncParallel,
taskgroup_id: str,
companies: list[dict[str, str]],
batch_size: int = 500,
) -> None:
total_created = 0
for i in range(0, len(companies), batch_size):
batch = companies[i : i + batch_size]
# Create run inputs for this batch
run_inputs = []
for company in batch:
input_data = CompanyInput(
company_name=company["company_name"],
company_website=company["company_website"],
)
            run_inputs.append(
                TaskRunInputParam(input=input_data.model_dump(), processor="pro")
            )
# Add batch to group
run_request = TaskGroupRunRequest(
default_task_spec=task_spec, inputs=run_inputs
)
response = await client.post(
path=f"/v1beta/tasks/groups/{taskgroup_id}/runs",
cast_to=TaskGroupRunResponse,
body=run_request.model_dump(),
)
total_created += len(response.run_ids)
print(f"Processed {i + len(batch)} companies. Created {total_created} Tasks.")
async def process_with_error_handling(
    client: parallel.AsyncParallel, taskgroup_id: str
) -> tuple[list[TaskRunEvent], list[ErrorResponse]]:
successful_results = []
failed_results = []
path = f"/v1beta/tasks/groups/{taskgroup_id}/runs"
path += "?include_input=true&include_output=true"
result_stream = await client.get(
path=path,
cast_to=TaskRunEvent | ErrorResponse | None,
stream=True,
        stream_cls=parallel.AsyncStream[TaskRunEvent | ErrorResponse],
)
async for event in result_stream:
if isinstance(event, ErrorResponse):
failed_results.append(event)
continue
try:
# Validate the result
company_input = CompanyInput.model_validate(event.input.input)
company_output = CompanyOutput.model_validate(event.output.content)
successful_results.append(event)
except Exception as e:
print(f"Validation error: {e}")
failed_results.append(event)
print(f"Success: {len(successful_results)}, Failed: {len(failed_results)}")
return successful_results, failed_results
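Because each task_run event carries its original TaskRunInputParam when include_input=true is set, one possible recovery strategy is to resubmit just the failed inputs to the same group. A hedged sketch using the types defined above (error events carry no input, so only failed task_run events are retried):

async def resubmit_failed(
    client: parallel.AsyncParallel,
    taskgroup_id: str,
    failed: list[TaskRunEvent | ErrorResponse],
) -> int:
    # Collect the original inputs from failed task_run events.
    retry_inputs = [
        event.input
        for event in failed
        if isinstance(event, TaskRunEvent) and event.input is not None
    ]
    if not retry_inputs:
        return 0
    response = await client.post(
        path=f"/v1beta/tasks/groups/{taskgroup_id}/runs",
        cast_to=TaskGroupRunResponse,
        body=TaskGroupRunRequest(
            default_task_spec=task_spec, inputs=retry_inputs
        ).model_dump(),
    )
    return len(response.run_ids)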
POST /v1beta/tasks/groups
{
"taskgroup_id": "tgrp_abc123",
"status": {
"num_task_runs": 0,
"task_run_status_counts": {},
"is_active": false,
}
}
POST /v1beta/tasks/groups/{taskgroup_id}/runs
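An illustrative response, following the TaskGroupRunResponse model above (run IDs and status keys are placeholders):

{
  "status": {
    "num_task_runs": 1,
    "task_run_status_counts": {"queued": 1},
    "is_active": true,
    "status_message": null
  },
  "run_ids": ["trun_abc123"]
}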
GET /v1beta/tasks/groups/{taskgroup_id}
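An illustrative response for a group in progress (same TaskGroupStatus shape; counts and status keys are placeholders):

{
  "taskgroup_id": "tgrp_abc123",
  "status": {
    "num_task_runs": 3,
    "task_run_status_counts": {"completed": 2, "running": 1},
    "is_active": true,
    "status_message": null
  }
}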
By default, results returned by the runs endpoint have null input and output; set include_input=true and include_output=true to include the run input and output.
GET /v1beta/tasks/groups/{taskgroup_id}/runs?include_input=true&include_output=true
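Each streamed item is either a task_run event or an error, per the TaskRunEvent and ErrorResponse models above. A single event might look like the following (run object abbreviated; all values illustrative):

{
  "type": "task_run",
  "event_id": "evt_001",
  "run": {"run_id": "trun_abc123", "status": "completed"},
  "input": {
    "input": {"company_name": "Stripe", "company_website": "https://stripe.com"},
    "processor": "pro"
  },
  "output": {"content": {"key_insights": ["..."], "market_position": "..."}}
}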
GET /v1beta/tasks/groups/{taskgroup_id}/events
Putting it all together, using the models and task_spec defined above:
async def wait_for_completion(client: parallel.AsyncParallel, taskgroup_id: str) -> None:
while True:
response = await client.get(
path=f"/v1beta/tasks/groups/{taskgroup_id}", cast_to=TaskGroupResponse
)
status = response.status
print(f"Status: {status.task_run_status_counts}")
if not status.is_active:
print("All tasks completed!")
break
await asyncio.sleep(10)
async def get_all_results(client: parallel.AsyncParallel, taskgroup_id: str):
results = []
path = f"/v1beta/tasks/groups/{taskgroup_id}/runs"
path += "?include_input=true&include_output=true"
result_stream = await client.get(
path=path,
cast_to=TaskRunEvent | ErrorResponse | None,
stream=True,
stream_cls=parallel.AsyncStream[TaskRunEvent | ErrorResponse],
)
async for event in result_stream:
if isinstance(event, TaskRunEvent) and event.output:
company_input = CompanyInput.model_validate(event.input.input)
company_output = CompanyOutput.model_validate(event.output.content)
results.append(
{
"company": company_input.company_name,
"insights": company_output.key_insights,
"market_position": company_output.market_position,
}
)
return results
async def batch_company_research():
client = parallel.AsyncParallel(
base_url="https://api.parallel.ai",
api_key="your-api-key",
)
# Create task group
group_response = await client.post(
path="/v1beta/tasks/groups", cast_to=TaskGroupResponse, body={}
)
taskgroup_id = group_response.taskgroup_id
print(f"Created taskgroup id {taskgroup_id}")
# Define companies to research
companies = [
{"company_name": "Stripe", "company_website": "https://stripe.com"},
{"company_name": "Shopify", "company_website": "https://shopify.com"},
{"company_name": "Salesforce", "company_website": "https://salesforce.com"},
]
# Add Tasks to group
run_inputs = []
for company in companies:
input_data = CompanyInput(
company_name=company["company_name"],
company_website=company["company_website"],
)
run_inputs.append(
TaskRunInputParam(input=input_data.model_dump(), processor="pro")
)
response = await client.post(
path=f"/v1beta/tasks/groups/{taskgroup_id}/runs",
cast_to=TaskGroupRunResponse,
body=TaskGroupRunRequest(
default_task_spec=task_spec, inputs=run_inputs
).model_dump(),
)
print(f"Added {len(response.run_ids)} runs to taskgroup {taskgroup_id}")
# Wait for completion and get results
await wait_for_completion(client, taskgroup_id)
results = await get_all_results(client, taskgroup_id)
print(f"Successfully processed {len(results)} companies")
return results
# Run the batch job
results = asyncio.run(batch_company_research())
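The returned results are plain dicts, so persisting them is straightforward; for example, dumping them to a JSON file (the filename is arbitrary):

import json

with open("company_research_results.json", "w") as f:
    json.dump(results, f, indent=2)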