Tacopie  3.0.0
Tacopie is a TCP Client & Server C++11 library.
io_service.hpp
1 // MIT License
2 //
3 // Copyright (c) 2016-2017 Simon Ninon <simon.ninon@gmail.com>
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to deal
7 // in the Software without restriction, including without limitation the rights
8 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9 // copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in all
13 // copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21 // SOFTWARE.
22 
23 #pragma once
24 
25 #include <atomic>
26 #include <condition_variable>
27 #include <functional>
28 #include <memory>
29 #include <mutex>
30 #include <thread>
31 #include <unordered_map>
32 #include <vector>
33 
34 #ifdef _WIN32
35 #include <Winsock2.h>
36 #else
37 #include <sys/select.h>
38 #endif /* _WIN32 */
39 
40 #include <tacopie/network/self_pipe.hpp>
41 #include <tacopie/network/tcp_socket.hpp>
42 #include <tacopie/utils/thread_pool.hpp>
43 
// Fallback definition: if the build has not already defined
// __TACOPIE_IO_SERVICE_NB_WORKERS, it is defined here with the value 1.
// NOTE(review): presumably the default number of callback worker threads for
// io_service -- confirm where the macro is consumed (not visible in this header).
// NOTE(review): identifiers containing a double underscore are reserved for the
// implementation in C++; renaming would break users who set it on the command line.
44 #ifndef __TACOPIE_IO_SERVICE_NB_WORKERS
45 #define __TACOPIE_IO_SERVICE_NB_WORKERS 1
46 #endif /* __TACOPIE_IO_SERVICE_NB_WORKERS */
47 
48 namespace tacopie {
49 
// Declaration of the I/O service. Judging by its members it keeps a table of
// tracked sockets keyed by fd_t, a std::thread (m_poll_worker), a thread pool
// for callbacks (m_callback_workers), two fd_set members and a self_pipe.
// NOTE(review): only declarations are visible here; behavioural notes below
// that are marked "presumably" are inferred from names and must be confirmed
// against the implementation file.
54 class io_service {
55 public:
    // Default constructor (defined out of line).
59  io_service(void);
60 
    // Destructor (defined out of line).
62  ~io_service(void);
63 
    // The service is non-copyable: copy construction and copy assignment are deleted.
65  io_service(const io_service&) = delete;
67  io_service& operator=(const io_service&) = delete;
68 
69 public:
    // Takes a worker count.
    // NOTE(review): presumably resizes m_callback_workers to nb_threads -- confirm.
77  void set_nb_workers(std::size_t nb_threads);
78 
79 public:
    // Callback type used for read and write events: a callable taking an fd_t
    // and returning void.
82  typedef std::function<void(fd_t)> event_callback_t;
83 
    // Takes a socket plus optional read and write callbacks; both callbacks
    // default to nullptr (an empty std::function).
    // NOTE(review): presumably starts monitoring the socket's fd -- confirm.
93  void track(const tcp_socket& socket, const event_callback_t& rd_callback = nullptr, const event_callback_t& wr_callback = nullptr);
94 
    // NOTE(review): presumably replaces the read callback stored for `socket` -- confirm.
102  void set_rd_callback(const tcp_socket& socket, const event_callback_t& event_callback);
103 
     // NOTE(review): presumably replaces the write callback stored for `socket` -- confirm.
111  void set_wr_callback(const tcp_socket& socket, const event_callback_t& event_callback);
112 
     // NOTE(review): presumably stops monitoring `socket`; the marked_for_untrack
     // flag below suggests removal may be deferred while a callback runs -- confirm.
123  void untrack(const tcp_socket& socket);
124 
     // NOTE(review): presumably blocks until `socket` is no longer tracked (see
     // m_wait_for_removal_condvar) -- confirm.
131  void wait_for_removal(const tcp_socket& socket);
132 
133 private:
     // Per-socket record stored as the mapped value of m_tracked_sockets.
144  struct tracked_socket {
     // Both callbacks start out empty (nullptr); the three atomic flags are
     // initialised to false by their in-class initialisers.
146  tracked_socket(void)
147  : rd_callback(nullptr)
148  , wr_callback(nullptr) {}
149 
     // Read callback and a flag that, by its name, marks a read callback in flight.
151  event_callback_t rd_callback;
     // NOTE(review): ATOMIC_VAR_INIT is deprecated since C++20; brace
     // initialisation ({false}) is the forward-compatible spelling.
152  std::atomic<bool> is_executing_rd_callback = ATOMIC_VAR_INIT(false);
153 
     // Write callback and a flag that, by its name, marks a write callback in flight.
155  event_callback_t wr_callback;
156  std::atomic<bool> is_executing_wr_callback = ATOMIC_VAR_INIT(false);
157 
     // NOTE(review): presumably set when untrack is requested while a callback
     // is still executing -- confirm.
159  std::atomic<bool> marked_for_untrack = ATOMIC_VAR_INIT(false);
160  };
161 
162 private:
     // NOTE(review): presumably the body run by m_poll_worker -- confirm.
167  void poll(void);
168 
     // Returns an int.
     // NOTE(review): presumably fills m_rd_set / m_wr_set / m_polled_fds and
     // returns the nfds argument for select() -- confirm.
175  int init_poll_fds_info(void);
176 
     // NOTE(review): presumably dispatches ready fds to the two helpers below -- confirm.
181  void process_events(void);
182 
     // Handle a read event for `fd`; `socket` is the mutable record for that fd.
189  void process_rd_event(const fd_t& fd, tracked_socket& socket);
190 
     // Handle a write event for `fd`; `socket` is the mutable record for that fd.
197  void process_wr_event(const fd_t& fd, tracked_socket& socket);
198 
199 private:
     // Tracked sockets, keyed by file descriptor.
203  std::unordered_map<fd_t, tracked_socket> m_tracked_sockets;
204 
     // Stop flag. It has no in-class initialiser, so its initial value must be
     // set by the constructor (defined elsewhere).
208  std::atomic<bool> m_should_stop;
209 
     // Thread object for the poll worker.
213  std::thread m_poll_worker;
214 
     // Thread pool for callbacks.
218  utils::thread_pool m_callback_workers;
219 
     // Mutex which, by its name, guards m_tracked_sockets.
223  std::mutex m_tracked_sockets_mtx;
224 
     // List of file descriptors handed to the poll step.
228  std::vector<fd_t> m_polled_fds;
229 
     // fd_set for read readiness.
233  fd_set m_rd_set;
234 
     // fd_set for write readiness.
238  fd_set m_wr_set;
239 
     // Condition variable which, by its name, is used by wait_for_removal.
243  std::condition_variable m_wait_for_removal_condvar;
244 
     // NOTE(review): presumably used to wake the poll thread when the tracked
     // set changes -- confirm.
248  tacopie::self_pipe m_notifier;
249 };
250 
// Returns a const reference to a std::shared_ptr<io_service>.
// NOTE(review): presumably the library-wide default service instance -- confirm
// in the implementation file, including how and when it is created.
256 const std::shared_ptr<io_service>& get_default_io_service(void);
257 
// Takes a std::shared_ptr<io_service> by const reference.
// NOTE(review): presumably installs it as the instance later returned by
// get_default_io_service -- confirm in the implementation file.
263 void set_default_io_service(const std::shared_ptr<io_service>& service);
264 
265 } // namespace tacopie
void set_nb_workers(std::size_t nb_threads)
~io_service(void)
dtor
std::function< void(fd_t)> event_callback_t
Definition: io_service.hpp:82
void untrack(const tcp_socket &socket)
io_service & operator=(const io_service &)=delete
assignment operator
Definition: tcp_socket.hpp:38
Definition: io_service.hpp:54
void wait_for_removal(const tcp_socket &socket)
Definition: self_pipe.hpp:33
Definition: io_service.hpp:48
void set_wr_callback(const tcp_socket &socket, const event_callback_t &event_callback)
void track(const tcp_socket &socket, const event_callback_t &rd_callback=nullptr, const event_callback_t &wr_callback=nullptr)
Definition: thread_pool.hpp:41
void set_rd_callback(const tcp_socket &socket, const event_callback_t &event_callback)