diff options
Diffstat (limited to 'vendor/google.golang.org/appengine/internal/api.go')
-rw-r--r-- | vendor/google.golang.org/appengine/internal/api.go | 155 |
1 files changed, 146 insertions, 9 deletions
diff --git a/vendor/google.golang.org/appengine/internal/api.go b/vendor/google.golang.org/appengine/internal/api.go index a2a4b4c..ec5aa59 100644 --- a/vendor/google.golang.org/appengine/internal/api.go +++ b/vendor/google.golang.org/appengine/internal/api.go @@ -26,6 +26,8 @@ import ( "github.com/golang/protobuf/proto" netcontext "golang.org/x/net/context" + basepb "google.golang.org/appengine/internal/base" + logpb "google.golang.org/appengine/internal/log" remotepb "google.golang.org/appengine/internal/remote_api" ) @@ -50,6 +52,7 @@ var ( apiDeadlineHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline") apiContentType = http.CanonicalHeaderKey("Content-Type") apiContentTypeValue = []string{"application/octet-stream"} + logFlushHeader = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count") apiHTTPClient = &http.Client{ Transport: &http.Transport{ @@ -79,8 +82,8 @@ func handleHTTP(w http.ResponseWriter, r *http.Request) { req: r, outHeader: w.Header(), apiURL: apiURL(), - logger: globalLogger(), } + stopFlushing := make(chan int) ctxs.Lock() ctxs.m[r] = c @@ -109,9 +112,26 @@ func handleHTTP(w http.ResponseWriter, r *http.Request) { r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80") } + // Start goroutine responsible for flushing app logs. + // This is done after adding c to ctx.m (and stopped before removing it) + // because flushing logs requires making an API call. + go c.logFlusher(stopFlushing) + executeRequestSafely(c, r) c.outHeader = nil // make sure header changes aren't respected any more + stopFlushing <- 1 // any logging beyond this point will be dropped + + // Flush any pending logs asynchronously. + c.pendingLogs.Lock() + flushes := c.pendingLogs.flushes + if len(c.pendingLogs.lines) > 0 { + flushes++ + } + c.pendingLogs.Unlock() + go c.flushLog(false) + w.Header().Set(logFlushHeader, strconv.Itoa(flushes)) + // Avoid nil Write call if c.Write is never called. if c.outCode != 0 { w.WriteHeader(c.outCode) @@ -186,13 +206,18 @@ var ctxs = struct { // context represents the context of an in-flight HTTP request. // It implements the appengine.Context and http.ResponseWriter interfaces. type context struct { - req *http.Request - logger *jsonLogger + req *http.Request outCode int outHeader http.Header outBody []byte + pendingLogs struct { + sync.Mutex + lines []*logpb.UserAppLogLine + flushes int + } + apiURL *url.URL } @@ -265,9 +290,11 @@ func BackgroundContext() netcontext.Context { }, }, apiURL: apiURL(), - logger: globalLogger(), } + // TODO(dsymonds): Wire up the shutdown handler to do a final flush. + go ctxs.bg.logFlusher(make(chan int)) + return toContext(ctxs.bg) } @@ -279,7 +306,6 @@ func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netco c := &context{ req: req, apiURL: apiURL, - logger: globalLogger(), } ctxs.Lock() defer ctxs.Unlock() @@ -501,9 +527,120 @@ func (c *context) Request() *http.Request { return c.req } -func ContextForTesting(req *http.Request) netcontext.Context { - return toContext(&context{ - req: req, - logger: testLogger, +func (c *context) addLogLine(ll *logpb.UserAppLogLine) { + // Truncate long log lines. + // TODO(dsymonds): Check if this is still necessary. + const lim = 8 << 10 + if len(*ll.Message) > lim { + suffix := fmt.Sprintf("...(length %d)", len(*ll.Message)) + ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix) + } + + c.pendingLogs.Lock() + c.pendingLogs.lines = append(c.pendingLogs.lines, ll) + c.pendingLogs.Unlock() +} + +var logLevelName = map[int64]string{ + 0: "DEBUG", + 1: "INFO", + 2: "WARNING", + 3: "ERROR", + 4: "CRITICAL", +} + +func logf(c *context, level int64, format string, args ...interface{}) { + s := fmt.Sprintf(format, args...) + s = strings.TrimRight(s, "\n") // Remove any trailing newline characters. + c.addLogLine(&logpb.UserAppLogLine{ + TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3), + Level: &level, + Message: &s, }) + log.Print(logLevelName[level] + ": " + s) +} + +// flushLog attempts to flush any pending logs to the appserver. +// It should not be called concurrently. +func (c *context) flushLog(force bool) (flushed bool) { + c.pendingLogs.Lock() + // Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious. + n, rem := 0, 30<<20 + for ; n < len(c.pendingLogs.lines); n++ { + ll := c.pendingLogs.lines[n] + // Each log line will require about 3 bytes of overhead. + nb := proto.Size(ll) + 3 + if nb > rem { + break + } + rem -= nb + } + lines := c.pendingLogs.lines[:n] + c.pendingLogs.lines = c.pendingLogs.lines[n:] + c.pendingLogs.Unlock() + + if len(lines) == 0 && !force { + // Nothing to flush. + return false + } + + rescueLogs := false + defer func() { + if rescueLogs { + c.pendingLogs.Lock() + c.pendingLogs.lines = append(lines, c.pendingLogs.lines...) + c.pendingLogs.Unlock() + } + }() + + buf, err := proto.Marshal(&logpb.UserAppLogGroup{ + LogLine: lines, + }) + if err != nil { + log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err) + rescueLogs = true + return false + } + + req := &logpb.FlushRequest{ + Logs: buf, + } + res := &basepb.VoidProto{} + c.pendingLogs.Lock() + c.pendingLogs.flushes++ + c.pendingLogs.Unlock() + if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil { + log.Printf("internal.flushLog: Flush RPC: %v", err) + rescueLogs = true + return false + } + return true +} + +const ( + // Log flushing parameters. + flushInterval = 1 * time.Second + forceFlushInterval = 60 * time.Second +) + +func (c *context) logFlusher(stop <-chan int) { + lastFlush := time.Now() + tick := time.NewTicker(flushInterval) + for { + select { + case <-stop: + // Request finished. + tick.Stop() + return + case <-tick.C: + force := time.Now().Sub(lastFlush) > forceFlushInterval + if c.flushLog(force) { + lastFlush = time.Now() + } + } + } +} + +func ContextForTesting(req *http.Request) netcontext.Context { + return toContext(&context{req: req}) } |