package handler
import (
"bytes"
"context"
"crypto/rand"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/gorilla/mux"
"github.com/seifghazi/claude-code-monitor/internal/model"
"github.com/seifghazi/claude-code-monitor/internal/provider"
"github.com/seifghazi/claude-code-monitor/internal/runtime"
"github.com/seifghazi/claude-code-monitor/internal/service"
"github.com/seifghazi/claude-code-monitor/internal/sse"
)
const (
defaultPage = 1
defaultPageLimit = 10
maxPageLimit = 100000
defaultBucketMinutes = 60
convHashCharLimit = 500
convHashThreshold = 0.3
maxUserAgentLen = 20
maxStreamChunks = 2000 // max SSE chunks to hold in memory for logging
)
type Handler struct {
storageService service.StorageService
conversationService service.ConversationService
modelRouter *service.ModelRouter
anthropicProvider provider.Provider
logger *log.Logger
cachedSettings *model.ProxySettings
cachedSettingsMu sync.RWMutex
// demoteNonstreaming forces stream=true upstream when the client requested
// stream=false. The proxy then accumulates the SSE stream into a single
// non-streaming JSON response. Eliminates ResponseHeaderTimeout failures
// for long-running requests (1M context + extended thinking on opus).
demoteNonstreaming bool
}
func New(storageService service.StorageService, logger *log.Logger, modelRouter *service.ModelRouter, anthropicProvider provider.Provider, demoteNonstreaming bool) *Handler {
conversationService := service.NewConversationService()
return &Handler{
storageService: storageService,
conversationService: conversationService,
modelRouter: modelRouter,
anthropicProvider: anthropicProvider,
logger: logger,
demoteNonstreaming: demoteNonstreaming,
}
}
func (h *Handler) ChatCompletions(w http.ResponseWriter, r *http.Request) {
// This endpoint is for compatibility but we're an Anthropic proxy
// Return a helpful error message
writeErrorResponse(w, "This is an Anthropic proxy. Please use the /v1/messages endpoint instead of /v1/chat/completions", http.StatusBadRequest)
}
func (h *Handler) Messages(w http.ResponseWriter, r *http.Request) {
// Get body bytes from context (set by middleware)
bodyBytes := getBodyBytes(r)
if bodyBytes == nil {
writeErrorResponse(w, "Error reading request body", http.StatusBadRequest)
return
}
// Parse the request
var req model.AnthropicRequest
if err := json.Unmarshal(bodyBytes, &req); err != nil {
log.Printf("❌ Error parsing JSON: %v", err)
writeErrorResponse(w, "Invalid JSON", http.StatusBadRequest)
return
}
requestID := generateRequestID()
startTime := time.Now()
// Use model router to determine provider and route the request
decision, err := h.modelRouter.DetermineRoute(&req)
if err != nil {
log.Printf("❌ Error routing request: %v", err)
writeErrorResponse(w, "Failed to route request", http.StatusInternalServerError)
return
}
// Create request log with routing information
requestLog := &model.RequestLog{
RequestID: requestID,
Timestamp: time.Now().Format(time.RFC3339),
Method: r.Method,
Endpoint: r.URL.Path,
Headers: SanitizeHeaders(r.Header),
Body: req,
Model: decision.OriginalModel,
OriginalModel: decision.OriginalModel,
RoutedModel: decision.TargetModel,
UserAgent: r.Header.Get("User-Agent"),
ContentType: r.Header.Get("Content-Type"),
ConversationHash: computeConversationHash(&req),
MessageCount: len(req.Messages),
}
if _, err := h.storageService.SaveRequest(requestLog); err != nil {
log.Printf("❌ Error saving request: %v", err)
}
// Decide whether to demote a non-streaming client request to a streaming
// upstream call. Anthropic's non-streaming responses don't write headers
// until the full body is computed, which causes ResponseHeaderTimeout
// failures on opus + 1M context + extended thinking. With demotion the
// proxy asks Anthropic for SSE, gets headers immediately, and synthesizes
// a single JSON response for the client.
clientWantsStream := req.Stream
demote := h.demoteNonstreaming &&
!clientWantsStream &&
decision.Provider.Name() == "anthropic"
// Rewrite the upstream body if either the routed model differs from the
// original or we're demoting to streaming. Use a raw map (with json.Number
// to preserve integer precision in unknown nested fields like tool inputs
// from previous turns) so we don't drop fields the AnthropicRequest struct
// doesn't model (thinking, top_p, beta-only fields). The previous
// re-marshal-from-struct path silently dropped any field not declared on
// AnthropicRequest.
if demote || decision.TargetModel != decision.OriginalModel {
var raw map[string]interface{}
dec := json.NewDecoder(bytes.NewReader(bodyBytes))
dec.UseNumber()
if err := dec.Decode(&raw); err != nil {
log.Printf("❌ Error parsing body for rewrite: %v", err)
writeErrorResponse(w, "Invalid JSON", http.StatusBadRequest)
return
}
if decision.TargetModel != decision.OriginalModel {
raw["model"] = decision.TargetModel
req.Model = decision.TargetModel
}
if demote {
raw["stream"] = true
}
updatedBodyBytes, err := json.Marshal(raw)
if err != nil {
log.Printf("❌ Error marshaling updated request: %v", err)
writeErrorResponse(w, "Failed to process request", http.StatusInternalServerError)
return
}
r.Body = io.NopCloser(bytes.NewReader(updatedBodyBytes))
r.ContentLength = int64(len(updatedBodyBytes))
r.Header.Set("Content-Length", fmt.Sprintf("%d", len(updatedBodyBytes)))
}
// Create a context with an extended timeout for the forwarded request.
// We use a background context with a long timeout (30 minutes) instead of the
// request context to prevent "context canceled" errors for long-running API calls.
// This is necessary because:
// Use context.Background() instead of r.Context() so that:
// 1. Client disconnects don't cancel in-flight API calls (we still want to record usage)
// 2. The HTTP server's WriteTimeout doesn't cap the forwarding timeout
// 3. Claude's "thinking" feature can cause responses to take 5+ minutes
forwardCtx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()
// Apply request header rules before forwarding
h.applyRequestHeaderRules(r)
// Forward the request to the selected provider
resp, err := decision.Provider.ForwardRequest(forwardCtx, r)
if err != nil {
logForwardFailure(r, &req, bodyBytes, decision, requestID, startTime, forwardCtx, err)
writeErrorResponse(w, "Failed to forward request", http.StatusInternalServerError)
return
}
defer resp.Body.Close()
if clientWantsStream {
h.handleStreamingResponse(w, resp, requestLog, startTime)
return
}
if demote {
h.handleDemotedStreamingResponse(w, resp, requestLog, startTime)
return
}
h.handleNonStreamingResponse(w, resp, requestLog, startTime)
}
func (h *Handler) Models(w http.ResponseWriter, r *http.Request) {
// This proxy uses pattern-based routing and supports any model dynamically.
// Returning an empty list since the actual supported models depend on the
// upstream providers (Anthropic, OpenAI) and their current offerings.
response := &model.ModelsResponse{
Object: "list",
Data: []model.ModelInfo{},
}
writeJSONResponse(w, response)
}
// ProxyPassthrough forwards any unhandled /v1/* request to Anthropic and logs it.
// This covers endpoints like config, quota, batches, etc.
func (h *Handler) ProxyPassthrough(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
requestID := generateRequestID()
// Read body if present
var bodyBytes []byte
if r.Body != nil && (r.Method == "POST" || r.Method == "PUT" || r.Method == "PATCH") {
var err error
bodyBytes, err = io.ReadAll(r.Body)
if err != nil {
log.Printf("Error reading passthrough request body: %v", err)
writeErrorResponse(w, "Error reading request body", http.StatusBadRequest)
return
}
r.Body.Close()
r.Body = io.NopCloser(bytes.NewReader(bodyBytes))
}
// Build request log
var bodyForLog interface{}
if len(bodyBytes) > 0 {
var parsed interface{}
if err := json.Unmarshal(bodyBytes, &parsed); err == nil {
bodyForLog = parsed
} else {
bodyForLog = string(bodyBytes)
}
}
requestLog := &model.RequestLog{
RequestID: requestID,
Timestamp: time.Now().Format(time.RFC3339),
Method: r.Method,
Endpoint: r.URL.Path,
Headers: SanitizeHeaders(r.Header),
Body: bodyForLog,
UserAgent: r.Header.Get("User-Agent"),
ContentType: r.Header.Get("Content-Type"),
}
if _, err := h.storageService.SaveRequest(requestLog); err != nil {
log.Printf("Error saving passthrough request: %v", err)
}
// Apply request header rules
h.applyRequestHeaderRules(r)
// Forward to Anthropic
forwardCtx, cancel := context.WithTimeout(r.Context(), 2*time.Minute)
defer cancel()
resp, err := h.anthropicProvider.ForwardRequest(forwardCtx, r)
if err != nil {
log.Printf("Error forwarding passthrough request to %s: %v", r.URL.Path, err)
writeErrorResponse(w, "Failed to forward request", http.StatusInternalServerError)
return
}
defer resp.Body.Close()
// Read the response
responseBytes, err := io.ReadAll(resp.Body)
if err != nil {
log.Printf("Error reading passthrough response: %v", err)
writeErrorResponse(w, "Failed to read response", http.StatusInternalServerError)
return
}
responseLog := &model.ResponseLog{
StatusCode: resp.StatusCode,
Headers: SanitizeResponseHeaders(resp.Header),
ResponseTime: time.Since(startTime).Milliseconds(),
IsStreaming: false,
CompletedAt: time.Now().Format(time.RFC3339),
RateLimit: ExtractRateLimitInfo(resp.Header),
}
// Store response body as JSON if possible, otherwise as text
var parsed interface{}
if err := json.Unmarshal(responseBytes, &parsed); err == nil {
responseLog.Body = json.RawMessage(responseBytes)
} else {
responseLog.BodyText = string(responseBytes)
}
requestLog.Response = responseLog
extractOrganizationID(requestLog, resp.Header)
if err := h.storageService.UpdateRequestWithResponse(requestLog); err != nil {
log.Printf("Error updating passthrough request with response: %v", err)
}
// Forward all response headers and body to the client
CopyAllResponseHeaders(w, resp)
h.applyResponseHeaderRules(w)
w.Header().Set("Content-Type", resp.Header.Get("Content-Type"))
w.WriteHeader(resp.StatusCode)
w.Write(responseBytes)
log.Printf("Passthrough %s %s -> %d (%dms)", r.Method, r.URL.Path, resp.StatusCode, time.Since(startTime).Milliseconds())
}
func (h *Handler) Health(w http.ResponseWriter, r *http.Request) {
// When the process is draining (SIGTERM received) we return 503 so any
// LB doing health-based routing — Traefik with a healthcheck on this
// service, or `deploy.sh wait_healthy` — stops sending new requests
// before the shutdown loop waits for in-flight to reach zero.
if runtime.IsDraining() {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusServiceUnavailable)
_ = json.NewEncoder(w).Encode(map[string]interface{}{
"status": "draining",
"timestamp": time.Now().Format(time.RFC3339),
"in_flight": runtime.InFlight(),
})
return
}
response := &model.HealthResponse{
Status: "healthy",
Timestamp: time.Now(),
}
writeJSONResponse(w, response)
}
// Livez exposes operational state for deploy/drain orchestration. Always
// returns 200 with the current in-flight count and draining flag — distinct
// from Health, which is a binary up/ready signal for load balancers.
func (h *Handler) Livez(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Cache-Control", "no-store")
_ = json.NewEncoder(w).Encode(map[string]interface{}{
"status": "ok",
"timestamp": time.Now().Format(time.RFC3339),
"in_flight": runtime.InFlight(),
"draining": runtime.IsDraining(),
})
}
func (h *Handler) UI(w http.ResponseWriter, r *http.Request) {
htmlContent, err := os.ReadFile("index.html")
if err != nil {
// Error reading index.html
writeErrorResponse(w, "UI not available", http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "text/html")
w.Write(htmlContent)
}
func (h *Handler) GetRequests(w http.ResponseWriter, r *http.Request) {
page, _ := strconv.Atoi(r.URL.Query().Get("page"))
if page < 1 {
page = defaultPage
}
limit, _ := strconv.Atoi(r.URL.Query().Get("limit"))
if limit <= 0 {
limit = defaultPageLimit
}
// Get model filter from query parameters
modelFilter := r.URL.Query().Get("model")
if modelFilter == "" {
modelFilter = "all"
}
requests, total, err := h.storageService.GetRequests(page, limit, modelFilter)
if err != nil {
log.Printf("Error getting requests: %v", err)
writeErrorResponse(w, "Failed to get requests", http.StatusInternalServerError)
return
}
writeJSONResponse(w, struct {
Requests []model.RequestLog `json:"requests"`
Total int `json:"total"`
}{
Requests: requests,
Total: total,
})
}
func (h *Handler) DeleteRequests(w http.ResponseWriter, r *http.Request) {
clearedCount, err := h.storageService.ClearRequests()
if err != nil {
log.Printf("Error clearing requests: %v", err)
writeErrorResponse(w, "Error clearing request history", http.StatusInternalServerError)
return
}
response := map[string]interface{}{
"message": "Request history cleared",
"deleted": clearedCount,
}
writeJSONResponse(w, response)
}
func (h *Handler) NotFound(w http.ResponseWriter, r *http.Request) {
writeErrorResponse(w, "Not found", http.StatusNotFound)
}
// streamState holds the accumulated state while processing a streaming response.
type streamState struct {
fullResponseText strings.Builder
toolCalls []model.ContentBlock
streamingChunks []string
chunkTimings []model.ChunkTiming
finalUsage *model.AnthropicUsage
messageID string
modelName string
stopReason string
sawMessageStop bool
chunkIndex int
droppedChunks int // chunks not stored because we hit maxStreamChunks
}
// extractMessageMetadata extracts message ID, model, and stop_reason from a message_start event.
func extractMessageMetadata(event map[string]interface{}) (id, modelName, stopReason string) {
if message, ok := event["message"].(map[string]interface{}); ok {
if v, ok := message["id"].(string); ok {
id = v
}
if v, ok := message["model"].(string); ok {
modelName = v
}
if v, ok := message["stop_reason"].(string); ok {
stopReason = v
}
}
return
}
// extractUsageFromEvent extracts Anthropic usage data from a message_delta event.
// Returns nil if no usage data is present.
func extractUsageFromEvent(event map[string]interface{}) *model.AnthropicUsage {
usage, ok := event["usage"].(map[string]interface{})
if !ok {
return nil
}
u := &model.AnthropicUsage{}
if v, ok := usage["input_tokens"].(float64); ok {
u.InputTokens = int(v)
}
if v, ok := usage["output_tokens"].(float64); ok {
u.OutputTokens = int(v)
}
if v, ok := usage["cache_creation_input_tokens"].(float64); ok {
u.CacheCreationInputTokens = int(v)
}
if v, ok := usage["cache_read_input_tokens"].(float64); ok {
u.CacheReadInputTokens = int(v)
}
return u
}
// buildStreamResponseBody assembles the Anthropic-format response body from accumulated stream state.
func buildStreamResponseBody(state *streamState) json.RawMessage {
var contentBlocks []model.AnthropicContentBlock
if state.fullResponseText.Len() > 0 {
contentBlocks = append(contentBlocks, model.AnthropicContentBlock{
Type: "text",
Text: state.fullResponseText.String(),
})
}
responseBody := map[string]interface{}{
"content": contentBlocks,
"id": state.messageID,
"model": state.modelName,
"role": "assistant",
"stop_reason": state.stopReason,
"type": "message",
}
if state.finalUsage != nil {
responseBody["usage"] = state.finalUsage
}
responseBodyBytes, err := json.Marshal(responseBody)
if err != nil {
log.Printf("❌ Error marshaling streaming response body: %v", err)
responseBodyBytes = []byte("{}")
}
return json.RawMessage(responseBodyBytes)
}
func (h *Handler) handleStreamingResponse(w http.ResponseWriter, resp *http.Response, requestLog *model.RequestLog, startTime time.Time) {
// Forward important upstream headers (rate limits, request IDs, etc.)
CopyAllResponseHeaders(w, resp)
h.applyResponseHeaderRules(w)
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
if resp.StatusCode != http.StatusOK {
log.Printf("❌ Anthropic API error: %d", resp.StatusCode)
errorBytes, _ := io.ReadAll(resp.Body)
log.Printf("Error details: %s", string(errorBytes))
responseLog := &model.ResponseLog{
StatusCode: resp.StatusCode,
Headers: SanitizeResponseHeaders(resp.Header),
BodyText: string(errorBytes),
ResponseTime: time.Since(startTime).Milliseconds(),
IsStreaming: true,
CompletedAt: time.Now().Format(time.RFC3339),
RateLimit: ExtractRateLimitInfo(resp.Header),
}
requestLog.Response = responseLog
extractOrganizationID(requestLog, resp.Header)
if err := h.storageService.UpdateRequestWithResponse(requestLog); err != nil {
log.Printf("❌ Error updating request with error response: %v", err)
}
w.WriteHeader(resp.StatusCode)
w.Write(errorBytes)
return
}
state := &streamState{}
streamErr := sse.ForEachLine(resp.Body, func(line string) error {
// Forward every SSE line verbatim — preserves event:, id:, retry:,
// `:` comment keepalives, and the blank-line event terminator. The
// previous code dropped everything except `data:` lines, which
// produced malformed SSE for clients that read the event field
// (browser EventSource, strict SSE parsers).
if _, err := fmt.Fprintf(w, "%s\n", line); err != nil {
return err
}
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
// Only `data:` lines carry the JSON payload we accumulate for storage
// and response synthesis. Everything else (event:, id:, comments,
// blank line) was just forwarded above and needs no further work.
if !strings.HasPrefix(line, "data:") {
return nil
}
now := time.Now()
if state.chunkIndex < maxStreamChunks {
state.streamingChunks = append(state.streamingChunks, line)
state.chunkTimings = append(state.chunkTimings, model.ChunkTiming{
Index: state.chunkIndex,
Timestamp: now.Format(time.RFC3339Nano),
ByteSize: len(line),
ElapsedMs: now.Sub(startTime).Milliseconds(),
})
} else {
state.droppedChunks++
}
state.chunkIndex++
jsonData := strings.TrimPrefix(line, "data: ")
// Parse as generic JSON first to capture usage and metadata
var genericEvent map[string]interface{}
if err := json.Unmarshal([]byte(jsonData), &genericEvent); err != nil {
log.Printf("⚠️ Error unmarshalling streaming event: %v", err)
return nil
}
eventType, _ := genericEvent["type"].(string)
if eventType == "message_start" {
id, modelName, stopReason := extractMessageMetadata(genericEvent)
if id != "" {
state.messageID = id
}
if modelName != "" {
state.modelName = modelName
}
if stopReason != "" {
state.stopReason = stopReason
}
}
if eventType == "message_delta" {
if usage := extractUsageFromEvent(genericEvent); usage != nil {
state.finalUsage = usage
}
}
// Parse as structured event for content processing
var event model.StreamingEvent
if err := json.Unmarshal([]byte(jsonData), &event); err != nil {
return nil
}
switch event.Type {
case "content_block_delta":
if event.Delta != nil {
if event.Delta.Type == "text_delta" {
state.fullResponseText.WriteString(event.Delta.Text)
} else if event.Delta.Type == "input_json_delta" {
if event.Index != nil && *event.Index < len(state.toolCalls) {
state.toolCalls[*event.Index].Input = append(state.toolCalls[*event.Index].Input, event.Delta.Input...)
}
}
}
case "content_block_start":
if event.ContentBlock != nil && event.ContentBlock.Type == "tool_use" {
state.toolCalls = append(state.toolCalls, *event.ContentBlock)
}
case "message_stop":
state.sawMessageStop = true
}
return nil
})
if streamErr == nil && !state.sawMessageStop {
streamErr = io.ErrUnexpectedEOF
}
responseLog := &model.ResponseLog{
StatusCode: resp.StatusCode,
Headers: SanitizeResponseHeaders(resp.Header),
StreamingChunks: state.streamingChunks,
ChunkTimings: state.chunkTimings,
ResponseTime: time.Since(startTime).Milliseconds(),
IsStreaming: true,
CompletedAt: time.Now().Format(time.RFC3339),
RateLimit: ExtractRateLimitInfo(resp.Header),
}
if streamErr != nil {
responseLog.StreamError = streamErr.Error()
}
responseLog.Body = buildStreamResponseBody(state)
requestLog.Response = responseLog
extractOrganizationID(requestLog, resp.Header)
if err := h.storageService.UpdateRequestWithResponse(requestLog); err != nil {
log.Printf("❌ Error updating request with streaming response: %v", err)
}
if streamErr != nil {
log.Printf("❌ Streaming error: %v", streamErr)
// Send error event to client in Anthropic streaming format
errorEvent := map[string]interface{}{
"type": "error",
"error": map[string]interface{}{
"type": "stream_error",
"message": fmt.Sprintf("Stream interrupted: %v", streamErr),
},
}
if errorJSON, jsonErr := json.Marshal(errorEvent); jsonErr == nil {
fmt.Fprintf(w, "data: %s\n\n", errorJSON)
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
}
} else {
log.Println("✅ Streaming response completed")
}
}
func (h *Handler) handleNonStreamingResponse(w http.ResponseWriter, resp *http.Response, requestLog *model.RequestLog, startTime time.Time) {
// Forward important upstream headers (rate limits, request IDs, etc.)
CopyAllResponseHeaders(w, resp)
h.applyResponseHeaderRules(w)
responseBytes, err := io.ReadAll(resp.Body)
if err != nil {
log.Printf("❌ Error reading Anthropic response: %v", err)
writeErrorResponse(w, "Failed to read response", http.StatusInternalServerError)
return
}
responseLog := &model.ResponseLog{
StatusCode: resp.StatusCode,
Headers: SanitizeResponseHeaders(resp.Header),
ResponseTime: time.Since(startTime).Milliseconds(),
IsStreaming: false,
CompletedAt: time.Now().Format(time.RFC3339),
RateLimit: ExtractRateLimitInfo(resp.Header),
}
// Parse the response as AnthropicResponse for consistent structure
if resp.StatusCode == http.StatusOK {
var anthropicResp model.AnthropicResponse
if err := json.Unmarshal(responseBytes, &anthropicResp); err == nil {
// Successfully parsed - store the structured response
responseLog.Body = json.RawMessage(responseBytes)
} else {
// If parsing fails, store as text but log the error
log.Printf("⚠️ Failed to parse Anthropic response: %v", err)
log.Printf("📄 Response body (first %d chars): %s", convHashCharLimit, string(responseBytes[:min(convHashCharLimit, len(responseBytes))]))
responseLog.BodyText = string(responseBytes)
}
} else {
// For error responses, store as text
responseLog.BodyText = string(responseBytes)
}
requestLog.Response = responseLog
extractOrganizationID(requestLog, resp.Header)
if err := h.storageService.UpdateRequestWithResponse(requestLog); err != nil {
log.Printf("❌ Error updating request with response: %v", err)
}
if resp.StatusCode != http.StatusOK {
log.Printf("❌ Anthropic API error: %d %s", resp.StatusCode, string(responseBytes))
// Headers already forwarded at start of function
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(resp.StatusCode)
w.Write(responseBytes)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(responseBytes)
}
// applyRequestHeaderRules applies configured request header rules to the given request.
func (h *Handler) applyRequestHeaderRules(r *http.Request) {
if settings := h.GetCachedSettings(); len(settings.RequestHeaderRules) > 0 {
ApplyHeaderRules(r.Header, settings.RequestHeaderRules)
}
}
// applyResponseHeaderRules applies configured response header rules to the response writer.
func (h *Handler) applyResponseHeaderRules(w http.ResponseWriter) {
if settings := h.GetCachedSettings(); len(settings.ResponseHeaderRules) > 0 {
ApplyHeaderRules(w.Header(), settings.ResponseHeaderRules)
}
}
// Helper function to get minimum of two integers
func min(a, b int) int {
if a < b {
return a
}
return b
}
// extractOrganizationID extracts the Anthropic-Organization-Id from response headers and sets it on the request log.
func extractOrganizationID(requestLog *model.RequestLog, respHeaders http.Header) {
if orgID := respHeaders.Get("Anthropic-Organization-Id"); orgID != "" {
requestLog.OrganizationID = orgID
}
}
func generateRequestID() string {
bytes := make([]byte, 8)
if _, err := rand.Read(bytes); err != nil {
// fallback to timestamp-based ID
return fmt.Sprintf("%x", time.Now().UnixNano())
}
return hex.EncodeToString(bytes)
}
// computeConversationHash generates a hash to identify which conversation a request belongs to.
// It collects plain text from the first user message that has real content (after stripping
// injected XML blocks), falling back to the system prompt hash if no user text is found.
func computeConversationHash(req *model.AnthropicRequest) string {
if req == nil || len(req.Messages) == 0 {
return ""
}
// Try each user message for real text content
for _, msg := range req.Messages {
if msg.Role != "user" {
continue
}
// Collect all text blocks from this message
var allText strings.Builder
blocks := msg.GetContentBlocks()
for _, block := range blocks {
if block.Type == "text" && block.Text != "" {
if allText.Len() > 0 {
allText.WriteString("\n")
}
allText.WriteString(block.Text)
}
}
if allText.Len() == 0 {
continue
}
// Strip all XML-like tags and their content (system-reminder, command-*, etc.)
text := stripXmlBlocks(allText.String())
text = strings.TrimSpace(text)
if text == "" {
continue
}
// Take first N chars to avoid hashing huge messages
if len(text) > convHashCharLimit {
text = text[:convHashCharLimit]
}
hash := sha256.Sum256([]byte(text))
return hex.EncodeToString(hash[:8])
}
// Fallback: hash the system prompt if present (all turns share the same system prompt)
if len(req.System) > 0 {
var sysText strings.Builder
for _, s := range req.System {
if s.Text != "" {
sysText.WriteString(s.Text)
}
}
if sysText.Len() > 0 {
text := sysText.String()
if len(text) > convHashCharLimit {
text = text[:convHashCharLimit]
}
hash := sha256.Sum256([]byte("sys:" + text))
return hex.EncodeToString(hash[:8])
}
}
return ""
}
// stripXmlBlocks removes XML-like tag blocks from text, leaving only plain text.
func stripXmlBlocks(text string) string {
// Iteratively remove ... blocks
for {
start := strings.Index(text, "<")
if start == -1 {
break
}
// Find tag name
end := strings.IndexByte(text[start+1:], '>')
if end == -1 {
break
}
tagEnd := start + 1 + end
tagContent := text[start+1 : tagEnd]
// Skip self-closing or malformed
if strings.HasPrefix(tagContent, "/") || strings.HasSuffix(tagContent, "/") {
text = text[:start] + text[tagEnd+1:]
continue
}
tagName := strings.Fields(tagContent)[0]
// Look for closing tag
closeTag := "" + tagName + ">"
closeIdx := strings.Index(text[tagEnd+1:], closeTag)
if closeIdx == -1 {
// No closing tag — not a block, skip past this <
text = text[:start] + text[start+1:]
continue
}
// Remove the entire block
blockEnd := tagEnd + 1 + closeIdx + len(closeTag)
text = text[:start] + text[blockEnd:]
}
return text
}
func getBodyBytes(r *http.Request) []byte {
if bodyBytes, ok := r.Context().Value(model.BodyBytesKey).([]byte); ok {
return bodyBytes
}
return nil
}
func writeJSONResponse(w http.ResponseWriter, data interface{}) {
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(data); err != nil {
log.Printf("❌ Error encoding JSON response: %v", err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
}
}
func writeErrorResponse(w http.ResponseWriter, message string, statusCode int) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(statusCode)
json.NewEncoder(w).Encode(&model.ErrorResponse{Error: message})
}
// extractTextFromMessage tries multiple strategies to extract text from a message
func extractTextFromMessage(message json.RawMessage) string {
// Strategy 1: Direct string (simple text message)
var directString string
if err := json.Unmarshal(message, &directString); err == nil && directString != "" {
return directString
}
// Strategy 2: Array format [{"type": "text", "text": "..."}]
var msgArray []interface{}
if err := json.Unmarshal(message, &msgArray); err == nil {
for _, item := range msgArray {
if itemMap, ok := item.(map[string]interface{}); ok {
if itemMap["type"] == "text" {
if text, ok := itemMap["text"].(string); ok && text != "" {
return text
}
}
}
}
}
// Strategy 3: Content object format {"content": [{"type": "text", "text": "..."}]}
var msgContent map[string]interface{}
if err := json.Unmarshal(message, &msgContent); err == nil {
if content, ok := msgContent["content"]; ok {
if contentArray, ok := content.([]interface{}); ok {
for _, block := range contentArray {
if blockMap, ok := block.(map[string]interface{}); ok {
if blockMap["type"] == "text" {
if text, ok := blockMap["text"].(string); ok && text != "" {
return text
}
}
}
}
}
}
// Also check if content is a string directly
if contentStr, ok := msgContent["content"].(string); ok && contentStr != "" {
return contentStr
}
}
// Strategy 4: Single object with text field {"type": "text", "text": "..."}
var singleObj map[string]interface{}
if err := json.Unmarshal(message, &singleObj); err == nil {
if singleObj["type"] == "text" {
if text, ok := singleObj["text"].(string); ok && text != "" {
return text
}
}
// Also check for content field at top level
if text, ok := singleObj["content"].(string); ok && text != "" {
return text
}
}
return ""
}
// Conversation handlers
func (h *Handler) GetConversations(w http.ResponseWriter, r *http.Request) {
modelFilter := strings.ToLower(strings.TrimSpace(r.URL.Query().Get("model")))
if modelFilter == "" {
modelFilter = "all"
}
conversations, err := h.conversationService.GetConversations()
if err != nil {
log.Printf("❌ Error getting conversations: %v", err)
writeErrorResponse(w, "Failed to get conversations", http.StatusInternalServerError)
return
}
// Flatten all conversations into a single array for the UI
var allConversations []map[string]interface{}
for _, convs := range conversations {
for _, conv := range convs {
if modelFilter != "all" && !conversationModelMatchesFilter(conv.Model, modelFilter) {
continue
}
// Extract first user message from the conversation
var firstMessage string
for _, msg := range conv.Messages {
if msg.Type == "user" {
// Try multiple parsing strategies
text := extractTextFromMessage(msg.Message)
if text != "" {
firstMessage = text
if len(firstMessage) > 200 {
firstMessage = firstMessage[:200] + "..."
}
break
}
}
}
allConversations = append(allConversations, map[string]interface{}{
"id": conv.SessionID,
"requestCount": conv.MessageCount,
"startTime": conv.StartTime.Format(time.RFC3339),
"lastActivity": conv.EndTime.Format(time.RFC3339),
"duration": conv.EndTime.Sub(conv.StartTime).Milliseconds(),
"firstMessage": firstMessage,
"projectPath": conv.ProjectPath,
"projectName": conv.ProjectName,
"model": conv.Model,
})
}
}
// Sort by last activity (newest first)
sort.Slice(allConversations, func(i, j int) bool {
t1, _ := time.Parse(time.RFC3339, allConversations[i]["lastActivity"].(string))
t2, _ := time.Parse(time.RFC3339, allConversations[j]["lastActivity"].(string))
return t1.After(t2)
})
// Apply pagination
page, _ := strconv.Atoi(r.URL.Query().Get("page"))
if page < 1 {
page = defaultPage
}
limit, _ := strconv.Atoi(r.URL.Query().Get("limit"))
if limit <= 0 {
limit = defaultPageLimit
}
total := len(allConversations)
start := (page - 1) * limit
end := start + limit
hasMore := false
if start >= total {
allConversations = []map[string]interface{}{}
} else {
if end > total {
end = total
}
hasMore = end < total
allConversations = allConversations[start:end]
}
response := map[string]interface{}{
"conversations": allConversations,
"hasMore": hasMore,
"total": total,
"page": page,
"limit": limit,
}
writeJSONResponse(w, response)
}
func conversationModelMatchesFilter(modelValue, filter string) bool {
if filter == "" || filter == "all" {
return true
}
if modelValue == "" {
return false
}
modelValue = strings.ToLower(modelValue)
filter = strings.ToLower(filter)
return strings.Contains(modelValue, filter)
}
func (h *Handler) GetConversationByID(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
sessionID, ok := vars["id"]
if !ok {
writeErrorResponse(w, "Session ID is required", http.StatusBadRequest)
return
}
projectPath := r.URL.Query().Get("project")
if projectPath == "" {
writeErrorResponse(w, "Project path is required", http.StatusBadRequest)
return
}
conversation, err := h.conversationService.GetConversation(projectPath, sessionID)
if err != nil {
log.Printf("❌ Error getting conversation: %v", err)
writeErrorResponse(w, "Conversation not found", http.StatusNotFound)
return
}
writeJSONResponse(w, conversation)
}
func (h *Handler) GetConversationsByProject(w http.ResponseWriter, r *http.Request) {
projectPath := r.URL.Query().Get("project")
if projectPath == "" {
writeErrorResponse(w, "Project path is required", http.StatusBadRequest)
return
}
conversations, err := h.conversationService.GetConversationsByProject(projectPath)
if err != nil {
log.Printf("❌ Error getting project conversations: %v", err)
writeErrorResponse(w, "Failed to get project conversations", http.StatusInternalServerError)
return
}
writeJSONResponse(w, conversations)
}
// GetStats returns aggregated usage statistics
func (h *Handler) GetStats(w http.ResponseWriter, r *http.Request) {
startDate := r.URL.Query().Get("start_date")
endDate := r.URL.Query().Get("end_date")
modelFilter := r.URL.Query().Get("model")
orgFilter := r.URL.Query().Get("org")
stats, err := h.storageService.GetUsageStats(startDate, endDate, modelFilter, orgFilter)
if err != nil {
log.Printf("❌ Error getting usage stats: %v", err)
writeErrorResponse(w, "Failed to get usage statistics", http.StatusInternalServerError)
return
}
writeJSONResponse(w, stats)
}
// GetRequestsSummary returns lightweight request data for fast list rendering
func (h *Handler) GetRequestsSummary(w http.ResponseWriter, r *http.Request) {
modelFilter := r.URL.Query().Get("model")
if modelFilter == "" {
modelFilter = "all"
}
// Get start/end time range (UTC ISO 8601 format from browser)
startTime := r.URL.Query().Get("start")
endTime := r.URL.Query().Get("end")
// Parse pagination params
offset := 0
limit := 0 // Default to 0 (no limit - fetch all)
if offsetStr := r.URL.Query().Get("offset"); offsetStr != "" {
if parsed, err := strconv.Atoi(offsetStr); err == nil && parsed >= 0 {
offset = parsed
}
}
if limitStr := r.URL.Query().Get("limit"); limitStr != "" {
if parsed, err := strconv.Atoi(limitStr); err == nil && parsed > 0 && parsed <= maxPageLimit {
limit = parsed
}
}
summaries, total, err := h.storageService.GetRequestsSummaryPaginated(modelFilter, startTime, endTime, offset, limit)
if err != nil {
log.Printf("Error getting request summaries: %v", err)
writeErrorResponse(w, "Failed to get requests", http.StatusInternalServerError)
return
}
writeJSONResponse(w, struct {
Requests []*model.RequestSummary `json:"requests"`
Total int `json:"total"`
Offset int `json:"offset"`
Limit int `json:"limit"`
}{
Requests: summaries,
Total: total,
Offset: offset,
Limit: limit,
})
}
// GetRequestByID returns a single request by its ID
func (h *Handler) GetRequestByID(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
requestID := vars["id"]
if requestID == "" {
writeErrorResponse(w, "Request ID is required", http.StatusBadRequest)
return
}
request, fullID, err := h.storageService.GetRequestByShortID(requestID)
if err != nil {
log.Printf("Error getting request by ID %s: %v", requestID, err)
writeErrorResponse(w, "Failed to get request", http.StatusInternalServerError)
return
}
if request == nil {
writeErrorResponse(w, "Request not found", http.StatusNotFound)
return
}
writeJSONResponse(w, struct {
Request *model.RequestLog `json:"request"`
FullID string `json:"fullId"`
}{
Request: request,
FullID: fullID,
})
}
// GetDashboardStats returns aggregated dashboard statistics (daily token usage)
func (h *Handler) GetDashboardStats(w http.ResponseWriter, r *http.Request) {
// Get start/end time range (UTC ISO 8601 format from browser)
startTime := r.URL.Query().Get("start")
endTime := r.URL.Query().Get("end")
// Fallback to last 7 days if not provided
if startTime == "" || endTime == "" {
now := time.Now().UTC()
endTime = now.Format(time.RFC3339)
startTime = now.AddDate(0, 0, -7).Format(time.RFC3339)
}
orgFilter := r.URL.Query().Get("org")
stats, err := h.storageService.GetStats(startTime, endTime, orgFilter)
if err != nil {
log.Printf("Error getting dashboard stats: %v", err)
writeErrorResponse(w, "Failed to get stats", http.StatusInternalServerError)
return
}
writeJSONResponse(w, stats)
}
// GetHourlyStats returns hourly breakdown for a specific date range
func (h *Handler) GetHourlyStats(w http.ResponseWriter, r *http.Request) {
// Get start/end time range (UTC ISO 8601 format from browser)
startTime := r.URL.Query().Get("start")
endTime := r.URL.Query().Get("end")
if startTime == "" || endTime == "" {
writeErrorResponse(w, "start and end parameters are required", http.StatusBadRequest)
return
}
bucketMinutes := defaultBucketMinutes
if b := r.URL.Query().Get("bucket"); b != "" {
if parsed, err := strconv.Atoi(b); err == nil && parsed > 0 {
bucketMinutes = parsed
}
}
orgFilter := r.URL.Query().Get("org")
stats, err := h.storageService.GetHourlyStats(startTime, endTime, bucketMinutes, orgFilter)
if err != nil {
log.Printf("Error getting hourly stats: %v", err)
writeErrorResponse(w, "Failed to get hourly stats", http.StatusInternalServerError)
return
}
writeJSONResponse(w, stats)
}
// GetModelStats returns model breakdown for a specific date range
func (h *Handler) GetModelStats(w http.ResponseWriter, r *http.Request) {
// Get start/end time range (UTC ISO 8601 format from browser)
startTime := r.URL.Query().Get("start")
endTime := r.URL.Query().Get("end")
if startTime == "" || endTime == "" {
writeErrorResponse(w, "start and end parameters are required", http.StatusBadRequest)
return
}
orgFilter := r.URL.Query().Get("org")
stats, err := h.storageService.GetModelStats(startTime, endTime, orgFilter)
if err != nil {
log.Printf("Error getting model stats: %v", err)
writeErrorResponse(w, "Failed to get model stats", http.StatusInternalServerError)
return
}
writeJSONResponse(w, stats)
}
// GetOrganizations returns distinct organization IDs seen in requests
func (h *Handler) GetOrganizations(w http.ResponseWriter, r *http.Request) {
orgs, err := h.storageService.GetDistinctOrganizations()
if err != nil {
log.Printf("Error getting organizations: %v", err)
writeErrorResponse(w, "Failed to get organizations", http.StatusInternalServerError)
return
}
if orgs == nil {
orgs = []string{}
}
writeJSONResponse(w, struct {
Organizations []string `json:"organizations"`
}{Organizations: orgs})
}
// GetLatestRequestDate returns the date of the most recent request
func (h *Handler) GetLatestRequestDate(w http.ResponseWriter, r *http.Request) {
latestDate, err := h.storageService.GetLatestRequestDate()
if err != nil {
log.Printf("Error getting latest request date: %v", err)
writeErrorResponse(w, "Failed to get latest request date", http.StatusInternalServerError)
return
}
writeJSONResponse(w, map[string]interface{}{
"latestDate": latestDate,
})
}
// GetSettings returns the current proxy settings
func (h *Handler) GetSettings(w http.ResponseWriter, r *http.Request) {
settings, err := h.storageService.GetSettings()
if err != nil {
log.Printf("Error getting settings: %v", err)
writeErrorResponse(w, "Failed to get settings", http.StatusInternalServerError)
return
}
writeJSONResponse(w, settings)
}
// SaveSettings updates the proxy settings
func (h *Handler) SaveSettings(w http.ResponseWriter, r *http.Request) {
var settings model.ProxySettings
if err := json.NewDecoder(r.Body).Decode(&settings); err != nil {
writeErrorResponse(w, "Invalid request body", http.StatusBadRequest)
return
}
if err := h.storageService.SaveSettings(&settings); err != nil {
log.Printf("Error saving settings: %v", err)
writeErrorResponse(w, "Failed to save settings", http.StatusInternalServerError)
return
}
// Update the in-memory cached settings
h.cachedSettingsMu.Lock()
h.cachedSettings = &settings
h.cachedSettingsMu.Unlock()
writeJSONResponse(w, settings)
}
// GetHeaderRules returns the current header rules (convenience for the proxy to apply)
func (h *Handler) GetCachedSettings() *model.ProxySettings {
h.cachedSettingsMu.RLock()
if h.cachedSettings != nil {
settings := h.cachedSettings
h.cachedSettingsMu.RUnlock()
return settings
}
h.cachedSettingsMu.RUnlock()
settings, err := h.storageService.GetSettings()
if err != nil {
log.Printf("Error loading settings: %v", err)
return &model.ProxySettings{}
}
h.cachedSettingsMu.Lock()
defer h.cachedSettingsMu.Unlock()
if h.cachedSettings != nil {
return h.cachedSettings
}
h.cachedSettings = settings
return settings
}
// logForwardFailure emits a structured key=value diagnostic line plus the
// existing human-readable error so timeouts to the upstream provider are
// debuggable from logs alone (request shape, body size, betas, thinking, etc.).
func logForwardFailure(
r *http.Request,
req *model.AnthropicRequest,
bodyBytes []byte,
decision *service.RoutingDecision,
requestID string,
startTime time.Time,
forwardCtx context.Context,
forwardErr error,
) {
hasThinking := false
var raw map[string]json.RawMessage
if json.Unmarshal(bodyBytes, &raw) == nil {
_, hasThinking = raw["thinking"]
}
ctxErr := ""
if e := forwardCtx.Err(); e != nil {
ctxErr = e.Error()
}
provName := ""
if decision != nil && decision.Provider != nil {
provName = decision.Provider.Name()
}
origModel, routedModel := "", ""
if decision != nil {
origModel = decision.OriginalModel
routedModel = decision.TargetModel
}
log.Printf(
"forward_error request_id=%s provider=%s model=%s routed_model=%s stream=%t body_bytes=%d messages=%d tools=%d max_tokens=%d has_thinking=%t query=%q anthropic_beta=%q client=%q elapsed=%s ctx_err=%s err=%q",
requestID,
provName,
origModel,
routedModel,
req.Stream,
len(bodyBytes),
len(req.Messages),
len(req.Tools),
req.MaxTokens,
hasThinking,
r.URL.RawQuery,
r.Header.Get("anthropic-beta"),
r.Header.Get("User-Agent"),
time.Since(startTime),
ctxErr,
forwardErr.Error(),
)
// Backwards-compat: keep the existing categorized line so anything grepping
// for "Error forwarding" or "Timeout forwarding" still matches.
switch forwardCtx.Err() {
case context.DeadlineExceeded:
log.Printf("❌ Timeout forwarding to %s API after 30 minutes: %v", provName, forwardErr)
case context.Canceled:
log.Printf("❌ Context canceled forwarding to %s API: %v", provName, forwardErr)
default:
log.Printf("❌ Error forwarding to %s API: %v", provName, forwardErr)
}
}
// handleDemotedStreamingResponse consumes an Anthropic SSE stream and writes a
// single non-streaming JSON response to the client. Used when the client
// requested stream=false but we forced stream=true upstream to avoid the
// ResponseHeaderTimeout. Only invoked for the anthropic provider.
func (h *Handler) handleDemotedStreamingResponse(w http.ResponseWriter, resp *http.Response, requestLog *model.RequestLog, startTime time.Time) {
CopyAllResponseHeaders(w, resp)
h.applyResponseHeaderRules(w)
// Upstream errors come back as JSON, not SSE — forward as-is with the
// correct content type for the non-streaming client.
if resp.StatusCode != http.StatusOK {
errorBytes, _ := io.ReadAll(resp.Body)
log.Printf("❌ Anthropic API error during demoted stream: %d %s", resp.StatusCode, string(errorBytes))
responseLog := &model.ResponseLog{
StatusCode: resp.StatusCode,
Headers: SanitizeResponseHeaders(resp.Header),
BodyText: string(errorBytes),
ResponseTime: time.Since(startTime).Milliseconds(),
IsStreaming: false,
CompletedAt: time.Now().Format(time.RFC3339),
RateLimit: ExtractRateLimitInfo(resp.Header),
}
requestLog.Response = responseLog
extractOrganizationID(requestLog, resp.Header)
if err := h.storageService.UpdateRequestWithResponse(requestLog); err != nil {
log.Printf("❌ Error updating request with error response: %v", err)
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(resp.StatusCode)
w.Write(errorBytes)
return
}
msg, accumErr := accumulateSSEToMessage(resp.Body)
responseLog := &model.ResponseLog{
StatusCode: resp.StatusCode,
Headers: SanitizeResponseHeaders(resp.Header),
ResponseTime: time.Since(startTime).Milliseconds(),
IsStreaming: false, // client-facing shape is non-streaming
CompletedAt: time.Now().Format(time.RFC3339),
RateLimit: ExtractRateLimitInfo(resp.Header),
}
if accumErr != nil {
responseLog.StreamError = accumErr.Error()
}
var bodyBytes []byte
if accumErr == nil && msg != nil {
var marshalErr error
bodyBytes, marshalErr = json.Marshal(msg)
if marshalErr != nil {
log.Printf("❌ Error marshaling demoted response: %v", marshalErr)
accumErr = marshalErr
} else {
responseLog.Body = json.RawMessage(bodyBytes)
}
}
requestLog.Response = responseLog
extractOrganizationID(requestLog, resp.Header)
if err := h.storageService.UpdateRequestWithResponse(requestLog); err != nil {
log.Printf("❌ Error updating request with response: %v", err)
}
if accumErr != nil {
log.Printf("❌ Demotion accumulator error: %v", accumErr)
w.Header().Set("Content-Type", "application/json")
writeErrorResponse(w, fmt.Sprintf("Failed to assemble response: %v", accumErr), http.StatusBadGateway)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(bodyBytes)
}
// accumulateSSEToMessage walks an Anthropic SSE stream and builds the
// equivalent non-streaming response body. Handles text, tool_use (with
// partial_json reassembly), thinking, and signature blocks indexed by
// content_block.index. Returns the message map ready for json.Marshal.
//
// Note: parses partial_json (the actual Anthropic field) for input_json_delta,
// not "input" — the existing model.Delta.Input wiring used by the legacy
// streaming accumulator never matched the wire format.
func accumulateSSEToMessage(body io.Reader) (map[string]interface{}, error) {
var (
msgID, modelName, role, stopReason string
stopSequence interface{}
usage map[string]interface{}
sawMessageStop bool
)
contentBlocks := map[int]map[string]interface{}{}
toolJSONBuilders := map[int]*strings.Builder{}
maxIndex := -1
err := sse.ForEachLine(body, func(line string) error {
if !strings.HasPrefix(line, "data:") {
return nil
}
var ev map[string]interface{}
if err := json.Unmarshal([]byte(strings.TrimPrefix(line, "data: ")), &ev); err != nil {
// Malformed event line — skip rather than abort the stream.
return nil
}
evType, _ := ev["type"].(string)
switch evType {
case "message_start":
msg, ok := ev["message"].(map[string]interface{})
if !ok {
return nil
}
if v, ok := msg["id"].(string); ok {
msgID = v
}
if v, ok := msg["model"].(string); ok {
modelName = v
}
if v, ok := msg["role"].(string); ok {
role = v
}
if v, ok := msg["stop_reason"].(string); ok && v != "" {
stopReason = v
}
if v, ok := msg["stop_sequence"]; ok {
stopSequence = v
}
if u, ok := msg["usage"].(map[string]interface{}); ok {
usage = map[string]interface{}{}
for k, val := range u {
usage[k] = val
}
}
case "content_block_start":
idx, ok := indexFromEvent(ev)
if !ok {
return nil
}
cb, ok := ev["content_block"].(map[string]interface{})
if !ok {
return nil
}
block := map[string]interface{}{}
for k, v := range cb {
block[k] = v
}
contentBlocks[idx] = block
if idx > maxIndex {
maxIndex = idx
}
if t, _ := block["type"].(string); t == "tool_use" {
toolJSONBuilders[idx] = &strings.Builder{}
// Tool input arrives via input_json_delta events; clear seed.
delete(block, "input")
}
case "content_block_delta":
idx, ok := indexFromEvent(ev)
if !ok {
return nil
}
block := contentBlocks[idx]
if block == nil {
return nil
}
delta, ok := ev["delta"].(map[string]interface{})
if !ok {
return nil
}
dType, _ := delta["type"].(string)
switch dType {
case "text_delta":
cur, _ := block["text"].(string)
if t, ok := delta["text"].(string); ok {
block["text"] = cur + t
}
case "input_json_delta":
if pj, ok := delta["partial_json"].(string); ok {
if b, ok := toolJSONBuilders[idx]; ok {
b.WriteString(pj)
}
}
case "thinking_delta":
cur, _ := block["thinking"].(string)
if t, ok := delta["thinking"].(string); ok {
block["thinking"] = cur + t
}
case "signature_delta":
cur, _ := block["signature"].(string)
if s, ok := delta["signature"].(string); ok {
block["signature"] = cur + s
}
case "citations_delta":
// Anthropic streams each citation as its own delta event;
// non-streaming responses return them as a `citations` array
// on the content block. Append to preserve order.
if c, ok := delta["citation"]; ok {
existing, _ := block["citations"].([]interface{})
block["citations"] = append(existing, c)
}
}
case "content_block_stop":
idx, ok := indexFromEvent(ev)
if !ok {
return nil
}
b, isToolBuilder := toolJSONBuilders[idx]
if !isToolBuilder {
return nil
}
block := contentBlocks[idx]
if block == nil {
return nil
}
s := b.String()
if s == "" {
block["input"] = map[string]interface{}{}
return nil
}
var input interface{}
if err := json.Unmarshal([]byte(s), &input); err != nil {
// Keep the raw partial JSON so a debugger can see what came through.
block["input_raw"] = s
return fmt.Errorf("tool_use input_json_delta did not parse as JSON at index %d: %w", idx, err)
}
block["input"] = input
case "message_delta":
if delta, ok := ev["delta"].(map[string]interface{}); ok {
if v, ok := delta["stop_reason"].(string); ok && v != "" {
stopReason = v
}
if v, ok := delta["stop_sequence"]; ok {
stopSequence = v
}
}
if u, ok := ev["usage"].(map[string]interface{}); ok {
if usage == nil {
usage = map[string]interface{}{}
}
for k, v := range u {
usage[k] = v
}
}
case "message_stop":
sawMessageStop = true
case "error":
if errObj, ok := ev["error"].(map[string]interface{}); ok {
m, _ := errObj["message"].(string)
t, _ := errObj["type"].(string)
return fmt.Errorf("upstream stream error: %s (%s)", m, t)
}
return fmt.Errorf("upstream stream error event without details")
}
return nil
})
if err != nil {
return nil, err
}
if !sawMessageStop {
return nil, fmt.Errorf("stream ended before message_stop")
}
blocks := make([]map[string]interface{}, 0, maxIndex+1)
for i := 0; i <= maxIndex; i++ {
if b := contentBlocks[i]; b != nil {
blocks = append(blocks, b)
}
}
out := map[string]interface{}{
"id": msgID,
"type": "message",
"role": role,
"model": modelName,
"content": blocks,
"stop_reason": stopReason,
"stop_sequence": stopSequence,
}
if usage != nil {
out["usage"] = usage
}
return out, nil
}
func indexFromEvent(ev map[string]interface{}) (int, bool) {
f, ok := ev["index"].(float64)
if !ok {
return 0, false
}
return int(f), true
}