diff --git a/include/exec/repeat_effect_until.hpp b/include/exec/repeat_effect_until.hpp index 6e7cbdc39..d0084d540 100644 --- a/include/exec/repeat_effect_until.hpp +++ b/include/exec/repeat_effect_until.hpp @@ -24,7 +24,6 @@ #include "trampoline_scheduler.hpp" #include "sequence.hpp" -#include "../stdexec/__detail/__atomic.hpp" #include #include @@ -82,7 +81,7 @@ namespace exec { using __child_op_t = stdexec::connect_result_t<__child_on_sched_sender_t, __receiver_t>; __child_t __child_; - __std::atomic_flag __started_{}; + bool __has_child_op_ = false; stdexec::__manual_lifetime<__child_op_t> __child_op_; trampoline_scheduler __sched_; @@ -93,11 +92,7 @@ namespace exec { } ~__repeat_effect_state() { - if (!__started_.test(__std::memory_order_acquire)) { - __std::atomic_thread_fence(__std::memory_order_release); - // TSan does not support __std::atomic_thread_fence, so we - // need to use the TSan-specific __tsan_release instead: - STDEXEC_WHEN(STDEXEC_TSAN(), __tsan_release(&__started_)); + if (__has_child_op_) { __child_op_.__destroy(); } } @@ -107,30 +102,42 @@ namespace exec { return stdexec::connect( exec::sequence(stdexec::schedule(__sched_), __child_), __receiver_t{this}); }); + __has_child_op_ = true; + } + + void __destroy() noexcept { + __child_op_.__destroy(); + __has_child_op_ = false; } void __start() noexcept { - const bool __already_started [[maybe_unused]] - = __started_.test_and_set(__std::memory_order_relaxed); - STDEXEC_ASSERT(!__already_started); + STDEXEC_ASSERT(__has_child_op_); stdexec::start(__child_op_.__get()); } template - void __complete(_Tag, _Args... __args) noexcept { // Intentionally by value... - __child_op_.__destroy(); // ... because this could potentially invalidate them. + void __complete(_Tag, _Args &&...__args) noexcept { if constexpr (same_as<_Tag, set_value_t>) { // If the sender completed with true, we're done STDEXEC_TRY { - const bool __done = (static_cast(__args) && ...); + const bool __done = (static_cast(static_cast<_Args &&>(__args)) && ...); if (__done) { stdexec::set_value(static_cast<_Receiver &&>(this->__receiver())); - } else { + return; + } + __destroy(); + STDEXEC_TRY { __connect(); - stdexec::start(__child_op_.__get()); } + STDEXEC_CATCH_ALL { + stdexec::set_error( + static_cast<_Receiver &&>(this->__receiver()), std::current_exception()); + return; + } + stdexec::start(__child_op_.__get()); } STDEXEC_CATCH_ALL { + __destroy(); stdexec::set_error( static_cast<_Receiver &&>(this->__receiver()), std::current_exception()); } @@ -160,20 +167,14 @@ namespace exec { __mexception<_INVALID_ARGUMENT_TO_REPEAT_EFFECT_UNTIL_<>, _WITH_SENDER_<_Sender>> >; - template - using __error_t = completion_signatures)>; - template using __completions_t = stdexec::transform_completion_signatures< __completion_signatures_of_t<__decay_t<_Sender> &, _Env...>, stdexec::transform_completion_signatures< __completion_signatures_of_t, _Env...>, - __eptr_completion, - __sigs::__default_set_value, - __error_t + __eptr_completion >, - __mbind_front_q<__values_t, _Sender>::template __f, - __error_t + __mbind_front_q<__values_t, _Sender>::template __f >; struct __repeat_effect_tag { }; diff --git a/test/exec/test_repeat_effect_until.cpp b/test/exec/test_repeat_effect_until.cpp index 08f2a65bd..879d52bfa 100644 --- a/test/exec/test_repeat_effect_until.cpp +++ b/test/exec/test_repeat_effect_until.cpp @@ -26,7 +26,9 @@ #include +#include #include +#include #include namespace ex = stdexec; @@ -37,7 +39,8 @@ namespace { using sender_concept = ex::sender_t; using __t = boolean_sender; using __id = boolean_sender; - using completion_signatures = ex::completion_signatures; + using completion_signatures = + ex::completion_signatures; template struct operation { @@ -88,6 +91,12 @@ namespace { TEST_CASE("simple example for repeat_effect_until", "[adaptors][repeat_effect_until]") { ex::sender auto snd = exec::repeat_effect_until(boolean_sender{}); + static_assert(all_contained_in< + ex::completion_signatures, + ex::completion_signatures_of_t>>); + static_assert(!all_contained_in< + ex::completion_signatures, + ex::completion_signatures_of_t>>); ex::sync_wait(std::move(snd)); } @@ -197,4 +206,104 @@ namespace { ex::start(op); REQUIRE(counter == 10); } + + TEST_CASE( + "repeat_effect works correctly when the child operation sends an error type which throws when " + "decay-copied", + "[adaptors][repeat_effect]") { + struct error_type { + explicit error_type(unsigned& throw_after) noexcept + : throw_after_(throw_after) { + } + error_type(const error_type& other) + : throw_after_(other.throw_after_) { + if (!throw_after_) { + throw std::logic_error("TEST"); + } + --throw_after_; + } + unsigned& throw_after_; + }; + struct receiver { + using receiver_concept = ::stdexec::receiver_t; + void set_value() && noexcept { + FAIL_CHECK("Unexpected value completion signal"); + } + void set_stopped() && noexcept { + FAIL_CHECK("Unexpected stopped completion signal"); + } + void set_error(std::exception_ptr) && noexcept { + CHECK(!done_); + } + void set_error(const error_type&) && noexcept { + CHECK(!done_); + done_ = true; + } + bool& done_; + }; + unsigned throw_after = 0; + bool done = false; + do { + const auto tmp = throw_after; + throw_after = std::numeric_limits::max(); + auto op = + ex::connect(exec::repeat_effect(ex::just_error(error_type(throw_after))), receiver(done)); + throw_after = tmp; + ex::start(op); + throw_after = tmp; + ++throw_after; + } while (!done); + } + + TEST_CASE( + "repeat_effect_until works correctly when the child operation sends type which throws when " + "decay-copied, and when converted to bool, and which is only rvalue convertible to bool", + "[adaptors][repeat_effect_until]") { + class value_type { + void maybe_throw_() const { + if (!throw_after_) { + throw std::logic_error("TEST"); + } + --throw_after_; + } + public: + explicit value_type(unsigned& throw_after) noexcept + : throw_after_(throw_after) { + } + value_type(const value_type& other) + : throw_after_(other.throw_after_) { + maybe_throw_(); + } + unsigned& throw_after_; + operator bool() && { + maybe_throw_(); + return true; + } + }; + struct receiver { + using receiver_concept = ::stdexec::receiver_t; + void set_value() && noexcept { + done_ = true; + } + void set_stopped() && noexcept { + FAIL_CHECK("Unexpected stopped completion signal"); + } + void set_error(std::exception_ptr) && noexcept { + CHECK(!done_); + } + bool& done_; + }; + unsigned throw_after = 0; + bool done = false; + do { + const auto tmp = throw_after; + throw_after = std::numeric_limits::max(); + auto op = + ex::connect(exec::repeat_effect_until(ex::just(value_type(throw_after))), receiver(done)); + throw_after = tmp; + ex::start(op); + throw_after = tmp; + ++throw_after; + } while (!done); + } } // namespace