Robowflex  v0.1
Making MoveIt Easy
pool.h
Go to the documentation of this file.
1 /* Author: Zachary Kingston */
2 
3 #ifndef ROBOWFLEX_POOL_
4 #define ROBOWFLEX_POOL_
5 
6 #include <memory> // for std::shared_ptr
7 #include <thread> // for std::thread
8 #include <future> // for std::future / std::promise
9 #include <functional> // for std::function
10 #include <vector> // for std::vector
11 #include <queue> // for std::queue
12 
13 namespace robowflex
14 {
15  /**
16  * make_function implementation taken from:
17  * https://stackoverflow.com/questions/27825559/why-is-there-no-stdmake-function/27826081#27826081
18  * TODO: Investigate other APIs for the submit() function in robowflex::Pool.
19  */
20 
21  /** \cond IGNORE */
22 
23  template <typename T>
24  struct function_traits : public function_traits<decltype(&T::operator())>
25  {
26  };
27 
28  // for pointers to member function
29  template <typename C, typename RT, typename... Args>
30  struct function_traits<RT (C::*)(Args...) const>
31  {
32  enum
33  {
34  arity = sizeof...(Args)
35  };
36  typedef std::function<RT(Args...)> f_type;
37  };
38 
39  // for pointers to member function
40  template <typename C, typename RT, typename... Args>
41  struct function_traits<RT (C::*)(Args...)>
42  {
43  enum
44  {
45  arity = sizeof...(Args)
46  };
47  typedef std::function<RT(Args...)> f_type;
48  };
49 
50  // for function pointers
51  template <typename RT, typename... Args>
52  struct function_traits<RT (*)(Args...)>
53  {
54  enum
55  {
56  arity = sizeof...(Args)
57  };
58  typedef std::function<RT(Args...)> f_type;
59  };
60 
61  template <typename L>
62  static typename function_traits<L>::f_type make_function(L l)
63  {
64  return (typename function_traits<L>::f_type)(l);
65  }
66 
67  // handles bind & multiple function call operator()'s
68  template <typename RT, typename... Args, class T>
69  auto make_function(T &&t) -> std::function<decltype(RT(t(std::declval<Args>()...)))(Args...)>
70  {
71  return {std::forward<T>(t)};
72  }
73 
74  // handles explicit overloads
75  template <typename RT, typename... Args>
76  auto make_function(RT (*p)(Args...)) -> std::function<RT(Args...)>
77  {
78  return {p};
79  }
80 
81  // handles explicit overloads
82  template <typename RT, typename... Args, typename C>
83  auto make_function(RT (C::*p)(Args...)) -> std::function<RT(Args...)>
84  {
85  return {p};
86  }
87 
88  /* \endcond */
89 
90  /** \brief A thread pool that can execute arbitrary functions asynchronously.
91  * Functions with arguments to be executed are put in the queue through submit(). This returns a
92  * Pool::Job that can be used to retrieve the result or cancel the job if the result is no longer needed.
93  */
94  class Pool
95  {
96  public:
97  /** \brief Interface class for Pool::Job so template parameters are not needed for the queue.
98  */
99  class Joblet
100  {
101  public:
102  /** \brief Execute the underlying function.
103  */
104  virtual void execute() = 0;
105 
106  /** \brief Cancels this job.
107  */
108  void cancel();
109 
110  /** \brief Checks if this job has been cancled.
111  * \return True if the job is cancled, false otherwise.
112  */
113  bool isCancled() const;
114 
115  protected:
116  bool canceled{false}; ///< Whether the job is cancled or not.
117  };
118 
119  /** \brief A job that returns \a RT.
120  * \tparam RT Return type of function to be executed.
121  */
122  template <typename RT>
123  class Job : public Joblet
124  {
125  public:
126  /** \brief Constructor.
127  * \param[in] function Function to execute.
128  * \param[in] args Arguments to function.
129  * \tparam Args Types of the function arguments.
130  */
131  template <typename... Args>
132  Job(const std::function<RT(Args...)> &&function, Args &&... args)
133  : function_(std::bind(function, args...)), task_(function_), future_(task_.get_future())
134  {
135  }
136 
137  /** \brief Executes the task and stores the result in \a future_
138  */
139  void execute() override
140  {
141  task_();
142  }
143 
144  /** \brief Blocking call to retrieve the result of the function.
145  * Note that if the job was canceled it is not guaranteed that this function will return, as the
146  * job might never be executed.
147  * \return The result of the job.
148  */
149  RT get()
150  {
151  return future_.get();
152  }
153 
154  /** \brief Waits until result of the job is available.
155  */
156  void wait() const
157  {
158  future_.wait();
159  }
160 
161  /** \brief Returns true if the task is done, false otherwise.
162  * \return True if task is done, false otherwise.
163  */
164  bool isDone() const
165  {
166  return waitFor(0);
167  }
168 
169  /** \brief Waits for a number of seconds to see if the task completes.
170  * \return True if task is complete, false otherwise.
171  */
172  bool waitFor(double time) const
173  {
174  return future_.wait_for(std::chrono::duration<double>(time)) == std::future_status::ready;
175  }
176 
177  private:
178  std::function<RT()> function_; ///< Bound function to execute.
179  std::packaged_task<RT()> task_; ///< Task of function.
180  std::future<RT> future_; ///< Future of function result.
181  };
182 
183  /** \brief Constructor.
184  * \param[in] n The number of threads to use. By default uses available hardware threads.
185  */
186  Pool(unsigned int n = std::thread::hardware_concurrency());
187 
188  /** \brief Destructor.
189  * Cancels all threads and joins them.
190  */
191  ~Pool();
192 
193  /** \brief Get the number of threads.
194  * \return The number of threads.
195  */
196  unsigned int getThreadCount() const;
197 
198  /** \brief Submit a function with arguments to be processed by the thread pool.
199  * Submitted functions must be wrapped with robowflex::make_function() or be a std::function type so
200  * argument template deduction works.
201  * \param[in] function Function to execute.
202  * \param[in] args Arguments to the function.
203  * \tparam RT Return type of function.
204  * \tparam Args Types of the arguments to the function.
205  * \return A job that contains information about the submitted function. This job can be canceled,
206  * which results in no execution of the function by the queue.
207  */
208  template <typename RT, typename... Args>
209  std::shared_ptr<Job<RT>> submit(const std::function<RT(Args...)> &&function, Args &&... args) const
210  {
211  auto job = std::make_shared<Job<RT>>(std::forward<const std::function<RT(Args...)>>(function),
212  std::forward<Args>(args)...);
213 
214  {
216  jobs_.emplace(job);
217 
218  cv_.notify_one();
219  }
220 
221  return job;
222  }
223 
224  /** \brief Background thread process.
225  * Executes jobs submitted from submit().
226  */
227  void run();
228 
229  private:
230  bool active_{false}; ///< Is thread pool active?
231  mutable std::mutex mutex_; ///< Job queue mutex.
232  mutable std::condition_variable cv_; ///< Job queue condition variable.
233 
235  mutable std::queue<std::shared_ptr<Joblet>> jobs_; ///< Jobs to execute.
236  };
237 } // namespace robowflex
238 
239 #endif
A job that returns RT.
Definition: pool.h:124
RT get()
Blocking call to retrieve the result of the function. Note that if the job was canceled it is not gua...
Definition: pool.h:149
std::future< RT > future_
Future of function result.
Definition: pool.h:180
std::function< RT()> function_
Bound function to execute.
Definition: pool.h:178
bool isDone() const
Returns true if the task is done, false otherwise.
Definition: pool.h:164
void wait() const
Waits until result of the job is available.
Definition: pool.h:156
Job(const std::function< RT(Args...)> &&function, Args &&... args)
Constructor.
Definition: pool.h:132
bool waitFor(double time) const
Waits for a number of seconds to see if the task completes.
Definition: pool.h:172
std::packaged_task< RT()> task_
Task of function.
Definition: pool.h:179
void execute() override
Executes the task and stores the result in future_.
Definition: pool.h:139
Interface class for Pool::Job so template parameters are not needed for the queue.
Definition: pool.h:100
virtual void execute()=0
Execute the underlying function.
bool isCancled() const
Checks if this job has been cancled.
Definition: pool.cpp:16
bool canceled
Whether the job is cancled or not.
Definition: pool.h:116
void cancel()
Cancels this job.
Definition: pool.cpp:11
A thread pool that can execute arbitrary functions asynchronously. Functions with arguments to be exe...
Definition: pool.h:95
std::mutex mutex_
Job queue mutex.
Definition: pool.h:231
unsigned int getThreadCount() const
Get the number of threads.
Definition: pool.cpp:40
bool active_
Is thread pool active?
Definition: pool.h:230
~Pool()
Destructor. Cancels all threads and joins them.
Definition: pool.cpp:31
Pool(unsigned int n=std::thread::hardware_concurrency())
Constructor.
Definition: pool.cpp:25
std::condition_variable cv_
Job queue condition variable.
Definition: pool.h:232
std::vector< std::thread > threads_
Threads.
Definition: pool.h:234
void run()
Background thread process. Executes jobs submitted from submit().
Definition: pool.cpp:45
std::queue< std::shared_ptr< Joblet > > jobs_
Jobs to execute.
Definition: pool.h:235
std::shared_ptr< Job< RT > > submit(const std::function< RT(Args...)> &&function, Args &&... args) const
Submit a function with arguments to be processed by the thread pool. Submitted functions must be wrap...
Definition: pool.h:209
T forward(T... args)
T get(T... args)
T hardware_concurrency(T... args)
Main namespace. Contains all library classes and functions.
Definition: scene.cpp:25
T wait_for(T... args)
T wait(T... args)