transaction.hpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523
  1. /*************************************************************************
  2. *
  3. * Copyright 2016 Realm Inc.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. **************************************************************************/
  18. #ifndef REALM_TRANSACTION_HPP
  19. #define REALM_TRANSACTION_HPP
  20. #include <realm/db.hpp>
  21. namespace realm {
  22. class Transaction : public Group {
  23. public:
  24. Transaction(DBRef _db, SlabAlloc* alloc, DB::ReadLockInfo& rli, DB::TransactStage stage);
  25. // convenience, so you don't need to carry a reference to the DB around
  26. ~Transaction();
  27. DB::version_type get_version() const noexcept
  28. {
  29. return m_read_lock.m_version;
  30. }
  31. DB::version_type get_version_of_latest_snapshot()
  32. {
  33. return db->get_version_of_latest_snapshot();
  34. }
  35. /// Get a version id which may be used to request a different transaction locked to specific version.
  36. DB::VersionID get_version_of_current_transaction() const noexcept
  37. {
  38. return VersionID(m_read_lock.m_version, m_read_lock.m_reader_idx);
  39. }
  40. void close() REQUIRES(!m_async_mutex);
  41. bool is_attached()
  42. {
  43. return m_transact_stage != DB::transact_Ready && db->is_attached();
  44. }
  45. /// Get the approximate size of the data that would be written to the file if
  46. /// a commit were done at this point. The reported size will always be bigger
  47. /// than what will eventually be needed as we reserve a bit more memory than
  48. /// what will be needed.
  49. size_t get_commit_size() const;
  50. DB::version_type commit() REQUIRES(!m_async_mutex);
  51. void rollback() REQUIRES(!m_async_mutex);
  52. void end_read() REQUIRES(!m_async_mutex);
  53. // Live transactions state changes, often taking an observer functor:
  54. VersionID commit_and_continue_as_read(bool commit_to_disk = true) REQUIRES(!m_async_mutex);
  55. VersionID commit_and_continue_writing();
  56. template <class O>
  57. void rollback_and_continue_as_read(O* observer) REQUIRES(!m_async_mutex);
  58. void rollback_and_continue_as_read() REQUIRES(!m_async_mutex)
  59. {
  60. _impl::NullInstructionObserver* o = nullptr;
  61. rollback_and_continue_as_read(o);
  62. }
  63. template <class O>
  64. void advance_read(O* observer, VersionID target_version = VersionID());
  65. void advance_read(VersionID target_version = VersionID())
  66. {
  67. _impl::NullInstructionObserver* o = nullptr;
  68. advance_read(o, target_version);
  69. }
  70. template <class O>
  71. bool promote_to_write(O* observer, bool nonblocking = false) REQUIRES(!m_async_mutex);
  72. bool promote_to_write(bool nonblocking = false) REQUIRES(!m_async_mutex)
  73. {
  74. _impl::NullInstructionObserver* o = nullptr;
  75. return promote_to_write(o, nonblocking);
  76. }
  77. TransactionRef freeze();
  78. // Frozen transactions are created by freeze() or DB::start_frozen()
  79. bool is_frozen() const noexcept override
  80. {
  81. return m_transact_stage == DB::transact_Frozen;
  82. }
  83. bool is_async() noexcept REQUIRES(!m_async_mutex)
  84. {
  85. util::CheckedLockGuard lck(m_async_mutex);
  86. return m_async_stage != AsyncState::Idle;
  87. }
  88. TransactionRef duplicate();
  89. void copy_to(TransactionRef dest) const;
  90. _impl::History* get_history() const;
  91. // direct handover of accessor instances
  92. Obj import_copy_of(const Obj& original);
  93. TableRef import_copy_of(const ConstTableRef original);
  94. LnkLst import_copy_of(const LnkLst& original);
  95. LnkSet import_copy_of(const LnkSet& original);
  96. LstBasePtr import_copy_of(const LstBase& original);
  97. SetBasePtr import_copy_of(const SetBase& original);
  98. CollectionBasePtr import_copy_of(const CollectionBase& original);
  99. LnkLstPtr import_copy_of(const LnkLstPtr& original);
  100. LnkSetPtr import_copy_of(const LnkSetPtr& original);
  101. LinkCollectionPtr import_copy_of(const LinkCollectionPtr& original);
  102. // handover of the heavier Query and TableView
  103. std::unique_ptr<Query> import_copy_of(Query&, PayloadPolicy);
  104. std::unique_ptr<TableView> import_copy_of(TableView&, PayloadPolicy);
  105. /// Get the current transaction type
  106. DB::TransactStage get_transact_stage() const noexcept
  107. {
  108. return m_transact_stage;
  109. }
  110. void upgrade_file_format(int target_file_format_version);
  111. /// Task oriented/async interface for continuous transactions.
  112. // true if this transaction already holds the write mutex
  113. bool holds_write_mutex() const noexcept REQUIRES(!m_async_mutex)
  114. {
  115. util::CheckedLockGuard lck(m_async_mutex);
  116. return m_async_stage == AsyncState::HasLock || m_async_stage == AsyncState::HasCommits;
  117. }
  118. // Convert an existing write transaction to an async write transaction
  119. void promote_to_async() REQUIRES(!m_async_mutex);
  120. // request full synchronization to stable storage for all writes done since
  121. // last sync - or just release write mutex.
  122. // The write mutex is released after full synchronization.
  123. void async_complete_writes(util::UniqueFunction<void()> when_synchronized = nullptr) REQUIRES(!m_async_mutex);
  124. // Complete all pending async work and return once the async stage is Idle.
  125. // If currently in an async write transaction that transaction is cancelled,
  126. // and any async writes which were committed are synchronized.
  127. void prepare_for_close() REQUIRES(!m_async_mutex);
  128. // true if sync to disk has been requested
  129. bool is_synchronizing() noexcept REQUIRES(!m_async_mutex)
  130. {
  131. util::CheckedLockGuard lck(m_async_mutex);
  132. return m_async_stage == AsyncState::Syncing;
  133. }
  134. std::exception_ptr get_commit_exception() noexcept REQUIRES(!m_async_mutex)
  135. {
  136. util::CheckedLockGuard lck(m_async_mutex);
  137. auto err = std::move(m_commit_exception);
  138. m_commit_exception = nullptr;
  139. return err;
  140. }
  141. bool has_unsynced_commits() noexcept REQUIRES(!m_async_mutex)
  142. {
  143. util::CheckedLockGuard lck(m_async_mutex);
  144. return static_cast<bool>(m_oldest_version_not_persisted);
  145. }
  146. private:
  147. enum class AsyncState { Idle, Requesting, HasLock, HasCommits, Syncing };
  148. DBRef get_db() const
  149. {
  150. return db;
  151. }
  152. Replication* const* get_repl() const final
  153. {
  154. return db->get_repl();
  155. }
  156. template <class O>
  157. bool internal_advance_read(O* observer, VersionID target_version, _impl::History&, bool) REQUIRES(!db->m_mutex);
  158. void set_transact_stage(DB::TransactStage stage) noexcept;
  159. void do_end_read() noexcept REQUIRES(!m_async_mutex);
  160. void initialize_replication();
  161. void replicate(Transaction* dest, Replication& repl) const;
  162. void complete_async_commit();
  163. void acquire_write_lock() REQUIRES(!m_async_mutex);
  164. void cow_outliers(std::vector<size_t>& progress, size_t evac_limit, size_t work_limit);
  165. void close_read_with_lock() REQUIRES(!m_async_mutex, db->m_mutex);
  166. DBRef db;
  167. mutable std::unique_ptr<_impl::History> m_history_read;
  168. mutable _impl::History* m_history = nullptr;
  169. DB::ReadLockInfo m_read_lock;
  170. util::Optional<DB::ReadLockInfo> m_oldest_version_not_persisted;
  171. std::exception_ptr m_commit_exception GUARDED_BY(m_async_mutex);
  172. bool m_async_commit_has_failed = false;
  173. // Mutex is protecting access to members just below
  174. util::CheckedMutex m_async_mutex;
  175. std::condition_variable m_async_cv GUARDED_BY(m_async_mutex);
  176. AsyncState m_async_stage GUARDED_BY(m_async_mutex) = AsyncState::Idle;
  177. std::chrono::steady_clock::time_point m_request_time_point;
  178. bool m_waiting_for_write_lock GUARDED_BY(m_async_mutex) = false;
  179. bool m_waiting_for_sync GUARDED_BY(m_async_mutex) = false;
  180. DB::TransactStage m_transact_stage = DB::transact_Ready;
  181. friend class DB;
  182. friend class DisableReplication;
  183. };
  /*
   * Classes providing backward compatibility with the older
   * ReadTransaction and WriteTransaction types.
   */
  188. class ReadTransaction {
  189. public:
  190. ReadTransaction(DBRef sg)
  191. : trans(sg->start_read())
  192. {
  193. }
  194. ~ReadTransaction() noexcept {}
  195. operator Transaction&()
  196. {
  197. return *trans;
  198. }
  199. bool has_table(StringData name) const noexcept
  200. {
  201. return trans->has_table(name);
  202. }
  203. ConstTableRef get_table(TableKey key) const
  204. {
  205. return trans->get_table(key); // Throws
  206. }
  207. ConstTableRef get_table(StringData name) const
  208. {
  209. return trans->get_table(name); // Throws
  210. }
  211. const Group& get_group() const noexcept
  212. {
  213. return *trans.get();
  214. }
  215. /// Get the version of the snapshot to which this read transaction is bound.
  216. DB::version_type get_version() const noexcept
  217. {
  218. return trans->get_version();
  219. }
  220. private:
  221. TransactionRef trans;
  222. };
  223. class WriteTransaction {
  224. public:
  225. WriteTransaction(DBRef sg)
  226. : trans(sg->start_write())
  227. {
  228. }
  229. ~WriteTransaction() noexcept {}
  230. operator Transaction&()
  231. {
  232. return *trans;
  233. }
  234. bool has_table(StringData name) const noexcept
  235. {
  236. return trans->has_table(name);
  237. }
  238. TableRef get_table(TableKey key) const
  239. {
  240. return trans->get_table(key); // Throws
  241. }
  242. TableRef get_table(StringData name) const
  243. {
  244. return trans->get_table(name); // Throws
  245. }
  246. TableRef add_table(StringData name, Table::Type table_type = Table::Type::TopLevel) const
  247. {
  248. return trans->add_table(name, table_type); // Throws
  249. }
  250. TableRef get_or_add_table(StringData name, Table::Type table_type = Table::Type::TopLevel,
  251. bool* was_added = nullptr) const
  252. {
  253. return trans->get_or_add_table(name, table_type, was_added); // Throws
  254. }
  255. Group& get_group() const noexcept
  256. {
  257. return *trans.get();
  258. }
  259. /// Get the version of the snapshot on which this write transaction is
  260. /// based.
  261. DB::version_type get_version() const noexcept
  262. {
  263. return trans->get_version();
  264. }
  265. DB::version_type commit()
  266. {
  267. return trans->commit();
  268. }
  269. void rollback() noexcept
  270. {
  271. trans->rollback();
  272. }
  273. private:
  274. TransactionRef trans;
  275. };
  276. // Implementation:
  277. template <class O>
  278. inline void Transaction::advance_read(O* observer, VersionID version_id)
  279. {
  280. if (m_transact_stage != DB::transact_Reading)
  281. throw WrongTransactionState("Not a read transaction");
  282. // It is an error if the new version precedes the currently bound one.
  283. if (version_id.version < m_read_lock.m_version)
  284. throw IllegalOperation("Requesting an older version when advancing");
  285. auto hist = get_history(); // Throws
  286. if (!hist)
  287. throw IllegalOperation("No transaction log when advancing");
  288. auto old_version = m_read_lock.m_version;
  289. internal_advance_read(observer, version_id, *hist, false); // Throws
  290. if (db->m_logger) {
  291. db->m_logger->log(util::Logger::Level::trace, "Advance read: %1 -> %2", old_version, m_read_lock.m_version);
  292. }
  293. }
  294. template <class O>
  295. inline bool Transaction::promote_to_write(O* observer, bool nonblocking)
  296. {
  297. if (m_transact_stage != DB::transact_Reading)
  298. throw WrongTransactionState("Not a read transaction");
  299. if (!holds_write_mutex()) {
  300. if (nonblocking) {
  301. bool succes = db->do_try_begin_write();
  302. if (!succes) {
  303. return false;
  304. }
  305. }
  306. else {
  307. auto t1 = std::chrono::steady_clock::now();
  308. acquire_write_lock(); // Throws
  309. if (db->m_logger) {
  310. auto t2 = std::chrono::steady_clock::now();
  311. db->m_logger->log(util::Logger::Level::trace, "Acquired write lock in %1 us",
  312. std::chrono::duration_cast<std::chrono::microseconds>(t2 - t1).count());
  313. }
  314. }
  315. }
  316. auto old_version = m_read_lock.m_version;
  317. try {
  318. Replication* repl = db->get_replication();
  319. if (!repl)
  320. throw IllegalOperation("No transaction log when promoting to write");
  321. VersionID version = VersionID(); // Latest
  322. m_history = repl->_get_history_write();
  323. bool history_updated = internal_advance_read(observer, version, *m_history, true); // Throws
  324. REALM_ASSERT(repl); // Presence of `repl` follows from the presence of `hist`
  325. DB::version_type current_version = m_read_lock.m_version;
  326. m_alloc.init_mapping_management(current_version);
  327. repl->initiate_transact(*this, current_version, history_updated); // Throws
  328. // If the group has no top array (top_ref == 0), create a new node
  329. // structure for an empty group now, to be ready for modifications. See
  330. // also Group::attach_shared().
  331. if (!m_top.is_attached())
  332. create_empty_group(); // Throws
  333. }
  334. catch (...) {
  335. if (!holds_write_mutex())
  336. db->end_write_on_correct_thread();
  337. m_history = nullptr;
  338. throw;
  339. }
  340. if (db->m_logger) {
  341. db->m_logger->log(util::Logger::Level::trace, "Promote to write: %1 -> %2", old_version,
  342. m_read_lock.m_version);
  343. }
  344. set_transact_stage(DB::transact_Writing);
  345. return true;
  346. }
  347. template <class O>
  348. inline void Transaction::rollback_and_continue_as_read(O* observer)
  349. {
  350. if (m_transact_stage != DB::transact_Writing)
  351. throw WrongTransactionState("Not a write transaction");
  352. Replication* repl = db->get_replication();
  353. if (!repl)
  354. throw IllegalOperation("No transaction log when rolling back");
  355. BinaryData uncommitted_changes = repl->get_uncommitted_changes();
  356. // Possible optimization: We are currently creating two transaction log parsers, one here,
  357. // and one in advance_transact(). That is wasteful as the parser creation is
  358. // expensive.
  359. util::SimpleInputStream in(uncommitted_changes);
  360. _impl::TransactLogParser parser; // Throws
  361. _impl::TransactReverser reverser;
  362. parser.parse(in, reverser); // Throws
  363. if (observer && uncommitted_changes.size()) {
  364. _impl::ReversedNoCopyInputStream reversed_in(reverser);
  365. parser.parse(reversed_in, *observer); // Throws
  366. observer->parse_complete(); // Throws
  367. }
  368. // Mark all managed space (beyond the attached file) as free.
  369. db->reset_free_space_tracking(); // Throws
  370. m_read_lock.check();
  371. ref_type top_ref = m_read_lock.m_top_ref;
  372. size_t file_size = m_read_lock.m_file_size;
  373. _impl::ReversedNoCopyInputStream reversed_in(reverser);
  374. // since we had the write lock, we already have the latest encrypted pages in memory
  375. m_alloc.update_reader_view(file_size); // Throws
  376. update_allocator_wrappers(false);
  377. advance_transact(top_ref, reversed_in, false); // Throws
  378. if (!holds_write_mutex())
  379. db->end_write_on_correct_thread();
  380. if (db->m_logger) {
  381. db->m_logger->log(util::Logger::Level::trace, "Rollback");
  382. }
  383. m_history = nullptr;
  384. set_transact_stage(DB::transact_Reading);
  385. }
  386. template <class O>
  387. inline bool Transaction::internal_advance_read(O* observer, VersionID version_id, _impl::History& hist, bool writable)
  388. {
  389. DB::ReadLockInfo new_read_lock = db->grab_read_lock(DB::ReadLockInfo::Live, version_id); // Throws
  390. REALM_ASSERT(new_read_lock.m_version >= m_read_lock.m_version);
  391. if (new_read_lock.m_version == m_read_lock.m_version) {
  392. db->release_read_lock(new_read_lock);
  393. // _impl::History::update_early_from_top_ref() was not called
  394. // update allocator wrappers merely to update write protection
  395. update_allocator_wrappers(writable);
  396. return false;
  397. }
  398. DB::version_type old_version = m_read_lock.m_version;
  399. DB::ReadLockGuard g(*db, new_read_lock);
  400. DB::version_type new_version = new_read_lock.m_version;
  401. size_t new_file_size = new_read_lock.m_file_size;
  402. ref_type new_top_ref = new_read_lock.m_top_ref;
  403. // Synchronize readers view of the file
  404. SlabAlloc& alloc = m_alloc;
  405. alloc.update_reader_view(new_file_size);
  406. update_allocator_wrappers(writable);
  407. using gf = _impl::GroupFriend;
  408. ref_type hist_ref = gf::get_history_ref(alloc, new_top_ref);
  409. hist.update_from_ref_and_version(hist_ref, new_version);
  410. if (observer) {
  411. // This has to happen in the context of the originally bound snapshot
  412. // and while the read transaction is still in a fully functional state.
  413. _impl::TransactLogParser parser;
  414. _impl::ChangesetInputStream in(hist, old_version, new_version);
  415. parser.parse(in, *observer); // Throws
  416. observer->parse_complete(); // Throws
  417. }
  418. // The old read lock must be retained for as long as the change history is
  419. // accessed (until Group::advance_transact() returns). This ensures that the
  420. // oldest needed changeset remains in the history, even when the history is
  421. // implemented as a separate unversioned entity outside the Realm (i.e., the
  422. // old implementation and ShortCircuitHistory in
  423. // test_lang_Bind_helper.cpp). On the other hand, if it had been the case,
  424. // that the history was always implemented as a versioned entity, that was
  425. // part of the Realm state, then it would not have been necessary to retain
  426. // the old read lock beyond this point.
  427. _impl::ChangesetInputStream in(hist, old_version, new_version);
  428. advance_transact(new_top_ref, in, writable); // Throws
  429. g.release();
  430. db->release_read_lock(m_read_lock);
  431. m_read_lock = new_read_lock;
  432. return true; // _impl::History::update_early_from_top_ref() was called
  433. }
  434. } // namespace realm
  435. #endif /* REALM_TRANSACTION_HPP */