数据流#
本文档描述 Cloud Native MCP Server 中的请求和响应数据流。
请求流#
完整请求流程#
客户端
│
├─> HTTP/SSE 连接
│
├─> 认证中间件
│ ├─> 验证 API Key/Token
│ └─> 检查权限
│
├─> 速率限制中间件
│ └─> 检查配额
│
├─> 路由层
│ ├─> 解析服务和方法
│ └─> 验证参数
│
├─> 审计中间件
│ └─> 记录请求开始
│
├─> 服务管理器
│ └─> 路由到服务
│
├─> 缓存层
│ ├─> 检查缓存
│ └─> 返回缓存或继续
│
├─> 服务
│ ├─> 调用外部 API
│ ├─> 处理响应
│ └─> 更新缓存
│
├─> 审计中间件
│ └─> 记录请求完成
│
├─> 指标中间件
│ └─> 记录指标
│
└─> 响应返回客户端
详细步骤#
1. 客户端连接#
客户端通过以下方式之一连接到服务器:
- SSE:
GET /api/kubernetes/sse - Streamable-HTTP:
POST /api/kubernetes/streamable-http
请求格式 (JSON-RPC 2.0):
1
2
3
4
5
6
7
8
9
10
11
| {
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "list_pods",
"arguments": {
"namespace": "default"
}
}
}
|
2. 认证中间件#
验证客户端身份:
1
2
3
4
5
6
7
8
9
10
| // 检查认证头
apiKey := r.Header.Get("X-API-Key")
if apiKey == "" {
return errors.New("missing API key")
}
// 验证 API Key
if apiKey != config.Auth.APIKey {
return errors.New("invalid API key")
}
|
认证方式:
- API Key:
X-API-Key: your-key - Bearer Token:
Authorization: Bearer your-token - Basic Auth:
Authorization: Basic base64(user:pass)
3. 速率限制中间件#
检查客户端请求频率:
1
2
3
4
5
6
| // Token Bucket 算法
if rateLimiter.Allow() {
// 继续处理请求
} else {
return errors.New("rate limit exceeded")
}
|
配置:
1
2
3
4
| ratelimit:
enabled: true
requests_per_second: 100
burst: 200
|
4. 路由层#
解析请求并路由到正确的服务:
1
2
3
4
5
6
7
8
9
| // 解析工具名称
toolName := params["name"].(string)
// 路由到服务
serviceName := getToolService(toolName)
service := manager.GetService(serviceName)
// 调用工具
result, err := service.CallTool(ctx, toolName, params["arguments"].(map[string]interface{}))
|
5. 审计中间件#
记录请求信息:
1
2
3
4
5
6
7
8
9
| auditLog := &AuditLog{
Timestamp: time.Now(),
RequestID: getRequestID(r),
Tool: toolName,
Params: params["arguments"],
Status: "started",
}
auditLogger.Log(auditLog)
|
6. 服务管理器#
管理服务生命周期:
1
2
3
4
5
6
7
8
9
10
| // 获取服务
service := registry.GetService(serviceName)
// 检查服务健康
if err := service.HealthCheck(); err != nil {
return fmt.Errorf("service unavailable: %w", err)
}
// 调用工具
result, err := service.CallTool(ctx, toolName, arguments)
|
7. 缓存层#
检查和更新缓存:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| // 生成缓存键
cacheKey := fmt.Sprintf("%s:%s:%v", serviceName, toolName, arguments)
// 检查缓存
if cached, found := cache.Get(cacheKey); found {
return cached, nil
}
// 调用服务
result, err := service.CallTool(ctx, toolName, arguments)
// 更新缓存
if err == nil {
cache.Set(cacheKey, result, ttl)
}
|
8. 服务执行#
调用外部服务:
1
2
3
4
5
6
7
8
| // Kubernetes 示例
pods, err := k8sClient.CoreV1().Pods(namespace).List(ctx, options)
// Grafana 示例
dashboards, err := grafanaClient.ListDashboards(ctx)
// Prometheus 示例
result, err := prometheusClient.Query(ctx, query)
|
9. 审计记录完成#
记录请求完成信息:
1
2
3
4
5
| auditLog.Status = "completed"
auditLog.Duration = time.Since(startTime)
auditLog.Error = err
auditLogger.Log(auditLog)
|
10. 指标记录#
记录性能指标:
1
2
| metrics.RecordRequest(toolName, "success", duration)
metrics.RecordCacheHit(serviceName, cacheHit)
|
11. 响应返回#
返回结果给客户端:
1
2
3
4
5
6
7
8
9
10
11
12
| {
"jsonrpc": "2.0",
"id": 1,
"result": {
"pods": [
{
"name": "pod-1",
"status": "Running"
}
]
}
}
|
响应流#
响应处理流程#
服务
│
├─> 处理结果
│
├─> 数据转换
│ ├─> 格式化
│ └─> 压缩
│
├─> 缓存更新
│ └─> 存储到缓存
│
├─> 指标更新
│ └─> 记录性能指标
│
└─> 返回响应
响应优化#
数据转换#
1
2
3
4
5
6
7
8
| // 格式化响应
formatted := formatResponse(result)
// 压缩响应
if config.Performance.CompressionEnabled {
compressed := compressResponse(formatted)
return compressed
}
|
缓存更新#
1
2
3
4
5
| // 存储到缓存
cache.Set(cacheKey, result, ttl)
// 记录缓存统计
metrics.RecordCacheStore(serviceName)
|
指标更新#
1
2
3
4
5
| // 记录请求指标
metrics.RecordRequest(toolName, status, duration)
// 记录延迟
metrics.RecordLatency(toolName, duration)
|
SSE 流式响应#
SSE 连接流程#
客户端
│
├─> GET /api/kubernetes/sse
│
├─> 认证中间件
│
├─> 建立持久连接
│
├─> 发送事件
│ ├─> Event: data
│ ├─> Event: update
│ └─> Event: complete
│
└─> 保持连接
SSE 事件格式#
event: data
data: {"type":"pod","name":"pod-1","status":"Running"}
event: data
data: {"type":"pod","name":"pod-2","status":"Pending"}
event: complete
data: {"message":"All data sent"}
SSE 配置#
1
2
3
4
| server:
mode: "sse"
writeTimeoutSec: 0 # 保持连接
idleTimeoutSec: 60
|
Streamable-HTTP 响应#
流式 HTTP 响应流程#
客户端
│
├─> POST /api/kubernetes/streamable-http
│
├─> 认证中间件
│
├─> 处理请求
│
├─> 流式返回数据
│ ├─> Chunk 1
│ ├─> Chunk 2
│ └─> Chunk N
│
└─> 完成
流式响应格式#
HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked
{"data": "chunk-1"}
{"data": "chunk-2"}
{"data": "chunk-3"}
错误处理#
错误响应格式#
1
2
3
4
5
6
7
8
9
10
11
| {
"jsonrpc": "2.0",
"id": 1,
"error": {
"code": -32602,
"message": "Invalid params",
"data": {
"details": "namespace is required"
}
}
}
|
错误类型#
- InvalidParams: 参数无效
- NotFound: 资源不存在
- PermissionDenied: 权限不足
- Timeout: 请求超时
- InternalError: 内部错误
- ServiceUnavailable: 服务不可用
错误处理流程#
服务错误
│
├─> 错误转换
│ └─> 转换为标准错误格式
│
├─> 审计记录
│ └─> 记录错误信息
│
├─> 指标记录
│ └─> 记录错误指标
│
└─> 返回错误响应
性能优化#
缓存策略#
1
2
3
4
5
6
7
8
9
10
| // 缓存键生成
cacheKey := generateCacheKey(serviceName, toolName, arguments)
// LRU 缓存
if cached, found := cache.Get(cacheKey); found {
return cached, nil
}
// 缓存更新
cache.Set(cacheKey, result, ttl)
|
连接池#
1
2
3
4
5
6
7
8
| // HTTP 客户端连接池
client := &http.Client{
Transport: &http.Transport{
MaxIdleConns: 100,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 90 * time.Second,
},
}
|
批处理#
1
2
3
4
5
6
7
| // 批量获取资源
pods, err := k8sClient.CoreV1().Pods(namespace).List(ctx, options)
// 批量处理
for _, pod := range pods.Items {
processPod(pod)
}
|
相关文档#