Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 24 additions & 23 deletions include/exec/repeat_effect_until.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "trampoline_scheduler.hpp"
#include "sequence.hpp"

#include "../stdexec/__detail/__atomic.hpp"
#include <exception>
#include <type_traits>

Expand Down Expand Up @@ -82,7 +81,7 @@ namespace exec {
using __child_op_t = stdexec::connect_result_t<__child_on_sched_sender_t, __receiver_t>;

__child_t __child_;
__std::atomic_flag __started_{};
bool __has_child_op_ = false;
stdexec::__manual_lifetime<__child_op_t> __child_op_;
trampoline_scheduler __sched_;

Expand All @@ -93,11 +92,7 @@ namespace exec {
}

~__repeat_effect_state() {
if (!__started_.test(__std::memory_order_acquire)) {
__std::atomic_thread_fence(__std::memory_order_release);
// TSan does not support __std::atomic_thread_fence, so we
// need to use the TSan-specific __tsan_release instead:
STDEXEC_WHEN(STDEXEC_TSAN(), __tsan_release(&__started_));
if (__has_child_op_) {
__child_op_.__destroy();
}
}
Expand All @@ -107,30 +102,42 @@ namespace exec {
return stdexec::connect(
exec::sequence(stdexec::schedule(__sched_), __child_), __receiver_t{this});
});
__has_child_op_ = true;
}

void __destroy() noexcept {
__child_op_.__destroy();
__has_child_op_ = false;
}

void __start() noexcept {
const bool __already_started [[maybe_unused]]
= __started_.test_and_set(__std::memory_order_relaxed);
STDEXEC_ASSERT(!__already_started);
STDEXEC_ASSERT(__has_child_op_);
stdexec::start(__child_op_.__get());
}

template <class _Tag, class... _Args>
void __complete(_Tag, _Args... __args) noexcept { // Intentionally by value...
__child_op_.__destroy(); // ... because this could potentially invalidate them.
void __complete(_Tag, _Args &&...__args) noexcept {
if constexpr (same_as<_Tag, set_value_t>) {
// If the sender completed with true, we're done
STDEXEC_TRY {
const bool __done = (static_cast<bool>(__args) && ...);
const bool __done = (static_cast<bool>(static_cast<_Args &&>(__args)) && ...);
if (__done) {
stdexec::set_value(static_cast<_Receiver &&>(this->__receiver()));
} else {
return;
}
__destroy();
STDEXEC_TRY {
__connect();
stdexec::start(__child_op_.__get());
}
STDEXEC_CATCH_ALL {
stdexec::set_error(
static_cast<_Receiver &&>(this->__receiver()), std::current_exception());
return;
}
stdexec::start(__child_op_.__get());
}
STDEXEC_CATCH_ALL {
__destroy();
stdexec::set_error(
static_cast<_Receiver &&>(this->__receiver()), std::current_exception());
}
Expand Down Expand Up @@ -160,20 +167,14 @@ namespace exec {
__mexception<_INVALID_ARGUMENT_TO_REPEAT_EFFECT_UNTIL_<>, _WITH_SENDER_<_Sender>>
>;

template <class _Error>
using __error_t = completion_signatures<set_error_t(__decay_t<_Error>)>;

template <class _Sender, class... _Env>
using __completions_t = stdexec::transform_completion_signatures<
__completion_signatures_of_t<__decay_t<_Sender> &, _Env...>,
stdexec::transform_completion_signatures<
__completion_signatures_of_t<stdexec::schedule_result_t<exec::trampoline_scheduler>, _Env...>,
__eptr_completion,
__sigs::__default_set_value,
__error_t
__eptr_completion
>,
__mbind_front_q<__values_t, _Sender>::template __f,
__error_t
__mbind_front_q<__values_t, _Sender>::template __f
>;

struct __repeat_effect_tag { };
Expand Down
111 changes: 110 additions & 1 deletion test/exec/test_repeat_effect_until.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@

#include <catch2/catch.hpp>

#include <limits>
#include <memory>
#include <stdexcept>
#include <utility>

namespace ex = stdexec;
Expand All @@ -37,7 +39,8 @@ namespace {
using sender_concept = ex::sender_t;
using __t = boolean_sender;
using __id = boolean_sender;
using completion_signatures = ex::completion_signatures<ex::set_value_t(bool)>;
using completion_signatures =
ex::completion_signatures<ex::set_value_t(bool), ex::set_error_t(const int&)>;

template <class Receiver>
struct operation {
Expand Down Expand Up @@ -88,6 +91,12 @@ namespace {

TEST_CASE("simple example for repeat_effect_until", "[adaptors][repeat_effect_until]") {
ex::sender auto snd = exec::repeat_effect_until(boolean_sender{});
static_assert(all_contained_in<
ex::completion_signatures<ex::set_error_t(const int&)>,
ex::completion_signatures_of_t<decltype(snd), ex::env<>>>);
static_assert(!all_contained_in<
ex::completion_signatures<ex::set_error_t(int)>,
ex::completion_signatures_of_t<decltype(snd), ex::env<>>>);
ex::sync_wait(std::move(snd));
}

Expand Down Expand Up @@ -197,4 +206,104 @@ namespace {
ex::start(op);
REQUIRE(counter == 10);
}

TEST_CASE(
"repeat_effect works correctly when the child operation sends an error type which throws when "
"decay-copied",
"[adaptors][repeat_effect]") {
struct error_type {
explicit error_type(unsigned& throw_after) noexcept
: throw_after_(throw_after) {
}
error_type(const error_type& other)
: throw_after_(other.throw_after_) {
if (!throw_after_) {
throw std::logic_error("TEST");
}
--throw_after_;
}
unsigned& throw_after_;
};
struct receiver {
using receiver_concept = ::stdexec::receiver_t;
void set_value() && noexcept {
FAIL_CHECK("Unexpected value completion signal");
}
void set_stopped() && noexcept {
FAIL_CHECK("Unexpected stopped completion signal");
}
void set_error(std::exception_ptr) && noexcept {
CHECK(!done_);
}
void set_error(const error_type&) && noexcept {
CHECK(!done_);
done_ = true;
}
bool& done_;
};
unsigned throw_after = 0;
bool done = false;
do {
const auto tmp = throw_after;
throw_after = std::numeric_limits<unsigned>::max();
auto op =
ex::connect(exec::repeat_effect(ex::just_error(error_type(throw_after))), receiver(done));
throw_after = tmp;
ex::start(op);
throw_after = tmp;
++throw_after;
} while (!done);
}

TEST_CASE(
"repeat_effect_until works correctly when the child operation sends type which throws when "
"decay-copied, and when converted to bool, and which is only rvalue convertible to bool",
"[adaptors][repeat_effect_until]") {
class value_type {
void maybe_throw_() const {
if (!throw_after_) {
throw std::logic_error("TEST");
}
--throw_after_;
}
public:
explicit value_type(unsigned& throw_after) noexcept
: throw_after_(throw_after) {
}
value_type(const value_type& other)
: throw_after_(other.throw_after_) {
maybe_throw_();
}
unsigned& throw_after_;
operator bool() && {
maybe_throw_();
return true;
}
};
struct receiver {
using receiver_concept = ::stdexec::receiver_t;
void set_value() && noexcept {
done_ = true;
}
void set_stopped() && noexcept {
FAIL_CHECK("Unexpected stopped completion signal");
}
void set_error(std::exception_ptr) && noexcept {
CHECK(!done_);
}
bool& done_;
};
unsigned throw_after = 0;
bool done = false;
do {
const auto tmp = throw_after;
throw_after = std::numeric_limits<unsigned>::max();
auto op =
ex::connect(exec::repeat_effect_until(ex::just(value_type(throw_after))), receiver(done));
throw_after = tmp;
ex::start(op);
throw_after = tmp;
++throw_after;
} while (!done);
}
} // namespace