C++中线程池一般使用队列(std::queue)配合外部的std::condition_variable,或者手动构建阻塞队列(BlockQueue)来设计。

而需要使用任务优先级的时候,一般使用大根堆/小根堆的优先级队列std::priority_queue来实现。

那么问题来了,在任务优先级比较不均的时候,怎么避免低优先级任务的长时间饥饿呢?

为了实现动态公平调度:

  • 动态优先级老化(Aging):优先级动态调整:任务在队列中等待时间越长,其有效优先级逐渐升高。
  • 双队列混合轮询:每处理一定数量的高优先级任务后,强制处理低优先级任务。

首先抽象出一个Task用于记录任务的初始优先级和入队时间:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
#include <chrono>

struct Task {
    int base_priority; // 初始优先级
    std::chrono::steady_clock::time_point enqueue_time;
    std::function<void()> job;

    // 计算动态优先级(等待时间越长,优先级越高)
    int dynamic_priority() const {
        auto now = std::chrono::steady_clock::now();
        auto wait_time = std::chrono::duration_cast<std::chrono::seconds>(now - enqueue_time).count();
        return base_priority + static_cast<int>(wait_time * 0.1); // 老化系数可调
    }

    // 重载比较运算符(实际比较动态优先级)
    bool operator<(const Task& other) const {
        return this->dynamic_priority() < other.dynamic_priority(); 
    }
};

整体的线程池类设计:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include <queue>
#include <vector>
#include <thread>
#include <mutex>
#include <condition_variable>

class ThreadPool {
public:
    ThreadPool(size_t threads, size_t high_freq = 5) 
        : high_processing_count(0), high_freq_(high_freq) {
        for(size_t i = 0; i < threads; ++i) {
            workers.emplace_back([this] { worker_loop(); });
        }
    }

    void add_task(int priority, std::function<void()> task) {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            queue.emplace(Task{priority, std::chrono::steady_clock::now(), task});
        }
        condition.notify_one();
    }

    ~ThreadPool() { /* ... 省略资源回收代码 ... */ }

private:
    std::mutex queue_mutex;
    std::condition_variable condition;
    std::priority_queue<Task> queue; // 主队列(动态优先级)
    std::queue<std::function<void()>> low_priority_queue; // 辅助队列
    std::vector<std::thread> workers;
    
    // 轮询控制
    std::atomic<int> high_processing_count;
    const int high_freq_;

    void worker_loop() {
        while(true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(queue_mutex);
                condition.wait(lock, [this] { return !queue.empty(); });

                // 动态老化:每处理high_freq_个高优任务后强制处理低优
                if(++high_processing_count % high_freq_ == 0 && 
                   !low_priority_queue.empty()) {
                    task = low_priority_queue.front();
                    low_priority_queue.pop();
                } else {
                    task = queue.top().job;
                    queue.pop();
                }
            }
            if(task) task();
        }
    }
};

拓展

  • (处理超时)时间阈值兜底:可添加最大等待时间监控,对超时任务直接提升到最高优先级
  • 优先级区间划分:将任务分为URGENT/HIGH/NORMAL等级别,不同级别采用不同老化系数
  • 根据系统负载动态调整high_freq_参数
  • 根据队列负载动态增减线程,避免资源浪费(如 C++17 的 std::jthread)。

线程池常见实现:基于C++11实现线程池 - Skykey的文章 - 知乎

C++ 并发编程(从C++11到C++17)
货比三家:C++ 中的 task based 并发