Skip to content

ssbc: Fix double dequeues after worker fairness PR

Warren Gifford requested to merge es/fix-worker-fairness-double-dequeues into main

Created by: eseliger

There was one problem with the solution that I didn't see while testing locally, because it doesn't happen when only 1 executor is attached: We dequeue the same record multiple times. This is very weird to me, as I would've thought the DB should error here instead, but that's not the case. What happens: With our complex view query that contains a materialized CTE in the middle of the chain, postgres loses track of the underlying table and doesn't lock the record on there, but on the materialized CTE (which is .. noop?). Hence, two executors can briefly dequeue the same record. I found this to be a larger problem with arbitrary views, so I made the fix in the dbworker implementation instead. Also, this now has a second view that we actually pull the data from to include the other, non-queued records as well. This is needed, because the executor scaling metric needs to be able to view processing records to do proper scaling. This was also broken, but is harder to notice. This fixes it by first selecting from the view and then properly reselecting from the underlying table. The query sounds slow, but actually it's quite fine. The query plan looks horrible I admit, but the runtime is really good. This has been tested on a DB with a few hundred thousand rows:

explain WITH potential_candidates AS (
	SELECT
		id AS candidate_id,
		ROW_NUMBER() OVER (ORDER BY batch_spec_workspace_execution_jobs.place_in_global_queue) AS order
	FROM batch_spec_workspace_execution_jobs_with_rank batch_spec_workspace_execution_jobs
	WHERE
		(
			(
				state = 'queued' AND
				(process_after IS NULL OR process_after <= NOW())
			) OR (
				0 > 0 AND
				state = 'errored' AND
				NOW() - finished_at > (30 * '1 second'::interval) AND
				num_failures < 5
			)
		)
	ORDER BY batch_spec_workspace_execution_jobs.place_in_global_queue
),
candidate AS (
	SELECT
		id FROM batch_spec_workspace_execution_jobs
	JOIN potential_candidates pc ON pc.candidate_id = id
	WHERE
		-- Recheck state.
		state = 'queued'
	ORDER BY pc.order
	FOR UPDATE OF batch_spec_workspace_execution_jobs SKIP LOCKED
	LIMIT 1
),
updated_record AS (
	UPDATE
		batch_spec_workspace_execution_jobs
	SET
		state = 'processing'
	WHERE
		id IN (SELECT id FROM candidate)
)
SELECT
	batch_spec_workspace_execution_jobs.id,
	batch_spec_workspace_execution_jobs.place_in_global_queue,
	batch_spec_workspace_execution_jobs.place_in_user_queue
FROM
	batch_spec_workspace_execution_jobs_with_rank batch_spec_workspace_execution_jobs
WHERE
	id IN (SELECT id FROM candidate);
                                                                                                                           QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Nested Loop Left Join  (cost=36899.15..36900.33 rows=1 width=24) (actual time=0.078..0.081 rows=0 loops=1)
   Join Filter: (j.id = queue_candidates.id)
   CTE candidate
     ->  Limit  (cost=18450.27..18450.29 rows=1 width=62) (actual time=0.076..0.078 rows=0 loops=1)
           ->  LockRows  (cost=18450.27..18450.29 rows=1 width=62) (actual time=0.076..0.077 rows=0 loops=1)
                 ->  Sort  (cost=18450.27..18450.28 rows=1 width=62) (actual time=0.075..0.077 rows=0 loops=1)
                       Sort Key: pc."order"
                       Sort Method: quicksort  Memory: 25kB
                       ->  Nested Loop  (cost=18448.44..18450.26 rows=1 width=62) (actual time=0.034..0.036 rows=0 loops=1)
                             Join Filter: (batch_spec_workspace_execution_jobs.id = pc.candidate_id)
                             ->  Index Scan using batch_spec_workspace_execution_jobs_state on batch_spec_workspace_execution_jobs  (cost=0.42..2.21 rows=1 width=14) (actual time=0.034..0.034 rows=0 loops=1)
                                   Index Cond: (state = 'queued'::text)
                             ->  Subquery Scan on pc  (cost=18448.01..18448.04 rows=1 width=56) (never executed)
                                   ->  WindowAgg  (cost=18448.01..18448.03 rows=1 width=24) (never executed)
                                         ->  Sort  (cost=18448.01..18448.02 rows=1 width=16) (never executed)
                                               Sort Key: (row_number() OVER (?))
                                               ->  Nested Loop Left Join  (cost=18446.17..18448.00 rows=1 width=16) (never executed)
                                                     Join Filter: (j_1.id = queue_candidates_1.id)
                                                     ->  Index Scan using batch_spec_workspace_execution_jobs_state on batch_spec_workspace_execution_jobs j_1  (cost=0.42..2.21 rows=1 width=8) (never executed)
                                                           Index Cond: (state = 'queued'::text)
                                                           Filter: ((process_after IS NULL) OR (process_after <= now()))
                                                     ->  WindowAgg  (cost=18445.74..18445.77 rows=1 width=24) (never executed)
                                                           ->  Subquery Scan on queue_candidates_1  (cost=18445.74..18445.76 rows=1 width=8) (never executed)
                                                                 ->  Sort  (cost=18445.74..18445.75 rows=1 width=36) (never executed)
                                                                       Sort Key: (rank() OVER (?)), (max(exec_3.started_at)) NULLS FIRST
                                                                       ->  WindowAgg  (cost=18445.71..18445.73 rows=1 width=36) (never executed)
                                                                             ->  Sort  (cost=18445.71..18445.71 rows=1 width=28) (never executed)
                                                                                   Sort Key: exec_3.user_id, exec_2.created_at, exec_2.id
                                                                                   ->  Hash Join  (cost=18445.31..18445.70 rows=1 width=28) (never executed)
                                                                                         Hash Cond: (exec_3.user_id = exec_2.user_id)
                                                                                         ->  HashAggregate  (cost=18443.09..18443.25 rows=16 width=12) (never executed)
                                                                                               Group Key: exec_3.user_id
                                                                                               ->  Seq Scan on batch_spec_workspace_execution_jobs exec_3  (cost=0.00..16992.06 rows=290206 width=12) (never executed)
                                                                                         ->  Hash  (cost=2.21..2.21 rows=1 width=20) (never executed)
                                                                                               ->  Index Scan using batch_spec_workspace_execution_jobs_state on batch_spec_workspace_execution_jobs exec_2  (cost=0.42..2.21 rows=1 width=20) (never executed)
                                                                                                     Index Cond: (state = 'queued'::text)
   CTE updated_record
     ->  Update on batch_spec_workspace_execution_jobs batch_spec_workspace_execution_jobs_1  (cost=0.45..2.67 rows=1 width=285) (actual time=0.001..0.001 rows=0 loops=1)
           ->  Nested Loop  (cost=0.45..2.67 rows=1 width=285) (actual time=0.001..0.001 rows=0 loops=1)
                 ->  HashAggregate  (cost=0.02..0.03 rows=1 width=40) (actual time=0.000..0.000 rows=0 loops=1)
                       Group Key: candidate_1.id
                       ->  CTE Scan on candidate candidate_1  (cost=0.00..0.02 rows=1 width=40) (actual time=0.000..0.000 rows=0 loops=1)
                 ->  Index Scan using batch_spec_workspace_execution_jobs_pkey on batch_spec_workspace_execution_jobs batch_spec_workspace_execution_jobs_1  (cost=0.42..2.64 rows=1 width=217) (never executed)
                       Index Cond: (id = candidate_1.id)
   ->  Nested Loop  (cost=0.45..1.57 rows=1 width=8) (actual time=0.077..0.078 rows=0 loops=1)
         ->  HashAggregate  (cost=0.02..0.03 rows=1 width=8) (actual time=0.077..0.077 rows=0 loops=1)
               Group Key: candidate.id
               ->  CTE Scan on candidate  (cost=0.00..0.02 rows=1 width=8) (actual time=0.077..0.077 rows=0 loops=1)
         ->  Index Only Scan using batch_spec_workspace_execution_jobs_pkey on batch_spec_workspace_execution_jobs j  (cost=0.42..1.54 rows=1 width=8) (never executed)
               Index Cond: (id = candidate.id)
               Heap Fetches: 0
   ->  WindowAgg  (cost=18445.74..18445.77 rows=1 width=24) (never executed)
         ->  Subquery Scan on queue_candidates  (cost=18445.74..18445.76 rows=1 width=16) (never executed)
               ->  Sort  (cost=18445.74..18445.75 rows=1 width=36) (never executed)
                     Sort Key: (rank() OVER (?)), (max(exec_1.started_at)) NULLS FIRST
                     ->  WindowAgg  (cost=18445.71..18445.73 rows=1 width=36) (never executed)
                           ->  Sort  (cost=18445.71..18445.71 rows=1 width=28) (never executed)
                                 Sort Key: exec_1.user_id, exec.created_at, exec.id
                                 ->  Hash Join  (cost=18445.31..18445.70 rows=1 width=28) (never executed)
                                       Hash Cond: (exec_1.user_id = exec.user_id)
                                       ->  HashAggregate  (cost=18443.09..18443.25 rows=16 width=12) (never executed)
                                             Group Key: exec_1.user_id
                                             ->  Seq Scan on batch_spec_workspace_execution_jobs exec_1  (cost=0.00..16992.06 rows=290206 width=12) (never executed)
                                       ->  Hash  (cost=2.21..2.21 rows=1 width=20) (never executed)
                                             ->  Index Scan using batch_spec_workspace_execution_jobs_state on batch_spec_workspace_execution_jobs exec  (cost=0.42..2.21 rows=1 width=20) (never executed)
                                                   Index Cond: (state = 'queued'::text)
 Planning Time: 1.228 ms
 Execution Time: 0.432 ms
(68 rows)

Closes https://github.com/sourcegraph/sourcegraph/issues/37071

Test plan

Adjusted test suites, wrote a test that currently fails on main but doesn't on this PR and made sure things still seem to work.

Merge request reports

Loading