Client¶
The Client layer in Protolink provides a high-level interface for agent-to-agent communication. It abstracts transport details and offers convenient methods for sending tasks, messages, and retrieving agent metadata.
AgentClient¶
The AgentClient is the primary entry point for programmatic agent interactions. It wraps a transport and provides a unified interface for communicating with Protolink agents.
Design Philosophy: Async vs Sync¶
Protolink's client architecture exposes two APIs to accommodate different workflows:
- Async API (Recommended): The core implementation. Ideal for modern applications, web servers (e.g., FastAPI), and high-performance multi-agent orchestration where non-blocking I/O is crucial.
- Sync API (
client.sync): A thin, blocking wrapper over the async methods. Designed for simple scripts, CLI tools, and environments where managing anasyncioevent loop is cumbersome.
Async Loop Constraint
The Sync API (client.sync) uses asyncio.run() under the hood. It cannot be used inside an already running event loop (e.g., inside an async function). If you are inside an async def, always use the standard Async API.
Quick Start¶
from protolink.client import AgentClient
from protolink.models import Task
# Create a client (transport type + URL)
client = AgentClient(transport="http", url="http://localhost:8000")
# Create a task with an inference request
task = Task.create_infer(prompt="Book me a vacation to Santorini")
# Send to a remote agent
result = await client.send_task(agent_url="http://localhost:8010", task=task)
# Get the response
print(result.get_last_part_content())
Constructor¶
AgentClient(transport: Transport | TransportType, url: str | None = None)
| Parameter | Type | Description |
|---|---|---|
transport |
Transport ⎪ str |
A Transport instance or type string ("http", "websocket", etc.) |
url |
str ⎪ None |
Base URL when using a transport type string |
timeout |
int |
Timeout in seconds for the request (default: 300) |
Examples:
# Using transport type string
client = AgentClient(transport="http", url="http://localhost:8000", timeout=120)
# Using an existing transport instance
from protolink.transport import HTTPTransport
transport = HTTPTransport(url="http://localhost:8000")
client = AgentClient(transport=transport)
Core Methods¶
send_task()¶
Sends a Task to a remote agent and returns the processed result.
async def send_task(agent_url: str, task: Task) -> Task
| Parameter | Description |
|---|---|
agent_url |
The full URL of the target agent (e.g., "http://localhost:8010") |
task |
The Task object to send |
Example:
from protolink.models import Task
# Create an infer task
task = Task.create_infer(prompt="What's the weather in Athens?")
# Send and get result
result = await client.send_task("http://localhost:8010", task)
print(result.get_last_part_content())
send_task_streaming()¶
Sends a task and yields streamed events as they arrive. This is the public client API for live task progress, LLM chunks, tool events, and final task completion.
async def send_task_streaming(agent_url: str, task: Task) -> AsyncIterator[Any]
Transport Support
Requires a transport that advertises streaming support and implements subscribe(). Supported choices include "sse", "json-rpc", "websocket", and "runtime". Plain "http" remains request/response only and raises NotImplementedError.
Example with SSE JSON-RPC:
from protolink.client import AgentClient
from protolink.models import Task
client = AgentClient(transport="sse", url="http://localhost:8000")
task = Task.create_infer(prompt="Write a short haiku about agents")
async for event in client.send_task_streaming("http://localhost:8010", task):
if event.get("type") == "task_llm_stream":
print(event.get("content") or "", end="", flush=True)
if event.get("final"):
print("\nstream complete")
Applications that need a stable UI or replay contract can normalize these transport events with RunEvent.from_task_event(...) or record them through InMemoryEventSink. See Runtime for the versioned run-event envelope.
SSE and WebSocket transports recursively convert nested Protolink models and dataclasses into JSON-compatible values. Tool and delegated-agent events therefore preserve structured results such as ToolOutput inside content or metadata; clients do not need a custom encoder for these framework event payloads.
cancel_task()¶
Requests best-effort cancellation of a task currently executing on an agent and returns the task after the request is accepted.
async def cancel_task(
agent_url: str,
task_id: str,
*,
reason: str | None = None,
metadata: dict[str, Any] | None = None,
) -> Task
The task ID is known before submission because the caller creates the Task. Cancellation should be sent from another coroutine or control handler after the task has been accepted, usually after the first streamed status or progress event.
import asyncio
task = Task.create_infer(prompt="Perform long-running work")
running = asyncio.create_task(client.send_task(agent_url, task))
# Wait for application-specific acceptance or progress before canceling.
await task_started.wait()
canceled = await client.cancel_task(
agent_url,
task.id,
reason="Stopped by the user",
metadata={"source": "cli"},
)
result = await running
assert canceled.state.value == "canceled"
assert result.state.value == "canceled"
cancel_task() uses the A2A-style POST /tasks/cancel operation over HTTP, SSE JSON-RPC, WebSocket, and RuntimeTransport. Cancellation is a control-plane request: WebSocket sends it over a separate connection so it does not queue behind the active task stream.
Cancellation is intentionally best-effort. Async work normally stops at an await boundary; synchronous work and external systems may need their own cooperative cancellation or rollback mechanism. See Runtime cancellation for lifecycle, custom-handler, and side-effect guidance.
compact_history()¶
Requests LLM conversation-history compaction from an agent over the control plane.
async def compact_history(
agent_url: str,
*,
strategy: str = "recent",
max_messages: int = 20,
max_tokens: int = 4000,
preserve_recent: int = 6,
summary_max_tokens: int = 512,
session_id: str | None = None,
metadata: dict[str, Any] | None = None,
) -> HistoryCompactionResult
This uses the built-in COMPACT_HISTORY_REQUEST spec (POST /llm/history/compact). It does not send a Task, does not create a model-visible tool, and does not add anything to the LLM prompt.
report = await client.compact_history(
agent_url,
strategy="tokens",
max_tokens=8_000,
preserve_recent=6,
session_id="customer-42",
)
When the target agent has state=["conversation"] and session_id is supplied, the agent loads that session history, compacts it, and saves it back.
State Control Plane¶
Inspect, reset, or compact a remote agent's persistent state without sending a model-visible task.
state = await client.describe_state(
agent_url,
session_id="customer-42",
)
reset = await client.reset_state(
agent_url,
session_id="customer-42",
)
compacted = await client.compact_state(
agent_url,
session_id="customer-42",
strategy="tokens",
max_tokens=8_000,
)
These methods return StateOperationResult. They use control-channel request
specs: DESCRIBE_STATE_REQUEST (POST /state/describe),
RESET_STATE_REQUEST (POST /state/reset), and COMPACT_STATE_REQUEST
(POST /state/compact).
send_message()¶
Convenience wrapper that creates a Task from a Message, sends it, and returns the response message.
async def send_message(agent_url: str, message: Message) -> Message
Example:
from protolink.models import Message
response = await client.send_message(
agent_url="http://localhost:8010",
message=Message.user("Hello, agent!")
)
print(response.parts[0].content)
get_agent_card()¶
Retrieves the public AgentCard from a remote agent. Useful for discovery and capability inspection.
async def get_agent_card(agent_url: str) -> AgentCard
Example:
card = await client.get_agent_card("http://localhost:8010")
print(f"Agent: {card.name}")
print(f"Description: {card.description}")
print(f"Skills: {[s.id for s in card.skills]}")
Synchronous API¶
The AgentClient provides synchronous versions of its core methods for use in non-async contexts (scripts, notebooks, CLI tools). These are accessible via the client.sync property.
Internally, these methods use asyncio.run() to handle the asynchronous transport logic.
Do Not Use in Async Loops
The synchronous API should NOT be used inside an active event loop (e.g., inside FastAPI endpoints or async Jupyter cells) as it uses asyncio.run(), which will raise a RuntimeError.
| Async Method | Synchronous Equivalent | Description |
|---|---|---|
send_task() |
client.sync.send_task() |
Synchronously send a task and wait for the result. |
send_task_streaming() |
client.sync.send_task_streaming() |
Synchronously iterate over streamed task events. |
cancel_task() |
client.sync.cancel_task() |
Synchronously request cancellation of a task running elsewhere. |
compact_history() |
client.sync.compact_history() |
Synchronously request LLM history compaction from an agent. |
describe_state() |
client.sync.describe_state() |
Synchronously inspect remote persistent state. |
reset_state() |
client.sync.reset_state() |
Synchronously reset remote persistent state. |
compact_state() |
client.sync.compact_state() |
Synchronously compact remote persistent conversation state. |
send_message() |
client.sync.send_message() |
Synchronously send a message and wait for the response message. |
get_agent_card() |
client.sync.get_agent_card() |
Synchronously retrieve an agent's public card. |
Example:
from protolink.client import AgentClient
from protolink.models import Task
client = AgentClient(transport="http", url="http://localhost:8000")
task = Task.create_infer(prompt="Hello, agent!")
# No 'await' or 'async def' needed. Use the .sync property!
result = client.sync.send_task("http://localhost:8010", task)
print(result.get_last_part_content())
Synchronous streaming example:
client = AgentClient(transport="sse", url="http://localhost:8000")
task = Task.create_infer(prompt="Stream this response")
for event in client.sync.send_task_streaming("http://localhost:8010", task):
print(event)
ClientRequestSpec¶
ClientRequestSpec defines the contract for an API endpoint in a transport-agnostic way.
@dataclass(frozen=True)
class ClientRequestSpec:
name: str # Human-readable name (e.g., "send_task")
path: str # URL path (e.g., "/tasks/")
method: HttpMethod # HTTP method (e.g., "POST")
response_parser: Callable # Function to parse response data
request_source: str # Where to put request data ("body", "query", etc.)
channel: str = "default" # Multiplexed transport channel
Built-in Request Specs¶
| Spec | Path | Method | Description |
|---|---|---|---|
TASK_REQUEST |
/tasks/ |
POST | Send a task to an agent |
TASK_CANCEL_REQUEST |
/tasks/cancel |
POST | Cancel an active task over a control channel |
COMPACT_HISTORY_REQUEST |
/llm/history/compact |
POST | Compact the target agent's LLM history over a control channel |
DESCRIBE_STATE_REQUEST |
/state/describe |
POST | Inspect target agent state over a control channel |
RESET_STATE_REQUEST |
/state/reset |
POST | Reset target agent state over a control channel |
COMPACT_STATE_REQUEST |
/state/compact |
POST | Compact target agent conversation state over a control channel |
AGENT_CARD_REQUEST |
/.well-known/agent.json |
GET | Retrieve agent metadata |
TASK_STREAM_REQUEST |
/tasks/stream |
POST | Send task with streaming |
How It Works¶
When you call a method like send_task():
- The client selects the appropriate
ClientRequestSpec(e.g.,TASK_REQUEST) - Passes the spec and data to
transport.send() - The transport uses the spec to construct the wire request
This pattern allows new endpoints without modifying transport implementations.