
batches: implement basic fairness for workspace queues

Administrator requested to merge bo/implement-basic-worker-fairness-ssbc into main

Created by: BolajiOlajide

Closes #35981 (closed)

edit from Erik:

Ok, I made some tweaks to the query and got it to work properly in a round-robin fashion, prioritizing the user who dequeued the longest ago.
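For reference, the query now has roughly this shape — reconstructed from the plan below rather than copied from the code, so treat the column list and aliases as illustrative (the real query also computes the queue position on top of this CTE):

-- Rank queued jobs within each user's queue, then order users so that the one whose
-- last dequeue was longest ago (or who never dequeued, hence NULLS FIRST) comes first.
WITH materialized_queue_candidates AS (
    SELECT
        exec.*,
        RANK() OVER (PARTITION BY spec.user_id ORDER BY exec.created_at, exec.id) AS rank_in_user_queue
    FROM batch_spec_workspace_execution_jobs exec
    JOIN batch_spec_workspaces workspace ON workspace.id = exec.batch_spec_workspace_id
    JOIN batch_specs spec ON spec.id = workspace.batch_spec_id
    JOIN (
        -- When did each user last get a job dequeued?
        SELECT spec.user_id, MAX(exec.started_at) AS latest_dequeue
        FROM batch_spec_workspace_execution_jobs exec
        JOIN batch_spec_workspaces workspace ON workspace.id = exec.batch_spec_workspace_id
        JOIN batch_specs spec ON spec.id = workspace.batch_spec_id
        GROUP BY spec.user_id
    ) user_dequeues ON user_dequeues.user_id = spec.user_id
    WHERE exec.state = 'queued'
    ORDER BY rank_in_user_queue, user_dequeues.latest_dequeue NULLS FIRST
)
SELECT * FROM materialized_queue_candidates;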

After making these changes, I checked the query plan from our k8s instance; see below. (It holds a fair amount of data, so it should be representative.)

sg=# select count(*) from batch_spec_workspace_execution_jobs;
 count
--------
 199872
(1 row)
                                                                                              QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 WindowAgg  (cost=43920.55..43920.58 rows=1 width=229) (actual time=18.200..21.619 rows=0 loops=1)
   CTE materialized_queue_candidates
     ->  Sort  (cost=43920.54..43920.55 rows=1 width=238) (actual time=18.198..21.616 rows=0 loops=1)
           Sort Key: (rank() OVER (?)), (max(exec_1.started_at)) NULLS FIRST
           Sort Method: quicksort  Memory: 25kB
           ->  WindowAgg  (cost=43920.51..43920.53 rows=1 width=238) (actual time=18.187..21.605 rows=0 loops=1)
                 ->  Sort  (cost=43920.51..43920.51 rows=1 width=230) (actual time=18.185..21.603 rows=0 loops=1)
                       Sort Key: spec_1.user_id, exec.created_at, exec.id
                       Sort Method: quicksort  Memory: 25kB
                       ->  Nested Loop  (cost=35805.08..43920.50 rows=1 width=230) (actual time=18.171..21.589 rows=0 loops=1)
                             Join Filter: (spec.user_id = spec_1.user_id)
                             ->  Nested Loop  (cost=1000.70..9102.03 rows=1 width=222) (actual time=18.171..21.587 rows=0 loops=1)
                                   ->  Nested Loop  (cost=1000.42..9101.74 rows=1 width=222) (actual time=18.170..21.585 rows=0 loops=1)
                                         ->  Gather  (cost=1000.00..9099.10 rows=1 width=218) (actual time=18.169..21.584 rows=0 loops=1)
                                               Workers Planned: 2
                                               Workers Launched: 2
                                               ->  Parallel Seq Scan on batch_spec_workspace_execution_jobs exec  (cost=0.00..8099.00 rows=1 width=218) (actual time=14.543..14.543 rows=0 loops=3)
                                                     Filter: (state = 'queued'::text)
                                                     Rows Removed by Filter: 66624
                                         ->  Index Scan using batch_spec_workspaces_pkey on batch_spec_workspaces workspace  (cost=0.42..2.64 rows=1 width=12) (never executed)
                                               Index Cond: (id = exec.batch_spec_workspace_id)
                                   ->  Index Scan using batch_specs_pkey on batch_specs spec  (cost=0.28..0.29 rows=1 width=12) (never executed)
                                         Index Cond: (id = workspace.batch_spec_id)
                             ->  Finalize GroupAggregate  (cost=34804.38..34817.68 rows=35 width=12) (never executed)
                                   Group Key: spec_1.user_id
                                   ->  Gather Merge  (cost=34804.38..34816.80 rows=105 width=12) (never executed)
                                         Workers Planned: 3
                                         Workers Launched: 0
                                         ->  Sort  (cost=33804.34..33804.43 rows=35 width=12) (never executed)
                                               Sort Key: spec_1.user_id
                                               ->  Partial HashAggregate  (cost=33803.09..33803.44 rows=35 width=12) (never executed)
                                                     Group Key: spec_1.user_id
                                                     ->  Hash Join  (cost=9046.79..33481.03 rows=64413 width=12) (never executed)
                                                           Hash Cond: (workspace_1.batch_spec_id = spec_1.id)
                                                           ->  Parallel Hash Join  (cost=8931.00..33194.78 rows=64413 width=12) (never executed)
                                                                 Hash Cond: (workspace_1.id = exec_1.batch_spec_workspace_id)
                                                                 ->  Parallel Seq Scan on batch_spec_workspaces workspace_1  (cost=0.00..23514.74 rows=128174 width=12) (never executed)
                                                                 ->  Parallel Hash  (cost=7891.00..7891.00 rows=83200 width=12) (never executed)
                                                                       ->  Parallel Seq Scan on batch_spec_workspace_execution_jobs exec_1  (cost=0.00..7891.00 rows=83200 width=12) (never executed)
                                                           ->  Hash  (cost=109.24..109.24 rows=524 width=12) (never executed)
                                                                 ->  Seq Scan on batch_specs spec_1  (cost=0.00..109.24 rows=524 width=12) (never executed)
   ->  CTE Scan on materialized_queue_candidates  (cost=0.00..0.02 rows=1 width=221) (actual time=18.199..18.199 rows=0 loops=1)
 Planning Time: 1.212 ms
 Execution Time: 21.956 ms
(44 rows)

Overall, it isn't terrible, but it also isn't great considering that this query needs to run very often to poll for new jobs (once per second per executor). I think a great improvement would come from denormalizing the schema and putting the tenant ID (the user ID) directly onto the job record as well.

NEXT EDIT

Did some tinkering; a missing index plus the denormalization helped a lot, and the query is near-instant now:

                                                                                                         QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 WindowAgg  (cost=11400.08..11400.11 rows=1 width=233) (actual time=0.055..0.057 rows=0 loops=1)
   CTE materialized_queue_candidates
     ->  Sort  (cost=11400.07..11400.08 rows=1 width=246) (actual time=0.054..0.055 rows=0 loops=1)
           Sort Key: (rank() OVER (?)), (max(exec_1.started_at)) NULLS FIRST
           Sort Method: quicksort  Memory: 25kB
           ->  WindowAgg  (cost=11400.04..11400.06 rows=1 width=246) (actual time=0.042..0.043 rows=0 loops=1)
                 ->  Sort  (cost=11400.04..11400.04 rows=1 width=238) (actual time=0.042..0.042 rows=0 loops=1)
                       Sort Key: exec_1.user_id, exec.created_at, exec.id
                       Sort Method: quicksort  Memory: 25kB
                       ->  Merge Join  (cost=11393.73..11400.03 rows=1 width=238) (actual time=0.031..0.032 rows=0 loops=1)
                             Merge Cond: (exec.user_id = exec_1.user_id)
                             ->  Sort  (cost=2.09..2.09 rows=1 width=226) (actual time=0.030..0.031 rows=0 loops=1)
                                   Sort Key: exec.user_id
                                   Sort Method: quicksort  Memory: 25kB
                                   ->  Index Scan using batch_spec_workspace_execution_jobs_state on batch_spec_workspace_execution_jobs exec  (cost=0.42..2.08 rows=1 width=226) (actual time=0.016..0.017 rows=0 loops=1)
                                         Index Cond: (state = 'queued'::text)
                             ->  Finalize GroupAggregate  (cost=11391.64..11397.72 rows=16 width=12) (never executed)
                                   Group Key: exec_1.user_id
                                   ->  Gather Merge  (cost=11391.64..11397.32 rows=48 width=12) (never executed)
                                         Workers Planned: 3
                                         Workers Launched: 0
                                         ->  Sort  (cost=10391.60..10391.64 rows=16 width=12) (never executed)
                                               Sort Key: exec_1.user_id
                                               ->  Partial HashAggregate  (cost=10391.12..10391.28 rows=16 width=12) (never executed)
                                                     Group Key: exec_1.user_id
                                                     ->  Parallel Seq Scan on batch_spec_workspace_execution_jobs exec_1  (cost=0.00..10068.75 rows=64475 width=12) (never executed)
   ->  CTE Scan on materialized_queue_candidates  (cost=0.00..0.02 rows=1 width=225) (actual time=0.055..0.055 rows=0 loops=1)
 Planning Time: 0.246 ms
 Execution Time: 0.262 ms
(29 rows)

This is what I had to change:

ALTER TABLE batch_spec_workspace_execution_jobs ADD COLUMN user_id integer;

UPDATE batch_spec_workspace_execution_jobs exec
SET user_id = (
    SELECT spec.user_id
    FROM batch_spec_workspaces AS workspace
    JOIN batch_specs spec ON spec.id = workspace.batch_spec_id
    WHERE workspace.id = exec.batch_spec_workspace_id
);

ALTER TABLE batch_spec_workspace_execution_jobs ALTER COLUMN user_id SET NOT NULL;

CREATE INDEX batch_spec_workspace_execution_jobs_user_id ON batch_spec_workspace_execution_jobs (user_id);

CREATE INDEX batch_spec_workspace_execution_jobs_state ON batch_spec_workspace_execution_jobs (state);

place_in_queue

place_in_queue is a bit weird now that we distribute resources fairly. A record might be in position 10; then another user comes along and spawns an additional set of jobs, and the rank increases because we now share the available resources with them. Therefore, I tweaked the place_in_queue field to be the rank within the tenant's own queue instead. That rank should remain stable, although it means a record can stay in a specific position for longer. I also added an additional API field, placeInGlobalQueue, that we can use in a tooltip to show that there are other jobs queued ahead of this one. We should ask our UX expert what would be best here.
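To make the two ranks concrete, here is a simplified sketch (aliases are illustrative, the table already has the denormalized user_id from above, and the tie-break between users is reduced to user_id instead of the last-dequeue time the real ordering uses):

-- place_in_queue: position within the user's own queue; stays stable when other users enqueue work.
-- place_in_global_queue: position in the fair, interleaved ordering across all users.
WITH queued AS (
    SELECT
        id,
        user_id,
        RANK() OVER (PARTITION BY user_id ORDER BY created_at, id) AS place_in_queue
    FROM batch_spec_workspace_execution_jobs
    WHERE state = 'queued'
)
SELECT
    id,
    user_id,
    place_in_queue,
    ROW_NUMBER() OVER (ORDER BY place_in_queue, user_id) AS place_in_global_queue
FROM queued;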


That leaves a few TODOs:

  • Incorporate that SQL change so that the performance is more acceptable

    • Add SQL migration from above
    • Add code to write user_id on jobs (see the sketch after this list)
    • Ensure that user_id can never get out of sync (I think it can't, but it should be validated in code)
  • Add a test for place_in_queue as well. It was incorrect in the first implementation and we didn't catch it.

  • Make sure the new rank behavior (see above) is described correctly and maybe expose the global rank in the API, too

  • For backcompat reasons, this needs a second PR, to be merged after 3.41 is cut, that does the following:

    ALTER TABLE batch_spec_workspace_execution_jobs ALTER COLUMN user_id SET NOT NULL;
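As referenced in the first TODO above, here is a rough sketch of how user_id could be resolved when a job row is created. This is hypothetical — the actual change will live in application code, the remaining columns are assumed to have defaults, and $1 stands in for the workspace ID:

-- Hypothetical sketch: resolve the owner through the existing workspace -> spec
-- relationship so the denormalized user_id always mirrors batch_specs.user_id.
INSERT INTO batch_spec_workspace_execution_jobs (batch_spec_workspace_id, state, user_id)
SELECT workspace.id, 'queued', spec.user_id
FROM batch_spec_workspaces workspace
JOIN batch_specs spec ON spec.id = workspace.batch_spec_id
WHERE workspace.id = $1;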

Test plan

  • Added unit tests for the round-robin mechanism at the worker level
  • Adjusted the test suite to check the queue rank fields
  • Tested locally with a couple of jobs queued across 3 user accounts; it works well and the position in queue remains stable
(Screenshot: CleanShot 2022-06-02 at 07 13 24@2x)
