ssbc: Make computation of candidate queues faster

Warren Gifford requested to merge es/faster-worker-queue-determination into main

Created by: eseliger

Previously, each time we computed the next item to dequeue, we had to do a full table scan to find the latest dequeue time for each user. This change makes that faster by persisting the last dequeue time per user in a separate table, with triggers ensuring it is always up to date. Code intel has used this approach for some of its tables for some time, so I'd consider it production-proven.
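For readers who haven't seen the trigger-maintained summary table pattern, here is a minimal sketch of the idea. It is not the migration in this MR: SQLite (via Python's `sqlite3`) stands in for PostgreSQL, and the `jobs` and `last_dequeues` tables, the `started_at` column, and the helper functions are simplified, hypothetical names chosen for illustration.

```python
import sqlite3

# Hypothetical, simplified schema. SQLite stands in for PostgreSQL here.
SCHEMA = """
CREATE TABLE jobs (
    id         INTEGER PRIMARY KEY,
    user_id    INTEGER NOT NULL,
    state      TEXT NOT NULL DEFAULT 'queued',
    started_at INTEGER              -- dequeue time; NULL while still queued
);

CREATE TABLE last_dequeues (
    user_id        INTEGER PRIMARY KEY,
    latest_dequeue INTEGER
);

-- Keep one summary row per user in sync whenever a job is dequeued.
CREATE TRIGGER jobs_track_last_dequeue
AFTER UPDATE OF started_at ON jobs
WHEN NEW.started_at IS NOT NULL
BEGIN
    -- Create the row on the user's first dequeue ...
    INSERT OR IGNORE INTO last_dequeues (user_id, latest_dequeue)
    VALUES (NEW.user_id, NEW.started_at);
    -- ... and only ever move the timestamp forward afterwards.
    UPDATE last_dequeues
    SET latest_dequeue = NEW.started_at
    WHERE user_id = NEW.user_id AND latest_dequeue < NEW.started_at;
END;
"""


def dequeue(conn, job_id, ts):
    """Mark a job as dequeued at time ts; the trigger updates last_dequeues."""
    conn.execute(
        "UPDATE jobs SET state = 'processing', started_at = ? WHERE id = ?",
        (ts, job_id),
    )


def latest_dequeues_scan(conn):
    """Old approach: aggregate over every row of the jobs table."""
    return dict(conn.execute(
        "SELECT user_id, MAX(started_at) FROM jobs GROUP BY user_id"))


def latest_dequeues_table(conn):
    """New approach: read the small, trigger-maintained table."""
    return dict(conn.execute(
        "SELECT user_id, latest_dequeue FROM last_dequeues"))


def demo():
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.executemany(
        "INSERT INTO jobs (id, user_id) VALUES (?, ?)",
        [(1, 10), (2, 10), (3, 20), (4, 30)],
    )
    # Dequeue out of timestamp order to show the value never moves backwards.
    for job_id, ts in [(2, 300), (1, 100), (3, 200)]:
        dequeue(conn, job_id, ts)
    return conn


if __name__ == "__main__":
    conn = demo()
    print(latest_dequeues_scan(conn))
    print(latest_dequeues_table(conn))
```

In this sketch a user who has never had a job dequeued (user 30) has no row in `last_dequeues`, so a consuming query would need a left join or a pre-seeded row; how the real migration handles that case is not shown here.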

Comparison

Nothing in queue (this case was already fast and did not get worse)

Before:

QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=10718.47..10718.49 rows=1 width=62) (actual time=0.041..0.043 rows=0 loops=1)
   ->  LockRows  (cost=10718.47..10718.49 rows=1 width=62) (actual time=0.040..0.042 rows=0 loops=1)
         ->  Sort  (cost=10718.47..10718.48 rows=1 width=62) (actual time=0.040..0.042 rows=0 loops=1)
               Sort Key: pc."order"
               Sort Method: quicksort  Memory: 25kB
               ->  Nested Loop  (cost=10585.37..10718.46 rows=1 width=62) (actual time=0.034..0.035 rows=0 loops=1)
                     ->  Subquery Scan on pc  (cost=10584.95..10586.32 rows=50 width=56) (actual time=0.033..0.035 rows=0 loops=1)
                           ->  Limit  (cost=10584.95..10585.82 rows=50 width=24) (actual time=0.033..0.034 rows=0 loops=1)
                                 ->  WindowAgg  (cost=10584.95..10590.65 rows=326 width=24) (actual time=0.033..0.034 rows=0 loops=1)
                                       ->  Sort  (cost=10584.95..10585.76 rows=326 width=16) (actual time=0.032..0.033 rows=0 loops=1)
                                             Sort Key: q.place_in_global_queue
                                             Sort Method: quicksort  Memory: 25kB
                                             ->  Hash Left Join  (cost=10295.15..10571.34 rows=326 width=16) (actual time=0.020..0.022 rows=0 loops=1)
                                                   Hash Cond: (j.id = q.id)
                                                   ->  Index Scan using batch_spec_workspace_execution_jobs_state on batch_spec_workspace_execution_jobs j  (cost=0.42..275.38 rows=326 width=8) (actual time=0.020..0.020 rows=0 loops=1)
                                                         Index Cond: (state = 'queued'::text)
                                                         Filter: ((process_after IS NULL) OR (process_after <= now()))
                                                   ->  Hash  (cost=10290.65..10290.65 rows=326 width=16) (never executed)
                                                         ->  Subquery Scan on q  (cost=10279.24..10290.65 rows=326 width=16) (never executed)
                                                               ->  WindowAgg  (cost=10279.24..10287.39 rows=326 width=24) (never executed)
                                                                     ->  Subquery Scan on queue_candidates  (cost=10279.24..10283.31 rows=326 width=8) (never executed)
                                                                           ->  Sort  (cost=10279.24..10280.05 rows=326 width=36) (never executed)
                                                                                 Sort Key: (rank() OVER (?)), queue.latest_dequeue NULLS FIRST
                                                                                 ->  WindowAgg  (cost=10258.29..10265.63 rows=326 width=36) (never executed)
                                                                                       ->  Sort  (cost=10258.29..10259.11 rows=326 width=28) (never executed)
                                                                                             Sort Key: queue.user_id, exec.created_at, exec.id
                                                                                             ->  Nested Loop  (cost=0.84..10244.69 rows=326 width=28) (never executed)
                                                                                                   Join Filter: (exec.user_id = queue.user_id)
                                                                                                   ->  Index Scan using batch_spec_workspace_execution_jobs_state on batch_spec_workspace_execution_jobs exec  (cost=0.42..273.75 rows=326 width=20) (never executed)
                                                                                                         Index Cond: (state = 'queued'::text)
                                                                                                   ->  Materialize  (cost=0.42..9892.15 rows=17 width=12) (never executed)
                                                                                                         ->  Subquery Scan on queue  (cost=0.42..9892.07 rows=17 width=12) (never executed)
                                                                                                               ->  GroupAggregate  (cost=0.42..9891.90 rows=17 width=12) (never executed)
                                                                                                                     Group Key: exec_1.user_id
                                                                                                                     ->  Index Only Scan using batch_spec_workspace_execution_jobs_last_dequeue on batch_spec_workspace_execution_jobs exec_1  (cost=0.42..8206.51 rows=337044 width=12) (never executed)
                                                                                                                           Heap Fetches: 0
                     ->  Index Scan using batch_spec_workspace_execution_jobs_pkey on batch_spec_workspace_execution_jobs  (cost=0.42..2.64 rows=1 width=14) (never executed)
                           Index Cond: (id = pc.candidate_id)
                           Filter: (state = ANY ('{queued,errored}'::text[]))
 Planning Time: 0.977 ms
 Execution Time: 0.265 ms

After:

                                                                                                                              QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=804.29..804.31 rows=1 width=62) (actual time=0.038..0.040 rows=0 loops=1)
   ->  LockRows  (cost=804.29..804.31 rows=1 width=62) (actual time=0.038..0.039 rows=0 loops=1)
         ->  Sort  (cost=804.29..804.30 rows=1 width=62) (actual time=0.037..0.039 rows=0 loops=1)
               Sort Key: pc."order"
               Sort Method: quicksort  Memory: 25kB
               ->  Nested Loop  (cost=671.19..804.28 rows=1 width=62) (actual time=0.030..0.031 rows=0 loops=1)
                     ->  Subquery Scan on pc  (cost=670.77..672.14 rows=50 width=56) (actual time=0.029..0.031 rows=0 loops=1)
                           ->  Limit  (cost=670.77..671.64 rows=50 width=24) (actual time=0.029..0.031 rows=0 loops=1)
                                 ->  WindowAgg  (cost=670.77..676.47 rows=326 width=24) (actual time=0.029..0.030 rows=0 loops=1)
                                       ->  Sort  (cost=670.77..671.58 rows=326 width=16) (actual time=0.028..0.029 rows=0 loops=1)
                                             Sort Key: q.place_in_global_queue
                                             Sort Method: quicksort  Memory: 25kB
                                             ->  Hash Left Join  (cost=380.97..657.16 rows=326 width=16) (actual time=0.019..0.020 rows=0 loops=1)
                                                   Hash Cond: (j.id = q.id)
                                                   ->  Index Scan using batch_spec_workspace_execution_jobs_state on batch_spec_workspace_execution_jobs j  (cost=0.42..275.38 rows=326 width=8) (actual time=0.018..0.018 rows=0 loops=1)
                                                         Index Cond: (state = 'queued'::text)
                                                         Filter: ((process_after IS NULL) OR (process_after <= now()))
                                                   ->  Hash  (cost=376.47..376.47 rows=326 width=16) (never executed)
                                                         ->  Subquery Scan on q  (cost=365.06..376.47 rows=326 width=16) (never executed)
                                                               ->  WindowAgg  (cost=365.06..373.21 rows=326 width=24) (never executed)
                                                                     ->  Subquery Scan on queue_candidates  (cost=365.06..369.14 rows=326 width=8) (never executed)
                                                                           ->  Sort  (cost=365.06..365.88 rows=326 width=36) (never executed)
                                                                                 Sort Key: (rank() OVER (?)), queue.latest_dequeue NULLS FIRST
                                                                                 ->  WindowAgg  (cost=344.12..351.45 rows=326 width=36) (never executed)
                                                                                       ->  Sort  (cost=344.12..344.93 rows=326 width=28) (never executed)
                                                                                             Sort Key: queue.user_id, exec.created_at, exec.id
                                                                                             ->  Hash Join  (cost=56.32..330.51 rows=326 width=28) (never executed)
                                                                                                   Hash Cond: (exec.user_id = queue.user_id)
                                                                                                   ->  Index Scan using batch_spec_workspace_execution_jobs_state on batch_spec_workspace_execution_jobs exec  (cost=0.42..273.75 rows=326 width=20) (never executed)
                                                                                                         Index Cond: (state = 'queued'::text)
                                                                                                   ->  Hash  (cost=30.40..30.40 rows=2040 width=12) (never executed)
                                                                                                         ->  Seq Scan on batch_spec_workspace_execution_last_dequeues queue  (cost=0.00..30.40 rows=2040 width=12) (never executed)
                     ->  Index Scan using batch_spec_workspace_execution_jobs_pkey on batch_spec_workspace_execution_jobs  (cost=0.42..2.64 rows=1 width=14) (never executed)
                           Index Cond: (id = pc.candidate_id)
                           Filter: (state = ANY ('{queued,errored}'::text[]))
 Planning Time: 0.743 ms
 Execution Time: 0.207 ms
(37 rows)

24000 records queued

Before:

QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=10718.47..10718.49 rows=1 width=62) (actual time=186.816..186.825 rows=1 loops=1)
   ->  LockRows  (cost=10718.47..10718.49 rows=1 width=62) (actual time=186.815..186.823 rows=1 loops=1)
         ->  Sort  (cost=10718.47..10718.48 rows=1 width=62) (actual time=186.796..186.804 rows=1 loops=1)
               Sort Key: pc."order"
               Sort Method: quicksort  Memory: 32kB
               ->  Nested Loop  (cost=10585.37..10718.46 rows=1 width=62) (actual time=186.608..186.783 rows=50 loops=1)
                     ->  Subquery Scan on pc  (cost=10584.95..10586.32 rows=50 width=56) (actual time=186.578..186.629 rows=50 loops=1)
                           ->  Limit  (cost=10584.95..10585.82 rows=50 width=24) (actual time=186.558..186.597 rows=50 loops=1)
                                 ->  WindowAgg  (cost=10584.95..10590.65 rows=326 width=24) (actual time=186.558..186.592 rows=50 loops=1)
                                       ->  Sort  (cost=10584.95..10585.76 rows=326 width=16) (actual time=186.544..186.554 rows=51 loops=1)
                                             Sort Key: q.place_in_global_queue
                                             Sort Method: quicksort  Memory: 1910kB
                                             ->  Hash Left Join  (cost=10295.15..10571.34 rows=326 width=16) (actual time=168.569..182.523 rows=24362 loops=1)
                                                   Hash Cond: (j.id = q.id)
                                                   ->  Index Scan using batch_spec_workspace_execution_jobs_state on batch_spec_workspace_execution_jobs j  (cost=0.42..275.38 rows=326 width=8) (actual time=0.044..8.551 rows=24362 loops=1)
                                                         Index Cond: (state = 'queued'::text)
                                                         Filter: ((process_after IS NULL) OR (process_after <= now()))
                                                   ->  Hash  (cost=10290.65..10290.65 rows=326 width=16) (actual time=168.500..168.505 rows=24362 loops=1)
                                                         Buckets: 32768 (originally 1024)  Batches: 1 (originally 1)  Memory Usage: 1398kB
                                                         ->  Subquery Scan on q  (cost=10279.24..10290.65 rows=326 width=16) (actual time=151.406..163.611 rows=24362 loops=1)
                                                               ->  WindowAgg  (cost=10279.24..10287.39 rows=326 width=24) (actual time=151.405..161.738 rows=24362 loops=1)
                                                                     ->  Subquery Scan on queue_candidates  (cost=10279.24..10283.31 rows=326 width=8) (actual time=151.402..154.977 rows=24362 loops=1)
                                                                           ->  Sort  (cost=10279.24..10280.05 rows=326 width=36) (actual time=151.401..152.885 rows=24362 loops=1)
                                                                                 Sort Key: (rank() OVER (?)), queue.latest_dequeue NULLS FIRST
                                                                                 Sort Method: quicksort  Memory: 2672kB
                                                                                 ->  WindowAgg  (cost=10258.29..10265.63 rows=326 width=36) (actual time=129.476..145.426 rows=24362 loops=1)
                                                                                       ->  Sort  (cost=10258.29..10259.11 rows=326 width=28) (actual time=129.457..130.688 rows=24362 loops=1)
                                                                                             Sort Key: queue.user_id, exec.created_at, exec.id
                                                                                             Sort Method: quicksort  Memory: 2672kB
                                                                                             ->  Nested Loop  (cost=0.84..10244.69 rows=326 width=28) (actual time=99.031..123.881 rows=24362 loops=1)
                                                                                                   Join Filter: (exec.user_id = queue.user_id)
                                                                                                   Rows Removed by Join Filter: 97448
                                                                                                   ->  Index Scan using batch_spec_workspace_execution_jobs_state on batch_spec_workspace_execution_jobs exec  (cost=0.42..273.75 rows=326 width=20) (actual time=0.024..8.702 rows=24362 loops=1)
                                                                                                         Index Cond: (state = 'queued'::text)
                                                                                                   ->  Materialize  (cost=0.42..9892.15 rows=17 width=12) (actual time=0.000..0.004 rows=5 loops=24362)
                                                                                                         ->  Subquery Scan on queue  (cost=0.42..9892.07 rows=17 width=12) (actual time=0.042..98.995 rows=5 loops=1)
                                                                                                               ->  GroupAggregate  (cost=0.42..9891.90 rows=17 width=12) (actual time=0.041..98.993 rows=5 loops=1)
                                                                                                                     Group Key: exec_1.user_id
                                                                                                                     ->  Index Only Scan using batch_spec_workspace_execution_jobs_last_dequeue on batch_spec_workspace_execution_jobs exec_1  (cost=0.42..8206.51 rows=337044 width=12) (actual time=0.027..73.499 rows=351240 loops=1)
                                                                                                                           Heap Fetches: 90581
                     ->  Index Scan using batch_spec_workspace_execution_jobs_pkey on batch_spec_workspace_execution_jobs  (cost=0.42..2.64 rows=1 width=14) (actual time=0.003..0.003 rows=1 loops=50)
                           Index Cond: (id = pc.candidate_id)
                           Filter: (state = ANY ('{queued,errored}'::text[]))
 Planning Time: 0.881 ms
 Execution Time: 188.086 ms
(45 rows)

After:

QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=804.29..804.31 rows=1 width=62) (actual time=74.121..74.127 rows=1 loops=1)
   ->  LockRows  (cost=804.29..804.31 rows=1 width=62) (actual time=74.120..74.126 rows=1 loops=1)
         ->  Sort  (cost=804.29..804.30 rows=1 width=62) (actual time=74.095..74.101 rows=1 loops=1)
               Sort Key: pc."order"
               Sort Method: quicksort  Memory: 32kB
               ->  Nested Loop  (cost=671.19..804.28 rows=1 width=62) (actual time=73.953..74.082 rows=50 loops=1)
                     ->  Subquery Scan on pc  (cost=670.77..672.14 rows=50 width=56) (actual time=73.934..73.985 rows=50 loops=1)
                           ->  Limit  (cost=670.77..671.64 rows=50 width=24) (actual time=73.928..73.968 rows=50 loops=1)
                                 ->  WindowAgg  (cost=670.77..676.47 rows=326 width=24) (actual time=73.927..73.964 rows=50 loops=1)
                                       ->  Sort  (cost=670.77..671.58 rows=326 width=16) (actual time=73.909..73.916 rows=51 loops=1)
                                             Sort Key: q.place_in_global_queue
                                             Sort Method: quicksort  Memory: 1910kB
                                             ->  Hash Left Join  (cost=380.97..657.16 rows=326 width=16) (actual time=56.822..70.091 rows=24360 loops=1)
                                                   Hash Cond: (j.id = q.id)
                                                   ->  Index Scan using batch_spec_workspace_execution_jobs_state on batch_spec_workspace_execution_jobs j  (cost=0.42..275.38 rows=326 width=8) (actual time=0.041..8.347 rows=24360 loops=1)
                                                         Index Cond: (state = 'queued'::text)
                                                         Filter: ((process_after IS NULL) OR (process_after <= now()))
                                                   ->  Hash  (cost=376.47..376.47 rows=326 width=16) (actual time=56.766..56.770 rows=24360 loops=1)
                                                         Buckets: 32768 (originally 1024)  Batches: 1 (originally 1)  Memory Usage: 1398kB
                                                         ->  Subquery Scan on q  (cost=365.06..376.47 rows=326 width=16) (actual time=39.565..51.897 rows=24360 loops=1)
                                                               ->  WindowAgg  (cost=365.06..373.21 rows=326 width=24) (actual time=39.565..49.976 rows=24360 loops=1)
                                                                     ->  Subquery Scan on queue_candidates  (cost=365.06..369.14 rows=326 width=8) (actual time=39.561..43.137 rows=24360 loops=1)
                                                                           ->  Sort  (cost=365.06..365.88 rows=326 width=36) (actual time=39.560..41.026 rows=24360 loops=1)
                                                                                 Sort Key: (rank() OVER (?)), queue.latest_dequeue NULLS FIRST
                                                                                 Sort Method: quicksort  Memory: 2672kB
                                                                                 ->  WindowAgg  (cost=344.12..351.45 rows=326 width=36) (actual time=18.974..33.974 rows=24360 loops=1)
                                                                                       ->  Sort  (cost=344.12..344.93 rows=326 width=28) (actual time=18.955..20.070 rows=24360 loops=1)
                                                                                             Sort Key: queue.user_id, exec.created_at, exec.id
                                                                                             Sort Method: quicksort  Memory: 2672kB
                                                                                             ->  Hash Join  (cost=56.32..330.51 rows=326 width=28) (actual time=0.087..13.668 rows=24360 loops=1)
                                                                                                   Hash Cond: (exec.user_id = queue.user_id)
                                                                                                   ->  Index Scan using batch_spec_workspace_execution_jobs_state on batch_spec_workspace_execution_jobs exec  (cost=0.42..273.75 rows=326 width=20) (actual time=0.022..9.687 rows=24360 loops=1)
                                                                                                         Index Cond: (state = 'queued'::text)
                                                                                                   ->  Hash  (cost=30.40..30.40 rows=2040 width=12) (actual time=0.049..0.050 rows=24 loops=1)
                                                                                                         Buckets: 2048  Batches: 1  Memory Usage: 18kB
                                                                                                         ->  Seq Scan on batch_spec_workspace_execution_last_dequeues queue  (cost=0.00..30.40 rows=2040 width=12) (actual time=0.029..0.031 rows=24 loops=1)
                     ->  Index Scan using batch_spec_workspace_execution_jobs_pkey on batch_spec_workspace_execution_jobs  (cost=0.42..2.64 rows=1 width=14) (actual time=0.002..0.002 rows=1 loops=50)
                           Index Cond: (id = pc.candidate_id)
                           Filter: (state = ANY ('{queued,errored}'::text[]))
 Planning Time: 0.664 ms
 Execution Time: 75.104 ms
(41 rows)
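To put numbers on the 24000-record comparison, the arithmetic below uses only figures copied from the two plans above; the variable names are mine. It suggests the query is roughly 2.5x faster overall, and that almost all of the saving comes from replacing the Nested Loop over the GroupAggregate with a Hash Join against batch_spec_workspace_execution_last_dequeues.

```python
# Execution times copied from the EXPLAIN ANALYZE output above
# (24000 records queued).
before_ms = 188.086
after_ms = 75.104

speedup = before_ms / after_ms
saved_ms = before_ms - after_ms

# Upper "actual time" bound of the join that attaches each user's
# latest dequeue time to the queued jobs.
nested_loop_ms = 123.881  # before: Nested Loop over the GroupAggregate
hash_join_ms = 13.668     # after: Hash Join against the last_dequeues table
join_saved_ms = nested_loop_ms - hash_join_ms

print(f"total: {speedup:.2f}x faster, {saved_ms:.3f} ms saved")
print(f"join subtree alone: {join_saved_ms:.3f} ms saved")
```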

Test plan

This is covered pretty extensively by tests in code, added after previous regressions, and I also manually verified that the order is still correct.
