|
|
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// TODO: turn off the serve goroutine when idle, so
// an idle conn only has the readFrames goroutine active. (which could
// also be optimized probably to pin less memory in crypto/tls). This
// would involve tracking when the serve goroutine is active (atomic
// int32 read/CAS probably?) and starting it up when frames arrive,
// and shutting it down when all handlers exit. the occasional PING
// packets could use time.AfterFunc to call sc.wakeStartServeLoop()
// (which is a no-op if already running) and then queue the PING write
// as normal. The serve loop would then exit in most cases (if no
// Handlers running) and not be woken up again until the PING packet
// returns.
// TODO (maybe): add a mechanism for Handlers to going into
// half-closed-local mode (rw.(io.Closer) test?) but not exit their
// handler, and continue to be able to read from the
// Request.Body. This would be a somewhat semantic change from HTTP/1
// (or at least what we expose in net/http), so I'd probably want to
// add it there too. For now, this package says that returning from
// the Handler ServeHTTP function means you're both done reading and
// done writing, without a way to stop just one or the other.
package http2
import ( "bufio" "bytes" "crypto/tls" "errors" "fmt" "io" "log" "math" "net" "net/http" "net/textproto" "net/url" "os" "reflect" "runtime" "strconv" "strings" "sync" "time"
"golang.org/x/net/http2/hpack" )
const ( prefaceTimeout = 10 * time.Second firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
handlerChunkWriteSize = 4 << 10 defaultMaxStreams = 250 // TODO: make this 100 as the GFE seems to?
)
var ( errClientDisconnected = errors.New("client disconnected") errClosedBody = errors.New("body closed by handler") errHandlerComplete = errors.New("http2: request body closed due to handler exiting") errStreamClosed = errors.New("http2: stream closed") )
var responseWriterStatePool = sync.Pool{ New: func() interface{} { rws := &responseWriterState{} rws.bw = bufio.NewWriterSize(chunkWriter{rws}, handlerChunkWriteSize) return rws }, }
// Test hooks.
var ( testHookOnConn func() testHookGetServerConn func(*serverConn) testHookOnPanicMu *sync.Mutex // nil except in tests
testHookOnPanic func(sc *serverConn, panicVal interface{}) (rePanic bool) )
// Server is an HTTP/2 server.
type Server struct { // MaxHandlers limits the number of http.Handler ServeHTTP goroutines
// which may run at a time over all connections.
// Negative or zero no limit.
// TODO: implement
MaxHandlers int
// MaxConcurrentStreams optionally specifies the number of
// concurrent streams that each client may have open at a
// time. This is unrelated to the number of http.Handler goroutines
// which may be active globally, which is MaxHandlers.
// If zero, MaxConcurrentStreams defaults to at least 100, per
// the HTTP/2 spec's recommendations.
MaxConcurrentStreams uint32
// MaxReadFrameSize optionally specifies the largest frame
// this server is willing to read. A valid value is between
// 16k and 16M, inclusive. If zero or otherwise invalid, a
// default value is used.
MaxReadFrameSize uint32
// PermitProhibitedCipherSuites, if true, permits the use of
// cipher suites prohibited by the HTTP/2 spec.
PermitProhibitedCipherSuites bool
// IdleTimeout specifies how long until idle clients should be
// closed with a GOAWAY frame. PING frames are not considered
// activity for the purposes of IdleTimeout.
IdleTimeout time.Duration
// MaxUploadBufferPerConnection is the size of the initial flow
// control window for each connections. The HTTP/2 spec does not
// allow this to be smaller than 65535 or larger than 2^32-1.
// If the value is outside this range, a default value will be
// used instead.
MaxUploadBufferPerConnection int32
// MaxUploadBufferPerStream is the size of the initial flow control
// window for each stream. The HTTP/2 spec does not allow this to
// be larger than 2^32-1. If the value is zero or larger than the
// maximum, a default value will be used instead.
MaxUploadBufferPerStream int32
// NewWriteScheduler constructs a write scheduler for a connection.
// If nil, a default scheduler is chosen.
NewWriteScheduler func() WriteScheduler
// Internal state. This is a pointer (rather than embedded directly)
// so that we don't embed a Mutex in this struct, which will make the
// struct non-copyable, which might break some callers.
state *serverInternalState }
func (s *Server) initialConnRecvWindowSize() int32 { if s.MaxUploadBufferPerConnection > initialWindowSize { return s.MaxUploadBufferPerConnection } return 1 << 20 }
func (s *Server) initialStreamRecvWindowSize() int32 { if s.MaxUploadBufferPerStream > 0 { return s.MaxUploadBufferPerStream } return 1 << 20 }
func (s *Server) maxReadFrameSize() uint32 { if v := s.MaxReadFrameSize; v >= minMaxFrameSize && v <= maxFrameSize { return v } return defaultMaxReadFrameSize }
func (s *Server) maxConcurrentStreams() uint32 { if v := s.MaxConcurrentStreams; v > 0 { return v } return defaultMaxStreams }
type serverInternalState struct { mu sync.Mutex activeConns map[*serverConn]struct{} }
func (s *serverInternalState) registerConn(sc *serverConn) { if s == nil { return // if the Server was used without calling ConfigureServer
} s.mu.Lock() s.activeConns[sc] = struct{}{} s.mu.Unlock() }
func (s *serverInternalState) unregisterConn(sc *serverConn) { if s == nil { return // if the Server was used without calling ConfigureServer
} s.mu.Lock() delete(s.activeConns, sc) s.mu.Unlock() }
func (s *serverInternalState) startGracefulShutdown() { if s == nil { return // if the Server was used without calling ConfigureServer
} s.mu.Lock() for sc := range s.activeConns { sc.startGracefulShutdown() } s.mu.Unlock() }
// ConfigureServer adds HTTP/2 support to a net/http Server.
//
// The configuration conf may be nil.
//
// ConfigureServer must be called before s begins serving.
func ConfigureServer(s *http.Server, conf *Server) error { if s == nil { panic("nil *http.Server") } if conf == nil { conf = new(Server) } conf.state = &serverInternalState{activeConns: make(map[*serverConn]struct{})} if err := configureServer18(s, conf); err != nil { return err } if err := configureServer19(s, conf); err != nil { return err }
if s.TLSConfig == nil { s.TLSConfig = new(tls.Config) } else if s.TLSConfig.CipherSuites != nil { // If they already provided a CipherSuite list, return
// an error if it has a bad order or is missing
// ECDHE_RSA_WITH_AES_128_GCM_SHA256 or ECDHE_ECDSA_WITH_AES_128_GCM_SHA256.
haveRequired := false sawBad := false for i, cs := range s.TLSConfig.CipherSuites { switch cs { case tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, // Alternative MTI cipher to not discourage ECDSA-only servers.
// See http://golang.org/cl/30721 for further information.
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256: haveRequired = true } if isBadCipher(cs) { sawBad = true } else if sawBad { return fmt.Errorf("http2: TLSConfig.CipherSuites index %d contains an HTTP/2-approved cipher suite (%#04x), but it comes after unapproved cipher suites. With this configuration, clients that don't support previous, approved cipher suites may be given an unapproved one and reject the connection.", i, cs) } } if !haveRequired { return fmt.Errorf("http2: TLSConfig.CipherSuites is missing an HTTP/2-required AES_128_GCM_SHA256 cipher.") } }
// Note: not setting MinVersion to tls.VersionTLS12,
// as we don't want to interfere with HTTP/1.1 traffic
// on the user's server. We enforce TLS 1.2 later once
// we accept a connection. Ideally this should be done
// during next-proto selection, but using TLS <1.2 with
// HTTP/2 is still the client's bug.
s.TLSConfig.PreferServerCipherSuites = true
haveNPN := false for _, p := range s.TLSConfig.NextProtos { if p == NextProtoTLS { haveNPN = true break } } if !haveNPN { s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, NextProtoTLS) }
if s.TLSNextProto == nil { s.TLSNextProto = map[string]func(*http.Server, *tls.Conn, http.Handler){} } protoHandler := func(hs *http.Server, c *tls.Conn, h http.Handler) { if testHookOnConn != nil { testHookOnConn() } conf.ServeConn(c, &ServeConnOpts{ Handler: h, BaseConfig: hs, }) } s.TLSNextProto[NextProtoTLS] = protoHandler return nil }
// ServeConnOpts are options for the Server.ServeConn method.
type ServeConnOpts struct { // BaseConfig optionally sets the base configuration
// for values. If nil, defaults are used.
BaseConfig *http.Server
// Handler specifies which handler to use for processing
// requests. If nil, BaseConfig.Handler is used. If BaseConfig
// or BaseConfig.Handler is nil, http.DefaultServeMux is used.
Handler http.Handler }
func (o *ServeConnOpts) baseConfig() *http.Server { if o != nil && o.BaseConfig != nil { return o.BaseConfig } return new(http.Server) }
func (o *ServeConnOpts) handler() http.Handler { if o != nil { if o.Handler != nil { return o.Handler } if o.BaseConfig != nil && o.BaseConfig.Handler != nil { return o.BaseConfig.Handler } } return http.DefaultServeMux }
// ServeConn serves HTTP/2 requests on the provided connection and
// blocks until the connection is no longer readable.
//
// ServeConn starts speaking HTTP/2 assuming that c has not had any
// reads or writes. It writes its initial settings frame and expects
// to be able to read the preface and settings frame from the
// client. If c has a ConnectionState method like a *tls.Conn, the
// ConnectionState is used to verify the TLS ciphersuite and to set
// the Request.TLS field in Handlers.
//
// ServeConn does not support h2c by itself. Any h2c support must be
// implemented in terms of providing a suitably-behaving net.Conn.
//
// The opts parameter is optional. If nil, default values are used.
func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) { baseCtx, cancel := serverConnBaseContext(c, opts) defer cancel()
sc := &serverConn{ srv: s, hs: opts.baseConfig(), conn: c, baseCtx: baseCtx, remoteAddrStr: c.RemoteAddr().String(), bw: newBufferedWriter(c), handler: opts.handler(), streams: make(map[uint32]*stream), readFrameCh: make(chan readFrameResult), wantWriteFrameCh: make(chan FrameWriteRequest, 8), serveMsgCh: make(chan interface{}, 8), wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way
doneServing: make(chan struct{}), clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
advMaxStreams: s.maxConcurrentStreams(), initialStreamSendWindowSize: initialWindowSize, maxFrameSize: initialMaxFrameSize, headerTableSize: initialHeaderTableSize, serveG: newGoroutineLock(), pushEnabled: true, }
s.state.registerConn(sc) defer s.state.unregisterConn(sc)
// The net/http package sets the write deadline from the
// http.Server.WriteTimeout during the TLS handshake, but then
// passes the connection off to us with the deadline already set.
// Write deadlines are set per stream in serverConn.newStream.
// Disarm the net.Conn write deadline here.
if sc.hs.WriteTimeout != 0 { sc.conn.SetWriteDeadline(time.Time{}) }
if s.NewWriteScheduler != nil { sc.writeSched = s.NewWriteScheduler() } else { sc.writeSched = NewRandomWriteScheduler() }
// These start at the RFC-specified defaults. If there is a higher
// configured value for inflow, that will be updated when we send a
// WINDOW_UPDATE shortly after sending SETTINGS.
sc.flow.add(initialWindowSize) sc.inflow.add(initialWindowSize) sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
fr := NewFramer(sc.bw, c) fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil) fr.MaxHeaderListSize = sc.maxHeaderListSize() fr.SetMaxReadFrameSize(s.maxReadFrameSize()) sc.framer = fr
if tc, ok := c.(connectionStater); ok { sc.tlsState = new(tls.ConnectionState) *sc.tlsState = tc.ConnectionState() // 9.2 Use of TLS Features
// An implementation of HTTP/2 over TLS MUST use TLS
// 1.2 or higher with the restrictions on feature set
// and cipher suite described in this section. Due to
// implementation limitations, it might not be
// possible to fail TLS negotiation. An endpoint MUST
// immediately terminate an HTTP/2 connection that
// does not meet the TLS requirements described in
// this section with a connection error (Section
// 5.4.1) of type INADEQUATE_SECURITY.
if sc.tlsState.Version < tls.VersionTLS12 { sc.rejectConn(ErrCodeInadequateSecurity, "TLS version too low") return }
if sc.tlsState.ServerName == "" { // Client must use SNI, but we don't enforce that anymore,
// since it was causing problems when connecting to bare IP
// addresses during development.
//
// TODO: optionally enforce? Or enforce at the time we receive
// a new request, and verify the the ServerName matches the :authority?
// But that precludes proxy situations, perhaps.
//
// So for now, do nothing here again.
}
if !s.PermitProhibitedCipherSuites && isBadCipher(sc.tlsState.CipherSuite) { // "Endpoints MAY choose to generate a connection error
// (Section 5.4.1) of type INADEQUATE_SECURITY if one of
// the prohibited cipher suites are negotiated."
//
// We choose that. In my opinion, the spec is weak
// here. It also says both parties must support at least
// TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 so there's no
// excuses here. If we really must, we could allow an
// "AllowInsecureWeakCiphers" option on the server later.
// Let's see how it plays out first.
sc.rejectConn(ErrCodeInadequateSecurity, fmt.Sprintf("Prohibited TLS 1.2 Cipher Suite: %x", sc.tlsState.CipherSuite)) return } }
if hook := testHookGetServerConn; hook != nil { hook(sc) } sc.serve() }
func (sc *serverConn) rejectConn(err ErrCode, debug string) { sc.vlogf("http2: server rejecting conn: %v, %s", err, debug) // ignoring errors. hanging up anyway.
sc.framer.WriteGoAway(0, err, []byte(debug)) sc.bw.Flush() sc.conn.Close() }
type serverConn struct { // Immutable:
srv *Server hs *http.Server conn net.Conn bw *bufferedWriter // writing to conn
handler http.Handler baseCtx contextContext framer *Framer doneServing chan struct{} // closed when serverConn.serve ends
readFrameCh chan readFrameResult // written by serverConn.readFrames
wantWriteFrameCh chan FrameWriteRequest // from handlers -> serve
wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes
bodyReadCh chan bodyReadMsg // from handlers -> serve
serveMsgCh chan interface{} // misc messages & code to send to / run on the serve loop
flow flow // conn-wide (not stream-specific) outbound flow control
inflow flow // conn-wide inbound flow control
tlsState *tls.ConnectionState // shared by all handlers, like net/http
remoteAddrStr string writeSched WriteScheduler
// Everything following is owned by the serve loop; use serveG.check():
serveG goroutineLock // used to verify funcs are on serve()
pushEnabled bool sawFirstSettings bool // got the initial SETTINGS frame after the preface
needToSendSettingsAck bool unackedSettings int // how many SETTINGS have we sent without ACKs?
clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
curClientStreams uint32 // number of open streams initiated by the client
curPushedStreams uint32 // number of open streams initiated by server push
maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests
maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes
streams map[uint32]*stream initialStreamSendWindowSize int32 maxFrameSize int32 headerTableSize uint32 peerMaxHeaderListSize uint32 // zero means unknown (default)
canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
writingFrame bool // started writing a frame (on serve goroutine or separate)
writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh
needsFrameFlush bool // last frame write wasn't a flush
inGoAway bool // we've started to or sent GOAWAY
inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
needToSendGoAway bool // we need to schedule a GOAWAY frame write
goAwayCode ErrCode shutdownTimer *time.Timer // nil until used
idleTimer *time.Timer // nil if unused
// Owned by the writeFrameAsync goroutine:
headerWriteBuf bytes.Buffer hpackEncoder *hpack.Encoder
// Used by startGracefulShutdown.
shutdownOnce sync.Once }
func (sc *serverConn) maxHeaderListSize() uint32 { n := sc.hs.MaxHeaderBytes if n <= 0 { n = http.DefaultMaxHeaderBytes } // http2's count is in a slightly different unit and includes 32 bytes per pair.
// So, take the net/http.Server value and pad it up a bit, assuming 10 headers.
const perFieldOverhead = 32 // per http2 spec
const typicalHeaders = 10 // conservative
return uint32(n + typicalHeaders*perFieldOverhead) }
func (sc *serverConn) curOpenStreams() uint32 { sc.serveG.check() return sc.curClientStreams + sc.curPushedStreams }
// stream represents a stream. This is the minimal metadata needed by
// the serve goroutine. Most of the actual stream state is owned by
// the http.Handler's goroutine in the responseWriter. Because the
// responseWriter's responseWriterState is recycled at the end of a
// handler, this struct intentionally has no pointer to the
// *responseWriter{,State} itself, as the Handler ending nils out the
// responseWriter's state field.
type stream struct { // immutable:
sc *serverConn id uint32 body *pipe // non-nil if expecting DATA frames
cw closeWaiter // closed wait stream transitions to closed state
ctx contextContext cancelCtx func()
// owned by serverConn's serve loop:
bodyBytes int64 // body bytes seen so far
declBodyBytes int64 // or -1 if undeclared
flow flow // limits writing from Handler to client
inflow flow // what the client is allowed to POST/etc to us
parent *stream // or nil
numTrailerValues int64 weight uint8 state streamState resetQueued bool // RST_STREAM queued for write; set by sc.resetStream
gotTrailerHeader bool // HEADER frame for trailers was seen
wroteHeaders bool // whether we wrote headers (not status 100)
writeDeadline *time.Timer // nil if unused
trailer http.Header // accumulated trailers
reqTrailer http.Header // handler's Request.Trailer
}
func (sc *serverConn) Framer() *Framer { return sc.framer } func (sc *serverConn) CloseConn() error { return sc.conn.Close() } func (sc *serverConn) Flush() error { return sc.bw.Flush() } func (sc *serverConn) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) { return sc.hpackEncoder, &sc.headerWriteBuf }
func (sc *serverConn) state(streamID uint32) (streamState, *stream) { sc.serveG.check() // http://tools.ietf.org/html/rfc7540#section-5.1
if st, ok := sc.streams[streamID]; ok { return st.state, st } // "The first use of a new stream identifier implicitly closes all
// streams in the "idle" state that might have been initiated by
// that peer with a lower-valued stream identifier. For example, if
// a client sends a HEADERS frame on stream 7 without ever sending a
// frame on stream 5, then stream 5 transitions to the "closed"
// state when the first frame for stream 7 is sent or received."
if streamID%2 == 1 { if streamID <= sc.maxClientStreamID { return stateClosed, nil } } else { if streamID <= sc.maxPushPromiseID { return stateClosed, nil } } return stateIdle, nil }
// setConnState calls the net/http ConnState hook for this connection, if configured.
// Note that the net/http package does StateNew and StateClosed for us.
// There is currently no plan for StateHijacked or hijacking HTTP/2 connections.
func (sc *serverConn) setConnState(state http.ConnState) { if sc.hs.ConnState != nil { sc.hs.ConnState(sc.conn, state) } }
func (sc *serverConn) vlogf(format string, args ...interface{}) { if VerboseLogs { sc.logf(format, args...) } }
func (sc *serverConn) logf(format string, args ...interface{}) { if lg := sc.hs.ErrorLog; lg != nil { lg.Printf(format, args...) } else { log.Printf(format, args...) } }
// errno returns v's underlying uintptr, else 0.
//
// TODO: remove this helper function once http2 can use build
// tags. See comment in isClosedConnError.
func errno(v error) uintptr { if rv := reflect.ValueOf(v); rv.Kind() == reflect.Uintptr { return uintptr(rv.Uint()) } return 0 }
// isClosedConnError reports whether err is an error from use of a closed
// network connection.
func isClosedConnError(err error) bool { if err == nil { return false }
// TODO: remove this string search and be more like the Windows
// case below. That might involve modifying the standard library
// to return better error types.
str := err.Error() if strings.Contains(str, "use of closed network connection") { return true }
// TODO(bradfitz): x/tools/cmd/bundle doesn't really support
// build tags, so I can't make an http2_windows.go file with
// Windows-specific stuff. Fix that and move this, once we
// have a way to bundle this into std's net/http somehow.
if runtime.GOOS == "windows" { if oe, ok := err.(*net.OpError); ok && oe.Op == "read" { if se, ok := oe.Err.(*os.SyscallError); ok && se.Syscall == "wsarecv" { const WSAECONNABORTED = 10053 const WSAECONNRESET = 10054 if n := errno(se.Err); n == WSAECONNRESET || n == WSAECONNABORTED { return true } } } } return false }
func (sc *serverConn) condlogf(err error, format string, args ...interface{}) { if err == nil { return } if err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err) || err == errPrefaceTimeout { // Boring, expected errors.
sc.vlogf(format, args...) } else { sc.logf(format, args...) } }
func (sc *serverConn) canonicalHeader(v string) string { sc.serveG.check() cv, ok := commonCanonHeader[v] if ok { return cv } cv, ok = sc.canonHeader[v] if ok { return cv } if sc.canonHeader == nil { sc.canonHeader = make(map[string]string) } cv = http.CanonicalHeaderKey(v) sc.canonHeader[v] = cv return cv }
type readFrameResult struct { f Frame // valid until readMore is called
err error
// readMore should be called once the consumer no longer needs or
// retains f. After readMore, f is invalid and more frames can be
// read.
readMore func() }
// readFrames is the loop that reads incoming frames.
// It takes care to only read one frame at a time, blocking until the
// consumer is done with the frame.
// It's run on its own goroutine.
func (sc *serverConn) readFrames() { gate := make(gate) gateDone := gate.Done for { f, err := sc.framer.ReadFrame() select { case sc.readFrameCh <- readFrameResult{f, err, gateDone}: case <-sc.doneServing: return } select { case <-gate: case <-sc.doneServing: return } if terminalReadFrameError(err) { return } } }
// frameWriteResult is the message passed from writeFrameAsync to the serve goroutine.
type frameWriteResult struct { wr FrameWriteRequest // what was written (or attempted)
err error // result of the writeFrame call
}
// writeFrameAsync runs in its own goroutine and writes a single frame
// and then reports when it's done.
// At most one goroutine can be running writeFrameAsync at a time per
// serverConn.
func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest) { err := wr.write.writeFrame(sc) sc.wroteFrameCh <- frameWriteResult{wr, err} }
func (sc *serverConn) closeAllStreamsOnConnClose() { sc.serveG.check() for _, st := range sc.streams { sc.closeStream(st, errClientDisconnected) } }
func (sc *serverConn) stopShutdownTimer() { sc.serveG.check() if t := sc.shutdownTimer; t != nil { t.Stop() } }
func (sc *serverConn) notePanic() { // Note: this is for serverConn.serve panicking, not http.Handler code.
if testHookOnPanicMu != nil { testHookOnPanicMu.Lock() defer testHookOnPanicMu.Unlock() } if testHookOnPanic != nil { if e := recover(); e != nil { if testHookOnPanic(sc, e) { panic(e) } } } }
func (sc *serverConn) serve() { sc.serveG.check() defer sc.notePanic() defer sc.conn.Close() defer sc.closeAllStreamsOnConnClose() defer sc.stopShutdownTimer() defer close(sc.doneServing) // unblocks handlers trying to send
if VerboseLogs { sc.vlogf("http2: server connection from %v on %p", sc.conn.RemoteAddr(), sc.hs) }
sc.writeFrame(FrameWriteRequest{ write: writeSettings{ {SettingMaxFrameSize, sc.srv.maxReadFrameSize()}, {SettingMaxConcurrentStreams, sc.advMaxStreams}, {SettingMaxHeaderListSize, sc.maxHeaderListSize()}, {SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())}, }, }) sc.unackedSettings++
// Each connection starts with intialWindowSize inflow tokens.
// If a higher value is configured, we add more tokens.
if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 { sc.sendWindowUpdate(nil, int(diff)) }
if err := sc.readPreface(); err != nil { sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err) return } // Now that we've got the preface, get us out of the
// "StateNew" state. We can't go directly to idle, though.
// Active means we read some data and anticipate a request. We'll
// do another Active when we get a HEADERS frame.
sc.setConnState(http.StateActive) sc.setConnState(http.StateIdle)
if sc.srv.IdleTimeout != 0 { sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer) defer sc.idleTimer.Stop() }
go sc.readFrames() // closed by defer sc.conn.Close above
settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer) defer settingsTimer.Stop()
loopNum := 0 for { loopNum++ select { case wr := <-sc.wantWriteFrameCh: if se, ok := wr.write.(StreamError); ok { sc.resetStream(se) break } sc.writeFrame(wr) case res := <-sc.wroteFrameCh: sc.wroteFrame(res) case res := <-sc.readFrameCh: if !sc.processFrameFromReader(res) { return } res.readMore() if settingsTimer != nil { settingsTimer.Stop() settingsTimer = nil } case m := <-sc.bodyReadCh: sc.noteBodyRead(m.st, m.n) case msg := <-sc.serveMsgCh: switch v := msg.(type) { case func(int): v(loopNum) // for testing
case *serverMessage: switch v { case settingsTimerMsg: sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr()) return case idleTimerMsg: sc.vlogf("connection is idle") sc.goAway(ErrCodeNo) case shutdownTimerMsg: sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr()) return case gracefulShutdownMsg: sc.startGracefulShutdownInternal() default: panic("unknown timer") } case *startPushRequest: sc.startPush(v) default: panic(fmt.Sprintf("unexpected type %T", v)) } }
// Start the shutdown timer after sending a GOAWAY. When sending GOAWAY
// with no error code (graceful shutdown), don't start the timer until
// all open streams have been completed.
sentGoAway := sc.inGoAway && !sc.needToSendGoAway && !sc.writingFrame gracefulShutdownComplete := sc.goAwayCode == ErrCodeNo && sc.curOpenStreams() == 0 if sentGoAway && sc.shutdownTimer == nil && (sc.goAwayCode != ErrCodeNo || gracefulShutdownComplete) { sc.shutDownIn(goAwayTimeout) } } }
func (sc *serverConn) awaitGracefulShutdown(sharedCh <-chan struct{}, privateCh chan struct{}) { select { case <-sc.doneServing: case <-sharedCh: close(privateCh) } }
type serverMessage int
// Message values sent to serveMsgCh.
var ( settingsTimerMsg = new(serverMessage) idleTimerMsg = new(serverMessage) shutdownTimerMsg = new(serverMessage) gracefulShutdownMsg = new(serverMessage) )
func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) } func (sc *serverConn) onIdleTimer() { sc.sendServeMsg(idleTimerMsg) } func (sc *serverConn) onShutdownTimer() { sc.sendServeMsg(shutdownTimerMsg) }
func (sc *serverConn) sendServeMsg(msg interface{}) { sc.serveG.checkNotOn() // NOT
select { case sc.serveMsgCh <- msg: case <-sc.doneServing: } }
var errPrefaceTimeout = errors.New("timeout waiting for client preface")
// readPreface reads the ClientPreface greeting from the peer or
// returns errPrefaceTimeout on timeout, or an error if the greeting
// is invalid.
func (sc *serverConn) readPreface() error { errc := make(chan error, 1) go func() { // Read the client preface
buf := make([]byte, len(ClientPreface)) if _, err := io.ReadFull(sc.conn, buf); err != nil { errc <- err } else if !bytes.Equal(buf, clientPreface) { errc <- fmt.Errorf("bogus greeting %q", buf) } else { errc <- nil } }() timer := time.NewTimer(prefaceTimeout) // TODO: configurable on *Server?
defer timer.Stop() select { case <-timer.C: return errPrefaceTimeout case err := <-errc: if err == nil { if VerboseLogs { sc.vlogf("http2: server: client %v said hello", sc.conn.RemoteAddr()) } } return err } }
var errChanPool = sync.Pool{ New: func() interface{} { return make(chan error, 1) }, }
var writeDataPool = sync.Pool{ New: func() interface{} { return new(writeData) }, }
// writeDataFromHandler writes DATA response frames from a handler on
// the given stream.
func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStream bool) error { ch := errChanPool.Get().(chan error) writeArg := writeDataPool.Get().(*writeData) *writeArg = writeData{stream.id, data, endStream} err := sc.writeFrameFromHandler(FrameWriteRequest{ write: writeArg, stream: stream, done: ch, }) if err != nil { return err } var frameWriteDone bool // the frame write is done (successfully or not)
select { case err = <-ch: frameWriteDone = true case <-sc.doneServing: return errClientDisconnected case <-stream.cw: // If both ch and stream.cw were ready (as might
// happen on the final Write after an http.Handler
// ends), prefer the write result. Otherwise this
// might just be us successfully closing the stream.
// The writeFrameAsync and serve goroutines guarantee
// that the ch send will happen before the stream.cw
// close.
select { case err = <-ch: frameWriteDone = true default: return errStreamClosed } } errChanPool.Put(ch) if frameWriteDone { writeDataPool.Put(writeArg) } return err }
// writeFrameFromHandler sends wr to sc.wantWriteFrameCh, but aborts
// if the connection has gone away.
//
// This must not be run from the serve goroutine itself, else it might
// deadlock writing to sc.wantWriteFrameCh (which is only mildly
// buffered and is read by serve itself). If you're on the serve
// goroutine, call writeFrame instead.
func (sc *serverConn) writeFrameFromHandler(wr FrameWriteRequest) error { sc.serveG.checkNotOn() // NOT
select { case sc.wantWriteFrameCh <- wr: return nil case <-sc.doneServing: // Serve loop is gone.
// Client has closed their connection to the server.
return errClientDisconnected } }
// writeFrame schedules a frame to write and sends it if there's nothing
// already being written.
//
// There is no pushback here (the serve goroutine never blocks). It's
// the http.Handlers that block, waiting for their previous frames to
// make it onto the wire
//
// If you're not on the serve goroutine, use writeFrameFromHandler instead.
func (sc *serverConn) writeFrame(wr FrameWriteRequest) { sc.serveG.check()
// If true, wr will not be written and wr.done will not be signaled.
var ignoreWrite bool
// We are not allowed to write frames on closed streams. RFC 7540 Section
// 5.1.1 says: "An endpoint MUST NOT send frames other than PRIORITY on
// a closed stream." Our server never sends PRIORITY, so that exception
// does not apply.
//
// The serverConn might close an open stream while the stream's handler
// is still running. For example, the server might close a stream when it
// receives bad data from the client. If this happens, the handler might
// attempt to write a frame after the stream has been closed (since the
// handler hasn't yet been notified of the close). In this case, we simply
// ignore the frame. The handler will notice that the stream is closed when
// it waits for the frame to be written.
//
// As an exception to this rule, we allow sending RST_STREAM after close.
// This allows us to immediately reject new streams without tracking any
// state for those streams (except for the queued RST_STREAM frame). This
// may result in duplicate RST_STREAMs in some cases, but the client should
// ignore those.
if wr.StreamID() != 0 { _, isReset := wr.write.(StreamError) if state, _ := sc.state(wr.StreamID()); state == stateClosed && !isReset { ignoreWrite = true } }
// Don't send a 100-continue response if we've already sent headers.
// See golang.org/issue/14030.
switch wr.write.(type) { case *writeResHeaders: wr.stream.wroteHeaders = true case write100ContinueHeadersFrame: if wr.stream.wroteHeaders { // We do not need to notify wr.done because this frame is
// never written with wr.done != nil.
if wr.done != nil { panic("wr.done != nil for write100ContinueHeadersFrame") } ignoreWrite = true } }
if !ignoreWrite { sc.writeSched.Push(wr) } sc.scheduleFrameWrite() }
// startFrameWrite starts a goroutine to write wr (in a separate
// goroutine since that might block on the network), and updates the
// serve goroutine's state about the world, updated from info in wr.
func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) { sc.serveG.check() if sc.writingFrame { panic("internal error: can only be writing one frame at a time") }
st := wr.stream if st != nil { switch st.state { case stateHalfClosedLocal: switch wr.write.(type) { case StreamError, handlerPanicRST, writeWindowUpdate: // RFC 7540 Section 5.1 allows sending RST_STREAM, PRIORITY, and WINDOW_UPDATE
// in this state. (We never send PRIORITY from the server, so that is not checked.)
default: panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", wr)) } case stateClosed: panic(fmt.Sprintf("internal error: attempt to send frame on a closed stream: %v", wr)) } } if wpp, ok := wr.write.(*writePushPromise); ok { var err error wpp.promisedID, err = wpp.allocatePromisedID() if err != nil { sc.writingFrameAsync = false wr.replyToWriter(err) return } }
sc.writingFrame = true sc.needsFrameFlush = true if wr.write.staysWithinBuffer(sc.bw.Available()) { sc.writingFrameAsync = false err := wr.write.writeFrame(sc) sc.wroteFrame(frameWriteResult{wr, err}) } else { sc.writingFrameAsync = true go sc.writeFrameAsync(wr) } }
// errHandlerPanicked is the error given to any callers blocked in a read from
// Request.Body when the main goroutine panics. Since most handlers read in the
// the main ServeHTTP goroutine, this will show up rarely.
var errHandlerPanicked = errors.New("http2: handler panicked")
// wroteFrame is called on the serve goroutine with the result of
// whatever happened on writeFrameAsync.
func (sc *serverConn) wroteFrame(res frameWriteResult) { sc.serveG.check() if !sc.writingFrame { panic("internal error: expected to be already writing a frame") } sc.writingFrame = false sc.writingFrameAsync = false
wr := res.wr
if writeEndsStream(wr.write) { st := wr.stream if st == nil { panic("internal error: expecting non-nil stream") } switch st.state { case stateOpen: // Here we would go to stateHalfClosedLocal in
// theory, but since our handler is done and
// the net/http package provides no mechanism
// for closing a ResponseWriter while still
// reading data (see possible TODO at top of
// this file), we go into closed state here
// anyway, after telling the peer we're
// hanging up on them. We'll transition to
// stateClosed after the RST_STREAM frame is
// written.
st.state = stateHalfClosedLocal // Section 8.1: a server MAY request that the client abort
// transmission of a request without error by sending a
// RST_STREAM with an error code of NO_ERROR after sending
// a complete response.
sc.resetStream(streamError(st.id, ErrCodeNo)) case stateHalfClosedRemote: sc.closeStream(st, errHandlerComplete) } } else { switch v := wr.write.(type) { case StreamError: // st may be unknown if the RST_STREAM was generated to reject bad input.
if st, ok := sc.streams[v.StreamID]; ok { sc.closeStream(st, v) } case handlerPanicRST: sc.closeStream(wr.stream, errHandlerPanicked) } }
// Reply (if requested) to unblock the ServeHTTP goroutine.
wr.replyToWriter(res.err)
sc.scheduleFrameWrite() }
// scheduleFrameWrite tickles the frame writing scheduler.
//
// If a frame is already being written, nothing happens. This will be called again
// when the frame is done being written.
//
// If a frame isn't being written we need to send one, the best frame
// to send is selected, preferring first things that aren't
// stream-specific (e.g. ACKing settings), and then finding the
// highest priority stream.
//
// If a frame isn't being written and there's nothing else to send, we
// flush the write buffer.
func (sc *serverConn) scheduleFrameWrite() { sc.serveG.check() if sc.writingFrame || sc.inFrameScheduleLoop { return } sc.inFrameScheduleLoop = true for !sc.writingFrameAsync { if sc.needToSendGoAway { sc.needToSendGoAway = false sc.startFrameWrite(FrameWriteRequest{ write: &writeGoAway{ maxStreamID: sc.maxClientStreamID, code: sc.goAwayCode, }, }) continue } if sc.needToSendSettingsAck { sc.needToSendSettingsAck = false sc.startFrameWrite(FrameWriteRequest{write: writeSettingsAck{}}) continue } if !sc.inGoAway || sc.goAwayCode == ErrCodeNo { if wr, ok := sc.writeSched.Pop(); ok { sc.startFrameWrite(wr) continue } } if sc.needsFrameFlush { sc.startFrameWrite(FrameWriteRequest{write: flushFrameWriter{}}) sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
continue } break } sc.inFrameScheduleLoop = false }
// startGracefulShutdown gracefully shuts down a connection. This
// sends GOAWAY with ErrCodeNo to tell the client we're gracefully
// shutting down. The connection isn't closed until all current
// streams are done.
//
// startGracefulShutdown returns immediately; it does not wait until
// the connection has shut down.
func (sc *serverConn) startGracefulShutdown() { sc.serveG.checkNotOn() // NOT
sc.shutdownOnce.Do(func() { sc.sendServeMsg(gracefulShutdownMsg) }) }
// After sending GOAWAY, the connection will close after goAwayTimeout.
// If we close the connection immediately after sending GOAWAY, there may
// be unsent data in our kernel receive buffer, which will cause the kernel
// to send a TCP RST on close() instead of a FIN. This RST will abort the
// connection immediately, whether or not the client had received the GOAWAY.
//
// Ideally we should delay for at least 1 RTT + epsilon so the client has
// a chance to read the GOAWAY and stop sending messages. Measuring RTT
// is hard, so we approximate with 1 second. See golang.org/issue/18701.
//
// This is a var so it can be shorter in tests, where all requests uses the
// loopback interface making the expected RTT very small.
//
// TODO: configurable?
var goAwayTimeout = 1 * time.Second
func (sc *serverConn) startGracefulShutdownInternal() { sc.goAway(ErrCodeNo) }
func (sc *serverConn) goAway(code ErrCode) { sc.serveG.check() if sc.inGoAway { return } sc.inGoAway = true sc.needToSendGoAway = true sc.goAwayCode = code sc.scheduleFrameWrite() }
func (sc *serverConn) shutDownIn(d time.Duration) { sc.serveG.check() sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer) }
func (sc *serverConn) resetStream(se StreamError) { sc.serveG.check() sc.writeFrame(FrameWriteRequest{write: se}) if st, ok := sc.streams[se.StreamID]; ok { st.resetQueued = true } }
// processFrameFromReader processes the serve loop's read from readFrameCh from the
// frame-reading goroutine.
// processFrameFromReader returns whether the connection should be kept open.
func (sc *serverConn) processFrameFromReader(res readFrameResult) bool { sc.serveG.check() err := res.err if err != nil { if err == ErrFrameTooLarge { sc.goAway(ErrCodeFrameSize) return true // goAway will close the loop
} clientGone := err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err) if clientGone { // TODO: could we also get into this state if
// the peer does a half close
// (e.g. CloseWrite) because they're done
// sending frames but they're still wanting
// our open replies? Investigate.
// TODO: add CloseWrite to crypto/tls.Conn first
// so we have a way to test this? I suppose
// just for testing we could have a non-TLS mode.
return false } } else { f := res.f if VerboseLogs { sc.vlogf("http2: server read frame %v", summarizeFrame(f)) } err = sc.processFrame(f) if err == nil { return true } }
switch ev := err.(type) { case StreamError: sc.resetStream(ev) return true case goAwayFlowError: sc.goAway(ErrCodeFlowControl) return true case ConnectionError: sc.logf("http2: server connection error from %v: %v", sc.conn.RemoteAddr(), ev) sc.goAway(ErrCode(ev)) return true // goAway will handle shutdown
default: if res.err != nil { sc.vlogf("http2: server closing client connection; error reading frame from client %s: %v", sc.conn.RemoteAddr(), err) } else { sc.logf("http2: server closing client connection: %v", err) } return false } }
func (sc *serverConn) processFrame(f Frame) error { sc.serveG.check()
// First frame received must be SETTINGS.
if !sc.sawFirstSettings { if _, ok := f.(*SettingsFrame); !ok { return ConnectionError(ErrCodeProtocol) } sc.sawFirstSettings = true }
switch f := f.(type) { case *SettingsFrame: return sc.processSettings(f) case *MetaHeadersFrame: return sc.processHeaders(f) case *WindowUpdateFrame: return sc.processWindowUpdate(f) case *PingFrame: return sc.processPing(f) case *DataFrame: return sc.processData(f) case *RSTStreamFrame: return sc.processResetStream(f) case *PriorityFrame: return sc.processPriority(f) case *GoAwayFrame: return sc.processGoAway(f) case *PushPromiseFrame: // A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE
// frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
return ConnectionError(ErrCodeProtocol) default: sc.vlogf("http2: server ignoring frame: %v", f.Header()) return nil } }
func (sc *serverConn) processPing(f *PingFrame) error { sc.serveG.check() if f.IsAck() { // 6.7 PING: " An endpoint MUST NOT respond to PING frames
// containing this flag."
return nil } if f.StreamID != 0 { // "PING frames are not associated with any individual
// stream. If a PING frame is received with a stream
// identifier field value other than 0x0, the recipient MUST
// respond with a connection error (Section 5.4.1) of type
// PROTOCOL_ERROR."
return ConnectionError(ErrCodeProtocol) } if sc.inGoAway && sc.goAwayCode != ErrCodeNo { return nil } sc.writeFrame(FrameWriteRequest{write: writePingAck{f}}) return nil }
func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error { sc.serveG.check() switch { case f.StreamID != 0: // stream-level flow control
state, st := sc.state(f.StreamID) if state == stateIdle { // Section 5.1: "Receiving any frame other than HEADERS
// or PRIORITY on a stream in this state MUST be
// treated as a connection error (Section 5.4.1) of
// type PROTOCOL_ERROR."
return ConnectionError(ErrCodeProtocol) } if st == nil { // "WINDOW_UPDATE can be sent by a peer that has sent a
// frame bearing the END_STREAM flag. This means that a
// receiver could receive a WINDOW_UPDATE frame on a "half
// closed (remote)" or "closed" stream. A receiver MUST
// NOT treat this as an error, see Section 5.1."
return nil } if !st.flow.add(int32(f.Increment)) { return streamError(f.StreamID, ErrCodeFlowControl) } default: // connection-level flow control
if !sc.flow.add(int32(f.Increment)) { return goAwayFlowError{} } } sc.scheduleFrameWrite() return nil }
func (sc *serverConn) processResetStream(f *RSTStreamFrame) error { sc.serveG.check()
state, st := sc.state(f.StreamID) if state == stateIdle { // 6.4 "RST_STREAM frames MUST NOT be sent for a
// stream in the "idle" state. If a RST_STREAM frame
// identifying an idle stream is received, the
// recipient MUST treat this as a connection error
// (Section 5.4.1) of type PROTOCOL_ERROR.
return ConnectionError(ErrCodeProtocol) } if st != nil { st.cancelCtx() sc.closeStream(st, streamError(f.StreamID, f.ErrCode)) } return nil }
func (sc *serverConn) closeStream(st *stream, err error) { sc.serveG.check() if st.state == stateIdle || st.state == stateClosed { panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state)) } st.state = stateClosed if st.writeDeadline != nil { st.writeDeadline.Stop() } if st.isPushed() { sc.curPushedStreams-- } else { sc.curClientStreams-- } delete(sc.streams, st.id) if len(sc.streams) == 0 { sc.setConnState(http.StateIdle) if sc.srv.IdleTimeout != 0 { sc.idleTimer.Reset(sc.srv.IdleTimeout) } if h1ServerKeepAlivesDisabled(sc.hs) { sc.startGracefulShutdownInternal() } } if p := st.body; p != nil { // Return any buffered unread bytes worth of conn-level flow control.
// See golang.org/issue/16481
sc.sendWindowUpdate(nil, p.Len())
p.CloseWithError(err) } st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
sc.writeSched.CloseStream(st.id) }
func (sc *serverConn) processSettings(f *SettingsFrame) error { sc.serveG.check() if f.IsAck() { sc.unackedSettings-- if sc.unackedSettings < 0 { // Why is the peer ACKing settings we never sent?
// The spec doesn't mention this case, but
// hang up on them anyway.
return ConnectionError(ErrCodeProtocol) } return nil } if err := f.ForeachSetting(sc.processSetting); err != nil { return err } sc.needToSendSettingsAck = true sc.scheduleFrameWrite() return nil }
func (sc *serverConn) processSetting(s Setting) error { sc.serveG.check() if err := s.Valid(); err != nil { return err } if VerboseLogs { sc.vlogf("http2: server processing setting %v", s) } switch s.ID { case SettingHeaderTableSize: sc.headerTableSize = s.Val sc.hpackEncoder.SetMaxDynamicTableSize(s.Val) case SettingEnablePush: sc.pushEnabled = s.Val != 0 case SettingMaxConcurrentStreams: sc.clientMaxStreams = s.Val case SettingInitialWindowSize: return sc.processSettingInitialWindowSize(s.Val) case SettingMaxFrameSize: sc.maxFrameSize = int32(s.Val) // the maximum valid s.Val is < 2^31
case SettingMaxHeaderListSize: sc.peerMaxHeaderListSize = s.Val default: // Unknown setting: "An endpoint that receives a SETTINGS
// frame with any unknown or unsupported identifier MUST
// ignore that setting."
if VerboseLogs { sc.vlogf("http2: server ignoring unknown setting %v", s) } } return nil }
func (sc *serverConn) processSettingInitialWindowSize(val uint32) error { sc.serveG.check() // Note: val already validated to be within range by
// processSetting's Valid call.
// "A SETTINGS frame can alter the initial flow control window
// size for all current streams. When the value of
// SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST
// adjust the size of all stream flow control windows that it
// maintains by the difference between the new value and the
// old value."
old := sc.initialStreamSendWindowSize sc.initialStreamSendWindowSize = int32(val) growth := int32(val) - old // may be negative
for _, st := range sc.streams { if !st.flow.add(growth) { // 6.9.2 Initial Flow Control Window Size
// "An endpoint MUST treat a change to
// SETTINGS_INITIAL_WINDOW_SIZE that causes any flow
// control window to exceed the maximum size as a
// connection error (Section 5.4.1) of type
// FLOW_CONTROL_ERROR."
return ConnectionError(ErrCodeFlowControl) } } return nil }
func (sc *serverConn) processData(f *DataFrame) error { sc.serveG.check() if sc.inGoAway && sc.goAwayCode != ErrCodeNo { return nil } data := f.Data()
// "If a DATA frame is received whose stream is not in "open"
// or "half closed (local)" state, the recipient MUST respond
// with a stream error (Section 5.4.2) of type STREAM_CLOSED."
id := f.Header().StreamID state, st := sc.state(id) if id == 0 || state == stateIdle { // Section 5.1: "Receiving any frame other than HEADERS
// or PRIORITY on a stream in this state MUST be
// treated as a connection error (Section 5.4.1) of
// type PROTOCOL_ERROR."
return ConnectionError(ErrCodeProtocol) } if st == nil || state != stateOpen || st.gotTrailerHeader || st.resetQueued { // This includes sending a RST_STREAM if the stream is
// in stateHalfClosedLocal (which currently means that
// the http.Handler returned, so it's done reading &
// done writing). Try to stop the client from sending
// more DATA.
// But still enforce their connection-level flow control,
// and return any flow control bytes since we're not going
// to consume them.
if sc.inflow.available() < int32(f.Length) { return streamError(id, ErrCodeFlowControl) } // Deduct the flow control from inflow, since we're
// going to immediately add it back in
// sendWindowUpdate, which also schedules sending the
// frames.
sc.inflow.take(int32(f.Length)) sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
if st != nil && st.resetQueued { // Already have a stream error in flight. Don't send another.
return nil } return streamError(id, ErrCodeStreamClosed) } if st.body == nil { panic("internal error: should have a body in this state") }
// Sender sending more than they'd declared?
if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes { st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes)) return streamError(id, ErrCodeStreamClosed) } if f.Length > 0 { // Check whether the client has flow control quota.
if st.inflow.available() < int32(f.Length) { return streamError(id, ErrCodeFlowControl) } st.inflow.take(int32(f.Length))
if len(data) > 0 { wrote, err := st.body.Write(data) if err != nil { return streamError(id, ErrCodeStreamClosed) } if wrote != len(data) { panic("internal error: bad Writer") } st.bodyBytes += int64(len(data)) }
// Return any padded flow control now, since we won't
// refund it later on body reads.
if pad := int32(f.Length) - int32(len(data)); pad > 0 { sc.sendWindowUpdate32(nil, pad) sc.sendWindowUpdate32(st, pad) } } if f.StreamEnded() { st.endStream() } return nil }
func (sc *serverConn) processGoAway(f *GoAwayFrame) error { sc.serveG.check() if f.ErrCode != ErrCodeNo { sc.logf("http2: received GOAWAY %+v, starting graceful shutdown", f) } else { sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f) } sc.startGracefulShutdownInternal() // http://tools.ietf.org/html/rfc7540#section-6.8
// We should not create any new streams, which means we should disable push.
sc.pushEnabled = false return nil }
// isPushed reports whether the stream is server-initiated.
func (st *stream) isPushed() bool { return st.id%2 == 0 }
// endStream closes a Request.Body's pipe. It is called when a DATA
// frame says a request body is over (or after trailers).
func (st *stream) endStream() { sc := st.sc sc.serveG.check()
if st.declBodyBytes != -1 && st.declBodyBytes != st.bodyBytes { st.body.CloseWithError(fmt.Errorf("request declared a Content-Length of %d but only wrote %d bytes", st.declBodyBytes, st.bodyBytes)) } else { st.body.closeWithErrorAndCode(io.EOF, st.copyTrailersToHandlerRequest) st.body.CloseWithError(io.EOF) } st.state = stateHalfClosedRemote }
// copyTrailersToHandlerRequest is run in the Handler's goroutine in
// its Request.Body.Read just before it gets io.EOF.
func (st *stream) copyTrailersToHandlerRequest() { for k, vv := range st.trailer { if _, ok := st.reqTrailer[k]; ok { // Only copy it over it was pre-declared.
st.reqTrailer[k] = vv } } }
// onWriteTimeout is run on its own goroutine (from time.AfterFunc)
// when the stream's WriteTimeout has fired.
func (st *stream) onWriteTimeout() { st.sc.writeFrameFromHandler(FrameWriteRequest{write: streamError(st.id, ErrCodeInternal)}) }
func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error { sc.serveG.check() id := f.StreamID if sc.inGoAway { // Ignore.
return nil } // http://tools.ietf.org/html/rfc7540#section-5.1.1
// Streams initiated by a client MUST use odd-numbered stream
// identifiers. [...] An endpoint that receives an unexpected
// stream identifier MUST respond with a connection error
// (Section 5.4.1) of type PROTOCOL_ERROR.
if id%2 != 1 { return ConnectionError(ErrCodeProtocol) } // A HEADERS frame can be used to create a new stream or
// send a trailer for an open one. If we already have a stream
// open, let it process its own HEADERS frame (trailers at this
// point, if it's valid).
if st := sc.streams[f.StreamID]; st != nil { if st.resetQueued { // We're sending RST_STREAM to close the stream, so don't bother
// processing this frame.
return nil } return st.processTrailerHeaders(f) }
// [...] The identifier of a newly established stream MUST be
// numerically greater than all streams that the initiating
// endpoint has opened or reserved. [...] An endpoint that
// receives an unexpected stream identifier MUST respond with
// a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
if id <= sc.maxClientStreamID { return ConnectionError(ErrCodeProtocol) } sc.maxClientStreamID = id
if sc.idleTimer != nil { sc.idleTimer.Stop() }
// http://tools.ietf.org/html/rfc7540#section-5.1.2
// [...] Endpoints MUST NOT exceed the limit set by their peer. An
// endpoint that receives a HEADERS frame that causes their
// advertised concurrent stream limit to be exceeded MUST treat
// this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR
// or REFUSED_STREAM.
if sc.curClientStreams+1 > sc.advMaxStreams { if sc.unackedSettings == 0 { // They should know better.
return streamError(id, ErrCodeProtocol) } // Assume it's a network race, where they just haven't
// received our last SETTINGS update. But actually
// this can't happen yet, because we don't yet provide
// a way for users to adjust server parameters at
// runtime.
return streamError(id, ErrCodeRefusedStream) }
initialState := stateOpen if f.StreamEnded() { initialState = stateHalfClosedRemote } st := sc.newStream(id, 0, initialState)
if f.HasPriority() { if err := checkPriority(f.StreamID, f.Priority); err != nil { return err } sc.writeSched.AdjustStream(st.id, f.Priority) }
rw, req, err := sc.newWriterAndRequest(st, f) if err != nil { return err } st.reqTrailer = req.Trailer if st.reqTrailer != nil { st.trailer = make(http.Header) } st.body = req.Body.(*requestBody).pipe // may be nil
st.declBodyBytes = req.ContentLength
handler := sc.handler.ServeHTTP if f.Truncated { // Their header list was too long. Send a 431 error.
handler = handleHeaderListTooLong } else if err := checkValidHTTP2RequestHeaders(req.Header); err != nil { handler = new400Handler(err) }
// The net/http package sets the read deadline from the
// http.Server.ReadTimeout during the TLS handshake, but then
// passes the connection off to us with the deadline already
// set. Disarm it here after the request headers are read,
// similar to how the http1 server works. Here it's
// technically more like the http1 Server's ReadHeaderTimeout
// (in Go 1.8), though. That's a more sane option anyway.
if sc.hs.ReadTimeout != 0 { sc.conn.SetReadDeadline(time.Time{}) }
go sc.runHandler(rw, req, handler) return nil }
func (st *stream) processTrailerHeaders(f *MetaHeadersFrame) error { sc := st.sc sc.serveG.check() if st.gotTrailerHeader { return ConnectionError(ErrCodeProtocol) } st.gotTrailerHeader = true if !f.StreamEnded() { return streamError(st.id, ErrCodeProtocol) }
if len(f.PseudoFields()) > 0 { return streamError(st.id, ErrCodeProtocol) } if st.trailer != nil { for _, hf := range f.RegularFields() { key := sc.canonicalHeader(hf.Name) if !ValidTrailerHeader(key) { // TODO: send more details to the peer somehow. But http2 has
// no way to send debug data at a stream level. Discuss with
// HTTP folk.
return streamError(st.id, ErrCodeProtocol) } st.trailer[key] = append(st.trailer[key], hf.Value) } } st.endStream() return nil }
func checkPriority(streamID uint32, p PriorityParam) error { if streamID == p.StreamDep { // Section 5.3.1: "A stream cannot depend on itself. An endpoint MUST treat
// this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR."
// Section 5.3.3 says that a stream can depend on one of its dependencies,
// so it's only self-dependencies that are forbidden.
return streamError(streamID, ErrCodeProtocol) } return nil }
func (sc *serverConn) processPriority(f *PriorityFrame) error { if sc.inGoAway { return nil } if err := checkPriority(f.StreamID, f.PriorityParam); err != nil { return err } sc.writeSched.AdjustStream(f.StreamID, f.PriorityParam) return nil }
func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream { sc.serveG.check() if id == 0 { panic("internal error: cannot create stream with id 0") }
ctx, cancelCtx := contextWithCancel(sc.baseCtx) st := &stream{ sc: sc, id: id, state: state, ctx: ctx, cancelCtx: cancelCtx, } st.cw.Init() st.flow.conn = &sc.flow // link to conn-level counter
st.flow.add(sc.initialStreamSendWindowSize) st.inflow.conn = &sc.inflow // link to conn-level counter
st.inflow.add(sc.srv.initialStreamRecvWindowSize()) if sc.hs.WriteTimeout != 0 { st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout) }
sc.streams[id] = st sc.writeSched.OpenStream(st.id, OpenStreamOptions{PusherID: pusherID}) if st.isPushed() { sc.curPushedStreams++ } else { sc.curClientStreams++ } if sc.curOpenStreams() == 1 { sc.setConnState(http.StateActive) }
return st }
func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*responseWriter, *http.Request, error) { sc.serveG.check()
rp := requestParam{ method: f.PseudoValue("method"), scheme: f.PseudoValue("scheme"), authority: f.PseudoValue("authority"), path: f.PseudoValue("path"), }
isConnect := rp.method == "CONNECT" if isConnect { if rp.path != "" || rp.scheme != "" || rp.authority == "" { return nil, nil, streamError(f.StreamID, ErrCodeProtocol) } } else if rp.method == "" || rp.path == "" || (rp.scheme != "https" && rp.scheme != "http") { // See 8.1.2.6 Malformed Requests and Responses:
//
// Malformed requests or responses that are detected
// MUST be treated as a stream error (Section 5.4.2)
// of type PROTOCOL_ERROR."
//
// 8.1.2.3 Request Pseudo-Header Fields
// "All HTTP/2 requests MUST include exactly one valid
// value for the :method, :scheme, and :path
// pseudo-header fields"
return nil, nil, streamError(f.StreamID, ErrCodeProtocol) }
bodyOpen := !f.StreamEnded() if rp.method == "HEAD" && bodyOpen { // HEAD requests can't have bodies
return nil, nil, streamError(f.StreamID, ErrCodeProtocol) }
rp.header = make(http.Header) for _, hf := range f.RegularFields() { rp.header.Add(sc.canonicalHeader(hf.Name), hf.Value) } if rp.authority == "" { rp.authority = rp.header.Get("Host") }
rw, req, err := sc.newWriterAndRequestNoBody(st, rp) if err != nil { return nil, nil, err } if bodyOpen { if vv, ok := rp.header["Content-Length"]; ok { req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64) } else { req.ContentLength = -1 } req.Body.(*requestBody).pipe = &pipe{ b: &dataBuffer{expected: req.ContentLength}, } } return rw, req, nil }
type requestParam struct { method string scheme, authority, path string header http.Header }
func (sc *serverConn) newWriterAndRequestNoBody(st *stream, rp requestParam) (*responseWriter, *http.Request, error) { sc.serveG.check()
var tlsState *tls.ConnectionState // nil if not scheme https
if rp.scheme == "https" { tlsState = sc.tlsState }
needsContinue := rp.header.Get("Expect") == "100-continue" if needsContinue { rp.header.Del("Expect") } // Merge Cookie headers into one "; "-delimited value.
if cookies := rp.header["Cookie"]; len(cookies) > 1 { rp.header.Set("Cookie", strings.Join(cookies, "; ")) }
// Setup Trailers
var trailer http.Header for _, v := range rp.header["Trailer"] { for _, key := range strings.Split(v, ",") { key = http.CanonicalHeaderKey(strings.TrimSpace(key)) switch key { case "Transfer-Encoding", "Trailer", "Content-Length": // Bogus. (copy of http1 rules)
// Ignore.
default: if trailer == nil { trailer = make(http.Header) } trailer[key] = nil } } } delete(rp.header, "Trailer")
var url_ *url.URL var requestURI string if rp.method == "CONNECT" { url_ = &url.URL{Host: rp.authority} requestURI = rp.authority // mimic HTTP/1 server behavior
} else { var err error url_, err = url.ParseRequestURI(rp.path) if err != nil { return nil, nil, streamError(st.id, ErrCodeProtocol) } requestURI = rp.path }
body := &requestBody{ conn: sc, stream: st, needsContinue: needsContinue, } req := &http.Request{ Method: rp.method, URL: url_, RemoteAddr: sc.remoteAddrStr, Header: rp.header, RequestURI: requestURI, Proto: "HTTP/2.0", ProtoMajor: 2, ProtoMinor: 0, TLS: tlsState, Host: rp.authority, Body: body, Trailer: trailer, } req = requestWithContext(req, st.ctx)
rws := responseWriterStatePool.Get().(*responseWriterState) bwSave := rws.bw *rws = responseWriterState{} // zero all the fields
rws.conn = sc rws.bw = bwSave rws.bw.Reset(chunkWriter{rws}) rws.stream = st rws.req = req rws.body = body
rw := &responseWriter{rws: rws} return rw, req, nil }
// Run on its own goroutine.
func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) { didPanic := true defer func() { rw.rws.stream.cancelCtx() if didPanic { e := recover() sc.writeFrameFromHandler(FrameWriteRequest{ write: handlerPanicRST{rw.rws.stream.id}, stream: rw.rws.stream, }) // Same as net/http:
if shouldLogPanic(e) { const size = 64 << 10 buf := make([]byte, size) buf = buf[:runtime.Stack(buf, false)] sc.logf("http2: panic serving %v: %v\n%s", sc.conn.RemoteAddr(), e, buf) } return } rw.handlerDone() }() handler(rw, req) didPanic = false }
func handleHeaderListTooLong(w http.ResponseWriter, r *http.Request) { // 10.5.1 Limits on Header Block Size:
// .. "A server that receives a larger header block than it is
// willing to handle can send an HTTP 431 (Request Header Fields Too
// Large) status code"
const statusRequestHeaderFieldsTooLarge = 431 // only in Go 1.6+
w.WriteHeader(statusRequestHeaderFieldsTooLarge) io.WriteString(w, "<h1>HTTP Error 431</h1><p>Request Header Field(s) Too Large</p>") }
// called from handler goroutines.
// h may be nil.
func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) error { sc.serveG.checkNotOn() // NOT on
var errc chan error if headerData.h != nil { // If there's a header map (which we don't own), so we have to block on
// waiting for this frame to be written, so an http.Flush mid-handler
// writes out the correct value of keys, before a handler later potentially
// mutates it.
errc = errChanPool.Get().(chan error) } if err := sc.writeFrameFromHandler(FrameWriteRequest{ write: headerData, stream: st, done: errc, }); err != nil { return err } if errc != nil { select { case err := <-errc: errChanPool.Put(errc) return err case <-sc.doneServing: return errClientDisconnected case <-st.cw: return errStreamClosed } } return nil }
// called from handler goroutines.
func (sc *serverConn) write100ContinueHeaders(st *stream) { sc.writeFrameFromHandler(FrameWriteRequest{ write: write100ContinueHeadersFrame{st.id}, stream: st, }) }
// A bodyReadMsg tells the server loop that the http.Handler read n
// bytes of the DATA from the client on the given stream.
type bodyReadMsg struct { st *stream n int }
// called from handler goroutines.
// Notes that the handler for the given stream ID read n bytes of its body
// and schedules flow control tokens to be sent.
func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) { sc.serveG.checkNotOn() // NOT on
if n > 0 { select { case sc.bodyReadCh <- bodyReadMsg{st, n}: case <-sc.doneServing: } } }
func (sc *serverConn) noteBodyRead(st *stream, n int) { sc.serveG.check() sc.sendWindowUpdate(nil, n) // conn-level
if st.state != stateHalfClosedRemote && st.state != stateClosed { // Don't send this WINDOW_UPDATE if the stream is closed
// remotely.
sc.sendWindowUpdate(st, n) } }
// st may be nil for conn-level
func (sc *serverConn) sendWindowUpdate(st *stream, n int) { sc.serveG.check() // "The legal range for the increment to the flow control
// window is 1 to 2^31-1 (2,147,483,647) octets."
// A Go Read call on 64-bit machines could in theory read
// a larger Read than this. Very unlikely, but we handle it here
// rather than elsewhere for now.
const maxUint31 = 1<<31 - 1 for n >= maxUint31 { sc.sendWindowUpdate32(st, maxUint31) n -= maxUint31 } sc.sendWindowUpdate32(st, int32(n)) }
// st may be nil for conn-level
func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) { sc.serveG.check() if n == 0 { return } if n < 0 { panic("negative update") } var streamID uint32 if st != nil { streamID = st.id } sc.writeFrame(FrameWriteRequest{ write: writeWindowUpdate{streamID: streamID, n: uint32(n)}, stream: st, }) var ok bool if st == nil { ok = sc.inflow.add(n) } else { ok = st.inflow.add(n) } if !ok { panic("internal error; sent too many window updates without decrements?") } }
// requestBody is the Handler's Request.Body type.
// Read and Close may be called concurrently.
type requestBody struct { stream *stream conn *serverConn closed bool // for use by Close only
sawEOF bool // for use by Read only
pipe *pipe // non-nil if we have a HTTP entity message body
needsContinue bool // need to send a 100-continue
}
func (b *requestBody) Close() error { if b.pipe != nil && !b.closed { b.pipe.BreakWithError(errClosedBody) } b.closed = true return nil }
func (b *requestBody) Read(p []byte) (n int, err error) { if b.needsContinue { b.needsContinue = false b.conn.write100ContinueHeaders(b.stream) } if b.pipe == nil || b.sawEOF { return 0, io.EOF } n, err = b.pipe.Read(p) if err == io.EOF { b.sawEOF = true } if b.conn == nil && inTests { return } b.conn.noteBodyReadFromHandler(b.stream, n, err) return }
// responseWriter is the http.ResponseWriter implementation. It's
// intentionally small (1 pointer wide) to minimize garbage. The
// responseWriterState pointer inside is zeroed at the end of a
// request (in handlerDone) and calls on the responseWriter thereafter
// simply crash (caller's mistake), but the much larger responseWriterState
// and buffers are reused between multiple requests.
type responseWriter struct { rws *responseWriterState }
// Optional http.ResponseWriter interfaces implemented.
var ( _ http.CloseNotifier = (*responseWriter)(nil) _ http.Flusher = (*responseWriter)(nil) _ stringWriter = (*responseWriter)(nil) )
type responseWriterState struct { // immutable within a request:
stream *stream req *http.Request body *requestBody // to close at end of request, if DATA frames didn't
conn *serverConn
// TODO: adjust buffer writing sizes based on server config, frame size updates from peer, etc
bw *bufio.Writer // writing to a chunkWriter{this *responseWriterState}
// mutated by http.Handler goroutine:
handlerHeader http.Header // nil until called
snapHeader http.Header // snapshot of handlerHeader at WriteHeader time
trailers []string // set in writeChunk
status int // status code passed to WriteHeader
wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet.
sentHeader bool // have we sent the header frame?
handlerDone bool // handler has finished
dirty bool // a Write failed; don't reuse this responseWriterState
sentContentLen int64 // non-zero if handler set a Content-Length header
wroteBytes int64
closeNotifierMu sync.Mutex // guards closeNotifierCh
closeNotifierCh chan bool // nil until first used
}
type chunkWriter struct{ rws *responseWriterState }
func (cw chunkWriter) Write(p []byte) (n int, err error) { return cw.rws.writeChunk(p) }
func (rws *responseWriterState) hasTrailers() bool { return len(rws.trailers) != 0 }
// declareTrailer is called for each Trailer header when the
// response header is written. It notes that a header will need to be
// written in the trailers at the end of the response.
func (rws *responseWriterState) declareTrailer(k string) { k = http.CanonicalHeaderKey(k) if !ValidTrailerHeader(k) { // Forbidden by RFC 2616 14.40.
rws.conn.logf("ignoring invalid trailer %q", k) return } if !strSliceContains(rws.trailers, k) { rws.trailers = append(rws.trailers, k) } }
// writeChunk writes chunks from the bufio.Writer. But because
// bufio.Writer may bypass its chunking, sometimes p may be
// arbitrarily large.
//
// writeChunk is also responsible (on the first chunk) for sending the
// HEADER response.
func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) { if !rws.wroteHeader { rws.writeHeader(200) }
isHeadResp := rws.req.Method == "HEAD" if !rws.sentHeader { rws.sentHeader = true var ctype, clen string if clen = rws.snapHeader.Get("Content-Length"); clen != "" { rws.snapHeader.Del("Content-Length") clen64, err := strconv.ParseInt(clen, 10, 64) if err == nil && clen64 >= 0 { rws.sentContentLen = clen64 } else { clen = "" } } if clen == "" && rws.handlerDone && bodyAllowedForStatus(rws.status) && (len(p) > 0 || !isHeadResp) { clen = strconv.Itoa(len(p)) } _, hasContentType := rws.snapHeader["Content-Type"] if !hasContentType && bodyAllowedForStatus(rws.status) && len(p) > 0 { ctype = http.DetectContentType(p) } var date string if _, ok := rws.snapHeader["Date"]; !ok { // TODO(bradfitz): be faster here, like net/http? measure.
date = time.Now().UTC().Format(http.TimeFormat) }
for _, v := range rws.snapHeader["Trailer"] { foreachHeaderElement(v, rws.declareTrailer) }
endStream := (rws.handlerDone && !rws.hasTrailers() && len(p) == 0) || isHeadResp err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{ streamID: rws.stream.id, httpResCode: rws.status, h: rws.snapHeader, endStream: endStream, contentType: ctype, contentLength: clen, date: date, }) if err != nil { rws.dirty = true return 0, err } if endStream { return 0, nil } } if isHeadResp { return len(p), nil } if len(p) == 0 && !rws.handlerDone { return 0, nil }
if rws.handlerDone { rws.promoteUndeclaredTrailers() }
endStream := rws.handlerDone && !rws.hasTrailers() if len(p) > 0 || endStream { // only send a 0 byte DATA frame if we're ending the stream.
if err := rws.conn.writeDataFromHandler(rws.stream, p, endStream); err != nil { rws.dirty = true return 0, err } }
if rws.handlerDone && rws.hasTrailers() { err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{ streamID: rws.stream.id, h: rws.handlerHeader, trailers: rws.trailers, endStream: true, }) if err != nil { rws.dirty = true } return len(p), err } return len(p), nil }
// TrailerPrefix is a magic prefix for ResponseWriter.Header map keys
// that, if present, signals that the map entry is actually for
// the response trailers, and not the response headers. The prefix
// is stripped after the ServeHTTP call finishes and the values are
// sent in the trailers.
//
// This mechanism is intended only for trailers that are not known
// prior to the headers being written. If the set of trailers is fixed
// or known before the header is written, the normal Go trailers mechanism
// is preferred:
// https://golang.org/pkg/net/http/#ResponseWriter
// https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
const TrailerPrefix = "Trailer:"
// promoteUndeclaredTrailers permits http.Handlers to set trailers
// after the header has already been flushed. Because the Go
// ResponseWriter interface has no way to set Trailers (only the
// Header), and because we didn't want to expand the ResponseWriter
// interface, and because nobody used trailers, and because RFC 2616
// says you SHOULD (but not must) predeclare any trailers in the
// header, the official ResponseWriter rules said trailers in Go must
// be predeclared, and then we reuse the same ResponseWriter.Header()
// map to mean both Headers and Trailers. When it's time to write the
// Trailers, we pick out the fields of Headers that were declared as
// trailers. That worked for a while, until we found the first major
// user of Trailers in the wild: gRPC (using them only over http2),
// and gRPC libraries permit setting trailers mid-stream without
// predeclarnig them. So: change of plans. We still permit the old
// way, but we also permit this hack: if a Header() key begins with
// "Trailer:", the suffix of that key is a Trailer. Because ':' is an
// invalid token byte anyway, there is no ambiguity. (And it's already
// filtered out) It's mildly hacky, but not terrible.
//
// This method runs after the Handler is done and promotes any Header
// fields to be trailers.
func (rws *responseWriterState) promoteUndeclaredTrailers() { for k, vv := range rws.handlerHeader { if !strings.HasPrefix(k, TrailerPrefix) { continue } trailerKey := strings.TrimPrefix(k, TrailerPrefix) rws.declareTrailer(trailerKey) rws.handlerHeader[http.CanonicalHeaderKey(trailerKey)] = vv }
if len(rws.trailers) > 1 { sorter := sorterPool.Get().(*sorter) sorter.SortStrings(rws.trailers) sorterPool.Put(sorter) } }
func (w *responseWriter) Flush() { rws := w.rws if rws == nil { panic("Header called after Handler finished") } if rws.bw.Buffered() > 0 { if err := rws.bw.Flush(); err != nil { // Ignore the error. The frame writer already knows.
return } } else { // The bufio.Writer won't call chunkWriter.Write
// (writeChunk with zero bytes, so we have to do it
// ourselves to force the HTTP response header and/or
// final DATA frame (with END_STREAM) to be sent.
rws.writeChunk(nil) } }
func (w *responseWriter) CloseNotify() <-chan bool { rws := w.rws if rws == nil { panic("CloseNotify called after Handler finished") } rws.closeNotifierMu.Lock() ch := rws.closeNotifierCh if ch == nil { ch = make(chan bool, 1) rws.closeNotifierCh = ch cw := rws.stream.cw go func() { cw.Wait() // wait for close
ch <- true }() } rws.closeNotifierMu.Unlock() return ch }
func (w *responseWriter) Header() http.Header { rws := w.rws if rws == nil { panic("Header called after Handler finished") } if rws.handlerHeader == nil { rws.handlerHeader = make(http.Header) } return rws.handlerHeader }
// checkWriteHeaderCode is a copy of net/http's checkWriteHeaderCode.
func checkWriteHeaderCode(code int) { // Issue 22880: require valid WriteHeader status codes.
// For now we only enforce that it's three digits.
// In the future we might block things over 599 (600 and above aren't defined
// at http://httpwg.org/specs/rfc7231.html#status.codes)
// and we might block under 200 (once we have more mature 1xx support).
// But for now any three digits.
//
// We used to send "HTTP/1.1 000 0" on the wire in responses but there's
// no equivalent bogus thing we can realistically send in HTTP/2,
// so we'll consistently panic instead and help people find their bugs
// early. (We can't return an error from WriteHeader even if we wanted to.)
if code < 100 || code > 999 { panic(fmt.Sprintf("invalid WriteHeader code %v", code)) } }
func (w *responseWriter) WriteHeader(code int) { checkWriteHeaderCode(code) rws := w.rws if rws == nil { panic("WriteHeader called after Handler finished") } rws.writeHeader(code) }
func (rws *responseWriterState) writeHeader(code int) { if !rws.wroteHeader { rws.wroteHeader = true rws.status = code if len(rws.handlerHeader) > 0 { rws.snapHeader = cloneHeader(rws.handlerHeader) } } }
func cloneHeader(h http.Header) http.Header { h2 := make(http.Header, len(h)) for k, vv := range h { vv2 := make([]string, len(vv)) copy(vv2, vv) h2[k] = vv2 } return h2 }
// The Life Of A Write is like this:
//
// * Handler calls w.Write or w.WriteString ->
// * -> rws.bw (*bufio.Writer) ->
// * (Handler might call Flush)
// * -> chunkWriter{rws}
// * -> responseWriterState.writeChunk(p []byte)
// * -> responseWriterState.writeChunk (most of the magic; see comment there)
func (w *responseWriter) Write(p []byte) (n int, err error) { return w.write(len(p), p, "") }
func (w *responseWriter) WriteString(s string) (n int, err error) { return w.write(len(s), nil, s) }
// either dataB or dataS is non-zero.
func (w *responseWriter) write(lenData int, dataB []byte, dataS string) (n int, err error) { rws := w.rws if rws == nil { panic("Write called after Handler finished") } if !rws.wroteHeader { w.WriteHeader(200) } if !bodyAllowedForStatus(rws.status) { return 0, http.ErrBodyNotAllowed } rws.wroteBytes += int64(len(dataB)) + int64(len(dataS)) // only one can be set
if rws.sentContentLen != 0 && rws.wroteBytes > rws.sentContentLen { // TODO: send a RST_STREAM
return 0, errors.New("http2: handler wrote more than declared Content-Length") }
if dataB != nil { return rws.bw.Write(dataB) } else { return rws.bw.WriteString(dataS) } }
func (w *responseWriter) handlerDone() { rws := w.rws dirty := rws.dirty rws.handlerDone = true w.Flush() w.rws = nil if !dirty { // Only recycle the pool if all prior Write calls to
// the serverConn goroutine completed successfully. If
// they returned earlier due to resets from the peer
// there might still be write goroutines outstanding
// from the serverConn referencing the rws memory. See
// issue 20704.
responseWriterStatePool.Put(rws) } }
// Push errors.
var ( ErrRecursivePush = errors.New("http2: recursive push not allowed") ErrPushLimitReached = errors.New("http2: push would exceed peer's SETTINGS_MAX_CONCURRENT_STREAMS") )
// pushOptions is the internal version of http.PushOptions, which we
// cannot include here because it's only defined in Go 1.8 and later.
type pushOptions struct { Method string Header http.Header }
func (w *responseWriter) push(target string, opts pushOptions) error { st := w.rws.stream sc := st.sc sc.serveG.checkNotOn()
// No recursive pushes: "PUSH_PROMISE frames MUST only be sent on a peer-initiated stream."
// http://tools.ietf.org/html/rfc7540#section-6.6
if st.isPushed() { return ErrRecursivePush }
// Default options.
if opts.Method == "" { opts.Method = "GET" } if opts.Header == nil { opts.Header = http.Header{} } wantScheme := "http" if w.rws.req.TLS != nil { wantScheme = "https" }
// Validate the request.
u, err := url.Parse(target) if err != nil { return err } if u.Scheme == "" { if !strings.HasPrefix(target, "/") { return fmt.Errorf("target must be an absolute URL or an absolute path: %q", target) } u.Scheme = wantScheme u.Host = w.rws.req.Host } else { if u.Scheme != wantScheme { return fmt.Errorf("cannot push URL with scheme %q from request with scheme %q", u.Scheme, wantScheme) } if u.Host == "" { return errors.New("URL must have a host") } } for k := range opts.Header { if strings.HasPrefix(k, ":") { return fmt.Errorf("promised request headers cannot include pseudo header %q", k) } // These headers are meaningful only if the request has a body,
// but PUSH_PROMISE requests cannot have a body.
// http://tools.ietf.org/html/rfc7540#section-8.2
// Also disallow Host, since the promised URL must be absolute.
switch strings.ToLower(k) { case "content-length", "content-encoding", "trailer", "te", "expect", "host": return fmt.Errorf("promised request headers cannot include %q", k) } } if err := checkValidHTTP2RequestHeaders(opts.Header); err != nil { return err }
// The RFC effectively limits promised requests to GET and HEAD:
// "Promised requests MUST be cacheable [GET, HEAD, or POST], and MUST be safe [GET or HEAD]"
// http://tools.ietf.org/html/rfc7540#section-8.2
if opts.Method != "GET" && opts.Method != "HEAD" { return fmt.Errorf("method %q must be GET or HEAD", opts.Method) }
msg := &startPushRequest{ parent: st, method: opts.Method, url: u, header: cloneHeader(opts.Header), done: errChanPool.Get().(chan error), }
select { case <-sc.doneServing: return errClientDisconnected case <-st.cw: return errStreamClosed case sc.serveMsgCh <- msg: }
select { case <-sc.doneServing: return errClientDisconnected case <-st.cw: return errStreamClosed case err := <-msg.done: errChanPool.Put(msg.done) return err } }
type startPushRequest struct { parent *stream method string url *url.URL header http.Header done chan error }
func (sc *serverConn) startPush(msg *startPushRequest) { sc.serveG.check()
// http://tools.ietf.org/html/rfc7540#section-6.6.
// PUSH_PROMISE frames MUST only be sent on a peer-initiated stream that
// is in either the "open" or "half-closed (remote)" state.
if msg.parent.state != stateOpen && msg.parent.state != stateHalfClosedRemote { // responseWriter.Push checks that the stream is peer-initiaed.
msg.done <- errStreamClosed return }
// http://tools.ietf.org/html/rfc7540#section-6.6.
if !sc.pushEnabled { msg.done <- http.ErrNotSupported return }
// PUSH_PROMISE frames must be sent in increasing order by stream ID, so
// we allocate an ID for the promised stream lazily, when the PUSH_PROMISE
// is written. Once the ID is allocated, we start the request handler.
allocatePromisedID := func() (uint32, error) { sc.serveG.check()
// Check this again, just in case. Technically, we might have received
// an updated SETTINGS by the time we got around to writing this frame.
if !sc.pushEnabled { return 0, http.ErrNotSupported } // http://tools.ietf.org/html/rfc7540#section-6.5.2.
if sc.curPushedStreams+1 > sc.clientMaxStreams { return 0, ErrPushLimitReached }
// http://tools.ietf.org/html/rfc7540#section-5.1.1.
// Streams initiated by the server MUST use even-numbered identifiers.
// A server that is unable to establish a new stream identifier can send a GOAWAY
// frame so that the client is forced to open a new connection for new streams.
if sc.maxPushPromiseID+2 >= 1<<31 { sc.startGracefulShutdownInternal() return 0, ErrPushLimitReached } sc.maxPushPromiseID += 2 promisedID := sc.maxPushPromiseID
// http://tools.ietf.org/html/rfc7540#section-8.2.
// Strictly speaking, the new stream should start in "reserved (local)", then
// transition to "half closed (remote)" after sending the initial HEADERS, but
// we start in "half closed (remote)" for simplicity.
// See further comments at the definition of stateHalfClosedRemote.
promised := sc.newStream(promisedID, msg.parent.id, stateHalfClosedRemote) rw, req, err := sc.newWriterAndRequestNoBody(promised, requestParam{ method: msg.method, scheme: msg.url.Scheme, authority: msg.url.Host, path: msg.url.RequestURI(), header: cloneHeader(msg.header), // clone since handler runs concurrently with writing the PUSH_PROMISE
}) if err != nil { // Should not happen, since we've already validated msg.url.
panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err)) }
go sc.runHandler(rw, req, sc.handler.ServeHTTP) return promisedID, nil }
sc.writeFrame(FrameWriteRequest{ write: &writePushPromise{ streamID: msg.parent.id, method: msg.method, url: msg.url, h: msg.header, allocatePromisedID: allocatePromisedID, }, stream: msg.parent, done: msg.done, }) }
// foreachHeaderElement splits v according to the "#rule" construction
// in RFC 2616 section 2.1 and calls fn for each non-empty element.
func foreachHeaderElement(v string, fn func(string)) { v = textproto.TrimString(v) if v == "" { return } if !strings.Contains(v, ",") { fn(v) return } for _, f := range strings.Split(v, ",") { if f = textproto.TrimString(f); f != "" { fn(f) } } }
// From http://httpwg.org/specs/rfc7540.html#rfc.section.8.1.2.2
var connHeaders = []string{ "Connection", "Keep-Alive", "Proxy-Connection", "Transfer-Encoding", "Upgrade", }
// checkValidHTTP2RequestHeaders checks whether h is a valid HTTP/2 request,
// per RFC 7540 Section 8.1.2.2.
// The returned error is reported to users.
func checkValidHTTP2RequestHeaders(h http.Header) error { for _, k := range connHeaders { if _, ok := h[k]; ok { return fmt.Errorf("request header %q is not valid in HTTP/2", k) } } te := h["Te"] if len(te) > 0 && (len(te) > 1 || (te[0] != "trailers" && te[0] != "")) { return errors.New(`request header "TE" may only be "trailers" in HTTP/2`) } return nil }
func new400Handler(err error) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { http.Error(w, err.Error(), http.StatusBadRequest) } }
// ValidTrailerHeader reports whether name is a valid header field name to appear
// in trailers.
// See: http://tools.ietf.org/html/rfc7230#section-4.1.2
func ValidTrailerHeader(name string) bool { name = http.CanonicalHeaderKey(name) if strings.HasPrefix(name, "If-") || badTrailer[name] { return false } return true }
var badTrailer = map[string]bool{ "Authorization": true, "Cache-Control": true, "Connection": true, "Content-Encoding": true, "Content-Length": true, "Content-Range": true, "Content-Type": true, "Expect": true, "Host": true, "Keep-Alive": true, "Max-Forwards": true, "Pragma": true, "Proxy-Authenticate": true, "Proxy-Authorization": true, "Proxy-Connection": true, "Range": true, "Realm": true, "Te": true, "Trailer": true, "Transfer-Encoding": true, "Www-Authenticate": true, }
// h1ServerKeepAlivesDisabled reports whether hs has its keep-alives
// disabled. See comments on h1ServerShutdownChan above for why
// the code is written this way.
func h1ServerKeepAlivesDisabled(hs *http.Server) bool { var x interface{} = hs type I interface { doKeepAlives() bool } if hs, ok := x.(I); ok { return !hs.doKeepAlives() } return false }
|