# Strands Agents integration

> Run Strands Agents AI workflows with durable execution using the Temporal Python SDK and Strands plugin.

Temporal's integration with [Strands Agents](https://strandsagents.com/) is an [SDK Plugin](/develop/plugins-guide) that
gives your Strands agents [Durable Execution](/temporal#durable-execution) via the Temporal platform. The plugin routes
model invocations, tool calls, MCP tool calls, and hooks through Temporal Activities, so every step your agent takes is
recorded in Workflow history and can survive crashes, restarts, and infrastructure failures.

> **Public Preview**

Code snippets in this guide are taken from the
[Strands Agents plugin samples](https://github.com/temporalio/samples-python/tree/main/strands_plugin). Refer to the
samples for the complete code.

## Get started

Install the plugin, then run a minimal Strands agent inside a Temporal Workflow.

### Prerequisites

- This guide assumes you are already familiar with Strands Agents. If you are not, refer to the
  [Strands Agents documentation](https://strandsagents.com/) for more details.
- If you are new to Temporal, read [Understanding Temporal](/evaluate/understanding-temporal) or take the
  [Temporal 101](https://learn.temporal.io/courses/temporal_101/) course.
- Set up your local development environment by following the
  [Set up your local development environment](/develop/python/set-up-your-local-python) guide. Leave the Temporal
  development server running if you want to test your code locally.

### Install the plugin

Install the Temporal Python SDK with Strands Agents support (requires `temporalio` 1.28.0 or later):

```bash
uv add "temporalio[strands-agents]"
```

or with pip:

```bash
pip install "temporalio[strands-agents]"
```

### Run a Strands agent with Durable Execution

The following example runs a Strands agent inside a Temporal Workflow. Model calls execute as Temporal Activities, which
means they get automatic retries, timeouts, and durable execution. If the Worker process crashes mid-conversation,
Temporal replays the Workflow and resumes from the last completed Activity.

**1. Define the Workflow**

Create a Workflow that holds a `TemporalAgent` and invokes it with a prompt. The `start_to_close_timeout` sets the
maximum time each model call Activity can run:

<!--SNIPSTART python-strands-hello-world-workflow-->
[strands_plugin/hello_world/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/hello_world/workflow.py)
```py
from datetime import timedelta

from temporalio import workflow
from temporalio.contrib.strands import TemporalAgent

@workflow.defn
class HelloWorldWorkflow:
    def __init__(self) -> None:
        self.agent = TemporalAgent(start_to_close_timeout=timedelta(seconds=60))

    @workflow.run
    async def run(self, prompt: str) -> str:
        result = await self.agent.invoke_async(prompt)
        return str(result)

```
<!--SNIPEND-->

> **⚠️ Caution:**
>
> Inside a Workflow, always call `agent.invoke_async(message)`, not `agent(message)`. The synchronous form spawns a worker
> thread, which the Workflow sandbox blocks.
>

**2. Start a Worker**

Create a client with the `StrandsPlugin` attached, then a Worker that registers the Workflow. Workers built from that
client pick up the plugin automatically, and the plugin registers the Activities that handle model calls:

<!--SNIPSTART python-strands-hello-world-worker-->
[strands_plugin/hello_world/run_worker.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/hello_world/run_worker.py)
```py
import asyncio
import os

from temporalio.client import Client
from temporalio.contrib.strands import StrandsPlugin
from temporalio.worker import Worker

from strands_plugin.hello_world.workflow import HelloWorldWorkflow

async def main() -> None:
    plugin = StrandsPlugin()
    client = await Client.connect(
        os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"),
        plugins=[plugin],
    )

    worker = Worker(
        client,
        task_queue="strands-hello-world",
        workflows=[HelloWorldWorkflow],
    )
    print("Worker started. Ctrl+C to exit.")
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
```
<!--SNIPEND-->

**3. Run the Workflow**

Start the Workflow from a separate client script. This example sends the prompt "Write a haiku about durable execution."
and prints the agent's response:

<!--SNIPSTART python-strands-hello-world-run-workflow-->
[strands_plugin/hello_world/run_workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/hello_world/run_workflow.py)
```py
import asyncio
import os

from temporalio.client import Client

from strands_plugin.hello_world.workflow import HelloWorldWorkflow

async def main() -> None:
    client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"))

    result = await client.execute_workflow(
        HelloWorldWorkflow.run,
        "Write a haiku about durable execution.",
        id="strands-hello-world",
        task_queue="strands-hello-world",
    )

    print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())
```
<!--SNIPEND-->

## Build the agent

Customize which model provider your agent uses, add tools that run as Activities, subscribe to lifecycle events with
hooks, and connect to MCP servers.

### Choose and configure models

By default, `StrandsPlugin` uses Strands' own default model (`BedrockModel`). To use a different model, pass a `models`
mapping to `StrandsPlugin` on the Worker. When you provide a custom `models` mapping, each `TemporalAgent` must specify
which model to use by name.

Each entry in the mapping pairs a name with a factory function that creates a model provider (such as `AnthropicModel`
or `BedrockModel`). The provider is created on first use and reused for the Worker's lifetime:

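```python
from datetime import timedelta

from strands.models.anthropic import AnthropicModel
from strands.models.bedrock import BedrockModel
from temporalio import workflow
from temporalio.contrib.strands import StrandsPlugin, TemporalAgent
from temporalio.worker import Worker

# Workflow: each agent refers to a model by its registered name.
@workflow.defn
class MultiModelWorkflow:
    def __init__(self) -> None:
        self.agent_a = TemporalAgent(
            model="claude",
            start_to_close_timeout=timedelta(seconds=60),
        )
        self.agent_b = TemporalAgent(
            model="bedrock",
            start_to_close_timeout=timedelta(seconds=60),
        )

# Worker: map each name to a factory that builds the provider.
Worker(..., plugins=[StrandsPlugin(models={
    "claude": lambda: AnthropicModel(client_args={"api_key": "..."}),
    "bedrock": lambda: BedrockModel(),
})])
```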

Each `TemporalAgent` carries its own Activity options (timeouts, retry policy, task queue, streaming topic) and
dispatches to a shared model Activity, which resolves the model name against the registered factories at runtime. A
model name not present in the `models` mapping raises `ValueError` inside the Activity.
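
The name-keyed lookup described above can be pictured with a small standalone sketch. This is not the plugin's
implementation. `ModelRegistry`, `FakeModel`, and `make_claude` are hypothetical names used only to illustrate the
"create on first use, reuse afterwards, raise `ValueError` for unknown names" behavior:

```python
from typing import Callable, Dict


class FakeModel:
    """Hypothetical stand-in for a model provider such as AnthropicModel."""

    def __init__(self, label: str) -> None:
        self.label = label


class ModelRegistry:
    """Illustrative only: resolves model names to lazily created providers."""

    def __init__(self, factories: Dict[str, Callable[[], FakeModel]]) -> None:
        self._factories = factories
        self._instances: Dict[str, FakeModel] = {}

    def resolve(self, name: str) -> FakeModel:
        if name not in self._factories:
            # Mirrors the documented behavior for unregistered names.
            raise ValueError(f"unknown model: {name!r}")
        if name not in self._instances:
            # The factory runs once, on first use.
            self._instances[name] = self._factories[name]()
        return self._instances[name]


calls = []  # records each factory invocation


def make_claude() -> FakeModel:
    calls.append("claude")
    return FakeModel("claude")


registry = ModelRegistry({"claude": make_claude})
first = registry.resolve("claude")
second = registry.resolve("claude")  # reuses the instance created above
```

Because the factory runs lazily, a misconfigured provider may only surface as an error when the first model call for
that name is made, not when the Worker starts.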

### Run non-deterministic tools as Activities

Strands tools that perform I/O, access external services, or produce non-deterministic results need to run as Temporal
Activities rather than inline in the Workflow. Wrap each tool in an `@activity.defn` function, register the Activities
on the Worker, and pass them to the agent using `activity_as_tool`.

Define an Activity for the tool:

<!--SNIPSTART python-strands-tools-activity-->
[strands_plugin/tools/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/tools/workflow.py)
```py
@activity.defn
async def fetch_weather(city: str) -> dict:
    """Stub weather lookup — replace with a real HTTP call in production."""
    return {
        "city": city,
        "temperature_f": 72,
        "conditions": "sunny",
    }

```
<!--SNIPEND-->

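Pass the Activity to the agent in the Workflow using `activity_as_tool`. The `letter_counter` tool and
`environment_activity` Activity are part of the linked sample and are not shown here: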

<!--SNIPSTART python-strands-tools-workflow-->
[strands_plugin/tools/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/tools/workflow.py)
```py
@workflow.defn
class ToolsWorkflow:
    def __init__(self) -> None:
        self.agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            tools=[
                letter_counter,
                activity_as_tool(
                    fetch_weather,
                    start_to_close_timeout=timedelta(seconds=30),
                ),
                activity_as_tool(
                    environment_activity,
                    start_to_close_timeout=timedelta(seconds=30),
                ),
            ],
        )

    @workflow.run
    async def run(self, prompt: str) -> str:
        result = await self.agent.invoke_async(prompt)
        return str(result)

```
<!--SNIPEND-->

Register the Activity functions on the Worker:

<!--SNIPSTART python-strands-tools-worker-->
[strands_plugin/tools/run_worker.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/tools/run_worker.py)
```py
import asyncio
import os

from temporalio.client import Client
from temporalio.contrib.strands import StrandsPlugin
from temporalio.worker import Worker

from strands_plugin.tools.workflow import (
    ToolsWorkflow,
    environment_activity,
    fetch_weather,
)

async def main() -> None:
    plugin = StrandsPlugin()
    client = await Client.connect(
        os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"),
        plugins=[plugin],
    )

    worker = Worker(
        client,
        task_queue="strands-tools",
        workflows=[ToolsWorkflow],
        activities=[fetch_weather, environment_activity],
    )
    print("Worker started. Ctrl+C to exit.")
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
```
<!--SNIPEND-->

If you are using built-in `strands_tools`, wrap them in a thin async function decorated with `@activity.defn` so they
run as Temporal Activities.

### React to agent lifecycle events

Strands' [hook system](https://strandsagents.com/docs/user-guide/concepts/agents/hooks/) lets you subscribe callbacks to events in the agent lifecycle, such
as invocation start/end, model call before/after, tool call before/after, and message added. Use hooks to add logging,
metrics, or custom logic at each stage.

Pass `hooks=[MyHookProvider()]` to `TemporalAgent`. Hook callbacks fire in Workflow context, so deterministic callbacks
work without any extra setup.

For callbacks that need I/O (audit logging, metrics, alerting), use `activity_as_hook` to dispatch the work as a
Temporal Activity. The following example shows both patterns in one `HookProvider`. The `_record` callback runs in
Workflow context (deterministic), while `persist_tool_call` runs as an Activity (I/O-safe):

<!--SNIPSTART python-strands-hooks-activity-->
[strands_plugin/hooks/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/hooks/workflow.py)
```py
@activity.defn
async def persist_tool_call(tool_name: str) -> None:
    # In production, write to a database / S3 / your audit pipeline.
    activity.logger.info(f"audit: tool {tool_name} completed")

```
<!--SNIPEND-->

<!--SNIPSTART python-strands-hooks-provider-->
[strands_plugin/hooks/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/hooks/workflow.py)
```py
class AuditHook(HookProvider):
    def __init__(self) -> None:
        self.fired: list[str] = []

    def register_hooks(self, registry: HookRegistry, **kwargs: object) -> None:
        registry.add_callback(AfterToolCallEvent, self._record)
        registry.add_callback(
            AfterToolCallEvent,
            activity_as_hook(
                persist_tool_call,
                activity_input=lambda event: event.tool_use["name"],
                start_to_close_timeout=timedelta(seconds=15),
            ),
        )

    def _record(self, event: AfterToolCallEvent) -> None:
        self.fired.append(event.tool_use["name"])

```
<!--SNIPEND-->

> **⚠️ Caution:**
>
> Hook callbacks run in Workflow context, so they must be
> [deterministic](/develop/python/workflows/basics#workflow-logic-requirements). Do not use `time.time()`, `uuid.uuid4()`,
> or I/O inside hook callbacks. Use `activity_as_hook` for anything that requires I/O.
>

The `activity_input` parameter extracts serializable values from the event to pass as the Activity's input. Use a
dataclass or Pydantic model for multiple values. This is needed because hook events hold references to `Agent`,
`AgentTool` instances, and other objects that cannot cross the Activity boundary.
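
For a hook that needs more than one value, a dataclass keeps the Activity input serializable. The sketch below shows
only the extraction step, using a plain stand-in object instead of a real `AfterToolCallEvent`. `ToolAudit`, `to_audit`,
and the `toolUseId` key are assumptions made for illustration:

```python
from dataclasses import asdict, dataclass
from types import SimpleNamespace


@dataclass
class ToolAudit:
    """Hypothetical Activity input: only plain, serializable fields."""

    tool_name: str
    tool_use_id: str


def to_audit(event) -> ToolAudit:
    # Copy primitives out of the event; never pass the event itself.
    return ToolAudit(
        tool_name=event.tool_use["name"],
        tool_use_id=event.tool_use["toolUseId"],
    )


# Stand-in for a hook event, carrying only the attribute used above.
fake_event = SimpleNamespace(tool_use={"name": "fetch_weather", "toolUseId": "t-1"})
audit = to_audit(fake_event)
```

A function like `to_audit` would be passed as `activity_input=to_audit`, with the Activity accepting a single
`ToolAudit` argument.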

### Connect to MCP servers

If your agent needs access to tools provided by an [MCP](https://modelcontextprotocol.io/) server, configure the MCP
clients on the Worker and reference them by name in the Workflow.

`StrandsPlugin(mcp_clients=...)` takes a mapping of `name` to `MCPClient` factory, mirroring the `models` pattern. The
plugin registers a per-server Activity and connects at Worker startup to enumerate available tools. In the Workflow,
`TemporalMCPClient(server="name")` is a handle that references the server by name and carries per-call Activity options.

Define the Workflow with a `TemporalMCPClient`:

<!--SNIPSTART python-strands-mcp-workflow-->
[strands_plugin/mcp/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/mcp/workflow.py)
```py
from datetime import timedelta

from temporalio import workflow
from temporalio.contrib.strands import TemporalAgent, TemporalMCPClient

@workflow.defn
class MCPWorkflow:
    def __init__(self) -> None:
        echo = TemporalMCPClient(
            server="echo",
            start_to_close_timeout=timedelta(seconds=30),
        )
        self.agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            tools=[echo],
        )

    @workflow.run
    async def run(self, prompt: str) -> str:
        result = await self.agent.invoke_async(prompt)
        return str(result)

```
<!--SNIPEND-->

Register the MCP client factory on the Worker:

<!--SNIPSTART python-strands-mcp-worker {"selectedLines": ["6-10", "17-25", "28-41"]}-->
[strands_plugin/mcp/run_worker.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/mcp/run_worker.py)
```py
# ...
from mcp import StdioServerParameters, stdio_client
from strands.tools.mcp.mcp_client import MCPClient
from temporalio.client import Client
from temporalio.contrib.strands import StrandsPlugin
from temporalio.worker import Worker
# ...
def _make_echo_client() -> MCPClient:
    return MCPClient(
        lambda: stdio_client(
            StdioServerParameters(
                command=sys.executable,
                args=[str(ECHO_SERVER)],
            )
        )
    )
# ...
async def main() -> None:
    plugin = StrandsPlugin(mcp_clients={"echo": _make_echo_client})
    client = await Client.connect(
        os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"),
        plugins=[plugin],
    )

    worker = Worker(
        client,
        task_queue="strands-mcp",
        workflows=[MCPWorkflow],
    )
    print("Worker started. Ctrl+C to exit.")
    await worker.run()
```
<!--SNIPEND-->

Each factory returns a fully configured `MCPClient`, so you can pass options like `tool_filters`, `prefix`,
`elicitation_callback`, or `tasks_config` to it.

> **ℹ️ Info:**
>
> The plugin connects to each MCP server once at Worker startup to enumerate tools. The schema is frozen for the Worker's
> lifetime. Restart Workers to pick up MCP server changes. If a server is unavailable at startup, the Worker fails to
> start.
>

## Interact with the agent

Pause the agent for human approval, return structured data, and stream output in real time.

### Add human approval gates

Some agent actions, such as deleting resources or sending messages, may require human approval before proceeding.
Strands offers two ways to interrupt an agent and wait for a response. Both work with the plugin.

In each case, `agent.invoke_async()` returns `AgentResult(stop_reason="interrupt", interrupts=[...])` instead of
raising. Pair this with a Signal handler that supplies responses, then resume by calling
`agent.invoke_async(responses)`.

#### Interrupt from a hook

A hook on an interruptible event such as `BeforeToolCallEvent` can pause the agent by calling
`event.interrupt(name, reason=...)`. The hook runs in Workflow context, so it must be deterministic.

Define the approval hook:

<!--SNIPSTART python-strands-human-in-the-loop-hook-->
[strands_plugin/human_in_the_loop/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/human_in_the_loop/workflow.py)
```py
class ApprovalHook(HookProvider):
    def register_hooks(self, registry: HookRegistry, **kwargs: object) -> None:
        registry.add_callback(BeforeToolCallEvent, self._gate)

    def _gate(self, event: BeforeToolCallEvent) -> None:
        if event.tool_use["name"] != "delete_file":
            return
        approval = event.interrupt(
            "approval",
            reason=f"approve delete of {event.tool_use['input']['path']}?",
        )
        if approval != "approve":
            event.cancel_tool = "denied"

```
<!--SNIPEND-->

The Workflow waits for a Signal carrying the approval response, then resumes the agent:

<!--SNIPSTART python-strands-human-in-the-loop-workflow-->
[strands_plugin/human_in_the_loop/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/human_in_the_loop/workflow.py)
```py
@workflow.defn
class HumanInTheLoopWorkflow:
    def __init__(self) -> None:
        self.agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            tools=[delete_file],
            hooks=[ApprovalHook()],
        )
        self._approval: Optional[str] = None
        self._pending_reason: Optional[str] = None

    @workflow.signal
    def approve(self, response: str) -> None:
        self._approval = response

    @workflow.query
    def pending_approval(self) -> Optional[str]:
        return self._pending_reason

    @workflow.run
    async def run(self, prompt: str) -> str:
        result = await self.agent.invoke_async(prompt)
        while result.stop_reason == "interrupt":
            interrupts = list(result.interrupts or [])
            self._pending_reason = interrupts[0].reason if interrupts else None
            await workflow.wait_condition(lambda: self._approval is not None)
            response = self._approval
            self._approval = None
            self._pending_reason = None
            responses: list[InterruptResponseContent] = [
                {"interruptResponse": {"interruptId": i.id, "response": response}}
                for i in interrupts
            ]
            result = await self.agent.invoke_async(responses)
        return str(result)

```
<!--SNIPEND-->
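
When several interrupts are pending, the response list can be built by a small pure helper. The sketch below uses a
hypothetical `PendingInterrupt` dataclass in place of Strands' interrupt objects and assumes the caller supplies one
decision per interrupt name. The response shape matches the Workflow above:

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class PendingInterrupt:
    """Hypothetical stand-in exposing the fields the Workflow reads."""

    id: str
    name: str
    reason: str


def build_responses(
    interrupts: List[PendingInterrupt],
    decisions: Dict[str, str],
    default: str = "deny",
) -> List[dict]:
    # One response per interrupt; missing decisions fall back to `default`.
    return [
        {
            "interruptResponse": {
                "interruptId": i.id,
                "response": decisions.get(i.name, default),
            }
        }
        for i in interrupts
    ]


pending = [
    PendingInterrupt(id="a1", name="approval", reason="approve delete of /tmp/x?"),
    PendingInterrupt(id="b2", name="budget", reason="spend more tokens?"),
]
responses = build_responses(pending, {"approval": "approve"})
```

Defaulting to a denial is a design choice in this sketch: an interrupt that nobody answered explicitly does not get
approved by accident.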

#### Interrupt from a tool

A `@strands.tool` function can raise `InterruptException(Interrupt(...))` directly. The agent stops with the interrupt,
and the Workflow handles the resume the same way as for hooks:

```python
from strands import tool
from strands.interrupt import Interrupt, InterruptException

@tool
def delete_thing(name: str) -> str:
    raise InterruptException(
        Interrupt(id=f"delete:{name}", name="approval", reason=f"delete {name}?")
    )
```

The same approach works from an `activity_as_tool`-wrapped Activity. The plugin's failure converter preserves the
`Interrupt` payload across the Activity boundary, so `AgentResult.interrupts` is populated the same way.

Define the Activity that raises the interrupt:

<!--SNIPSTART python-strands-activity-interrupt-activity-->
[strands_plugin/activity_interrupt/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/activity_interrupt/workflow.py)
```py
@activity.defn
async def delete_thing(name: str) -> str:
    if name not in _APPROVED:
        _APPROVED.add(name)
        raise InterruptException(
            Interrupt(
                id=f"delete:{name}",
                name="approval",
                reason=f"approve delete of protected resource '{name}'?",
            )
        )
    return f"deleted {name}"

```
<!--SNIPEND-->

> **⚠️ Caution:**
>
> Activity-tool interrupts rely on the plugin's failure converter, which is installed via the client's data converter.
> Attach `StrandsPlugin` to the **client** (not just the Worker) for Activity-tool interrupts to work.
>

Workers built from that client pick up the plugin automatically:

<!--SNIPSTART python-strands-activity-interrupt-worker-->
[strands_plugin/activity_interrupt/run_worker.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/activity_interrupt/run_worker.py)
```py
import asyncio
import os

from temporalio.client import Client
from temporalio.contrib.strands import StrandsPlugin
from temporalio.worker import Worker

from strands_plugin.activity_interrupt.workflow import (
    ActivityInterruptWorkflow,
    delete_thing,
)

async def main() -> None:
    plugin = StrandsPlugin()
    # The plugin MUST be on the client so its failure converter is installed.
    client = await Client.connect(
        os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"),
        plugins=[plugin],
    )

    worker = Worker(
        client,
        task_queue="strands-activity-interrupt",
        workflows=[ActivityInterruptWorkflow],
        activities=[delete_thing],
    )
    print("Worker started. Ctrl+C to exit.")
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
```
<!--SNIPEND-->

### Return structured data from an agent

To have the agent return a typed object instead of free-form text, pass a `structured_output_model` to `TemporalAgent`.
The plugin defaults to the [`pydantic_data_converter`](/develop/python/data-handling/data-conversion), so Pydantic types
serialize cleanly across the Activity and Workflow boundary:

<!--SNIPSTART python-strands-structured-output-workflow-->
[strands_plugin/structured_output/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/structured_output/workflow.py)
```py
from datetime import timedelta

from pydantic import BaseModel, Field
from temporalio import workflow
from temporalio.contrib.strands import TemporalAgent

class PersonInfo(BaseModel):
    name: str = Field(description="Name of the person")
    age: int = Field(description="Age of the person")
    occupation: str = Field(description="Occupation of the person")

@workflow.defn
class StructuredOutputWorkflow:
    def __init__(self) -> None:
        self.agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            structured_output_model=PersonInfo,
        )

    @workflow.run
    async def run(self, prompt: str) -> PersonInfo:
        result = await self.agent.invoke_async(prompt)
        assert isinstance(result.structured_output, PersonInfo)
        return result.structured_output

```
<!--SNIPEND-->

### Stream agent output to clients

For long-running agent calls, you may want to forward model output chunks to an external consumer as they arrive rather
than waiting for the full response.

Pass `streaming_topic="..."` to `TemporalAgent` and host a `WorkflowStream` on the Workflow. Each `StreamEvent` is
published from inside the model Activity. Subscribers read events through `WorkflowStreamClient`. Chunks are batched on
`streaming_batch_interval` (default 100 ms).

Define the Workflow with a `WorkflowStream` and a streaming topic:

<!--SNIPSTART python-strands-streaming-workflow-->
[strands_plugin/streaming/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/streaming/workflow.py)
```py
from datetime import timedelta

from temporalio import workflow
from temporalio.contrib.strands import TemporalAgent
from temporalio.contrib.workflow_streams import WorkflowStream

@workflow.defn
class StreamingWorkflow:
    def __init__(self) -> None:
        self.stream = WorkflowStream()
        self.agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            streaming_topic="events",
        )

    @workflow.run
    async def run(self, prompt: str) -> str:
        result = await self.agent.invoke_async(prompt)
        return str(result)

```
<!--SNIPEND-->

Subscribe to the stream from a client:

<!--SNIPSTART python-strands-streaming-client-->
[strands_plugin/streaming/run_workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/streaming/run_workflow.py)
```py
import asyncio
import os
from datetime import timedelta

from strands.types.streaming import StreamEvent
from temporalio.client import Client
from temporalio.contrib.workflow_streams import WorkflowStreamClient

from strands_plugin.streaming.workflow import StreamingWorkflow

async def main() -> None:
    client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"))
    workflow_id = "strands-streaming"

    handle = await client.start_workflow(
        StreamingWorkflow.run,
        "Count from 1 to 5, one number per sentence.",
        id=workflow_id,
        task_queue="strands-streaming",
    )

    async def consume() -> None:
        stream = WorkflowStreamClient.create(client, workflow_id)
        async for item in stream.subscribe(
            ["events"],
            from_offset=0,
            result_type=StreamEvent,
            poll_cooldown=timedelta(milliseconds=50),
        ):
            event: StreamEvent = item.data
            if "contentBlockDelta" in event:
                delta = event["contentBlockDelta"].get("delta", {})
                if "text" in delta:
                    print(delta["text"], end="", flush=True)
            elif "messageStop" in event:
                print()
                return

    consume_task = asyncio.create_task(consume())
    result = await handle.result()
    await asyncio.wait_for(consume_task, timeout=10.0)
    print(f"Final result: {result}")

if __name__ == "__main__":
    asyncio.run(main())
```
<!--SNIPEND-->
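
The event-handling logic in `consume` can be factored into a pure function, which makes it possible to test without a
running Workflow. This sketch assumes events are plain dictionaries with the same `contentBlockDelta` and `messageStop`
keys used above. `collect_text` and the sample events are hypothetical:

```python
from typing import Iterable, Mapping


def collect_text(events: Iterable[Mapping]) -> str:
    """Concatenate text deltas until the first messageStop event."""
    parts = []
    for event in events:
        if "contentBlockDelta" in event:
            delta = event["contentBlockDelta"].get("delta", {})
            if "text" in delta:
                parts.append(delta["text"])
        elif "messageStop" in event:
            break
    return "".join(parts)


sample_events = [
    {"messageStart": {"role": "assistant"}},
    {"contentBlockDelta": {"delta": {"text": "One. "}}},
    {"contentBlockDelta": {"delta": {"text": "Two."}}},
    {"messageStop": {"stopReason": "end_turn"}},
    {"contentBlockDelta": {"delta": {"text": "ignored"}}},
]
text = collect_text(sample_events)
```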

## Run in production

Configure retry policies, handle long-running chat sessions, and add distributed tracing.

### Configure retries

`TemporalAgent` disables Strands' built-in `ModelRetryStrategy` so that retries are handled exclusively by Temporal.
Configure retries with `retry_policy` on `TemporalAgent` for model calls, and on the Activity options accepted by
`activity_as_tool`, `activity_as_hook`, and `TemporalMCPClient` for their respective calls:

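```python
from datetime import timedelta

from temporalio.common import RetryPolicy
from temporalio.contrib.strands import TemporalAgent

# Each model call Activity is attempted at most three times.
TemporalAgent(
    start_to_close_timeout=timedelta(seconds=60),
    retry_policy=RetryPolicy(maximum_attempts=3),
)
```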

Passing `retry_strategy=...` to `TemporalAgent(...)` raises `ValueError`. Remove the argument (or pass
`retry_strategy=None`) and use `retry_policy` instead.
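
To reason about how long a model call may keep retrying, it helps to compute the wait before each retry. The sketch
below applies Temporal's documented backoff rule (each interval is the previous one multiplied by the backoff
coefficient, capped at the maximum interval) with the default values of a 1-second initial interval, a coefficient of
2.0, and a cap of 100 times the initial interval. `retry_waits` is a hypothetical helper, not part of the SDK:

```python
from datetime import timedelta
from typing import List, Optional


def retry_waits(
    maximum_attempts: int,
    initial_interval: timedelta = timedelta(seconds=1),
    backoff_coefficient: float = 2.0,
    maximum_interval: Optional[timedelta] = None,
) -> List[timedelta]:
    """Return the wait before each retry (attempts 2..maximum_attempts)."""
    cap = maximum_interval or initial_interval * 100
    waits = []
    for retry in range(maximum_attempts - 1):
        wait = initial_interval * (backoff_coefficient ** retry)
        waits.append(min(wait, cap))
    return waits


waits = retry_waits(maximum_attempts=3)
```

With `maximum_attempts=3`, the Activity is tried at most three times, waiting 1 second and then 2 seconds between
attempts, in addition to the time each attempt spends running.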

### Handle long-running chat sessions

A chat-style Workflow accumulates message history with every turn. Over a long session, the Workflow's event history can
grow large enough to hit Temporal's per-Workflow history limit. To avoid this, use
[Continue-as-New](/develop/python/workflows/continue-as-new) to start a fresh Workflow execution while carrying the
agent's message history forward as input.

In this example, each user turn arrives as a Workflow [Update](/develop/python/workflows/message-passing#updates), so
the caller gets the agent's reply back from the same call. The `run` method creates the agent, then waits until either
the chat ends or Temporal suggests Continue-as-New. In either case, the Workflow first waits for any in-flight Update
handlers to finish. If the chat has not ended, it then starts a fresh execution with the agent's accumulated messages:

<!--SNIPSTART python-strands-continue-as-new-workflow-->
[strands_plugin/continue_as_new/workflow.py](https://github.com/temporalio/samples-python/blob/main/strands_plugin/continue_as_new/workflow.py)
```py
import asyncio
from dataclasses import dataclass, field
from datetime import timedelta

from strands.types.content import Messages
from temporalio import workflow
from temporalio.contrib.strands import TemporalAgent

@dataclass
class ChatInput:
    messages: Messages = field(default_factory=list)

@workflow.defn
class ChatWorkflow:
    def __init__(self) -> None:
        self._done = False
        self._lock = asyncio.Lock()
        self._agent: TemporalAgent | None = None

    @workflow.update
    async def turn(self, prompt: str) -> str:
        await workflow.wait_condition(lambda: self._agent is not None)
        async with self._lock:
            assert self._agent is not None
            result = await self._agent.invoke_async(prompt)
            return str(result).strip()

    @workflow.signal
    def end_chat(self) -> None:
        self._done = True

    @workflow.query
    def messages(self) -> Messages:
        return list(self._agent.messages) if self._agent else []

    @workflow.run
    async def run(self, input: ChatInput) -> None:
        self._agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            messages=list(input.messages),
        )

        await workflow.wait_condition(
            lambda: self._done or workflow.info().is_continue_as_new_suggested()
        )

        await workflow.wait_condition(workflow.all_handlers_finished)

        if not self._done:
            workflow.continue_as_new(ChatInput(messages=self._agent.messages))

```
<!--SNIPEND-->

### Add tracing with OpenTelemetry

To get distributed traces across model, tool, and MCP Activities, combine `StrandsPlugin` with the
[OpenTelemetry plugin](/develop/python/platform/observability#tracing). Register `OpenTelemetryPlugin` on the client and
`StrandsPlugin` on the Worker. Workers built from that client pick up the OpenTelemetry plugin automatically:

```python
import opentelemetry.trace
from temporalio.client import Client
from temporalio.contrib.opentelemetry import OpenTelemetryPlugin, create_tracer_provider
from temporalio.contrib.strands import StrandsPlugin
from temporalio.worker import Worker

opentelemetry.trace.set_tracer_provider(create_tracer_provider())

client = await Client.connect("localhost:7233", plugins=[OpenTelemetryPlugin()])

Worker(
    client,
    task_queue="strands",
    workflows=[MyWorkflow],
    plugins=[StrandsPlugin()],
)
```

Set the tracer provider before connecting the client.

### Snapshots are not supported

`TemporalAgent.take_snapshot()` and `TemporalAgent.load_snapshot()` raise `NotImplementedError`. Temporal's event
history already persists Workflow state durably at a finer granularity than Strands snapshots, so snapshots are
redundant inside a Workflow.

## Samples

The [Strands Agents plugin samples](https://github.com/temporalio/samples-python/tree/main/strands_plugin) demonstrate
all supported patterns end-to-end.
