subscriptions.hpp 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. /*************************************************************************
  2. *
  3. * Copyright 2021 Realm, Inc.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. **************************************************************************/
  18. #pragma once
#include "realm/db.hpp"
#include "realm/obj.hpp"
#include "realm/query.hpp"
#include "realm/timestamp.hpp"
#include "realm/util/future.hpp"
#include "realm/util/functional.hpp"
#include "realm/util/optional.hpp"
#include "realm/util/tagged_bool.hpp"

#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <list>
#include <memory>
#include <mutex>
#include <ostream>
#include <set>
#include <string>
#include <string_view>
#include <utility>
#include <vector>
  30. namespace realm::sync {
  31. class MutableSubscriptionSet;
  32. class SubscriptionSet;
  33. class SubscriptionStore;
// A Subscription represents a single query that may be OR'd with other queries on the same object class to be
// sent to the server in a QUERY or IDENT message.
  36. class Subscription {
  37. public:
  38. // The unique ID for this subscription.
  39. ObjectId id;
  40. // The timestamp of when this subscription was originally created.
  41. Timestamp created_at;
  42. // The timestamp of the last time this subscription was updated by calling update_query.
  43. Timestamp updated_at;
  44. // The name of the subscription that was set when it was created, or util::none if it was created without a name.
  45. util::Optional<std::string> name;
  46. // The name of the object class of the query for this subscription.
  47. std::string object_class_name;
  48. // A stringified version of the query associated with this subscription.
  49. std::string query_string;
  50. // Returns whether the 2 subscriptions passed have the same id.
  51. friend bool operator==(const Subscription& lhs, const Subscription& rhs)
  52. {
  53. return lhs.id == rhs.id;
  54. }
  55. Subscription() = default;
  56. Subscription(const SubscriptionStore* parent, Obj obj);
  57. Subscription(util::Optional<std::string> name, std::string object_class_name, std::string query_str);
  58. };
  59. // SubscriptionSets contain a set of unique queries by either name or Query object that will be constructed into a
  60. // single QUERY or IDENT message to be sent to the server.
  61. class SubscriptionSet {
  62. public:
  63. /*
  64. * State diagram:
  65. *
  66. * ┌───────────┬─────────►Error──────────────────────────┐
  67. * │ │ │
  68. * │ │ ▼
  69. * Uncommitted──►Pending──►Bootstrapping──►AwaitingMark──►Complete───►Superseded
  70. * │ ▲
  71. * │ │
  72. * └────────────────────────────┘
  73. *
  74. */
  75. enum class State {
  76. // This subscription set has not been persisted and has not been sent to the server. This state is only valid
  77. // for MutableSubscriptionSets
  78. Uncommitted = 0,
  79. // The subscription set has been persisted locally but has not been acknowledged by the server yet.
  80. Pending,
  81. // The server is currently sending the initial state that represents this subscription set to the client.
  82. Bootstrapping,
  83. // This subscription set is the active subscription set that is currently being synchronized with the server.
  84. Complete,
  85. // An error occurred while processing this subscription set on the server. Check error_str() for details.
  86. Error,
  87. // The server responded to a later subscription set to this one and this one has been trimmed from the
  88. // local storage of subscription sets.
  89. Superseded,
  90. // The last bootstrap message containing the initial state for this subscription set has been received. The
  91. // client is awaiting a mark message to mark this subscription as fully caught up to history.
  92. AwaitingMark,
  93. };
  94. static constexpr int64_t EmptyVersion = int64_t(-1);
  95. // Used in tests.
  96. inline friend std::ostream& operator<<(std::ostream& o, State state)
  97. {
  98. switch (state) {
  99. case State::Uncommitted:
  100. o << "Uncommitted";
  101. break;
  102. case State::Pending:
  103. o << "Pending";
  104. break;
  105. case State::Bootstrapping:
  106. o << "Bootstrapping";
  107. break;
  108. case State::AwaitingMark:
  109. o << "AwaitingMark";
  110. break;
  111. case State::Complete:
  112. o << "Complete";
  113. break;
  114. case State::Error:
  115. o << "Error";
  116. break;
  117. case State::Superseded:
  118. o << "Superseded";
  119. break;
  120. }
  121. return o;
  122. }
  123. using const_iterator = std::vector<Subscription>::const_iterator;
  124. using iterator = const_iterator; // Note: no mutable access provided through iterators.
  125. // This will make a copy of this subscription set with the next available version number and return it as
  126. // a mutable SubscriptionSet to be updated. The new SubscriptionSet's state will be Uncommitted. This
  127. // subscription set will be unchanged.
  128. MutableSubscriptionSet make_mutable_copy() const;
  129. // Returns a future that will resolve either with an error status if this subscription set encounters an
  130. // error, or resolves when the subscription set reaches at least that state. It's possible for a subscription
  131. // set to skip a state (i.e. go from Pending to Complete or Pending to Superseded), and the future value
  132. // will the the state it actually reached.
  133. util::Future<State> get_state_change_notification(State notify_when) const;
  134. void get_state_change_notification(
  135. State notify_when, util::UniqueFunction<void(util::Optional<State>, util::Optional<Status>)> callback) const;
  136. // The query version number used in the sync wire protocol to identify this subscription set to the server.
  137. int64_t version() const;
  138. // The database version that this subscription set was created at or -1 if Uncommitted.
  139. DB::version_type snapshot_version() const;
  140. // The current state of this subscription set
  141. State state() const;
  142. // The error string for this subscription set if any.
  143. StringData error_str() const;
  144. // Returns the number of subscriptions in the set.
  145. size_t size() const;
  146. // An iterator interface for finding/working with individual subscriptions.
  147. iterator begin() const;
  148. iterator end() const;
  149. const Subscription& at(size_t index) const;
  150. // Returns a pointer to the Subscription matching either the name or Query object, or nullptr if no such
  151. // subscription exists.
  152. const Subscription* find(StringData name) const;
  153. const Subscription* find(const Query& query) const;
  154. // Returns this query set as extended JSON in a form suitable for transmitting to the server.
  155. std::string to_ext_json() const;
  156. // Reloads the state of this SubscriptionSet so that it reflects the latest state from synchronizing with the
  157. // server. This will invalidate all iterators.
  158. void refresh();
  159. protected:
  160. friend class SubscriptionStore;
  161. struct SupersededTag {
  162. };
  163. using MakingMutableCopy = util::TaggedBool<class MakingMutableCopyTag>;
  164. explicit SubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, int64_t version, SupersededTag);
  165. explicit SubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, const Transaction& tr, Obj obj,
  166. MakingMutableCopy making_mutable_copy = MakingMutableCopy(false));
  167. void load_from_database(Obj obj);
  168. // Get a reference to the SubscriptionStore. It may briefly extend the lifetime of the store.
  169. std::shared_ptr<const SubscriptionStore> get_flx_subscription_store() const;
  170. std::weak_ptr<const SubscriptionStore> m_mgr;
  171. DB::version_type m_cur_version = 0;
  172. int64_t m_version = 0;
  173. State m_state = State::Uncommitted;
  174. std::string m_error_str;
  175. DB::version_type m_snapshot_version = -1;
  176. std::vector<Subscription> m_subs;
  177. };
  178. class MutableSubscriptionSet : public SubscriptionSet {
  179. public:
  180. // Erases all subscriptions in the subscription set.
  181. void clear();
  182. // Inserts a new subscription into the set if one does not exist already - returns an iterator to the
  183. // subscription and a bool that is true if a new subscription was actually created. The SubscriptionSet
  184. // must be in the Uncommitted state to call this - otherwise this will throw.
  185. //
  186. // The Query portion of the subscription is mutable, however the name portion is immutable after the
  187. // subscription is inserted.
  188. //
  189. // If insert is called twice for the same name, the Query portion and updated_at timestamp for that named
  190. // subscription will be updated to match the new Query.
  191. std::pair<iterator, bool> insert_or_assign(std::string_view name, const Query& query);
  192. // Inserts a new subscription into the set if one does not exist already - returns an iterator to the
  193. // subscription and a bool that is true if a new subscription was actually created. The SubscriptionSet
  194. // must be in the Uncommitted state to call this - otherwise this will throw.
  195. //
  196. // If insert is called twice for the same query, then the updated_at timestamp for that subscription will
  197. // be updated.
  198. //
  199. // The inserted subscription will have an empty name - to update this Subscription's query, the caller
  200. // will have
  201. std::pair<iterator, bool> insert_or_assign(const Query& query);
  202. void import(const SubscriptionSet&);
  203. // Erases a subscription pointed to by an iterator. Returns the "next" iterator in the set - to provide
  204. // STL compatibility. The SubscriptionSet must be in the Uncommitted state to call this - otherwise
  205. // this will throw.
  206. iterator erase(iterator it);
  207. // Erases the subscription identified by the argument, if any. Returns true if anything was removed.
  208. bool erase(StringData name);
  209. bool erase(const Query& query);
  210. // Updates the state of the transaction and optionally updates its error information.
  211. //
  212. // You may only set an error_str when the State is State::Error.
  213. //
  214. // If set to State::Complete, this will erase all subscription sets with a version less than this one's.
  215. //
  216. // This should be called internally within the sync client.
  217. void update_state(State state, util::Optional<std::string_view> error_str = util::none);
  218. // This commits any changes to the subscription set and returns an this subscription set as an immutable view
  219. // from after the commit. This MutableSubscriptionSet object must not be used after calling commit().
  220. SubscriptionSet commit();
  221. protected:
  222. friend class SubscriptionStore;
  223. MutableSubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, TransactionRef tr, Obj obj,
  224. MakingMutableCopy making_mutable_copy = MakingMutableCopy{false});
  225. void insert_sub(const Subscription& sub);
  226. private:
  227. // To refresh a MutableSubscriptionSet, you should call commit() and call refresh() on its return value.
  228. void refresh() = delete;
  229. std::pair<iterator, bool> insert_or_assign_impl(iterator it, util::Optional<std::string> name,
  230. std::string object_class_name, std::string query_str);
  231. // Throws is m_tr is in the wrong state.
  232. void check_is_mutable() const;
  233. void insert_sub_impl(ObjectId id, Timestamp created_at, Timestamp updated_at, StringData name,
  234. StringData object_class_name, StringData query_str);
  235. void process_notifications();
  236. TransactionRef m_tr;
  237. Obj m_obj;
  238. State m_old_state;
  239. };
  240. class SubscriptionStore;
  241. using SubscriptionStoreRef = std::shared_ptr<SubscriptionStore>;
  242. // A SubscriptionStore manages the FLX metadata tables, SubscriptionSets and Subscriptions.
  243. class SubscriptionStore : public std::enable_shared_from_this<SubscriptionStore> {
  244. public:
  245. static SubscriptionStoreRef create(DBRef db, util::UniqueFunction<void(int64_t)> on_new_subscription_set);
  246. SubscriptionStore(const SubscriptionStore&) = delete;
  247. SubscriptionStore& operator=(const SubscriptionStore&) = delete;
  248. // Get the latest subscription created by calling update_latest(). Once bootstrapping is complete,
  249. // this and get_active() will return the same thing. If no SubscriptionSet has been set, then
  250. // this returns an empty SubscriptionSet that you can clone() in order to mutate.
  251. SubscriptionSet get_latest() const;
  252. // Gets the subscription set that has been acknowledged by the server as having finished bootstrapping.
  253. // If no subscriptions have reached the complete stage, this returns an empty subscription with version
  254. // zero.
  255. SubscriptionSet get_active() const;
  256. struct VersionInfo {
  257. int64_t latest;
  258. int64_t active;
  259. int64_t pending_mark;
  260. };
  261. // Returns the version number of the current active and latest subscription sets. This function guarantees
  262. // that the versions will be read from the same underlying transaction and will thus be consistent.
  263. VersionInfo get_version_info() const;
  264. // To be used internally by the sync client. This returns a mutable view of a subscription set by its
  265. // version ID. If there is no SubscriptionSet with that version ID, this throws KeyNotFound.
  266. MutableSubscriptionSet get_mutable_by_version(int64_t version_id);
  267. // To be used internally by the sync client. This returns a read-only view of a subscription set by its
  268. // version ID. If there is no SubscriptionSet with that version ID, this throws KeyNotFound.
  269. SubscriptionSet get_by_version(int64_t version_id) const;
  270. // Fulfill all previous subscriptions by superceding them. This does not
  271. // affect the mutable subscription identified by the parameter.
  272. void supercede_all_except(MutableSubscriptionSet& mut_sub) const;
  273. // Returns true if there have been commits to the DB since the given version
  274. bool would_refresh(DB::version_type version) const noexcept;
  275. using TableSet = std::set<std::string, std::less<>>;
  276. TableSet get_tables_for_latest(const Transaction& tr) const;
  277. struct PendingSubscription {
  278. int64_t query_version;
  279. DB::version_type snapshot_version;
  280. };
  281. util::Optional<PendingSubscription> get_next_pending_version(int64_t last_query_version,
  282. DB::version_type after_client_version) const;
  283. std::vector<SubscriptionSet> get_pending_subscriptions() const;
  284. private:
  285. using std::enable_shared_from_this<SubscriptionStore>::weak_from_this;
  286. DBRef m_db;
  287. protected:
  288. explicit SubscriptionStore(DBRef db, util::UniqueFunction<void(int64_t)> on_new_subscription_set);
  289. struct NotificationRequest {
  290. NotificationRequest(int64_t version, util::Promise<SubscriptionSet::State> promise,
  291. SubscriptionSet::State notify_when)
  292. : version(version)
  293. , promise(std::move(promise))
  294. , notify_when(notify_when)
  295. {
  296. }
  297. int64_t version;
  298. util::Promise<SubscriptionSet::State> promise;
  299. SubscriptionSet::State notify_when;
  300. };
  301. void supercede_prior_to(TransactionRef tr, int64_t version_id) const;
  302. SubscriptionSet get_by_version_impl(int64_t flx_version, util::Optional<DB::VersionID> version) const;
  303. MutableSubscriptionSet make_mutable_copy(const SubscriptionSet& set) const;
  304. friend class MutableSubscriptionSet;
  305. friend class Subscription;
  306. friend class SubscriptionSet;
  307. TableKey m_sub_table;
  308. ColKey m_sub_id;
  309. ColKey m_sub_created_at;
  310. ColKey m_sub_updated_at;
  311. ColKey m_sub_name;
  312. ColKey m_sub_object_class_name;
  313. ColKey m_sub_query_str;
  314. TableKey m_sub_set_table;
  315. ColKey m_sub_set_version_num;
  316. ColKey m_sub_set_snapshot_version;
  317. ColKey m_sub_set_state;
  318. ColKey m_sub_set_error_str;
  319. ColKey m_sub_set_subscriptions;
  320. util::UniqueFunction<void(int64_t)> m_on_new_subscription_set;
  321. mutable std::mutex m_pending_notifications_mutex;
  322. mutable std::condition_variable m_pending_notifications_cv;
  323. mutable int64_t m_outstanding_requests = 0;
  324. mutable int64_t m_min_outstanding_version = 0;
  325. mutable std::list<NotificationRequest> m_pending_notifications;
  326. };
  327. } // namespace realm::sync