# LangGraph integration

Temporal's integration with [LangGraph](https://www.langchain.com/langgraph) gives your LangGraph AI agent workflows
durable execution, automatic retries, and timeouts via the Temporal platform.

The plugin supports both the LangGraph **Graph API** (`StateGraph` with nodes and edges) and the **Functional API**
(`@entrypoint` / `@task` decorators). Each graph node and task must specify whether it runs as a Temporal Activity or
directly inside the Workflow — Activity nodes get configurable timeouts and retry policies, while Workflow nodes run
inline and must be deterministic.

> **Pre-release**

Code snippets in this guide are taken from the
[LangGraph plugin samples](https://github.com/temporalio/samples-python/tree/main/langgraph_plugin). Refer to the
samples for the complete code.

## Prerequisites

- This guide assumes you are already familiar with LangGraph. If you aren't, refer to the
  [LangGraph documentation](https://langchain-ai.github.io/langgraph/) for more details.
- If you are new to Temporal, we recommend reading [Understanding Temporal](/evaluate/understanding-temporal) or taking
  the [Temporal 101](https://learn.temporal.io/courses/temporal_101/) course.
- Ensure you have set up your local development environment by following the
  [Set up your local development environment](/develop/python/set-up-your-local-python) guide. When you're done, leave the
  Temporal development server running if you want to test your code locally.

## Install the plugin

Install the Temporal Python SDK with LangGraph support (requires `temporalio` 1.27.0 or later):

```bash
uv add "temporalio[langgraph]"
```

or with pip:

```bash
pip install "temporalio[langgraph]"
```

> **📝 Note:**
>
> Python 3.11 or newer is required for the Functional API (`@entrypoint` / `@task`), for `interrupt()`, and for streaming
> from a node running in the Workflow. On older Python versions the plugin loads but emits a warning, and those features
> will not work: LangGraph propagates `contextvars` by passing a `context` argument to `asyncio.create_task()`, and that
> parameter was added in Python 3.11.
>
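
The `context` keyword on `asyncio.create_task()` is the Python 3.11 addition at issue here. The sketch below uses only the standard library to show a task reading a value from an explicitly supplied context. The variable and function names are illustrative and are not part of LangGraph or the plugin.

```python
import asyncio
import contextvars
import sys

# Illustrative context variable (hypothetical name).
request_id: contextvars.ContextVar[str] = contextvars.ContextVar("request_id", default="unset")


async def read_request_id() -> str:
    """Return whatever value the running task's context holds."""
    return request_id.get()


async def main() -> str:
    # Set the variable only inside a copied context, not in the caller's context.
    ctx = contextvars.copy_context()
    ctx.run(request_id.set, "abc-123")
    if sys.version_info < (3, 11):
        return "create_task(context=...) needs Python 3.11+"
    # Python 3.11+: run the task inside the supplied context.
    task = asyncio.create_task(read_request_id(), context=ctx)
    return await task


result = asyncio.run(main())
print(result)  # on Python 3.11+: abc-123
```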

## Graph API

The Graph API uses `StateGraph` to define nodes and edges declaratively.

### Define a graph and Workflow

Build a `StateGraph`, then retrieve it inside your Workflow with the `graph()` helper:

```python
from datetime import timedelta

from langgraph.graph import START, StateGraph
from temporalio import workflow
from temporalio.contrib.langgraph import graph

async def process_query(query: str) -> str:
    """Process a query and return a response."""
    return f"Processed: {query}"

def build_graph() -> StateGraph:
    """Construct a single-node graph."""
    g = StateGraph(str)
    g.add_node(
        "process_query",
        process_query,
        metadata={
            "execute_in": "activity",
            "start_to_close_timeout": timedelta(seconds=10),
        },
    )
    g.add_edge(START, "process_query")
    return g

@workflow.defn
class HelloWorldWorkflow:
    @workflow.run
    async def run(self, query: str) -> str:
        return await graph("hello-world").compile().ainvoke(query)
```

### Configure the Worker

Create a `LangGraphPlugin` with your graphs and pass it to the Worker:

```python
import asyncio

from temporalio.client import Client
from temporalio.contrib.langgraph import LangGraphPlugin
from temporalio.worker import Worker

async def main() -> None:
    client = await Client.connect("localhost:7233")
    plugin = LangGraphPlugin(graphs={"hello-world": build_graph()})

    worker = Worker(
        client,
        task_queue="langgraph-hello-world",
        workflows=[HelloWorldWorkflow],
        plugins=[plugin],
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
```

### Set Activity options

Pass Activity options as node `metadata` when calling `add_node`. Every node must include `"execute_in"` set to either
`"activity"` or `"workflow"`; the plugin raises an error if it's missing.

```python
from datetime import timedelta

from langgraph.graph import StateGraph
from temporalio.common import RetryPolicy

g = StateGraph(str)
g.add_node(
    "my_node",
    my_node,
    metadata={
        "execute_in": "activity",
        "start_to_close_timeout": timedelta(seconds=30),
        "retry_policy": RetryPolicy(maximum_attempts=3),
    },
)
```

> **⚠️ Warning:**
>
> Don't pass a LangGraph `retry_policy=` to `add_node` (or `@task(retry_policy=...)`) — the plugin raises an error if you
> do. Use Temporal's `RetryPolicy` via the node's `metadata` (Graph API) or `activity_options` (Functional API) instead.
>

### Shared defaults

To apply the same Activity options across every node and task, pass `default_activity_options` to `LangGraphPlugin`.
Per-node `metadata` (Graph API) and per-task `activity_options` (Functional API) override these defaults key by key:

```python
plugin = LangGraphPlugin(
    graphs={"my-graph": g},
    default_activity_options={
        "start_to_close_timeout": timedelta(seconds=30),
        "retry_policy": RetryPolicy(maximum_attempts=3),
    },
)
```

To mitigate potential determinism bugs, `execute_in` cannot be set in `default_activity_options`; you must set it on
each node or task individually. See [Activity vs. Workflow execution](#activity-vs-workflow-execution).
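
One way to picture the key-by-key override is as a shallow dict merge in which per-node values win. The sketch below assumes that reading of "key by key"; it uses plain dicts and a placeholder string for the retry policy, and it is not the plugin's actual merge code.

```python
from datetime import timedelta

# Plain dicts standing in for the plugin's option mappings.
default_activity_options = {
    "start_to_close_timeout": timedelta(seconds=30),
    "retry_policy": "<RetryPolicy(maximum_attempts=3)>",  # stand-in for a temporalio RetryPolicy
}
node_metadata = {
    "execute_in": "activity",  # always set per node, never in the defaults
    "start_to_close_timeout": timedelta(minutes=2),
}

# Shallow merge: keys in node_metadata replace the defaults, other defaults survive.
effective = {**default_activity_options, **node_metadata}

print(effective["start_to_close_timeout"])  # 0:02:00
print(effective["retry_policy"])  # <RetryPolicy(maximum_attempts=3)>
```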

## Functional API

The Functional API uses `@entrypoint` and `@task` decorators, letting you orchestrate tasks with native Python
control flow (`while`, `if/else`, `for`) rather than declaring nodes and edges.

### Define tasks and a Workflow

```python
from datetime import timedelta

from langgraph.func import entrypoint as lg_entrypoint
from langgraph.func import task
from temporalio import workflow
from temporalio.contrib.langgraph import entrypoint

@task
def agent_think(query: str, history: list[str]) -> dict:
    """Decide the next action based on query and tool history."""
    tool_results = [h for h in history if h.startswith("[Tool]")]
    if len(tool_results) < 2:
        return {"action": "tool", "tool_name": "search", "tool_input": query}
    return {"action": "final", "answer": f"Found: {'; '.join(tool_results)}"}

@task
def execute_tool(tool_name: str, tool_input: str) -> str:
    """Execute a tool by name."""
    return f"[Tool] Result for {tool_name}({tool_input})"

@lg_entrypoint()
async def react_agent(query: str) -> dict:
    """ReAct agent loop: think -> act -> observe -> repeat."""
    history: list[str] = []
    while True:
        decision = await agent_think(query, history)
        if decision["action"] == "final":
            return {"answer": decision["answer"], "steps": len(history)}
        result = await execute_tool(decision["tool_name"], decision["tool_input"])
        history.append(result)

all_tasks = [agent_think, execute_tool]

activity_options = {
    t.func.__name__: {
        "execute_in": "activity",
        "start_to_close_timeout": timedelta(seconds=30),
    }
    for t in all_tasks
}

@workflow.defn
class ReactAgentWorkflow:
    @workflow.run
    async def run(self, query: str) -> dict:
        return await entrypoint("react-agent").ainvoke(query)
```

### Configure the Worker with the Functional API

```python
from temporalio.contrib.langgraph import LangGraphPlugin
from temporalio.worker import Worker

plugin = LangGraphPlugin(
    entrypoints={"react-agent": react_agent},
    tasks=all_tasks,
    activity_options=activity_options,
)

# `client` is a connected `temporalio.client.Client`, created as in the Graph API Worker example.
worker = Worker(
    client,
    task_queue="langgraph-react-agent",
    workflows=[ReactAgentWorkflow],
    plugins=[plugin],
)
```

## Checkpointer

If your LangGraph code requires a checkpointer (for example, if you're using interrupts), use `InMemorySaver`. Temporal
handles durability, so third-party checkpointers (like PostgreSQL or Redis) are not needed.

```python
import langgraph.checkpoint.memory
from temporalio.contrib.langgraph import graph

g = graph("my-graph").compile(
    checkpointer=langgraph.checkpoint.memory.InMemorySaver(),
)
```

## Runtime context

LangGraph's run-scoped context (`context_schema`) is reconstructed on the Activity side, so nodes and tasks can read
from `runtime.context`:

```python
from langgraph.runtime import Runtime
from typing_extensions import TypedDict

from temporalio.contrib.langgraph import graph

class Context(TypedDict):
    user_id: str

# `State` is your graph's state schema, defined elsewhere.
async def my_node(state: State, runtime: Runtime[Context]) -> dict:
    return {"user": runtime.context["user_id"]}

# In the Workflow:
g = graph("my-graph").compile()
await g.ainvoke({...}, context=Context(user_id="alice"))
```

Your `context` object must be serializable by the configured Temporal payload converter, since it crosses the Activity
boundary.
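
A rough pre-flight check can catch context values that hold live handles. The sketch below assumes the default payload converter, which encodes dict-like values as JSON, and uses `json.dumps` as a conservative stand-in. The helper name is hypothetical, and the real converter may accept more types than `json.dumps` does, so treat a failure here as a hint rather than a verdict.

```python
import json
import threading
from typing import TypedDict


class Context(TypedDict):
    user_id: str


def looks_json_serializable(value: object) -> bool:
    """Return True if `value` survives json.dumps (a conservative stand-in check)."""
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return False
    return True


plain_context = Context(user_id="alice")
# A live handle such as a lock cannot cross the Activity boundary.
live_context = {"user_id": "alice", "lock": threading.Lock()}

print(looks_json_serializable(plain_context))  # True
print(looks_json_serializable(live_context))  # False
```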

## Continue-as-new

Long-running graphs can hit Temporal's Event History limits, which apply to each Workflow Execution. Use Temporal's
[continue-as-new](/develop/python/workflows/continue-as-new) to start a fresh execution while preserving the results of nodes
and tasks that have already completed.

The `cache()` helper returns the current task-result cache as a serializable dict. Pass it to `graph(name, cache=...)`
or `entrypoint(name, cache=...)` in the new run to skip re-executing nodes that already produced a result.

```python
from temporalio import workflow
from temporalio.contrib.langgraph import cache, graph

@workflow.defn
class LongRunningWorkflow:
    @workflow.run
    async def run(self, state: State, prior_cache: dict | None = None) -> State:
        g = graph("my-graph", cache=prior_cache).compile()
        # ... run some steps, then continue-as-new before history grows too large ...
        workflow.continue_as_new(args=[state, cache()])
```
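
The effect of passing a prior cache can be pictured with a toy model in plain Python. This only illustrates the skip-if-cached idea: the step names, the `run_steps` helper, and keying the cache by step name are all assumptions made for the sketch, not the plugin's actual cache format.

```python
from typing import Callable, Optional


def run_steps(
    steps: list[tuple[str, Callable[[], str]]],
    prior_cache: Optional[dict[str, str]] = None,
    max_new_steps: Optional[int] = None,
) -> tuple[dict[str, str], list[str]]:
    """Run steps in order, skipping cached ones; stop after `max_new_steps` new results."""
    cache = dict(prior_cache or {})
    executed: list[str] = []
    for name, fn in steps:
        if name in cache:
            continue  # result carried over from the previous run
        if max_new_steps is not None and len(executed) >= max_new_steps:
            break  # stand-in for continuing as new before history grows too large
        cache[name] = fn()
        executed.append(name)
    return cache, executed


steps = [
    ("fetch", lambda: "raw data"),
    ("clean", lambda: "clean data"),
    ("report", lambda: "final report"),
]

# First run: execute two steps, then hand the cache to the next run.
cache_1, executed_1 = run_steps(steps, max_new_steps=2)
# Second run: only the step missing from the cache executes.
cache_2, executed_2 = run_steps(steps, prior_cache=cache_1)

print(executed_1)  # ['fetch', 'clean']
print(executed_2)  # ['report']
```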

## Tracing

For tracing your LangGraph Workflows and Activities, we recommend the
[Temporal LangSmith plugin](https://github.com/temporalio/sdk-python/tree/main/temporalio/contrib/langsmith). It
composes with `LangGraphPlugin` — pass both plugins to your Worker.

## Stores are not supported

LangGraph's `Store` (for example, `InMemoryStore` passed via `graph.compile(store=...)` or `@entrypoint(store=...)`)
isn't accessible inside Activity-wrapped nodes: the Store holds live state that can't cross the Activity boundary, and
Activities may run on a different Worker than the Workflow. If you pass a store, the plugin logs a warning on first use
and `runtime.store` is `None` inside nodes.

Use Workflow state for per-run memory, or an external database (Postgres, Redis, etc.) configured on each Worker if you
need shared memory across runs.

## Activity vs. Workflow execution

Every graph node and `@task` must specify `execute_in` — set it to `"activity"` to run as a Temporal Activity, or
`"workflow"` to run directly inside the Workflow. The plugin raises an error if you forget to set it.

`execute_in` must be set per node or task; it cannot be set in `default_activity_options`.

Understanding when to use each mode is important for correctness and durability.

### When to use an Activity

Use `execute_in: "activity"` when a node does any of the following:

- **Makes network calls** — LLM calls, HTTP requests, database queries, or any I/O. Activities can do I/O; Workflows
  cannot.
- **Has non-deterministic behavior** — anything that can return different results on re-execution (random numbers,
  current time, external data). Workflows must be deterministic.
- **Is long-running or may fail** — Activities get configurable timeouts, automatic retries, and heartbeating. If an LLM
  call times out or a service is unavailable, Temporal retries the Activity without re-running the entire Workflow.
- **Calls `interrupt()`** — LangGraph's `interrupt()` is supported in Activity nodes. The plugin serializes the
  interrupt and propagates it back to the Workflow for human-in-the-loop patterns.

### When to run in the Workflow

Use `execute_in: "workflow"` when a node:

- **Orchestrates other graphs** — a node that calls `graph("child").compile().ainvoke(state)` to dispatch to a subgraph.
  The subgraph's own nodes still run as Activities, but the orchestration logic runs in the Workflow.
- **Performs pure state transformations** — deterministic data reshaping, merging, or filtering with no I/O.
- **Is a lightweight routing step** — when a node's only job is to decide what happens next and you want to avoid the
  overhead of an Activity round-trip.
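
As an illustration of a pure state transformation, the sketch below defines a node that only reshapes data it already holds. The state schema and node name are hypothetical. It does no I/O and produces the same output for the same input, which is what makes it a candidate for running in the Workflow.

```python
import asyncio
from typing import TypedDict


class SearchState(TypedDict):
    results: list[str]


async def dedupe_results(state: SearchState) -> dict[str, list[str]]:
    """Remove duplicate results, keeping first-seen order."""
    # dict.fromkeys preserves insertion order, so the output is identical on every replay.
    # Iterating a set of strings would not be safe: its order can differ between processes.
    return {"results": list(dict.fromkeys(state["results"]))}


updated = asyncio.run(dedupe_results({"results": ["a", "b", "a", "c"]}))
print(updated)  # {'results': ['a', 'b', 'c']}
```

Such a node would be registered with `metadata={"execute_in": "workflow"}` when calling `add_node`.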

> **⚠️ Warning:**
>
> Workflow code must be [deterministic](/develop/python/workflows/basics#workflow-logic-requirements). A node running in
> the Workflow **must not** make network calls, use `random`, read the system clock, or do file I/O. Violating this causes
> non-determinism errors on replay.
>

### Where LangGraph primitives run

Not all LangGraph primitives are node functions. Some run in the Workflow context regardless of the `execute_in` setting:

| Primitive | Runs in | Notes |
| --- | --- | --- |
| Node functions | Activity or Workflow | Controlled by `execute_in` in node `metadata` (required) |
| `@task` functions | Activity or Workflow | Controlled by `execute_in` in `activity_options` (required) |
| Conditional edge functions (`add_conditional_edges`) | Workflow | Always runs in the Workflow. Must be **deterministic** and **async** (sync functions trigger `run_in_executor`, which is not allowed in the Temporal sandbox). |
| `interrupt()` | Activity | Call `interrupt()` inside Activity nodes. The plugin serializes the interrupt and propagates it to the Workflow. |
| `Command(resume=...)` | Workflow | Used from Workflow code to resume after an interrupt. |
| `InMemorySaver` checkpointer | Workflow | Runs in-process. Temporal handles durability — external checkpointers are not needed. |

> **💡 Tip:**
>
> Conditional edge functions like `should_continue` must be `async def`, not plain `def`. Synchronous functions cause
> LangGraph to use `run_in_executor`, which is not supported inside Temporal's Workflow sandbox.
>
> ```python
> # ✅ Correct: async conditional edge function
> async def should_continue(state: AgentState) -> str:
>     if state["messages"][-1].startswith("[Agent]") and "Calling" in state["messages"][-1]:
>         return "tools"
>     return END
>
> g.add_conditional_edges("agent", should_continue)
> ```
>

### Syntax

```python
# Graph API
g.add_node("my_node", my_node, metadata={"execute_in": "workflow"})

# Functional API
plugin = LangGraphPlugin(
    tasks=[my_task],
    activity_options={"my_task": {"execute_in": "workflow"}},
)
```

### Example: subgraph orchestration

A common pattern is a parent node that runs in the Workflow and dispatches to a child graph whose nodes run as
Activities:

```python
async def parent_node(state: State) -> dict[str, str]:
    return await graph("child").compile().ainvoke(state)

parent = StateGraph(State)
parent.add_node("parent_node", parent_node, metadata={"execute_in": "workflow"})
parent.add_edge(START, "parent_node")

plugin = LangGraphPlugin(graphs={"parent": parent, "child": child})
```

## Human-in-the-loop

LangGraph's `interrupt()` works with Temporal Signals and Queries to support human-in-the-loop patterns:

1. A graph node calls `interrupt(draft)`, pausing execution.
2. The Workflow exposes the pending draft via a Temporal Query.
3. An external process (UI, CLI) queries the draft and sends approval via a Temporal Signal.
4. The graph resumes: `interrupt()` returns the value carried by the Signal, and the node completes.

See the [human-in-the-loop samples](https://github.com/temporalio/samples-python/tree/main/langgraph_plugin/graph_api/human_in_the_loop) for
complete working examples using both Graph and Functional APIs.

## Samples

The [LangGraph plugin samples](https://github.com/temporalio/samples-python/tree/main/langgraph_plugin)
demonstrate all supported patterns across both APIs.
