Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion frontend/app/aipanel/aipanel.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ const AIPanelComponentInner = memo(() => {
};

useEffect(() => {
globalStore.set(model.isAIStreaming, status == "streaming");
globalStore.set(model.isAIStreaming, status === "streaming" || status === "submitted");
}, [status]);

useEffect(() => {
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

137 changes: 111 additions & 26 deletions pkg/aiusechat/anthropic/anthropic-backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import (
"errors"
"fmt"
"io"
"log"
"net/http"
"net/url"
"sort"
"strings"
"time"

Expand All @@ -20,6 +21,7 @@ import (
"github.com/wavetermdev/waveterm/pkg/aiusechat/aiutil"
"github.com/wavetermdev/waveterm/pkg/aiusechat/chatstore"
"github.com/wavetermdev/waveterm/pkg/aiusechat/uctypes"
"github.com/wavetermdev/waveterm/pkg/util/logutil"
"github.com/wavetermdev/waveterm/pkg/util/utilfn"
"github.com/wavetermdev/waveterm/pkg/web/sse"
)
Expand Down Expand Up @@ -56,10 +58,11 @@ func (m *anthropicChatMessage) GetUsage() *uctypes.AIUsage {
}

return &uctypes.AIUsage{
APIType: uctypes.APIType_AnthropicMessages,
Model: m.Usage.Model,
InputTokens: m.Usage.InputTokens,
OutputTokens: m.Usage.OutputTokens,
APIType: uctypes.APIType_AnthropicMessages,
Model: m.Usage.Model,
InputTokens: m.Usage.InputTokens,
OutputTokens: m.Usage.OutputTokens,
NativeWebSearchCount: m.Usage.NativeWebSearchCount,
}
}

Expand Down Expand Up @@ -95,8 +98,9 @@ type anthropicMessageContentBlock struct {
Name string `json:"name,omitempty"`
Input interface{} `json:"input,omitempty"`

ToolUseDisplayName string `json:"toolusedisplayname,omitempty"` // internal field (cannot marshal to API, must be stripped)
ToolUseShortDescription string `json:"tooluseshortdescription,omitempty"` // internal field (cannot marshal to API, must be stripped)
ToolUseDisplayName string `json:"toolusedisplayname,omitempty"` // internal field (cannot marshal to API, must be stripped)
ToolUseShortDescription string `json:"tooluseshortdescription,omitempty"` // internal field (cannot marshal to API, must be stripped)
ToolUseData *uctypes.UIMessageDataToolUse `json:"toolusedata,omitempty"` // internal field (cannot marshal to API, must be stripped)

// Tool result content
ToolUseID string `json:"tool_use_id,omitempty"`
Expand Down Expand Up @@ -154,6 +158,7 @@ func (b *anthropicMessageContentBlock) Clean() *anthropicMessageContentBlock {
rtn.SourcePreviewUrl = ""
rtn.ToolUseDisplayName = ""
rtn.ToolUseShortDescription = ""
rtn.ToolUseData = nil
if rtn.Source != nil {
rtn.Source = rtn.Source.Clean()
}
Expand All @@ -177,10 +182,15 @@ type anthropicStreamRequest struct {
Stream bool `json:"stream"`
System []anthropicMessageContentBlock `json:"system,omitempty"`
ToolChoice any `json:"tool_choice,omitempty"`
Tools []uctypes.ToolDefinition `json:"tools,omitempty"`
Tools []any `json:"tools,omitempty"` // *uctypes.ToolDefinition or *anthropicWebSearchTool
Thinking *anthropicThinkingOpts `json:"thinking,omitempty"`
}

// anthropicWebSearchTool is a tool entry that carries only a type tag and a name.
// The comment on anthropicStreamRequest.Tools lists *anthropicWebSearchTool as one
// of the two element kinds placed in the request's "tools" array.
// NOTE(review): presumably this declares Anthropic's server-side web search tool,
// as opposed to a client-executed uctypes.ToolDefinition; confirm against the API docs.
type anthropicWebSearchTool struct {
Type string `json:"type"` // "web_search_20250305"
Name string `json:"name"` // "web_search"
}

type anthropicCacheControl struct {
Type string `json:"type"` // "ephemeral"
TTL string `json:"ttl"` // "5m" or "1h"
Expand Down Expand Up @@ -228,8 +238,9 @@ type anthropicUsageType struct {
CacheCreationInputTokens int `json:"cache_creation_input_tokens,omitempty"`
CacheReadInputTokens int `json:"cache_read_input_tokens,omitempty"`

// internal field for Wave use (not sent to API)
Model string `json:"model,omitempty"`
// internal fields for Wave use (not sent to API)
Model string `json:"model,omitempty"`
NativeWebSearchCount int `json:"nativewebsearchcount,omitempty"`

// for reference, but we don't keep these up to date or track them
CacheCreation *anthropicCacheCreationType `json:"cache_creation,omitempty"` // breakdown of cached tokens by TTL
Expand Down Expand Up @@ -290,14 +301,16 @@ type partialJSON struct {
}

type streamingState struct {
blockMap map[int]*blockState
toolCalls []uctypes.WaveToolCall
stopFromDelta string
msgID string
model string
stepStarted bool
rtnMessage *anthropicChatMessage
usage *anthropicUsageType
blockMap map[int]*blockState
toolCalls []uctypes.WaveToolCall
stopFromDelta string
msgID string
model string
stepStarted bool
rtnMessage *anthropicChatMessage
usage *anthropicUsageType
chatOpts uctypes.WaveChatOpts
webSearchCount int
}

func (p *partialJSON) Write(s string) {
Expand Down Expand Up @@ -330,6 +343,20 @@ func (p *partialJSON) FinalObject() (json.RawMessage, error) {
}
}

// sanitizeHostnameInError scrubs the Wave cloud endpoint from an error's message
// so the service hostname is not surfaced to callers.
//
// A nil error yields nil. If the message contains the host parsed from
// uctypes.DefaultAIEndpoint, every occurrence of the full endpoint URL is
// replaced with "AI service" and every remaining occurrence of the bare host is
// replaced with "host". The scrubbed text is returned as a brand-new error; it
// deliberately does not wrap err, because wrapping would keep the unsanitized
// message reachable through Unwrap.
//
// If nothing needs scrubbing (the endpoint cannot be parsed, has no host, or the
// host does not appear in the message), the original error is returned
// unchanged so its chain stays intact for errors.Is / errors.As.
func sanitizeHostnameInError(err error) error {
	if err == nil {
		return nil
	}
	parsedURL, parseErr := url.Parse(uctypes.DefaultAIEndpoint)
	if parseErr != nil || parsedURL.Host == "" {
		// No usable host to look for; leave the error as-is.
		return err
	}
	errStr := err.Error()
	if !strings.Contains(errStr, parsedURL.Host) {
		// Nothing to scrub: preserve the original error and its wrap chain.
		return err
	}
	// Replace the full endpoint first so the bare-host pass only touches leftovers.
	errStr = strings.ReplaceAll(errStr, uctypes.DefaultAIEndpoint, "AI service")
	errStr = strings.ReplaceAll(errStr, parsedURL.Host, "host")
	return fmt.Errorf("%s", errStr)
}

// makeThinkingOpts creates thinking options based on level and max tokens
func makeThinkingOpts(thinkingLevel string, maxTokens int) *anthropicThinkingOpts {
if thinkingLevel != uctypes.ThinkingLevelMedium && thinkingLevel != uctypes.ThinkingLevelHigh {
Expand Down Expand Up @@ -373,21 +400,21 @@ func parseAnthropicHTTPError(resp *http.Response) error {
// Try to parse as Anthropic error format first
var eresp anthropicHTTPErrorResponse
if err := json.Unmarshal(slurp, &eresp); err == nil && eresp.Error.Message != "" {
return fmt.Errorf("anthropic %s: %s", resp.Status, eresp.Error.Message)
return sanitizeHostnameInError(fmt.Errorf("anthropic %s: %s", resp.Status, eresp.Error.Message))
}

// Try to parse as proxy error format
var proxyErr uctypes.ProxyErrorResponse
if err := json.Unmarshal(slurp, &proxyErr); err == nil && !proxyErr.Success && proxyErr.Error != "" {
return fmt.Errorf("anthropic %s: %s", resp.Status, proxyErr.Error)
return sanitizeHostnameInError(fmt.Errorf("anthropic %s: %s", resp.Status, proxyErr.Error))
}

// Fall back to truncated raw response
msg := utilfn.TruncateString(strings.TrimSpace(string(slurp)), 120)
if msg == "" {
msg = "unknown error"
}
return fmt.Errorf("anthropic %s: %s", resp.Status, msg)
return sanitizeHostnameInError(fmt.Errorf("anthropic %s: %s", resp.Status, msg))
}

func RunAnthropicChatStep(
Expand Down Expand Up @@ -426,7 +453,7 @@ func RunAnthropicChatStep(

// Validate continuation if provided
if cont != nil {
if chatOpts.Config.Model != cont.Model {
if !uctypes.AreModelsCompatible(chat.APIType, chatOpts.Config.Model, cont.Model) {
return nil, nil, nil, fmt.Errorf("cannot continue with a different model, model:%q, cont-model:%q", chatOpts.Config.Model, cont.Model)
}
}
Expand Down Expand Up @@ -461,7 +488,7 @@ func RunAnthropicChatStep(

resp, err := httpClient.Do(req)
if err != nil {
return nil, nil, nil, err
return nil, nil, nil, sanitizeHostnameInError(err)
}
defer resp.Body.Close()

Expand Down Expand Up @@ -499,7 +526,7 @@ func RunAnthropicChatStep(
// Use eventsource decoder for proper SSE parsing
decoder := eventsource.NewDecoder(resp.Body)

stopReason, rtnMessage := handleAnthropicStreamingResp(ctx, sse, decoder, cont)
stopReason, rtnMessage := handleAnthropicStreamingResp(ctx, sse, decoder, cont, chatOpts)
return stopReason, rtnMessage, rateLimitInfo, nil
}

Expand All @@ -509,6 +536,7 @@ func handleAnthropicStreamingResp(
sse *sse.SSEHandlerCh,
decoder *eventsource.Decoder,
cont *uctypes.WaveContinueResponse,
chatOpts uctypes.WaveChatOpts,
) (*uctypes.WaveStopReason, *anthropicChatMessage) {
// Per-response state
state := &streamingState{
Expand All @@ -518,6 +546,7 @@ func handleAnthropicStreamingResp(
Role: "assistant",
Content: []anthropicMessageContentBlock{},
},
chatOpts: chatOpts,
}

var rtnStopReason *uctypes.WaveStopReason
Expand All @@ -526,8 +555,10 @@ func handleAnthropicStreamingResp(
defer func() {
// Set usage in the returned message
if state.usage != nil {
// Set model in usage for internal use
state.usage.Model = state.model
if state.webSearchCount > 0 {
state.usage.NativeWebSearchCount = state.webSearchCount
}
state.rtnMessage.Usage = state.usage
}

Expand Down Expand Up @@ -558,6 +589,13 @@ func handleAnthropicStreamingResp(
// Normal end of stream
break
}
if sse.Err() != nil {
return &uctypes.WaveStopReason{
Kind: uctypes.StopKindCanceled,
ErrorType: "client_disconnect",
ErrorText: "client disconnected",
}, extractPartialTextFromState(state)
}
// transport error mid-stream
_ = sse.AiMsgError(err.Error())
return &uctypes.WaveStopReason{
Expand Down Expand Up @@ -587,6 +625,37 @@ func handleAnthropicStreamingResp(
return rtnStopReason, state.rtnMessage
}

// extractPartialTextFromState builds an assistant message from the text
// accumulated in state so far.
//
// It gathers, in order:
//  1. non-empty "text" blocks already present in state.rtnMessage.Content, then
//  2. non-empty text blocks still held in state.blockMap, visited in ascending
//     block-index order so the output does not depend on map iteration order.
//
// It returns nil when no text was found; otherwise the message reuses the
// MessageId and Usage of state.rtnMessage with Role "assistant".
func extractPartialTextFromState(state *streamingState) *anthropicChatMessage {
	var textBlocks []anthropicMessageContentBlock

	finished := state.rtnMessage.Content
	for i := range finished {
		if finished[i].Type != "text" || finished[i].Text == "" {
			continue
		}
		textBlocks = append(textBlocks, finished[i])
	}

	// Collect the indices of blockMap entries that hold text, then sort them.
	var pending []int
	for blockIdx, bs := range state.blockMap {
		if bs.kind != blockText || bs.contentBlock == nil || bs.contentBlock.Text == "" {
			continue
		}
		pending = append(pending, blockIdx)
	}
	sort.Ints(pending)
	for _, blockIdx := range pending {
		textBlocks = append(textBlocks, *state.blockMap[blockIdx].contentBlock)
	}

	if len(textBlocks) == 0 {
		return nil
	}
	partial := &anthropicChatMessage{
		MessageId: state.rtnMessage.MessageId,
		Role:      "assistant",
		Content:   textBlocks,
		Usage:     state.rtnMessage.Usage,
	}
	return partial
}

// handleAnthropicEvent processes one SSE event block. It may emit SSE parts
// and/or return a StopReason when the stream is complete.
//
Expand All @@ -601,6 +670,13 @@ func handleAnthropicEvent(
state *streamingState,
cont *uctypes.WaveContinueResponse,
) (stopFromDelta *string, final *uctypes.WaveStopReason) {
if err := sse.Err(); err != nil {
return nil, &uctypes.WaveStopReason{
Kind: uctypes.StopKindCanceled,
ErrorType: "client_disconnect",
ErrorText: "client disconnected",
}
}
eventName := event.Event()
data := event.Data()
switch eventName {
Expand Down Expand Up @@ -693,6 +769,10 @@ func handleAnthropicEvent(
}
state.blockMap[idx] = st
_ = sse.AiMsgToolInputStart(tcID, tName)
case "server_tool_use":
if ev.ContentBlock.Name == "web_search" {
state.webSearchCount++
}
default:
// ignore other block types gracefully per Anthropic guidance
}
Expand Down Expand Up @@ -732,6 +812,7 @@ func handleAnthropicEvent(
if st.kind == blockToolUse {
st.accumJSON.Write(ev.Delta.PartialJSON)
_ = sse.AiMsgToolInputDelta(st.toolCallID, ev.Delta.PartialJSON)
aiutil.SendToolProgress(st.toolCallID, st.toolName, st.accumJSON.Bytes(), state.chatOpts, sse, true)
}
case "signature_delta":
// Accumulate signature for thinking blocks
Expand Down Expand Up @@ -784,6 +865,7 @@ func handleAnthropicEvent(
}
}
_ = sse.AiMsgToolInputAvailable(st.toolCallID, st.toolName, raw)
aiutil.SendToolProgress(st.toolCallID, st.toolName, raw, state.chatOpts, sse, false)
state.toolCalls = append(state.toolCalls, uctypes.WaveToolCall{
ID: st.toolCallID,
Name: st.toolName,
Expand All @@ -798,6 +880,9 @@ func handleAnthropicEvent(
}
state.rtnMessage.Content = append(state.rtnMessage.Content, toolUseBlock)
}
// extractPartialTextFromState reads blockMap for still-in-flight content, so remove completed blocks
// once they have been appended to rtnMessage.Content to avoid duplicate text on disconnect.
delete(state.blockMap, *ev.Index)
return nil, nil

case "message_delta":
Expand Down Expand Up @@ -868,7 +953,7 @@ func handleAnthropicEvent(
}

default:
log.Printf("unknown anthropic event type: %s", eventName)
logutil.DevPrintf("unknown anthropic event type: %s", eventName)
return nil, nil
}
}
Loading
Loading