123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284 |
- /*************************************************************************
- *
- * Copyright 2021 Realm Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- **************************************************************************/
- #pragma once
- #include <condition_variable>
- #include <mutex>
- #include <type_traits>
- #include "realm/exceptions.hpp"
- #include "realm/status_with.hpp"
- #include "realm/util/assert.hpp"
- #include "realm/util/bind_ptr.hpp"
- #include "realm/util/features.h"
- #include "realm/util/functional.hpp"
- #include "realm/util/optional.hpp"
- #include "realm/util/scope_exit.hpp"
- namespace realm::util {
- namespace future_details {
- template <typename T>
- class Promise;
- template <typename T>
- class CopyablePromiseHolder;
- template <typename T>
- class Future;
- template <>
- class Future<void>;
- template <typename T>
- constexpr static bool is_future = false;
- template <typename T>
- constexpr static bool is_future<Future<T>> = true;
- // FakeVoid is a helper type for futures for callbacks or values that return/are void but need to be represented
- // as a value. It should not be visible to callers outside of future_details.
- struct FakeVoid {
- };
- template <typename T>
- using VoidToFakeVoid = std::conditional_t<std::is_void_v<T>, FakeVoid, T>;
- template <typename T>
- using FakeVoidToVoid = std::conditional_t<std::is_same_v<T, FakeVoid>, void, T>;
- // UnstatusType/UnwrappedType and their implementations are internal helper types for futures to deduce the actual
- // type of a value being wrapped by a StatusWith or Future (or a Status in case of void). They should not be visible
- // outside of future_details.
- template <typename T>
- struct UnstatusTypeImpl {
- using type = T;
- };
- template <typename T>
- struct UnstatusTypeImpl<StatusWith<T>> {
- using type = T;
- };
- template <>
- struct UnstatusTypeImpl<Status> {
- using type = void;
- };
- template <typename T>
- using UnstatusType = typename UnstatusTypeImpl<T>::type;
- template <typename T>
- struct UnwrappedTypeImpl {
- static_assert(!is_future<T>);
- static_assert(!is_status_or_status_with<T>);
- using type = T;
- };
- template <typename T>
- struct UnwrappedTypeImpl<Future<T>> {
- using type = T;
- };
- template <typename T>
- struct UnwrappedTypeImpl<StatusWith<T>> {
- using type = T;
- };
- template <>
- struct UnwrappedTypeImpl<Status> {
- using type = void;
- };
- template <typename T>
- using UnwrappedType = typename UnwrappedTypeImpl<T>::type;
- /**
- * call() normalizes arguments to hide the FakeVoid shenanigans from users of Futures.
- * In the future it may also expand tuples to argument lists.
- */
- template <typename Func, typename Arg>
- inline auto call(Func&& func, Arg&& arg)
- {
- return func(std::forward<Arg>(arg));
- }
- template <typename Func>
- inline auto call(Func&& func, FakeVoid)
- {
- return func();
- }
- template <typename Func>
- inline auto call(Func&& func, StatusWith<FakeVoid> sw)
- {
- return func(sw.get_status());
- }
- /**
- * no_throw_call() normalizes return values so everything returns StatusWith<T>. Exceptions are
- * converted to !OK statuses. void and Status returns are converted to StatusWith<FakeVoid>
- */
- template <typename Func, typename... Args>
- inline auto no_throw_call(Func&& func, Args&&... args) noexcept
- {
- using RawResult = decltype(call(func, std::forward<Args>(args)...));
- using Result = StatusWith<VoidToFakeVoid<UnstatusType<RawResult>>>;
- try {
- if constexpr (std::is_void_v<RawResult>) {
- call(func, std::forward<Args>(args)...);
- return Result(FakeVoid());
- }
- else if constexpr (std::is_same_v<RawResult, Status>) {
- auto s = call(func, std::forward<Args>(args)...);
- if (!s.is_ok()) {
- return Result(std::move(s));
- }
- return Result(FakeVoid());
- }
- else {
- return Result(call(func, std::forward<Args>(args)...));
- }
- }
- catch (...) {
- return Result(exception_to_status());
- }
- }
- /**
- * throwing_call() normalizes return values so everything returns T or FakeVoid. !OK Statuses are
- * converted exceptions. void and Status returns are converted to FakeVoid.
- *
- * This is equivalent to uassertStatusOK(statusCall(func, args...)), but avoids catching just to
- * rethrow.
- */
- template <typename Func, typename... Args>
- inline auto throwing_call(Func&& func, Args&&... args)
- {
- using Result = decltype(call(func, std::forward<Args>(args)...));
- if constexpr (std::is_void_v<Result>) {
- call(func, std::forward<Args>(args)...);
- return FakeVoid{};
- }
- else if constexpr (std::is_same_v<Result, Status>) {
- auto res = (call(func, std::forward<Args>(args)...));
- if (!res.is_ok()) {
- throw Exception(std::move(res));
- }
- return FakeVoid{};
- }
- else if constexpr (is_status_with<Result>) {
- auto res = (call(func, std::forward<Args>(args)...));
- if (!res.is_ok()) {
- throw Exception(std::move(res.get_status()));
- }
- return std::move(res.get_value());
- }
- else {
- return call(func, std::forward<Args>(args)...);
- }
- }
- template <typename Func, typename... Args>
- using RawNormalizedCallResult = decltype(throwing_call(std::declval<Func>(), std::declval<Args>()...));
- template <typename Func, typename... Args>
- using NormalizedCallResult = std::conditional_t<std::is_same<RawNormalizedCallResult<Func, Args...>, FakeVoid>::value,
- void, RawNormalizedCallResult<Func, Args...>>;
- template <typename T>
- struct FutureContinuationResultImpl {
- using type = T;
- };
- template <typename T>
- struct FutureContinuationResultImpl<Future<T>> {
- using type = T;
- };
- template <typename T>
- struct FutureContinuationResultImpl<StatusWith<T>> {
- using type = T;
- };
- template <>
- struct FutureContinuationResultImpl<Status> {
- using type = void;
- };
- class FutureRefCountable {
- FutureRefCountable(const FutureRefCountable&) = delete;
- FutureRefCountable& operator=(const FutureRefCountable&) = delete;
- public:
- void thread_unsafe_inc_refs_to(uint32_t count) const
- {
- REALM_ASSERT_DEBUG(m_refs.load(std::memory_order_relaxed) == (count - 1));
- m_refs.store(count, std::memory_order_relaxed);
- }
- protected:
- FutureRefCountable() = default;
- virtual ~FutureRefCountable() = default;
- template <typename>
- friend class ::realm::util::bind_ptr;
- void bind_ptr() const noexcept
- {
- // Assert that we haven't rolled over the counter here.
- auto ref_count = m_refs.fetch_add(1, std::memory_order_relaxed) + 1;
- REALM_ASSERT_DEBUG(ref_count != 0);
- }
- void unbind_ptr() const noexcept
- {
- if (m_refs.fetch_sub(1, std::memory_order_acq_rel) == 1) {
- delete this;
- }
- }
- private:
- mutable std::atomic<uint32_t> m_refs{0};
- };
- template <typename T, typename... Args, typename = std::enable_if_t<std::is_base_of_v<FutureRefCountable, T>>>
- util::bind_ptr<T> make_intrusive(Args&&... args)
- {
- auto ptr = new T(std::forward<Args>(args)...);
- ptr->thread_unsafe_inc_refs_to(1);
- return util::bind_ptr<T>(ptr, util::bind_ptr_base::adopt_tag{});
- }
- template <typename T>
- struct SharedStateImpl;
- template <typename T>
- using SharedState = SharedStateImpl<VoidToFakeVoid<T>>;
- /**
- * SSB is SharedStateBase, and this is its current state.
- */
- enum class SSBState : uint8_t {
- Init,
- Waiting,
- Finished, // This should stay last since we have code like assert(state < Finished).
- };
- struct SharedStateBase : public FutureRefCountable {
- SharedStateBase(const SharedStateBase&) = delete;
- SharedStateBase(SharedStateBase&&) = delete;
- SharedStateBase& operator=(const SharedStateBase&) = delete;
- SharedStateBase& operator=(SharedStateBase&&) = delete;
- virtual ~SharedStateBase() = default;
- // This is called by the future side.
- void wait() noexcept
- {
- if (m_state.load(std::memory_order_acquire) == SSBState::Finished) {
- return;
- }
- m_cv.emplace();
- auto old_state = SSBState::Init;
- if (REALM_UNLIKELY(
- !m_state.compare_exchange_strong(old_state, SSBState::Waiting, std::memory_order_acq_rel))) {
- REALM_ASSERT_DEBUG(old_state == SSBState::Finished);
- return;
- }
- std::unique_lock<std::mutex> lk(m_mutex);
- m_cv->wait(lk, [&] {
- return m_state.load(std::memory_order_acquire) == SSBState::Finished;
- });
- }
- void transition_to_finished() noexcept
- {
- auto old_state = m_state.exchange(SSBState::Finished, std::memory_order_acq_rel);
- if (old_state == SSBState::Init) {
- return;
- }
- REALM_ASSERT_DEBUG(old_state == SSBState::Waiting);
- #ifdef REALM_DEBUG
- // If you hit this limit one of two things has probably happened
- //
- // 1. The justForContinuation optimization isn't working.
- // 2. You may be creating a variable length chain.
- constexpr size_t kMaxDepth = 32;
- size_t depth = 0;
- for (auto ssb = m_continuation.get(); ssb;
- ssb = ssb->m_state.load(std::memory_order_acquire) == SSBState::Waiting ? ssb->m_continuation.get()
- : nullptr) {
- depth++;
- REALM_ASSERT(depth < kMaxDepth);
- }
- #endif
- if (m_callback) {
- m_callback(this);
- }
- if (m_cv) {
- std::lock_guard<std::mutex> lk(m_mutex);
- m_cv->notify_all();
- }
- }
- void set_status(Status status) noexcept
- {
- REALM_ASSERT_DEBUG(m_state.load() < SSBState::Finished);
- m_status = std::move(status);
- transition_to_finished();
- }
- // All the rest of the methods are only called by the promise side.
- //
- // Concurrency Rules for members: Each non-atomic member is initially owned by either the
- // Promise side or the Future side, indicated by a P/F comment. The general rule is that members
- // representing the propagating data are owned by Promise, while members representing what
- // to do with the data are owned by Future. The owner may freely modify the members it owns
- // until it releases them by doing a release-store to state of Finished from Promise or
- // Waiting from Future. Promise can acquire access to all members by doing an acquire-load of
- // state and seeing Waiting (or Future with Finished). Transitions should be done via
- // acquire-release exchanges to combine both actions.
- //
- // Future::propagateResults uses an alternative mechanism to transfer ownership of the
- // continuation member. The logical Future-side does a release-store of true to
- // isJustForContinuation, and the Promise-side can do an acquire-load seeing true to get access.
- //
- std::atomic<SSBState> m_state{SSBState::Init};
- // This is used to prevent infinite chains of SharedStates that just propagate results.
- std::atomic<bool> m_is_just_for_continuation{false};
- // This is likely to be a different derived type from this, since it is the logical output of
- // callback.
- util::bind_ptr<SharedStateBase> m_continuation; // F
- util::UniqueFunction<void(SharedStateBase*)> m_callback; // F
- std::mutex m_mutex; // F
- util::Optional<std::condition_variable> m_cv; // F
- Status m_status = Status::OK(); // P
- protected:
- SharedStateBase() = default;
- };
- template <typename T>
- struct SharedStateImpl final : public SharedStateBase {
- static_assert(!std::is_void_v<T>);
- void fill_from(SharedState<T>&& other)
- {
- REALM_ASSERT_DEBUG(m_state.load() < SSBState::Finished);
- REALM_ASSERT_DEBUG(other.m_state.load() == SSBState::Finished);
- REALM_ASSERT_DEBUG(m_owned_by_promise.load());
- if (other.m_status.is_ok()) {
- m_data = std::move(other.m_data);
- }
- else {
- m_status = std::move(other.m_status);
- }
- transition_to_finished();
- }
- template <typename... Args>
- void emplace_value(Args&&... args) noexcept
- {
- REALM_ASSERT_DEBUG(m_state.load() < SSBState::Finished);
- REALM_ASSERT_DEBUG(m_owned_by_promise.load());
- try {
- m_data.emplace(std::forward<Args>(args)...);
- }
- catch (...) {
- m_status = exception_to_status();
- }
- transition_to_finished();
- }
- void set_from(StatusOrStatusWith<T> roe)
- {
- if (roe.is_ok()) {
- emplace_value(std::move(roe.get_value()));
- }
- else {
- set_status(roe.get_status());
- }
- }
- void disown() const
- {
- REALM_ASSERT(m_owned_by_promise.exchange(false));
- }
- void claim() const
- {
- REALM_ASSERT(!m_owned_by_promise.exchange(true));
- }
- mutable std::atomic<bool> m_owned_by_promise{true};
- util::Optional<T> m_data; // P
- };
- } // namespace future_details
- // These are in the future_details namespace to get access to its contents, but they are part of the
- // public API.
- using future_details::CopyablePromiseHolder;
- using future_details::Future;
- using future_details::Promise;
- /**
- * This class represents the producer side of a Future.
- *
- * This is a single-shot class. You may only extract the Future once, and you may either set a value
- * or error at most once. Extracting the future and setting the value/error can be done in either
- * order.
- *
- * If the Future has been extracted, but no value or error has been set at the time this Promise is
- * destroyed, a error will be set with ErrorCode::BrokenPromise. This should generally be considered
- * a programmer error, and should not be relied upon. We may make it debug-fatal in the future.
- *
- * Only one thread can use a given Promise at a time. It is legal to have different threads setting
- * the value/error and extracting the Future, but it is the user's responsibility to ensure that
- * those calls are strictly synchronized. This is usually easiest to achieve by calling
- * make_promise_future<T>() then passing a SharedPromise to the completing threads.
- *
- * If the result is ready when producing the Future, it is more efficient to use
- * make_ready_future_with() or Future<T>::make_ready() than to use a Promise<T>.
- */
- template <typename T>
- class future_details::Promise {
- public:
- using value_type = T;
- Promise() = default;
- ~Promise()
- {
- if (REALM_UNLIKELY(m_shared_state)) {
- m_shared_state->set_status({ErrorCodes::BrokenPromise, "Broken Promise"});
- }
- }
- // If we want to enable move-assignability, we need to handle breaking the promise on the old
- // value of this.
- Promise& operator=(Promise&&) = delete;
- // The default move construction is fine.
- Promise(Promise&&) noexcept = default;
- /**
- * Sets the value into this Promise when the passed-in Future completes, which may have already
- * happened. If it hasn't, it is still safe to destroy this Promise since it is no longer
- * involved.
- */
- void set_from(Future<T>&& future) noexcept;
- void set_from_status_with(StatusWith<T> sw) noexcept
- {
- set_impl([&] {
- m_shared_state->set_from(std::move(sw));
- });
- }
- template <typename... Args>
- void emplace_value(Args&&... args) noexcept
- {
- set_impl([&] {
- m_shared_state->emplace_value(std::forward<Args>(args)...);
- });
- }
- void set_error(Status status) noexcept
- {
- REALM_ASSERT_DEBUG(!status.is_ok());
- set_impl([&] {
- m_shared_state->set_status(std::move(status));
- });
- }
- static auto make_promise_future_impl()
- {
- struct PromiseAndFuture {
- Promise<T> promise;
- Future<T> future = promise.get_future();
- };
- return PromiseAndFuture();
- }
- private:
- // This is not public because we found it frequently was involved in races. The
- // `make_promise_future<T>` API avoids those races entirely.
- Future<T> get_future() noexcept;
- friend class Future<void>;
- friend class CopyablePromiseHolder<T>;
- Promise(util::bind_ptr<SharedState<T>> shared_state)
- : m_shared_state(std::move(shared_state))
- {
- m_shared_state->claim();
- }
- util::bind_ptr<SharedState<T>> release() &&
- {
- auto ret = std::move(m_shared_state);
- ret->disown();
- return ret;
- }
- template <typename Func>
- void set_impl(Func&& do_set) noexcept
- {
- REALM_ASSERT(m_shared_state);
- do_set();
- m_shared_state.reset();
- }
- util::bind_ptr<SharedState<T>> m_shared_state = make_intrusive<SharedState<T>>();
- };
- /**
- * CopyablePromiseHolder<T> is a lightweight copyable holder for Promises so they can be captured inside
- * of std::function's and other types that require all members to be copyable.
- *
- * The only thing you can do with a CopyablePromiseHolder is extract a regular promise from it exactly once,
- * and copy/move it as you would a util::bind_ptr.
- *
- * Do not use this type to try to fill a Promise from multiple places or threads.
- */
- template <typename T>
- class future_details::CopyablePromiseHolder {
- public:
- CopyablePromiseHolder(Promise<T>&& input)
- : m_shared_state(std::move(input).release())
- {
- }
- CopyablePromiseHolder(const Promise<T>&) = delete;
- Promise<T> get_promise()
- {
- REALM_ASSERT(m_shared_state);
- return Promise<T>(std::move(m_shared_state));
- }
- private:
- util::bind_ptr<SharedState<T>> m_shared_state;
- };
- /**
- * Future<T> is logically a possibly-deferred T or exception_ptr
- * As is usual for rvalue-qualified methods, you may call at most one of them on a given Future.
- *
- * A future may be passed between threads, but only one thread may use it at a time.
- */
- template <typename T>
- class REALM_NODISCARD future_details::Future {
- public:
- static_assert(!is_future<T>, "Future<Future<T>> is banned. Just use Future<T> instead.");
- static_assert(!std::is_reference<T>::value, "Future<T&> is banned.");
- static_assert(!std::is_const<T>::value, "Future<const T> is banned.");
- static_assert(!std::is_array<T>::value, "Future<T[]> is banned.");
- using value_type = T;
- /**
- * Constructs a Future in a moved-from state that can only be assigned to or destroyed.
- */
- Future() = default;
- Future& operator=(Future&&) = default;
- Future(Future&&) = default;
- Future(const Future&) = delete;
- Future& operator=(const Future&) = delete;
- /* implicit */ Future(T val)
- : Future(make_ready(std::move(val)))
- {
- }
- /* implicit */ Future(Status status)
- : Future(make_ready(std::move(status)))
- {
- }
- /* implicit */ Future(StatusWith<T> sw)
- : Future(make_ready(std::move(sw)))
- {
- }
- /**
- * Make a ready Future<T> from a value for cases where you don't need to wait asynchronously.
- *
- * Calling this is faster than getting a Future out of a Promise, and is effectively free. It is
- * fast enough that you never need to avoid returning a Future from an API, even if the result
- * is ready 99.99% of the time.
- *
- * As an example, if you are handing out results from a batch, you can use this when for each
- * result while you have a batch, then use a Promise to return a not-ready Future when you need
- * to get another batch.
- */
- static Future<T> make_ready(T val)
- { // TODO emplace?
- Future out;
- out.m_immediate = std::move(val);
- return out;
- }
- static Future<T> make_ready(Status status)
- {
- auto out = Future<T>(make_intrusive<SharedState<T>>());
- out.m_shared->set_status(std::move(status));
- return out;
- }
- static Future<T> make_ready(StatusWith<T> val)
- {
- if (val.is_ok()) {
- return make_ready(std::move(val.get_value()));
- }
- return make_ready(val.get_status());
- }
- /**
- * If this returns true, get() is guaranteed not to block and callbacks will be immediately
- * invoked. You can't assume anything if this returns false since it may be completed
- * immediately after checking (unless you have independent knowledge that this Future can't
- * complete in the background).
- *
- * Callers must still call get() or similar, even on Future<void>, to ensure that they are
- * correctly sequenced with the completing task, and to be informed about whether the Promise
- * completed successfully.
- *
- * This is generally only useful as an optimization to avoid prep work, such as setting up
- * timeouts, that is unnecessary if the Future is ready already.
- */
- bool is_ready() const
- {
- // This can be a relaxed load because callers are not allowed to use it to establish
- // ordering.
- return m_immediate || m_shared->m_state.load(std::memory_order_relaxed) == SSBState::Finished;
- }
- /**
- * Gets the value out of this Future, blocking until it is ready.
- *
- * get() methods throw on error, while get_no_throw() returns a Statuswith<T> with either a value
- * or an error Status.
- *
- * These methods can be called multiple times, except for the rvalue overloads.
- */
- T get() &&
- {
- return std::move(get_impl());
- }
- T& get() &
- {
- return get_impl();
- }
- const T& get() const&
- {
- return const_cast<Future*>(this)->get_impl();
- }
- StatusWith<T> get_no_throw() const& noexcept
- {
- if (m_immediate) {
- return *m_immediate;
- }
- m_shared->wait();
- if (!m_shared->m_data) {
- return m_shared->m_status;
- }
- return *m_shared->m_data;
- }
- StatusWith<T> get_no_throw() && noexcept
- {
- if (m_immediate) {
- return std::move(*m_immediate);
- }
- m_shared->wait();
- if (!m_shared->m_data) {
- return m_shared->m_status;
- }
- return std::move(*m_shared->m_data);
- }
- /**
- * This ends the Future continuation chain by calling a callback on completion. Use this to
- * escape back into a callback-based API.
- *
- * The callback must not throw since it is called from a noexcept context. The callback must take a
- * StatusOrStatusWith as its argument and have a return type of void.
- */
- template <typename Func> // StatusOrStatusWith<T> -> void
- void get_async(Func&& func) && noexcept
- {
- static_assert(std::is_void_v<std::invoke_result_t<Func, StatusOrStatusWith<FakeVoidToVoid<T>>>>);
- return general_impl(
- // on ready success:
- [&](T&& val) {
- call(func, StatusWith<T>(std::move(val)));
- },
- // on ready failure:
- [&](Status&& status) {
- call(func, StatusWith<T>(std::move(status)));
- },
- // on not ready:
- [&] {
- m_shared->m_callback = [func = std::forward<Func>(func)](SharedStateBase* ssb) mutable noexcept {
- const auto input = static_cast<SharedState<T>*>(ssb);
- if (input->m_status.is_ok()) {
- call(func, StatusWith<T>(std::move(*input->m_data)));
- }
- else {
- call(func, StatusWith<T>(std::move(input->m_status)));
- }
- };
- });
- }
- //
- // The remaining methods are all continuation based and take a callback and return a Future.
- // Each method has a comment indicating the supported signatures for that callback, and a
- // description of when the callback is invoked and how the impacts the returned Future. It may
- // be helpful to think of Future continuation chains as a pipeline of stages that take input
- // from earlier stages and produce output for later stages.
- //
- // Be aware that the callback may be invoked inline at the call-site or at the producer when
- // setting the value. Therefore, you should avoid doing blocking work inside of a callback.
- // Additionally, avoid acquiring any locks or mutexes that the caller already holds, otherwise
- // you risk a deadlock. If either of these concerns apply to your callback, it should schedule
- // itself on an executor, rather than doing work in the callback.
- //
- // Error handling in callbacks: all exceptions thrown propagate to the returned Future
- // automatically.
- //
- // Callbacks that return Future<T> are automatically unwrapped and connected to the returned
- // Future<T>, rather than producing a Future<Future<T>>.
- /**
- * Callbacks passed to then() are only called if the input Future completes successfully.
- * Otherwise the error propagates automatically, bypassing the callback.
- */
- template <typename Func>
- auto then(Func&& func) && noexcept
- {
- using Result = NormalizedCallResult<Func, T>;
- if constexpr (!is_future<Result>) {
- return general_impl(
- // on ready success:
- [&](T&& val) {
- return Future<Result>::make_ready(no_throw_call(func, std::move(val)));
- },
- // on ready failure:
- [&](Status&& status) {
- return Future<Result>::make_ready(std::move(status));
- },
- // on not ready yet:
- [&] {
- return make_continuation<Result>(
- [func = std::forward<Func>(func)](SharedState<T>* input,
- SharedState<Result>* output) mutable noexcept {
- if (!input->m_status.is_ok()) {
- output->set_status(input->m_status);
- return;
- }
- output->set_from(no_throw_call(func, std::move(*input->m_data)));
- });
- });
- }
- else {
- using UnwrappedResult = typename Result::value_type;
- return general_impl(
- // on ready success:
- [&](T&& val) {
- try {
- return Future<UnwrappedResult>(throwing_call(func, std::move(val)));
- }
- catch (...) {
- return Future<UnwrappedResult>::make_ready(exception_to_status());
- }
- },
- // on ready failure:
- [&](Status&& status) {
- return Future<UnwrappedResult>::make_ready(std::move(status));
- },
- // on not ready yet:
- [&] {
- return make_continuation<UnwrappedResult>(
- [func = std::forward<Func>(func)](SharedState<T>* input,
- SharedState<UnwrappedResult>* output) mutable noexcept {
- if (!input->m_status.is_ok())
- return output->set_status(std::move(input->m_status));
- try {
- throwing_call(func, std::move(*input->m_data)).propagate_result_to(output);
- }
- catch (...) {
- output->set_status(exception_to_status());
- }
- });
- });
- }
- }
- /*
- * Callbacks passed to on_completion() are always called with a StatusWith<T> when the input future completes.
- */
- template <typename Func>
- auto on_completion(Func&& func) && noexcept
- {
- using Wrapper = StatusOrStatusWith<T>;
- using Result = NormalizedCallResult<Func, StatusOrStatusWith<T>>;
- if constexpr (!is_future<Result>) {
- return general_impl(
- // on ready success:
- [&](T&& val) {
- return Future<Result>::make_ready(
- no_throw_call(std::forward<Func>(func), Wrapper(std::move(val))));
- },
- // on ready failure:
- [&](Status&& status) {
- return Future<Result>::make_ready(
- no_throw_call(std::forward<Func>(func), Wrapper(std::move(status))));
- },
- // on not ready yet:
- [&] {
- return make_continuation<Result>(
- [func = std::forward<Func>(func)](SharedState<T>* input,
- SharedState<Result>* output) mutable noexcept {
- if (!input->m_status.is_ok())
- return output->set_from(no_throw_call(func, Wrapper(std::move(input->m_status))));
- output->set_from(no_throw_call(func, Wrapper(std::move(*input->m_data))));
- });
- });
- }
- else {
- using UnwrappedResult = typename Result::value_type;
- return general_impl(
- // on ready success:
- [&](T&& val) {
- try {
- return Future<UnwrappedResult>(
- throwing_call(std::forward<Func>(func), Wrapper(std::move(val))));
- }
- catch (...) {
- return Future<UnwrappedResult>::make_ready(exception_to_status());
- }
- },
- // on ready failure:
- [&](Status&& status) {
- try {
- return Future<UnwrappedResult>(
- throwing_call(std::forward<Func>(func), Wrapper(std::move(status))));
- }
- catch (...) {
- return Future<UnwrappedResult>::make_ready(exception_to_status());
- }
- },
- // on not ready yet:
- [&] {
- return make_continuation<UnwrappedResult>(
- [func = std::forward<Func>(func)](SharedState<T>* input,
- SharedState<UnwrappedResult>* output) mutable noexcept {
- if (!input->m_status.is_ok()) {
- try {
- throwing_call(func, Wrapper(std::move(input->m_status)))
- .propagate_result_to(output);
- }
- catch (...) {
- output->set_status(exception_to_status());
- }
- return;
- }
- try {
- throwing_call(func, Wrapper(std::move(*input->m_data))).propagate_result_to(output);
- }
- catch (...) {
- output->set_status(exception_to_status());
- }
- });
- });
- }
- }
- /**
- * Callbacks passed to on_error() are only called if the input Future completes with an error.
- * Otherwise, the successful result propagates automatically, bypassing the callback.
- *
- * The callback can either produce a replacement value (which must be a T), return a replacement
- * Future<T> (such as a by retrying), or return/throw a replacement error.
- *
- * Note that this will only catch errors produced by earlier stages; it is not registering a
- * general error handler for the entire chain.
- */
- template <typename Func>
- Future<FakeVoidToVoid<T>> on_error(Func&& func) && noexcept
- {
- using Result = NormalizedCallResult<Func, Status>;
- static_assert(std::is_same<VoidToFakeVoid<UnwrappedType<Result>>, T>::value,
- "func passed to Future<T>::onError must return T, StatusWith<T>, or Future<T>");
- if constexpr (!is_future<Result>) {
- return general_impl(
- // on ready success:
- [&](T&& val) {
- return Future<T>::make_ready(std::move(val));
- },
- // on ready failure:
- [&](Status&& status) {
- return Future<T>::make_ready(no_throw_call(func, std::move(status)));
- },
- // on not ready yet:
- [&] {
- return make_continuation<T>([func = std::forward<Func>(func)](
- SharedState<T>* input, SharedState<T>* output) mutable noexcept {
- if (input->m_status.is_ok())
- return output->emplace_value(std::move(*input->m_data));
- output->set_from(no_throw_call(func, std::move(input->m_status)));
- });
- });
- }
- else {
- return general_impl(
- // on ready success:
- [&](T&& val) {
- return Future<T>::make_ready(std::move(val));
- },
- // on ready failure:
- [&](Status&& status) {
- try {
- return Future<T>(throwing_call(func, std::move(status)));
- }
- catch (...) {
- return Future<T>::make_ready(exception_to_status());
- }
- },
- // on not ready yet:
- [&] {
- return make_continuation<T>([func = std::forward<Func>(func)](
- SharedState<T>* input, SharedState<T>* output) mutable noexcept {
- if (input->m_status.is_ok())
- return output->emplace_value(std::move(*input->m_data));
- try {
- throwing_call(func, std::move(input->m_status)).propagate_result_to(output);
- }
- catch (...) {
- output->set_status(exception_to_status());
- }
- });
- });
- }
- }
- Future<void> ignore_value() && noexcept;
- private:
- template <typename T2>
- friend class Future;
- friend class Promise<T>;
- T& get_impl()
- {
- if (m_immediate) {
- return *m_immediate;
- }
- m_shared->wait();
- if (!m_shared->m_status.is_ok()) {
- throw Exception(m_shared->m_status);
- }
- return *m_shared->m_data;
- }
- // All callbacks are called immediately so they are allowed to capture everything by reference.
- // All callbacks should return the same return type.
- template <typename SuccessFunc, typename FailFunc, typename NotReady>
- auto general_impl(SuccessFunc&& success, FailFunc&& fail, NotReady&& notReady) noexcept
- {
- if (m_immediate) {
- return success(std::move(*m_immediate));
- }
- if (m_shared->m_state.load(std::memory_order_acquire) == SSBState::Finished) {
- if (m_shared->m_data) {
- return success(std::move(*m_shared->m_data));
- }
- else {
- return fail(std::move(m_shared->m_status));
- }
- }
- // This is always done after notReady, which never throws. It is in a ScopeExit to
- // support both void- and value-returning notReady implementations since we can't assign
- // void to a variable.
- auto guard = util::make_scope_exit([&]() noexcept {
- auto old_state = SSBState::Init;
- if (REALM_UNLIKELY(!m_shared->m_state.compare_exchange_strong(old_state, SSBState::Waiting,
- std::memory_order_acq_rel))) {
- REALM_ASSERT_DEBUG(old_state == SSBState::Finished);
- m_shared->m_callback(m_shared.get());
- }
- });
- return notReady();
- }
- template <typename Result, typename OnReady>
- inline Future<Result> make_continuation(OnReady&& on_ready)
- {
- REALM_ASSERT(!m_shared->m_callback && !m_shared->m_continuation);
- auto continuation = make_intrusive<SharedState<Result>>();
- continuation->thread_unsafe_inc_refs_to(2);
- m_shared->m_continuation.reset(continuation.get(), util::bind_ptr_base::adopt_tag{});
- m_shared->m_callback = [on_ready = std::forward<OnReady>(on_ready)](SharedStateBase* ssb) mutable noexcept {
- const auto input = static_cast<SharedState<T>*>(ssb);
- const auto output = static_cast<SharedState<Result>*>(ssb->m_continuation.get());
- on_ready(input, output);
- };
- return Future<VoidToFakeVoid<Result>>(std::move(continuation));
- }
- void propagate_result_to(SharedState<T>* output) && noexcept
- {
- general_impl(
- // on ready success:
- [&](T&& val) {
- output->emplace_value(std::move(val));
- },
- // on ready failure:
- [&](Status&& status) {
- output->set_status(std::move(status));
- },
- // on not ready yet:
- [&] {
- // If the output is just for continuation, bypass it and just directly fill in the
- // SharedState that it would write to. The concurrency situation is a bit subtle
- // here since we are the Future-side of shared, but the Promise-side of output.
- // The rule is that p->isJustForContinuation must be acquire-read as true before
- // examining p->continuation, and p->continuation must be written before doing the
- // release-store of true to p->isJustForContinuation.
- if (output->m_is_just_for_continuation.load(std::memory_order_acquire)) {
- m_shared->m_continuation = std::move(output->m_continuation);
- }
- else {
- m_shared->m_continuation = util::bind_ptr(output);
- }
- m_shared->m_is_just_for_continuation.store(true, std::memory_order_release);
- m_shared->m_callback = [](SharedStateBase* ssb) noexcept {
- const auto input = static_cast<SharedState<T>*>(ssb);
- const auto output = static_cast<SharedState<T>*>(ssb->m_continuation.get());
- output->fill_from(std::move(*input));
- };
- });
- }
- explicit Future(util::bind_ptr<SharedState<T>> ptr)
- : m_shared(std::move(ptr))
- {
- }
- // At most one of these will be active.
- util::Optional<T> m_immediate;
- util::bind_ptr<SharedState<T>> m_shared;
- };
- /**
- * The void specialization of Future<T>. See the general Future<T> for detailed documentation.
- * It should be the same as the generic Future<T> with the following exceptions:
- * - Anything mentioning StatusWith<T> will use Status instead.
- * - Anything returning references to T will just return void since there are no void references.
- * - Anything taking a T argument will receive no arguments.
- */
- template <>
- class REALM_NODISCARD future_details::Future<void> {
- public:
- using value_type = void;
- /* implicit */ Future()
- : Future(make_ready())
- {
- }
- /* implicit */ Future(Status status)
- : Future(make_ready(std::move(status)))
- {
- }
- static Future<void> make_ready()
- {
- return Future<FakeVoid>::make_ready(FakeVoid{});
- }
- static Future<void> make_ready(Status status)
- {
- if (status.is_ok())
- return make_ready();
- return Future<FakeVoid>::make_ready(std::move(status));
- }
- bool is_ready() const
- {
- return inner.is_ready();
- }
- void get() const
- {
- inner.get();
- }
- Status get_no_throw() const noexcept
- {
- return inner.get_no_throw().get_status();
- }
- template <typename Func> // Status -> void
- void get_async(Func&& func) && noexcept
- {
- return std::move(inner).get_async(std::forward<Func>(func));
- }
- template <typename Func> // () -> T or StatusWith<T> or Future<T>
- auto then(Func&& func) && noexcept
- {
- return std::move(inner).then(std::forward<Func>(func));
- }
- template <typename Func>
- auto on_error(Func&& func) && noexcept
- {
- return std::move(inner).on_error(std::forward<Func>(func));
- }
- template <typename Func>
- auto on_completion(Func&& func) && noexcept
- {
- return std::move(inner).on_completion(std::forward<Func>(func));
- }
- Future<void> ignore_value() && noexcept
- {
- return std::move(*this);
- }
- private:
- template <typename T>
- friend class Future;
- friend class Promise<void>;
- explicit Future(util::bind_ptr<SharedState<FakeVoid>> ptr)
- : inner(std::move(ptr))
- {
- }
- /*implicit*/ Future(Future<FakeVoid>&& inner)
- : inner(std::move(inner))
- {
- }
- /*implicit*/ operator Future<FakeVoid>() &&
- {
- return std::move(inner);
- }
- void propagate_result_to(SharedState<void>* output) && noexcept
- {
- std::move(inner).propagate_result_to(output);
- }
- static Future<void> make_ready(StatusWith<FakeVoid> status)
- {
- return Future<FakeVoid>::make_ready(std::move(status));
- }
- Future<FakeVoid> inner;
- };
- /**
- * Returns a bound Promise and Future in a struct with friendly names (promise and future) that also
- * works well with C++17 structured bindings.
- */
- template <typename T>
- inline auto make_promise_future()
- {
- return Promise<T>::make_promise_future_impl();
- }
- /**
- * This metafunction allows APIs that take callbacks and return Future to avoid doing their own type
- * calculus. This results in the base value_type that would result from passing Func to a
- * Future<T>::then(), with the same normalizing of T/StatusWith<T>/Future<T> returns. This is
- * primarily useful for implementations of executors rather than their users.
- *
- * This returns the unwrapped T rather than Future<T> so it will be easy to create a Promise<T>.
- *
- * Examples:
- *
- * FutureContinuationResult<std::function<void()>> == void
- * FutureContinuationResult<std::function<Status()>> == void
- * FutureContinuationResult<std::function<Future<void>()>> == void
- *
- * FutureContinuationResult<std::function<int()>> == int
- * FutureContinuationResult<std::function<StatusWith<int>()>> == int
- * FutureContinuationResult<std::function<Future<int>()>> == int
- *
- * FutureContinuationResult<std::function<int(bool)>, bool> == int
- *
- * FutureContinuationResult<std::function<int(bool)>, NotBool> SFINAE-safe substitution failure.
- */
- template <typename Func, typename... Args>
- using FutureContinuationResult =
- typename future_details::FutureContinuationResultImpl<std::invoke_result_t<Func, Args...>>::type;
- //
- // Implementations of methods that couldn't be defined in the class due to ordering requirements.
- //
- template <typename T>
- inline Future<T> Promise<T>::get_future() noexcept
- {
- m_shared_state->thread_unsafe_inc_refs_to(2);
- return Future<T>(util::bind_ptr<SharedState<T>>(m_shared_state.get(), util::bind_ptr_base::adopt_tag{}));
- }
- template <typename T>
- inline void Promise<T>::set_from(Future<T>&& future) noexcept
- {
- set_impl([&] {
- std::move(future).propagate_result_to(m_shared_state.get());
- });
- }
- template <typename T>
- inline Future<void> Future<T>::ignore_value() && noexcept
- {
- return std::move(*this).then([](auto&&) {});
- }
- } // namespace realm::util
|