aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/server.go')
-rw-r--r--vendor/google.golang.org/grpc/server.go501
1 files changed, 285 insertions, 216 deletions
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index b15f71c..7c882db 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2014, Google Inc.
- * All rights reserved.
+ * Copyright 2014 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -38,6 +23,7 @@ import (
"errors"
"fmt"
"io"
+ "math"
"net"
"net/http"
"reflect"
@@ -61,6 +47,11 @@ import (
"google.golang.org/grpc/transport"
)
+const (
+ defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
+ defaultServerMaxSendMessageSize = math.MaxInt32
+)
+
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
// MethodDesc represents an RPC service's method specification.
@@ -96,6 +87,7 @@ type Server struct {
mu sync.Mutex // guards following
lis map[net.Listener]bool
conns map[io.Closer]bool
+ serve bool
drain bool
ctx context.Context
cancel context.CancelFunc
@@ -107,27 +99,67 @@ type Server struct {
}
type options struct {
- creds credentials.TransportCredentials
- codec Codec
- cp Compressor
- dc Decompressor
- maxMsgSize int
- unaryInt UnaryServerInterceptor
- streamInt StreamServerInterceptor
- inTapHandle tap.ServerInHandle
- statsHandler stats.Handler
- maxConcurrentStreams uint32
- useHandlerImpl bool // use http.Handler-based server
- unknownStreamDesc *StreamDesc
- keepaliveParams keepalive.ServerParameters
- keepalivePolicy keepalive.EnforcementPolicy
+ creds credentials.TransportCredentials
+ codec Codec
+ cp Compressor
+ dc Decompressor
+ unaryInt UnaryServerInterceptor
+ streamInt StreamServerInterceptor
+ inTapHandle tap.ServerInHandle
+ statsHandler stats.Handler
+ maxConcurrentStreams uint32
+ maxReceiveMessageSize int
+ maxSendMessageSize int
+ useHandlerImpl bool // use http.Handler-based server
+ unknownStreamDesc *StreamDesc
+ keepaliveParams keepalive.ServerParameters
+ keepalivePolicy keepalive.EnforcementPolicy
+ initialWindowSize int32
+ initialConnWindowSize int32
+ writeBufferSize int
+ readBufferSize int
}
-var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
+var defaultServerOptions = options{
+ maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
+ maxSendMessageSize: defaultServerMaxSendMessageSize,
+}
-// A ServerOption sets options.
+// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption func(*options)
+// WriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
+// before doing a write on the wire.
+func WriteBufferSize(s int) ServerOption {
+ return func(o *options) {
+ o.writeBufferSize = s
+ }
+}
+
+// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
+// for one read syscall.
+func ReadBufferSize(s int) ServerOption {
+ return func(o *options) {
+ o.readBufferSize = s
+ }
+}
+
+// InitialWindowSize returns a ServerOption that sets window size for stream.
+// The lower bound for window size is 64K and any value smaller than that will be ignored.
+func InitialWindowSize(s int32) ServerOption {
+ return func(o *options) {
+ o.initialWindowSize = s
+ }
+}
+
+// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
+// The lower bound for window size is 64K and any value smaller than that will be ignored.
+func InitialConnWindowSize(s int32) ServerOption {
+ return func(o *options) {
+ o.initialConnWindowSize = s
+ }
+}
+
// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
return func(o *options) {
@@ -163,11 +195,25 @@ func RPCDecompressor(dc Decompressor) ServerOption {
}
}
-// MaxMsgSize returns a ServerOption to set the max message size in bytes for inbound mesages.
-// If this is not set, gRPC uses the default 4MB.
+// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
+// If this is not set, gRPC uses the default limit. Deprecated: use MaxRecvMsgSize instead.
func MaxMsgSize(m int) ServerOption {
+ return MaxRecvMsgSize(m)
+}
+
+// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
+// If this is not set, gRPC uses the default 4MB.
+func MaxRecvMsgSize(m int) ServerOption {
return func(o *options) {
- o.maxMsgSize = m
+ o.maxReceiveMessageSize = m
+ }
+}
+
+// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
+// If this is not set, gRPC uses the default 4MB.
+func MaxSendMsgSize(m int) ServerOption {
+ return func(o *options) {
+ o.maxSendMessageSize = m
}
}
@@ -192,7 +238,7 @@ func Creds(c credentials.TransportCredentials) ServerOption {
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
return func(o *options) {
if o.unaryInt != nil {
- panic("The unary server interceptor has been set.")
+ panic("The unary server interceptor was already set and may not be reset.")
}
o.unaryInt = i
}
@@ -203,7 +249,7 @@ func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
func StreamInterceptor(i StreamServerInterceptor) ServerOption {
return func(o *options) {
if o.streamInt != nil {
- panic("The stream server interceptor has been set.")
+ panic("The stream server interceptor was already set and may not be reset.")
}
o.streamInt = i
}
@@ -214,7 +260,7 @@ func StreamInterceptor(i StreamServerInterceptor) ServerOption {
func InTapHandle(h tap.ServerInHandle) ServerOption {
return func(o *options) {
if o.inTapHandle != nil {
- panic("The tap handle has been set.")
+ panic("The tap handle was already set and may not be reset.")
}
o.inTapHandle = h
}
@@ -229,10 +275,10 @@ func StatsHandler(h stats.Handler) ServerOption {
// UnknownServiceHandler returns a ServerOption that allows for adding a custom
// unknown service handler. The provided method is a bidi-streaming RPC service
-// handler that will be invoked instead of returning the the "unimplemented" gRPC
+// handler that will be invoked instead of returning the "unimplemented" gRPC
// error whenever a request is received for an unregistered service or method.
// The handling function has full access to the Context of the request and the
-// stream, and the invocation passes through interceptors.
+// stream, and the invocation bypasses interceptors.
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
return func(o *options) {
o.unknownStreamDesc = &StreamDesc{
@@ -248,8 +294,7 @@ func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
- var opts options
- opts.maxMsgSize = defaultMaxMsgSize
+ opts := defaultServerOptions
for _, o := range opt {
o(&opts)
}
@@ -288,8 +333,8 @@ func (s *Server) errorf(format string, a ...interface{}) {
}
}
-// RegisterService register a service and its implementation to the gRPC
-// server. Called from the IDL generated code. This must be called before
+// RegisterService registers a service and its implementation to the gRPC
+// server. It is called from the IDL generated code. This must be called before
// invoking Serve.
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
ht := reflect.TypeOf(sd.HandlerType).Elem()
@@ -304,6 +349,9 @@ func (s *Server) register(sd *ServiceDesc, ss interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
s.printf("RegisterService(%q)", sd.ServiceName)
+ if s.serve {
+ grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
+ }
if _, ok := s.m[sd.ServiceName]; ok {
grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
}
@@ -334,7 +382,7 @@ type MethodInfo struct {
IsServerStream bool
}
-// ServiceInfo contains unary RPC method info, streaming RPC methid info and metadata for a service.
+// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
Methods []MethodInfo
// Metadata is the metadata specified in ServiceDesc when registering service.
@@ -392,6 +440,7 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
s.printf("serving")
+ s.serve = true
if s.lis == nil {
s.mu.Unlock()
lis.Close()
@@ -427,10 +476,12 @@ func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
s.printf("Accept error: %v; retrying in %v", err, tempDelay)
s.mu.Unlock()
+ timer := time.NewTimer(tempDelay)
select {
- case <-time.After(tempDelay):
+ case <-timer.C:
case <-s.ctx.Done():
}
+ timer.Stop()
continue
}
s.mu.Lock()
@@ -453,7 +504,7 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
s.mu.Lock()
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
s.mu.Unlock()
- grpclog.Printf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
+ grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
// If serverHandShake returns ErrConnDispatched, keep rawConn open.
if err != credentials.ErrConnDispatched {
rawConn.Close()
@@ -483,12 +534,16 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
// transport.NewServerTransport).
func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
config := &transport.ServerConfig{
- MaxStreams: s.opts.maxConcurrentStreams,
- AuthInfo: authInfo,
- InTapHandle: s.opts.inTapHandle,
- StatsHandler: s.opts.statsHandler,
- KeepaliveParams: s.opts.keepaliveParams,
- KeepalivePolicy: s.opts.keepalivePolicy,
+ MaxStreams: s.opts.maxConcurrentStreams,
+ AuthInfo: authInfo,
+ InTapHandle: s.opts.inTapHandle,
+ StatsHandler: s.opts.statsHandler,
+ KeepaliveParams: s.opts.keepaliveParams,
+ KeepalivePolicy: s.opts.keepalivePolicy,
+ InitialWindowSize: s.opts.initialWindowSize,
+ InitialConnWindowSize: s.opts.initialConnWindowSize,
+ WriteBufferSize: s.opts.writeBufferSize,
+ ReadBufferSize: s.opts.readBufferSize,
}
st, err := transport.NewServerTransport("http2", c, config)
if err != nil {
@@ -496,7 +551,7 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo)
s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
s.mu.Unlock()
c.Close()
- grpclog.Println("grpc: Server.Serve failed to create ServerTransport: ", err)
+ grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
return
}
if !s.addConn(st) {
@@ -554,6 +609,30 @@ func (s *Server) serveUsingHandler(conn net.Conn) {
})
}
+// ServeHTTP implements the Go standard library's http.Handler
+// interface by responding to the gRPC request r, by looking up
+// the requested gRPC method in the gRPC server s.
+//
+// The provided HTTP request must have arrived on an HTTP/2
+// connection. When using the Go standard library's server,
+// practically this means that the Request must also have arrived
+// over TLS.
+//
+// To share one port (such as 443 for https) between gRPC and an
+// existing http.Handler, use a root http.Handler such as:
+//
+// if r.ProtoMajor == 2 && strings.HasPrefix(
+// r.Header.Get("Content-Type"), "application/grpc") {
+// grpcServer.ServeHTTP(w, r)
+// } else {
+// yourMux.ServeHTTP(w, r)
+// }
+//
+// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
+// separate from grpc-go's HTTP/2 server. Performance and features may vary
+// between the two paths. ServeHTTP does not support some gRPC features
+// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
+// and subject to change.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
st, err := transport.NewServerHandlerTransport(w, r)
if err != nil {
@@ -618,18 +697,15 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
if s.opts.statsHandler != nil {
outPayload = &stats.OutPayload{}
}
- p, err := encode(s.opts.codec, msg, cp, cbuf, outPayload)
+ hdr, data, err := encode(s.opts.codec, msg, cp, cbuf, outPayload)
if err != nil {
- // This typically indicates a fatal issue (e.g., memory
- // corruption or hardware faults) the application program
- // cannot handle.
- //
- // TODO(zhaoq): There exist other options also such as only closing the
- // faulty stream locally and remotely (Other streams can keep going). Find
- // the optimal option.
- grpclog.Fatalf("grpc: Server failed to encode response %v", err)
- }
- err = t.Write(stream, p, opts)
+ grpclog.Errorln("grpc: server failed to encode response: ", err)
+ return err
+ }
+ if len(data) > s.opts.maxSendMessageSize {
+ return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(data), s.opts.maxSendMessageSize)
+ }
+ err = t.Write(stream, hdr, data, opts)
if err == nil && outPayload != nil {
outPayload.SentTime = time.Now()
s.opts.statsHandler.HandleRPC(stream.Context(), outPayload)
@@ -644,9 +720,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
BeginTime: time.Now(),
}
sh.HandleRPC(stream.Context(), begin)
- }
- defer func() {
- if sh != nil {
+ defer func() {
end := &stats.End{
EndTime: time.Now(),
}
@@ -654,8 +728,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
end.Error = toRPCErr(err)
}
sh.HandleRPC(stream.Context(), end)
- }
- }()
+ }()
+ }
if trInfo != nil {
defer trInfo.tr.Finish()
trInfo.firstLine.client = false
@@ -672,139 +746,137 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
stream.SetSendCompress(s.opts.cp.Type())
}
p := &parser{r: stream}
- for { // TODO: delete
- pf, req, err := p.recvMsg(s.opts.maxMsgSize)
- if err == io.EOF {
- // The entire stream is done (for unary RPC only).
- return err
- }
- if err == io.ErrUnexpectedEOF {
- err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
- }
- if err != nil {
- if st, ok := status.FromError(err); ok {
- if e := t.WriteStatus(stream, st); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
- } else {
- switch st := err.(type) {
- case transport.ConnectionError:
- // Nothing to do here.
- case transport.StreamError:
- if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
- default:
- panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
+ pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)
+ if err == io.EOF {
+ // The entire stream is done (for unary RPC only).
+ return err
+ }
+ if err == io.ErrUnexpectedEOF {
+ err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
+ }
+ if err != nil {
+ if st, ok := status.FromError(err); ok {
+ if e := t.WriteStatus(stream, st); e != nil {
+ grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
+ }
+ } else {
+ switch st := err.(type) {
+ case transport.ConnectionError:
+ // Nothing to do here.
+ case transport.StreamError:
+ if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
+ grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
+ default:
+ panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
}
- return err
}
+ return err
+ }
- if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
- if st, ok := status.FromError(err); ok {
- if e := t.WriteStatus(stream, st); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
- return err
- }
- if e := t.WriteStatus(stream, status.New(codes.Internal, err.Error())); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
+ if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
+ if st, ok := status.FromError(err); ok {
+ if e := t.WriteStatus(stream, st); e != nil {
+ grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
+ return err
+ }
+ if e := t.WriteStatus(stream, status.New(codes.Internal, err.Error())); e != nil {
+ grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
+ }
- // TODO checkRecvPayload always return RPC error. Add a return here if necessary.
+ // TODO checkRecvPayload always return RPC error. Add a return here if necessary.
+ }
+ var inPayload *stats.InPayload
+ if sh != nil {
+ inPayload = &stats.InPayload{
+ RecvTime: time.Now(),
}
- var inPayload *stats.InPayload
- if sh != nil {
- inPayload = &stats.InPayload{
- RecvTime: time.Now(),
- }
+ }
+ df := func(v interface{}) error {
+ if inPayload != nil {
+ inPayload.WireLength = len(req)
}
- df := func(v interface{}) error {
- if inPayload != nil {
- inPayload.WireLength = len(req)
- }
- if pf == compressionMade {
- var err error
- req, err = s.opts.dc.Do(bytes.NewReader(req))
- if err != nil {
- return Errorf(codes.Internal, err.Error())
- }
- }
- if len(req) > s.opts.maxMsgSize {
- // TODO: Revisit the error code. Currently keep it consistent with
- // java implementation.
- return status.Errorf(codes.Internal, "grpc: server received a message of %d bytes exceeding %d limit", len(req), s.opts.maxMsgSize)
- }
- if err := s.opts.codec.Unmarshal(req, v); err != nil {
- return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
- }
- if inPayload != nil {
- inPayload.Payload = v
- inPayload.Data = req
- inPayload.Length = len(req)
- sh.HandleRPC(stream.Context(), inPayload)
+ if pf == compressionMade {
+ var err error
+ req, err = s.opts.dc.Do(bytes.NewReader(req))
+ if err != nil {
+ return Errorf(codes.Internal, err.Error())
}
- if trInfo != nil {
- trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
- }
- return nil
}
- reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
- if appErr != nil {
- appStatus, ok := status.FromError(appErr)
- if !ok {
- // Convert appErr if it is not a grpc status error.
- appErr = status.Error(convertCode(appErr), appErr.Error())
- appStatus, _ = status.FromError(appErr)
- }
- if trInfo != nil {
- trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
- trInfo.tr.SetError()
- }
- if e := t.WriteStatus(stream, appStatus); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", e)
- }
- return appErr
+ if len(req) > s.opts.maxReceiveMessageSize {
+ // TODO: Revisit the error code. Currently keep it consistent with
+ // java implementation.
+ return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize)
+ }
+ if err := s.opts.codec.Unmarshal(req, v); err != nil {
+ return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
+ }
+ if inPayload != nil {
+ inPayload.Payload = v
+ inPayload.Data = req
+ inPayload.Length = len(req)
+ sh.HandleRPC(stream.Context(), inPayload)
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
+ }
+ return nil
+ }
+ reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
+ if appErr != nil {
+ appStatus, ok := status.FromError(appErr)
+ if !ok {
+ // Convert appErr if it is not a grpc status error.
+ appErr = status.Error(convertCode(appErr), appErr.Error())
+ appStatus, _ = status.FromError(appErr)
}
if trInfo != nil {
- trInfo.tr.LazyLog(stringer("OK"), false)
+ trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
+ trInfo.tr.SetError()
}
- opts := &transport.Options{
- Last: true,
- Delay: false,
+ if e := t.WriteStatus(stream, appStatus); e != nil {
+ grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
+ }
+ return appErr
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyLog(stringer("OK"), false)
+ }
+ opts := &transport.Options{
+ Last: true,
+ Delay: false,
+ }
+ if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
+ if err == io.EOF {
+ // The entire stream is done (for unary RPC only).
+ return err
}
- if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
- if err == io.EOF {
- // The entire stream is done (for unary RPC only).
- return err
+ if s, ok := status.FromError(err); ok {
+ if e := t.WriteStatus(stream, s); e != nil {
+ grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
}
- if s, ok := status.FromError(err); ok {
- if e := t.WriteStatus(stream, s); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", e)
- }
- } else {
- switch st := err.(type) {
- case transport.ConnectionError:
- // Nothing to do here.
- case transport.StreamError:
- if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
- grpclog.Printf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
- default:
- panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
+ } else {
+ switch st := err.(type) {
+ case transport.ConnectionError:
+ // Nothing to do here.
+ case transport.StreamError:
+ if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
+ grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
+ default:
+ panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
}
- return err
}
- if trInfo != nil {
- trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
- }
- // TODO: Should we be logging if writing status failed here, like above?
- // Should the logging be in WriteStatus? Should we ignore the WriteStatus
- // error or allow the stats handler to see it?
- return t.WriteStatus(stream, status.New(codes.OK, ""))
+ return err
+ }
+ if trInfo != nil {
+ trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
}
+ // TODO: Should we be logging if writing status failed here, like above?
+ // Should the logging be in WriteStatus? Should we ignore the WriteStatus
+ // error or allow the stats handler to see it?
+ return t.WriteStatus(stream, status.New(codes.OK, ""))
}
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
@@ -814,9 +886,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
BeginTime: time.Now(),
}
sh.HandleRPC(stream.Context(), begin)
- }
- defer func() {
- if sh != nil {
+ defer func() {
end := &stats.End{
EndTime: time.Now(),
}
@@ -824,24 +894,22 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
end.Error = toRPCErr(err)
}
sh.HandleRPC(stream.Context(), end)
- }
- }()
+ }()
+ }
if s.opts.cp != nil {
stream.SetSendCompress(s.opts.cp.Type())
}
ss := &serverStream{
- t: t,
- s: stream,
- p: &parser{r: stream},
- codec: s.opts.codec,
- cp: s.opts.cp,
- dc: s.opts.dc,
- maxMsgSize: s.opts.maxMsgSize,
- trInfo: trInfo,
- statsHandler: sh,
- }
- if ss.cp != nil {
- ss.cbuf = new(bytes.Buffer)
+ t: t,
+ s: stream,
+ p: &parser{r: stream},
+ codec: s.opts.codec,
+ cp: s.opts.cp,
+ dc: s.opts.dc,
+ maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
+ maxSendMessageSize: s.opts.maxSendMessageSize,
+ trInfo: trInfo,
+ statsHandler: sh,
}
if trInfo != nil {
trInfo.tr.LazyLog(&trInfo.firstLine, false)
@@ -913,12 +981,12 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.SetError()
}
errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
- if err := t.WriteStatus(stream, status.New(codes.InvalidArgument, errDesc)); err != nil {
+ if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
- grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
+ grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()
@@ -943,7 +1011,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
- grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
+ grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()
@@ -973,7 +1041,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
trInfo.tr.SetError()
}
- grpclog.Printf("grpc: Server.handleStream failed to write status: %v", err)
+ grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
}
if trInfo != nil {
trInfo.tr.Finish()
@@ -1011,8 +1079,9 @@ func (s *Server) Stop() {
s.mu.Unlock()
}
-// GracefulStop stops the gRPC server gracefully. It stops the server to accept new
-// connections and RPCs and blocks until all the pending RPCs are finished.
+// GracefulStop stops the gRPC server gracefully. It stops the server from
+// accepting new connections and RPCs and blocks until all the pending RPCs are
+// finished.
func (s *Server) GracefulStop() {
s.mu.Lock()
defer s.mu.Unlock()