class.jetpack-sync-queue.php 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465
  1. <?php
  2. /**
  3. * A buffer of items from the queue that can be checked out
  4. */
  5. class Jetpack_Sync_Queue_Buffer {
  6. public $id;
  7. public $items_with_ids;
  8. public function __construct( $id, $items_with_ids ) {
  9. $this->id = $id;
  10. $this->items_with_ids = $items_with_ids;
  11. }
  12. public function get_items() {
  13. return array_combine( $this->get_item_ids(), $this->get_item_values() );
  14. }
  15. public function get_item_values() {
  16. return Jetpack_Sync_Utils::get_item_values( $this->items_with_ids );
  17. }
  18. public function get_item_ids() {
  19. return Jetpack_Sync_Utils::get_item_ids( $this->items_with_ids );
  20. }
  21. }
  22. /**
  23. * A persistent queue that can be flushed in increments of N items,
  24. * and which blocks reads until checked-out buffers are checked in or
  25. * closed. This uses raw SQL for two reasons: speed, and not triggering
  26. * tons of added_option callbacks.
  27. */
  28. class Jetpack_Sync_Queue {
  29. public $id;
  30. private $row_iterator;
  31. function __construct( $id ) {
  32. $this->id = str_replace( '-', '_', $id ); // necessary to ensure we don't have ID collisions in the SQL
  33. $this->row_iterator = 0;
  34. $this->random_int = mt_rand( 1, 1000000 );
  35. }
  36. function add( $item ) {
  37. global $wpdb;
  38. $added = false;
  39. // this basically tries to add the option until enough time has elapsed that
  40. // it has a unique (microtime-based) option key
  41. while ( ! $added ) {
  42. $rows_added = $wpdb->query( $wpdb->prepare(
  43. "INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES (%s, %s,%s)",
  44. $this->get_next_data_row_option_name(),
  45. serialize( $item ),
  46. 'no'
  47. ) );
  48. $added = ( 0 !== $rows_added );
  49. }
  50. }
  51. // Attempts to insert all the items in a single SQL query. May be subject to query size limits!
  52. function add_all( $items ) {
  53. global $wpdb;
  54. $base_option_name = $this->get_next_data_row_option_name();
  55. $query = "INSERT INTO $wpdb->options (option_name, option_value, autoload) VALUES ";
  56. $rows = array();
  57. for ( $i = 0; $i < count( $items ); $i += 1 ) {
  58. $option_name = esc_sql( $base_option_name . '-' . $i );
  59. $option_value = esc_sql( serialize( $items[ $i ] ) );
  60. $rows[] = "('$option_name', '$option_value', 'no')";
  61. }
  62. $rows_added = $wpdb->query( $query . join( ',', $rows ) );
  63. if ( count( $items ) === $rows_added ) {
  64. return new WP_Error( 'row_count_mismatch', "The number of rows inserted didn't match the size of the input array" );
  65. }
  66. }
  67. // Peek at the front-most item on the queue without checking it out
  68. function peek( $count = 1 ) {
  69. $items = $this->fetch_items( $count );
  70. if ( $items ) {
  71. return Jetpack_Sync_Utils::get_item_values( $items );
  72. }
  73. return array();
  74. }
  75. // lag is the difference in time between the age of the oldest item
  76. // (aka first or frontmost item) and the current time
  77. function lag( $now = null ) {
  78. global $wpdb;
  79. $first_item_name = $wpdb->get_var( $wpdb->prepare(
  80. "SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT 1",
  81. "jpsq_{$this->id}-%"
  82. ) );
  83. if ( ! $first_item_name ) {
  84. return 0;
  85. }
  86. if ( null === $now ) {
  87. $now = microtime( true );
  88. }
  89. // break apart the item name to get the timestamp
  90. $matches = null;
  91. if ( preg_match( '/^jpsq_' . $this->id . '-(\d+\.\d+)-/', $first_item_name, $matches ) ) {
  92. return $now - floatval( $matches[1] );
  93. } else {
  94. return 0;
  95. }
  96. }
  97. function reset() {
  98. global $wpdb;
  99. $this->delete_checkout_id();
  100. $wpdb->query( $wpdb->prepare(
  101. "DELETE FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
  102. ) );
  103. }
  104. function size() {
  105. global $wpdb;
  106. return (int) $wpdb->get_var( $wpdb->prepare(
  107. "SELECT count(*) FROM $wpdb->options WHERE option_name LIKE %s", "jpsq_{$this->id}-%"
  108. ) );
  109. }
  110. // we use this peculiar implementation because it's much faster than count(*)
  111. function has_any_items() {
  112. global $wpdb;
  113. $value = $wpdb->get_var( $wpdb->prepare(
  114. "SELECT exists( SELECT option_name FROM $wpdb->options WHERE option_name LIKE %s )", "jpsq_{$this->id}-%"
  115. ) );
  116. return ( $value === '1' );
  117. }
  118. function checkout( $buffer_size ) {
  119. if ( $this->get_checkout_id() ) {
  120. return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
  121. }
  122. $buffer_id = uniqid();
  123. $result = $this->set_checkout_id( $buffer_id );
  124. if ( ! $result || is_wp_error( $result ) ) {
  125. return $result;
  126. }
  127. $items = $this->fetch_items( $buffer_size );
  128. if ( count( $items ) === 0 ) {
  129. return false;
  130. }
  131. $buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, array_slice( $items, 0, $buffer_size ) );
  132. return $buffer;
  133. }
  134. // this checks out rows until it either empties the queue or hits a certain memory limit
  135. // it loads the sizes from the DB first so that it doesn't accidentally
  136. // load more data into memory than it needs to.
  137. // The only way it will load more items than $max_size is if a single queue item
  138. // exceeds the memory limit, but in that case it will send that item by itself.
  139. function checkout_with_memory_limit( $max_memory, $max_buffer_size = 500 ) {
  140. if ( $this->get_checkout_id() ) {
  141. return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
  142. }
  143. $buffer_id = uniqid();
  144. $result = $this->set_checkout_id( $buffer_id );
  145. if ( ! $result || is_wp_error( $result ) ) {
  146. return $result;
  147. }
  148. // get the map of buffer_id -> memory_size
  149. global $wpdb;
  150. $items_with_size = $wpdb->get_results(
  151. $wpdb->prepare(
  152. "SELECT option_name AS id, LENGTH(option_value) AS value_size FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d",
  153. "jpsq_{$this->id}-%",
  154. $max_buffer_size
  155. ),
  156. OBJECT
  157. );
  158. if ( count( $items_with_size ) === 0 ) {
  159. return false;
  160. }
  161. $total_memory = 0;
  162. $min_item_id = $max_item_id = $items_with_size[0]->id;
  163. foreach ( $items_with_size as $id => $item_with_size ) {
  164. $total_memory += $item_with_size->value_size;
  165. // if this is the first item and it exceeds memory, allow loop to continue
  166. // we will exit on the next iteration instead
  167. if ( $total_memory > $max_memory && $id > 0 ) {
  168. break;
  169. }
  170. $max_item_id = $item_with_size->id;
  171. }
  172. $query = $wpdb->prepare(
  173. "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name >= %s and option_name <= %s ORDER BY option_name ASC",
  174. $min_item_id,
  175. $max_item_id
  176. );
  177. $items = $wpdb->get_results( $query, OBJECT );
  178. foreach ( $items as $item ) {
  179. $item->value = maybe_unserialize( $item->value );
  180. }
  181. if ( count( $items ) === 0 ) {
  182. $this->delete_checkout_id();
  183. return false;
  184. }
  185. $buffer = new Jetpack_Sync_Queue_Buffer( $buffer_id, $items );
  186. return $buffer;
  187. }
  188. function checkin( $buffer ) {
  189. $is_valid = $this->validate_checkout( $buffer );
  190. if ( is_wp_error( $is_valid ) ) {
  191. return $is_valid;
  192. }
  193. $this->delete_checkout_id();
  194. return true;
  195. }
  196. function close( $buffer, $ids_to_remove = null ) {
  197. $is_valid = $this->validate_checkout( $buffer );
  198. if ( is_wp_error( $is_valid ) ) {
  199. return $is_valid;
  200. }
  201. $this->delete_checkout_id();
  202. // by default clear all items in the buffer
  203. if ( is_null( $ids_to_remove ) ) {
  204. $ids_to_remove = $buffer->get_item_ids();
  205. }
  206. global $wpdb;
  207. if ( count( $ids_to_remove ) > 0 ) {
  208. $sql = "DELETE FROM $wpdb->options WHERE option_name IN (" . implode( ', ', array_fill( 0, count( $ids_to_remove ), '%s' ) ) . ')';
  209. $query = call_user_func_array( array( $wpdb, 'prepare' ), array_merge( array( $sql ), $ids_to_remove ) );
  210. $wpdb->query( $query );
  211. }
  212. return true;
  213. }
  214. function flush_all() {
  215. $items = Jetpack_Sync_Utils::get_item_values( $this->fetch_items() );
  216. $this->reset();
  217. return $items;
  218. }
  219. function get_all() {
  220. return $this->fetch_items();
  221. }
  222. // use with caution, this could allow multiple processes to delete
  223. // and send from the queue at the same time
  224. function force_checkin() {
  225. $this->delete_checkout_id();
  226. }
  227. // used to lock checkouts from the queue.
  228. // tries to wait up to $timeout seconds for the queue to be empty
  229. function lock( $timeout = 30 ) {
  230. $tries = 0;
  231. while ( $this->has_any_items() && $tries < $timeout ) {
  232. sleep( 1 );
  233. $tries += 1;
  234. }
  235. if ( $tries === 30 ) {
  236. return new WP_Error( 'lock_timeout', 'Timeout waiting for sync queue to empty' );
  237. }
  238. if ( $this->get_checkout_id() ) {
  239. return new WP_Error( 'unclosed_buffer', 'There is an unclosed buffer' );
  240. }
  241. // hopefully this means we can acquire a checkout?
  242. $result = $this->set_checkout_id( 'lock' );
  243. if ( ! $result || is_wp_error( $result ) ) {
  244. return $result;
  245. }
  246. return true;
  247. }
  248. function unlock() {
  249. return $this->delete_checkout_id();
  250. }
  251. /**
  252. * This option is specifically chosen to, as much as possible, preserve time order
  253. * and minimise the possibility of collisions between multiple processes working
  254. * at the same time.
  255. *
  256. * @return string
  257. */
  258. protected function generate_option_name_timestamp() {
  259. return sprintf( '%.6f', microtime( true ) );
  260. }
  261. private function get_checkout_id() {
  262. global $wpdb;
  263. $checkout_value = $wpdb->get_var(
  264. $wpdb->prepare(
  265. "SELECT option_value FROM $wpdb->options WHERE option_name = %s",
  266. $this->get_lock_option_name()
  267. )
  268. );
  269. if ( $checkout_value ) {
  270. list( $checkout_id, $timestamp ) = explode( ':', $checkout_value );
  271. if ( intval( $timestamp ) > time() ) {
  272. return $checkout_id;
  273. }
  274. }
  275. return false;
  276. }
  277. private function set_checkout_id( $checkout_id ) {
  278. global $wpdb;
  279. $expires = time() + Jetpack_Sync_Defaults::$default_sync_queue_lock_timeout;
  280. $updated_num = $wpdb->query(
  281. $wpdb->prepare(
  282. "UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s",
  283. "$checkout_id:$expires",
  284. $this->get_lock_option_name()
  285. )
  286. );
  287. if ( ! $updated_num ) {
  288. $updated_num = $wpdb->query(
  289. $wpdb->prepare(
  290. "INSERT INTO $wpdb->options ( option_name, option_value, autoload ) VALUES ( %s, %s, 'no' )",
  291. $this->get_lock_option_name(),
  292. "$checkout_id:$expires"
  293. )
  294. );
  295. }
  296. return $updated_num;
  297. }
  298. private function delete_checkout_id() {
  299. global $wpdb;
  300. // rather than delete, which causes fragmentation, we update in place
  301. return $wpdb->query(
  302. $wpdb->prepare(
  303. "UPDATE $wpdb->options SET option_value = %s WHERE option_name = %s",
  304. "0:0",
  305. $this->get_lock_option_name()
  306. )
  307. );
  308. }
  309. private function get_lock_option_name() {
  310. return "jpsq_{$this->id}_checkout";
  311. }
  312. private function get_next_data_row_option_name() {
  313. $timestamp = $this->generate_option_name_timestamp();
  314. // row iterator is used to avoid collisions where we're writing data waaay fast in a single process
  315. if ( $this->row_iterator === PHP_INT_MAX ) {
  316. $this->row_iterator = 0;
  317. } else {
  318. $this->row_iterator += 1;
  319. }
  320. return 'jpsq_' . $this->id . '-' . $timestamp . '-' . $this->random_int . '-' . $this->row_iterator;
  321. }
  322. private function fetch_items( $limit = null ) {
  323. global $wpdb;
  324. if ( $limit ) {
  325. $query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC LIMIT %d", "jpsq_{$this->id}-%", $limit );
  326. } else {
  327. $query_sql = $wpdb->prepare( "SELECT option_name AS id, option_value AS value FROM $wpdb->options WHERE option_name LIKE %s ORDER BY option_name ASC", "jpsq_{$this->id}-%" );
  328. }
  329. $items = $wpdb->get_results( $query_sql, OBJECT );
  330. foreach ( $items as $item ) {
  331. $item->value = maybe_unserialize( $item->value );
  332. }
  333. return $items;
  334. }
  335. private function validate_checkout( $buffer ) {
  336. if ( ! $buffer instanceof Jetpack_Sync_Queue_Buffer ) {
  337. return new WP_Error( 'not_a_buffer', 'You must checkin an instance of Jetpack_Sync_Queue_Buffer' );
  338. }
  339. $checkout_id = $this->get_checkout_id();
  340. if ( ! $checkout_id ) {
  341. return new WP_Error( 'buffer_not_checked_out', 'There are no checked out buffers' );
  342. }
  343. if ( $checkout_id != $buffer->id ) {
  344. return new WP_Error( 'buffer_mismatch', 'The buffer you checked in was not checked out' );
  345. }
  346. return true;
  347. }
  348. }
  349. class Jetpack_Sync_Utils {
  350. static function get_item_values( $items ) {
  351. return array_map( array( __CLASS__, 'get_item_value' ), $items );
  352. }
  353. static function get_item_ids( $items ) {
  354. return array_map( array( __CLASS__, 'get_item_id' ), $items );
  355. }
  356. static private function get_item_value( $item ) {
  357. return $item->value;
  358. }
  359. static private function get_item_id( $item ) {
  360. return $item->id;
  361. }
  362. }