31 #include <cpp_redis/core/sentinel.hpp> 32 #include <cpp_redis/network/redis_connection.hpp> 33 #include <cpp_redis/network/tcp_client_iface.hpp> 45 #ifndef __CPP_REDIS_USE_CUSTOM_TCP_CLIENT 55 explicit subscriber(
const std::shared_ptr<network::tcp_client_iface>& tcp_client);
90 typedef std::function<void(const std::string& host, std::size_t port, connect_state status)>
connect_callback_t;
103 const std::string& host =
"127.0.0.1",
104 std::size_t port = 6379,
105 const connect_callback_t& connect_callback =
nullptr,
106 std::uint32_t timeout_msecs = 0,
107 std::int32_t max_reconnects = 0,
108 std::uint32_t reconnect_interval_msecs = 0);
120 const std::string& name,
121 const connect_callback_t& connect_callback =
nullptr,
122 std::uint32_t timeout_msecs = 0,
123 std::int32_t max_reconnects = 0,
124 std::uint32_t reconnect_interval_msecs = 0);
136 void disconnect(
bool wait_for_removal =
false);
164 subscriber&
auth(
const std::string& password,
const reply_callback_t& reply_callback =
nullptr);
189 subscriber&
subscribe(
const std::string& channel,
const subscribe_callback_t& callback,
const acknowledgement_callback_t& acknowledgement_callback =
nullptr);
202 subscriber&
psubscribe(
const std::string& pattern,
const subscribe_callback_t& callback,
const acknowledgement_callback_t& acknowledgement_callback =
nullptr);
238 void add_sentinel(
const std::string& host, std::size_t port, std::uint32_t timeout_msecs = 0);
264 struct callback_holder {
265 subscribe_callback_t subscribe_callback;
266 acknowledgement_callback_t acknowledgement_callback;
291 void handle_acknowledgement_reply(
const std::vector<reply>&
reply);
299 void handle_subscribe_reply(
const std::vector<reply>&
reply);
307 void handle_psubscribe_reply(
const std::vector<reply>&
reply);
317 void call_acknowledgement_callback(
const std::string& channel,
const std::map<std::string, callback_holder>& channels, std::mutex& channels_mtx, int64_t nb_chans);
324 void reconnect(
void);
334 void re_subscribe(
void);
339 bool should_reconnect(
void)
const;
344 void sleep_before_next_reconnect_attempt(
void);
349 void clear_subscriptions(
void);
360 void unprotected_subscribe(
const std::string& channel,
const subscribe_callback_t& callback,
const acknowledgement_callback_t& acknowledgement_callback);
370 void unprotected_psubscribe(
const std::string& pattern,
const subscribe_callback_t& callback,
const acknowledgement_callback_t& acknowledgement_callback);
376 std::string m_redis_server;
380 std::size_t m_redis_port = 0;
384 std::string m_master_name;
388 std::string m_password;
403 std::uint32_t m_connect_timeout_msecs = 0;
407 std::int32_t m_max_reconnects = 0;
411 std::int32_t m_current_reconnect_attempts = 0;
415 std::uint32_t m_reconnect_interval_msecs = 0;
420 std::atomic_bool m_reconnecting;
424 std::atomic_bool m_cancel;
429 std::map<std::string, callback_holder> m_subscribed_channels;
433 std::map<std::string, callback_holder> m_psubscribed_channels;
438 connect_callback_t m_connect_callback;
443 std::mutex m_psubscribed_channels_mutex;
447 std::mutex m_subscribed_channels_mutex;
452 reply_callback_t m_auth_reply_callback;
void clear_sentinels(void)
Definition: redis_connection.hpp:45
Definition: subscriber.hpp:43
std::function< void(const std::string &host, std::size_t port, connect_state status)> connect_callback_t
Definition: subscriber.hpp:90
subscriber & psubscribe(const std::string &pattern, const subscribe_callback_t &callback, const acknowledgement_callback_t &acknowledgement_callback=nullptr)
subscriber & commit(void)
std::function< void(reply &)> reply_callback_t
Definition: subscriber.hpp:153
subscriber & operator=(const subscriber &)=delete
assignment operator
bool is_reconnecting(void) const
std::function< void(int64_t)> acknowledgement_callback_t
Definition: subscriber.hpp:176
connect_state
Definition: subscriber.hpp:76
void cancel_reconnect(void)
subscriber & auth(const std::string &password, const reply_callback_t &reply_callback=nullptr)
Definition: sentinel.hpp:40
void add_sentinel(const std::string &host, std::size_t port, std::uint32_t timeout_msecs=0)
void disconnect(bool wait_for_removal=false)
const sentinel & get_sentinel(void) const
subscriber & punsubscribe(const std::string &pattern)
subscriber & unsubscribe(const std::string &channel)
void connect(const std::string &host="127.0.0.1", std::size_t port=6379, const connect_callback_t &connect_callback=nullptr, std::uint32_t timeout_msecs=0, std::int32_t max_reconnects=0, std::uint32_t reconnect_interval_msecs=0)
std::function< void(const std::string &, const std::string &)> subscribe_callback_t
Definition: subscriber.hpp:170
bool is_connected(void) const
subscriber & subscribe(const std::string &channel, const subscribe_callback_t &callback, const acknowledgement_callback_t &acknowledgement_callback=nullptr)
Definition: array_builder.hpp:29