future.hpp 44 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284
  1. /*************************************************************************
  2. *
  3. * Copyright 2021 Realm Inc.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. **************************************************************************/
  18. #pragma once
  19. #include <condition_variable>
  20. #include <mutex>
  21. #include <type_traits>
  22. #include "realm/exceptions.hpp"
  23. #include "realm/status_with.hpp"
  24. #include "realm/util/assert.hpp"
  25. #include "realm/util/bind_ptr.hpp"
  26. #include "realm/util/features.h"
  27. #include "realm/util/functional.hpp"
  28. #include "realm/util/optional.hpp"
  29. #include "realm/util/scope_exit.hpp"
  30. namespace realm::util {
  31. namespace future_details {
  32. template <typename T>
  33. class Promise;
  34. template <typename T>
  35. class CopyablePromiseHolder;
  36. template <typename T>
  37. class Future;
  38. template <>
  39. class Future<void>;
  40. template <typename T>
  41. constexpr static bool is_future = false;
  42. template <typename T>
  43. constexpr static bool is_future<Future<T>> = true;
  44. // FakeVoid is a helper type for futures for callbacks or values that return/are void but need to be represented
  45. // as a value. It should not be visible to callers outside of future_details.
  46. struct FakeVoid {
  47. };
  48. template <typename T>
  49. using VoidToFakeVoid = std::conditional_t<std::is_void_v<T>, FakeVoid, T>;
  50. template <typename T>
  51. using FakeVoidToVoid = std::conditional_t<std::is_same_v<T, FakeVoid>, void, T>;
  52. // UnstatusType/UnwrappedType and their implementations are internal helper types for futures to deduce the actual
  53. // type of a value being wrapped by a StatusWith or Future (or a Status in case of void). They should not be visible
  54. // outside of future_details.
  55. template <typename T>
  56. struct UnstatusTypeImpl {
  57. using type = T;
  58. };
  59. template <typename T>
  60. struct UnstatusTypeImpl<StatusWith<T>> {
  61. using type = T;
  62. };
  63. template <>
  64. struct UnstatusTypeImpl<Status> {
  65. using type = void;
  66. };
  67. template <typename T>
  68. using UnstatusType = typename UnstatusTypeImpl<T>::type;
  69. template <typename T>
  70. struct UnwrappedTypeImpl {
  71. static_assert(!is_future<T>);
  72. static_assert(!is_status_or_status_with<T>);
  73. using type = T;
  74. };
  75. template <typename T>
  76. struct UnwrappedTypeImpl<Future<T>> {
  77. using type = T;
  78. };
  79. template <typename T>
  80. struct UnwrappedTypeImpl<StatusWith<T>> {
  81. using type = T;
  82. };
  83. template <>
  84. struct UnwrappedTypeImpl<Status> {
  85. using type = void;
  86. };
  87. template <typename T>
  88. using UnwrappedType = typename UnwrappedTypeImpl<T>::type;
  89. /**
  90. * call() normalizes arguments to hide the FakeVoid shenanigans from users of Futures.
  91. * In the future it may also expand tuples to argument lists.
  92. */
  93. template <typename Func, typename Arg>
  94. inline auto call(Func&& func, Arg&& arg)
  95. {
  96. return func(std::forward<Arg>(arg));
  97. }
  98. template <typename Func>
  99. inline auto call(Func&& func, FakeVoid)
  100. {
  101. return func();
  102. }
  103. template <typename Func>
  104. inline auto call(Func&& func, StatusWith<FakeVoid> sw)
  105. {
  106. return func(sw.get_status());
  107. }
  108. /**
  109. * no_throw_call() normalizes return values so everything returns StatusWith<T>. Exceptions are
  110. * converted to !OK statuses. void and Status returns are converted to StatusWith<FakeVoid>
  111. */
  112. template <typename Func, typename... Args>
  113. inline auto no_throw_call(Func&& func, Args&&... args) noexcept
  114. {
  115. using RawResult = decltype(call(func, std::forward<Args>(args)...));
  116. using Result = StatusWith<VoidToFakeVoid<UnstatusType<RawResult>>>;
  117. try {
  118. if constexpr (std::is_void_v<RawResult>) {
  119. call(func, std::forward<Args>(args)...);
  120. return Result(FakeVoid());
  121. }
  122. else if constexpr (std::is_same_v<RawResult, Status>) {
  123. auto s = call(func, std::forward<Args>(args)...);
  124. if (!s.is_ok()) {
  125. return Result(std::move(s));
  126. }
  127. return Result(FakeVoid());
  128. }
  129. else {
  130. return Result(call(func, std::forward<Args>(args)...));
  131. }
  132. }
  133. catch (...) {
  134. return Result(exception_to_status());
  135. }
  136. }
  137. /**
  138. * throwing_call() normalizes return values so everything returns T or FakeVoid. !OK Statuses are
  139. * converted exceptions. void and Status returns are converted to FakeVoid.
  140. *
  141. * This is equivalent to uassertStatusOK(statusCall(func, args...)), but avoids catching just to
  142. * rethrow.
  143. */
  144. template <typename Func, typename... Args>
  145. inline auto throwing_call(Func&& func, Args&&... args)
  146. {
  147. using Result = decltype(call(func, std::forward<Args>(args)...));
  148. if constexpr (std::is_void_v<Result>) {
  149. call(func, std::forward<Args>(args)...);
  150. return FakeVoid{};
  151. }
  152. else if constexpr (std::is_same_v<Result, Status>) {
  153. auto res = (call(func, std::forward<Args>(args)...));
  154. if (!res.is_ok()) {
  155. throw Exception(std::move(res));
  156. }
  157. return FakeVoid{};
  158. }
  159. else if constexpr (is_status_with<Result>) {
  160. auto res = (call(func, std::forward<Args>(args)...));
  161. if (!res.is_ok()) {
  162. throw Exception(std::move(res.get_status()));
  163. }
  164. return std::move(res.get_value());
  165. }
  166. else {
  167. return call(func, std::forward<Args>(args)...);
  168. }
  169. }
  170. template <typename Func, typename... Args>
  171. using RawNormalizedCallResult = decltype(throwing_call(std::declval<Func>(), std::declval<Args>()...));
  172. template <typename Func, typename... Args>
  173. using NormalizedCallResult = std::conditional_t<std::is_same<RawNormalizedCallResult<Func, Args...>, FakeVoid>::value,
  174. void, RawNormalizedCallResult<Func, Args...>>;
  175. template <typename T>
  176. struct FutureContinuationResultImpl {
  177. using type = T;
  178. };
  179. template <typename T>
  180. struct FutureContinuationResultImpl<Future<T>> {
  181. using type = T;
  182. };
  183. template <typename T>
  184. struct FutureContinuationResultImpl<StatusWith<T>> {
  185. using type = T;
  186. };
  187. template <>
  188. struct FutureContinuationResultImpl<Status> {
  189. using type = void;
  190. };
  191. class FutureRefCountable {
  192. FutureRefCountable(const FutureRefCountable&) = delete;
  193. FutureRefCountable& operator=(const FutureRefCountable&) = delete;
  194. public:
  195. void thread_unsafe_inc_refs_to(uint32_t count) const
  196. {
  197. REALM_ASSERT_DEBUG(m_refs.load(std::memory_order_relaxed) == (count - 1));
  198. m_refs.store(count, std::memory_order_relaxed);
  199. }
  200. protected:
  201. FutureRefCountable() = default;
  202. virtual ~FutureRefCountable() = default;
  203. template <typename>
  204. friend class ::realm::util::bind_ptr;
  205. void bind_ptr() const noexcept
  206. {
  207. // Assert that we haven't rolled over the counter here.
  208. auto ref_count = m_refs.fetch_add(1, std::memory_order_relaxed) + 1;
  209. REALM_ASSERT_DEBUG(ref_count != 0);
  210. }
  211. void unbind_ptr() const noexcept
  212. {
  213. if (m_refs.fetch_sub(1, std::memory_order_acq_rel) == 1) {
  214. delete this;
  215. }
  216. }
  217. private:
  218. mutable std::atomic<uint32_t> m_refs{0};
  219. };
  220. template <typename T, typename... Args, typename = std::enable_if_t<std::is_base_of_v<FutureRefCountable, T>>>
  221. util::bind_ptr<T> make_intrusive(Args&&... args)
  222. {
  223. auto ptr = new T(std::forward<Args>(args)...);
  224. ptr->thread_unsafe_inc_refs_to(1);
  225. return util::bind_ptr<T>(ptr, util::bind_ptr_base::adopt_tag{});
  226. }
  227. template <typename T>
  228. struct SharedStateImpl;
  229. template <typename T>
  230. using SharedState = SharedStateImpl<VoidToFakeVoid<T>>;
  231. /**
  232. * SSB is SharedStateBase, and this is its current state.
  233. */
  234. enum class SSBState : uint8_t {
  235. Init,
  236. Waiting,
  237. Finished, // This should stay last since we have code like assert(state < Finished).
  238. };
  239. struct SharedStateBase : public FutureRefCountable {
  240. SharedStateBase(const SharedStateBase&) = delete;
  241. SharedStateBase(SharedStateBase&&) = delete;
  242. SharedStateBase& operator=(const SharedStateBase&) = delete;
  243. SharedStateBase& operator=(SharedStateBase&&) = delete;
  244. virtual ~SharedStateBase() = default;
  245. // This is called by the future side.
  246. void wait() noexcept
  247. {
  248. if (m_state.load(std::memory_order_acquire) == SSBState::Finished) {
  249. return;
  250. }
  251. m_cv.emplace();
  252. auto old_state = SSBState::Init;
  253. if (REALM_UNLIKELY(
  254. !m_state.compare_exchange_strong(old_state, SSBState::Waiting, std::memory_order_acq_rel))) {
  255. REALM_ASSERT_DEBUG(old_state == SSBState::Finished);
  256. return;
  257. }
  258. std::unique_lock<std::mutex> lk(m_mutex);
  259. m_cv->wait(lk, [&] {
  260. return m_state.load(std::memory_order_acquire) == SSBState::Finished;
  261. });
  262. }
  263. void transition_to_finished() noexcept
  264. {
  265. auto old_state = m_state.exchange(SSBState::Finished, std::memory_order_acq_rel);
  266. if (old_state == SSBState::Init) {
  267. return;
  268. }
  269. REALM_ASSERT_DEBUG(old_state == SSBState::Waiting);
  270. #ifdef REALM_DEBUG
  271. // If you hit this limit one of two things has probably happened
  272. //
  273. // 1. The justForContinuation optimization isn't working.
  274. // 2. You may be creating a variable length chain.
  275. constexpr size_t kMaxDepth = 32;
  276. size_t depth = 0;
  277. for (auto ssb = m_continuation.get(); ssb;
  278. ssb = ssb->m_state.load(std::memory_order_acquire) == SSBState::Waiting ? ssb->m_continuation.get()
  279. : nullptr) {
  280. depth++;
  281. REALM_ASSERT(depth < kMaxDepth);
  282. }
  283. #endif
  284. if (m_callback) {
  285. m_callback(this);
  286. }
  287. if (m_cv) {
  288. std::lock_guard<std::mutex> lk(m_mutex);
  289. m_cv->notify_all();
  290. }
  291. }
  292. void set_status(Status status) noexcept
  293. {
  294. REALM_ASSERT_DEBUG(m_state.load() < SSBState::Finished);
  295. m_status = std::move(status);
  296. transition_to_finished();
  297. }
  298. // All the rest of the methods are only called by the promise side.
  299. //
  300. // Concurrency Rules for members: Each non-atomic member is initially owned by either the
  301. // Promise side or the Future side, indicated by a P/F comment. The general rule is that members
  302. // representing the propagating data are owned by Promise, while members representing what
  303. // to do with the data are owned by Future. The owner may freely modify the members it owns
  304. // until it releases them by doing a release-store to state of Finished from Promise or
  305. // Waiting from Future. Promise can acquire access to all members by doing an acquire-load of
  306. // state and seeing Waiting (or Future with Finished). Transitions should be done via
  307. // acquire-release exchanges to combine both actions.
  308. //
  309. // Future::propagateResults uses an alternative mechanism to transfer ownership of the
  310. // continuation member. The logical Future-side does a release-store of true to
  311. // isJustForContinuation, and the Promise-side can do an acquire-load seeing true to get access.
  312. //
  313. std::atomic<SSBState> m_state{SSBState::Init};
  314. // This is used to prevent infinite chains of SharedStates that just propagate results.
  315. std::atomic<bool> m_is_just_for_continuation{false};
  316. // This is likely to be a different derived type from this, since it is the logical output of
  317. // callback.
  318. util::bind_ptr<SharedStateBase> m_continuation; // F
  319. util::UniqueFunction<void(SharedStateBase*)> m_callback; // F
  320. std::mutex m_mutex; // F
  321. util::Optional<std::condition_variable> m_cv; // F
  322. Status m_status = Status::OK(); // P
  323. protected:
  324. SharedStateBase() = default;
  325. };
  326. template <typename T>
  327. struct SharedStateImpl final : public SharedStateBase {
  328. static_assert(!std::is_void_v<T>);
  329. void fill_from(SharedState<T>&& other)
  330. {
  331. REALM_ASSERT_DEBUG(m_state.load() < SSBState::Finished);
  332. REALM_ASSERT_DEBUG(other.m_state.load() == SSBState::Finished);
  333. REALM_ASSERT_DEBUG(m_owned_by_promise.load());
  334. if (other.m_status.is_ok()) {
  335. m_data = std::move(other.m_data);
  336. }
  337. else {
  338. m_status = std::move(other.m_status);
  339. }
  340. transition_to_finished();
  341. }
  342. template <typename... Args>
  343. void emplace_value(Args&&... args) noexcept
  344. {
  345. REALM_ASSERT_DEBUG(m_state.load() < SSBState::Finished);
  346. REALM_ASSERT_DEBUG(m_owned_by_promise.load());
  347. try {
  348. m_data.emplace(std::forward<Args>(args)...);
  349. }
  350. catch (...) {
  351. m_status = exception_to_status();
  352. }
  353. transition_to_finished();
  354. }
  355. void set_from(StatusOrStatusWith<T> roe)
  356. {
  357. if (roe.is_ok()) {
  358. emplace_value(std::move(roe.get_value()));
  359. }
  360. else {
  361. set_status(roe.get_status());
  362. }
  363. }
  364. void disown() const
  365. {
  366. REALM_ASSERT(m_owned_by_promise.exchange(false));
  367. }
  368. void claim() const
  369. {
  370. REALM_ASSERT(!m_owned_by_promise.exchange(true));
  371. }
  372. mutable std::atomic<bool> m_owned_by_promise{true};
  373. util::Optional<T> m_data; // P
  374. };
  375. } // namespace future_details
  376. // These are in the future_details namespace to get access to its contents, but they are part of the
  377. // public API.
  378. using future_details::CopyablePromiseHolder;
  379. using future_details::Future;
  380. using future_details::Promise;
  381. /**
  382. * This class represents the producer side of a Future.
  383. *
  384. * This is a single-shot class. You may only extract the Future once, and you may either set a value
  385. * or error at most once. Extracting the future and setting the value/error can be done in either
  386. * order.
  387. *
  388. * If the Future has been extracted, but no value or error has been set at the time this Promise is
  389. * destroyed, a error will be set with ErrorCode::BrokenPromise. This should generally be considered
  390. * a programmer error, and should not be relied upon. We may make it debug-fatal in the future.
  391. *
  392. * Only one thread can use a given Promise at a time. It is legal to have different threads setting
  393. * the value/error and extracting the Future, but it is the user's responsibility to ensure that
  394. * those calls are strictly synchronized. This is usually easiest to achieve by calling
  395. * make_promise_future<T>() then passing a SharedPromise to the completing threads.
  396. *
  397. * If the result is ready when producing the Future, it is more efficient to use
  398. * make_ready_future_with() or Future<T>::make_ready() than to use a Promise<T>.
  399. */
  400. template <typename T>
  401. class future_details::Promise {
  402. public:
  403. using value_type = T;
  404. Promise() = default;
  405. ~Promise()
  406. {
  407. if (REALM_UNLIKELY(m_shared_state)) {
  408. m_shared_state->set_status({ErrorCodes::BrokenPromise, "Broken Promise"});
  409. }
  410. }
  411. // If we want to enable move-assignability, we need to handle breaking the promise on the old
  412. // value of this.
  413. Promise& operator=(Promise&&) = delete;
  414. // The default move construction is fine.
  415. Promise(Promise&&) noexcept = default;
  416. /**
  417. * Sets the value into this Promise when the passed-in Future completes, which may have already
  418. * happened. If it hasn't, it is still safe to destroy this Promise since it is no longer
  419. * involved.
  420. */
  421. void set_from(Future<T>&& future) noexcept;
  422. void set_from_status_with(StatusWith<T> sw) noexcept
  423. {
  424. set_impl([&] {
  425. m_shared_state->set_from(std::move(sw));
  426. });
  427. }
  428. template <typename... Args>
  429. void emplace_value(Args&&... args) noexcept
  430. {
  431. set_impl([&] {
  432. m_shared_state->emplace_value(std::forward<Args>(args)...);
  433. });
  434. }
  435. void set_error(Status status) noexcept
  436. {
  437. REALM_ASSERT_DEBUG(!status.is_ok());
  438. set_impl([&] {
  439. m_shared_state->set_status(std::move(status));
  440. });
  441. }
  442. static auto make_promise_future_impl()
  443. {
  444. struct PromiseAndFuture {
  445. Promise<T> promise;
  446. Future<T> future = promise.get_future();
  447. };
  448. return PromiseAndFuture();
  449. }
  450. private:
  451. // This is not public because we found it frequently was involved in races. The
  452. // `make_promise_future<T>` API avoids those races entirely.
  453. Future<T> get_future() noexcept;
  454. friend class Future<void>;
  455. friend class CopyablePromiseHolder<T>;
  456. Promise(util::bind_ptr<SharedState<T>> shared_state)
  457. : m_shared_state(std::move(shared_state))
  458. {
  459. m_shared_state->claim();
  460. }
  461. util::bind_ptr<SharedState<T>> release() &&
  462. {
  463. auto ret = std::move(m_shared_state);
  464. ret->disown();
  465. return ret;
  466. }
  467. template <typename Func>
  468. void set_impl(Func&& do_set) noexcept
  469. {
  470. REALM_ASSERT(m_shared_state);
  471. do_set();
  472. m_shared_state.reset();
  473. }
  474. util::bind_ptr<SharedState<T>> m_shared_state = make_intrusive<SharedState<T>>();
  475. };
  476. /**
  477. * CopyablePromiseHolder<T> is a lightweight copyable holder for Promises so they can be captured inside
  478. * of std::function's and other types that require all members to be copyable.
  479. *
  480. * The only thing you can do with a CopyablePromiseHolder is extract a regular promise from it exactly once,
  481. * and copy/move it as you would a util::bind_ptr.
  482. *
  483. * Do not use this type to try to fill a Promise from multiple places or threads.
  484. */
  485. template <typename T>
  486. class future_details::CopyablePromiseHolder {
  487. public:
  488. CopyablePromiseHolder(Promise<T>&& input)
  489. : m_shared_state(std::move(input).release())
  490. {
  491. }
  492. CopyablePromiseHolder(const Promise<T>&) = delete;
  493. Promise<T> get_promise()
  494. {
  495. REALM_ASSERT(m_shared_state);
  496. return Promise<T>(std::move(m_shared_state));
  497. }
  498. private:
  499. util::bind_ptr<SharedState<T>> m_shared_state;
  500. };
  501. /**
  502. * Future<T> is logically a possibly-deferred T or exception_ptr
  503. * As is usual for rvalue-qualified methods, you may call at most one of them on a given Future.
  504. *
  505. * A future may be passed between threads, but only one thread may use it at a time.
  506. */
  507. template <typename T>
  508. class REALM_NODISCARD future_details::Future {
  509. public:
  510. static_assert(!is_future<T>, "Future<Future<T>> is banned. Just use Future<T> instead.");
  511. static_assert(!std::is_reference<T>::value, "Future<T&> is banned.");
  512. static_assert(!std::is_const<T>::value, "Future<const T> is banned.");
  513. static_assert(!std::is_array<T>::value, "Future<T[]> is banned.");
  514. using value_type = T;
  515. /**
  516. * Constructs a Future in a moved-from state that can only be assigned to or destroyed.
  517. */
  518. Future() = default;
  519. Future& operator=(Future&&) = default;
  520. Future(Future&&) = default;
  521. Future(const Future&) = delete;
  522. Future& operator=(const Future&) = delete;
  523. /* implicit */ Future(T val)
  524. : Future(make_ready(std::move(val)))
  525. {
  526. }
  527. /* implicit */ Future(Status status)
  528. : Future(make_ready(std::move(status)))
  529. {
  530. }
  531. /* implicit */ Future(StatusWith<T> sw)
  532. : Future(make_ready(std::move(sw)))
  533. {
  534. }
  535. /**
  536. * Make a ready Future<T> from a value for cases where you don't need to wait asynchronously.
  537. *
  538. * Calling this is faster than getting a Future out of a Promise, and is effectively free. It is
  539. * fast enough that you never need to avoid returning a Future from an API, even if the result
  540. * is ready 99.99% of the time.
  541. *
  542. * As an example, if you are handing out results from a batch, you can use this when for each
  543. * result while you have a batch, then use a Promise to return a not-ready Future when you need
  544. * to get another batch.
  545. */
  546. static Future<T> make_ready(T val)
  547. { // TODO emplace?
  548. Future out;
  549. out.m_immediate = std::move(val);
  550. return out;
  551. }
  552. static Future<T> make_ready(Status status)
  553. {
  554. auto out = Future<T>(make_intrusive<SharedState<T>>());
  555. out.m_shared->set_status(std::move(status));
  556. return out;
  557. }
  558. static Future<T> make_ready(StatusWith<T> val)
  559. {
  560. if (val.is_ok()) {
  561. return make_ready(std::move(val.get_value()));
  562. }
  563. return make_ready(val.get_status());
  564. }
  565. /**
  566. * If this returns true, get() is guaranteed not to block and callbacks will be immediately
  567. * invoked. You can't assume anything if this returns false since it may be completed
  568. * immediately after checking (unless you have independent knowledge that this Future can't
  569. * complete in the background).
  570. *
  571. * Callers must still call get() or similar, even on Future<void>, to ensure that they are
  572. * correctly sequenced with the completing task, and to be informed about whether the Promise
  573. * completed successfully.
  574. *
  575. * This is generally only useful as an optimization to avoid prep work, such as setting up
  576. * timeouts, that is unnecessary if the Future is ready already.
  577. */
  578. bool is_ready() const
  579. {
  580. // This can be a relaxed load because callers are not allowed to use it to establish
  581. // ordering.
  582. return m_immediate || m_shared->m_state.load(std::memory_order_relaxed) == SSBState::Finished;
  583. }
  584. /**
  585. * Gets the value out of this Future, blocking until it is ready.
  586. *
  587. * get() methods throw on error, while get_no_throw() returns a Statuswith<T> with either a value
  588. * or an error Status.
  589. *
  590. * These methods can be called multiple times, except for the rvalue overloads.
  591. */
  592. T get() &&
  593. {
  594. return std::move(get_impl());
  595. }
  596. T& get() &
  597. {
  598. return get_impl();
  599. }
  600. const T& get() const&
  601. {
  602. return const_cast<Future*>(this)->get_impl();
  603. }
  604. StatusWith<T> get_no_throw() const& noexcept
  605. {
  606. if (m_immediate) {
  607. return *m_immediate;
  608. }
  609. m_shared->wait();
  610. if (!m_shared->m_data) {
  611. return m_shared->m_status;
  612. }
  613. return *m_shared->m_data;
  614. }
  615. StatusWith<T> get_no_throw() && noexcept
  616. {
  617. if (m_immediate) {
  618. return std::move(*m_immediate);
  619. }
  620. m_shared->wait();
  621. if (!m_shared->m_data) {
  622. return m_shared->m_status;
  623. }
  624. return std::move(*m_shared->m_data);
  625. }
  626. /**
  627. * This ends the Future continuation chain by calling a callback on completion. Use this to
  628. * escape back into a callback-based API.
  629. *
  630. * The callback must not throw since it is called from a noexcept context. The callback must take a
  631. * StatusOrStatusWith as its argument and have a return type of void.
  632. */
  633. template <typename Func> // StatusOrStatusWith<T> -> void
  634. void get_async(Func&& func) && noexcept
  635. {
  636. static_assert(std::is_void_v<std::invoke_result_t<Func, StatusOrStatusWith<FakeVoidToVoid<T>>>>);
  637. return general_impl(
  638. // on ready success:
  639. [&](T&& val) {
  640. call(func, StatusWith<T>(std::move(val)));
  641. },
  642. // on ready failure:
  643. [&](Status&& status) {
  644. call(func, StatusWith<T>(std::move(status)));
  645. },
  646. // on not ready:
  647. [&] {
  648. m_shared->m_callback = [func = std::forward<Func>(func)](SharedStateBase* ssb) mutable noexcept {
  649. const auto input = static_cast<SharedState<T>*>(ssb);
  650. if (input->m_status.is_ok()) {
  651. call(func, StatusWith<T>(std::move(*input->m_data)));
  652. }
  653. else {
  654. call(func, StatusWith<T>(std::move(input->m_status)));
  655. }
  656. };
  657. });
  658. }
  659. //
  660. // The remaining methods are all continuation based and take a callback and return a Future.
  661. // Each method has a comment indicating the supported signatures for that callback, and a
  662. // description of when the callback is invoked and how the impacts the returned Future. It may
  663. // be helpful to think of Future continuation chains as a pipeline of stages that take input
  664. // from earlier stages and produce output for later stages.
  665. //
  666. // Be aware that the callback may be invoked inline at the call-site or at the producer when
  667. // setting the value. Therefore, you should avoid doing blocking work inside of a callback.
  668. // Additionally, avoid acquiring any locks or mutexes that the caller already holds, otherwise
  669. // you risk a deadlock. If either of these concerns apply to your callback, it should schedule
  670. // itself on an executor, rather than doing work in the callback.
  671. //
  672. // Error handling in callbacks: all exceptions thrown propagate to the returned Future
  673. // automatically.
  674. //
  675. // Callbacks that return Future<T> are automatically unwrapped and connected to the returned
  676. // Future<T>, rather than producing a Future<Future<T>>.
  677. /**
  678. * Callbacks passed to then() are only called if the input Future completes successfully.
  679. * Otherwise the error propagates automatically, bypassing the callback.
  680. */
  681. template <typename Func>
  682. auto then(Func&& func) && noexcept
  683. {
  684. using Result = NormalizedCallResult<Func, T>;
  685. if constexpr (!is_future<Result>) {
  686. return general_impl(
  687. // on ready success:
  688. [&](T&& val) {
  689. return Future<Result>::make_ready(no_throw_call(func, std::move(val)));
  690. },
  691. // on ready failure:
  692. [&](Status&& status) {
  693. return Future<Result>::make_ready(std::move(status));
  694. },
  695. // on not ready yet:
  696. [&] {
  697. return make_continuation<Result>(
  698. [func = std::forward<Func>(func)](SharedState<T>* input,
  699. SharedState<Result>* output) mutable noexcept {
  700. if (!input->m_status.is_ok()) {
  701. output->set_status(input->m_status);
  702. return;
  703. }
  704. output->set_from(no_throw_call(func, std::move(*input->m_data)));
  705. });
  706. });
  707. }
  708. else {
  709. using UnwrappedResult = typename Result::value_type;
  710. return general_impl(
  711. // on ready success:
  712. [&](T&& val) {
  713. try {
  714. return Future<UnwrappedResult>(throwing_call(func, std::move(val)));
  715. }
  716. catch (...) {
  717. return Future<UnwrappedResult>::make_ready(exception_to_status());
  718. }
  719. },
  720. // on ready failure:
  721. [&](Status&& status) {
  722. return Future<UnwrappedResult>::make_ready(std::move(status));
  723. },
  724. // on not ready yet:
  725. [&] {
  726. return make_continuation<UnwrappedResult>(
  727. [func = std::forward<Func>(func)](SharedState<T>* input,
  728. SharedState<UnwrappedResult>* output) mutable noexcept {
  729. if (!input->m_status.is_ok())
  730. return output->set_status(std::move(input->m_status));
  731. try {
  732. throwing_call(func, std::move(*input->m_data)).propagate_result_to(output);
  733. }
  734. catch (...) {
  735. output->set_status(exception_to_status());
  736. }
  737. });
  738. });
  739. }
  740. }
  741. /*
  742. * Callbacks passed to on_completion() are always called with a StatusWith<T> when the input future completes.
  743. */
  744. template <typename Func>
  745. auto on_completion(Func&& func) && noexcept
  746. {
  747. using Wrapper = StatusOrStatusWith<T>;
  748. using Result = NormalizedCallResult<Func, StatusOrStatusWith<T>>;
  749. if constexpr (!is_future<Result>) {
  750. return general_impl(
  751. // on ready success:
  752. [&](T&& val) {
  753. return Future<Result>::make_ready(
  754. no_throw_call(std::forward<Func>(func), Wrapper(std::move(val))));
  755. },
  756. // on ready failure:
  757. [&](Status&& status) {
  758. return Future<Result>::make_ready(
  759. no_throw_call(std::forward<Func>(func), Wrapper(std::move(status))));
  760. },
  761. // on not ready yet:
  762. [&] {
  763. return make_continuation<Result>(
  764. [func = std::forward<Func>(func)](SharedState<T>* input,
  765. SharedState<Result>* output) mutable noexcept {
  766. if (!input->m_status.is_ok())
  767. return output->set_from(no_throw_call(func, Wrapper(std::move(input->m_status))));
  768. output->set_from(no_throw_call(func, Wrapper(std::move(*input->m_data))));
  769. });
  770. });
  771. }
  772. else {
  773. using UnwrappedResult = typename Result::value_type;
  774. return general_impl(
  775. // on ready success:
  776. [&](T&& val) {
  777. try {
  778. return Future<UnwrappedResult>(
  779. throwing_call(std::forward<Func>(func), Wrapper(std::move(val))));
  780. }
  781. catch (...) {
  782. return Future<UnwrappedResult>::make_ready(exception_to_status());
  783. }
  784. },
  785. // on ready failure:
  786. [&](Status&& status) {
  787. try {
  788. return Future<UnwrappedResult>(
  789. throwing_call(std::forward<Func>(func), Wrapper(std::move(status))));
  790. }
  791. catch (...) {
  792. return Future<UnwrappedResult>::make_ready(exception_to_status());
  793. }
  794. },
  795. // on not ready yet:
  796. [&] {
  797. return make_continuation<UnwrappedResult>(
  798. [func = std::forward<Func>(func)](SharedState<T>* input,
  799. SharedState<UnwrappedResult>* output) mutable noexcept {
  800. if (!input->m_status.is_ok()) {
  801. try {
  802. throwing_call(func, Wrapper(std::move(input->m_status)))
  803. .propagate_result_to(output);
  804. }
  805. catch (...) {
  806. output->set_status(exception_to_status());
  807. }
  808. return;
  809. }
  810. try {
  811. throwing_call(func, Wrapper(std::move(*input->m_data))).propagate_result_to(output);
  812. }
  813. catch (...) {
  814. output->set_status(exception_to_status());
  815. }
  816. });
  817. });
  818. }
  819. }
  820. /**
  821. * Callbacks passed to on_error() are only called if the input Future completes with an error.
  822. * Otherwise, the successful result propagates automatically, bypassing the callback.
  823. *
  824. * The callback can either produce a replacement value (which must be a T), return a replacement
  825. * Future<T> (such as a by retrying), or return/throw a replacement error.
  826. *
  827. * Note that this will only catch errors produced by earlier stages; it is not registering a
  828. * general error handler for the entire chain.
  829. */
  830. template <typename Func>
  831. Future<FakeVoidToVoid<T>> on_error(Func&& func) && noexcept
  832. {
  833. using Result = NormalizedCallResult<Func, Status>;
  834. static_assert(std::is_same<VoidToFakeVoid<UnwrappedType<Result>>, T>::value,
  835. "func passed to Future<T>::onError must return T, StatusWith<T>, or Future<T>");
  836. if constexpr (!is_future<Result>) {
  837. return general_impl(
  838. // on ready success:
  839. [&](T&& val) {
  840. return Future<T>::make_ready(std::move(val));
  841. },
  842. // on ready failure:
  843. [&](Status&& status) {
  844. return Future<T>::make_ready(no_throw_call(func, std::move(status)));
  845. },
  846. // on not ready yet:
  847. [&] {
  848. return make_continuation<T>([func = std::forward<Func>(func)](
  849. SharedState<T>* input, SharedState<T>* output) mutable noexcept {
  850. if (input->m_status.is_ok())
  851. return output->emplace_value(std::move(*input->m_data));
  852. output->set_from(no_throw_call(func, std::move(input->m_status)));
  853. });
  854. });
  855. }
  856. else {
  857. return general_impl(
  858. // on ready success:
  859. [&](T&& val) {
  860. return Future<T>::make_ready(std::move(val));
  861. },
  862. // on ready failure:
  863. [&](Status&& status) {
  864. try {
  865. return Future<T>(throwing_call(func, std::move(status)));
  866. }
  867. catch (...) {
  868. return Future<T>::make_ready(exception_to_status());
  869. }
  870. },
  871. // on not ready yet:
  872. [&] {
  873. return make_continuation<T>([func = std::forward<Func>(func)](
  874. SharedState<T>* input, SharedState<T>* output) mutable noexcept {
  875. if (input->m_status.is_ok())
  876. return output->emplace_value(std::move(*input->m_data));
  877. try {
  878. throwing_call(func, std::move(input->m_status)).propagate_result_to(output);
  879. }
  880. catch (...) {
  881. output->set_status(exception_to_status());
  882. }
  883. });
  884. });
  885. }
  886. }
  887. Future<void> ignore_value() && noexcept;
  888. private:
  889. template <typename T2>
  890. friend class Future;
  891. friend class Promise<T>;
  892. T& get_impl()
  893. {
  894. if (m_immediate) {
  895. return *m_immediate;
  896. }
  897. m_shared->wait();
  898. if (!m_shared->m_status.is_ok()) {
  899. throw Exception(m_shared->m_status);
  900. }
  901. return *m_shared->m_data;
  902. }
  903. // All callbacks are called immediately so they are allowed to capture everything by reference.
  904. // All callbacks should return the same return type.
  905. template <typename SuccessFunc, typename FailFunc, typename NotReady>
  906. auto general_impl(SuccessFunc&& success, FailFunc&& fail, NotReady&& notReady) noexcept
  907. {
  908. if (m_immediate) {
  909. return success(std::move(*m_immediate));
  910. }
  911. if (m_shared->m_state.load(std::memory_order_acquire) == SSBState::Finished) {
  912. if (m_shared->m_data) {
  913. return success(std::move(*m_shared->m_data));
  914. }
  915. else {
  916. return fail(std::move(m_shared->m_status));
  917. }
  918. }
  919. // This is always done after notReady, which never throws. It is in a ScopeExit to
  920. // support both void- and value-returning notReady implementations since we can't assign
  921. // void to a variable.
  922. auto guard = util::make_scope_exit([&]() noexcept {
  923. auto old_state = SSBState::Init;
  924. if (REALM_UNLIKELY(!m_shared->m_state.compare_exchange_strong(old_state, SSBState::Waiting,
  925. std::memory_order_acq_rel))) {
  926. REALM_ASSERT_DEBUG(old_state == SSBState::Finished);
  927. m_shared->m_callback(m_shared.get());
  928. }
  929. });
  930. return notReady();
  931. }
  932. template <typename Result, typename OnReady>
  933. inline Future<Result> make_continuation(OnReady&& on_ready)
  934. {
  935. REALM_ASSERT(!m_shared->m_callback && !m_shared->m_continuation);
  936. auto continuation = make_intrusive<SharedState<Result>>();
  937. continuation->thread_unsafe_inc_refs_to(2);
  938. m_shared->m_continuation.reset(continuation.get(), util::bind_ptr_base::adopt_tag{});
  939. m_shared->m_callback = [on_ready = std::forward<OnReady>(on_ready)](SharedStateBase* ssb) mutable noexcept {
  940. const auto input = static_cast<SharedState<T>*>(ssb);
  941. const auto output = static_cast<SharedState<Result>*>(ssb->m_continuation.get());
  942. on_ready(input, output);
  943. };
  944. return Future<VoidToFakeVoid<Result>>(std::move(continuation));
  945. }
  946. void propagate_result_to(SharedState<T>* output) && noexcept
  947. {
  948. general_impl(
  949. // on ready success:
  950. [&](T&& val) {
  951. output->emplace_value(std::move(val));
  952. },
  953. // on ready failure:
  954. [&](Status&& status) {
  955. output->set_status(std::move(status));
  956. },
  957. // on not ready yet:
  958. [&] {
  959. // If the output is just for continuation, bypass it and just directly fill in the
  960. // SharedState that it would write to. The concurrency situation is a bit subtle
  961. // here since we are the Future-side of shared, but the Promise-side of output.
  962. // The rule is that p->isJustForContinuation must be acquire-read as true before
  963. // examining p->continuation, and p->continuation must be written before doing the
  964. // release-store of true to p->isJustForContinuation.
  965. if (output->m_is_just_for_continuation.load(std::memory_order_acquire)) {
  966. m_shared->m_continuation = std::move(output->m_continuation);
  967. }
  968. else {
  969. m_shared->m_continuation = util::bind_ptr(output);
  970. }
  971. m_shared->m_is_just_for_continuation.store(true, std::memory_order_release);
  972. m_shared->m_callback = [](SharedStateBase* ssb) noexcept {
  973. const auto input = static_cast<SharedState<T>*>(ssb);
  974. const auto output = static_cast<SharedState<T>*>(ssb->m_continuation.get());
  975. output->fill_from(std::move(*input));
  976. };
  977. });
  978. }
  979. explicit Future(util::bind_ptr<SharedState<T>> ptr)
  980. : m_shared(std::move(ptr))
  981. {
  982. }
  983. // At most one of these will be active.
  984. util::Optional<T> m_immediate;
  985. util::bind_ptr<SharedState<T>> m_shared;
  986. };
  987. /**
  988. * The void specialization of Future<T>. See the general Future<T> for detailed documentation.
  989. * It should be the same as the generic Future<T> with the following exceptions:
  990. * - Anything mentioning StatusWith<T> will use Status instead.
  991. * - Anything returning references to T will just return void since there are no void references.
  992. * - Anything taking a T argument will receive no arguments.
  993. */
  994. template <>
  995. class REALM_NODISCARD future_details::Future<void> {
  996. public:
  997. using value_type = void;
  998. /* implicit */ Future()
  999. : Future(make_ready())
  1000. {
  1001. }
  1002. /* implicit */ Future(Status status)
  1003. : Future(make_ready(std::move(status)))
  1004. {
  1005. }
  1006. static Future<void> make_ready()
  1007. {
  1008. return Future<FakeVoid>::make_ready(FakeVoid{});
  1009. }
  1010. static Future<void> make_ready(Status status)
  1011. {
  1012. if (status.is_ok())
  1013. return make_ready();
  1014. return Future<FakeVoid>::make_ready(std::move(status));
  1015. }
  1016. bool is_ready() const
  1017. {
  1018. return inner.is_ready();
  1019. }
  1020. void get() const
  1021. {
  1022. inner.get();
  1023. }
  1024. Status get_no_throw() const noexcept
  1025. {
  1026. return inner.get_no_throw().get_status();
  1027. }
  1028. template <typename Func> // Status -> void
  1029. void get_async(Func&& func) && noexcept
  1030. {
  1031. return std::move(inner).get_async(std::forward<Func>(func));
  1032. }
  1033. template <typename Func> // () -> T or StatusWith<T> or Future<T>
  1034. auto then(Func&& func) && noexcept
  1035. {
  1036. return std::move(inner).then(std::forward<Func>(func));
  1037. }
  1038. template <typename Func>
  1039. auto on_error(Func&& func) && noexcept
  1040. {
  1041. return std::move(inner).on_error(std::forward<Func>(func));
  1042. }
  1043. template <typename Func>
  1044. auto on_completion(Func&& func) && noexcept
  1045. {
  1046. return std::move(inner).on_completion(std::forward<Func>(func));
  1047. }
  1048. Future<void> ignore_value() && noexcept
  1049. {
  1050. return std::move(*this);
  1051. }
  1052. private:
  1053. template <typename T>
  1054. friend class Future;
  1055. friend class Promise<void>;
  1056. explicit Future(util::bind_ptr<SharedState<FakeVoid>> ptr)
  1057. : inner(std::move(ptr))
  1058. {
  1059. }
  1060. /*implicit*/ Future(Future<FakeVoid>&& inner)
  1061. : inner(std::move(inner))
  1062. {
  1063. }
  1064. /*implicit*/ operator Future<FakeVoid>() &&
  1065. {
  1066. return std::move(inner);
  1067. }
  1068. void propagate_result_to(SharedState<void>* output) && noexcept
  1069. {
  1070. std::move(inner).propagate_result_to(output);
  1071. }
  1072. static Future<void> make_ready(StatusWith<FakeVoid> status)
  1073. {
  1074. return Future<FakeVoid>::make_ready(std::move(status));
  1075. }
  1076. Future<FakeVoid> inner;
  1077. };
  1078. /**
  1079. * Returns a bound Promise and Future in a struct with friendly names (promise and future) that also
  1080. * works well with C++17 structured bindings.
  1081. */
  1082. template <typename T>
  1083. inline auto make_promise_future()
  1084. {
  1085. return Promise<T>::make_promise_future_impl();
  1086. }
  1087. /**
  1088. * This metafunction allows APIs that take callbacks and return Future to avoid doing their own type
  1089. * calculus. This results in the base value_type that would result from passing Func to a
  1090. * Future<T>::then(), with the same normalizing of T/StatusWith<T>/Future<T> returns. This is
  1091. * primarily useful for implementations of executors rather than their users.
  1092. *
  1093. * This returns the unwrapped T rather than Future<T> so it will be easy to create a Promise<T>.
  1094. *
  1095. * Examples:
  1096. *
  1097. * FutureContinuationResult<std::function<void()>> == void
  1098. * FutureContinuationResult<std::function<Status()>> == void
  1099. * FutureContinuationResult<std::function<Future<void>()>> == void
  1100. *
  1101. * FutureContinuationResult<std::function<int()>> == int
  1102. * FutureContinuationResult<std::function<StatusWith<int>()>> == int
  1103. * FutureContinuationResult<std::function<Future<int>()>> == int
  1104. *
  1105. * FutureContinuationResult<std::function<int(bool)>, bool> == int
  1106. *
  1107. * FutureContinuationResult<std::function<int(bool)>, NotBool> SFINAE-safe substitution failure.
  1108. */
  1109. template <typename Func, typename... Args>
  1110. using FutureContinuationResult =
  1111. typename future_details::FutureContinuationResultImpl<std::invoke_result_t<Func, Args...>>::type;
  1112. //
  1113. // Implementations of methods that couldn't be defined in the class due to ordering requirements.
  1114. //
  1115. template <typename T>
  1116. inline Future<T> Promise<T>::get_future() noexcept
  1117. {
  1118. m_shared_state->thread_unsafe_inc_refs_to(2);
  1119. return Future<T>(util::bind_ptr<SharedState<T>>(m_shared_state.get(), util::bind_ptr_base::adopt_tag{}));
  1120. }
  1121. template <typename T>
  1122. inline void Promise<T>::set_from(Future<T>&& future) noexcept
  1123. {
  1124. set_impl([&] {
  1125. std::move(future).propagate_result_to(m_shared_state.get());
  1126. });
  1127. }
  1128. template <typename T>
  1129. inline Future<void> Future<T>::ignore_value() && noexcept
  1130. {
  1131. return std::move(*this).then([](auto&&) {});
  1132. }
  1133. } // namespace realm::util