Skip to content

dbworker: Low-hanging fruit performance improvements

Warren Gifford requested to merge es/faster-dbworker into main

Created by: eseliger

tl;dr this PR does the following:

  • Clean up conditions that are no longer relevant after we introduced the 'failed' state (errored && num_failures > max_failures)
  • Don't use views where not required
  • Drop the conditions field on QueuedCount, it isn't used and we also don't support it on the MaxDurationInQueue method, and both are meant for metrics. This makes it easier to use TableName over ViewName
  • Reduces the execution time of QueuedCount by ~20ms on average
  • Reduces the execution time of MaxDurationInQueue by ~600ms on average
SELECT COUNT(*) FROM batch_spec_workspace_execution_jobs_with_rank batch_spec_workspace_execution_jobs WHERE (
	state IN ('queued') OR
	(state = 'errored' AND num_failures < 0)
);
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=48604.02..48604.03 rows=1 width=8) (actual time=152.380..152.491 rows=1 loops=1)
   ->  Hash Left Join  (cost=32521.08..48527.65 rows=30546 width=0) (actual time=136.684..151.030 rows=30005 loops=1)
         Hash Cond: (j.id = q.id)
         ->  Bitmap Heap Scan on batch_spec_workspace_execution_jobs j  (cost=459.72..16323.97 rows=30546 width=8) (actual time=3.360..11.442 rows=30005 loops=1)
               Recheck Cond: ((state = 'queued'::text) OR (state = 'errored'::text))
               Filter: ((state = 'queued'::text) OR ((state = 'errored'::text) AND (num_failures < 0)))
               Heap Blocks: exact=1473
               ->  BitmapOr  (cost=459.72..459.72 rows=30546 width=0) (actual time=3.146..3.148 rows=0 loops=1)
                     ->  Bitmap Index Scan on batch_spec_workspace_execution_jobs_state  (cost=0.00..442.92 rows=30546 width=0) (actual time=3.114..3.114 rows=32261 loops=1)
                           Index Cond: (state = 'queued'::text)
                     ->  Bitmap Index Scan on batch_spec_workspace_execution_jobs_state  (cost=0.00..1.53 rows=1 width=0) (actual time=0.031..0.031 rows=0 loops=1)
                           Index Cond: (state = 'errored'::text)
         ->  Hash  (cost=31679.53..31679.53 rows=30546 width=8) (actual time=133.122..133.230 rows=30005 loops=1)
               Buckets: 32768  Batches: 1  Memory Usage: 1429kB
               ->  Subquery Scan on q  (cost=30992.25..31679.53 rows=30546 width=8) (actual time=120.698..128.160 rows=30005 loops=1)
                     ->  Subquery Scan on queue_candidates  (cost=30992.25..31374.07 rows=30546 width=24) (actual time=120.696..125.877 rows=30005 loops=1)
                           ->  Sort  (cost=30992.25..31068.61 rows=30546 width=36) (actual time=120.693..122.833 rows=30005 loops=1)
                                 Sort Key: (rank() OVER (?)), queue.latest_dequeue NULLS FIRST
                                 Sort Method: quicksort  Memory: 3113kB
                                 ->  WindowAgg  (cost=28029.48..28716.77 rows=30546 width=36) (actual time=96.197..114.483 rows=30005 loops=1)
                                       ->  Sort  (cost=28029.48..28105.85 rows=30546 width=28) (actual time=96.174..97.607 rows=30005 loops=1)
                                             Sort Key: queue.user_id, exec.created_at, exec.id
                                             Sort Method: quicksort  Memory: 3113kB
                                             ->  Hash Join  (cost=9944.42..25754.01 rows=30546 width=28) (actual time=63.052..77.568 rows=30005 loops=1)
                                                   Hash Cond: (exec.user_id = queue.user_id)
                                                   ->  Bitmap Heap Scan on batch_spec_workspace_execution_jobs exec  (cost=450.55..16162.07 rows=30546 width=20) (actual time=2.489..12.079 rows=30005 loops=1)
                                                         Recheck Cond: (state = 'queued'::text)
                                                         Heap Blocks: exact=1473
                                                         ->  Bitmap Index Scan on batch_spec_workspace_execution_jobs_state  (cost=0.00..442.92 rows=30546 width=0) (actual time=2.307..2.307 rows=32261 loops=1)
                                                               Index Cond: (state = 'queued'::text)
                                                   ->  Hash  (cost=9493.62..9493.62 rows=19 width=12) (actual time=60.513..60.616 rows=22 loops=1)
                                                         Buckets: 1024  Batches: 1  Memory Usage: 9kB
                                                         ->  Subquery Scan on queue  (cost=1000.48..9493.62 rows=19 width=12) (actual time=59.522..60.583 rows=22 loops=1)
                                                               ->  Finalize GroupAggregate  (cost=1000.48..9493.43 rows=19 width=12) (actual time=59.520..60.578 rows=22 loops=1)
                                                                     Group Key: exec_1.user_id
                                                                     ->  Gather Merge  (cost=1000.48..9492.86 rows=76 width=12) (actual time=59.354..60.559 rows=38 loops=1)
                                                                           Workers Planned: 4
                                                                           Workers Launched: 4
                                                                           ->  Partial GroupAggregate  (cost=0.42..8483.75 rows=19 width=12) (actual time=21.327..43.632 rows=8 loops=5)
                                                                                 Group Key: exec_1.user_id
                                                                                 ->  Parallel Index Only Scan using batch_spec_workspace_execution_jobs_last_dequeue on batch_spec_workspace_execution_jobs exec_1  (cost=0.42..8063.67 rows=83979 width=12) (actual time=0.043..37.672 rows=67183 loops=5)
                                                                                       Heap Fetches: 109359
 Planning Time: 2.136 ms
 Execution Time: 153.838 ms
(44 rows)
SELECT
	COUNT(*)
FROM batch_spec_workspace_execution_jobs_with_rank batch_spec_workspace_execution_jobs
WHERE
	state IN ('queued', 'errored')
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Aggregate  (cost=49787.27..49787.28 rows=1 width=8) (actual time=134.689..134.795 rows=1 loops=1)
   ->  Hash Left Join  (cost=34113.84..49713.34 rows=29572 width=0) (actual time=121.297..133.427 rows=29675 loops=1)
         Hash Cond: (j.id = q.id)
         ->  Bitmap Heap Scan on batch_spec_workspace_execution_jobs j  (cost=429.13..15891.70 rows=29572 width=8) (actual time=2.360..8.445 rows=29675 loops=1)
               Recheck Cond: (state = ANY ('{queued,errored}'::text[]))
               Heap Blocks: exact=1460
               ->  Bitmap Index Scan on batch_spec_workspace_execution_jobs_state  (cost=0.00..421.74 rows=29572 width=0) (actual time=2.188..2.189 rows=32261 loops=1)
                     Index Cond: (state = ANY ('{queued,errored}'::text[]))
         ->  Hash  (cost=33315.06..33315.06 rows=29572 width=8) (actual time=118.756..118.860 rows=29675 loops=1)
               Buckets: 32768  Batches: 1  Memory Usage: 1416kB
               ->  Subquery Scan on q  (cost=32649.69..33315.06 rows=29572 width=8) (actual time=106.911..113.983 rows=29675 loops=1)
                     ->  Subquery Scan on queue_candidates  (cost=32649.69..33019.34 rows=29572 width=24) (actual time=106.910..111.908 rows=29675 loops=1)
                           ->  Sort  (cost=32649.69..32723.62 rows=29572 width=36) (actual time=106.907..108.955 rows=29675 loops=1)
                                 Sort Key: (rank() OVER (?)), queue.latest_dequeue NULLS FIRST
                                 Sort Method: quicksort  Memory: 3087kB
                                 ->  WindowAgg  (cost=29788.31..30453.68 rows=29572 width=36) (actual time=84.344..101.149 rows=29675 loops=1)
                                       ->  Sort  (cost=29788.31..29862.24 rows=29572 width=28) (actual time=84.326..85.692 rows=29675 loops=1)
                                             Sort Key: queue.user_id, exec.created_at, exec.id
                                             Sort Method: quicksort  Memory: 3087kB
                                             ->  Hash Join  (cost=12032.74..27592.30 rows=29572 width=28) (actual time=50.792..65.839 rows=29675 loops=1)
                                                   Hash Cond: (exec.user_id = queue.user_id)
                                                   ->  Bitmap Heap Scan on batch_spec_workspace_execution_jobs exec  (cost=436.41..15898.98 rows=29572 width=20) (actual time=2.156..11.845 rows=29675 loops=1)
                                                         Recheck Cond: (state = 'queued'::text)
                                                         Heap Blocks: exact=1460
                                                         ->  Bitmap Index Scan on batch_spec_workspace_execution_jobs_state  (cost=0.00..429.01 rows=29572 width=0) (actual time=1.988..1.988 rows=32261 loops=1)
                                                               Index Cond: (state = 'queued'::text)
                                                   ->  Hash  (cost=11596.13..11596.13 rows=17 width=12) (actual time=48.614..48.714 rows=22 loops=1)
                                                         Buckets: 1024  Batches: 1  Memory Usage: 9kB
                                                         ->  Subquery Scan on queue  (cost=1000.48..11596.13 rows=17 width=12) (actual time=6.056..48.681 rows=22 loops=1)
                                                               ->  Finalize GroupAggregate  (cost=1000.48..11595.96 rows=17 width=12) (actual time=6.055..48.674 rows=22 loops=1)
                                                                     Group Key: exec_1.user_id
                                                                     ->  Gather Merge  (cost=1000.48..11595.45 rows=68 width=12) (actual time=5.942..48.651 rows=41 loops=1)
                                                                           Workers Planned: 4
                                                                           Workers Launched: 4
                                                                           ->  Partial GroupAggregate  (cost=0.42..10587.29 rows=17 width=12) (actual time=0.271..42.824 rows=8 loops=5)
                                                                                 Group Key: exec_1.user_id
                                                                                 ->  Parallel Index Only Scan using batch_spec_workspace_execution_jobs_last_dequeue on batch_spec_workspace_execution_jobs exec_1  (cost=0.42..10167.22 rows=83979 width=12) (actual time=0.044..37.446 rows=67183 loops=5)
                                                                                       Heap Fetches: 114100
 Planning Time: 0.432 ms
 Execution Time: 135.907 ms
(40 rows)
WITH
candidates AS (
	SELECT * FROM batch_spec_workspace_execution_jobs_with_rank batch_spec_workspace_execution_jobs
),
oldest_queued AS (
	SELECT
		-- Select when the record was most recently dequeueable
		GREATEST(queued_at, process_after) AS last_queued_at
	FROM candidates
	WHERE
		state = 'queued' AND
		(process_after IS NULL OR process_after <= now())
),
oldest_retryable AS (
	SELECT
		-- Select when the record was most recently dequeueable
		finished_at + (15 * '1 second'::interval) AS last_queued_at
	FROM candidates
	WHERE
		0 > 0 AND
		state = 'errored' AND
		now() - finished_at > (0 * '1 second'::interval) AND
		num_failures < 0
),
oldest_record AS (
	(
		SELECT last_queued_at FROM oldest_queued
		UNION
		SELECT last_queued_at FROM oldest_retryable
	)
	ORDER BY last_queued_at
	LIMIT 1
)
SELECT EXTRACT(EPOCH FROM NOW() - last_queued_at)::integer AS age FROM oldest_record;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Subquery Scan on oldest_record  (cost=63607.37..63607.39 rows=1 width=4) (actual time=613.663..613.768 rows=1 loops=1)
   CTE candidates
     ->  Hash Left Join  (cost=34054.36..54349.91 rows=335915 width=235) (actual time=130.675..321.333 rows=335915 loops=1)
           Hash Cond: (j.id = q.id)
           ->  Seq Scan on batch_spec_workspace_execution_jobs j  (cost=0.00..18740.15 rows=335915 width=219) (actual time=0.009..106.792 rows=335915 loops=1)
           ->  Hash  (cost=33684.71..33684.71 rows=29572 width=24) (actual time=130.502..130.603 rows=29317 loops=1)
                 Buckets: 32768  Batches: 1  Memory Usage: 1860kB
                 ->  Subquery Scan on q  (cost=32649.69..33684.71 rows=29572 width=24) (actual time=110.406..124.656 rows=29317 loops=1)
                       ->  WindowAgg  (cost=32649.69..33388.99 rows=29572 width=24) (actual time=110.405..122.478 rows=29317 loops=1)
                             ->  Subquery Scan on queue_candidates  (cost=32649.69..33019.34 rows=29572 width=16) (actual time=110.402..115.123 rows=29317 loops=1)
                                   ->  Sort  (cost=32649.69..32723.62 rows=29572 width=36) (actual time=110.399..112.390 rows=29317 loops=1)
                                         Sort Key: (rank() OVER (?)), queue.latest_dequeue NULLS FIRST
                                         Sort Method: quicksort  Memory: 3059kB
                                         ->  WindowAgg  (cost=29788.31..30453.68 rows=29572 width=36) (actual time=87.649..104.705 rows=29317 loops=1)
                                               ->  Sort  (cost=29788.31..29862.24 rows=29572 width=28) (actual time=87.620..88.970 rows=29317 loops=1)
                                                     Sort Key: queue.user_id, exec.created_at, exec.id
                                                     Sort Method: quicksort  Memory: 3059kB
                                                     ->  Hash Join  (cost=12032.74..27592.30 rows=29572 width=28) (actual time=54.578..69.528 rows=29317 loops=1)
                                                           Hash Cond: (exec.user_id = queue.user_id)
                                                           ->  Bitmap Heap Scan on batch_spec_workspace_execution_jobs exec  (cost=436.41..15898.98 rows=29572 width=20) (actual time=2.606..12.281 rows=29317 loops=1)
                                                                 Recheck Cond: (state = 'queued'::text)
                                                                 Heap Blocks: exact=1447
                                                                 ->  Bitmap Index Scan on batch_spec_workspace_execution_jobs_state  (cost=0.00..429.01 rows=29572 width=0) (actual time=2.428..2.429 rows=32261 loops=1)
                                                                       Index Cond: (state = 'queued'::text)
                                                           ->  Hash  (cost=11596.13..11596.13 rows=17 width=12) (actual time=51.950..52.045 rows=22 loops=1)
                                                                 Buckets: 1024  Batches: 1  Memory Usage: 9kB
                                                                 ->  Subquery Scan on queue  (cost=1000.48..11596.13 rows=17 width=12) (actual time=51.285..52.013 rows=22 loops=1)
                                                                       ->  Finalize GroupAggregate  (cost=1000.48..11595.96 rows=17 width=12) (actual time=51.283..52.005 rows=22 loops=1)
                                                                             Group Key: exec_1.user_id
                                                                             ->  Gather Merge  (cost=1000.48..11595.45 rows=68 width=12) (actual time=51.175..51.985 rows=37 loops=1)
                                                                                   Workers Planned: 4
                                                                                   Workers Launched: 4
                                                                                   ->  Partial GroupAggregate  (cost=0.42..10587.29 rows=17 width=12) (actual time=9.245..37.100 rows=7 loops=5)
                                                                                         Group Key: exec_1.user_id
                                                                                         ->  Parallel Index Only Scan using batch_spec_workspace_execution_jobs_last_dequeue on batch_spec_workspace_execution_jobs exec_1  (cost=0.42..10167.22 rows=83979 width=12) (actual time=0.038..32.603 rows=67183 loops=5)
                                                                                               Heap Fetches: 118551
   ->  Limit  (cost=9257.46..9257.46 rows=1 width=8) (actual time=613.658..613.659 rows=1 loops=1)
         ->  Sort  (cost=9257.46..9258.88 rows=566 width=8) (actual time=613.657..613.658 rows=1 loops=1)
               Sort Key: (GREATEST(candidates.queued_at, candidates.process_after))
               Sort Method: quicksort  Memory: 25kB
               ->  HashAggregate  (cost=9248.97..9254.63 rows=566 width=8) (actual time=613.643..613.645 rows=1 loops=1)
                     Group Key: (GREATEST(candidates.queued_at, candidates.process_after))
                     ->  Append  (cost=0.00..9247.56 rows=566 width=8) (actual time=133.431..609.934 rows=29317 loops=1)
                           ->  CTE Scan on candidates  (cost=0.00..9239.08 rows=565 width=8) (actual time=133.430..608.122 rows=29317 loops=1)
                                 Filter: ((state = 'queued'::text) AND ((process_after IS NULL) OR (process_after <= now())))
                                 Rows Removed by Filter: 306598
                           ->  Result  (cost=0.00..0.00 rows=0 width=8) (actual time=0.001..0.001 rows=0 loops=1)
                                 One-Time Filter: false
 Planning Time: 0.555 ms
 Execution Time: 625.682 ms
(50 rows)
WITH
oldest_queued AS (
	SELECT
		-- Select when the record was most recently dequeueable
		GREATEST(queued_at, process_after) AS last_queued_at
	FROM batch_spec_workspace_execution_jobs
	WHERE
		state = 'queued' AND
		(process_after IS NULL OR process_after <= now())
),
oldest_retryable AS (
	SELECT
		-- Select when the record was most recently dequeueable
		finished_at + (15 * '1 second'::interval) AS last_queued_at
	FROM batch_spec_workspace_execution_jobs
	WHERE
		0 > 0 AND
		state = 'errored' AND
		now() - finished_at > (0 * '1 second'::interval)
),
oldest_record AS (
	(
		SELECT last_queued_at FROM oldest_queued
		UNION
		SELECT last_queued_at FROM oldest_retryable
	)
	ORDER BY last_queued_at
	LIMIT 1
)
SELECT EXTRACT(EPOCH FROM NOW() - last_queued_at)::integer AS age FROM oldest_record
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Subquery Scan on oldest_record  (cost=17014.51..17014.53 rows=1 width=4) (actual time=17.366..17.368 rows=1 loops=1)
   ->  Limit  (cost=17014.51..17014.51 rows=1 width=8) (actual time=17.360..17.362 rows=1 loops=1)
         ->  Sort  (cost=17014.51..17087.91 rows=29360 width=8) (actual time=17.359..17.360 rows=1 loops=1)
               Sort Key: (GREATEST(batch_spec_workspace_execution_jobs.queued_at, batch_spec_workspace_execution_jobs.process_after))
               Sort Method: quicksort  Memory: 25kB
               ->  HashAggregate  (cost=16574.11..16867.71 rows=29360 width=8) (actual time=17.262..17.343 rows=1 loops=1)
                     Group Key: (GREATEST(batch_spec_workspace_execution_jobs.queued_at, batch_spec_workspace_execution_jobs.process_after))
                     ->  Append  (cost=432.55..16500.71 rows=29360 width=8) (actual time=3.646..13.890 rows=28919 loops=1)
                           ->  Bitmap Heap Scan on batch_spec_workspace_execution_jobs  (cost=432.55..16060.32 rows=29359 width=8) (actual time=3.646..12.142 rows=28919 loops=1)
                                 Recheck Cond: (state = 'queued'::text)
                                 Filter: ((process_after IS NULL) OR (process_after <= now()))
                                 Heap Blocks: exact=1433
                                 ->  Bitmap Index Scan on batch_spec_workspace_execution_jobs_state  (cost=0.00..425.22 rows=29359 width=0) (actual time=3.437..3.437 rows=32261 loops=1)
                                       Index Cond: (state = 'queued'::text)
                           ->  Result  (cost=0.00..0.00 rows=0 width=8) (actual time=0.001..0.002 rows=0 loops=1)
                                 One-Time Filter: false
 Planning Time: 0.441 ms
 Execution Time: 18.053 ms

Test plan

test suite and query plans.

Merge request reports

Loading