Program Listing for File batch_span_processor.h

Return to documentation for file (/home/docs/checkouts/readthedocs.org/user_builds/opentelemetry-cpp/checkouts/v1.3.0/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h)

// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#pragma once

#include "opentelemetry/sdk/common/circular_buffer.h"
#include "opentelemetry/sdk/trace/exporter.h"
#include "opentelemetry/sdk/trace/processor.h"

#include <atomic>
#include <condition_variable>
#include <thread>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{

namespace trace
{

struct BatchSpanProcessorOptions
{
  size_t max_queue_size = 2048;

  /* The time interval between two consecutive exports. */
  std::chrono::milliseconds schedule_delay_millis = std::chrono::milliseconds(5000);

  size_t max_export_batch_size = 512;
};

class BatchSpanProcessor : public SpanProcessor
{
public:
  BatchSpanProcessor(std::unique_ptr<SpanExporter> &&exporter,
                     const BatchSpanProcessorOptions &options);

  std::unique_ptr<Recordable> MakeRecordable() noexcept override;

  void OnStart(Recordable &span,
               const opentelemetry::trace::SpanContext &parent_context) noexcept override;

  void OnEnd(std::unique_ptr<Recordable> &&span) noexcept override;

  bool ForceFlush(
      std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

  bool Shutdown(
      std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

  ~BatchSpanProcessor();

private:
  void DoBackgroundWork();

  void Export(const bool was_for_flush_called);

  void DrainQueue();

  /* The configured backend exporter */
  std::unique_ptr<SpanExporter> exporter_;

  /* Configurable parameters as per the official specs */
  const size_t max_queue_size_;
  const std::chrono::milliseconds schedule_delay_millis_;
  const size_t max_export_batch_size_;

  /* Synchronization primitives */
  std::condition_variable cv_, force_flush_cv_;
  std::mutex cv_m_, force_flush_cv_m_, shutdown_m_;

  /* The buffer/queue to which the ended spans are added */
  common::CircularBuffer<Recordable> buffer_;

  /* Important boolean flags to handle the workflow of the processor */
  std::atomic<bool> is_shutdown_{false};
  std::atomic<bool> is_force_flush_{false};
  std::atomic<bool> is_force_flush_notified_{false};

  /* The background worker thread */
  std::thread worker_thread_;
};

}  // namespace trace
}  // namespace sdk
OPENTELEMETRY_END_NAMESPACE