Ardour  9.2-129-gdf5e1050bd
threader.h
Go to the documentation of this file.
1 #ifndef AUDIOGRAPHER_THREADER_H
2 #define AUDIOGRAPHER_THREADER_H
3 
4 #include <atomic>
5 #include <vector>
6 #include <algorithm>
7 
8 #include <sigc++/slot.h>
9 #include <sigc++/bind.h>
10 
11 #include "pbd/atomic.h"
12 #include "pbd/compose.h"
13 #include "pbd/mutex.h"
14 #include "pbd/thread_pool.h"
15 
17 #include "audiographer/source.h"
18 #include "audiographer/sink.h"
19 #include "audiographer/exception.h"
20 
21 namespace AudioGrapher
22 {
23 
25 class /*LIBAUDIOGRAPHER_API*/ ThreaderException : public Exception
26 {
27  public:
28  template<typename T>
29  ThreaderException (T const & thrower, std::exception const & e)
30  : Exception (thrower, string_compose ("\n\t- Dynamic type: %1\n\t- what(): %2", DebugUtils::demangled_name (e), e.what()))
31  { }
32 };
33 
35 template <typename T = DefaultSampleType>
36 class /*LIBAUDIOGRAPHER_API*/ Threader : public Source<T>, public Sink<T>
37 {
38  private:
39  typedef std::vector<typename Source<T>::SinkPtr> OutputVec;
40 
41  public:
42 
48  Threader (PBD::ThreadPool& thread_pool, long wait_timeout_milliseconds = 500)
50  , wait_timeout (wait_timeout_milliseconds)
51  {
52  readers.store (0);
53  }
54 
55  virtual ~Threader () {}
56 
58  void add_output (typename Source<T>::SinkPtr output) { outputs.push_back (output); }
59 
61  void clear_outputs () { outputs.clear (); }
62 
64  void remove_output (typename Source<T>::SinkPtr output) {
65  typename OutputVec::iterator new_end = std::remove(outputs.begin(), outputs.end(), output);
66  outputs.erase (new_end, outputs.end());
67  }
68 
70  void process (ProcessContext<T> const & c)
71  {
72  wait_mutex.lock();
73 
74  exception.reset();
75 
76  unsigned int outs = outputs.size();
77  (void) readers.fetch_add (outs);
78  for (unsigned int i = 0; i < outs; ++i) {
79  thread_pool.push (sigc::bind (sigc::mem_fun (this, &Threader::process_output), c, i));
80  }
81 
82  wait();
83  }
84 
85  using Sink<T>::process;
86 
87  private:
88 
89  void wait()
90  {
91  while (readers.load () != 0) {
92  wait_cond.wait_for (wait_mutex, std::chrono::milliseconds (wait_timeout));
93  }
94 
96 
97  if (exception) {
98  throw *exception;
99  }
100  }
101 
102  void process_output(ProcessContext<T> const & c, unsigned int output)
103  {
104  try {
105  outputs[output]->process (c);
106  } catch (std::exception const & e) {
107  // Only first exception will be passed on
109  if(!exception) { exception.reset (new ThreaderException (*this, e)); }
111  }
112 
114  wait_cond.signal();
115  }
116  }
117 
119 
121 
124 
125  std::atomic<int> readers;
127 
129  std::shared_ptr<ThreaderException> exception;
130 
131 };
132 
133 } // namespace
134 
135 #endif //AUDIOGRAPHER_THREADER_H
const char * what() const
Definition: exception.h:28
std::shared_ptr< Sink< T > > SinkPtr
Class that stores exceptions thrown from different threads.
Definition: threader.h:26
ThreaderException(T const &thrower, std::exception const &e)
Definition: threader.h:29
Class for distributing processing across several threads.
Definition: threader.h:37
Threader(PBD::ThreadPool &thread_pool, long wait_timeout_milliseconds=500)
Definition: threader.h:48
PBD::Mutex wait_mutex
Definition: threader.h:122
PBD::Mutex exception_mutex
Definition: threader.h:128
void add_output(typename Source< T >::SinkPtr output)
Adds output RT safe.
Definition: threader.h:58
void process(ProcessContext< T > const &c)
Processes context concurrently by scheduling each output separately to the given thread pool.
Definition: threader.h:70
virtual ~Threader()
Definition: threader.h:55
std::vector< typename Source< T >::SinkPtr > OutputVec
Definition: threader.h:39
void clear_outputs()
Clears outputs RT safe.
Definition: threader.h:61
void remove_output(typename Source< T >::SinkPtr output)
Removes a specific output RT safe.
Definition: threader.h:64
std::shared_ptr< ThreaderException > exception
Definition: threader.h:129
void process_output(ProcessContext< T > const &c, unsigned int output)
Definition: threader.h:102
std::atomic< int > readers
Definition: threader.h:125
PBD::ThreadPool & thread_pool
Definition: threader.h:120
void signal()
Definition: mutex.h:112
bool wait_for(Mutex &mutex, std::chrono::milliseconds const &rel_time)
Definition: mutex.h:128
void lock()
Definition: mutex.h:46
void unlock()
Definition: mutex.h:56
void push(std::function< void()> task)
Definition: thread_pool.h:76
std::string string_compose(const std::string &fmt, const T1 &o1)
Definition: compose.h:246
bool atomic_dec_and_test(std::atomic< T > &aval)
Definition: atomic.h:26
std::string demangled_name(T const &obj)
Definition: demangle.h:45
Utilities for debugging.
Definition: debug_utils.h:21