Merge pull request #1242 from Ethan0x0000/feat/anthropic-openai-endpoint-compat
支持 Anthropic Responses / Chat Completions 兼容端点并完善会话一致性与错误可观测性
This commit is contained in:
@@ -643,6 +643,7 @@ urlFallbackLoop:
|
||||
AccountID: p.account.ID,
|
||||
AccountName: p.account.Name,
|
||||
UpstreamStatusCode: 0,
|
||||
UpstreamURL: safeUpstreamURL(upstreamReq.URL.String()),
|
||||
Kind: "request_error",
|
||||
Message: safeErr,
|
||||
})
|
||||
@@ -720,6 +721,7 @@ urlFallbackLoop:
|
||||
AccountName: p.account.Name,
|
||||
UpstreamStatusCode: resp.StatusCode,
|
||||
UpstreamRequestID: resp.Header.Get("x-request-id"),
|
||||
UpstreamURL: safeUpstreamURL(upstreamReq.URL.String()),
|
||||
Kind: "retry",
|
||||
Message: upstreamMsg,
|
||||
Detail: getUpstreamDetail(respBody),
|
||||
@@ -754,6 +756,7 @@ urlFallbackLoop:
|
||||
AccountName: p.account.Name,
|
||||
UpstreamStatusCode: resp.StatusCode,
|
||||
UpstreamRequestID: resp.Header.Get("x-request-id"),
|
||||
UpstreamURL: safeUpstreamURL(upstreamReq.URL.String()),
|
||||
Kind: "retry",
|
||||
Message: upstreamMsg,
|
||||
Detail: getUpstreamDetail(respBody),
|
||||
|
||||
485
backend/internal/service/gateway_forward_as_chat_completions.go
Normal file
485
backend/internal/service/gateway_forward_as_chat_completions.go
Normal file
@@ -0,0 +1,485 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/apicompat"
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/claude"
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
|
||||
"github.com/Wei-Shaw/sub2api/internal/util/responseheaders"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/tidwall/gjson"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// ForwardAsChatCompletions accepts an OpenAI Chat Completions API request body,
|
||||
// converts it to Anthropic Messages format (chained via Responses format),
|
||||
// forwards to the Anthropic upstream, and converts the response back to Chat
|
||||
// Completions format. This enables Chat Completions clients to access Anthropic
|
||||
// models through Anthropic platform groups.
|
||||
func (s *GatewayService) ForwardAsChatCompletions(
|
||||
ctx context.Context,
|
||||
c *gin.Context,
|
||||
account *Account,
|
||||
body []byte,
|
||||
parsed *ParsedRequest,
|
||||
) (*ForwardResult, error) {
|
||||
startTime := time.Now()
|
||||
|
||||
// 1. Parse Chat Completions request
|
||||
var ccReq apicompat.ChatCompletionsRequest
|
||||
if err := json.Unmarshal(body, &ccReq); err != nil {
|
||||
return nil, fmt.Errorf("parse chat completions request: %w", err)
|
||||
}
|
||||
originalModel := ccReq.Model
|
||||
clientStream := ccReq.Stream
|
||||
includeUsage := ccReq.StreamOptions != nil && ccReq.StreamOptions.IncludeUsage
|
||||
|
||||
// 2. Convert CC → Responses → Anthropic (chained conversion)
|
||||
responsesReq, err := apicompat.ChatCompletionsToResponses(&ccReq)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("convert chat completions to responses: %w", err)
|
||||
}
|
||||
|
||||
anthropicReq, err := apicompat.ResponsesToAnthropicRequest(responsesReq)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("convert responses to anthropic: %w", err)
|
||||
}
|
||||
|
||||
// 3. Force upstream streaming
|
||||
anthropicReq.Stream = true
|
||||
reqStream := true
|
||||
|
||||
// 4. Model mapping
|
||||
mappedModel := originalModel
|
||||
if account.Type == AccountTypeAPIKey {
|
||||
mappedModel = account.GetMappedModel(originalModel)
|
||||
}
|
||||
if mappedModel == originalModel && account.Platform == PlatformAnthropic && account.Type != AccountTypeAPIKey {
|
||||
normalized := claude.NormalizeModelID(originalModel)
|
||||
if normalized != originalModel {
|
||||
mappedModel = normalized
|
||||
}
|
||||
}
|
||||
anthropicReq.Model = mappedModel
|
||||
|
||||
logger.L().Debug("gateway forward_as_chat_completions: model mapping applied",
|
||||
zap.Int64("account_id", account.ID),
|
||||
zap.String("original_model", originalModel),
|
||||
zap.String("mapped_model", mappedModel),
|
||||
zap.Bool("client_stream", clientStream),
|
||||
)
|
||||
|
||||
// 5. Marshal Anthropic request body
|
||||
anthropicBody, err := json.Marshal(anthropicReq)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal anthropic request: %w", err)
|
||||
}
|
||||
|
||||
// 6. Apply Claude Code mimicry for OAuth accounts
|
||||
isClaudeCode := false // CC API is never Claude Code
|
||||
shouldMimicClaudeCode := account.IsOAuth() && !isClaudeCode
|
||||
|
||||
if shouldMimicClaudeCode {
|
||||
if !strings.Contains(strings.ToLower(mappedModel), "haiku") &&
|
||||
!systemIncludesClaudeCodePrompt(anthropicReq.System) {
|
||||
anthropicBody = injectClaudeCodePrompt(anthropicBody, anthropicReq.System)
|
||||
}
|
||||
}
|
||||
|
||||
// 7. Enforce cache_control block limit
|
||||
anthropicBody = enforceCacheControlLimit(anthropicBody)
|
||||
|
||||
// 8. Get access token
|
||||
token, tokenType, err := s.GetAccessToken(ctx, account)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get access token: %w", err)
|
||||
}
|
||||
|
||||
// 9. Get proxy URL
|
||||
proxyURL := ""
|
||||
if account.ProxyID != nil && account.Proxy != nil {
|
||||
proxyURL = account.Proxy.URL()
|
||||
}
|
||||
|
||||
// 10. Build upstream request
|
||||
upstreamCtx, releaseUpstreamCtx := detachStreamUpstreamContext(ctx, reqStream)
|
||||
upstreamReq, err := s.buildUpstreamRequest(upstreamCtx, c, account, anthropicBody, token, tokenType, mappedModel, reqStream, shouldMimicClaudeCode)
|
||||
releaseUpstreamCtx()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("build upstream request: %w", err)
|
||||
}
|
||||
|
||||
// 11. Send request
|
||||
resp, err := s.httpUpstream.DoWithTLS(upstreamReq, proxyURL, account.ID, account.Concurrency, account.IsTLSFingerprintEnabled())
|
||||
if err != nil {
|
||||
if resp != nil && resp.Body != nil {
|
||||
_ = resp.Body.Close()
|
||||
}
|
||||
safeErr := sanitizeUpstreamErrorMessage(err.Error())
|
||||
setOpsUpstreamError(c, 0, safeErr, "")
|
||||
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
||||
Platform: account.Platform,
|
||||
AccountID: account.ID,
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: 0,
|
||||
Kind: "request_error",
|
||||
Message: safeErr,
|
||||
})
|
||||
writeGatewayCCError(c, http.StatusBadGateway, "server_error", "Upstream request failed")
|
||||
return nil, fmt.Errorf("upstream request failed: %s", safeErr)
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
// 12. Handle error response with failover
|
||||
if resp.StatusCode >= 400 {
|
||||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
||||
_ = resp.Body.Close()
|
||||
resp.Body = io.NopCloser(bytes.NewReader(respBody))
|
||||
|
||||
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
|
||||
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
|
||||
|
||||
if s.shouldFailoverUpstreamError(resp.StatusCode) {
|
||||
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
||||
Platform: account.Platform,
|
||||
AccountID: account.ID,
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: resp.StatusCode,
|
||||
UpstreamRequestID: resp.Header.Get("x-request-id"),
|
||||
Kind: "failover",
|
||||
Message: upstreamMsg,
|
||||
})
|
||||
if s.rateLimitService != nil {
|
||||
s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody)
|
||||
}
|
||||
return nil, &UpstreamFailoverError{
|
||||
StatusCode: resp.StatusCode,
|
||||
ResponseBody: respBody,
|
||||
}
|
||||
}
|
||||
|
||||
writeGatewayCCError(c, mapUpstreamStatusCode(resp.StatusCode), "server_error", upstreamMsg)
|
||||
return nil, fmt.Errorf("upstream error: %d %s", resp.StatusCode, upstreamMsg)
|
||||
}
|
||||
|
||||
// 13. Extract reasoning effort from CC request body
|
||||
reasoningEffort := extractCCReasoningEffortFromBody(body)
|
||||
|
||||
// 14. Handle normal response
|
||||
// Read Anthropic SSE → convert to Responses events → convert to CC format
|
||||
var result *ForwardResult
|
||||
var handleErr error
|
||||
if clientStream {
|
||||
result, handleErr = s.handleCCStreamingFromAnthropic(resp, c, originalModel, mappedModel, reasoningEffort, startTime, includeUsage)
|
||||
} else {
|
||||
result, handleErr = s.handleCCBufferedFromAnthropic(resp, c, originalModel, mappedModel, reasoningEffort, startTime)
|
||||
}
|
||||
|
||||
return result, handleErr
|
||||
}
|
||||
|
||||
// extractCCReasoningEffortFromBody reads reasoning effort from a Chat Completions
|
||||
// request body. It checks both nested (reasoning.effort) and flat (reasoning_effort)
|
||||
// formats used by OpenAI-compatible clients.
|
||||
func extractCCReasoningEffortFromBody(body []byte) *string {
|
||||
raw := strings.TrimSpace(gjson.GetBytes(body, "reasoning.effort").String())
|
||||
if raw == "" {
|
||||
raw = strings.TrimSpace(gjson.GetBytes(body, "reasoning_effort").String())
|
||||
}
|
||||
if raw == "" {
|
||||
return nil
|
||||
}
|
||||
normalized := normalizeOpenAIReasoningEffort(raw)
|
||||
if normalized == "" {
|
||||
return nil
|
||||
}
|
||||
return &normalized
|
||||
}
|
||||
|
||||
// handleCCBufferedFromAnthropic reads Anthropic SSE events, assembles the full
|
||||
// response, then converts Anthropic → Responses → Chat Completions.
|
||||
func (s *GatewayService) handleCCBufferedFromAnthropic(
|
||||
resp *http.Response,
|
||||
c *gin.Context,
|
||||
originalModel string,
|
||||
mappedModel string,
|
||||
reasoningEffort *string,
|
||||
startTime time.Time,
|
||||
) (*ForwardResult, error) {
|
||||
requestID := resp.Header.Get("x-request-id")
|
||||
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
maxLineSize := defaultMaxLineSize
|
||||
if s.cfg != nil && s.cfg.Gateway.MaxLineSize > 0 {
|
||||
maxLineSize = s.cfg.Gateway.MaxLineSize
|
||||
}
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize)
|
||||
|
||||
var finalResp *apicompat.AnthropicResponse
|
||||
var usage ClaudeUsage
|
||||
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if !strings.HasPrefix(line, "event: ") {
|
||||
continue
|
||||
}
|
||||
|
||||
if !scanner.Scan() {
|
||||
break
|
||||
}
|
||||
dataLine := scanner.Text()
|
||||
if !strings.HasPrefix(dataLine, "data: ") {
|
||||
continue
|
||||
}
|
||||
payload := dataLine[6:]
|
||||
|
||||
var event apicompat.AnthropicStreamEvent
|
||||
if err := json.Unmarshal([]byte(payload), &event); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// message_start carries the initial response structure and cache usage
|
||||
if event.Type == "message_start" && event.Message != nil {
|
||||
finalResp = event.Message
|
||||
mergeAnthropicUsage(&usage, event.Message.Usage)
|
||||
}
|
||||
|
||||
// message_delta carries final usage and stop_reason
|
||||
if event.Type == "message_delta" {
|
||||
if event.Usage != nil {
|
||||
mergeAnthropicUsage(&usage, *event.Usage)
|
||||
}
|
||||
if event.Delta != nil && event.Delta.StopReason != "" && finalResp != nil {
|
||||
finalResp.StopReason = event.Delta.StopReason
|
||||
}
|
||||
}
|
||||
if event.Type == "content_block_start" && event.ContentBlock != nil && finalResp != nil {
|
||||
finalResp.Content = append(finalResp.Content, *event.ContentBlock)
|
||||
}
|
||||
if event.Type == "content_block_delta" && event.Delta != nil && finalResp != nil && event.Index != nil {
|
||||
idx := *event.Index
|
||||
if idx < len(finalResp.Content) {
|
||||
switch event.Delta.Type {
|
||||
case "text_delta":
|
||||
finalResp.Content[idx].Text += event.Delta.Text
|
||||
case "thinking_delta":
|
||||
finalResp.Content[idx].Thinking += event.Delta.Thinking
|
||||
case "input_json_delta":
|
||||
finalResp.Content[idx].Input = appendRawJSON(finalResp.Content[idx].Input, event.Delta.PartialJSON)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
logger.L().Warn("forward_as_cc buffered: read error",
|
||||
zap.Error(err),
|
||||
zap.String("request_id", requestID),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
if finalResp == nil {
|
||||
writeGatewayCCError(c, http.StatusBadGateway, "server_error", "Upstream stream ended without a response")
|
||||
return nil, fmt.Errorf("upstream stream ended without response")
|
||||
}
|
||||
|
||||
// Update usage from accumulated delta
|
||||
if usage.InputTokens > 0 || usage.OutputTokens > 0 {
|
||||
finalResp.Usage = apicompat.AnthropicUsage{
|
||||
InputTokens: usage.InputTokens,
|
||||
OutputTokens: usage.OutputTokens,
|
||||
CacheCreationInputTokens: usage.CacheCreationInputTokens,
|
||||
CacheReadInputTokens: usage.CacheReadInputTokens,
|
||||
}
|
||||
}
|
||||
|
||||
// Chain: Anthropic → Responses → Chat Completions
|
||||
responsesResp := apicompat.AnthropicToResponsesResponse(finalResp)
|
||||
ccResp := apicompat.ResponsesToChatCompletions(responsesResp, originalModel)
|
||||
|
||||
if s.responseHeaderFilter != nil {
|
||||
responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
|
||||
}
|
||||
c.JSON(http.StatusOK, ccResp)
|
||||
|
||||
return &ForwardResult{
|
||||
RequestID: requestID,
|
||||
Usage: usage,
|
||||
Model: originalModel,
|
||||
UpstreamModel: mappedModel,
|
||||
ReasoningEffort: reasoningEffort,
|
||||
Stream: false,
|
||||
Duration: time.Since(startTime),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// handleCCStreamingFromAnthropic reads Anthropic SSE events, converts each
|
||||
// to Responses events, then to Chat Completions chunks, and writes them.
|
||||
func (s *GatewayService) handleCCStreamingFromAnthropic(
|
||||
resp *http.Response,
|
||||
c *gin.Context,
|
||||
originalModel string,
|
||||
mappedModel string,
|
||||
reasoningEffort *string,
|
||||
startTime time.Time,
|
||||
includeUsage bool,
|
||||
) (*ForwardResult, error) {
|
||||
requestID := resp.Header.Get("x-request-id")
|
||||
|
||||
if s.responseHeaderFilter != nil {
|
||||
responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
|
||||
}
|
||||
c.Writer.Header().Set("Content-Type", "text/event-stream")
|
||||
c.Writer.Header().Set("Cache-Control", "no-cache")
|
||||
c.Writer.Header().Set("Connection", "keep-alive")
|
||||
c.Writer.Header().Set("X-Accel-Buffering", "no")
|
||||
c.Writer.WriteHeader(http.StatusOK)
|
||||
|
||||
// Use Anthropic→Responses state machine, then convert Responses→CC
|
||||
anthState := apicompat.NewAnthropicEventToResponsesState()
|
||||
anthState.Model = originalModel
|
||||
ccState := apicompat.NewResponsesEventToChatState()
|
||||
ccState.Model = originalModel
|
||||
ccState.IncludeUsage = includeUsage
|
||||
|
||||
var usage ClaudeUsage
|
||||
var firstTokenMs *int
|
||||
firstChunk := true
|
||||
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
maxLineSize := defaultMaxLineSize
|
||||
if s.cfg != nil && s.cfg.Gateway.MaxLineSize > 0 {
|
||||
maxLineSize = s.cfg.Gateway.MaxLineSize
|
||||
}
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize)
|
||||
|
||||
resultWithUsage := func() *ForwardResult {
|
||||
return &ForwardResult{
|
||||
RequestID: requestID,
|
||||
Usage: usage,
|
||||
Model: originalModel,
|
||||
UpstreamModel: mappedModel,
|
||||
ReasoningEffort: reasoningEffort,
|
||||
Stream: true,
|
||||
Duration: time.Since(startTime),
|
||||
FirstTokenMs: firstTokenMs,
|
||||
}
|
||||
}
|
||||
|
||||
writeChunk := func(chunk apicompat.ChatCompletionsChunk) bool {
|
||||
sse, err := apicompat.ChatChunkToSSE(chunk)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
if _, err := fmt.Fprint(c.Writer, sse); err != nil {
|
||||
return true // client disconnected
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
processAnthropicEvent := func(event *apicompat.AnthropicStreamEvent) bool {
|
||||
if firstChunk {
|
||||
firstChunk = false
|
||||
ms := int(time.Since(startTime).Milliseconds())
|
||||
firstTokenMs = &ms
|
||||
}
|
||||
|
||||
// Extract usage from message_delta
|
||||
if event.Type == "message_delta" && event.Usage != nil {
|
||||
mergeAnthropicUsage(&usage, *event.Usage)
|
||||
}
|
||||
// Also capture usage from message_start (carries cache fields)
|
||||
if event.Type == "message_start" && event.Message != nil {
|
||||
mergeAnthropicUsage(&usage, event.Message.Usage)
|
||||
}
|
||||
|
||||
// Chain: Anthropic event → Responses events → CC chunks
|
||||
responsesEvents := apicompat.AnthropicEventToResponsesEvents(event, anthState)
|
||||
for _, resEvt := range responsesEvents {
|
||||
ccChunks := apicompat.ResponsesEventToChatChunks(&resEvt, ccState)
|
||||
for _, chunk := range ccChunks {
|
||||
if disconnected := writeChunk(chunk); disconnected {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
c.Writer.Flush()
|
||||
return false
|
||||
}
|
||||
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if !strings.HasPrefix(line, "event: ") {
|
||||
continue
|
||||
}
|
||||
|
||||
if !scanner.Scan() {
|
||||
break
|
||||
}
|
||||
dataLine := scanner.Text()
|
||||
if !strings.HasPrefix(dataLine, "data: ") {
|
||||
continue
|
||||
}
|
||||
payload := dataLine[6:]
|
||||
|
||||
var event apicompat.AnthropicStreamEvent
|
||||
if err := json.Unmarshal([]byte(payload), &event); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if processAnthropicEvent(&event) {
|
||||
return resultWithUsage(), nil
|
||||
}
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
logger.L().Warn("forward_as_cc stream: read error",
|
||||
zap.Error(err),
|
||||
zap.String("request_id", requestID),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// Finalize both state machines
|
||||
finalResEvents := apicompat.FinalizeAnthropicResponsesStream(anthState)
|
||||
for _, resEvt := range finalResEvents {
|
||||
ccChunks := apicompat.ResponsesEventToChatChunks(&resEvt, ccState)
|
||||
for _, chunk := range ccChunks {
|
||||
writeChunk(chunk) //nolint:errcheck
|
||||
}
|
||||
}
|
||||
finalCCChunks := apicompat.FinalizeResponsesChatStream(ccState)
|
||||
for _, chunk := range finalCCChunks {
|
||||
writeChunk(chunk) //nolint:errcheck
|
||||
}
|
||||
|
||||
// Write [DONE] marker
|
||||
fmt.Fprint(c.Writer, "data: [DONE]\n\n") //nolint:errcheck
|
||||
c.Writer.Flush()
|
||||
|
||||
return resultWithUsage(), nil
|
||||
}
|
||||
|
||||
// writeGatewayCCError writes an error in OpenAI Chat Completions format for
|
||||
// the Anthropic-upstream CC forwarding path.
|
||||
func writeGatewayCCError(c *gin.Context, statusCode int, errType, message string) {
|
||||
c.JSON(statusCode, gin.H{
|
||||
"error": gin.H{
|
||||
"type": errType,
|
||||
"message": message,
|
||||
},
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,109 @@
|
||||
//go:build unit
|
||||
|
||||
package service
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestExtractCCReasoningEffortFromBody(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
t.Run("nested reasoning.effort", func(t *testing.T) {
|
||||
got := extractCCReasoningEffortFromBody([]byte(`{"reasoning":{"effort":"HIGH"}}`))
|
||||
require.NotNil(t, got)
|
||||
require.Equal(t, "high", *got)
|
||||
})
|
||||
|
||||
t.Run("flat reasoning_effort", func(t *testing.T) {
|
||||
got := extractCCReasoningEffortFromBody([]byte(`{"reasoning_effort":"x-high"}`))
|
||||
require.NotNil(t, got)
|
||||
require.Equal(t, "xhigh", *got)
|
||||
})
|
||||
|
||||
t.Run("missing effort", func(t *testing.T) {
|
||||
require.Nil(t, extractCCReasoningEffortFromBody([]byte(`{"model":"gpt-5"}`)))
|
||||
})
|
||||
}
|
||||
|
||||
func TestHandleCCBufferedFromAnthropic_PreservesMessageStartCacheUsageAndReasoning(t *testing.T) {
|
||||
t.Parallel()
|
||||
gin.SetMode(gin.TestMode)
|
||||
|
||||
rec := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(rec)
|
||||
|
||||
reasoningEffort := "high"
|
||||
resp := &http.Response{
|
||||
Header: http.Header{"x-request-id": []string{"rid_cc_buffered"}},
|
||||
Body: io.NopCloser(strings.NewReader(strings.Join([]string{
|
||||
`event: message_start`,
|
||||
`data: {"type":"message_start","message":{"id":"msg_1","type":"message","role":"assistant","content":[],"model":"claude-sonnet-4.5","stop_reason":"","usage":{"input_tokens":12,"cache_read_input_tokens":9,"cache_creation_input_tokens":3}}}`,
|
||||
``,
|
||||
`event: content_block_start`,
|
||||
`data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":"hello"}}`,
|
||||
``,
|
||||
`event: message_delta`,
|
||||
`data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":7}}`,
|
||||
``,
|
||||
}, "\n"))),
|
||||
}
|
||||
|
||||
svc := &GatewayService{}
|
||||
result, err := svc.handleCCBufferedFromAnthropic(resp, c, "gpt-5", "claude-sonnet-4.5", &reasoningEffort, time.Now())
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, result)
|
||||
require.Equal(t, 12, result.Usage.InputTokens)
|
||||
require.Equal(t, 7, result.Usage.OutputTokens)
|
||||
require.Equal(t, 9, result.Usage.CacheReadInputTokens)
|
||||
require.Equal(t, 3, result.Usage.CacheCreationInputTokens)
|
||||
require.NotNil(t, result.ReasoningEffort)
|
||||
require.Equal(t, "high", *result.ReasoningEffort)
|
||||
}
|
||||
|
||||
func TestHandleCCStreamingFromAnthropic_PreservesMessageStartCacheUsageAndReasoning(t *testing.T) {
|
||||
t.Parallel()
|
||||
gin.SetMode(gin.TestMode)
|
||||
|
||||
rec := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(rec)
|
||||
|
||||
reasoningEffort := "medium"
|
||||
resp := &http.Response{
|
||||
Header: http.Header{"x-request-id": []string{"rid_cc_stream"}},
|
||||
Body: io.NopCloser(strings.NewReader(strings.Join([]string{
|
||||
`event: message_start`,
|
||||
`data: {"type":"message_start","message":{"id":"msg_2","type":"message","role":"assistant","content":[],"model":"claude-sonnet-4.5","stop_reason":"","usage":{"input_tokens":20,"cache_read_input_tokens":11,"cache_creation_input_tokens":4}}}`,
|
||||
``,
|
||||
`event: content_block_start`,
|
||||
`data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":"hello"}}`,
|
||||
``,
|
||||
`event: message_delta`,
|
||||
`data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":8}}`,
|
||||
``,
|
||||
`event: message_stop`,
|
||||
`data: {"type":"message_stop"}`,
|
||||
``,
|
||||
}, "\n"))),
|
||||
}
|
||||
|
||||
svc := &GatewayService{}
|
||||
result, err := svc.handleCCStreamingFromAnthropic(resp, c, "gpt-5", "claude-sonnet-4.5", &reasoningEffort, time.Now(), true)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, result)
|
||||
require.Equal(t, 20, result.Usage.InputTokens)
|
||||
require.Equal(t, 8, result.Usage.OutputTokens)
|
||||
require.Equal(t, 11, result.Usage.CacheReadInputTokens)
|
||||
require.Equal(t, 4, result.Usage.CacheCreationInputTokens)
|
||||
require.NotNil(t, result.ReasoningEffort)
|
||||
require.Equal(t, "medium", *result.ReasoningEffort)
|
||||
require.Contains(t, rec.Body.String(), `[DONE]`)
|
||||
}
|
||||
518
backend/internal/service/gateway_forward_as_responses.go
Normal file
518
backend/internal/service/gateway_forward_as_responses.go
Normal file
@@ -0,0 +1,518 @@
|
||||
package service
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/apicompat"
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/claude"
|
||||
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
|
||||
"github.com/Wei-Shaw/sub2api/internal/util/responseheaders"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/tidwall/gjson"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// ForwardAsResponses accepts an OpenAI Responses API request body, converts it
|
||||
// to Anthropic Messages format, forwards to the Anthropic upstream, and converts
|
||||
// the response back to Responses format. This enables OpenAI Responses API
|
||||
// clients to access Anthropic models through Anthropic platform groups.
|
||||
//
|
||||
// The method follows the same pattern as OpenAIGatewayService.ForwardAsAnthropic
|
||||
// but in reverse direction: Responses → Anthropic upstream → Responses.
|
||||
func (s *GatewayService) ForwardAsResponses(
|
||||
ctx context.Context,
|
||||
c *gin.Context,
|
||||
account *Account,
|
||||
body []byte,
|
||||
parsed *ParsedRequest,
|
||||
) (*ForwardResult, error) {
|
||||
startTime := time.Now()
|
||||
|
||||
// 1. Parse Responses request
|
||||
var responsesReq apicompat.ResponsesRequest
|
||||
if err := json.Unmarshal(body, &responsesReq); err != nil {
|
||||
return nil, fmt.Errorf("parse responses request: %w", err)
|
||||
}
|
||||
originalModel := responsesReq.Model
|
||||
clientStream := responsesReq.Stream
|
||||
|
||||
// 2. Convert Responses → Anthropic
|
||||
anthropicReq, err := apicompat.ResponsesToAnthropicRequest(&responsesReq)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("convert responses to anthropic: %w", err)
|
||||
}
|
||||
|
||||
// 3. Force upstream streaming (Anthropic works best with streaming)
|
||||
anthropicReq.Stream = true
|
||||
reqStream := true
|
||||
|
||||
// 4. Model mapping
|
||||
mappedModel := originalModel
|
||||
reasoningEffort := ExtractResponsesReasoningEffortFromBody(body)
|
||||
if account.Type == AccountTypeAPIKey {
|
||||
mappedModel = account.GetMappedModel(originalModel)
|
||||
}
|
||||
if mappedModel == originalModel && account.Platform == PlatformAnthropic && account.Type != AccountTypeAPIKey {
|
||||
normalized := claude.NormalizeModelID(originalModel)
|
||||
if normalized != originalModel {
|
||||
mappedModel = normalized
|
||||
}
|
||||
}
|
||||
anthropicReq.Model = mappedModel
|
||||
|
||||
logger.L().Debug("gateway forward_as_responses: model mapping applied",
|
||||
zap.Int64("account_id", account.ID),
|
||||
zap.String("original_model", originalModel),
|
||||
zap.String("mapped_model", mappedModel),
|
||||
zap.Bool("client_stream", clientStream),
|
||||
)
|
||||
|
||||
// 5. Marshal Anthropic request body
|
||||
anthropicBody, err := json.Marshal(anthropicReq)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("marshal anthropic request: %w", err)
|
||||
}
|
||||
|
||||
// 6. Apply Claude Code mimicry for OAuth accounts (non-Claude-Code endpoints)
|
||||
isClaudeCode := false // Responses API is never Claude Code
|
||||
shouldMimicClaudeCode := account.IsOAuth() && !isClaudeCode
|
||||
|
||||
if shouldMimicClaudeCode {
|
||||
if !strings.Contains(strings.ToLower(mappedModel), "haiku") &&
|
||||
!systemIncludesClaudeCodePrompt(anthropicReq.System) {
|
||||
anthropicBody = injectClaudeCodePrompt(anthropicBody, anthropicReq.System)
|
||||
}
|
||||
}
|
||||
|
||||
// 7. Enforce cache_control block limit
|
||||
anthropicBody = enforceCacheControlLimit(anthropicBody)
|
||||
|
||||
// 8. Get access token
|
||||
token, tokenType, err := s.GetAccessToken(ctx, account)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get access token: %w", err)
|
||||
}
|
||||
|
||||
// 9. Get proxy URL
|
||||
proxyURL := ""
|
||||
if account.ProxyID != nil && account.Proxy != nil {
|
||||
proxyURL = account.Proxy.URL()
|
||||
}
|
||||
|
||||
// 10. Build upstream request
|
||||
upstreamCtx, releaseUpstreamCtx := detachStreamUpstreamContext(ctx, reqStream)
|
||||
upstreamReq, err := s.buildUpstreamRequest(upstreamCtx, c, account, anthropicBody, token, tokenType, mappedModel, reqStream, shouldMimicClaudeCode)
|
||||
releaseUpstreamCtx()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("build upstream request: %w", err)
|
||||
}
|
||||
|
||||
// 11. Send request
|
||||
resp, err := s.httpUpstream.DoWithTLS(upstreamReq, proxyURL, account.ID, account.Concurrency, account.IsTLSFingerprintEnabled())
|
||||
if err != nil {
|
||||
if resp != nil && resp.Body != nil {
|
||||
_ = resp.Body.Close()
|
||||
}
|
||||
safeErr := sanitizeUpstreamErrorMessage(err.Error())
|
||||
setOpsUpstreamError(c, 0, safeErr, "")
|
||||
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
||||
Platform: account.Platform,
|
||||
AccountID: account.ID,
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: 0,
|
||||
Kind: "request_error",
|
||||
Message: safeErr,
|
||||
})
|
||||
writeResponsesError(c, http.StatusBadGateway, "server_error", "Upstream request failed")
|
||||
return nil, fmt.Errorf("upstream request failed: %s", safeErr)
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
// 12. Handle error response with failover
|
||||
if resp.StatusCode >= 400 {
|
||||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
||||
_ = resp.Body.Close()
|
||||
resp.Body = io.NopCloser(bytes.NewReader(respBody))
|
||||
|
||||
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody))
|
||||
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
|
||||
|
||||
if s.shouldFailoverUpstreamError(resp.StatusCode) {
|
||||
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
||||
Platform: account.Platform,
|
||||
AccountID: account.ID,
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: resp.StatusCode,
|
||||
UpstreamRequestID: resp.Header.Get("x-request-id"),
|
||||
Kind: "failover",
|
||||
Message: upstreamMsg,
|
||||
})
|
||||
if s.rateLimitService != nil {
|
||||
s.rateLimitService.HandleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody)
|
||||
}
|
||||
return nil, &UpstreamFailoverError{
|
||||
StatusCode: resp.StatusCode,
|
||||
ResponseBody: respBody,
|
||||
}
|
||||
}
|
||||
|
||||
// Non-failover error: return Responses-formatted error to client
|
||||
writeResponsesError(c, mapUpstreamStatusCode(resp.StatusCode), "server_error", upstreamMsg)
|
||||
return nil, fmt.Errorf("upstream error: %d %s", resp.StatusCode, upstreamMsg)
|
||||
}
|
||||
|
||||
// 13. Handle normal response (convert Anthropic → Responses)
|
||||
var result *ForwardResult
|
||||
var handleErr error
|
||||
if clientStream {
|
||||
result, handleErr = s.handleResponsesStreamingResponse(resp, c, originalModel, mappedModel, reasoningEffort, startTime)
|
||||
} else {
|
||||
result, handleErr = s.handleResponsesBufferedStreamingResponse(resp, c, originalModel, mappedModel, reasoningEffort, startTime)
|
||||
}
|
||||
|
||||
return result, handleErr
|
||||
}
|
||||
|
||||
// ExtractResponsesReasoningEffortFromBody reads Responses API reasoning.effort
|
||||
// and normalizes it for usage logging.
|
||||
func ExtractResponsesReasoningEffortFromBody(body []byte) *string {
|
||||
raw := strings.TrimSpace(gjson.GetBytes(body, "reasoning.effort").String())
|
||||
if raw == "" {
|
||||
return nil
|
||||
}
|
||||
normalized := normalizeOpenAIReasoningEffort(raw)
|
||||
if normalized == "" {
|
||||
return nil
|
||||
}
|
||||
return &normalized
|
||||
}
|
||||
|
||||
func mergeAnthropicUsage(dst *ClaudeUsage, src apicompat.AnthropicUsage) {
|
||||
if dst == nil {
|
||||
return
|
||||
}
|
||||
if src.InputTokens > 0 {
|
||||
dst.InputTokens = src.InputTokens
|
||||
}
|
||||
if src.OutputTokens > 0 {
|
||||
dst.OutputTokens = src.OutputTokens
|
||||
}
|
||||
if src.CacheReadInputTokens > 0 {
|
||||
dst.CacheReadInputTokens = src.CacheReadInputTokens
|
||||
}
|
||||
if src.CacheCreationInputTokens > 0 {
|
||||
dst.CacheCreationInputTokens = src.CacheCreationInputTokens
|
||||
}
|
||||
}
|
||||
|
||||
// handleResponsesBufferedStreamingResponse reads all Anthropic SSE events from
|
||||
// the upstream streaming response, assembles them into a complete Anthropic
|
||||
// response, converts to Responses API JSON format, and writes it to the client.
|
||||
func (s *GatewayService) handleResponsesBufferedStreamingResponse(
|
||||
resp *http.Response,
|
||||
c *gin.Context,
|
||||
originalModel string,
|
||||
mappedModel string,
|
||||
reasoningEffort *string,
|
||||
startTime time.Time,
|
||||
) (*ForwardResult, error) {
|
||||
requestID := resp.Header.Get("x-request-id")
|
||||
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
maxLineSize := defaultMaxLineSize
|
||||
if s.cfg != nil && s.cfg.Gateway.MaxLineSize > 0 {
|
||||
maxLineSize = s.cfg.Gateway.MaxLineSize
|
||||
}
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize)
|
||||
|
||||
// Accumulate the final Anthropic response from streaming events
|
||||
var finalResp *apicompat.AnthropicResponse
|
||||
var usage ClaudeUsage
|
||||
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if !strings.HasPrefix(line, "event: ") {
|
||||
continue
|
||||
}
|
||||
eventType := strings.TrimPrefix(line, "event: ")
|
||||
|
||||
// Read the data line
|
||||
if !scanner.Scan() {
|
||||
break
|
||||
}
|
||||
dataLine := scanner.Text()
|
||||
if !strings.HasPrefix(dataLine, "data: ") {
|
||||
continue
|
||||
}
|
||||
payload := dataLine[6:]
|
||||
|
||||
var event apicompat.AnthropicStreamEvent
|
||||
if err := json.Unmarshal([]byte(payload), &event); err != nil {
|
||||
logger.L().Warn("forward_as_responses buffered: failed to parse event",
|
||||
zap.Error(err),
|
||||
zap.String("request_id", requestID),
|
||||
zap.String("event_type", eventType),
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
// message_start carries the initial response structure
|
||||
if event.Type == "message_start" && event.Message != nil {
|
||||
finalResp = event.Message
|
||||
mergeAnthropicUsage(&usage, event.Message.Usage)
|
||||
}
|
||||
|
||||
// message_delta carries final usage and stop_reason
|
||||
if event.Type == "message_delta" {
|
||||
if event.Usage != nil {
|
||||
mergeAnthropicUsage(&usage, *event.Usage)
|
||||
}
|
||||
if event.Delta != nil && event.Delta.StopReason != "" && finalResp != nil {
|
||||
finalResp.StopReason = event.Delta.StopReason
|
||||
}
|
||||
}
|
||||
|
||||
// Accumulate content blocks
|
||||
if event.Type == "content_block_start" && event.ContentBlock != nil && finalResp != nil {
|
||||
finalResp.Content = append(finalResp.Content, *event.ContentBlock)
|
||||
}
|
||||
if event.Type == "content_block_delta" && event.Delta != nil && finalResp != nil && event.Index != nil {
|
||||
idx := *event.Index
|
||||
if idx < len(finalResp.Content) {
|
||||
switch event.Delta.Type {
|
||||
case "text_delta":
|
||||
finalResp.Content[idx].Text += event.Delta.Text
|
||||
case "thinking_delta":
|
||||
finalResp.Content[idx].Thinking += event.Delta.Thinking
|
||||
case "input_json_delta":
|
||||
finalResp.Content[idx].Input = appendRawJSON(finalResp.Content[idx].Input, event.Delta.PartialJSON)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
logger.L().Warn("forward_as_responses buffered: read error",
|
||||
zap.Error(err),
|
||||
zap.String("request_id", requestID),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
if finalResp == nil {
|
||||
writeResponsesError(c, http.StatusBadGateway, "server_error", "Upstream stream ended without a response")
|
||||
return nil, fmt.Errorf("upstream stream ended without response")
|
||||
}
|
||||
|
||||
// Update usage from accumulated delta
|
||||
if usage.InputTokens > 0 || usage.OutputTokens > 0 {
|
||||
finalResp.Usage = apicompat.AnthropicUsage{
|
||||
InputTokens: usage.InputTokens,
|
||||
OutputTokens: usage.OutputTokens,
|
||||
CacheCreationInputTokens: usage.CacheCreationInputTokens,
|
||||
CacheReadInputTokens: usage.CacheReadInputTokens,
|
||||
}
|
||||
}
|
||||
|
||||
// Convert to Responses format
|
||||
responsesResp := apicompat.AnthropicToResponsesResponse(finalResp)
|
||||
responsesResp.Model = originalModel // Use original model name
|
||||
|
||||
if s.responseHeaderFilter != nil {
|
||||
responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
|
||||
}
|
||||
c.JSON(http.StatusOK, responsesResp)
|
||||
|
||||
return &ForwardResult{
|
||||
RequestID: requestID,
|
||||
Usage: usage,
|
||||
Model: originalModel,
|
||||
UpstreamModel: mappedModel,
|
||||
ReasoningEffort: reasoningEffort,
|
||||
Stream: false,
|
||||
Duration: time.Since(startTime),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// handleResponsesStreamingResponse reads Anthropic SSE events from upstream,
|
||||
// converts each to Responses SSE events, and writes them to the client.
|
||||
func (s *GatewayService) handleResponsesStreamingResponse(
|
||||
resp *http.Response,
|
||||
c *gin.Context,
|
||||
originalModel string,
|
||||
mappedModel string,
|
||||
reasoningEffort *string,
|
||||
startTime time.Time,
|
||||
) (*ForwardResult, error) {
|
||||
requestID := resp.Header.Get("x-request-id")
|
||||
|
||||
if s.responseHeaderFilter != nil {
|
||||
responseheaders.WriteFilteredHeaders(c.Writer.Header(), resp.Header, s.responseHeaderFilter)
|
||||
}
|
||||
c.Writer.Header().Set("Content-Type", "text/event-stream")
|
||||
c.Writer.Header().Set("Cache-Control", "no-cache")
|
||||
c.Writer.Header().Set("Connection", "keep-alive")
|
||||
c.Writer.Header().Set("X-Accel-Buffering", "no")
|
||||
c.Writer.WriteHeader(http.StatusOK)
|
||||
|
||||
state := apicompat.NewAnthropicEventToResponsesState()
|
||||
state.Model = originalModel
|
||||
var usage ClaudeUsage
|
||||
var firstTokenMs *int
|
||||
firstChunk := true
|
||||
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
maxLineSize := defaultMaxLineSize
|
||||
if s.cfg != nil && s.cfg.Gateway.MaxLineSize > 0 {
|
||||
maxLineSize = s.cfg.Gateway.MaxLineSize
|
||||
}
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), maxLineSize)
|
||||
|
||||
resultWithUsage := func() *ForwardResult {
|
||||
return &ForwardResult{
|
||||
RequestID: requestID,
|
||||
Usage: usage,
|
||||
Model: originalModel,
|
||||
UpstreamModel: mappedModel,
|
||||
ReasoningEffort: reasoningEffort,
|
||||
Stream: true,
|
||||
Duration: time.Since(startTime),
|
||||
FirstTokenMs: firstTokenMs,
|
||||
}
|
||||
}
|
||||
|
||||
// processEvent handles a single parsed Anthropic SSE event.
|
||||
processEvent := func(event *apicompat.AnthropicStreamEvent) bool {
|
||||
if firstChunk {
|
||||
firstChunk = false
|
||||
ms := int(time.Since(startTime).Milliseconds())
|
||||
firstTokenMs = &ms
|
||||
}
|
||||
|
||||
// Extract usage from message_delta
|
||||
if event.Type == "message_delta" && event.Usage != nil {
|
||||
mergeAnthropicUsage(&usage, *event.Usage)
|
||||
}
|
||||
// Also capture usage from message_start
|
||||
if event.Type == "message_start" && event.Message != nil {
|
||||
mergeAnthropicUsage(&usage, event.Message.Usage)
|
||||
}
|
||||
|
||||
// Convert to Responses events
|
||||
events := apicompat.AnthropicEventToResponsesEvents(event, state)
|
||||
for _, evt := range events {
|
||||
sse, err := apicompat.ResponsesEventToSSE(evt)
|
||||
if err != nil {
|
||||
logger.L().Warn("forward_as_responses stream: failed to marshal event",
|
||||
zap.Error(err),
|
||||
zap.String("request_id", requestID),
|
||||
)
|
||||
continue
|
||||
}
|
||||
if _, err := fmt.Fprint(c.Writer, sse); err != nil {
|
||||
logger.L().Info("forward_as_responses stream: client disconnected",
|
||||
zap.String("request_id", requestID),
|
||||
)
|
||||
return true // client disconnected
|
||||
}
|
||||
}
|
||||
if len(events) > 0 {
|
||||
c.Writer.Flush()
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
finalizeStream := func() (*ForwardResult, error) {
|
||||
if finalEvents := apicompat.FinalizeAnthropicResponsesStream(state); len(finalEvents) > 0 {
|
||||
for _, evt := range finalEvents {
|
||||
sse, err := apicompat.ResponsesEventToSSE(evt)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
fmt.Fprint(c.Writer, sse) //nolint:errcheck
|
||||
}
|
||||
c.Writer.Flush()
|
||||
}
|
||||
return resultWithUsage(), nil
|
||||
}
|
||||
|
||||
// Read Anthropic SSE events
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if !strings.HasPrefix(line, "event: ") {
|
||||
continue
|
||||
}
|
||||
eventType := strings.TrimPrefix(line, "event: ")
|
||||
|
||||
// Read data line
|
||||
if !scanner.Scan() {
|
||||
break
|
||||
}
|
||||
dataLine := scanner.Text()
|
||||
if !strings.HasPrefix(dataLine, "data: ") {
|
||||
continue
|
||||
}
|
||||
payload := dataLine[6:]
|
||||
|
||||
var event apicompat.AnthropicStreamEvent
|
||||
if err := json.Unmarshal([]byte(payload), &event); err != nil {
|
||||
logger.L().Warn("forward_as_responses stream: failed to parse event",
|
||||
zap.Error(err),
|
||||
zap.String("request_id", requestID),
|
||||
zap.String("event_type", eventType),
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
if processEvent(&event) {
|
||||
return resultWithUsage(), nil
|
||||
}
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||||
logger.L().Warn("forward_as_responses stream: read error",
|
||||
zap.Error(err),
|
||||
zap.String("request_id", requestID),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
return finalizeStream()
|
||||
}
|
||||
|
||||
// appendRawJSON appends a partial-JSON fragment to the raw JSON accumulated so
// far and returns the combined value.
//
// An empty existing value is replaced by the fragment, and an empty fragment
// leaves a non-empty existing value unchanged. When existing is exactly the
// placeholder "{}" or "null" — the initial input a tool_use content block is
// started with — the first non-empty fragment replaces it instead of being
// concatenated, which would otherwise yield invalid JSON such as `{}{"a":1}`.
func appendRawJSON(existing json.RawMessage, fragment string) json.RawMessage {
	if len(existing) == 0 {
		return json.RawMessage(fragment)
	}
	if fragment == "" {
		return existing
	}
	switch string(existing) {
	case "{}", "null":
		// Placeholder from the block start event, not real accumulated input.
		return json.RawMessage(fragment)
	}
	return json.RawMessage(string(existing) + fragment)
}
|
||||
|
||||
// writeResponsesError writes an error response in OpenAI Responses API format.
|
||||
func writeResponsesError(c *gin.Context, statusCode int, code, message string) {
|
||||
c.JSON(statusCode, gin.H{
|
||||
"error": gin.H{
|
||||
"code": code,
|
||||
"message": message,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// mapUpstreamStatusCode translates an upstream HTTP status into the status
// returned to the client: anything at or above 500 becomes 502 Bad Gateway,
// every other code is passed through unchanged.
func mapUpstreamStatusCode(code int) int {
	if code < 500 {
		return code
	}
	return http.StatusBadGateway
}
|
||||
@@ -0,0 +1,94 @@
|
||||
//go:build unit
|
||||
|
||||
package service
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestExtractResponsesReasoningEffortFromBody(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
got := ExtractResponsesReasoningEffortFromBody([]byte(`{"model":"claude-sonnet-4.5","reasoning":{"effort":"HIGH"}}`))
|
||||
require.NotNil(t, got)
|
||||
require.Equal(t, "high", *got)
|
||||
|
||||
require.Nil(t, ExtractResponsesReasoningEffortFromBody([]byte(`{"model":"claude-sonnet-4.5"}`)))
|
||||
}
|
||||
|
||||
func TestHandleResponsesBufferedStreamingResponse_PreservesMessageStartCacheUsage(t *testing.T) {
|
||||
t.Parallel()
|
||||
gin.SetMode(gin.TestMode)
|
||||
|
||||
rec := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(rec)
|
||||
|
||||
resp := &http.Response{
|
||||
Header: http.Header{"x-request-id": []string{"rid_buffered"}},
|
||||
Body: io.NopCloser(strings.NewReader(strings.Join([]string{
|
||||
`event: message_start`,
|
||||
`data: {"type":"message_start","message":{"id":"msg_1","type":"message","role":"assistant","content":[],"model":"claude-sonnet-4.5","stop_reason":"","usage":{"input_tokens":12,"cache_read_input_tokens":9,"cache_creation_input_tokens":3}}}`,
|
||||
``,
|
||||
`event: content_block_start`,
|
||||
`data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":"hello"}}`,
|
||||
``,
|
||||
`event: message_delta`,
|
||||
`data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":7}}`,
|
||||
``,
|
||||
}, "\n"))),
|
||||
}
|
||||
|
||||
svc := &GatewayService{}
|
||||
result, err := svc.handleResponsesBufferedStreamingResponse(resp, c, "claude-sonnet-4.5", "claude-sonnet-4.5", nil, time.Now())
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, result)
|
||||
require.Equal(t, 12, result.Usage.InputTokens)
|
||||
require.Equal(t, 7, result.Usage.OutputTokens)
|
||||
require.Equal(t, 9, result.Usage.CacheReadInputTokens)
|
||||
require.Equal(t, 3, result.Usage.CacheCreationInputTokens)
|
||||
require.Contains(t, rec.Body.String(), `"cached_tokens":9`)
|
||||
}
|
||||
|
||||
func TestHandleResponsesStreamingResponse_PreservesMessageStartCacheUsage(t *testing.T) {
|
||||
t.Parallel()
|
||||
gin.SetMode(gin.TestMode)
|
||||
|
||||
rec := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(rec)
|
||||
|
||||
resp := &http.Response{
|
||||
Header: http.Header{"x-request-id": []string{"rid_stream"}},
|
||||
Body: io.NopCloser(strings.NewReader(strings.Join([]string{
|
||||
`event: message_start`,
|
||||
`data: {"type":"message_start","message":{"id":"msg_2","type":"message","role":"assistant","content":[],"model":"claude-sonnet-4.5","stop_reason":"","usage":{"input_tokens":20,"cache_read_input_tokens":11,"cache_creation_input_tokens":4}}}`,
|
||||
``,
|
||||
`event: content_block_start`,
|
||||
`data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":"hello"}}`,
|
||||
``,
|
||||
`event: message_delta`,
|
||||
`data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":8}}`,
|
||||
``,
|
||||
`event: message_stop`,
|
||||
`data: {"type":"message_stop"}`,
|
||||
``,
|
||||
}, "\n"))),
|
||||
}
|
||||
|
||||
svc := &GatewayService{}
|
||||
result, err := svc.handleResponsesStreamingResponse(resp, c, "claude-sonnet-4.5", "claude-sonnet-4.5", nil, time.Now())
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, result)
|
||||
require.Equal(t, 20, result.Usage.InputTokens)
|
||||
require.Equal(t, 8, result.Usage.OutputTokens)
|
||||
require.Equal(t, 11, result.Usage.CacheReadInputTokens)
|
||||
require.Equal(t, 4, result.Usage.CacheCreationInputTokens)
|
||||
require.Contains(t, rec.Body.String(), `response.completed`)
|
||||
}
|
||||
@@ -5,6 +5,8 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"regexp"
|
||||
"sort"
|
||||
"strings"
|
||||
"unsafe"
|
||||
|
||||
@@ -34,6 +36,9 @@ var (
|
||||
patternEmptyTextSpaced = []byte(`"text": ""`)
|
||||
patternEmptyTextSp1 = []byte(`"text" : ""`)
|
||||
patternEmptyTextSp2 = []byte(`"text" :""`)
|
||||
|
||||
sessionUserAgentProductPattern = regexp.MustCompile(`([A-Za-z0-9._-]+)/[A-Za-z0-9._-]+`)
|
||||
sessionUserAgentVersionPattern = regexp.MustCompile(`\bv?\d+(?:\.\d+){1,3}\b`)
|
||||
)
|
||||
|
||||
// SessionContext 粘性会话上下文,用于区分不同来源的请求。
|
||||
@@ -75,6 +80,49 @@ type ParsedRequest struct {
|
||||
OnUpstreamAccepted func()
|
||||
}
|
||||
|
||||
// NormalizeSessionUserAgent reduces UA noise for sticky-session and digest hashing.
|
||||
// It preserves the set of product names from Product/Version tokens while
|
||||
// discarding version-only changes and incidental comments.
|
||||
func NormalizeSessionUserAgent(raw string) string {
|
||||
raw = strings.TrimSpace(raw)
|
||||
if raw == "" {
|
||||
return ""
|
||||
}
|
||||
|
||||
matches := sessionUserAgentProductPattern.FindAllStringSubmatch(raw, -1)
|
||||
if len(matches) == 0 {
|
||||
return normalizeSessionUserAgentFallback(raw)
|
||||
}
|
||||
|
||||
products := make([]string, 0, len(matches))
|
||||
seen := make(map[string]struct{}, len(matches))
|
||||
for _, match := range matches {
|
||||
if len(match) < 2 {
|
||||
continue
|
||||
}
|
||||
product := strings.ToLower(strings.TrimSpace(match[1]))
|
||||
if product == "" {
|
||||
continue
|
||||
}
|
||||
if _, exists := seen[product]; exists {
|
||||
continue
|
||||
}
|
||||
seen[product] = struct{}{}
|
||||
products = append(products, product)
|
||||
}
|
||||
if len(products) == 0 {
|
||||
return normalizeSessionUserAgentFallback(raw)
|
||||
}
|
||||
sort.Strings(products)
|
||||
return strings.Join(products, "+")
|
||||
}
|
||||
|
||||
func normalizeSessionUserAgentFallback(raw string) string {
|
||||
normalized := strings.ToLower(strings.Join(strings.Fields(raw), " "))
|
||||
normalized = sessionUserAgentVersionPattern.ReplaceAllString(normalized, "")
|
||||
return strings.Join(strings.Fields(normalized), " ")
|
||||
}
|
||||
|
||||
// ParseGatewayRequest 解析网关请求体并返回结构化结果。
|
||||
// protocol 指定请求协议格式(domain.PlatformAnthropic / domain.PlatformGemini),
|
||||
// 不同协议使用不同的 system/messages 字段名。
|
||||
|
||||
@@ -658,7 +658,7 @@ func (s *GatewayService) GenerateSessionHash(parsed *ParsedRequest) string {
|
||||
if parsed.SessionContext != nil {
|
||||
_, _ = combined.WriteString(parsed.SessionContext.ClientIP)
|
||||
_, _ = combined.WriteString(":")
|
||||
_, _ = combined.WriteString(parsed.SessionContext.UserAgent)
|
||||
_, _ = combined.WriteString(NormalizeSessionUserAgent(parsed.SessionContext.UserAgent))
|
||||
_, _ = combined.WriteString(":")
|
||||
_, _ = combined.WriteString(strconv.FormatInt(parsed.SessionContext.APIKeyID, 10))
|
||||
_, _ = combined.WriteString("|")
|
||||
@@ -4148,6 +4148,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
|
||||
AccountID: account.ID,
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: 0,
|
||||
UpstreamURL: safeUpstreamURL(upstreamReq.URL.String()),
|
||||
Kind: "request_error",
|
||||
Message: safeErr,
|
||||
})
|
||||
@@ -4174,6 +4175,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: resp.StatusCode,
|
||||
UpstreamRequestID: resp.Header.Get("x-request-id"),
|
||||
UpstreamURL: safeUpstreamURL(upstreamReq.URL.String()),
|
||||
Kind: "signature_error",
|
||||
Message: extractUpstreamErrorMessage(respBody),
|
||||
Detail: func() string {
|
||||
@@ -4228,6 +4230,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: retryResp.StatusCode,
|
||||
UpstreamRequestID: retryResp.Header.Get("x-request-id"),
|
||||
UpstreamURL: safeUpstreamURL(retryReq.URL.String()),
|
||||
Kind: "signature_retry_thinking",
|
||||
Message: extractUpstreamErrorMessage(retryRespBody),
|
||||
Detail: func() string {
|
||||
@@ -4258,6 +4261,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
|
||||
AccountID: account.ID,
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: 0,
|
||||
UpstreamURL: safeUpstreamURL(retryReq2.URL.String()),
|
||||
Kind: "signature_retry_tools_request_error",
|
||||
Message: sanitizeUpstreamErrorMessage(retryErr2.Error()),
|
||||
})
|
||||
@@ -4297,6 +4301,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: resp.StatusCode,
|
||||
UpstreamRequestID: resp.Header.Get("x-request-id"),
|
||||
UpstreamURL: safeUpstreamURL(upstreamReq.URL.String()),
|
||||
Kind: "budget_constraint_error",
|
||||
Message: errMsg,
|
||||
Detail: func() string {
|
||||
@@ -4358,6 +4363,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: resp.StatusCode,
|
||||
UpstreamRequestID: resp.Header.Get("x-request-id"),
|
||||
UpstreamURL: safeUpstreamURL(upstreamReq.URL.String()),
|
||||
Kind: "retry",
|
||||
Message: extractUpstreamErrorMessage(respBody),
|
||||
Detail: func() string {
|
||||
@@ -4628,6 +4634,7 @@ func (s *GatewayService) forwardAnthropicAPIKeyPassthroughWithInput(
|
||||
AccountID: account.ID,
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: 0,
|
||||
UpstreamURL: safeUpstreamURL(upstreamReq.URL.String()),
|
||||
Passthrough: true,
|
||||
Kind: "request_error",
|
||||
Message: safeErr,
|
||||
@@ -4667,6 +4674,7 @@ func (s *GatewayService) forwardAnthropicAPIKeyPassthroughWithInput(
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: resp.StatusCode,
|
||||
UpstreamRequestID: resp.Header.Get("x-request-id"),
|
||||
UpstreamURL: safeUpstreamURL(upstreamReq.URL.String()),
|
||||
Passthrough: true,
|
||||
Kind: "retry",
|
||||
Message: extractUpstreamErrorMessage(respBody),
|
||||
@@ -5344,6 +5352,7 @@ func (s *GatewayService) executeBedrockUpstream(
|
||||
AccountID: account.ID,
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: 0,
|
||||
UpstreamURL: safeUpstreamURL(upstreamReq.URL.String()),
|
||||
Kind: "request_error",
|
||||
Message: safeErr,
|
||||
})
|
||||
@@ -5380,6 +5389,7 @@ func (s *GatewayService) executeBedrockUpstream(
|
||||
AccountID: account.ID,
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: resp.StatusCode,
|
||||
UpstreamURL: safeUpstreamURL(upstreamReq.URL.String()),
|
||||
Kind: "retry",
|
||||
Message: extractUpstreamErrorMessage(respBody),
|
||||
Detail: func() string {
|
||||
@@ -8064,6 +8074,7 @@ func (s *GatewayService) forwardCountTokensAnthropicAPIKeyPassthrough(ctx contex
|
||||
AccountID: account.ID,
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: 0,
|
||||
UpstreamURL: safeUpstreamURL(upstreamReq.URL.String()),
|
||||
Passthrough: true,
|
||||
Kind: "request_error",
|
||||
Message: sanitizeUpstreamErrorMessage(err.Error()),
|
||||
@@ -8119,6 +8130,7 @@ func (s *GatewayService) forwardCountTokensAnthropicAPIKeyPassthrough(ctx contex
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: resp.StatusCode,
|
||||
UpstreamRequestID: resp.Header.Get("x-request-id"),
|
||||
UpstreamURL: safeUpstreamURL(upstreamReq.URL.String()),
|
||||
Passthrough: true,
|
||||
Kind: "http_error",
|
||||
Message: upstreamMsg,
|
||||
|
||||
@@ -52,10 +52,11 @@ func BuildGeminiDigestChain(req *antigravity.GeminiRequest) string {
|
||||
// 返回 16 字符的 Base64 编码的 SHA256 前缀
|
||||
func GenerateGeminiPrefixHash(userID, apiKeyID int64, ip, userAgent, platform, model string) string {
|
||||
// 组合所有标识符
|
||||
normalizedUserAgent := NormalizeSessionUserAgent(userAgent)
|
||||
combined := strconv.FormatInt(userID, 10) + ":" +
|
||||
strconv.FormatInt(apiKeyID, 10) + ":" +
|
||||
ip + ":" +
|
||||
userAgent + ":" +
|
||||
normalizedUserAgent + ":" +
|
||||
platform + ":" +
|
||||
model
|
||||
|
||||
|
||||
@@ -152,6 +152,24 @@ func TestGenerateGeminiPrefixHash(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestGenerateGeminiPrefixHash_IgnoresUserAgentVersionNoise(t *testing.T) {
|
||||
hash1 := GenerateGeminiPrefixHash(1, 100, "192.168.1.1", "Mozilla/5.0 codex_cli_rs/0.1.0", "antigravity", "gemini-2.5-pro")
|
||||
hash2 := GenerateGeminiPrefixHash(1, 100, "192.168.1.1", "Mozilla/5.0 codex_cli_rs/0.1.1", "antigravity", "gemini-2.5-pro")
|
||||
|
||||
if hash1 != hash2 {
|
||||
t.Fatalf("version-only User-Agent changes should not perturb Gemini prefix hash: %s vs %s", hash1, hash2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGenerateGeminiPrefixHash_IgnoresFreeformUserAgentVersionNoise(t *testing.T) {
|
||||
hash1 := GenerateGeminiPrefixHash(1, 100, "192.168.1.1", "Codex CLI 0.1.0", "antigravity", "gemini-2.5-pro")
|
||||
hash2 := GenerateGeminiPrefixHash(1, 100, "192.168.1.1", "Codex CLI 0.1.1", "antigravity", "gemini-2.5-pro")
|
||||
|
||||
if hash1 != hash2 {
|
||||
t.Fatalf("free-form version-only User-Agent changes should not perturb Gemini prefix hash: %s vs %s", hash1, hash2)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseGeminiSessionValue(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
||||
@@ -504,6 +504,48 @@ func TestGenerateSessionHash_SessionContext_UADifference(t *testing.T) {
|
||||
require.NotEqual(t, h1, h2, "different User-Agent should produce different hash")
|
||||
}
|
||||
|
||||
func TestGenerateSessionHash_SessionContext_UAVersionNoiseIgnored(t *testing.T) {
|
||||
svc := &GatewayService{}
|
||||
|
||||
base := func(ua string) *ParsedRequest {
|
||||
return &ParsedRequest{
|
||||
Messages: []any{
|
||||
map[string]any{"role": "user", "content": "test"},
|
||||
},
|
||||
SessionContext: &SessionContext{
|
||||
ClientIP: "1.1.1.1",
|
||||
UserAgent: ua,
|
||||
APIKeyID: 1,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
h1 := svc.GenerateSessionHash(base("Mozilla/5.0 codex_cli_rs/0.1.0"))
|
||||
h2 := svc.GenerateSessionHash(base("Mozilla/5.0 codex_cli_rs/0.1.1"))
|
||||
require.Equal(t, h1, h2, "version-only User-Agent changes should not perturb the sticky session hash")
|
||||
}
|
||||
|
||||
func TestGenerateSessionHash_SessionContext_FreeformUAVersionNoiseIgnored(t *testing.T) {
|
||||
svc := &GatewayService{}
|
||||
|
||||
base := func(ua string) *ParsedRequest {
|
||||
return &ParsedRequest{
|
||||
Messages: []any{
|
||||
map[string]any{"role": "user", "content": "test"},
|
||||
},
|
||||
SessionContext: &SessionContext{
|
||||
ClientIP: "1.1.1.1",
|
||||
UserAgent: ua,
|
||||
APIKeyID: 1,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
h1 := svc.GenerateSessionHash(base("Codex CLI 0.1.0"))
|
||||
h2 := svc.GenerateSessionHash(base("Codex CLI 0.1.1"))
|
||||
require.Equal(t, h1, h2, "free-form version-only User-Agent changes should not perturb the sticky session hash")
|
||||
}
|
||||
|
||||
func TestGenerateSessionHash_SessionContext_APIKeyIDDifference(t *testing.T) {
|
||||
svc := &GatewayService{}
|
||||
|
||||
|
||||
@@ -62,6 +62,12 @@ type OpsErrorLog struct {
|
||||
ClientIP *string `json:"client_ip"`
|
||||
RequestPath string `json:"request_path"`
|
||||
Stream bool `json:"stream"`
|
||||
|
||||
InboundEndpoint string `json:"inbound_endpoint"`
|
||||
UpstreamEndpoint string `json:"upstream_endpoint"`
|
||||
RequestedModel string `json:"requested_model"`
|
||||
UpstreamModel string `json:"upstream_model"`
|
||||
RequestType *int16 `json:"request_type"`
|
||||
}
|
||||
|
||||
type OpsErrorLogDetail struct {
|
||||
|
||||
@@ -79,6 +79,17 @@ type OpsInsertErrorLogInput struct {
|
||||
Model string
|
||||
RequestPath string
|
||||
Stream bool
|
||||
// InboundEndpoint is the normalized client-facing API endpoint path, e.g. /v1/chat/completions.
|
||||
InboundEndpoint string
|
||||
// UpstreamEndpoint is the normalized upstream endpoint path, e.g. /v1/responses.
|
||||
UpstreamEndpoint string
|
||||
// RequestedModel is the client-requested model name before mapping.
|
||||
RequestedModel string
|
||||
// UpstreamModel is the actual model sent to upstream after mapping. Empty means no mapping.
|
||||
UpstreamModel string
|
||||
// RequestType is the granular request type: 0=unknown, 1=sync, 2=stream, 3=ws_v2.
|
||||
// Matches service.RequestType enum semantics from usage_log.go.
|
||||
RequestType *int16
|
||||
UserAgent string
|
||||
|
||||
ErrorPhase string
|
||||
|
||||
@@ -93,6 +93,10 @@ type OpsUpstreamErrorEvent struct {
|
||||
UpstreamStatusCode int `json:"upstream_status_code,omitempty"`
|
||||
UpstreamRequestID string `json:"upstream_request_id,omitempty"`
|
||||
|
||||
// UpstreamURL is the actual upstream URL that was called (host + path, query/fragment stripped).
|
||||
// Helps debug 404/routing errors by showing which endpoint was targeted.
|
||||
UpstreamURL string `json:"upstream_url,omitempty"`
|
||||
|
||||
// Best-effort upstream request capture (sanitized+trimmed).
|
||||
// Required for retrying a specific upstream attempt.
|
||||
UpstreamRequestBody string `json:"upstream_request_body,omitempty"`
|
||||
@@ -119,6 +123,7 @@ func appendOpsUpstreamError(c *gin.Context, ev OpsUpstreamErrorEvent) {
|
||||
ev.UpstreamRequestBody = strings.TrimSpace(ev.UpstreamRequestBody)
|
||||
ev.UpstreamResponseBody = strings.TrimSpace(ev.UpstreamResponseBody)
|
||||
ev.Kind = strings.TrimSpace(ev.Kind)
|
||||
ev.UpstreamURL = strings.TrimSpace(ev.UpstreamURL)
|
||||
ev.Message = strings.TrimSpace(ev.Message)
|
||||
ev.Detail = strings.TrimSpace(ev.Detail)
|
||||
if ev.Message != "" {
|
||||
@@ -205,3 +210,19 @@ func ParseOpsUpstreamErrors(raw string) ([]*OpsUpstreamErrorEvent, error) {
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// safeUpstreamURL reduces a URL to scheme + host + path so it can be recorded
// without leaking secrets.
//
// Surrounding whitespace is trimmed, everything from the first '?' or '#' on is
// dropped (query parameters may carry tokens), and any "user:password@"
// userinfo in the authority is removed. Userinfo is only stripped when the
// string starts with a scheme followed by "://", so an '@' inside a path or a
// scheme-less string is left alone. Blank input yields "".
func safeUpstreamURL(rawURL string) string {
	rawURL = strings.TrimSpace(rawURL)
	if rawURL == "" {
		return ""
	}

	// Cut at whichever of query or fragment comes first.
	if idx := strings.IndexAny(rawURL, "?#"); idx >= 0 {
		rawURL = rawURL[:idx]
	}

	schemeEnd := strings.Index(rawURL, "://")
	if schemeEnd < 0 || strings.IndexByte(rawURL[:schemeEnd], '/') >= 0 {
		// No leading scheme: there is no authority component to clean.
		return rawURL
	}

	authStart := schemeEnd + len("://")
	authEnd := len(rawURL)
	if slash := strings.IndexByte(rawURL[authStart:], '/'); slash >= 0 {
		authEnd = authStart + slash
	}
	// The host begins after the last '@' of the authority; drop what precedes it.
	if at := strings.LastIndexByte(rawURL[authStart:authEnd], '@'); at >= 0 {
		rawURL = rawURL[:authStart] + rawURL[authStart+at+1:]
	}
	return rawURL
}
|
||||
|
||||
@@ -8,6 +8,27 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestSafeUpstreamURL(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input string
|
||||
want string
|
||||
}{
|
||||
{"strips query", "https://api.anthropic.com/v1/messages?beta=true", "https://api.anthropic.com/v1/messages"},
|
||||
{"strips fragment", "https://api.openai.com/v1/responses#frag", "https://api.openai.com/v1/responses"},
|
||||
{"strips both", "https://host/path?token=secret#x", "https://host/path"},
|
||||
{"no query or fragment", "https://host/path", "https://host/path"},
|
||||
{"empty string", "", ""},
|
||||
{"whitespace only", " ", ""},
|
||||
{"query before fragment", "https://h/p?a=1#f", "https://h/p"},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
require.Equal(t, tt.want, safeUpstreamURL(tt.input))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestAppendOpsUpstreamError_UsesRequestBodyBytesFromContext(t *testing.T) {
|
||||
gin.SetMode(gin.TestMode)
|
||||
rec := httptest.NewRecorder()
|
||||
|
||||
Reference in New Issue
Block a user