15 #ifndef RCLCPP__CLIENT_HPP_
16 #define RCLCPP__CLIENT_HPP_
27 #include <unordered_map>
33 #include "rcl/error_handling.h"
34 #include "rcl/event_callback.h"
35 #include "rcl/service_introspection.h"
38 #include "rclcpp/clock.hpp"
39 #include "rclcpp/detail/cpp_callback_trampoline.hpp"
40 #include "rclcpp/exceptions.hpp"
41 #include "rclcpp/expand_topic_or_service_name.hpp"
42 #include "rclcpp/function_traits.hpp"
43 #include "rclcpp/logging.hpp"
44 #include "rclcpp/macros.hpp"
45 #include "rclcpp/node_interfaces/node_graph_interface.hpp"
46 #include "rclcpp/qos.hpp"
47 #include "rclcpp/type_support_decl.hpp"
48 #include "rclcpp/utilities.hpp"
49 #include "rclcpp/visibility_control.hpp"
51 #include "rmw/error_handling.h"
52 #include "rmw/impl/cpp/demangle.hpp"
60 template<
typename FutureT>
67 : future(std::move(impl)), request_id(req_id)
71 operator FutureT &() {
return this->future;}
76 auto get() {
return this->future.get();}
78 bool valid() const noexcept {
return this->future.valid();}
80 void wait()
const {
return this->future.wait();}
82 template<
class Rep,
class Period>
84 const std::chrono::duration<Rep, Period> & timeout_duration)
const
86 return this->future.wait_for(timeout_duration);
89 template<
class Clock,
class Duration>
91 const std::chrono::time_point<Clock, Duration> & timeout_time)
const
93 return this->future.wait_until(timeout_time);
111 template<
typename PendingRequestsT,
typename AllocatorT = std::allocator<
int64_t>>
113 prune_requests_older_than_impl(
114 PendingRequestsT & pending_requests,
115 std::mutex & pending_requests_mutex,
116 std::chrono::time_point<std::chrono::system_clock> time_point,
117 std::vector<int64_t, AllocatorT> * pruned_requests =
nullptr)
119 std::lock_guard guard(pending_requests_mutex);
120 auto old_size = pending_requests.size();
121 for (
auto it = pending_requests.begin(), last = pending_requests.end(); it != last; ) {
122 if (it->second.first < time_point) {
123 if (pruned_requests) {
124 pruned_requests->push_back(it->first);
126 it = pending_requests.erase(it);
131 return old_size - pending_requests.size();
135 namespace node_interfaces
137 class NodeBaseInterface;
143 RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(
ClientBase)
148 rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph);
186 std::shared_ptr<rcl_client_t>
195 std::shared_ptr<const rcl_client_t>
211 template<
typename RepT =
int64_t,
typename RatioT = std::milli>
214 std::chrono::duration<RepT, RatioT> timeout = std::chrono::duration<RepT, RatioT>(-1))
216 return wait_for_service_nanoseconds(
217 std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
221 virtual std::shared_ptr<void> create_response() = 0;
222 virtual std::shared_ptr<rmw_request_id_t> create_request_header() = 0;
223 virtual void handle_response(
224 std::shared_ptr<rmw_request_id_t> request_header, std::shared_ptr<void> response) = 0;
302 throw std::invalid_argument(
303 "The callback passed to set_on_new_response_callback "
308 [callback,
this](
size_t number_of_responses) {
310 callback(number_of_responses);
311 }
catch (
const std::exception & exception) {
314 "rclcpp::ClientBase@" <<
this <<
315 " caught " << rmw::impl::cpp::demangle(exception) <<
316 " exception in user-provided callback for the 'on new response' callback: " <<
321 "rclcpp::ClientBase@" <<
this <<
322 " caught unhandled exception in user-provided callback " <<
323 "for the 'on new response' callback");
327 std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
333 rclcpp::detail::cpp_callback_trampoline<decltype(new_callback),
const void *,
size_t>,
334 static_cast<const void *
>(&new_callback));
337 on_new_response_callback_ = new_callback;
341 rclcpp::detail::cpp_callback_trampoline<
342 decltype(on_new_response_callback_),
const void *,
size_t>,
343 static_cast<const void *
>(&on_new_response_callback_));
350 std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
351 if (on_new_response_callback_) {
353 on_new_response_callback_ =
nullptr;
362 wait_for_service_nanoseconds(std::chrono::nanoseconds timeout);
366 get_rcl_node_handle();
370 get_rcl_node_handle()
const;
376 rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_;
377 std::shared_ptr<rcl_node_t> node_handle_;
378 std::shared_ptr<rclcpp::Context> context_;
381 std::recursive_mutex callback_mutex_;
386 std::function<void(
size_t)> on_new_response_callback_{
nullptr};
388 std::shared_ptr<rcl_client_t> client_handle_;
390 std::atomic<bool> in_use_by_wait_set_{
false};
393 template<
typename ServiceT>
397 using Request =
typename ServiceT::Request;
398 using Response =
typename ServiceT::Response;
400 using SharedRequest =
typename ServiceT::Request::SharedPtr;
401 using SharedResponse =
typename ServiceT::Response::SharedPtr;
403 using Promise = std::promise<SharedResponse>;
404 using PromiseWithRequest = std::promise<std::pair<SharedRequest, SharedResponse>>;
406 using SharedPromise = std::shared_ptr<Promise>;
407 using SharedPromiseWithRequest = std::shared_ptr<PromiseWithRequest>;
409 using Future = std::future<SharedResponse>;
410 using SharedFuture = std::shared_future<SharedResponse>;
411 using SharedFutureWithRequest = std::shared_future<std::pair<SharedRequest, SharedResponse>>;
413 using CallbackType = std::function<void (SharedFuture)>;
414 using CallbackWithRequestType = std::function<void (SharedFutureWithRequest)>;
416 RCLCPP_SMART_PTR_DEFINITIONS(
Client)
434 SharedFuture
share() noexcept {
return this->future.share();}
463 std::shared_future<std::pair<SharedRequest, SharedResponse>>
480 rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
481 const std::string & service_name,
484 srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle<ServiceT>())
488 this->get_rcl_node_handle(),
489 srv_type_support_handle_,
490 service_name.c_str(),
494 auto rcl_node_handle = this->get_rcl_node_handle();
503 rclcpp::exceptions::throw_from_rcl_error(ret,
"could not create client");
525 take_response(
typename ServiceT::Response & response_out, rmw_request_id_t & request_header_out)
534 std::shared_ptr<void>
537 return std::shared_ptr<void>(
new typename ServiceT::Response());
544 std::shared_ptr<rmw_request_id_t>
549 return std::shared_ptr<rmw_request_id_t>(
new rmw_request_id_t);
559 std::shared_ptr<rmw_request_id_t> request_header,
560 std::shared_ptr<void> response)
override
562 std::optional<CallbackInfoVariant>
563 optional_pending_request = this->get_and_erase_pending_request(request_header->sequence_number);
564 if (!optional_pending_request) {
567 auto & value = *optional_pending_request;
568 auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(
569 std::move(response));
570 if (std::holds_alternative<Promise>(value)) {
571 auto & promise = std::get<Promise>(value);
572 promise.set_value(std::move(typed_response));
573 }
else if (std::holds_alternative<CallbackTypeValueVariant>(value)) {
574 auto & inner = std::get<CallbackTypeValueVariant>(value);
575 const auto & callback = std::get<CallbackType>(inner);
576 auto & promise = std::get<Promise>(inner);
577 auto & future = std::get<SharedFuture>(inner);
578 promise.set_value(std::move(typed_response));
579 callback(std::move(future));
580 }
else if (std::holds_alternative<CallbackWithRequestTypeValueVariant>(value)) {
581 auto & inner = std::get<CallbackWithRequestTypeValueVariant>(value);
582 const auto & callback = std::get<CallbackWithRequestType>(inner);
583 auto & promise = std::get<PromiseWithRequest>(inner);
584 auto & future = std::get<SharedFutureWithRequest>(inner);
585 auto & request = std::get<SharedRequest>(inner);
586 promise.set_value(std::make_pair(std::move(request), std::move(typed_response)));
587 callback(std::move(future));
623 auto future = promise.get_future();
624 auto req_id = async_send_request_impl(
647 typename std::enable_if<
654 SharedFutureAndRequestId
658 auto shared_future = promise.get_future().share();
659 auto req_id = async_send_request_impl(
662 CallbackType{std::forward<CallbackT>(cb)},
664 std::move(promise)));
678 typename std::enable_if<
681 CallbackWithRequestType
685 SharedFutureWithRequestAndRequestId
688 PromiseWithRequest promise;
689 auto shared_future = promise.get_future().share();
690 auto req_id = async_send_request_impl(
693 CallbackWithRequestType{std::forward<CallbackT>(cb)},
696 std::move(promise)));
714 std::lock_guard guard(pending_requests_mutex_);
715 return pending_requests_.erase(request_id) != 0u;
761 std::lock_guard guard(pending_requests_mutex_);
762 auto ret = pending_requests_.size();
763 pending_requests_.clear();
774 template<
typename AllocatorT = std::allocator<
int64_t>>
777 std::chrono::time_point<std::chrono::system_clock> time_point,
778 std::vector<int64_t, AllocatorT> * pruned_requests =
nullptr)
780 return detail::prune_requests_older_than_impl(
782 pending_requests_mutex_,
798 Clock::SharedPtr clock,
const QoS & qos_service_event_pub,
799 rcl_service_introspection_state_t introspection_state)
805 client_handle_.get(),
807 clock->get_clock_handle(),
808 srv_type_support_handle_,
810 introspection_state);
813 rclcpp::exceptions::throw_from_rcl_error(ret,
"failed to configure client introspection");
818 using CallbackTypeValueVariant = std::tuple<CallbackType, SharedFuture, Promise>;
819 using CallbackWithRequestTypeValueVariant = std::tuple<
820 CallbackWithRequestType, SharedRequest, SharedFutureWithRequest, PromiseWithRequest>;
822 using CallbackInfoVariant = std::variant<
823 std::promise<SharedResponse>,
824 CallbackTypeValueVariant,
825 CallbackWithRequestTypeValueVariant>;
828 async_send_request_impl(
const Request & request, CallbackInfoVariant value)
830 int64_t sequence_number;
831 std::lock_guard<std::mutex> lock(pending_requests_mutex_);
834 rclcpp::exceptions::throw_from_rcl_error(ret,
"failed to send request");
836 pending_requests_.try_emplace(
838 std::make_pair(std::chrono::system_clock::now(), std::move(value)));
839 return sequence_number;
842 std::optional<CallbackInfoVariant>
843 get_and_erase_pending_request(int64_t request_number)
845 std::unique_lock<std::mutex> lock(pending_requests_mutex_);
846 auto it = this->pending_requests_.find(request_number);
847 if (it == this->pending_requests_.end()) {
848 RCUTILS_LOG_DEBUG_NAMED(
850 "Received invalid sequence number. Ignoring...");
853 std::optional<CallbackInfoVariant> value = std::move(it->second.second);
854 this->pending_requests_.erase(request_number);
858 RCLCPP_DISABLE_COPY(
Client)
863 std::chrono::time_point<std::chrono::system_clock>,
864 CallbackInfoVariant>>
866 std::mutex pending_requests_mutex_;
869 const rosidl_service_type_support_t * srv_type_support_handle_;
RCLCPP_PUBLIC bool exchange_in_use_by_wait_set_state(bool in_use_state)
Exchange the "in use by wait set" state for this client.
RCLCPP_PUBLIC rclcpp::QoS get_request_publisher_actual_qos() const
Get the actual request publsher QoS settings, after the defaults have been determined.
RCLCPP_PUBLIC std::shared_ptr< rcl_client_t > get_client_handle()
Return the rcl_client_t client handle in a std::shared_ptr.
void set_on_new_response_callback(std::function< void(size_t)> callback)
Set a callback to be called when each new response is received.
RCLCPP_PUBLIC bool take_type_erased_response(void *response_out, rmw_request_id_t &request_header_out)
Take the next response for this client as a type erased pointer.
bool wait_for_service(std::chrono::duration< RepT, RatioT > timeout=std::chrono::duration< RepT, RatioT >(-1))
Wait for a service to be ready.
RCLCPP_PUBLIC const char * get_service_name() const
Return the name of the service.
void clear_on_new_response_callback()
Unset the callback registered for new responses, if any.
RCLCPP_PUBLIC rclcpp::QoS get_response_subscription_actual_qos() const
Get the actual response subscription QoS settings, after the defaults have been determined.
RCLCPP_PUBLIC bool service_is_ready() const
Return if the service is ready.
bool take_response(typename ServiceT::Response &response_out, rmw_request_id_t &request_header_out)
Take the next response for this client.
std::shared_ptr< rmw_request_id_t > create_request_header() override
Create a shared pointer with a rmw_request_id_t.
bool remove_pending_request(int64_t request_id)
Cleanup a pending request.
void configure_introspection(Clock::SharedPtr clock, const QoS &qos_service_event_pub, rcl_service_introspection_state_t introspection_state)
Configure client introspection.
std::shared_ptr< void > create_response() override
Create a shared pointer with the response type.
size_t prune_requests_older_than(std::chrono::time_point< std::chrono::system_clock > time_point, std::vector< int64_t, AllocatorT > *pruned_requests=nullptr)
Clean all pending requests older than a time_point.
SharedFutureWithRequestAndRequestId async_send_request(SharedRequest request, CallbackT &&cb)
Send a request to the service server and schedule a callback in the executor.
SharedFutureAndRequestId async_send_request(SharedRequest request, CallbackT &&cb)
Send a request to the service server and schedule a callback in the executor.
void handle_response(std::shared_ptr< rmw_request_id_t > request_header, std::shared_ptr< void > response) override
Handle a server response.
FutureAndRequestId async_send_request(SharedRequest request)
Send a request to the service server.
size_t prune_pending_requests()
Clean all pending requests.
bool remove_pending_request(const SharedFutureAndRequestId &future)
Cleanup a pending request.
bool remove_pending_request(const SharedFutureWithRequestAndRequestId &future)
Cleanup a pending request.
Client(rclcpp::node_interfaces::NodeBaseInterface *node_base, rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph, const std::string &service_name, rcl_client_options_t &client_options)
Default constructor.
bool remove_pending_request(const FutureAndRequestId &future)
Cleanup a pending request.
Encapsulation of Quality of Service settings.
rmw_qos_profile_t & get_rmw_qos_profile()
Return the rmw qos profile.
Pure virtual interface class for the NodeBase part of the Node API.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_client_configure_service_introspection(rcl_client_t *client, rcl_node_t *node, rcl_clock_t *clock, const rosidl_service_type_support_t *type_support, const rcl_publisher_options_t publisher_options, rcl_service_introspection_state_t introspection_state)
Configures service introspection features for the client.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_client_init(rcl_client_t *client, const rcl_node_t *node, const rosidl_service_type_support_t *type_support, const char *service_name, const rcl_client_options_t *options)
Initialize a rcl client.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_send_request(const rcl_client_t *client, const void *ros_request, int64_t *sequence_number)
Send a ROS request using a client.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC std::string expand_topic_or_service_name(const std::string &name, const std::string &node_name, const std::string &namespace_, bool is_service=false)
Expand a topic or service name and throw if it is not valid.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_name(const rcl_node_t *node)
Return the name of the node.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_namespace(const rcl_node_t *node)
Return the namespace of the node.
RCL_PUBLIC RCL_WARN_UNUSED rcl_publisher_options_t rcl_publisher_get_default_options(void)
Return the default publisher options in a rcl_publisher_options_t.
Options available for a rcl_client_t.
Structure which encapsulates a ROS Node.
Options available for a rcl publisher.
rmw_qos_profile_t qos
Middleware quality of service settings for the publisher.
A convenient Client::Future and request id pair.
SharedFuture share() noexcept
See std::future::share().
A convenient Client::SharedFuture and request id pair.
A convenient Client::SharedFutureWithRequest and request id pair.
std::future_status wait_for(const std::chrono::duration< Rep, Period > &timeout_duration) const
See std::future::wait_for().
std::future_status wait_until(const std::chrono::time_point< Clock, Duration > &timeout_time) const
See std::future::wait_until().
FutureAndRequestId & operator=(FutureAndRequestId &&other) noexcept=default
Move assignment.
FutureAndRequestId & operator=(const FutureAndRequestId &other)=delete
Deleted copy assignment, each instance is a unique owner of the future.
FutureAndRequestId(const FutureAndRequestId &other)=delete
Deleted copy constructor, each instance is a unique owner of the future.
void wait() const
See std::future::wait().
FutureAndRequestId(FutureAndRequestId &&other) noexcept=default
Move constructor.
~FutureAndRequestId()=default
Destructor.
auto get()
See std::future::get().
bool valid() const noexcept
See std::future::valid().
#define RCL_RET_SERVICE_NAME_INVALID
Service name (same as topic name) does not pass validation.
#define RCL_RET_OK
Success return code.
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.