15 #ifndef RCLCPP__CLIENT_HPP_
16 #define RCLCPP__CLIENT_HPP_
27 #include <unordered_map>
33 #include "rcl/error_handling.h"
34 #include "rcl/event_callback.h"
35 #include "rcl/service_introspection.h"
38 #include "rclcpp/clock.hpp"
39 #include "rclcpp/detail/cpp_callback_trampoline.hpp"
40 #include "rclcpp/exceptions.hpp"
41 #include "rclcpp/expand_topic_or_service_name.hpp"
42 #include "rclcpp/function_traits.hpp"
43 #include "rclcpp/logging.hpp"
44 #include "rclcpp/macros.hpp"
45 #include "rclcpp/node_interfaces/node_graph_interface.hpp"
46 #include "rclcpp/qos.hpp"
47 #include "rclcpp/type_support_decl.hpp"
48 #include "rclcpp/utilities.hpp"
49 #include "rclcpp/visibility_control.hpp"
51 #include "rmw/error_handling.h"
52 #include "rmw/impl/cpp/demangle.hpp"
60 template<
typename FutureT>
67 : future(std::move(impl)), request_id(req_id)
71 operator FutureT &() {
return this->future;}
78 [[deprecated(
"FutureAndRequestId: use .future instead of an implicit conversion")]]
79 operator FutureT() {
return this->future;}
84 auto get() {
return this->future.get();}
86 bool valid() const noexcept {
return this->future.valid();}
88 void wait()
const {
return this->future.wait();}
90 template<
class Rep,
class Period>
92 const std::chrono::duration<Rep, Period> & timeout_duration)
const
94 return this->future.wait_for(timeout_duration);
97 template<
class Clock,
class Duration>
99 const std::chrono::time_point<Clock, Duration> & timeout_time)
const
101 return this->future.wait_until(timeout_time);
119 template<
typename PendingRequestsT,
typename AllocatorT = std::allocator<
int64_t>>
121 prune_requests_older_than_impl(
122 PendingRequestsT & pending_requests,
123 std::mutex & pending_requests_mutex,
124 std::chrono::time_point<std::chrono::system_clock> time_point,
125 std::vector<int64_t, AllocatorT> * pruned_requests =
nullptr)
127 std::lock_guard guard(pending_requests_mutex);
128 auto old_size = pending_requests.size();
129 for (
auto it = pending_requests.begin(), last = pending_requests.end(); it != last; ) {
130 if (it->second.first < time_point) {
131 if (pruned_requests) {
132 pruned_requests->push_back(it->first);
134 it = pending_requests.erase(it);
139 return old_size - pending_requests.size();
143 namespace node_interfaces
145 class NodeBaseInterface;
151 RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(
ClientBase)
156 rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph);
194 std::shared_ptr<rcl_client_t>
203 std::shared_ptr<const rcl_client_t>
219 template<
typename RepT =
int64_t,
typename RatioT = std::milli>
222 std::chrono::duration<RepT, RatioT> timeout = std::chrono::duration<RepT, RatioT>(-1))
224 return wait_for_service_nanoseconds(
225 std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
229 virtual std::shared_ptr<void> create_response() = 0;
230 virtual std::shared_ptr<rmw_request_id_t> create_request_header() = 0;
231 virtual void handle_response(
232 std::shared_ptr<rmw_request_id_t> request_header, std::shared_ptr<void> response) = 0;
310 throw std::invalid_argument(
311 "The callback passed to set_on_new_response_callback "
316 [callback,
this](
size_t number_of_responses) {
318 callback(number_of_responses);
319 }
catch (
const std::exception & exception) {
322 "rclcpp::ClientBase@" <<
this <<
323 " caught " << rmw::impl::cpp::demangle(exception) <<
324 " exception in user-provided callback for the 'on new response' callback: " <<
329 "rclcpp::ClientBase@" <<
this <<
330 " caught unhandled exception in user-provided callback " <<
331 "for the 'on new response' callback");
335 std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
341 rclcpp::detail::cpp_callback_trampoline<decltype(new_callback),
const void *,
size_t>,
342 static_cast<const void *
>(&new_callback));
345 on_new_response_callback_ = new_callback;
349 rclcpp::detail::cpp_callback_trampoline<
350 decltype(on_new_response_callback_),
const void *,
size_t>,
351 static_cast<const void *
>(&on_new_response_callback_));
358 std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
359 if (on_new_response_callback_) {
361 on_new_response_callback_ =
nullptr;
370 wait_for_service_nanoseconds(std::chrono::nanoseconds timeout);
374 get_rcl_node_handle();
378 get_rcl_node_handle()
const;
384 rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_;
385 std::shared_ptr<rcl_node_t> node_handle_;
386 std::shared_ptr<rclcpp::Context> context_;
389 std::recursive_mutex callback_mutex_;
394 std::function<void(
size_t)> on_new_response_callback_{
nullptr};
396 std::shared_ptr<rcl_client_t> client_handle_;
398 std::atomic<bool> in_use_by_wait_set_{
false};
401 template<
typename ServiceT>
405 using Request =
typename ServiceT::Request;
406 using Response =
typename ServiceT::Response;
408 using SharedRequest =
typename ServiceT::Request::SharedPtr;
409 using SharedResponse =
typename ServiceT::Response::SharedPtr;
411 using Promise = std::promise<SharedResponse>;
412 using PromiseWithRequest = std::promise<std::pair<SharedRequest, SharedResponse>>;
414 using SharedPromise = std::shared_ptr<Promise>;
415 using SharedPromiseWithRequest = std::shared_ptr<PromiseWithRequest>;
417 using Future = std::future<SharedResponse>;
418 using SharedFuture = std::shared_future<SharedResponse>;
419 using SharedFutureWithRequest = std::shared_future<std::pair<SharedRequest, SharedResponse>>;
421 using CallbackType = std::function<void (SharedFuture)>;
422 using CallbackWithRequestType = std::function<void (SharedFutureWithRequest)>;
424 RCLCPP_SMART_PTR_DEFINITIONS(
Client)
445 "FutureAndRequestId: use .future.share() instead of an implicit conversion")]]
446 operator SharedFuture() {
return this->future.
share();}
451 SharedFuture
share() noexcept {
return this->future.share();}
480 std::shared_future<std::pair<SharedRequest, SharedResponse>>
497 rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
498 const std::string & service_name,
501 srv_type_support_handle_(rosidl_typesupport_cpp::get_service_type_support_handle<ServiceT>())
505 this->get_rcl_node_handle(),
506 srv_type_support_handle_,
507 service_name.c_str(),
511 auto rcl_node_handle = this->get_rcl_node_handle();
520 rclcpp::exceptions::throw_from_rcl_error(ret,
"could not create client");
542 take_response(
typename ServiceT::Response & response_out, rmw_request_id_t & request_header_out)
551 std::shared_ptr<void>
554 return std::shared_ptr<void>(
new typename ServiceT::Response());
561 std::shared_ptr<rmw_request_id_t>
566 return std::shared_ptr<rmw_request_id_t>(
new rmw_request_id_t);
576 std::shared_ptr<rmw_request_id_t> request_header,
577 std::shared_ptr<void> response)
override
579 std::optional<CallbackInfoVariant>
580 optional_pending_request = this->get_and_erase_pending_request(request_header->sequence_number);
581 if (!optional_pending_request) {
584 auto & value = *optional_pending_request;
585 auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(
586 std::move(response));
587 if (std::holds_alternative<Promise>(value)) {
588 auto & promise = std::get<Promise>(value);
589 promise.set_value(std::move(typed_response));
590 }
else if (std::holds_alternative<CallbackTypeValueVariant>(value)) {
591 auto & inner = std::get<CallbackTypeValueVariant>(value);
592 const auto & callback = std::get<CallbackType>(inner);
593 auto & promise = std::get<Promise>(inner);
594 auto & future = std::get<SharedFuture>(inner);
595 promise.set_value(std::move(typed_response));
596 callback(std::move(future));
597 }
else if (std::holds_alternative<CallbackWithRequestTypeValueVariant>(value)) {
598 auto & inner = std::get<CallbackWithRequestTypeValueVariant>(value);
599 const auto & callback = std::get<CallbackWithRequestType>(inner);
600 auto & promise = std::get<PromiseWithRequest>(inner);
601 auto & future = std::get<SharedFutureWithRequest>(inner);
602 auto & request = std::get<SharedRequest>(inner);
603 promise.set_value(std::make_pair(std::move(request), std::move(typed_response)));
604 callback(std::move(future));
640 auto future = promise.get_future();
641 auto req_id = async_send_request_impl(
664 typename std::enable_if<
671 SharedFutureAndRequestId
675 auto shared_future = promise.get_future().share();
676 auto req_id = async_send_request_impl(
679 CallbackType{std::forward<CallbackT>(cb)},
681 std::move(promise)));
695 typename std::enable_if<
698 CallbackWithRequestType
702 SharedFutureWithRequestAndRequestId
705 PromiseWithRequest promise;
706 auto shared_future = promise.get_future().share();
707 auto req_id = async_send_request_impl(
710 CallbackWithRequestType{std::forward<CallbackT>(cb)},
713 std::move(promise)));
731 std::lock_guard guard(pending_requests_mutex_);
732 return pending_requests_.erase(request_id) != 0u;
778 std::lock_guard guard(pending_requests_mutex_);
779 auto ret = pending_requests_.size();
780 pending_requests_.clear();
791 template<
typename AllocatorT = std::allocator<
int64_t>>
794 std::chrono::time_point<std::chrono::system_clock> time_point,
795 std::vector<int64_t, AllocatorT> * pruned_requests =
nullptr)
797 return detail::prune_requests_older_than_impl(
799 pending_requests_mutex_,
812 Clock::SharedPtr clock,
const QoS & qos_service_event_pub,
813 rcl_service_introspection_state_t introspection_state)
819 client_handle_.get(),
821 clock->get_clock_handle(),
822 srv_type_support_handle_,
824 introspection_state);
827 rclcpp::exceptions::throw_from_rcl_error(ret,
"failed to configure client introspection");
832 using CallbackTypeValueVariant = std::tuple<CallbackType, SharedFuture, Promise>;
833 using CallbackWithRequestTypeValueVariant = std::tuple<
834 CallbackWithRequestType, SharedRequest, SharedFutureWithRequest, PromiseWithRequest>;
836 using CallbackInfoVariant = std::variant<
837 std::promise<SharedResponse>,
838 CallbackTypeValueVariant,
839 CallbackWithRequestTypeValueVariant>;
842 async_send_request_impl(
const Request & request, CallbackInfoVariant value)
844 int64_t sequence_number;
845 std::lock_guard<std::mutex> lock(pending_requests_mutex_);
848 rclcpp::exceptions::throw_from_rcl_error(ret,
"failed to send request");
850 pending_requests_.try_emplace(
852 std::make_pair(std::chrono::system_clock::now(), std::move(value)));
853 return sequence_number;
856 std::optional<CallbackInfoVariant>
857 get_and_erase_pending_request(int64_t request_number)
859 std::unique_lock<std::mutex> lock(pending_requests_mutex_);
860 auto it = this->pending_requests_.find(request_number);
861 if (it == this->pending_requests_.end()) {
862 RCUTILS_LOG_DEBUG_NAMED(
864 "Received invalid sequence number. Ignoring...");
867 std::optional<CallbackInfoVariant> value = std::move(it->second.second);
868 this->pending_requests_.erase(request_number);
872 RCLCPP_DISABLE_COPY(
Client)
877 std::chrono::time_point<std::chrono::system_clock>,
878 CallbackInfoVariant>>
880 std::mutex pending_requests_mutex_;
883 const rosidl_service_type_support_t * srv_type_support_handle_;
RCLCPP_PUBLIC bool exchange_in_use_by_wait_set_state(bool in_use_state)
Exchange the "in use by wait set" state for this client.
RCLCPP_PUBLIC rclcpp::QoS get_request_publisher_actual_qos() const
Get the actual request publsher QoS settings, after the defaults have been determined.
RCLCPP_PUBLIC std::shared_ptr< rcl_client_t > get_client_handle()
Return the rcl_client_t client handle in a std::shared_ptr.
void set_on_new_response_callback(std::function< void(size_t)> callback)
Set a callback to be called when each new response is received.
RCLCPP_PUBLIC bool take_type_erased_response(void *response_out, rmw_request_id_t &request_header_out)
Take the next response for this client as a type erased pointer.
bool wait_for_service(std::chrono::duration< RepT, RatioT > timeout=std::chrono::duration< RepT, RatioT >(-1))
Wait for a service to be ready.
RCLCPP_PUBLIC const char * get_service_name() const
Return the name of the service.
void clear_on_new_response_callback()
Unset the callback registered for new responses, if any.
RCLCPP_PUBLIC rclcpp::QoS get_response_subscription_actual_qos() const
Get the actual response subscription QoS settings, after the defaults have been determined.
RCLCPP_PUBLIC bool service_is_ready() const
Return if the service is ready.
bool take_response(typename ServiceT::Response &response_out, rmw_request_id_t &request_header_out)
Take the next response for this client.
std::shared_ptr< rmw_request_id_t > create_request_header() override
Create a shared pointer with a rmw_request_id_t.
bool remove_pending_request(int64_t request_id)
Cleanup a pending request.
void configure_introspection(Clock::SharedPtr clock, const QoS &qos_service_event_pub, rcl_service_introspection_state_t introspection_state)
Configure client introspection.
std::shared_ptr< void > create_response() override
Create a shared pointer with the response type.
size_t prune_requests_older_than(std::chrono::time_point< std::chrono::system_clock > time_point, std::vector< int64_t, AllocatorT > *pruned_requests=nullptr)
Clean all pending requests older than a time_point.
SharedFutureWithRequestAndRequestId async_send_request(SharedRequest request, CallbackT &&cb)
Send a request to the service server and schedule a callback in the executor.
SharedFutureAndRequestId async_send_request(SharedRequest request, CallbackT &&cb)
Send a request to the service server and schedule a callback in the executor.
void handle_response(std::shared_ptr< rmw_request_id_t > request_header, std::shared_ptr< void > response) override
Handle a server response.
FutureAndRequestId async_send_request(SharedRequest request)
Send a request to the service server.
size_t prune_pending_requests()
Clean all pending requests.
bool remove_pending_request(const SharedFutureAndRequestId &future)
Cleanup a pending request.
bool remove_pending_request(const SharedFutureWithRequestAndRequestId &future)
Cleanup a pending request.
Client(rclcpp::node_interfaces::NodeBaseInterface *node_base, rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph, const std::string &service_name, rcl_client_options_t &client_options)
Default constructor.
bool remove_pending_request(const FutureAndRequestId &future)
Cleanup a pending request.
Encapsulation of Quality of Service settings.
rmw_qos_profile_t & get_rmw_qos_profile()
Return the rmw qos profile.
Pure virtual interface class for the NodeBase part of the Node API.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_client_configure_service_introspection(rcl_client_t *client, rcl_node_t *node, rcl_clock_t *clock, const rosidl_service_type_support_t *type_support, const rcl_publisher_options_t publisher_options, rcl_service_introspection_state_t introspection_state)
Configures service introspection features for the client.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_client_init(rcl_client_t *client, const rcl_node_t *node, const rosidl_service_type_support_t *type_support, const char *service_name, const rcl_client_options_t *options)
Initialize a rcl client.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_send_request(const rcl_client_t *client, const void *ros_request, int64_t *sequence_number)
Send a ROS request using a client.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC std::string expand_topic_or_service_name(const std::string &name, const std::string &node_name, const std::string &namespace_, bool is_service=false)
Expand a topic or service name and throw if it is not valid.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_name(const rcl_node_t *node)
Return the name of the node.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_namespace(const rcl_node_t *node)
Return the namespace of the node.
RCL_PUBLIC RCL_WARN_UNUSED rcl_publisher_options_t rcl_publisher_get_default_options(void)
Return the default publisher options in a rcl_publisher_options_t.
Options available for a rcl_client_t.
Structure which encapsulates a ROS Node.
Options available for a rcl publisher.
rmw_qos_profile_t qos
Middleware quality of service settings for the publisher.
A convenient Client::Future and request id pair.
SharedFuture share() noexcept
See std::future::share().
A convenient Client::SharedFuture and request id pair.
A convenient Client::SharedFutureWithRequest and request id pair.
std::future_status wait_for(const std::chrono::duration< Rep, Period > &timeout_duration) const
See std::future::wait_for().
std::future_status wait_until(const std::chrono::time_point< Clock, Duration > &timeout_time) const
See std::future::wait_until().
FutureAndRequestId & operator=(FutureAndRequestId &&other) noexcept=default
Move assignment.
FutureAndRequestId & operator=(const FutureAndRequestId &other)=delete
Deleted copy assignment, each instance is a unique owner of the future.
FutureAndRequestId(const FutureAndRequestId &other)=delete
Deleted copy constructor, each instance is a unique owner of the future.
void wait() const
See std::future::wait().
FutureAndRequestId(FutureAndRequestId &&other) noexcept=default
Move constructor.
~FutureAndRequestId()=default
Destructor.
auto get()
See std::future::get().
bool valid() const noexcept
See std::future::valid().
#define RCL_RET_SERVICE_NAME_INVALID
Service name (same as topic name) does not pass validation.
#define RCL_RET_OK
Success return code.
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.