Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions internal/x402/buyer/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,46 @@ type replayableX402Transport struct {
OnPaymentUnsettled PaymentCallback
}

// maxLogBodyBytes is the maximum number of bytes read from a response body for
// structured log output. Truncation prevents huge bodies from flooding logs.
const maxLogBodyBytes = 512

// peekResponseBody reads up to maxLogBodyBytes from resp.Body for logging and
// reassembles the full body (via a TeeReader drain) so the caller (ReverseProxy)
// can still copy it to the client. The body is replaced in-place on resp.
func peekResponseBody(resp *http.Response) string {
if resp == nil || resp.Body == nil {
return ""
}

// Drain up to maxLogBodyBytes+1 bytes via a buffer, then prepend those
// bytes back using io.MultiReader so the rest of the body is still readable.
limitedBuf := new(bytes.Buffer)
_, _ = io.Copy(limitedBuf, io.LimitReader(resp.Body, maxLogBodyBytes+1))

preview := limitedBuf.Bytes()
// Reassemble: preview bytes + remainder of original body.
resp.Body = io.NopCloser(io.MultiReader(bytes.NewReader(preview), resp.Body))

s := string(preview)
if len(s) > maxLogBodyBytes {
s = s[:maxLogBodyBytes] + "…"
}
return s
}

// sanitizeLogString strips control characters (including CR/LF) from values
// that may originate from upstream HTTP requests or responses, preventing log
// injection (CWE-117) when the strings are interpolated into log lines.
func sanitizeLogString(s string) string {
return strings.Map(func(r rune) rune {
if r < 0x20 || r == 0x7f {
return '_'
}
return r
}, s)
}

func (t *replayableX402Transport) RoundTrip(req *http.Request) (*http.Response, error) {
if t.Base == nil {
t.Base = http.DefaultTransport
Expand All @@ -440,9 +480,21 @@ func (t *replayableX402Transport) RoundTrip(req *http.Request) (*http.Response,
firstReq.Header.Set("User-Agent", userAgent)
resp, err := t.Base.RoundTrip(firstReq)
if err != nil {
log.Printf("x402-buyer: outbound %s %s → transport error: %s",
sanitizeLogString(req.Method),
sanitizeLogString(req.URL.String()),
sanitizeLogString(err.Error()))
return nil, err
}
if resp.StatusCode != http.StatusPaymentRequired {
// Non-402 response on the probe request — pass through verbatim so the
// caller (LiteLLM) sees the real upstream status and body, not a
// remapped "Payment verification failed" generic error.
excerpt := peekResponseBody(resp)
log.Printf("x402-buyer: outbound %s %s → %d (no payment required) body=%q",
sanitizeLogString(req.Method),
sanitizeLogString(req.URL.String()),
resp.StatusCode, excerpt)
return resp, nil
}

Expand Down Expand Up @@ -531,6 +583,11 @@ func (t *replayableX402Transport) RoundTrip(req *http.Request) (*http.Response,

if err != nil {
releaseHeldPreSignedSpend(t.Signers, heldAuth)
log.Printf("x402-buyer: outbound %s %s (with X-PAYMENT) → transport error after %s: %s",
sanitizeLogString(req.Method),
sanitizeLogString(req.URL.String()),
duration.Round(time.Millisecond),
sanitizeLogString(err.Error()))
if t.OnPaymentFailure != nil {
t.OnPaymentFailure(PaymentEvent{
Type: PaymentEventFailure,
Expand All @@ -545,10 +602,25 @@ func (t *replayableX402Transport) RoundTrip(req *http.Request) (*http.Response,
}

if respRetry.StatusCode >= http.StatusBadRequest {
// Non-2xx after payment retry: release the held auth (not consumed) and
// pass the upstream status+body verbatim to the caller. This ensures the
// real upstream error (e.g. 403 Cloudflare WAF, 404, 500) reaches
// LiteLLM instead of being misclassified as "Payment verification failed".
excerpt := peekResponseBody(respRetry)
log.Printf("x402-buyer: outbound %s %s (with X-PAYMENT) → %d after %s body=%q — passing through upstream error",
sanitizeLogString(req.Method),
sanitizeLogString(req.URL.String()),
respRetry.StatusCode, duration.Round(time.Millisecond), excerpt)
releaseHeldPreSignedSpend(t.Signers, heldAuth)
return respRetry, nil
}

// 2xx — log the successful paid response.
log.Printf("x402-buyer: outbound %s %s (with X-PAYMENT) → %d after %s",
sanitizeLogString(req.Method),
sanitizeLogString(req.URL.String()),
respRetry.StatusCode, duration.Round(time.Millisecond))

if len(t.Signers) == 1 {
if ps, ok := t.Signers[0].(*PreSignedSigner); ok && heldAuth != nil {
if confirmErr := ps.ConfirmSpend(heldAuth); confirmErr != nil {
Expand Down
233 changes: 233 additions & 0 deletions internal/x402/buyer/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2124,3 +2124,236 @@ func TestProxy_UserAgentOnPaidRequest(t *testing.T) {
t.Errorf("paid User-Agent = %q, want %q", capturedPaidUA, wantUA)
}
}

// TestUpstreamStatusPropagation exercises the pass-through behaviour added in
// fix(x402-buyer): propagate upstream status/body and log every request.
//
// Evidence: during a live demo against inference.v1337.org the upstream
// returned HTTP 403 + body "error code: 1010" (Cloudflare WAF) but LiteLLM
// surfaced it as "ServiceUnavailableError: Payment verification failed" — a
// misleading remapping that cost ~30 min of fruitless debugging. The sidecar
// must forward non-402 non-2xx responses verbatim so the real error is visible.
func TestUpstreamStatusPropagation(t *testing.T) {
// paymentRequiredBody is the 402 body the upstream emits on the probe request.
paymentRequiredBody := `{
"x402Version": 1,
"accepts": [{
"scheme": "exact",
"network": "base-sepolia",
"maxAmountRequired": "1000",
"asset": "0x036CbD53842c5426634e7929541eC2318f3dCF7e",
"payTo": "0x70997970C51812dc3A010C7d01b50e0d17dc79C8"
}]
}`

type upstreamBehaviour int
const (
// directStatus: upstream returns the given status immediately (no 402 handshake).
directStatus upstreamBehaviour = iota
// afterPayment: upstream first returns 402, then returns the given status when X-PAYMENT is present.
afterPayment
)

tests := []struct {
name string
behaviour upstreamBehaviour
upstreamCode int
upstreamBody string
wantCode int
wantBodySub string
}{
{
name: "200 direct — forwarded as 200",
behaviour: directStatus,
upstreamCode: http.StatusOK,
upstreamBody: `{"choices":[{"message":{"content":"ok"}}]}`,
wantCode: http.StatusOK,
wantBodySub: "ok",
},
{
name: "403 direct (no payment) — forwarded as 403 with real body",
behaviour: directStatus,
upstreamCode: http.StatusForbidden,
upstreamBody: "error code: 1010",
wantCode: http.StatusForbidden,
wantBodySub: "error code: 1010",
},
{
name: "404 direct — forwarded as 404",
behaviour: directStatus,
upstreamCode: http.StatusNotFound,
upstreamBody: "not found",
wantCode: http.StatusNotFound,
wantBodySub: "not found",
},
{
name: "503 direct — forwarded as 503, NOT remapped",
behaviour: directStatus,
upstreamCode: http.StatusServiceUnavailable,
upstreamBody: "upstream overloaded",
wantCode: http.StatusServiceUnavailable,
wantBodySub: "upstream overloaded",
},
{
name: "200 after 402 payment — forwarded as 200",
behaviour: afterPayment,
upstreamCode: http.StatusOK,
upstreamBody: `{"choices":[{"message":{"content":"paid ok"}}]}`,
wantCode: http.StatusOK,
wantBodySub: "paid ok",
},
{
name: "403 after 402 payment (WAF block) — forwarded as 403 with real body",
behaviour: afterPayment,
upstreamCode: http.StatusForbidden,
upstreamBody: "error code: 1010",
wantCode: http.StatusForbidden,
wantBodySub: "error code: 1010",
},
{
name: "500 after 402 payment — forwarded as 500, NOT remapped",
behaviour: afterPayment,
upstreamCode: http.StatusInternalServerError,
upstreamBody: "internal server error",
wantCode: http.StatusInternalServerError,
wantBodySub: "internal server error",
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch tt.behaviour {
case directStatus:
w.WriteHeader(tt.upstreamCode)
fmt.Fprint(w, tt.upstreamBody)
case afterPayment:
if r.Header.Get("X-Payment") == "" {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusPaymentRequired)
fmt.Fprint(w, paymentRequiredBody)
return
}
if tt.upstreamCode == http.StatusOK {
w.Header().Set("X-PAYMENT-RESPONSE", base64.StdEncoding.EncodeToString([]byte(
`{"success":true,"transaction":"0xtest","network":"base-sepolia","payer":"0xpayer"}`,
)))
}
w.WriteHeader(tt.upstreamCode)
fmt.Fprint(w, tt.upstreamBody)
}
}))
defer upstream.Close()

cfg := &Config{
Upstreams: map[string]UpstreamConfig{
"seller": {
URL: upstream.URL,
Network: "base-sepolia",
PayTo: "0x70997970C51812dc3A010C7d01b50e0d17dc79C8",
Asset: "0x036CbD53842c5426634e7929541eC2318f3dCF7e",
Price: "1000",
},
},
}
auths := AuthsFile{"seller": {makeAuth("0xsig-prop")}}

proxy, err := NewProxy(cfg, auths, nil)
if err != nil {
t.Fatalf("NewProxy: %v", err)
}

srv := httptest.NewServer(proxy)
defer srv.Close()

resp, err := http.Post(
srv.URL+"/upstream/seller/v1/chat/completions",
"application/json",
strings.NewReader(`{"model":"seller","messages":[{"role":"user","content":"hi"}]}`),
)
if err != nil {
t.Fatalf("request: %v", err)
}
defer resp.Body.Close()

body, _ := io.ReadAll(resp.Body)

if resp.StatusCode != tt.wantCode {
t.Errorf("status = %d, want %d (body: %s)", resp.StatusCode, tt.wantCode, string(body))
}

if tt.wantBodySub != "" && !strings.Contains(string(body), tt.wantBodySub) {
t.Errorf("body %q does not contain %q", string(body), tt.wantBodySub)
}
})
}
}

func TestSanitizeLogString(t *testing.T) {
tests := []struct {
name string
input string
want string
}{
{
name: "newline replaced",
input: "foo\nbar",
want: "foo_bar",
},
{
name: "carriage return replaced",
input: "foo\rbar",
want: "foo_bar",
},
{
name: "tab replaced",
input: "foo\tbar",
want: "foo_bar",
},
{
name: "null byte replaced",
input: "foo\x00bar",
want: "foo_bar",
},
{
name: "escape replaced",
input: "foo\x1bbar",
want: "foo_bar",
},
{
name: "DEL replaced",
input: "foo\x7fbar",
want: "foo_bar",
},
{
name: "multiple control chars",
input: "\r\ninjected-log-entry\r\n",
want: "__injected-log-entry__",
},
{
name: "printable ASCII unchanged",
input: "GET /v1/chat/completions HTTP/1.1",
want: "GET /v1/chat/completions HTTP/1.1",
},
{
name: "UTF-8 letters unchanged",
input: "café résumé",
want: "café résumé",
},
{
name: "empty string",
input: "",
want: "",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := sanitizeLogString(tt.input)
if got != tt.want {
t.Errorf("sanitizeLogString(%q) = %q, want %q", tt.input, got, tt.want)
}
})
}
}
Loading