15 #ifndef RCLCPP__CLIENT_HPP_
16 #define RCLCPP__CLIENT_HPP_
20 #include <unordered_map>
32 #include "rcl/error_handling.h"
33 #include "rcl/event_callback.h"
36 #include "rclcpp/detail/cpp_callback_trampoline.hpp"
37 #include "rclcpp/exceptions.hpp"
38 #include "rclcpp/expand_topic_or_service_name.hpp"
39 #include "rclcpp/function_traits.hpp"
40 #include "rclcpp/logging.hpp"
41 #include "rclcpp/macros.hpp"
42 #include "rclcpp/node_interfaces/node_graph_interface.hpp"
43 #include "rclcpp/qos.hpp"
44 #include "rclcpp/type_support_decl.hpp"
45 #include "rclcpp/utilities.hpp"
46 #include "rclcpp/visibility_control.hpp"
48 #include "rmw/error_handling.h"
49 #include "rmw/impl/cpp/demangle.hpp"
57 template<
typename FutureT>
64 : future(std::move(impl)), request_id(req_id)
68 operator FutureT &() {
return this->future;}
75 [[deprecated(
"FutureAndRequestId: use .future instead of an implicit conversion")]]
76 operator FutureT() {
return this->future;}
81 auto get() {
return this->future.get();}
83 bool valid() const noexcept {
return this->future.valid();}
85 void wait()
const {
return this->future.wait();}
87 template<
class Rep,
class Period>
89 const std::chrono::duration<Rep, Period> & timeout_duration)
const
91 return this->future.wait_for(timeout_duration);
94 template<
class Clock,
class Duration>
96 const std::chrono::time_point<Clock, Duration> & timeout_time)
const
98 return this->future.wait_until(timeout_time);
117 namespace node_interfaces
119 class NodeBaseInterface;
125 RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(
ClientBase)
130 rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph);
168 std::shared_ptr<rcl_client_t>
177 std::shared_ptr<const rcl_client_t>
193 template<
typename RepT =
int64_t,
typename RatioT = std::milli>
196 std::chrono::duration<RepT, RatioT> timeout = std::chrono::duration<RepT, RatioT>(-1))
198 return wait_for_service_nanoseconds(
199 std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
203 virtual std::shared_ptr<void> create_response() = 0;
204 virtual std::shared_ptr<rmw_request_id_t> create_request_header() = 0;
205 virtual void handle_response(
206 std::shared_ptr<rmw_request_id_t> request_header, std::shared_ptr<void> response) = 0;
284 throw std::invalid_argument(
285 "The callback passed to set_on_new_response_callback "
290 [callback,
this](
size_t number_of_responses) {
292 callback(number_of_responses);
293 }
catch (
const std::exception & exception) {
296 "rclcpp::ClientBase@" <<
this <<
297 " caught " << rmw::impl::cpp::demangle(exception) <<
298 " exception in user-provided callback for the 'on new response' callback: " <<
303 "rclcpp::ClientBase@" <<
this <<
304 " caught unhandled exception in user-provided callback " <<
305 "for the 'on new response' callback");
309 std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
315 rclcpp::detail::cpp_callback_trampoline<const void *, size_t>,
316 static_cast<const void *
>(&new_callback));
319 on_new_response_callback_ = new_callback;
323 rclcpp::detail::cpp_callback_trampoline<const void *, size_t>,
324 static_cast<const void *
>(&on_new_response_callback_));
331 std::lock_guard<std::recursive_mutex> lock(callback_mutex_);
332 if (on_new_response_callback_) {
334 on_new_response_callback_ =
nullptr;
343 wait_for_service_nanoseconds(std::chrono::nanoseconds timeout);
347 get_rcl_node_handle();
351 get_rcl_node_handle()
const;
357 rclcpp::node_interfaces::NodeGraphInterface::WeakPtr node_graph_;
358 std::shared_ptr<rcl_node_t> node_handle_;
359 std::shared_ptr<rclcpp::Context> context_;
362 std::shared_ptr<rcl_client_t> client_handle_;
364 std::atomic<bool> in_use_by_wait_set_{
false};
366 std::recursive_mutex callback_mutex_;
367 std::function<void(
size_t)> on_new_response_callback_{
nullptr};
370 template<
typename ServiceT>
374 using Request =
typename ServiceT::Request;
375 using Response =
typename ServiceT::Response;
377 using SharedRequest =
typename ServiceT::Request::SharedPtr;
378 using SharedResponse =
typename ServiceT::Response::SharedPtr;
380 using Promise = std::promise<SharedResponse>;
381 using PromiseWithRequest = std::promise<std::pair<SharedRequest, SharedResponse>>;
383 using SharedPromise = std::shared_ptr<Promise>;
384 using SharedPromiseWithRequest = std::shared_ptr<PromiseWithRequest>;
386 using Future = std::future<SharedResponse>;
387 using SharedFuture = std::shared_future<SharedResponse>;
388 using SharedFutureWithRequest = std::shared_future<std::pair<SharedRequest, SharedResponse>>;
390 using CallbackType = std::function<void (SharedFuture)>;
391 using CallbackWithRequestType = std::function<void (SharedFutureWithRequest)>;
393 RCLCPP_SMART_PTR_DEFINITIONS(
Client)
414 "FutureAndRequestId: use .future.share() instead of an implicit conversion")]]
415 operator SharedFuture() {
return this->future.
share();}
420 SharedFuture
share() noexcept {
return this->future.share();}
449 std::shared_future<std::pair<SharedRequest, SharedResponse>>
466 rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph,
467 const std::string & service_name,
471 using rosidl_typesupport_cpp::get_service_type_support_handle;
472 auto service_type_support_handle =
473 get_service_type_support_handle<ServiceT>();
476 this->get_rcl_node_handle(),
477 service_type_support_handle,
478 service_name.c_str(),
482 auto rcl_node_handle = this->get_rcl_node_handle();
491 rclcpp::exceptions::throw_from_rcl_error(ret,
"could not create client");
513 take_response(
typename ServiceT::Response & response_out, rmw_request_id_t & request_header_out)
522 std::shared_ptr<void>
525 return std::shared_ptr<void>(
new typename ServiceT::Response());
532 std::shared_ptr<rmw_request_id_t>
537 return std::shared_ptr<rmw_request_id_t>(
new rmw_request_id_t);
547 std::shared_ptr<rmw_request_id_t> request_header,
548 std::shared_ptr<void> response)
override
550 std::optional<CallbackInfoVariant>
551 optional_pending_request = this->get_and_erase_pending_request(request_header->sequence_number);
552 if (!optional_pending_request) {
555 auto & value = *optional_pending_request;
556 auto typed_response = std::static_pointer_cast<typename ServiceT::Response>(
557 std::move(response));
558 if (std::holds_alternative<Promise>(value)) {
559 auto & promise = std::get<Promise>(value);
560 promise.set_value(std::move(typed_response));
561 }
else if (std::holds_alternative<CallbackTypeValueVariant>(value)) {
562 auto & inner = std::get<CallbackTypeValueVariant>(value);
563 const auto & callback = std::get<CallbackType>(inner);
564 auto & promise = std::get<Promise>(inner);
565 auto & future = std::get<SharedFuture>(inner);
566 promise.set_value(std::move(typed_response));
567 callback(std::move(future));
568 }
else if (std::holds_alternative<CallbackWithRequestTypeValueVariant>(value)) {
569 auto & inner = std::get<CallbackWithRequestTypeValueVariant>(value);
570 const auto & callback = std::get<CallbackWithRequestType>(inner);
571 auto & promise = std::get<PromiseWithRequest>(inner);
572 auto & future = std::get<SharedFutureWithRequest>(inner);
573 auto & request = std::get<SharedRequest>(inner);
574 promise.set_value(std::make_pair(std::move(request), std::move(typed_response)));
575 callback(std::move(future));
611 auto future = promise.get_future();
612 auto req_id = async_send_request_impl(
635 typename std::enable_if<
642 SharedFutureAndRequestId
646 auto shared_future = promise.get_future().share();
647 auto req_id = async_send_request_impl(
650 CallbackType{std::forward<CallbackT>(cb)},
652 std::move(promise)));
666 typename std::enable_if<
669 CallbackWithRequestType
673 SharedFutureWithRequestAndRequestId
676 PromiseWithRequest promise;
677 auto shared_future = promise.get_future().share();
678 auto req_id = async_send_request_impl(
681 CallbackWithRequestType{std::forward<CallbackT>(cb)},
684 std::move(promise)));
702 std::lock_guard guard(pending_requests_mutex_);
703 return pending_requests_.erase(request_id) != 0u;
749 std::lock_guard guard(pending_requests_mutex_);
750 auto ret = pending_requests_.size();
751 pending_requests_.clear();
762 template<
typename AllocatorT = std::allocator<
int64_t>>
765 std::chrono::time_point<std::chrono::system_clock> time_point,
766 std::vector<int64_t, AllocatorT> * pruned_requests =
nullptr)
768 std::lock_guard guard(pending_requests_mutex_);
769 auto old_size = pending_requests_.size();
770 for (
auto it = pending_requests_.begin(), last = pending_requests_.end(); it != last; ) {
771 if (it->second.first < time_point) {
772 if (pruned_requests) {
773 pruned_requests->push_back(it->first);
775 it = pending_requests_.erase(it);
780 return old_size - pending_requests_.size();
784 using CallbackTypeValueVariant = std::tuple<CallbackType, SharedFuture, Promise>;
785 using CallbackWithRequestTypeValueVariant = std::tuple<
786 CallbackWithRequestType, SharedRequest, SharedFutureWithRequest, PromiseWithRequest>;
788 using CallbackInfoVariant = std::variant<
789 std::promise<SharedResponse>,
790 CallbackTypeValueVariant,
791 CallbackWithRequestTypeValueVariant>;
794 async_send_request_impl(
const Request & request, CallbackInfoVariant value)
796 int64_t sequence_number;
797 std::lock_guard<std::mutex> lock(pending_requests_mutex_);
800 rclcpp::exceptions::throw_from_rcl_error(ret,
"failed to send request");
802 pending_requests_.try_emplace(
804 std::make_pair(std::chrono::system_clock::now(), std::move(value)));
805 return sequence_number;
808 std::optional<CallbackInfoVariant>
809 get_and_erase_pending_request(int64_t request_number)
811 std::unique_lock<std::mutex> lock(pending_requests_mutex_);
812 auto it = this->pending_requests_.find(request_number);
813 if (it == this->pending_requests_.end()) {
814 RCUTILS_LOG_DEBUG_NAMED(
816 "Received invalid sequence number. Ignoring...");
819 std::optional<CallbackInfoVariant> value = std::move(it->second.second);
820 this->pending_requests_.erase(request_number);
824 RCLCPP_DISABLE_COPY(
Client)
829 std::chrono::time_point<std::chrono::system_clock>,
830 CallbackInfoVariant>>
832 std::mutex pending_requests_mutex_;
RCLCPP_PUBLIC bool exchange_in_use_by_wait_set_state(bool in_use_state)
Exchange the "in use by wait set" state for this client.
RCLCPP_PUBLIC rclcpp::QoS get_request_publisher_actual_qos() const
Get the actual request publsher QoS settings, after the defaults have been determined.
RCLCPP_PUBLIC std::shared_ptr< rcl_client_t > get_client_handle()
Return the rcl_client_t client handle in a std::shared_ptr.
void set_on_new_response_callback(std::function< void(size_t)> callback)
Set a callback to be called when each new response is received.
RCLCPP_PUBLIC bool take_type_erased_response(void *response_out, rmw_request_id_t &request_header_out)
Take the next response for this client as a type erased pointer.
bool wait_for_service(std::chrono::duration< RepT, RatioT > timeout=std::chrono::duration< RepT, RatioT >(-1))
Wait for a service to be ready.
RCLCPP_PUBLIC const char * get_service_name() const
Return the name of the service.
void clear_on_new_response_callback()
Unset the callback registered for new responses, if any.
RCLCPP_PUBLIC rclcpp::QoS get_response_subscription_actual_qos() const
Get the actual response subscription QoS settings, after the defaults have been determined.
RCLCPP_PUBLIC bool service_is_ready() const
Return if the service is ready.
bool take_response(typename ServiceT::Response &response_out, rmw_request_id_t &request_header_out)
Take the next response for this client.
std::shared_ptr< rmw_request_id_t > create_request_header() override
Create a shared pointer with a rmw_request_id_t.
bool remove_pending_request(int64_t request_id)
Cleanup a pending request.
std::shared_ptr< void > create_response() override
Create a shared pointer with the response type.
size_t prune_requests_older_than(std::chrono::time_point< std::chrono::system_clock > time_point, std::vector< int64_t, AllocatorT > *pruned_requests=nullptr)
Clean all pending requests older than a time_point.
SharedFutureWithRequestAndRequestId async_send_request(SharedRequest request, CallbackT &&cb)
Send a request to the service server and schedule a callback in the executor.
SharedFutureAndRequestId async_send_request(SharedRequest request, CallbackT &&cb)
Send a request to the service server and schedule a callback in the executor.
void handle_response(std::shared_ptr< rmw_request_id_t > request_header, std::shared_ptr< void > response) override
Handle a server response.
FutureAndRequestId async_send_request(SharedRequest request)
Send a request to the service server.
size_t prune_pending_requests()
Clean all pending requests.
bool remove_pending_request(const SharedFutureAndRequestId &future)
Cleanup a pending request.
bool remove_pending_request(const SharedFutureWithRequestAndRequestId &future)
Cleanup a pending request.
Client(rclcpp::node_interfaces::NodeBaseInterface *node_base, rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph, const std::string &service_name, rcl_client_options_t &client_options)
Default constructor.
bool remove_pending_request(const FutureAndRequestId &future)
Cleanup a pending request.
Encapsulation of Quality of Service settings.
Pure virtual interface class for the NodeBase part of the Node API.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_client_init(rcl_client_t *client, const rcl_node_t *node, const rosidl_service_type_support_t *type_support, const char *service_name, const rcl_client_options_t *options)
Initialize a rcl client.
RCL_PUBLIC RCL_WARN_UNUSED rcl_ret_t rcl_send_request(const rcl_client_t *client, const void *ros_request, int64_t *sequence_number)
Send a ROS request using a client.
Versions of rosidl_typesupport_cpp::get_message_type_support_handle that handle adapted types.
RCLCPP_PUBLIC std::string expand_topic_or_service_name(const std::string &name, const std::string &node_name, const std::string &namespace_, bool is_service=false)
Expand a topic or service name and throw if it is not valid.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_name(const rcl_node_t *node)
Return the name of the node.
RCL_PUBLIC RCL_WARN_UNUSED const char * rcl_node_get_namespace(const rcl_node_t *node)
Return the namespace of the node.
Options available for a rcl_client_t.
Structure which encapsulates a ROS Node.
A convenient Client::Future and request id pair.
SharedFuture share() noexcept
See std::future::share().
A convenient Client::SharedFuture and request id pair.
A convenient Client::SharedFutureWithRequest and request id pair.
std::future_status wait_for(const std::chrono::duration< Rep, Period > &timeout_duration) const
See std::future::wait_for().
std::future_status wait_until(const std::chrono::time_point< Clock, Duration > &timeout_time) const
See std::future::wait_until().
FutureAndRequestId & operator=(FutureAndRequestId &&other) noexcept=default
Move assignment.
FutureAndRequestId & operator=(const FutureAndRequestId &other)=delete
Deleted copy assignment, each instance is a unique owner of the future.
FutureAndRequestId(const FutureAndRequestId &other)=delete
Deleted copy constructor, each instance is a unique owner of the future.
void wait() const
See std::future::wait().
FutureAndRequestId(FutureAndRequestId &&other) noexcept=default
Move constructor.
~FutureAndRequestId()=default
Destructor.
auto get()
See std::future::get().
bool valid() const noexcept
See std::future::valid().
#define RCL_RET_SERVICE_NAME_INVALID
Service name (same as topic name) does not pass validation.
#define RCL_RET_OK
Success return code.
rmw_ret_t rcl_ret_t
The type that holds an rcl return code.