1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
// Copyright (c) 2026 Michael Vandeberg
3  
// Copyright (c) 2026 Michael Vandeberg
4  
//
4  
//
5  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
6  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7  
//
7  
//
8  
// Official repository: https://github.com/boostorg/capy
8  
// Official repository: https://github.com/boostorg/capy
9  
//
9  
//
10  

10  

11  
#include <boost/capy/ex/thread_pool.hpp>
11  
#include <boost/capy/ex/thread_pool.hpp>
12  
#include <boost/capy/detail/intrusive.hpp>
12  
#include <boost/capy/detail/intrusive.hpp>
13  
#include <boost/capy/test/thread_name.hpp>
13  
#include <boost/capy/test/thread_name.hpp>
14  
#include <algorithm>
14  
#include <algorithm>
15  
#include <atomic>
15  
#include <atomic>
16  
#include <condition_variable>
16  
#include <condition_variable>
17  
#include <cstdio>
17  
#include <cstdio>
18  
#include <mutex>
18  
#include <mutex>
19  
#include <thread>
19  
#include <thread>
20  
#include <vector>
20  
#include <vector>
21  

21  

22  
/*
    Thread pool implementation using a shared work queue.

    Work items are coroutine handles wrapped in intrusive list nodes, stored
    in a single queue protected by a mutex. Worker threads wait on a
    condition_variable until work is available or stop is requested.

    Threads are started lazily on first post() via std::call_once to avoid
    spawning threads for pools that are constructed but never used. Each
    thread is named with a configurable prefix plus index for debugger
    visibility.

    Work tracking: on_work_started/on_work_finished maintain an atomic
    outstanding_work_ counter. join() blocks until this counter reaches
    zero, then signals workers to stop and joins threads.

    Two shutdown paths, plus the destructor, which combines them:
    - join(): waits for outstanding work to drain, then stops workers.
    - stop(): immediately signals workers to exit; queued work is abandoned.
    - Destructor: stop() then join() (abandon + wait for threads).
*/
43  

43  

44  
namespace boost {
44  
namespace boost {
45  
namespace capy {
45  
namespace capy {
46  

46  

47  
//------------------------------------------------------------------------------
47  
//------------------------------------------------------------------------------
48  

48  

49  
class thread_pool::impl
49  
class thread_pool::impl
50  
{
50  
{
51  
    struct work : detail::intrusive_queue<work>::node
51  
    struct work : detail::intrusive_queue<work>::node
52  
    {
52  
    {
53  
        std::coroutine_handle<> h_;
53  
        std::coroutine_handle<> h_;
54  

54  

55  
        explicit work(std::coroutine_handle<> h) noexcept
55  
        explicit work(std::coroutine_handle<> h) noexcept
56  
            : h_(h)
56  
            : h_(h)
57  
        {
57  
        {
58  
        }
58  
        }
59  

59  

60  
        void run()
60  
        void run()
61  
        {
61  
        {
62  
            auto h = h_;
62  
            auto h = h_;
63  
            delete this;
63  
            delete this;
64  
            h.resume();
64  
            h.resume();
65  
        }
65  
        }
66  

66  

67  
        void destroy()
67  
        void destroy()
68  
        {
68  
        {
69  
            auto h = h_;
69  
            auto h = h_;
70  
            delete this;
70  
            delete this;
71  
            if(h && h != std::noop_coroutine())
71  
            if(h && h != std::noop_coroutine())
72  
                h.destroy();
72  
                h.destroy();
73  
        }
73  
        }
74  
    };
74  
    };
75  

75  

76  
    std::mutex mutex_;
76  
    std::mutex mutex_;
77  
    std::condition_variable cv_;
77  
    std::condition_variable cv_;
78  
    detail::intrusive_queue<work> q_;
78  
    detail::intrusive_queue<work> q_;
79  
    std::vector<std::thread> threads_;
79  
    std::vector<std::thread> threads_;
80  
    std::atomic<std::size_t> outstanding_work_{0};
80  
    std::atomic<std::size_t> outstanding_work_{0};
81  
    bool stop_{false};
81  
    bool stop_{false};
82  
    bool joined_{false};
82  
    bool joined_{false};
83  
    std::size_t num_threads_;
83  
    std::size_t num_threads_;
84  
    char thread_name_prefix_[13]{};  // 12 chars max + null terminator
84  
    char thread_name_prefix_[13]{};  // 12 chars max + null terminator
85  
    std::once_flag start_flag_;
85  
    std::once_flag start_flag_;
86  

86  

87  
public:
87  
public:
88  
    ~impl()
88  
    ~impl()
89  
    {
89  
    {
90  
        while(auto* w = q_.pop())
90  
        while(auto* w = q_.pop())
91  
            w->destroy();
91  
            w->destroy();
92  
    }
92  
    }
93  

93  

94  
    impl(std::size_t num_threads, std::string_view thread_name_prefix)
94  
    impl(std::size_t num_threads, std::string_view thread_name_prefix)
95  
        : num_threads_(num_threads)
95  
        : num_threads_(num_threads)
96  
    {
96  
    {
97  
        if(num_threads_ == 0)
97  
        if(num_threads_ == 0)
98  
            num_threads_ = std::max(
98  
            num_threads_ = std::max(
99  
                std::thread::hardware_concurrency(), 1u);
99  
                std::thread::hardware_concurrency(), 1u);
100  

100  

101  
        // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
101  
        // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
102  
        auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
102  
        auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
103  
        thread_name_prefix_[n] = '\0';
103  
        thread_name_prefix_[n] = '\0';
104  
    }
104  
    }
105  

105  

106  
    void
106  
    void
107  
    post(std::coroutine_handle<> h)
107  
    post(std::coroutine_handle<> h)
108  
    {
108  
    {
109  
        ensure_started();
109  
        ensure_started();
110  
        auto* w = new work(h);
110  
        auto* w = new work(h);
111  
        {
111  
        {
112  
            std::lock_guard<std::mutex> lock(mutex_);
112  
            std::lock_guard<std::mutex> lock(mutex_);
113  
            q_.push(w);
113  
            q_.push(w);
114  
        }
114  
        }
115  
        cv_.notify_one();
115  
        cv_.notify_one();
116  
    }
116  
    }
117  

117  

118  
    void
118  
    void
119  
    on_work_started() noexcept
119  
    on_work_started() noexcept
120  
    {
120  
    {
121  
        outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
121  
        outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
122  
    }
122  
    }
123  

123  

124  
    void
124  
    void
125  
    on_work_finished() noexcept
125  
    on_work_finished() noexcept
126  
    {
126  
    {
127  
        if(outstanding_work_.fetch_sub(
127  
        if(outstanding_work_.fetch_sub(
128  
            1, std::memory_order_acq_rel) == 1)
128  
            1, std::memory_order_acq_rel) == 1)
129  
        {
129  
        {
130  
            std::lock_guard<std::mutex> lock(mutex_);
130  
            std::lock_guard<std::mutex> lock(mutex_);
131  
            if(joined_ && !stop_)
131  
            if(joined_ && !stop_)
132  
                stop_ = true;
132  
                stop_ = true;
133  
            cv_.notify_all();
133  
            cv_.notify_all();
134  
        }
134  
        }
135  
    }
135  
    }
136  

136  

137  
    void
137  
    void
138  
    join() noexcept
138  
    join() noexcept
139  
    {
139  
    {
140  
        {
140  
        {
141  
            std::unique_lock<std::mutex> lock(mutex_);
141  
            std::unique_lock<std::mutex> lock(mutex_);
142  
            if(joined_)
142  
            if(joined_)
143  
                return;
143  
                return;
144  
            joined_ = true;
144  
            joined_ = true;
145  

145  

146  
            if(outstanding_work_.load(
146  
            if(outstanding_work_.load(
147  
                std::memory_order_acquire) == 0)
147  
                std::memory_order_acquire) == 0)
148  
            {
148  
            {
149  
                stop_ = true;
149  
                stop_ = true;
150  
                cv_.notify_all();
150  
                cv_.notify_all();
151  
            }
151  
            }
152  
            else
152  
            else
153  
            {
153  
            {
154  
                cv_.wait(lock, [this]{
154  
                cv_.wait(lock, [this]{
155  
                    return stop_;
155  
                    return stop_;
156  
                });
156  
                });
157  
            }
157  
            }
158  
        }
158  
        }
159  

159  

160  
        for(auto& t : threads_)
160  
        for(auto& t : threads_)
161  
            if(t.joinable())
161  
            if(t.joinable())
162  
                t.join();
162  
                t.join();
163  
    }
163  
    }
164  

164  

165  
    void
165  
    void
166  
    stop() noexcept
166  
    stop() noexcept
167  
    {
167  
    {
168  
        {
168  
        {
169  
            std::lock_guard<std::mutex> lock(mutex_);
169  
            std::lock_guard<std::mutex> lock(mutex_);
170  
            stop_ = true;
170  
            stop_ = true;
171  
        }
171  
        }
172  
        cv_.notify_all();
172  
        cv_.notify_all();
173  
    }
173  
    }
174  

174  

175  
private:
175  
private:
176  
    void
176  
    void
177  
    ensure_started()
177  
    ensure_started()
178  
    {
178  
    {
179  
        std::call_once(start_flag_, [this]{
179  
        std::call_once(start_flag_, [this]{
180  
            threads_.reserve(num_threads_);
180  
            threads_.reserve(num_threads_);
181  
            for(std::size_t i = 0; i < num_threads_; ++i)
181  
            for(std::size_t i = 0; i < num_threads_; ++i)
182  
                threads_.emplace_back([this, i]{ run(i); });
182  
                threads_.emplace_back([this, i]{ run(i); });
183  
        });
183  
        });
184  
    }
184  
    }
185  

185  

186  
    void
186  
    void
187  
    run(std::size_t index)
187  
    run(std::size_t index)
188  
    {
188  
    {
189  
        // Build name; set_current_thread_name truncates to platform limits.
189  
        // Build name; set_current_thread_name truncates to platform limits.
190  
        char name[16];
190  
        char name[16];
191  
        std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
191  
        std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
192  
        set_current_thread_name(name);
192  
        set_current_thread_name(name);
193  

193  

194  
        for(;;)
194  
        for(;;)
195  
        {
195  
        {
196  
            work* w = nullptr;
196  
            work* w = nullptr;
197  
            {
197  
            {
198  
                std::unique_lock<std::mutex> lock(mutex_);
198  
                std::unique_lock<std::mutex> lock(mutex_);
199  
                cv_.wait(lock, [this]{
199  
                cv_.wait(lock, [this]{
200  
                    return !q_.empty() ||
200  
                    return !q_.empty() ||
201  
                        stop_;
201  
                        stop_;
202  
                });
202  
                });
203  
                if(stop_)
203  
                if(stop_)
204  
                    return;
204  
                    return;
205  
                w = q_.pop();
205  
                w = q_.pop();
206  
            }
206  
            }
207  
            if(w)
207  
            if(w)
208  
                w->run();
208  
                w->run();
209  
        }
209  
        }
210  
    }
210  
    }
211  
};
211  
};
212  

212  

213  
//------------------------------------------------------------------------------
213  
//------------------------------------------------------------------------------
214  

214  

215  
// Stops the workers, waits for them to exit, then runs shutdown() and
// destroy() before freeing the implementation object.
// NOTE(review): shutdown() and destroy() are presumably inherited from the
// execution-context base class; confirm against the header.
thread_pool::
~thread_pool()
{
    // stop() before join() so the join does not wait for work to drain.
    impl_->stop();
    impl_->join();
    shutdown();
    destroy();
    delete impl_;
}
224  

224  

225  
// Creates the implementation object with the requested thread count and
// thread-name prefix, then installs std::allocator<void> as the frame
// allocator.
thread_pool::
thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
    : impl_(new impl(num_threads, thread_name_prefix))
{
    this->set_frame_allocator(std::allocator<void>{});
}
231  

231  

232  
void
232  
void
233  
thread_pool::
233  
thread_pool::
234  
join() noexcept
234  
join() noexcept
235  
{
235  
{
236  
    impl_->join();
236  
    impl_->join();
237  
}
237  
}
238  

238  

239  
void
239  
void
240  
thread_pool::
240  
thread_pool::
241  
stop() noexcept
241  
stop() noexcept
242  
{
242  
{
243  
    impl_->stop();
243  
    impl_->stop();
244  
}
244  
}
245  

245  

246  
//------------------------------------------------------------------------------
246  
//------------------------------------------------------------------------------
247  

247  

248  
// Returns an executor constructed from this pool.
thread_pool::executor_type
thread_pool::
get_executor() const noexcept
{
    // executor_type is constructed from a non-const reference, so constness
    // is cast away here.
    return executor_type(
        const_cast<thread_pool&>(*this));
}
255  

255  

256  
void
256  
void
257  
thread_pool::executor_type::
257  
thread_pool::executor_type::
258  
on_work_started() const noexcept
258  
on_work_started() const noexcept
259  
{
259  
{
260  
    pool_->impl_->on_work_started();
260  
    pool_->impl_->on_work_started();
261  
}
261  
}
262  

262  

263  
void
263  
void
264  
thread_pool::executor_type::
264  
thread_pool::executor_type::
265  
on_work_finished() const noexcept
265  
on_work_finished() const noexcept
266  
{
266  
{
267  
    pool_->impl_->on_work_finished();
267  
    pool_->impl_->on_work_finished();
268  
}
268  
}
269  

269  

270  
void
270  
void
271  
thread_pool::executor_type::
271  
thread_pool::executor_type::
272  
post(std::coroutine_handle<> h) const
272  
post(std::coroutine_handle<> h) const
273  
{
273  
{
274  
    pool_->impl_->post(h);
274  
    pool_->impl_->post(h);
275  
}
275  
}
276  

276  

277  
} // capy
277  
} // capy
278  
} // boost
278  
} // boost