pion  5.0.6
scheduler.cpp
1 // ---------------------------------------------------------------------
2 // pion: a Boost C++ framework for building lightweight HTTP interfaces
3 // ---------------------------------------------------------------------
4 // Copyright (C) 2007-2014 Splunk Inc. (https://github.com/splunk/pion)
5 //
6 // Distributed under the Boost Software License, Version 1.0.
7 // See http://www.boost.org/LICENSE_1_0.txt
8 //
9 
10 #include <boost/exception/diagnostic_information.hpp>
11 #include <boost/date_time/posix_time/posix_time_duration.hpp>
12 #include <pion/scheduler.hpp>
13 
14 namespace pion { // begin namespace pion
15 
16 
17 // static members of scheduler
18 
19 const boost::uint32_t scheduler::DEFAULT_NUM_THREADS = 8;
20 const boost::uint32_t scheduler::NSEC_IN_SECOND = 1000000000; // (10^9)
21 const boost::uint32_t scheduler::MICROSEC_IN_SECOND = 1000000; // (10^6)
22 const boost::uint32_t scheduler::KEEP_RUNNING_TIMER_SECONDS = 5;
23 
24 
25 // scheduler member functions
26 
28 {
29  // lock mutex for thread safety
30  boost::mutex::scoped_lock scheduler_lock(m_mutex);
31 
32  if (m_is_running) {
33 
34  PION_LOG_INFO(m_logger, "Shutting down the thread scheduler");
35 
36  while (m_active_users > 0) {
37  // first, wait for any active users to exit
38  PION_LOG_INFO(m_logger, "Waiting for " << m_active_users << " scheduler users to finish");
39  m_no_more_active_users.wait(scheduler_lock);
40  }
41 
42  // shut everything down
43  m_is_running = false;
44  stop_services();
45  stop_threads();
48 
49  PION_LOG_INFO(m_logger, "The thread scheduler has shutdown");
50 
51  // Make sure anyone waiting on shutdown gets notified
52  m_scheduler_has_stopped.notify_all();
53 
54  } else {
55 
56  // stop and finish everything to be certain that no events are pending
57  stop_services();
58  stop_threads();
61 
62  // Make sure anyone waiting on shutdown gets notified
63  // even if the scheduler did not startup successfully
64  m_scheduler_has_stopped.notify_all();
65  }
66 }
67 
68 void scheduler::join(void)
69 {
70  boost::mutex::scoped_lock scheduler_lock(m_mutex);
71  while (m_is_running) {
72  // sleep until scheduler_has_stopped condition is signaled
73  m_scheduler_has_stopped.wait(scheduler_lock);
74  }
75 }
76 
77 void scheduler::keep_running(boost::asio::io_service& my_service,
78  boost::asio::deadline_timer& my_timer)
79 {
80  if (m_is_running) {
81  // schedule this again to make sure the service doesn't complete
82  my_timer.expires_from_now(boost::posix_time::seconds(KEEP_RUNNING_TIMER_SECONDS));
83  my_timer.async_wait(boost::bind(&scheduler::keep_running, this,
84  boost::ref(my_service), boost::ref(my_timer)));
85  }
86 }
87 
89 {
90  if (!m_is_running) startup();
91  boost::mutex::scoped_lock scheduler_lock(m_mutex);
93 }
94 
96 {
97  boost::mutex::scoped_lock scheduler_lock(m_mutex);
98  if (--m_active_users == 0)
99  m_no_more_active_users.notify_all();
100 }
101 
102 boost::system_time scheduler::get_wakeup_time(boost::uint32_t sleep_sec,
103  boost::uint32_t sleep_nsec)
104 {
105  return boost::get_system_time() + boost::posix_time::seconds(sleep_sec) + boost::posix_time::microseconds(sleep_nsec / 1000);
106 }
107 
108 void scheduler::process_service_work(boost::asio::io_service& service) {
109  while (m_is_running) {
110  try {
111  service.run();
112  } catch (std::exception& e) {
113  PION_LOG_ERROR(m_logger, boost::diagnostic_information(e));
114  } catch (...) {
115  PION_LOG_ERROR(m_logger, "caught unrecognized exception");
116  }
117  }
118 }
119 
120 
121 // single_service_scheduler member functions
122 
124 {
125  // lock mutex for thread safety
126  boost::mutex::scoped_lock scheduler_lock(m_mutex);
127 
128  if (! m_is_running) {
129  PION_LOG_INFO(m_logger, "Starting thread scheduler");
130  m_is_running = true;
131 
132  // schedule a work item to make sure that the service doesn't complete
133  m_service.reset();
135 
136  // start multiple threads to handle async tasks
137  for (boost::uint32_t n = 0; n < m_num_threads; ++n) {
138  boost::shared_ptr<boost::thread> new_thread(new boost::thread( boost::bind(&scheduler::process_service_work,
139  this, boost::ref(m_service)) ));
140  m_thread_pool.push_back(new_thread);
141  }
142  }
143 }
144 
145 
146 // one_to_one_scheduler member functions
147 
149 {
150  // lock mutex for thread safety
151  boost::mutex::scoped_lock scheduler_lock(m_mutex);
152 
153  if (! m_is_running) {
154  PION_LOG_INFO(m_logger, "Starting thread scheduler");
155  m_is_running = true;
156 
157  // make sure there are enough services initialized
158  while (m_service_pool.size() < m_num_threads) {
159  boost::shared_ptr<service_pair_type> service_ptr(new service_pair_type());
160  m_service_pool.push_back(service_ptr);
161  }
162 
163  // schedule a work item for each service to make sure that it doesn't complete
164  for (service_pool_type::iterator i = m_service_pool.begin(); i != m_service_pool.end(); ++i) {
165  keep_running((*i)->first, (*i)->second);
166  }
167 
168  // start multiple threads to handle async tasks
169  for (boost::uint32_t n = 0; n < m_num_threads; ++n) {
170  boost::shared_ptr<boost::thread> new_thread(new boost::thread( boost::bind(&scheduler::process_service_work,
171  this, boost::ref(m_service_pool[n]->first)) ));
172  m_thread_pool.push_back(new_thread);
173  }
174  }
175 }
176 
177 
178 } // end namespace pion
boost::condition m_scheduler_has_stopped
condition triggered when the scheduler has stopped
Definition: scheduler.hpp:183
virtual void stop_threads(void)
stops all threads used to perform work
Definition: scheduler.hpp:151
virtual void stop_services(void)
stops all services used to schedule work
Definition: scheduler.hpp:148
bool m_is_running
true if the thread scheduler is running
Definition: scheduler.hpp:192
static const boost::uint32_t DEFAULT_NUM_THREADS
default number of worker threads in the thread pool
Definition: scheduler.hpp:161
boost::asio::deadline_timer m_timer
timer used to periodically check for shutdown
Definition: scheduler.hpp:279
virtual void startup(void)
Starts the thread scheduler (this is called automatically when necessary)
Definition: scheduler.cpp:148
virtual void finish_services(void)
finishes all services used to schedule work
Definition: scheduler.hpp:154
boost::condition m_no_more_active_users
condition triggered when there are no more active users
Definition: scheduler.hpp:180
boost::asio::io_service m_service
service used to manage async I/O events
Definition: scheduler.hpp:276
static boost::system_time get_wakeup_time(boost::uint32_t sleep_sec, boost::uint32_t sleep_nsec)
Definition: scheduler.cpp:102
static const boost::uint32_t MICROSEC_IN_SECOND
number of microseconds in one full second (10 ^ 6)
Definition: scheduler.hpp:167
void remove_active_user(void)
unregisters an active user with the thread scheduler
Definition: scheduler.cpp:95
boost::uint32_t m_num_threads
total number of worker threads in the pool
Definition: scheduler.hpp:186
ThreadPool m_thread_pool
pool of threads used to perform work
Definition: scheduler.hpp:239
boost::mutex m_mutex
mutex to make class thread-safe
Definition: scheduler.hpp:174
static const boost::uint32_t NSEC_IN_SECOND
number of nanoseconds in one full second (10 ^ 9)
Definition: scheduler.hpp:164
logger m_logger
primary logging interface used by this class
Definition: scheduler.hpp:177
virtual void shutdown(void)
Stops the thread scheduler (this is called automatically when the program exits)
Definition: scheduler.cpp:27
boost::uint32_t m_active_users
the scheduler will not shutdown until there are no more active users
Definition: scheduler.hpp:189
static const boost::uint32_t KEEP_RUNNING_TIMER_SECONDS
number of seconds a timer should wait for to keep the IO services running
Definition: scheduler.hpp:170
typedef for a pair object where first is an IO service and second is a deadline timer ...
Definition: scheduler.hpp:342
void keep_running(boost::asio::io_service &my_service, boost::asio::deadline_timer &my_timer)
Definition: scheduler.cpp:77
virtual void startup(void)
Starts the thread scheduler (this is called automatically when necessary)
Definition: scheduler.hpp:49
void add_active_user(void)
Definition: scheduler.cpp:88
virtual void startup(void)
Starts the thread scheduler (this is called automatically when necessary)
Definition: scheduler.cpp:123
service_pool_type m_service_pool
pool of IO services used to schedule work
Definition: scheduler.hpp:353
void join(void)
the calling thread will sleep until the scheduler has stopped
Definition: scheduler.cpp:68
void process_service_work(boost::asio::io_service &service)
processes work passed to the asio service & handles uncaught exceptions
Definition: scheduler.cpp:108
virtual void finish_threads(void)
finishes all threads used to perform work
Definition: scheduler.hpp:157