Ardour  8.12
threader.h
Go to the documentation of this file.
1 #ifndef AUDIOGRAPHER_THREADER_H
2 #define AUDIOGRAPHER_THREADER_H
3 
4 #include <atomic>
5 #include <vector>
6 #include <algorithm>
7 
8 #include <glibmm/threadpool.h>
9 #include <glibmm/timeval.h>
10 #include <sigc++/slot.h>
11 #include <boost/format.hpp>
12 
13 #include <glib.h>
14 
15 #include "pbd/atomic.h"
16 
18 #include "audiographer/source.h"
19 #include "audiographer/sink.h"
20 #include "audiographer/exception.h"
21 
22 namespace AudioGrapher
23 {
24 
26 class /*LIBAUDIOGRAPHER_API*/ ThreaderException : public Exception
27 {
28  public:
29  template<typename T>
30  ThreaderException (T const & thrower, std::exception const & e)
31  : Exception (thrower,
32  boost::str ( boost::format
33  ("\n\t- Dynamic type: %1%\n\t- what(): %2%")
34  % DebugUtils::demangled_name (e) % e.what() ))
35  { }
36 };
37 
39 template <typename T = DefaultSampleType>
40 class /*LIBAUDIOGRAPHER_API*/ Threader : public Source<T>, public Sink<T>
41 {
42  private:
43  typedef std::vector<typename Source<T>::SinkPtr> OutputVec;
44 
45  public:
46 
52  Threader (Glib::ThreadPool & thread_pool, long wait_timeout_milliseconds = 500)
54  , wait_timeout (wait_timeout_milliseconds)
55  {
56  readers.store (0);
57  }
58 
59  virtual ~Threader () {}
60 
62  void add_output (typename Source<T>::SinkPtr output) { outputs.push_back (output); }
63 
65  void clear_outputs () { outputs.clear (); }
66 
68  void remove_output (typename Source<T>::SinkPtr output) {
69  typename OutputVec::iterator new_end = std::remove(outputs.begin(), outputs.end(), output);
70  outputs.erase (new_end, outputs.end());
71  }
72 
74  void process (ProcessContext<T> const & c)
75  {
76  wait_mutex.lock();
77 
78  exception.reset();
79 
80  unsigned int outs = outputs.size();
81  (void) readers.fetch_add (outs);
82  for (unsigned int i = 0; i < outs; ++i) {
83  thread_pool.push (sigc::bind (sigc::mem_fun (this, &Threader::process_output), c, i));
84  }
85 
86  wait();
87  }
88 
89  using Sink<T>::process;
90 
91  private:
92 
93  void wait()
94  {
95  while (readers.load () != 0) {
96  gint64 end_time = g_get_monotonic_time () + (wait_timeout * G_TIME_SPAN_MILLISECOND);
97  wait_cond.wait_until(wait_mutex, end_time);
98  }
99 
100  wait_mutex.unlock();
101 
102  if (exception) {
103  throw *exception;
104  }
105  }
106 
107  void process_output(ProcessContext<T> const & c, unsigned int output)
108  {
109  try {
110  outputs[output]->process (c);
111  } catch (std::exception const & e) {
112  // Only first exception will be passed on
113  exception_mutex.lock();
114  if(!exception) { exception.reset (new ThreaderException (*this, e)); }
115  exception_mutex.unlock();
116  }
117 
119  wait_cond.signal();
120  }
121  }
122 
124 
125  Glib::ThreadPool& thread_pool;
126  Glib::Threads::Mutex wait_mutex;
127  Glib::Threads::Cond wait_cond;
128 
129  std::atomic<int> readers;
131 
132  Glib::Threads::Mutex exception_mutex;
133  std::shared_ptr<ThreaderException> exception;
134 
135 };
136 
137 } // namespace
138 
139 #endif //AUDIOGRAPHER_THREADER_H
const char * what() const
Definition: exception.h:30
std::shared_ptr< Sink< T > > SinkPtr
Class that stores exceptions thrown from different threads.
Definition: threader.h:27
ThreaderException(T const &thrower, std::exception const &e)
Definition: threader.h:30
Class for distributing processing across several threads.
Definition: threader.h:41
Glib::ThreadPool & thread_pool
Definition: threader.h:125
Glib::Threads::Mutex wait_mutex
Definition: threader.h:126
void add_output(typename Source< T >::SinkPtr output)
Adds an output. RT safe.
Definition: threader.h:62
Glib::Threads::Mutex exception_mutex
Definition: threader.h:132
void process(ProcessContext< T > const &c)
Processes context concurrently by scheduling each output separately to the given thread pool.
Definition: threader.h:74
Threader(Glib::ThreadPool &thread_pool, long wait_timeout_milliseconds=500)
Definition: threader.h:52
virtual ~Threader()
Definition: threader.h:59
std::vector< typename Source< T >::SinkPtr > OutputVec
Definition: threader.h:43
void clear_outputs()
Clears all outputs. RT safe.
Definition: threader.h:65
void remove_output(typename Source< T >::SinkPtr output)
Removes a specific output. RT safe.
Definition: threader.h:68
std::shared_ptr< ThreaderException > exception
Definition: threader.h:133
void process_output(ProcessContext< T > const &c, unsigned int output)
Definition: threader.h:107
Glib::Threads::Cond wait_cond
Definition: threader.h:127
std::atomic< int > readers
Definition: threader.h:129
bool atomic_dec_and_test(std::atomic< T > &aval)
Definition: atomic.h:27
std::string demangled_name(T const &obj)
Definition: demangle.h:46
Utilities for debugging.
Definition: debug_utils.h:21