Re: PostgreSQL as advanced job queuing system

On 2024-03-22 12:43:40 +0100, ushi wrote:
> i am playing with the idea to implement a job queuing system using
> PostgreSQL. To meet requirements the system needs to offer some advanced
> features compared to "classic" queuing systems:
> 
> - users can create new queues at any time
> - queues have priorities
> - priorities of queues can change at any time
> - jobs in queues with the highest priority should be processed first
> 
> A simple schema could look like this:
> 
>    create table queues (
>      id integer primary key,
>      priority integer not null default 0
>    );
> 
>    create table jobs (
>      id serial primary key,
>      queue_id integer not null references queues(id)
>    );
> 
> Right now i am stuck writing an efficient query for dequeuing. This is what i got so far:
> 
>    insert into queues (id, priority) values (1, 0), (2, 1), (3, 1);
>    insert into jobs (queue_id) select 1 from generate_series(1, 1000000);
>    insert into jobs (queue_id) select 2 from generate_series(1, 1000000);
>    insert into jobs (queue_id) select 3 from generate_series(1, 1000000);
> 
[...]
> 
> This is my naive query obeying priority. This is very slow and i could not get it any faster:
> 
>    select *
>    from jobs
>    join queues on queues.id = jobs.queue_id
>    order by priority desc
>    limit 1
>    for update of jobs skip locked;
>    --    id    | queue_id | id | priority
>    -- ---------+----------+----+----------
>    --  1000001 |        2 |  2 |        1
>    -- (1 row)
>    --
>    -- Time: 1116.631 ms (00:01.117)
> 
> Here is the query plan:
> 
>   --                                                                QUERY PLAN
>   --
>   -----------------------------------------------------------------------------------------------------------------------------------------
>   --  Limit  (cost=66225.61..66225.63 rows=1 width=28) (actual time=1333.139..1333.142 rows=1 loops=1)
>   --    ->  LockRows  (cost=66225.61..103725.61 rows=3000000 width=28) (actual time=1333.137..1333.139 rows=1 loops=1)
>   --          ->  Sort  (cost=66225.61..73725.61 rows=3000000 width=28) (actual time=1333.110..1333.112 rows=1 loops=1)
>   --                Sort Key: queues.priority DESC
>   --                Sort Method: external merge  Disk: 111568kB
                                   ^^^^^^^^^^^^^^^^^^^^
                                   You might want to increase work_mem.
>   --                ->  Hash Join  (cost=60.85..51225.61 rows=3000000 width=28) (actual time=0.064..660.317 rows=3000000 loops=1)
[...]
>   --  Planning Time: 0.430 ms
>   --  Execution Time: 1347.448 ms
>   -- (13 rows)

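The work_mem hint above can be tried for a single session before changing
anything server-wide. This is only a sketch: the value below is a guess, not a
measured requirement, and an in-memory sort usually needs noticeably more
memory than the on-disk size reported in the plan.

```sql
-- Assumption: 512MB is an illustrative value, not a recommendation.
-- "set" without "local" affects only the current session.
set work_mem = '512MB';

-- Re-run the query under explain (analyze) and check the "Sort Method" line.
-- If the sort now fits in memory, it should no longer report "external merge".
explain (analyze)
select *
from jobs
join queues on queues.id = jobs.queue_id
order by priority desc
limit 1
for update of jobs skip locked;

-- Return to the configured default afterwards.
reset work_mem;
```

Even with an in-memory sort, all 3 million joined rows are still read and sorted, so this reduces the cost of the sort step rather than avoiding it.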
Are 3 queues with 1 million jobs each a realistic scenario in your case?

If you instead have 3000 queues with 1000 jobs each, the planner can use
an index on queues(priority):

hjp=> explain(analyze)   select *
   from jobs
   join queues on queues.id = jobs.queue_id
   order by priority desc, jobs.id asc
   limit 1
;
╔═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╗
║                                                                         QUERY PLAN                                                                          ║
╟─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╢
║ Limit  (cost=1222.42..1222.47 rows=1 width=16) (actual time=9.128..9.129 rows=1 loops=1)                                                                    ║
║   ->  Incremental Sort  (cost=1222.42..159673.21 rows=3000000 width=16) (actual time=9.127..9.127 rows=1 loops=1)                                           ║
║         Sort Key: queues.priority DESC, jobs.id                                                                                                             ║
║         Presorted Key: queues.priority                                                                                                                      ║
║         Full-sort Groups: 1  Sort Method: top-N heapsort  Average Memory: 25kB  Peak Memory: 25kB                                                           ║
║         Pre-sorted Groups: 1  Sort Method: top-N heapsort  Average Memory: 25kB  Peak Memory: 25kB                                                          ║
║         ->  Nested Loop  (cost=0.71..107171.21 rows=3000000 width=16) (actual time=0.037..5.824 rows=27001 loops=1)                                         ║
║               ->  Index Scan Backward using queues_priority_idx on queues  (cost=0.28..121.21 rows=3000 width=8) (actual time=0.018..0.059 rows=28 loops=1) ║
║               ->  Index Only Scan using jobs_queue_id_id_idx on jobs  (cost=0.43..25.68 rows=1000 width=8) (actual time=0.011..0.119 rows=964 loops=28)     ║
║                     Index Cond: (queue_id = queues.id)                                                                                                      ║
║                     Heap Fetches: 0                                                                                                                         ║
║ Planning Time: 0.362 ms                                                                                                                                     ║
║ Execution Time: 9.158 ms                                                                                                                                    ║
╚═════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝
(13 rows)

hjp=> \d queues
                 Table "public.queues"
╔══════════╤═════════╤═══════════╤══════════╤═════════╗
║  Column  │  Type   │ Collation │ Nullable │ Default ║
╟──────────┼─────────┼───────────┼──────────┼─────────╢
║ id       │ integer │           │ not null │         ║
║ priority │ integer │           │ not null │ 0       ║
╚══════════╧═════════╧═══════════╧══════════╧═════════╝
Indexes:
    "queues_pkey" PRIMARY KEY, btree (id)
    "queues_priority_idx" btree (priority)
Referenced by:
    TABLE "jobs" CONSTRAINT "jobs_queue_id_fkey" FOREIGN KEY (queue_id) REFERENCES queues(id)

hjp=> \d jobs
                              Table "public.jobs"
╔══════════╤═════════╤═══════════╤══════════╤══════════════════════════════════╗
║  Column  │  Type   │ Collation │ Nullable │             Default              ║
╟──────────┼─────────┼───────────┼──────────┼──────────────────────────────────╢
║ id       │ integer │           │ not null │ nextval('jobs_id_seq'::regclass) ║
║ queue_id │ integer │           │ not null │                                  ║
╚══════════╧═════════╧═══════════╧══════════╧══════════════════════════════════╝
Indexes:
    "jobs_pkey" PRIMARY KEY, btree (id)
    "jobs_queue_id_id_idx" btree (queue_id, id)
    "jobs_queue_id_idx" btree (queue_id)
Foreign-key constraints:
    "jobs_queue_id_fkey" FOREIGN KEY (queue_id) REFERENCES queues(id)

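For anyone who wants to reproduce this, a setup along the following lines
should yield the indexes listed by \d above. It is a sketch under assumptions:
both tables are assumed to be empty beforehand, and the priority distribution
is invented, since the post does not say how priorities were assigned.

```sql
-- Hypothetical test data: 3000 queues with 1000 jobs each.
-- Assumption: priorities are random integers between 0 and 100.
insert into queues (id, priority)
select i, (random() * 100)::integer
from generate_series(1, 3000) as i;

insert into jobs (queue_id)
select q.id
from queues as q
cross join generate_series(1, 1000);

-- PostgreSQL's default index names for these definitions are
-- queues_priority_idx, jobs_queue_id_id_idx and jobs_queue_id_idx.
create index on queues (priority);
create index on jobs (queue_id, id);
create index on jobs (queue_id);

-- Refresh planner statistics and the visibility map; the latter is what
-- lets an index-only scan avoid heap fetches.
vacuum analyze queues;
vacuum analyze jobs;
```

The two-column index on (queue_id, id) is the one the plan uses for the index-only scan. The single-column index on (queue_id) is not used in that plan.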

If you really do have very few, very long queues, it might be faster to query
each queue separately.
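
One possible reading of "query each queue separately" is to walk the queues in
priority order and probe each one for its first unlocked job. The PL/pgSQL
function below is a sketch of that idea, not something taken from this thread:
the name dequeue_job is hypothetical, and it assumes the index on
jobs(queue_id, id) shown above exists.

```sql
-- Hypothetical helper: returns the first unlocked job from the
-- highest-priority queue that has one, or NULL if none is available.
create or replace function dequeue_job() returns jobs
language plpgsql as $$
declare
    q record;
    j jobs;
begin
    -- Visit queues from highest to lowest priority.
    for q in select id from queues order by priority desc loop
        -- Probe one queue only; this can use the (queue_id, id) index.
        select * into j
        from jobs
        where jobs.queue_id = q.id
        order by jobs.id
        limit 1
        for update skip locked;

        if found then
            return j;
        end if;
    end loop;

    return null;
end
$$;
```

The row lock taken inside the function is held until the calling transaction ends, so the call belongs inside the transaction that processes the job:

```sql
begin;
select * from dequeue_job();
-- process the job here, then delete its row from jobs
commit;
```

With few queues the loop runs only a handful of cheap index probes, so the number of jobs per queue should matter much less than in the join-and-sort query.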

        hp

-- 
   _  | Peter J. Holzer    | Story must make more sense than reality.
|_|_) |                    |
| |   | hjp@xxxxxx         |    -- Charles Stross, "Creative writing
__/   | http://www.hjp.at/ |       challenge!"
