From 657e4d2f7b622712872301b6a266d48e6b943923 Mon Sep 17 00:00:00 2001 From: Robert Leahy Date: Sun, 28 Dec 2025 18:00:41 -0500 Subject: [PATCH] exec::repeat_effect_until: Throwing Decay-Copy & Value Category Preservation The asynchronous loop of exec::repeat_effect_until proceeds until the child operation sends a value which converts to true. Previously this process proceeded as follows: 1. Accept the child operation's result by value to avoid dangling references into the operation state (see next step) 2. Destroy the child operation state 3. Convert the result accepted in step 1 to bool and check if it's true, if so end the operation, otherwise 4. Connect the child sender 5. Start the new child operation Unfortunately step 1 meant that the result of the child operation would be decay-copied to pass it by value. This occurred within a noexcept function and therefore if that decay-copy threw std::terminate would be called. Moreover the previous implementation did not forward the result in step 3. This meant that if the child's result type was only rvalue convertible to bool compilation would fail. Additionally the same pass-by-value strategy was used for errors. However when handling an error there's no need to destroy the child operation state due to the fact the operation is ending and therefore doesn't need to reconnect the child sender for the next iteration (note this logic also applies to successful completion). Fixed all of the above by handling completion of the child operation as follows: 1. If the child completed with error or stopped simply forward that completion through (note the child operation state is not destroyed and will be cleaned up by the destructor of the operation state for exec::repeat_effect_until) (note that this saves one decay-copy over the previous implementation but requires a branch in the destructor, which was already present), otherwise 2. Forward the result, convert that forwarded expression to bool, and check if it's true, if so end the operation (note that once again the child operation state is not destroyed and once again a decay-copy is eliminated), otherwise 3. Destroy the child operation state 4. Connect the child sender 5. Start the new child operation --- include/exec/repeat_effect_until.hpp | 47 ++++++----- test/exec/test_repeat_effect_until.cpp | 111 ++++++++++++++++++++++++- 2 files changed, 134 insertions(+), 24 deletions(-) diff --git a/include/exec/repeat_effect_until.hpp b/include/exec/repeat_effect_until.hpp index 6e7cbdc39..d0084d540 100644 --- a/include/exec/repeat_effect_until.hpp +++ b/include/exec/repeat_effect_until.hpp @@ -24,7 +24,6 @@ #include "trampoline_scheduler.hpp" #include "sequence.hpp" -#include "../stdexec/__detail/__atomic.hpp" #include #include @@ -82,7 +81,7 @@ namespace exec { using __child_op_t = stdexec::connect_result_t<__child_on_sched_sender_t, __receiver_t>; __child_t __child_; - __std::atomic_flag __started_{}; + bool __has_child_op_ = false; stdexec::__manual_lifetime<__child_op_t> __child_op_; trampoline_scheduler __sched_; @@ -93,11 +92,7 @@ namespace exec { } ~__repeat_effect_state() { - if (!__started_.test(__std::memory_order_acquire)) { - __std::atomic_thread_fence(__std::memory_order_release); - // TSan does not support __std::atomic_thread_fence, so we - // need to use the TSan-specific __tsan_release instead: - STDEXEC_WHEN(STDEXEC_TSAN(), __tsan_release(&__started_)); + if (__has_child_op_) { __child_op_.__destroy(); } } @@ -107,30 +102,42 @@ namespace exec { return stdexec::connect( exec::sequence(stdexec::schedule(__sched_), __child_), __receiver_t{this}); }); + __has_child_op_ = true; + } + + void __destroy() noexcept { + __child_op_.__destroy(); + __has_child_op_ = false; } void __start() noexcept { - const bool __already_started [[maybe_unused]] - = __started_.test_and_set(__std::memory_order_relaxed); - STDEXEC_ASSERT(!__already_started); + STDEXEC_ASSERT(__has_child_op_); stdexec::start(__child_op_.__get()); } template - void __complete(_Tag, _Args... __args) noexcept { // Intentionally by value... - __child_op_.__destroy(); // ... because this could potentially invalidate them. + void __complete(_Tag, _Args &&...__args) noexcept { if constexpr (same_as<_Tag, set_value_t>) { // If the sender completed with true, we're done STDEXEC_TRY { - const bool __done = (static_cast(__args) && ...); + const bool __done = (static_cast(static_cast<_Args &&>(__args)) && ...); if (__done) { stdexec::set_value(static_cast<_Receiver &&>(this->__receiver())); - } else { + return; + } + __destroy(); + STDEXEC_TRY { __connect(); - stdexec::start(__child_op_.__get()); } + STDEXEC_CATCH_ALL { + stdexec::set_error( + static_cast<_Receiver &&>(this->__receiver()), std::current_exception()); + return; + } + stdexec::start(__child_op_.__get()); } STDEXEC_CATCH_ALL { + __destroy(); stdexec::set_error( static_cast<_Receiver &&>(this->__receiver()), std::current_exception()); } @@ -160,20 +167,14 @@ namespace exec { __mexception<_INVALID_ARGUMENT_TO_REPEAT_EFFECT_UNTIL_<>, _WITH_SENDER_<_Sender>> >; - template - using __error_t = completion_signatures)>; - template using __completions_t = stdexec::transform_completion_signatures< __completion_signatures_of_t<__decay_t<_Sender> &, _Env...>, stdexec::transform_completion_signatures< __completion_signatures_of_t, _Env...>, - __eptr_completion, - __sigs::__default_set_value, - __error_t + __eptr_completion >, - __mbind_front_q<__values_t, _Sender>::template __f, - __error_t + __mbind_front_q<__values_t, _Sender>::template __f >; struct __repeat_effect_tag { }; diff --git a/test/exec/test_repeat_effect_until.cpp b/test/exec/test_repeat_effect_until.cpp index 08f2a65bd..879d52bfa 100644 --- a/test/exec/test_repeat_effect_until.cpp +++ b/test/exec/test_repeat_effect_until.cpp @@ -26,7 +26,9 @@ #include +#include #include +#include #include namespace ex = stdexec; @@ -37,7 +39,8 @@ namespace { using sender_concept = ex::sender_t; using __t = boolean_sender; using __id = boolean_sender; - using completion_signatures = ex::completion_signatures; + using completion_signatures = + ex::completion_signatures; template struct operation { @@ -88,6 +91,12 @@ namespace { TEST_CASE("simple example for repeat_effect_until", "[adaptors][repeat_effect_until]") { ex::sender auto snd = exec::repeat_effect_until(boolean_sender{}); + static_assert(all_contained_in< + ex::completion_signatures, + ex::completion_signatures_of_t>>); + static_assert(!all_contained_in< + ex::completion_signatures, + ex::completion_signatures_of_t>>); ex::sync_wait(std::move(snd)); } @@ -197,4 +206,104 @@ namespace { ex::start(op); REQUIRE(counter == 10); } + + TEST_CASE( + "repeat_effect works correctly when the child operation sends an error type which throws when " + "decay-copied", + "[adaptors][repeat_effect]") { + struct error_type { + explicit error_type(unsigned& throw_after) noexcept + : throw_after_(throw_after) { + } + error_type(const error_type& other) + : throw_after_(other.throw_after_) { + if (!throw_after_) { + throw std::logic_error("TEST"); + } + --throw_after_; + } + unsigned& throw_after_; + }; + struct receiver { + using receiver_concept = ::stdexec::receiver_t; + void set_value() && noexcept { + FAIL_CHECK("Unexpected value completion signal"); + } + void set_stopped() && noexcept { + FAIL_CHECK("Unexpected stopped completion signal"); + } + void set_error(std::exception_ptr) && noexcept { + CHECK(!done_); + } + void set_error(const error_type&) && noexcept { + CHECK(!done_); + done_ = true; + } + bool& done_; + }; + unsigned throw_after = 0; + bool done = false; + do { + const auto tmp = throw_after; + throw_after = std::numeric_limits::max(); + auto op = + ex::connect(exec::repeat_effect(ex::just_error(error_type(throw_after))), receiver(done)); + throw_after = tmp; + ex::start(op); + throw_after = tmp; + ++throw_after; + } while (!done); + } + + TEST_CASE( + "repeat_effect_until works correctly when the child operation sends type which throws when " + "decay-copied, and when converted to bool, and which is only rvalue convertible to bool", + "[adaptors][repeat_effect_until]") { + class value_type { + void maybe_throw_() const { + if (!throw_after_) { + throw std::logic_error("TEST"); + } + --throw_after_; + } + public: + explicit value_type(unsigned& throw_after) noexcept + : throw_after_(throw_after) { + } + value_type(const value_type& other) + : throw_after_(other.throw_after_) { + maybe_throw_(); + } + unsigned& throw_after_; + operator bool() && { + maybe_throw_(); + return true; + } + }; + struct receiver { + using receiver_concept = ::stdexec::receiver_t; + void set_value() && noexcept { + done_ = true; + } + void set_stopped() && noexcept { + FAIL_CHECK("Unexpected stopped completion signal"); + } + void set_error(std::exception_ptr) && noexcept { + CHECK(!done_); + } + bool& done_; + }; + unsigned throw_after = 0; + bool done = false; + do { + const auto tmp = throw_after; + throw_after = std::numeric_limits::max(); + auto op = + ex::connect(exec::repeat_effect_until(ex::just(value_type(throw_after))), receiver(done)); + throw_after = tmp; + ex::start(op); + throw_after = tmp; + ++throw_after; + } while (!done); + } } // namespace