reader.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. // Copyright 2019, OpenCensus Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. //
  15. package metricexport
  16. import (
  17. "context"
  18. "fmt"
  19. "sync"
  20. "time"
  21. "go.opencensus.io/metric/metricdata"
  22. "go.opencensus.io/metric/metricproducer"
  23. "go.opencensus.io/trace"
  24. )
  25. var (
  26. defaultSampler = trace.ProbabilitySampler(0.0001)
  27. errReportingIntervalTooLow = fmt.Errorf("reporting interval less than %d", minimumReportingDuration)
  28. errAlreadyStarted = fmt.Errorf("already started")
  29. errIntervalReaderNil = fmt.Errorf("interval reader is nil")
  30. errExporterNil = fmt.Errorf("exporter is nil")
  31. errReaderNil = fmt.Errorf("reader is nil")
  32. )
  33. const (
  34. defaultReportingDuration = 60 * time.Second
  35. minimumReportingDuration = 1 * time.Second
  36. defaultSpanName = "ExportMetrics"
  37. )
  38. // ReaderOptions contains options pertaining to metrics reader.
  39. type ReaderOptions struct {
  40. // SpanName is the name used for span created to export metrics.
  41. SpanName string
  42. }
  43. // Reader reads metrics from all producers registered
  44. // with producer manager and exports those metrics using provided
  45. // exporter.
  46. type Reader struct {
  47. sampler trace.Sampler
  48. spanName string
  49. }
  50. // IntervalReader periodically reads metrics from all producers registered
  51. // with producer manager and exports those metrics using provided
  52. // exporter. Call Reader.Stop() to stop the reader.
  53. type IntervalReader struct {
  54. // ReportingInterval it the time duration between two consecutive
  55. // metrics reporting. defaultReportingDuration is used if it is not set.
  56. // It cannot be set lower than minimumReportingDuration.
  57. ReportingInterval time.Duration
  58. exporter Exporter
  59. timer *time.Ticker
  60. quit, done chan bool
  61. mu sync.RWMutex
  62. reader *Reader
  63. }
  64. // ReaderOption apply changes to ReaderOptions.
  65. type ReaderOption func(*ReaderOptions)
  66. // WithSpanName makes new reader to use given span name when exporting metrics.
  67. func WithSpanName(spanName string) ReaderOption {
  68. return func(o *ReaderOptions) {
  69. o.SpanName = spanName
  70. }
  71. }
  72. // NewReader returns a reader configured with specified options.
  73. func NewReader(o ...ReaderOption) *Reader {
  74. var opts ReaderOptions
  75. for _, op := range o {
  76. op(&opts)
  77. }
  78. reader := &Reader{defaultSampler, defaultSpanName}
  79. if opts.SpanName != "" {
  80. reader.spanName = opts.SpanName
  81. }
  82. return reader
  83. }
  84. // NewIntervalReader creates a reader. Once started it periodically
  85. // reads metrics from all producers and exports them using provided exporter.
  86. func NewIntervalReader(reader *Reader, exporter Exporter) (*IntervalReader, error) {
  87. if exporter == nil {
  88. return nil, errExporterNil
  89. }
  90. if reader == nil {
  91. return nil, errReaderNil
  92. }
  93. r := &IntervalReader{
  94. exporter: exporter,
  95. reader: reader,
  96. }
  97. return r, nil
  98. }
  99. // Start starts the IntervalReader which periodically reads metrics from all
  100. // producers registered with global producer manager. If the reporting interval
  101. // is not set prior to calling this function then default reporting interval
  102. // is used.
  103. func (ir *IntervalReader) Start() error {
  104. if ir == nil {
  105. return errIntervalReaderNil
  106. }
  107. ir.mu.Lock()
  108. defer ir.mu.Unlock()
  109. var reportingInterval = defaultReportingDuration
  110. if ir.ReportingInterval != 0 {
  111. if ir.ReportingInterval < minimumReportingDuration {
  112. return errReportingIntervalTooLow
  113. }
  114. reportingInterval = ir.ReportingInterval
  115. }
  116. if ir.done != nil {
  117. return errAlreadyStarted
  118. }
  119. ir.timer = time.NewTicker(reportingInterval)
  120. ir.quit = make(chan bool)
  121. ir.done = make(chan bool)
  122. go ir.startInternal()
  123. return nil
  124. }
  125. func (ir *IntervalReader) startInternal() {
  126. for {
  127. select {
  128. case <-ir.timer.C:
  129. ir.reader.ReadAndExport(ir.exporter)
  130. case <-ir.quit:
  131. ir.timer.Stop()
  132. ir.done <- true
  133. return
  134. }
  135. }
  136. }
  137. // Stop stops the reader from reading and exporting metrics.
  138. // Additional call to Stop are no-ops.
  139. func (ir *IntervalReader) Stop() {
  140. if ir == nil {
  141. return
  142. }
  143. ir.mu.Lock()
  144. defer ir.mu.Unlock()
  145. if ir.quit == nil {
  146. return
  147. }
  148. ir.quit <- true
  149. <-ir.done
  150. close(ir.quit)
  151. close(ir.done)
  152. ir.quit = nil
  153. }
  154. // ReadAndExport reads metrics from all producer registered with
  155. // producer manager and then exports them using provided exporter.
  156. func (r *Reader) ReadAndExport(exporter Exporter) {
  157. ctx, span := trace.StartSpan(context.Background(), r.spanName, trace.WithSampler(r.sampler))
  158. defer span.End()
  159. producers := metricproducer.GlobalManager().GetAll()
  160. data := []*metricdata.Metric{}
  161. for _, producer := range producers {
  162. data = append(data, producer.Read()...)
  163. }
  164. // TODO: [rghetia] add metrics for errors.
  165. exporter.ExportMetrics(ctx, data)
  166. }