C++ pipelining APIs

The pipeline API allows you to run inferencing for a segmented model across multiple Edge TPUs.

For more information and a walkthrough of this API, read Pipeline a model with multiple Edge TPUs.

class coral::PipelinedModelRunner

Runs inferencing for a segmented model, using a pipeline of Edge TPUs.

This class assumes each segment has a dedicated Edge TPU, which allows all segments to run in parallel and improves throughput.

For example, if you have a pool of requests to process:

auto model_segments_interpreters =
    ModelSegmentsInterpreters(model_segments_paths);
// Caller can set custom allocators for input and output tensors with
// `input_tensor_allocator` and `output_tensor_allocator` arguments.
auto runner = PipelinedModelRunner(model_segments_interpreters);
auto* input_tensor_allocator = runner.GetInputTensorAllocator();
auto* output_tensor_allocator = runner.GetOutputTensorAllocator();

const int total_num_requests = 1000;

auto request_producer = [&runner, input_tensor_allocator, &total_num_requests]() {
  for (int i = 0; i < total_num_requests; ++i) {
    // Caller is responsible for allocating input tensors.
    CHECK(runner.Push(CreateInputTensors(input_tensor_allocator)).ok());
  }
};

auto result_consumer = [&runner, output_tensor_allocator, &total_num_requests]() {
  for (int i = 0; i < total_num_requests; ++i) {
    std::vector<PipelineTensor> output_tensors;
    CHECK(runner.Pop(&output_tensors).ok());
    ConsumeOutputTensors(output_tensors);
    // Caller is responsible for deallocating output tensors.
    FreeTensors(output_tensor_allocator, output_tensors);
  }
};

auto producer_thread = std::thread(request_producer);
auto consumer_thread = std::thread(result_consumer);
producer_thread.join();
consumer_thread.join();

Or, if you have a stream of requests to process:

auto model_segments_interpreters =
    ModelSegmentsInterpreters(model_segments_paths);
// Caller can set custom allocators for input and output tensors with
// `input_tensor_allocator` and `output_tensor_allocator` arguments.
auto runner = PipelinedModelRunner(model_segments_interpreters);
auto* input_tensor_allocator = runner.GetInputTensorAllocator();
auto* output_tensor_allocator = runner.GetOutputTensorAllocator();

auto request_producer = [&runner, input_tensor_allocator]() {
  while (true) {
    // Caller is responsible for allocating input tensors.
    CHECK(runner.Push(CreateInputTensors(input_tensor_allocator)).ok());
    if (ShouldStop()) {
      // Push the special empty input to signal that no more inputs will be pushed.
      CHECK(runner.Push({}).ok());
      break;
    }
  }
};

auto result_consumer = [&runner, output_tensor_allocator]() {
  std::vector<PipelineTensor> output_tensors;
  while (runner.Pop(&output_tensors).ok() && !output_tensors.empty()) {
    ConsumeOutputTensors(output_tensors);
    // Caller is responsible for deallocating output tensors.
    FreeTensors(output_tensor_allocator, output_tensors);
  }
};

auto producer_thread = std::thread(request_producer);
auto consumer_thread = std::thread(result_consumer);
producer_thread.join();
consumer_thread.join();

This class is thread-safe.

Public Functions

PipelinedModelRunner(const std::vector<tflite::Interpreter*> &model_segments_interpreters, Allocator *input_tensor_allocator = nullptr, Allocator *output_tensor_allocator = nullptr)

Initializes the PipelinedModelRunner with model segments.

Note:

  • input_tensor_allocator is only used to free the input tensors, as this class assumes that input tensors are allocated by the caller.

  • output_tensor_allocator is only used to allocate output tensors, as this class assumes that output tensors are freed by the caller after consuming them.

Parameters
  • model_segments_interpreters: A vector of pointers to tflite::Interpreter objects, each representing a model segment and unique Edge TPU context. model_segments_interpreters[0] should be the first segment interpreter of the model, model_segments_interpreters[1] is the second segment, and so on.

  • input_tensor_allocator: A custom Allocator for input tensors. By default (nullptr), it uses an allocator provided by this class.

  • output_tensor_allocator: A custom Allocator for output tensors. By default (nullptr), it uses an allocator provided by this class.
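
For illustration only, here is a minimal sketch of constructing the runner with custom allocators. It reuses the model_segments_interpreters variable from the examples above and a hypothetical HeapAllocator (a subclass of Allocator like the custom-allocator sketch at the end of this section):

HeapAllocator input_allocator;   // Hypothetical Allocator subclass; see the
HeapAllocator output_allocator;  // sketch at the end of this section.
auto runner = PipelinedModelRunner(model_segments_interpreters,
                                   &input_allocator, &output_allocator);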

Allocator *GetInputTensorAllocator() const

Returns the default input tensor allocator (or the allocator given to the constructor).

Allocator *GetOutputTensorAllocator() const

Returns the default output tensor allocator (or the allocator given to the constructor).

void SetInputQueueSize(size_t size)

Sets input queue size.

By default, input queue size is unlimited.

Note: It is OK to change the queue size threshold while PipelinedModelRunner is active. If the new threshold is smaller than the current queue size, pushes to the queue will block until the current queue size drops below the new threshold.

Parameters
  • size: Input queue size.

void SetOutputQueueSize(size_t size)

Sets output queue size.

By default, output queue size is unlimited.

Note: It is OK to change the queue size threshold while PipelinedModelRunner is active. If the new threshold is smaller than the current queue size, pushes to the queue will block until the current queue size drops below the new threshold.

Parameters
  • size: Output queue size.
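
As a sketch (assuming the runner built above, with an arbitrary limit of 8), bounded queues keep memory usage in check when the producer can outrun the pipeline or the consumer falls behind:

// Block Push() once 8 requests are waiting, and stall the last segment once
// 8 results are waiting to be consumed.
runner.SetInputQueueSize(8);
runner.SetOutputQueueSize(8);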

absl::Status Push(const std::vector<PipelineTensor> &input_tensors)

Pushes input tensors to be processed by the pipeline.

Note:

  • The caller is responsible for allocating memory for input tensors. By default, this class frees those tensors after they are consumed. The caller can set a custom allocator for input tensors if needed.

  • Pushing an empty vector {} is allowed; it signals that no more inputs will be pushed (any Push() after this special push fails). This special push allows Pop()’s consumer to properly drain the remaining output tensors. See the examples above for details.

  • The caller will block if the current input queue size is greater than the input queue size threshold. By default, the input queue size threshold is unlimited, i.e., calls to Push() are non-blocking.

Parameters
  • input_tensors: A vector of input tensors, each wrapped as a PipelineTensor. The order must match Interpreter::inputs() from the first model segment.

Return

absl::OkStatus if successful; absl::InternalError otherwise.

absl::Status Pop(std::vector<PipelineTensor> *output_tensors)

Gets output tensors from the pipeline.

Note:

  • The caller is responsible for deallocating memory for output tensors after consuming them. By default, output tensors are allocated with the default tensor allocator. The caller can set a custom allocator for output tensors if needed.

  • The caller will block if no output tensors are available and the end-of-input push (an empty vector) has not yet been received.

Parameters
  • output_tensors: A pointer to a vector of PipelineTensor objects where the outputs are stored. The order of the returned output tensors matches Interpreter::outputs() of the last model segment.

Return

absl::OkStatus when an output is received, or when the pipeline has already been stopped and its queue is empty (in which case output_tensors will be empty); absl::InternalError otherwise.

std::vector<SegmentStats> GetSegmentStats() const

Returns performance stats for each segment.
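
For example, a sketch (using the runner from the examples above and the SegmentStats fields described below, with output via std::cout) that reports the average latency of each segment, which is useful for checking that the segments are balanced:

const auto segment_stats = runner.GetSegmentStats();
for (size_t i = 0; i < segment_stats.size(); ++i) {
  const auto& stats = segment_stats[i];
  if (stats.num_inferences > 0) {
    // total_time_ns is cumulative, so divide by the number of inferences.
    double avg_ms = stats.total_time_ns / 1e6 / stats.num_inferences;
    std::cout << "Segment " << i << ": " << avg_ms << " ms/inference\n";
  }
}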

struct coral::PipelineTensor

A tensor in the pipeline system.

This is a simplified version of TfLiteTensor.

Public Members

std::string name

Unique tensor name.

TfLiteType type

The data type of the values stored in buffer.

This determines how the bytes in buffer are interpreted.

Buffer *buffer

The underlying memory buffer for the tensor, allocated by an Allocator.

size_t bytes

The number of bytes required to store the data of this tensor.

That is: (bytes of each element) * dims[0] * ... * dims[n-1]. For example, if type is kTfLiteFloat32 and dims = {3, 2} then bytes = sizeof(float) * 3 * 2 = 4 * 3 * 2 = 24.
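
To illustrate, here is a sketch of how an input PipelineTensor might be populated, in the spirit of the CreateInputTensors() placeholder from the examples above; the tensor name, type, shape, and image_data are hypothetical:

PipelineTensor input;
input.name = "input_0";        // Must match the first segment's input tensor name.
input.type = kTfLiteUInt8;
input.bytes = 224 * 224 * 3;   // sizeof(uint8_t) * product of dims.
input.buffer = input_tensor_allocator->Alloc(input.bytes);
// ptr() may be nullptr for some allocators; call MapToHost() in that case.
std::memcpy(input.buffer->ptr(), image_data, input.bytes);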

void coral::FreePipelineTensors(const std::vector<PipelineTensor> &tensors, Allocator *allocator)

Deallocates the memory for the given tensors.

Use this to free output tensors each time you process the results.

Parameters
  • tensors: A vector of PipelineTensor objects to release.

  • allocator: The Allocator originally used to allocate the tensors.
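
A short sketch (using the runner and allocator from the earlier examples) of freeing outputs after each Pop(); this is the kind of cleanup the hypothetical FreeTensors() placeholder above stands in for:

std::vector<PipelineTensor> output_tensors;
if (runner.Pop(&output_tensors).ok() && !output_tensors.empty()) {
  ConsumeOutputTensors(output_tensors);  // Hypothetical consumer, as above.
  FreePipelineTensors(output_tensors, output_tensor_allocator);
  output_tensors.clear();
}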

struct coral::SegmentStats

Performance statistics for one segment of the model pipeline.

Public Members

int64_t total_time_ns = 0

Total time spent traversing this segment so far (in nanoseconds).

uint64_t num_inferences = 0

Number of inferences processed so far.

class coral::Allocator

Public Functions

virtual Buffer *Alloc(size_t size_bytes) = 0

Allocates size_bytes bytes of memory.

Parameters
  • size_bytes: The number of bytes to allocate.

Return

A pointer to a valid Buffer object.

virtual void Free(Buffer *buffer) = 0

Deallocates the memory held by the given buffer.

class coral::Buffer

Public Functions

virtual void *ptr() = 0

Returns the user-space pointer. The returned pointer can be nullptr.

void *MapToHost()

Maps buffer to host address space and returns the pointer.

Returns nullptr if the mapping was not successful. If the buffer was already mapped or does not require mapping, this is a no-op.

Note: If the underlying buffer is backed by DMA, a DMA_BUF_IOCTL_SYNC ioctl call might be needed for cache coherence. When such memory is consumed by the Edge TPU, this ioctl call is NOT necessary, as the driver only uses the pointer to identify the backing physical pages for DMA.

bool UnmapFromHost()

Unmaps the buffer from the host address space.

Returns true if successful, false otherwise. Call this explicitly if MapToHost() was called earlier. Calling it on an already-unmapped buffer is a no-op.

int fd()

Returns the file descriptor, or -1 if the buffer is NOT backed by a file descriptor.
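
To tie Allocator and Buffer together, here is a minimal heap-backed sketch that could be passed to the PipelinedModelRunner constructor. It is not part of the library; it assumes, as the declarations above suggest, that ptr() is the only pure-virtual member of Buffer, that Buffer has a virtual destructor, and that the base class provides default MapToHost()/UnmapFromHost()/fd() behavior:

// A hypothetical allocator that simply uses the C++ heap.
class HeapBuffer : public Buffer {
 public:
  explicit HeapBuffer(size_t size) : data_(new uint8_t[size]) {}
  ~HeapBuffer() { delete[] data_; }
  void* ptr() override { return data_; }

 private:
  uint8_t* data_;
};

class HeapAllocator : public Allocator {
 public:
  Buffer* Alloc(size_t size_bytes) override { return new HeapBuffer(size_bytes); }
  void Free(Buffer* buffer) override { delete buffer; }
};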