123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403 |
- /*************************************************************************
- *
- * Copyright 2021 Realm, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- **************************************************************************/
- #pragma once
- #include "realm/db.hpp"
- #include "realm/obj.hpp"
- #include "realm/query.hpp"
- #include "realm/timestamp.hpp"
- #include "realm/util/future.hpp"
- #include "realm/util/functional.hpp"
- #include "realm/util/optional.hpp"
- #include "realm/util/tagged_bool.hpp"
- #include <list>
- #include <set>
- #include <string_view>
- namespace realm::sync {
- class MutableSubscriptionSet;
- class SubscriptionSet;
- class SubscriptionStore;
- // A Subscription represents a single query that may be OR'd with other queries on the same object class to be
- // send to the server in a QUERY or IDENT message.
- class Subscription {
- public:
- // The unique ID for this subscription.
- ObjectId id;
- // The timestamp of when this subscription was originally created.
- Timestamp created_at;
- // The timestamp of the last time this subscription was updated by calling update_query.
- Timestamp updated_at;
- // The name of the subscription that was set when it was created, or util::none if it was created without a name.
- util::Optional<std::string> name;
- // The name of the object class of the query for this subscription.
- std::string object_class_name;
- // A stringified version of the query associated with this subscription.
- std::string query_string;
- // Returns whether the 2 subscriptions passed have the same id.
- friend bool operator==(const Subscription& lhs, const Subscription& rhs)
- {
- return lhs.id == rhs.id;
- }
- Subscription() = default;
- Subscription(const SubscriptionStore* parent, Obj obj);
- Subscription(util::Optional<std::string> name, std::string object_class_name, std::string query_str);
- };
- // SubscriptionSets contain a set of unique queries by either name or Query object that will be constructed into a
- // single QUERY or IDENT message to be sent to the server.
- class SubscriptionSet {
- public:
- /*
- * State diagram:
- *
- * ┌───────────┬─────────►Error──────────────────────────┐
- * │ │ │
- * │ │ ▼
- * Uncommitted──►Pending──►Bootstrapping──►AwaitingMark──►Complete───►Superseded
- * │ ▲
- * │ │
- * └────────────────────────────┘
- *
- */
- enum class State {
- // This subscription set has not been persisted and has not been sent to the server. This state is only valid
- // for MutableSubscriptionSets
- Uncommitted = 0,
- // The subscription set has been persisted locally but has not been acknowledged by the server yet.
- Pending,
- // The server is currently sending the initial state that represents this subscription set to the client.
- Bootstrapping,
- // This subscription set is the active subscription set that is currently being synchronized with the server.
- Complete,
- // An error occurred while processing this subscription set on the server. Check error_str() for details.
- Error,
- // The server responded to a later subscription set to this one and this one has been trimmed from the
- // local storage of subscription sets.
- Superseded,
- // The last bootstrap message containing the initial state for this subscription set has been received. The
- // client is awaiting a mark message to mark this subscription as fully caught up to history.
- AwaitingMark,
- };
- static constexpr int64_t EmptyVersion = int64_t(-1);
- // Used in tests.
- inline friend std::ostream& operator<<(std::ostream& o, State state)
- {
- switch (state) {
- case State::Uncommitted:
- o << "Uncommitted";
- break;
- case State::Pending:
- o << "Pending";
- break;
- case State::Bootstrapping:
- o << "Bootstrapping";
- break;
- case State::AwaitingMark:
- o << "AwaitingMark";
- break;
- case State::Complete:
- o << "Complete";
- break;
- case State::Error:
- o << "Error";
- break;
- case State::Superseded:
- o << "Superseded";
- break;
- }
- return o;
- }
- using const_iterator = std::vector<Subscription>::const_iterator;
- using iterator = const_iterator; // Note: no mutable access provided through iterators.
- // This will make a copy of this subscription set with the next available version number and return it as
- // a mutable SubscriptionSet to be updated. The new SubscriptionSet's state will be Uncommitted. This
- // subscription set will be unchanged.
- MutableSubscriptionSet make_mutable_copy() const;
- // Returns a future that will resolve either with an error status if this subscription set encounters an
- // error, or resolves when the subscription set reaches at least that state. It's possible for a subscription
- // set to skip a state (i.e. go from Pending to Complete or Pending to Superseded), and the future value
- // will the the state it actually reached.
- util::Future<State> get_state_change_notification(State notify_when) const;
- void get_state_change_notification(
- State notify_when, util::UniqueFunction<void(util::Optional<State>, util::Optional<Status>)> callback) const;
- // The query version number used in the sync wire protocol to identify this subscription set to the server.
- int64_t version() const;
- // The database version that this subscription set was created at or -1 if Uncommitted.
- DB::version_type snapshot_version() const;
- // The current state of this subscription set
- State state() const;
- // The error string for this subscription set if any.
- StringData error_str() const;
- // Returns the number of subscriptions in the set.
- size_t size() const;
- // An iterator interface for finding/working with individual subscriptions.
- iterator begin() const;
- iterator end() const;
- const Subscription& at(size_t index) const;
- // Returns a pointer to the Subscription matching either the name or Query object, or nullptr if no such
- // subscription exists.
- const Subscription* find(StringData name) const;
- const Subscription* find(const Query& query) const;
- // Returns this query set as extended JSON in a form suitable for transmitting to the server.
- std::string to_ext_json() const;
- // Reloads the state of this SubscriptionSet so that it reflects the latest state from synchronizing with the
- // server. This will invalidate all iterators.
- void refresh();
- protected:
- friend class SubscriptionStore;
- struct SupersededTag {
- };
- using MakingMutableCopy = util::TaggedBool<class MakingMutableCopyTag>;
- explicit SubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, int64_t version, SupersededTag);
- explicit SubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, const Transaction& tr, Obj obj,
- MakingMutableCopy making_mutable_copy = MakingMutableCopy(false));
- void load_from_database(Obj obj);
- // Get a reference to the SubscriptionStore. It may briefly extend the lifetime of the store.
- std::shared_ptr<const SubscriptionStore> get_flx_subscription_store() const;
- std::weak_ptr<const SubscriptionStore> m_mgr;
- DB::version_type m_cur_version = 0;
- int64_t m_version = 0;
- State m_state = State::Uncommitted;
- std::string m_error_str;
- DB::version_type m_snapshot_version = -1;
- std::vector<Subscription> m_subs;
- };
- class MutableSubscriptionSet : public SubscriptionSet {
- public:
- // Erases all subscriptions in the subscription set.
- void clear();
- // Inserts a new subscription into the set if one does not exist already - returns an iterator to the
- // subscription and a bool that is true if a new subscription was actually created. The SubscriptionSet
- // must be in the Uncommitted state to call this - otherwise this will throw.
- //
- // The Query portion of the subscription is mutable, however the name portion is immutable after the
- // subscription is inserted.
- //
- // If insert is called twice for the same name, the Query portion and updated_at timestamp for that named
- // subscription will be updated to match the new Query.
- std::pair<iterator, bool> insert_or_assign(std::string_view name, const Query& query);
- // Inserts a new subscription into the set if one does not exist already - returns an iterator to the
- // subscription and a bool that is true if a new subscription was actually created. The SubscriptionSet
- // must be in the Uncommitted state to call this - otherwise this will throw.
- //
- // If insert is called twice for the same query, then the updated_at timestamp for that subscription will
- // be updated.
- //
- // The inserted subscription will have an empty name - to update this Subscription's query, the caller
- // will have
- std::pair<iterator, bool> insert_or_assign(const Query& query);
- void import(const SubscriptionSet&);
- // Erases a subscription pointed to by an iterator. Returns the "next" iterator in the set - to provide
- // STL compatibility. The SubscriptionSet must be in the Uncommitted state to call this - otherwise
- // this will throw.
- iterator erase(iterator it);
- // Erases the subscription identified by the argument, if any. Returns true if anything was removed.
- bool erase(StringData name);
- bool erase(const Query& query);
- // Updates the state of the transaction and optionally updates its error information.
- //
- // You may only set an error_str when the State is State::Error.
- //
- // If set to State::Complete, this will erase all subscription sets with a version less than this one's.
- //
- // This should be called internally within the sync client.
- void update_state(State state, util::Optional<std::string_view> error_str = util::none);
- // This commits any changes to the subscription set and returns an this subscription set as an immutable view
- // from after the commit. This MutableSubscriptionSet object must not be used after calling commit().
- SubscriptionSet commit();
- protected:
- friend class SubscriptionStore;
- MutableSubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, TransactionRef tr, Obj obj,
- MakingMutableCopy making_mutable_copy = MakingMutableCopy{false});
- void insert_sub(const Subscription& sub);
- private:
- // To refresh a MutableSubscriptionSet, you should call commit() and call refresh() on its return value.
- void refresh() = delete;
- std::pair<iterator, bool> insert_or_assign_impl(iterator it, util::Optional<std::string> name,
- std::string object_class_name, std::string query_str);
- // Throws is m_tr is in the wrong state.
- void check_is_mutable() const;
- void insert_sub_impl(ObjectId id, Timestamp created_at, Timestamp updated_at, StringData name,
- StringData object_class_name, StringData query_str);
- void process_notifications();
- TransactionRef m_tr;
- Obj m_obj;
- State m_old_state;
- };
- class SubscriptionStore;
- using SubscriptionStoreRef = std::shared_ptr<SubscriptionStore>;
- // A SubscriptionStore manages the FLX metadata tables, SubscriptionSets and Subscriptions.
- class SubscriptionStore : public std::enable_shared_from_this<SubscriptionStore> {
- public:
- static SubscriptionStoreRef create(DBRef db, util::UniqueFunction<void(int64_t)> on_new_subscription_set);
- SubscriptionStore(const SubscriptionStore&) = delete;
- SubscriptionStore& operator=(const SubscriptionStore&) = delete;
- // Get the latest subscription created by calling update_latest(). Once bootstrapping is complete,
- // this and get_active() will return the same thing. If no SubscriptionSet has been set, then
- // this returns an empty SubscriptionSet that you can clone() in order to mutate.
- SubscriptionSet get_latest() const;
- // Gets the subscription set that has been acknowledged by the server as having finished bootstrapping.
- // If no subscriptions have reached the complete stage, this returns an empty subscription with version
- // zero.
- SubscriptionSet get_active() const;
- struct VersionInfo {
- int64_t latest;
- int64_t active;
- int64_t pending_mark;
- };
- // Returns the version number of the current active and latest subscription sets. This function guarantees
- // that the versions will be read from the same underlying transaction and will thus be consistent.
- VersionInfo get_version_info() const;
- // To be used internally by the sync client. This returns a mutable view of a subscription set by its
- // version ID. If there is no SubscriptionSet with that version ID, this throws KeyNotFound.
- MutableSubscriptionSet get_mutable_by_version(int64_t version_id);
- // To be used internally by the sync client. This returns a read-only view of a subscription set by its
- // version ID. If there is no SubscriptionSet with that version ID, this throws KeyNotFound.
- SubscriptionSet get_by_version(int64_t version_id) const;
- // Fulfill all previous subscriptions by superceding them. This does not
- // affect the mutable subscription identified by the parameter.
- void supercede_all_except(MutableSubscriptionSet& mut_sub) const;
- // Returns true if there have been commits to the DB since the given version
- bool would_refresh(DB::version_type version) const noexcept;
- using TableSet = std::set<std::string, std::less<>>;
- TableSet get_tables_for_latest(const Transaction& tr) const;
- struct PendingSubscription {
- int64_t query_version;
- DB::version_type snapshot_version;
- };
- util::Optional<PendingSubscription> get_next_pending_version(int64_t last_query_version,
- DB::version_type after_client_version) const;
- std::vector<SubscriptionSet> get_pending_subscriptions() const;
- private:
- using std::enable_shared_from_this<SubscriptionStore>::weak_from_this;
- DBRef m_db;
- protected:
- explicit SubscriptionStore(DBRef db, util::UniqueFunction<void(int64_t)> on_new_subscription_set);
- struct NotificationRequest {
- NotificationRequest(int64_t version, util::Promise<SubscriptionSet::State> promise,
- SubscriptionSet::State notify_when)
- : version(version)
- , promise(std::move(promise))
- , notify_when(notify_when)
- {
- }
- int64_t version;
- util::Promise<SubscriptionSet::State> promise;
- SubscriptionSet::State notify_when;
- };
- void supercede_prior_to(TransactionRef tr, int64_t version_id) const;
- SubscriptionSet get_by_version_impl(int64_t flx_version, util::Optional<DB::VersionID> version) const;
- MutableSubscriptionSet make_mutable_copy(const SubscriptionSet& set) const;
- friend class MutableSubscriptionSet;
- friend class Subscription;
- friend class SubscriptionSet;
- TableKey m_sub_table;
- ColKey m_sub_id;
- ColKey m_sub_created_at;
- ColKey m_sub_updated_at;
- ColKey m_sub_name;
- ColKey m_sub_object_class_name;
- ColKey m_sub_query_str;
- TableKey m_sub_set_table;
- ColKey m_sub_set_version_num;
- ColKey m_sub_set_snapshot_version;
- ColKey m_sub_set_state;
- ColKey m_sub_set_error_str;
- ColKey m_sub_set_subscriptions;
- util::UniqueFunction<void(int64_t)> m_on_new_subscription_set;
- mutable std::mutex m_pending_notifications_mutex;
- mutable std::condition_variable m_pending_notifications_cv;
- mutable int64_t m_outstanding_requests = 0;
- mutable int64_t m_min_outstanding_version = 0;
- mutable std::list<NotificationRequest> m_pending_notifications;
- };
- } // namespace realm::sync
|