123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403 |
- #pragma once
- #include "realm/db.hpp"
- #include "realm/obj.hpp"
- #include "realm/query.hpp"
- #include "realm/timestamp.hpp"
- #include "realm/util/future.hpp"
- #include "realm/util/functional.hpp"
- #include "realm/util/optional.hpp"
- #include "realm/util/tagged_bool.hpp"
- #include <list>
- #include <set>
- #include <string_view>
- namespace realm::sync {
- class MutableSubscriptionSet;
- class SubscriptionSet;
- class SubscriptionStore;
- class Subscription {
- public:
-
- ObjectId id;
-
- Timestamp created_at;
-
- Timestamp updated_at;
-
- util::Optional<std::string> name;
-
- std::string object_class_name;
-
- std::string query_string;
-
- friend bool operator==(const Subscription& lhs, const Subscription& rhs)
- {
- return lhs.id == rhs.id;
- }
- Subscription() = default;
- Subscription(const SubscriptionStore* parent, Obj obj);
- Subscription(util::Optional<std::string> name, std::string object_class_name, std::string query_str);
- };
- class SubscriptionSet {
- public:
-
- enum class State {
-
-
- Uncommitted = 0,
-
- Pending,
-
- Bootstrapping,
-
- Complete,
-
- Error,
-
-
- Superseded,
-
-
- AwaitingMark,
- };
- static constexpr int64_t EmptyVersion = int64_t(-1);
-
- inline friend std::ostream& operator<<(std::ostream& o, State state)
- {
- switch (state) {
- case State::Uncommitted:
- o << "Uncommitted";
- break;
- case State::Pending:
- o << "Pending";
- break;
- case State::Bootstrapping:
- o << "Bootstrapping";
- break;
- case State::AwaitingMark:
- o << "AwaitingMark";
- break;
- case State::Complete:
- o << "Complete";
- break;
- case State::Error:
- o << "Error";
- break;
- case State::Superseded:
- o << "Superseded";
- break;
- }
- return o;
- }
- using const_iterator = std::vector<Subscription>::const_iterator;
- using iterator = const_iterator;
-
-
-
- MutableSubscriptionSet make_mutable_copy() const;
-
-
-
-
- util::Future<State> get_state_change_notification(State notify_when) const;
- void get_state_change_notification(
- State notify_when, util::UniqueFunction<void(util::Optional<State>, util::Optional<Status>)> callback) const;
-
- int64_t version() const;
-
- DB::version_type snapshot_version() const;
-
- State state() const;
-
- StringData error_str() const;
-
- size_t size() const;
-
- iterator begin() const;
- iterator end() const;
- const Subscription& at(size_t index) const;
-
-
- const Subscription* find(StringData name) const;
- const Subscription* find(const Query& query) const;
-
- std::string to_ext_json() const;
-
-
- void refresh();
- protected:
- friend class SubscriptionStore;
- struct SupersededTag {
- };
- using MakingMutableCopy = util::TaggedBool<class MakingMutableCopyTag>;
- explicit SubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, int64_t version, SupersededTag);
- explicit SubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, const Transaction& tr, Obj obj,
- MakingMutableCopy making_mutable_copy = MakingMutableCopy(false));
- void load_from_database(Obj obj);
-
- std::shared_ptr<const SubscriptionStore> get_flx_subscription_store() const;
- std::weak_ptr<const SubscriptionStore> m_mgr;
- DB::version_type m_cur_version = 0;
- int64_t m_version = 0;
- State m_state = State::Uncommitted;
- std::string m_error_str;
- DB::version_type m_snapshot_version = -1;
- std::vector<Subscription> m_subs;
- };
- class MutableSubscriptionSet : public SubscriptionSet {
- public:
-
- void clear();
-
-
-
-
-
-
-
-
-
- std::pair<iterator, bool> insert_or_assign(std::string_view name, const Query& query);
-
-
-
-
-
-
-
-
-
- std::pair<iterator, bool> insert_or_assign(const Query& query);
- void import(const SubscriptionSet&);
-
-
-
- iterator erase(iterator it);
-
- bool erase(StringData name);
- bool erase(const Query& query);
-
-
-
-
-
-
-
- void update_state(State state, util::Optional<std::string_view> error_str = util::none);
-
-
- SubscriptionSet commit();
- protected:
- friend class SubscriptionStore;
- MutableSubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, TransactionRef tr, Obj obj,
- MakingMutableCopy making_mutable_copy = MakingMutableCopy{false});
- void insert_sub(const Subscription& sub);
- private:
-
- void refresh() = delete;
- std::pair<iterator, bool> insert_or_assign_impl(iterator it, util::Optional<std::string> name,
- std::string object_class_name, std::string query_str);
-
- void check_is_mutable() const;
- void insert_sub_impl(ObjectId id, Timestamp created_at, Timestamp updated_at, StringData name,
- StringData object_class_name, StringData query_str);
- void process_notifications();
- TransactionRef m_tr;
- Obj m_obj;
- State m_old_state;
- };
- class SubscriptionStore;
- using SubscriptionStoreRef = std::shared_ptr<SubscriptionStore>;
- class SubscriptionStore : public std::enable_shared_from_this<SubscriptionStore> {
- public:
- static SubscriptionStoreRef create(DBRef db, util::UniqueFunction<void(int64_t)> on_new_subscription_set);
- SubscriptionStore(const SubscriptionStore&) = delete;
- SubscriptionStore& operator=(const SubscriptionStore&) = delete;
-
-
-
- SubscriptionSet get_latest() const;
-
-
-
- SubscriptionSet get_active() const;
- struct VersionInfo {
- int64_t latest;
- int64_t active;
- int64_t pending_mark;
- };
-
-
- VersionInfo get_version_info() const;
-
-
- MutableSubscriptionSet get_mutable_by_version(int64_t version_id);
-
-
- SubscriptionSet get_by_version(int64_t version_id) const;
-
-
- void supercede_all_except(MutableSubscriptionSet& mut_sub) const;
-
- bool would_refresh(DB::version_type version) const noexcept;
- using TableSet = std::set<std::string, std::less<>>;
- TableSet get_tables_for_latest(const Transaction& tr) const;
- struct PendingSubscription {
- int64_t query_version;
- DB::version_type snapshot_version;
- };
- util::Optional<PendingSubscription> get_next_pending_version(int64_t last_query_version,
- DB::version_type after_client_version) const;
- std::vector<SubscriptionSet> get_pending_subscriptions() const;
- private:
- using std::enable_shared_from_this<SubscriptionStore>::weak_from_this;
- DBRef m_db;
- protected:
- explicit SubscriptionStore(DBRef db, util::UniqueFunction<void(int64_t)> on_new_subscription_set);
- struct NotificationRequest {
- NotificationRequest(int64_t version, util::Promise<SubscriptionSet::State> promise,
- SubscriptionSet::State notify_when)
- : version(version)
- , promise(std::move(promise))
- , notify_when(notify_when)
- {
- }
- int64_t version;
- util::Promise<SubscriptionSet::State> promise;
- SubscriptionSet::State notify_when;
- };
- void supercede_prior_to(TransactionRef tr, int64_t version_id) const;
- SubscriptionSet get_by_version_impl(int64_t flx_version, util::Optional<DB::VersionID> version) const;
- MutableSubscriptionSet make_mutable_copy(const SubscriptionSet& set) const;
- friend class MutableSubscriptionSet;
- friend class Subscription;
- friend class SubscriptionSet;
- TableKey m_sub_table;
- ColKey m_sub_id;
- ColKey m_sub_created_at;
- ColKey m_sub_updated_at;
- ColKey m_sub_name;
- ColKey m_sub_object_class_name;
- ColKey m_sub_query_str;
- TableKey m_sub_set_table;
- ColKey m_sub_set_version_num;
- ColKey m_sub_set_snapshot_version;
- ColKey m_sub_set_state;
- ColKey m_sub_set_error_str;
- ColKey m_sub_set_subscriptions;
- util::UniqueFunction<void(int64_t)> m_on_new_subscription_set;
- mutable std::mutex m_pending_notifications_mutex;
- mutable std::condition_variable m_pending_notifications_cv;
- mutable int64_t m_outstanding_requests = 0;
- mutable int64_t m_min_outstanding_version = 0;
- mutable std::list<NotificationRequest> m_pending_notifications;
- };
- }
|