aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/appengine/internal/api.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/appengine/internal/api.go')
-rw-r--r--vendor/google.golang.org/appengine/internal/api.go155
1 files changed, 146 insertions, 9 deletions
diff --git a/vendor/google.golang.org/appengine/internal/api.go b/vendor/google.golang.org/appengine/internal/api.go
index a2a4b4c..ec5aa59 100644
--- a/vendor/google.golang.org/appengine/internal/api.go
+++ b/vendor/google.golang.org/appengine/internal/api.go
@@ -26,6 +26,8 @@ import (
"github.com/golang/protobuf/proto"
netcontext "golang.org/x/net/context"
+ basepb "google.golang.org/appengine/internal/base"
+ logpb "google.golang.org/appengine/internal/log"
remotepb "google.golang.org/appengine/internal/remote_api"
)
@@ -50,6 +52,7 @@ var (
apiDeadlineHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline")
apiContentType = http.CanonicalHeaderKey("Content-Type")
apiContentTypeValue = []string{"application/octet-stream"}
+ logFlushHeader = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count")
apiHTTPClient = &http.Client{
Transport: &http.Transport{
@@ -79,8 +82,8 @@ func handleHTTP(w http.ResponseWriter, r *http.Request) {
req: r,
outHeader: w.Header(),
apiURL: apiURL(),
- logger: globalLogger(),
}
+ stopFlushing := make(chan int)
ctxs.Lock()
ctxs.m[r] = c
@@ -109,9 +112,26 @@ func handleHTTP(w http.ResponseWriter, r *http.Request) {
r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80")
}
+ // Start goroutine responsible for flushing app logs.
+ // This is done after adding c to ctx.m (and stopped before removing it)
+ // because flushing logs requires making an API call.
+ go c.logFlusher(stopFlushing)
+
executeRequestSafely(c, r)
c.outHeader = nil // make sure header changes aren't respected any more
+ stopFlushing <- 1 // any logging beyond this point will be dropped
+
+ // Flush any pending logs asynchronously.
+ c.pendingLogs.Lock()
+ flushes := c.pendingLogs.flushes
+ if len(c.pendingLogs.lines) > 0 {
+ flushes++
+ }
+ c.pendingLogs.Unlock()
+ go c.flushLog(false)
+ w.Header().Set(logFlushHeader, strconv.Itoa(flushes))
+
// Avoid nil Write call if c.Write is never called.
if c.outCode != 0 {
w.WriteHeader(c.outCode)
@@ -186,13 +206,18 @@ var ctxs = struct {
// context represents the context of an in-flight HTTP request.
// It implements the appengine.Context and http.ResponseWriter interfaces.
type context struct {
- req *http.Request
- logger *jsonLogger
+ req *http.Request
outCode int
outHeader http.Header
outBody []byte
+ pendingLogs struct {
+ sync.Mutex
+ lines []*logpb.UserAppLogLine
+ flushes int
+ }
+
apiURL *url.URL
}
@@ -265,9 +290,11 @@ func BackgroundContext() netcontext.Context {
},
},
apiURL: apiURL(),
- logger: globalLogger(),
}
+ // TODO(dsymonds): Wire up the shutdown handler to do a final flush.
+ go ctxs.bg.logFlusher(make(chan int))
+
return toContext(ctxs.bg)
}
@@ -279,7 +306,6 @@ func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netco
c := &context{
req: req,
apiURL: apiURL,
- logger: globalLogger(),
}
ctxs.Lock()
defer ctxs.Unlock()
@@ -501,9 +527,120 @@ func (c *context) Request() *http.Request {
return c.req
}
-func ContextForTesting(req *http.Request) netcontext.Context {
- return toContext(&context{
- req: req,
- logger: testLogger,
+func (c *context) addLogLine(ll *logpb.UserAppLogLine) {
+ // Truncate long log lines.
+ // TODO(dsymonds): Check if this is still necessary.
+ const lim = 8 << 10
+ if len(*ll.Message) > lim {
+ suffix := fmt.Sprintf("...(length %d)", len(*ll.Message))
+ ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix)
+ }
+
+ c.pendingLogs.Lock()
+ c.pendingLogs.lines = append(c.pendingLogs.lines, ll)
+ c.pendingLogs.Unlock()
+}
+
+var logLevelName = map[int64]string{
+ 0: "DEBUG",
+ 1: "INFO",
+ 2: "WARNING",
+ 3: "ERROR",
+ 4: "CRITICAL",
+}
+
+func logf(c *context, level int64, format string, args ...interface{}) {
+ s := fmt.Sprintf(format, args...)
+ s = strings.TrimRight(s, "\n") // Remove any trailing newline characters.
+ c.addLogLine(&logpb.UserAppLogLine{
+ TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3),
+ Level: &level,
+ Message: &s,
})
+ log.Print(logLevelName[level] + ": " + s)
+}
+
+// flushLog attempts to flush any pending logs to the appserver.
+// It should not be called concurrently.
+func (c *context) flushLog(force bool) (flushed bool) {
+ c.pendingLogs.Lock()
+ // Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious.
+ n, rem := 0, 30<<20
+ for ; n < len(c.pendingLogs.lines); n++ {
+ ll := c.pendingLogs.lines[n]
+ // Each log line will require about 3 bytes of overhead.
+ nb := proto.Size(ll) + 3
+ if nb > rem {
+ break
+ }
+ rem -= nb
+ }
+ lines := c.pendingLogs.lines[:n]
+ c.pendingLogs.lines = c.pendingLogs.lines[n:]
+ c.pendingLogs.Unlock()
+
+ if len(lines) == 0 && !force {
+ // Nothing to flush.
+ return false
+ }
+
+ rescueLogs := false
+ defer func() {
+ if rescueLogs {
+ c.pendingLogs.Lock()
+ c.pendingLogs.lines = append(lines, c.pendingLogs.lines...)
+ c.pendingLogs.Unlock()
+ }
+ }()
+
+ buf, err := proto.Marshal(&logpb.UserAppLogGroup{
+ LogLine: lines,
+ })
+ if err != nil {
+ log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err)
+ rescueLogs = true
+ return false
+ }
+
+ req := &logpb.FlushRequest{
+ Logs: buf,
+ }
+ res := &basepb.VoidProto{}
+ c.pendingLogs.Lock()
+ c.pendingLogs.flushes++
+ c.pendingLogs.Unlock()
+ if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil {
+ log.Printf("internal.flushLog: Flush RPC: %v", err)
+ rescueLogs = true
+ return false
+ }
+ return true
+}
+
+const (
+ // Log flushing parameters.
+ flushInterval = 1 * time.Second
+ forceFlushInterval = 60 * time.Second
+)
+
+func (c *context) logFlusher(stop <-chan int) {
+ lastFlush := time.Now()
+ tick := time.NewTicker(flushInterval)
+ for {
+ select {
+ case <-stop:
+ // Request finished.
+ tick.Stop()
+ return
+ case <-tick.C:
+ force := time.Now().Sub(lastFlush) > forceFlushInterval
+ if c.flushLog(force) {
+ lastFlush = time.Now()
+ }
+ }
+ }
+}
+
+func ContextForTesting(req *http.Request) netcontext.Context {
+ return toContext(&context{req: req})
}