一、前言

本文使用C++11来实现阻塞队列结构。

说明:本文代码需支持C++11标准编译器编译,以及可选单元测试工具googletest

项目地址:BlockingQueue.h

二、函数声明

template<typename T>
class BlockingQueue {
  using self_t = BlockingQueue;
public:
  BlockingQueue(size_t);
  ~BlockingQueue() = default;
    /* 禁止拷贝 */
  BlockingQueue(const self_t&) = delete;
  BlockingQueue(const self_t&&) = delete;
  BlockingQueue& operator=(const self_t&) = delete;
  BlockingQueue& operator=(const self_t&&) = delete;
    /* 添加数据,生产者 */
  void enqueue(const T&);
  /* 取出数据,消费者 */
  T dequeue();
private:
  /* 队列最大值 */
  size_t sMax_;
    /* 队列及锁 */
  std::queue<T> queue_;
  std::mutex mutex_;
  std::condition_variable not_full_;
  std::condition_variable not_empty_;
};

三、函数实现

3.1 构造函数

/* 初始化队列最大值和其他成员 */
template<typename T>
BlockingQueue<T>::BlockingQueue(size_t num) :
  queue_(),
  mutex_(),
  not_full_(),
  not_empty_() {
  sMax_ = num;
}

3.2 enqueue 函数

template<typename T>
void BlockingQueue<T>::enqueue(const T& val) {
  /* 加锁 */
  std::unique_lock<std::mutex> locker(mutex_);
  /* 队列满,阻塞至消费者消费后通知 */
  if(queue_.size() >= sMax_) {
    not_full_.wait(locker, [this]()->bool{ return this->queue_.size() < sMax_; });
  }
  /* wait后队列自动加锁*/
  /* 添加数据,并通知消费者不空 */
  queue_.push(val);
  not_empty_.notify_one();
}

3.3 dequeue 函数

template<typename T>
T BlockingQueue<T>::dequeue() {
  /* 加锁 */
  std::unique_lock<std::mutex> locker(mutex_);
  /* 队列空,阻塞至生产者生产后通知 */
  if(queue_.empty()) {
    not_empty_.wait(locker, [this]()->bool{ return !(this->queue_.empty()); });
  }
  /* 获取数据,并通知生产者不满 */
  T ret = std::move(queue_.front());
  queue_.pop();
  not_full_.notify_one();
  return ret;
}

四、单元测试

  • 用例一:测试阻塞队列构造和析构函数。
  • 用例二:测试阻塞队列为空满时阻塞功能。
  • 用例三:测试生产者消费者模型(设置生产者和消费者不同速率)。
/* 生产消费者模型 */
/* item 仓库存放最大值 */
/* iCt 消费者消费速度,毫秒 */
/* iPt 生产者生产速度,毫秒 */
void ConsumerProducer(int item, int iCt, int iPt) {
  std::shared_ptr<BlockingQueue<int>> q = 
    std::make_shared<BlockingQueue<int>>(10);

  std::thread p([q, item, iPt]{
        for(int i = 0; i < item; ++i) {
          q->enqueue(i);
          std::this_thread::sleep_for(std::chrono::milliseconds(iPt));
        }
      });
  std::thread c([q, item, iCt]{
        int cmp = 0;
        while(true) {
          int i = q->dequeue();
          ASSERT_EQ(cmp, i);
          ++cmp;
          if(cmp == item) { return; }
          std::this_thread::sleep_for(std::chrono::milliseconds(iCt));
        }
      });

  p.join();
  c.join();
}
Last modification:May 15th, 2020 at 12:07 am