Skip to main content
This API is in beta and is accessible via the /v1beta/tasks/groups endpoint.
The Parallel Task Group API enables you to batch process hundreds or thousands of Tasks efficiently. Instead of running Tasks one by one, you can organize them into groups, monitor their progress collectively, and retrieve results in bulk. The API is comprised of the following endpoints: Creation: To run a batch of tasks in a group, you first need to create a task group, after which you can add runs to it, which will be queued and processed.
  • POST /v1beta/tasks/groups (Create task-group)
  • POST /v1beta/tasks/groups/{taskgroup_id}/runs (Add runs)
Progress Snapshot: At any moment during the task, you can get an instant snapshot of the state of it using GET /{taskgroup_id} and GET /{taskgroup_id}/runs. Please note that the runs endpoint streams back the requested runs instantly (using SSE) to allow for large payloads without pagination, and it doesn’t wait for runs to complete. Runs in a task group are stored indefinitely, so unless you have high performance requirements, you may not need to keep your own state of the intermediate results. However, it’s recommended to still do so after the task group is completed.
  • GET /v1beta/tasks/groups/{taskgroup_id} (Get task-group summary)
  • GET /v1beta/tasks/groups/{taskgroup_id}/runs (Fetch task group runs)
Realtime updates: You may want to provide efficient real-time updates to your app. For a high-level summary and run completion events, you can use GET /{taskgroup_id}/events. To also retrieve the task run result upon completion you can use the task run endpoint
  • GET /v1beta/tasks/groups/{taskgroup_id}/events (Stream task-group events)
  • GET /v1/tasks/runs/{run_id}/result (Get task-run result)
To determine whether a task group is fully completed, you can either use realtime update events, or you can poll the task-group summary endpoint. You can also keep adding runs to your task group indefinitely.

Key Concepts

Task Groups

A Task Group is a container that organizes multiple task runs. Each group has:
  • A unique taskgroup_id for identification
  • A status indicating overall progress
  • The ability to add new Tasks dynamically

Group Status

Track progress with real-time status updates:
  • Total number of task runs
  • Count of runs by status (queued, running, completed, failed)
  • Whether the group is still active
  • Human-readable status messages

Quick Start

1. Define Types and Task Structure

# Define task specification as a variable
TASK_SPEC='{
  "input_schema": {
    "json_schema": {
      "type": "object",
      "properties": {
        "company_name": {
          "type": "string",
          "description": "Name of the company"
        },
        "company_website": {
          "type": "string",
          "description": "Company website URL"
        }
      },
      "required": ["company_name", "company_website"]
    }
  },
  "output_schema": {
    "json_schema": {
      "type": "object",
      "properties": {
        "key_insights": {
          "type": "array",
          "items": {"type": "string"},
          "description": "Key business insights"
        },
        "market_position": {
          "type": "string",
          "description": "Market positioning analysis"
        }
      },
      "required": ["key_insights", "market_position"]
    }
  }
}'

2. Create a Task Group

# Create task group and capture the ID
response=$(curl --request POST \
  --url https://api.parallel.ai/v1beta/tasks/groups \
  --header 'Content-Type: application/json' \
  --header 'x-api-key: ${PARALLEL_API_KEY}' \
  --data '{}')

# Extract taskgroup_id from response
TASKGROUP_ID=$(echo $response | jq -r '.taskgroup_id')
echo "Created task group: $TASKGROUP_ID"

3. Add Tasks to the Group

curl --request POST \
  --url https://api.parallel.ai/v1beta/tasks/groups/${TASKGROUP_ID}/runs \
  --header 'Content-Type: application/json' \
  --header 'x-api-key: ${PARALLEL_API_KEY}' \
  --data '{
  "default_task_spec": '$TASK_SPEC',
  "inputs": [
    {
      "input": {
        "company_name": "Acme Corp",
        "company_website": "https://acme.com"
      },
      "processor": "pro"
    },
    {
      "input": {
        "company_name": "TechStart",
        "company_website": "https://techstart.io"
      },
      "processor": "pro"
    }
  ]
}'

4. Monitor Progress

# Get status of the group
curl --request GET \
  --url https://api.parallel.ai/v1beta/tasks/groups/${TASKGROUP_ID} \
  --header 'x-api-key: ${PARALLEL_API_KEY}'

# Get status of all runs in the group
curl --request GET \
  --no-buffer \
  --url https://api.parallel.ai/v1beta/tasks/groups/${TASKGROUP_ID}/runs \
  --header 'x-api-key: ${PARALLEL_API_KEY}'

5. Retrieve Results

curl --request GET \
  --no-buffer \
  --url https://api.parallel.ai/v1beta/tasks/groups/${TASKGROUP_ID}/events \
  --header 'x-api-key: ${PARALLEL_API_KEY}'

Batch Processing Pattern

For large datasets, process Tasks in batches to optimize performance:
async function processCompaniesInBatches(
  client: Parallel,
  taskgroupId: string,
  companies: Array<{ company_name: string; company_website: string }>,
  batchSize: number = 500
): Promise<void> {
  let totalCreated = 0;

  for (let i = 0; i < companies.length; i += batchSize) {
    const batch = companies.slice(i, i + batchSize);

    // Create run inputs for this batch using SDK types
    const runInputs: Array<Parallel.Beta.BetaRunInput> = batch.map(
      (company) => ({
        input: {
          company_name: company.company_name,
          company_website: company.company_website,
        },
        processor: "pro",
      })
    );

    // Add batch to group
    const response = await client.beta.taskGroup.addRuns(taskgroupId, {
      default_task_spec: taskSpec,
      inputs: runInputs,
    });

    totalCreated += response.run_ids.length;

    console.log(
      `Processed ${i + batch.length} companies. Created ${totalCreated} Tasks.`
    );
  }
}

Error Handling

The Group API provides robust error handling:
async function processWithErrorHandling(
  client: Parallel,
  taskgroupId: string
): Promise<{
  successful: Array<Parallel.Beta.TaskGroupGetRunsResponse>;
  failed: Array<Parallel.Beta.TaskGroupGetRunsResponse>;
}> {
  const successful: Array<Parallel.Beta.TaskGroupGetRunsResponse> = [];
  const failed: Array<Parallel.Beta.TaskGroupGetRunsResponse> = [];

  const runStream = await client.beta.taskGroup.getRuns(taskgroupId, {
    include_input: true,
    include_output: true,
  });

  for await (const event of runStream) {
    if (event.type === "error") {
      failed.push(event);
      continue;
    }

    if (event.type === "task_run.state") {
      try {
        // Validate the result
        const input = event.input?.input as CompanyInput;
        const output = event.output
          ? ((event.output as Parallel.TaskRunJsonOutput)
              .content as CompanyOutput)
          : null;

        if (input && output) {
          successful.push(event);
        }
      } catch (e) {
        console.error("Validation error:", e);
        failed.push(event);
      }
    }
  }

  console.log(`Success: ${successful.length}, Failed: ${failed.length}`);
  return { successful, failed };
}

Complete Example

Here’s a complete script that demonstrates the full workflow, including all of the setup code above.
import Parallel from "parallel-web";

// Define your input and output types
interface CompanyInput {
  company_name: string;
  company_website: string;
}

interface CompanyOutput {
  key_insights: string[];
  market_position: string;
}

// Use SDK types for Task Group API
type TaskGroupObject = Parallel.Beta.TaskGroup;
type TaskGroupGetRunsResponse = Parallel.Beta.TaskGroupGetRunsResponse;

// Create reusable task specification using SDK types
const taskSpec: Parallel.TaskSpec = {
  input_schema: {
    type: "json",
    json_schema: {
      type: "object",
      properties: {
        company_name: {
          type: "string",
          description: "Name of the company",
        },
        company_website: {
          type: "string",
          description: "Company website URL",
        },
      },
      required: ["company_name", "company_website"],
    },
  },
  output_schema: {
    type: "json",
    json_schema: {
      type: "object",
      properties: {
        key_insights: {
          type: "array",
          items: { type: "string" },
          description: "Key business insights",
        },
        market_position: {
          type: "string",
          description: "Market positioning analysis",
        },
      },
      required: ["key_insights", "market_position"],
    },
  },
};

async function waitForCompletion(
  client: Parallel,
  taskgroupId: string
): Promise<void> {
  while (true) {
    const response = await client.beta.taskGroup.retrieve(taskgroupId);

    const status = response.status;
    console.log("Status:", status.task_run_status_counts);

    if (!status.is_active) {
      console.log("All tasks completed!");
      break;
    }

    await new Promise((resolve) => setTimeout(resolve, 10000));
  }
}

async function getAllResults(
  client: Parallel,
  taskgroupId: string
): Promise<
  Array<{ company: string; insights: string[]; market_position: string }>
> {
  const results: Array<{
    company: string;
    insights: string[];
    market_position: string;
  }> = [];

  const runStream = await client.beta.taskGroup.getRuns(taskgroupId, {
    include_input: true,
    include_output: true,
  });

  for await (const event of runStream) {
    if (event.type === "task_run.state" && event.output) {
      const input = event.input?.input as CompanyInput;
      const output = (event.output as Parallel.TaskRunJsonOutput)
        .content as CompanyOutput;

      results.push({
        company: input.company_name,
        insights: output.key_insights,
        market_position: output.market_position,
      });
    }
  }

  return results;
}

async function batchCompanyResearch(): Promise<
  Array<{ company: string; insights: string[]; market_position: string }>
> {
  const client = new Parallel({
    apiKey: process.env.PARALLEL_API_KEY,
  });

  // Create task group
  const groupResponse = await client.beta.taskGroup.create({});
  const taskgroupId = groupResponse.taskgroup_id;
  console.log(`Created taskgroup id ${taskgroupId}`);

  // Define companies to research
  const companies = [
    { company_name: "Stripe", company_website: "https://stripe.com" },
    { company_name: "Shopify", company_website: "https://shopify.com" },
    { company_name: "Salesforce", company_website: "https://salesforce.com" },
  ];

  // Add Tasks to group
  const runInputs: Array<Parallel.Beta.BetaRunInput> = companies.map(
    (company) => ({
      input: {
        company_name: company.company_name,
        company_website: company.company_website,
      },
      processor: "pro",
    })
  );

  const response = await client.beta.taskGroup.addRuns(taskgroupId, {
    default_task_spec: taskSpec,
    inputs: runInputs,
  });

  console.log(
    `Added ${response.run_ids.length} runs to taskgroup ${taskgroupId}`
  );

  // Wait for completion and get results
  await waitForCompletion(client, taskgroupId);
  const results = await getAllResults(client, taskgroupId);
  console.log(`Successfully processed ${results.length} companies`);
  return results;
}

// Run the batch job
const results = await batchCompanyResearch();