// Copyright 2017, OpenCensus Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package trace import ( "sync" "time" "go.opencensus.io/internal" ) const ( maxBucketSize = 100000 defaultBucketSize = 10 ) var ( ssmu sync.RWMutex // protects spanStores spanStores = make(map[string]*spanStore) ) // This exists purely to avoid exposing internal methods used by z-Pages externally. type internalOnly struct{} func init() { //TODO(#412): remove internal.Trace = &internalOnly{} } // ReportActiveSpans returns the active spans for the given name. func (i internalOnly) ReportActiveSpans(name string) []*SpanData { s := spanStoreForName(name) if s == nil { return nil } var out []*SpanData s.mu.Lock() defer s.mu.Unlock() for span := range s.active { out = append(out, span.makeSpanData()) } return out } // ReportSpansByError returns a sample of error spans. // // If code is nonzero, only spans with that status code are returned. func (i internalOnly) ReportSpansByError(name string, code int32) []*SpanData { s := spanStoreForName(name) if s == nil { return nil } var out []*SpanData s.mu.Lock() defer s.mu.Unlock() if code != 0 { if b, ok := s.errors[code]; ok { for _, sd := range b.buffer { if sd == nil { break } out = append(out, sd) } } } else { for _, b := range s.errors { for _, sd := range b.buffer { if sd == nil { break } out = append(out, sd) } } } return out } // ConfigureBucketSizes sets the number of spans to keep per latency and error // bucket for different span names. func (i internalOnly) ConfigureBucketSizes(bcs []internal.BucketConfiguration) { for _, bc := range bcs { latencyBucketSize := bc.MaxRequestsSucceeded if latencyBucketSize < 0 { latencyBucketSize = 0 } if latencyBucketSize > maxBucketSize { latencyBucketSize = maxBucketSize } errorBucketSize := bc.MaxRequestsErrors if errorBucketSize < 0 { errorBucketSize = 0 } if errorBucketSize > maxBucketSize { errorBucketSize = maxBucketSize } spanStoreSetSize(bc.Name, latencyBucketSize, errorBucketSize) } } // ReportSpansPerMethod returns a summary of what spans are being stored for each span name. func (i internalOnly) ReportSpansPerMethod() map[string]internal.PerMethodSummary { out := make(map[string]internal.PerMethodSummary) ssmu.RLock() defer ssmu.RUnlock() for name, s := range spanStores { s.mu.Lock() p := internal.PerMethodSummary{ Active: len(s.active), } for code, b := range s.errors { p.ErrorBuckets = append(p.ErrorBuckets, internal.ErrorBucketSummary{ ErrorCode: code, Size: b.size(), }) } for i, b := range s.latency { min, max := latencyBucketBounds(i) p.LatencyBuckets = append(p.LatencyBuckets, internal.LatencyBucketSummary{ MinLatency: min, MaxLatency: max, Size: b.size(), }) } s.mu.Unlock() out[name] = p } return out } // ReportSpansByLatency returns a sample of successful spans. // // minLatency is the minimum latency of spans to be returned. // maxLatency, if nonzero, is the maximum latency of spans to be returned. func (i internalOnly) ReportSpansByLatency(name string, minLatency, maxLatency time.Duration) []*SpanData { s := spanStoreForName(name) if s == nil { return nil } var out []*SpanData s.mu.Lock() defer s.mu.Unlock() for i, b := range s.latency { min, max := latencyBucketBounds(i) if i+1 != len(s.latency) && max <= minLatency { continue } if maxLatency != 0 && maxLatency < min { continue } for _, sd := range b.buffer { if sd == nil { break } if minLatency != 0 || maxLatency != 0 { d := sd.EndTime.Sub(sd.StartTime) if d < minLatency { continue } if maxLatency != 0 && d > maxLatency { continue } } out = append(out, sd) } } return out } // spanStore keeps track of spans stored for a particular span name. // // It contains all active spans; a sample of spans for failed requests, // categorized by error code; and a sample of spans for successful requests, // bucketed by latency. type spanStore struct { mu sync.Mutex // protects everything below. active map[*Span]struct{} errors map[int32]*bucket latency []bucket maxSpansPerErrorBucket int } // newSpanStore creates a span store. func newSpanStore(name string, latencyBucketSize int, errorBucketSize int) *spanStore { s := &spanStore{ active: make(map[*Span]struct{}), latency: make([]bucket, len(defaultLatencies)+1), maxSpansPerErrorBucket: errorBucketSize, } for i := range s.latency { s.latency[i] = makeBucket(latencyBucketSize) } return s } // spanStoreForName returns the spanStore for the given name. // // It returns nil if it doesn't exist. func spanStoreForName(name string) *spanStore { var s *spanStore ssmu.RLock() s, _ = spanStores[name] ssmu.RUnlock() return s } // spanStoreForNameCreateIfNew returns the spanStore for the given name. // // It creates it if it didn't exist. func spanStoreForNameCreateIfNew(name string) *spanStore { ssmu.RLock() s, ok := spanStores[name] ssmu.RUnlock() if ok { return s } ssmu.Lock() defer ssmu.Unlock() s, ok = spanStores[name] if ok { return s } s = newSpanStore(name, defaultBucketSize, defaultBucketSize) spanStores[name] = s return s } // spanStoreSetSize resizes the spanStore for the given name. // // It creates it if it didn't exist. func spanStoreSetSize(name string, latencyBucketSize int, errorBucketSize int) { ssmu.RLock() s, ok := spanStores[name] ssmu.RUnlock() if ok { s.resize(latencyBucketSize, errorBucketSize) return } ssmu.Lock() defer ssmu.Unlock() s, ok = spanStores[name] if ok { s.resize(latencyBucketSize, errorBucketSize) return } s = newSpanStore(name, latencyBucketSize, errorBucketSize) spanStores[name] = s } func (s *spanStore) resize(latencyBucketSize int, errorBucketSize int) { s.mu.Lock() for i := range s.latency { s.latency[i].resize(latencyBucketSize) } for _, b := range s.errors { b.resize(errorBucketSize) } s.maxSpansPerErrorBucket = errorBucketSize s.mu.Unlock() } // add adds a span to the active bucket of the spanStore. func (s *spanStore) add(span *Span) { s.mu.Lock() s.active[span] = struct{}{} s.mu.Unlock() } // finished removes a span from the active set, and adds a corresponding // SpanData to a latency or error bucket. func (s *spanStore) finished(span *Span, sd *SpanData) { latency := sd.EndTime.Sub(sd.StartTime) if latency < 0 { latency = 0 } code := sd.Status.Code s.mu.Lock() delete(s.active, span) if code == 0 { s.latency[latencyBucket(latency)].add(sd) } else { if s.errors == nil { s.errors = make(map[int32]*bucket) } if b := s.errors[code]; b != nil { b.add(sd) } else { b := makeBucket(s.maxSpansPerErrorBucket) s.errors[code] = &b b.add(sd) } } s.mu.Unlock() }