一、前言

本文使用C++17来实现线程池结构。

说明:本文代码需支持C++17标准编译器编译,以及可选单元测试工具googletest

项目地址:ThreadPool.h

关于多线程的 c++ 类和方法:

/* 互斥体 */
std::mutex mutex_; 
/* RAII 风格包装器,包装 mutex 的 lock 和 unlock */
/* 构造时调用 lock 加锁互斥量,析构时调用 unlock 解锁互斥量 */
std::lock_guard<std::mutex> lock(mutex_);
/* unique_lock 与 lock_guard 相似,构造时尝试加锁 */
/* unique_lock 可以手动解锁,而非等待到析构时 */
std::unique_lock lock(mutex_);
/* 条件变量 condition_variable */
std::condition_variable cv;
/* 解锁 lock ,阻塞当前线程,等待 notify */
/* 可能存在被非 notify 唤醒,遂添加 pred 判断 */
/* 解阻塞时, lock 被再次加锁 */
cv.wait(std::unique_lock<std::mutex>&, Predicate pred);
/* 解阻塞被 wait 阻塞的进程 */
cv.notify_one();
/* 提供异步操作返回值获取,可阻塞 */
/* 本身为 std::shared_ptr 包装,可直接返回 */
std::future<ret_t> f;
/* 可调用目标(函数)包装器 */
/* 返回值可储存在 std::future 中 */
/* 使用 get_future() 方法构造 std::shared_ptr 包装的 future */
std::packaged_task<ret_t()> task;

二、函数声明

class ThreadPool {
private:
  /* 是否停止线程 */
  bool stop_;
  /* 函数调用队列 */
  std::queue<std::function<void()>> tasks_;
  /* 互斥量,锁 tasks_ */
  std::mutex mutex_;
  /* 条件变量,通知线程从 task_ 队列取出任务执行 */
  std::condition_variable condition_;
public:
  /* 禁用拷贝 */
  ThreadPool(const ThreadPool&) = delete;
  ThreadPool(const ThreadPool&&) = delete;
  ThreadPool& operator=(const ThreadPool&) = delete;
  ThreadPool& operator=(const ThreadPool&&) = delete;
  /* 线程池中线程数 */
  ThreadPool(size_t num);
  /* 处理资源回收 */
  ~ThreadPool();
  /* 模板函数,添加函数到执行队列 */
  /* 返回 std::future<> 类型,用于异步接收 task 请求 */
  template<typename Func, typename ...Args>
  auto addTask(Func&& f, Args&& args...) -> 
    std::future<decltype(f(args...))>;
};

三、函数实现

3.1 ThreadPool 构造函数

线程池构造时,启动 num 个线程。线程确认任务队列是否存在任务,直到能够取出任务。若使用非阻塞让线程一直检查队列,浪费 CPU 资源,可以采取使用条件变量唤醒机制,直到任务队列存在任务时唤醒线程去确认任务队列是否存在任务。

ThreadPool::ThreadPool(size_t num) : 
  stop_(false),
  mutex_(),
  condition_(),
  threads_(),
  tasks_() {
  for(size_t i = 0; i < num; ++i) {
      // 线程启动,并由析构函数通过 join 阻塞至完成
      threads_.emplace_back(std::threads([this]{
        std::function<void()> task;
        // 轮询从 task_ 获取任务并执行
        while(true) {
          // 加锁队列
          {
            std::unique_lock<std::mutex> locker(this->mutex_);
            this->condition_.wait(
              locker,
              [this]()->bool{
                // 触发条件,停止线程池或者task_队列中存在任务
                return this->stop_ || !this->tasks_.empty();    
              }
            );
            if(this->stop_ && this->tasks_.empty()) {
              return;
            }
            // 获取任务
            task = std::move(this->tasks_.front());
            this->tasks_.pop();
          } // locker 析构,解锁队列
          // 执行 task
          task();
        }
      }));
  }       
}

3.2 ThreadPool 析构函数

析构函数需要通知所有线程,阻塞到所有线程执行结束,析构完成。

ThreadPool::~ThreadPool() {
  {
    // 锁临界资源 task_ stop_  
    // 设置退出条件
    std::unique_lock<std::mutex> locker(this->mutex_);
    stop_ = true;
  }
  // 阻塞到所有任务完成
  condition_.notify_all();
  for(const auto& thread : threads_) {
    thread.join();
  }
}

3.3 addTask 添加任务

往 task_ 队列添加任务,核心功能在于将函数转化为 std::function<void()>
请注意:在 c++ 中 std::bind函数只能接受具有移动构造函数的类。

// 由于调用函数为可变参数,利用可变参数模板
// 利用 decltype 获取函数 f 返回值
template<typename Func, typename ...Args>
auto ThreadPool::addTask(Func&& f, Args&& ...args) ->
  std::future<decltype(f(args...))> {
  // 函数 f 返回类型
  using ret_t  = decltype(f(args...));
  // 包装器类型
  using task_t = std::packaged_task<ret_t()>;
  // std::future 获取类型
  using future_t = std::future<ret_t>;
  task_t *task = new task_t(std::bind(f, args...));
  // 构造 std::future
  future_t ret = task->get_future();
  {
    // 在下面作用域加锁 task_ 队列
    std::lock_gurad<std::mutex> locker(mutex_);
    task_.push([task]{ (*task)(); delete task; });
  }
  // 通知线程抢占 task_ 锁,获取任务
  condition_.notify_one();
  // std::future 已由 std::shared_ptr 包装,直接返回
  return ret;
}

四、单元测试

  • 设置任务功能为阻塞等待 100 ms。
  • 用例一:线程池中线程数量与放入任务数量相同,则等待 100 ms 左右结束。
  • 用例二:线程次中线程数量为 1 ,任务数量为 2 ,则等待 200 ms 左右结束。
  • 用例三:测试析构函数时阻塞进程功能。
  • 用例四:多线程素数判断。
/* 暴力判断法 */
bool IsPrime(int n) {
  for(int i = 2; i <= n / 2; ++i) {
    if(n % i == 0) {
      return false;
    }
  }
  return true;
}
/* 测试函数 */
TEST_F(ThreadPoolTest, PrimeThreadPool) {
  ThreadPool p(10);
  for(int i = 2; i <= iMax; ++i) {
    p.addTask(IsPrime, i);
  }
}
Last modification:May 15th, 2020 at 12:08 am