AG-UI Dart SDK
A strongly-typed Dart implementation of the AG-UI protocol for standardizing agent-user interactions through event-based communication.
Features
✅ Implemented
- Core Protocol Support: Full implementation of AG-UI event types and message structures
- HTTP Client: Production-ready client with SSE streaming support
- Event Streaming: Real-time event processing with backpressure handling
- Tool Interactions: Support for tool calls and results with generative UI
- State Management: Handle state snapshots and deltas (JSON Patch RFC 6902)
- Message History: Track conversation context across multiple runs
- Type Safety: Strongly-typed models for all protocol entities
- Error Handling: Comprehensive error types with validation
🚧 Planned
- WebSocket transport support
- Binary protocol encoding/decoding
- Advanced retry strategies
- Event caching and offline support
Requirements
- Dart SDK >=3.3.0
- HTTP connectivity to AG-UI compatible servers
Installation
This SDK is distributed via GitHub. Add it to your pubspec.yaml as a Git dependency:
Option 1: Pin to a specific release tag (recommended for production)
dependencies:
  ag_ui:
    git:
      url: https://github.com/mattsp1290/ag-ui.git
      ref: v0.1.0 # Replace with desired version tag
      path: sdks/community/dart
Option 2: Use a branch (for development)
dependencies:
  ag_ui:
    git:
      url: https://github.com/mattsp1290/ag-ui.git
      ref: main # or any branch name
      path: sdks/community/dart
Then run:
dart pub get
Quick Start
import 'package:ag_ui/ag_ui.dart';
void main() async {
// Initialize client with base URL
final client = AgUiClient(
config: AgUiClientConfig(
baseUrl: 'http://localhost:8000',
defaultHeaders: {'Authorization': 'Bearer YOUR_API_KEY'}, // Optional
),
);
// Create input for the agent
final input = SimpleRunAgentInput(
messages: [
UserMessage(
id: 'msg_${DateTime.now().millisecondsSinceEpoch}',
content: 'Hello, create a haiku about AI',
),
],
);
// Run agent and stream events
await for (final event in client.runAgent('agentic_chat', input)) {
switch (event.type) {
case EventType.runStarted:
print('Run started');
break;
case EventType.textMessageContent:
final textEvent = event as TextMessageContentEvent;
print('Assistant: ${textEvent.content}');
break;
case EventType.runFinished:
print('Run complete');
break;
default:
print('Event: ${event.type}');
}
}
}
Usage
Initialize Client
// Dart requires all imports to appear before any declarations
import 'dart:io';
import 'package:ag_ui/ag_ui.dart';
import 'package:http/http.dart' as http;

// Basic initialization
final client = AgUiClient(
  config: AgUiClientConfig(
    baseUrl: 'http://localhost:8000',
  ),
);

// With authentication and custom headers
final authenticatedClient = AgUiClient(
  config: AgUiClientConfig(
    baseUrl: Platform.environment['AGUI_BASE_URL'] ?? 'http://localhost:8000',
    defaultHeaders: {
      'Authorization': 'Bearer ${Platform.environment['AGUI_API_KEY']}',
      'X-Custom-Header': 'value',
    },
    requestTimeout: Duration(seconds: 30),
  ),
);

// With custom HTTP client for advanced networking
final customClient = AgUiClient(
  config: AgUiClientConfig(baseUrl: 'http://localhost:8000'),
  httpClient: http.Client(), // Substitute a configured client as needed
);
Send User Message and Stream Response
import 'dart:io';
import 'package:ag_ui/ag_ui.dart';
Future<void> sendMessage(String userInput) async {
final client = AgUiClient(
config: AgUiClientConfig(baseUrl: 'http://localhost:8000'),
);
// Prepare input with user message
final input = SimpleRunAgentInput(
threadId: 'thread_${DateTime.now().millisecondsSinceEpoch}',
messages: [
UserMessage(
id: 'msg_${DateTime.now().millisecondsSinceEpoch}',
content: userInput,
),
],
);
// Stream events from the agent
final stream = client.runAgenticChat(input);
await for (final event in stream) {
switch (event.type) {
case EventType.textMessageContent:
final textEvent = event as TextMessageContentEvent;
stdout.write(textEvent.text); // Stream tokens
break;
case EventType.textMessageEnd:
stdout.writeln(); // New line after message
break;
case EventType.messagesSnapshot:
final snapshot = event as MessagesSnapshotEvent;
print('Total messages: ${snapshot.messages.length}');
break;
default:
// Handle other event types as needed
break;
}
}
}
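The loop above consumes events that the client has already decoded from the SSE stream. To illustrate roughly what that decoding step involves, here is a minimal sketch that turns SSE text lines into JSON maps. It is not the SDK's actual implementation: `decodeSseEvents` is a hypothetical helper, the sample payloads are illustrative, and only the `data:` field of the SSE format is handled.

```dart
import 'dart:async';
import 'dart:convert';

/// Decodes a stream of SSE text lines into JSON objects, one per event.
/// Only `data:` fields are handled; `event:`, `id:` and `retry:` are ignored.
Stream<Map<String, dynamic>> decodeSseEvents(Stream<String> lines) async* {
  final buffer = <String>[];
  await for (final line in lines) {
    if (line.isEmpty) {
      // A blank line terminates the current event.
      if (buffer.isNotEmpty) {
        final decoded = jsonDecode(buffer.join('\n'));
        if (decoded is Map<String, dynamic>) yield decoded;
        buffer.clear();
      }
    } else if (line.startsWith('data:')) {
      // One optional space after the colon is not part of the value.
      final value = line.substring(5);
      buffer.add(value.startsWith(' ') ? value.substring(1) : value);
    }
    // Comment lines (starting with ':') and other fields are skipped.
  }
}

Future<void> main() async {
  // Illustrative payloads only; real servers define the exact fields.
  final lines = Stream.fromIterable([
    'data: {"type":"RUN_STARTED"}',
    '',
    ': keep-alive comment',
    'data: {"type":"TEXT_MESSAGE_CONTENT","delta":"Hi"}',
    '',
  ]);
  await for (final event in decodeSseEvents(lines)) {
    print(event['type']);
  }
}
```

With a raw byte stream, the lines could be produced by chaining `utf8.decoder` and `const LineSplitter()` from `dart:convert` before calling the helper. An event that is not followed by a blank line is dropped, which matches how SSE treats an incomplete trailing event.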
Handle Tool Calls
import 'package:ag_ui/ag_ui.dart';
import 'dart:convert';
Future<void> handleToolCalls() async {
final client = AgUiClient(
config: AgUiClientConfig(baseUrl: 'http://localhost:8000'),
);
final input = SimpleRunAgentInput(
messages: [
UserMessage(
id: 'msg_1',
content: 'Generate a haiku',
),
],
);
List<ToolCall> pendingToolCalls = [];
// First run - collect tool calls
await for (final event in client.runToolBasedGenerativeUi(input)) {
if (event is MessagesSnapshotEvent) {
// Extract tool calls from assistant messages
for (final message in event.messages) {
if (message is AssistantMessage && message.toolCalls != null) {
pendingToolCalls.addAll(message.toolCalls!);
}
}
} else if (event.type == EventType.runFinished && pendingToolCalls.isNotEmpty) {
// Process tool calls after run completes
break;
}
}
// Prepare tool results
final toolResults = pendingToolCalls.map((toolCall) {
// Process each tool call and generate result
final result = processToolCall(toolCall);
return ToolMessage(
id: 'tool_result_${toolCall.id}', // Derived from the call ID so results created in the same millisecond stay unique
toolCallId: toolCall.id,
content: result,
);
}).toList();
// Second run - send tool results
final followUpInput = SimpleRunAgentInput(
threadId: input.threadId,
messages: [
...input.messages,
...toolResults,
],
);
await for (final event in client.runToolBasedGenerativeUi(followUpInput)) {
if (event is TextMessageContentEvent) {
print('Assistant: ${event.text}');
}
}
}
String processToolCall(ToolCall toolCall) {
// Parse tool call arguments
final args = json.decode(toolCall.function.arguments);
// Process based on tool name
switch (toolCall.function.name) {
case 'generate_haiku':
return 'Haiku generated successfully';
case 'get_weather':
final location = args['location'];
return 'Weather for $location: Sunny, 72°F';
default:
return 'Tool processed';
}
}
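The `processToolCall` function above decodes the arguments directly, which throws if the agent sends malformed JSON or a non-object payload. A more defensive variant might look like the sketch below; `parseToolArguments` is a hypothetical helper, not part of the SDK, and it takes the raw argument string so it stays independent of the SDK's types.

```dart
import 'dart:convert';

/// Decodes a tool call's JSON-encoded arguments into a map.
/// Returns an empty map when the payload is malformed or not a JSON object.
Map<String, dynamic> parseToolArguments(String rawArguments) {
  try {
    final decoded = jsonDecode(rawArguments);
    return decoded is Map<String, dynamic> ? decoded : <String, dynamic>{};
  } on FormatException {
    return <String, dynamic>{};
  }
}

void main() {
  final args = parseToolArguments('{"location": "Oslo"}');
  final location = args['location'];
  // Check the runtime type before using a value from untrusted input.
  print(location is String ? location : 'unknown location'); // Oslo

  print(parseToolArguments('not json').isEmpty); // true
}
```

Falling back to an empty map keeps the tool loop running; depending on the application, returning an error message to the agent as the tool result may be the better choice.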
Manage Conversation State
import 'dart:io';
import 'package:ag_ui/ag_ui.dart';
class ConversationManager {
final AgUiClient client;
final String threadId;
final List<Message> messageHistory = [];
Map<String, dynamic> state = {};
ConversationManager({required this.client})
: threadId = 'thread_${DateTime.now().millisecondsSinceEpoch}';
Future<void> sendMessage(String content) async {
// Add user message to history
final userMessage = UserMessage(
id: 'msg_${DateTime.now().millisecondsSinceEpoch}',
content: content,
);
messageHistory.add(userMessage);
// Create input with full history
final input = SimpleRunAgentInput(
threadId: threadId,
messages: messageHistory,
state: state,
);
// Process events
await for (final event in client.runSharedState(input)) {
switch (event.type) {
case EventType.stateSnapshot:
final stateEvent = event as StateSnapshotEvent;
state = stateEvent.snapshot;
print('State updated: $state');
break;
case EventType.stateDelta:
final deltaEvent = event as StateDeltaEvent;
// Apply JSON Patch operations to state
applyJsonPatch(state, deltaEvent.delta);
break;
case EventType.messagesSnapshot:
final snapshot = event as MessagesSnapshotEvent;
// Update message history from snapshot
messageHistory.clear();
messageHistory.addAll(snapshot.messages);
break;
case EventType.textMessageContent:
final textEvent = event as TextMessageContentEvent;
stdout.write(textEvent.text);
break;
default:
break;
}
}
}
void applyJsonPatch(Map<String, dynamic> target, List<dynamic> operations) {
// Apply RFC 6902 JSON Patch operations
for (final op in operations) {
switch (op['op']) {
case 'add':
// Implementation for add operation
break;
case 'replace':
// Implementation for replace operation
break;
case 'remove':
// Implementation for remove operation
break;
}
}
}
}
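The `applyJsonPatch` method above leaves the operation bodies as stubs. As one possible way to fill them in, the following sketch applies `add`, `replace` and `remove` to nested maps and lists. It is an illustration under stated assumptions rather than the SDK's implementation: `parsePointer` is a hypothetical helper, the other RFC 6902 operations (`move`, `copy`, `test`) are not covered, and the code does not check that a path exists before `replace` or `remove`.

```dart
/// Splits an RFC 6901 JSON Pointer such as `/user/tags/0` into unescaped tokens.
List<String> parsePointer(String pointer) {
  if (pointer.isEmpty) return const [];
  return pointer
      .substring(1)
      .split('/')
      .map((t) => t.replaceAll('~1', '/').replaceAll('~0', '~'))
      .toList();
}

/// Applies `add`, `replace` and `remove` operations to [target] in place.
void applyJsonPatch(Map<String, dynamic> target, List<dynamic> operations) {
  for (final op in operations) {
    final tokens = parsePointer(op['path'] as String);
    if (tokens.isEmpty) {
      throw UnsupportedError('Root-level operations are not supported');
    }
    // Walk to the container that holds the final path segment.
    dynamic parent = target;
    for (final token in tokens.take(tokens.length - 1)) {
      parent = parent is List ? parent[int.parse(token)] : parent[token];
    }
    final key = tokens.last;
    switch (op['op']) {
      case 'add':
        if (parent is List) {
          // "-" appends; a numeric index inserts before that position.
          parent.insert(key == '-' ? parent.length : int.parse(key), op['value']);
        } else {
          parent[key] = op['value'];
        }
        break;
      case 'replace':
        if (parent is List) {
          parent[int.parse(key)] = op['value'];
        } else {
          parent[key] = op['value'];
        }
        break;
      case 'remove':
        if (parent is List) {
          parent.removeAt(int.parse(key));
        } else {
          parent.remove(key);
        }
        break;
      default:
        throw UnsupportedError('Unsupported operation: ${op['op']}');
    }
  }
}

void main() {
  final state = <String, dynamic>{
    'count': 1,
    'items': <dynamic>['a', 'b'],
  };
  applyJsonPatch(state, [
    {'op': 'replace', 'path': '/count', 'value': 2},
    {'op': 'add', 'path': '/items/-', 'value': 'c'},
    {'op': 'remove', 'path': '/items/0'},
  ]);
  print(state); // {count: 2, items: [b, c]}
}
```

Because the patch mutates the map in place, a failure partway through leaves the state partially updated. Applying the operations to a deep copy and swapping it in on success is one way to keep updates atomic.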
Error Handling and Cancellation
import 'dart:async';
import 'package:ag_ui/ag_ui.dart';
Future<void> robustAgentCall() async {
final client = AgUiClient(
config: AgUiClientConfig(
baseUrl: 'http://localhost:8000',
requestTimeout: Duration(seconds: 30),
),
);
// Create cancellation token
final cancelToken = CancelToken();
// Set up timeout
Timer? timeoutTimer;
timeoutTimer = Timer(Duration(seconds: 60), () {
print('Request timed out, cancelling...');
cancelToken.cancel();
});
try {
final input = SimpleRunAgentInput(
messages: [
UserMessage(
id: 'msg_1',
content: 'Process this request',
),
],
);
final stream = client.runAgent(
'agentic_chat',
input,
cancelToken: cancelToken,
);
await for (final event in stream) {
// Process events
print('Received: ${event.type}');
// Cancel on specific condition
if (shouldCancel(event)) {
cancelToken.cancel();
break;
}
}
} on ConnectionException catch (e) {
print('Connection error: ${e.message}');
// Retry logic here
} on ValidationError catch (e) {
print('Validation error: ${e.message}');
} on CancelledException {
print('Request was cancelled');
} catch (e) {
print('Unexpected error: $e');
} finally {
timeoutTimer?.cancel();
client.dispose(); // Clean up resources
}
}
bool shouldCancel(BaseEvent event) {
// Implement your cancellation logic
return false;
}
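The `ConnectionException` branch above leaves retry logic to the caller. One common approach is exponential backoff, sketched below with only `dart:async`. The helper `retryWithBackoff` and its parameters are hypothetical, and `TimeoutException` stands in for whichever error type the application treats as retryable.

```dart
import 'dart:async';

/// Runs [action], retrying failures accepted by [shouldRetry] with
/// exponentially growing delays: baseDelay, then 2x, 4x, and so on.
Future<T> retryWithBackoff<T>(
  Future<T> Function() action, {
  int maxAttempts = 3,
  Duration baseDelay = const Duration(milliseconds: 200),
  bool Function(Object error)? shouldRetry,
}) async {
  for (var attempt = 1;; attempt++) {
    try {
      return await action();
    } catch (error) {
      // Retry everything by default; give up once attempts are exhausted.
      final retryable = shouldRetry?.call(error) ?? true;
      if (!retryable || attempt >= maxAttempts) rethrow;
      await Future<void>.delayed(baseDelay * (1 << (attempt - 1)));
    }
  }
}

Future<void> main() async {
  var calls = 0;
  final result = await retryWithBackoff(
    () async {
      calls++;
      // Simulate two transient failures before succeeding.
      if (calls < 3) throw TimeoutException('simulated failure');
      return 'ok after $calls attempts';
    },
    baseDelay: const Duration(milliseconds: 10),
    shouldRetry: (error) => error is TimeoutException,
  );
  print(result); // ok after 3 attempts
}
```

This pattern suits requests that fail before any events arrive. Retrying a run that has already streamed part of its output would replay those events, so the caller would need to reset any state built from the first attempt.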
Custom Event Processing
import 'dart:io';
import 'package:ag_ui/ag_ui.dart';
class EventProcessor {
final Map<EventType, Function(BaseEvent)> handlers = {};
EventProcessor() {
// Register event handlers
handlers[EventType.runStarted] = (event) {
final runEvent = event as RunStartedEvent;
print('Run started');
};
handlers[EventType.textMessageContent] = (event) {
final textEvent = event as TextMessageContentEvent;
stdout.write(textEvent.content);
};
handlers[EventType.toolCallStart] = (event) {
final toolEvent = event as ToolCallStartEvent;
print('\n🔧 Tool call started');
};
handlers[EventType.toolCallResult] = (event) {
final resultEvent = event as ToolCallResultEvent;
print('✅ Tool result received');
};
handlers[EventType.runError] = (event) {
final errorEvent = event as RunErrorEvent;
print('❌ Error occurred');
};
handlers[EventType.runFinished] = (event) {
print('\n✨ Run complete');
};
}
Future<void> processStream(Stream<BaseEvent> eventStream) async {
await for (final event in eventStream) {
final handler = handlers[event.type];
if (handler != null) {
handler(event);
} else {
// Handle unknown event types
print('Unknown event: ${event.type}');
}
}
}
}
// Usage
void main() async {
final client = AgUiClient(
config: AgUiClientConfig(baseUrl: 'http://localhost:8000'),
);
final processor = EventProcessor();
final input = SimpleRunAgentInput(
messages: [UserMessage(id: 'msg_1', content: 'Hello')],
);
await processor.processStream(
client.runAgent('agentic_chat', input),
);
}
Using Environment Variables
import 'dart:io';
import 'package:ag_ui/ag_ui.dart';
AgUiClient createClientFromEnv() {
// Read configuration from environment
final baseUrl = Platform.environment['AGUI_BASE_URL'] ?? 'http://localhost:8000';
final apiKey = Platform.environment['AGUI_API_KEY'];
final timeout = int.tryParse(Platform.environment['AGUI_TIMEOUT'] ?? '30') ?? 30;
final headers = <String, String>{};
if (apiKey != null && apiKey.isNotEmpty) {
headers['Authorization'] = 'Bearer $apiKey';
}
return AgUiClient(
config: AgUiClientConfig(
baseUrl: baseUrl,
defaultHeaders: headers,
requestTimeout: Duration(seconds: timeout),
),
);
}
// Usage with environment variables:
// AGUI_BASE_URL=https://api.example.com AGUI_API_KEY=sk-xxx dart run main.dart
Complete End-to-End Example
import 'dart:io';
import 'dart:convert';
import 'package:ag_ui/ag_ui.dart';
Future<void> main() async {
// Initialize client
final client = AgUiClient(
config: AgUiClientConfig(
baseUrl: Platform.environment['AGUI_BASE_URL'] ?? 'http://localhost:8000',
),
);
// Get user input
stdout.write('Enter your message: ');
final userInput = stdin.readLineSync() ?? '';
if (userInput.isEmpty) {
print('No input provided');
return;
}
// Create conversation context
final threadId = 'thread_${DateTime.now().millisecondsSinceEpoch}';
final messages = <Message>[];
// Add user message
messages.add(UserMessage(
id: 'msg_${DateTime.now().millisecondsSinceEpoch}',
content: userInput,
));
// Track tool calls for processing
final pendingToolCalls = <ToolCall>[];
var runCount = 0;
while (true) {
runCount++;
print('\nStarting run #$runCount');
final input = SimpleRunAgentInput(
threadId: threadId,
messages: messages,
);
var runComplete = false;
var hasNewToolCalls = false;
try {
await for (final event in client.runToolBasedGenerativeUi(input)) {
switch (event.type) {
case EventType.runStarted:
print('Run started');
break;
case EventType.textMessageContent:
final textEvent = event as TextMessageContentEvent;
stdout.write(textEvent.text);
break;
case EventType.messagesSnapshot:
final snapshot = event as MessagesSnapshotEvent;
// Update messages with latest snapshot
messages.clear();
messages.addAll(snapshot.messages);
// Check for new tool calls
for (final message in snapshot.messages) {
if (message is AssistantMessage && message.toolCalls != null) {
for (final toolCall in message.toolCalls!) {
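// Skip calls that are already queued or already answered by a ToolMessage,
// otherwise answered calls would be re-queued on every later snapshot
if (!pendingToolCalls.any((tc) => tc.id == toolCall.id) &&
!snapshot.messages.any((m) => m is ToolMessage && m.toolCallId == toolCall.id)) {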
pendingToolCalls.add(toolCall);
hasNewToolCalls = true;
print('\n🔧 Tool call: ${toolCall.function.name}');
}
}
}
}
break;
case EventType.runFinished:
print('\n✅ Run #$runCount complete');
runComplete = true;
break;
case EventType.runError:
final errorEvent = event as RunErrorEvent;
print('\n❌ Error occurred');
break;
default:
// Handle other events as needed
break;
}
}
} catch (e) {
print('\n❌ Error during run: $e');
break;
}
// Process tool calls if any
if (runComplete && hasNewToolCalls) {
print('\nProcessing ${pendingToolCalls.length} tool calls...');
for (final toolCall in pendingToolCalls) {
stdout.write('Enter result for ${toolCall.function.name} (or press Enter for default): ');
final userResult = stdin.readLineSync();
final result = userResult?.isNotEmpty == true
? userResult!
: 'Tool executed successfully';
messages.add(ToolMessage(
id: 'tool_result_${toolCall.id}', // Derived from the call ID so each result ID is unique
toolCallId: toolCall.id,
content: result,
));
}
pendingToolCalls.clear();
continue; // Start new run with tool results
}
// No more tool calls, conversation complete
break;
}
print('\nConversation complete!');
client.dispose();
}
API Overview
Core Types
AgUiClient
Main client for interacting with AG-UI servers:
- streamEvents(endpoint, input) - Stream events from an endpoint
- sendRequest(endpoint, input) - Send one-shot requests
- Configuration via AgUiConfig
RunAgentInput
Input structure for agent runs:
RunAgentInput(
threadId: String, // Conversation thread ID
runId: String, // Unique run identifier
messages: List<Message>, // Conversation history
state: Map<String, dynamic>, // Application state
tools: List<Tool>, // Available tools
context: List<dynamic>, // Additional context
forwardedProps: Map<String, dynamic>, // Pass-through properties
)
Message Types
- UserMessage - User input messages
- AssistantMessage - Agent responses with optional tool calls
- SystemMessage - System-level messages
- ToolMessage - Tool execution results
Event Types
- Lifecycle: RUN_STARTED, RUN_FINISHED
- Messages: TEXT_MESSAGE_START, TEXT_MESSAGE_CONTENT, TEXT_MESSAGE_END
- State: STATE_SNAPSHOT, STATE_DELTA
- Tools: TOOL_CALL_START, TOOL_CALL_END, TOOL_CALL_RESULT
- UI: GENERATIVE_UI_ELEMENT_*
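The event names in this list use the SCREAMING_SNAKE_CASE wire format, while the Dart examples earlier refer to lowerCamelCase enum values such as EventType.runStarted. The sketch below shows the naming relationship only; `wireNameToCamelCase` is a hypothetical helper, and how the SDK itself maps wire names to enum values is an assumption not confirmed here.

```dart
/// Converts a wire-format name such as `STATE_SNAPSHOT` into the
/// lowerCamelCase form used for Dart enum values (`stateSnapshot`).
String wireNameToCamelCase(String wireName) {
  final parts = wireName
      .toLowerCase()
      .split('_')
      .where((part) => part.isNotEmpty)
      .toList();
  if (parts.isEmpty) return '';
  // Capitalize every word after the first and join them together.
  final tail = parts.skip(1).map((p) => p[0].toUpperCase() + p.substring(1));
  return parts.first + tail.join();
}

void main() {
  print(wireNameToCamelCase('RUN_STARTED')); // runStarted
  print(wireNameToCamelCase('STATE_DELTA')); // stateDelta
}
```

A helper like this can be handy when logging raw server payloads next to decoded events, since it makes the two spellings easy to correlate.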
Error Handling
try {
await for (final event in client.streamEvents('/agent', input)) {
// Process events
}
} on ValidationError catch (e) {
print('Invalid input: ${e.message}');
} on NetworkError catch (e) {
print('Network issue: ${e.message}');
} on ServerError catch (e) {
print('Server error: ${e.statusCode} - ${e.message}');
} catch (e) {
print('Unexpected error: $e');
}
Example Application
A complete example application is available at sdks/community/dart/example/, demonstrating:
- Interactive CLI for testing AG-UI servers
- Tool-based generative UI flows
- Message streaming and event handling
- Automatic tool response generation
Run the example:
cd sdks/community/dart/example
dart pub get
dart run ag_ui_example --help
Testing
The SDK includes comprehensive integration tests that validate compatibility with AG-UI servers. To run tests locally:
cd sdks/community/dart
# Run unit tests
dart test
# Run integration tests (requires local server)
cd test/integration
./helpers/start_server.sh # Start test server
dart test simple_qa_test.dart
./helpers/stop_server.sh # Stop test server
For Docker-based testing:
dart test simple_qa_docker_test.dart # Automatically manages container
Troubleshooting
Connection Timeouts
Adjust timeout in client configuration:
final client = AgUiClient(
config: AgUiClientConfig(
baseUrl: 'http://localhost:20203',
requestTimeout: Duration(seconds: 60), // Increase timeout
),
);
SSL/TLS Issues
For self-signed certificates in development:
final client = AgUiClient(
config: AgUiClientConfig(
baseUrl: 'https://localhost:20203',
validateCertificates: false, // Development only!
),
);
Debug Logging
Enable debug output:
export DEBUG=true
dart run your_app.dart
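Application code can follow the same DEBUG convention for its own diagnostics. The sketch below is a minimal example using only `dart:io`; `isTruthy` and `debugLog` are hypothetical helpers, and how the SDK itself reads the variable is not described here.

```dart
import 'dart:io';

/// Returns true when [value] looks like an enabled flag ("true", "1", "yes").
bool isTruthy(String? value) {
  if (value == null) return false;
  return const {'true', '1', 'yes'}.contains(value.trim().toLowerCase());
}

/// Writes [message] to stderr only when the DEBUG environment variable is set.
void debugLog(String message) {
  if (isTruthy(Platform.environment['DEBUG'])) {
    stderr.writeln('[debug] $message');
  }
}

void main() {
  // Prints only when the program is started with DEBUG=true (or 1, or yes).
  debugLog('client configured');
}
```

Writing to stderr keeps diagnostics separate from streamed assistant text on stdout, which matters when the output is piped to another program.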
Contributing
Contributions are welcome! Please:
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass
- Submit a pull request
Resources
- AG-UI Protocol Specification
- AG-UI Documentation
- TypeScript SDK Reference
- Python SDK Reference
- AG-UI Dojo - Interactive protocol demonstration
License
This SDK is part of the AG-UI Protocol project. See the main repository for license information.
Support
For issues, questions, or feature requests:
- Open an issue on GitHub
- Check existing discussions
- Review the protocol specification for protocol details