
Re: Multithreaded queue in PgSQL


 



Nikola Milutinovic wrote:
Hi all.

This may be trivial, but I cannot find good references for it. The problem is this:

Suppose we have one table in PgSQL which is a job queue, each row represents one job with several status flags, IDs,... Several processes will attempt to access the queue and "take" their batch of jobs, the batch will have some parameterizable size. So, the simple idea is "select N lowest IDs that do not have a flag <in process> set and set the flag", "then proceed with whatever it is that should be done".

Trouble is, with MVCC I don't see a way to prevent overlapping and race conditions. Oh, sure, if I issue SELECT ... FOR UPDATE, it will lock the rows, but, if I understand correctly, the change may not be instantaneous and atomic, so I might get a transaction rolled back, and then there is error handling that will lead to the ugliest serialization I can think of. Let me clarify this, so somebody can tell me if I got it wrong.

Imagine a queue table with 20 rows, ID: 1,...,20, status='new'. Imagine 2 processes/threads (P1, P2) attempting to get 10 jobs each. How to do that?

P1: UPDATE job_queue SET process_id=$1, status='in process' WHERE id IN (
    SELECT id FROM job_queue WHERE status='new' AND id IN (
        SELECT id FROM job_queue WHERE status='new' ORDER BY id LIMIT 10 FOR UPDATE
    )
)
P2: the same
P1: SELECT * FROM job_queue WHERE process_id=$1 ....
P2: SELECT * FROM job_queue WHERE process_id=$1 ....

The reason for the 2 selects is that if 2 or more processes contend for the same set of jobs, the first one will set the status. The second will, after P1 has released the rows, get rows that are already taken. Of course, this will most likely return 0 rows for P2, since all 10 will be taken. If I leave out the LIMIT 10 in the inner select, I am effectively locking the entire table. Is that the way to go?

BEGIN;
LOCK TABLE job_queue IN EXCLUSIVE MODE;
UPDATE ...
COMMIT;  -- PostgreSQL has no UNLOCK TABLE; the lock is released when the transaction ends


We recently used an almost pure SQL method to solve this. We simply apply a hash to "partition" the table, so that different process_ids never conflict. We built a simple xor255() function that folds the job_queue table's ID into a single XOR value,
like:

-- Fold the four bytes of a 32-bit id into one value in 0..255 (# is bitwise XOR).
create or replace function xor255(int) returns int as $$
select (($1>>24)&255) # (($1>>16)&255) # (($1>>8)&255) # ($1&255);
$$ language sql immutable;
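To see what the function computes, here is a small Python mirror of it. This is only an illustrative sketch for checking values on the client side; the SQL function above is what the index and queries actually use.

```python
def xor255(job_id: int) -> int:
    """Python mirror of the SQL xor255(): XOR the four bytes of a 32-bit id."""
    return (
        ((job_id >> 24) & 255)
        ^ ((job_id >> 16) & 255)
        ^ ((job_id >> 8) & 255)
        ^ (job_id & 255)
    )


# Small ids map to themselves; larger ids fold back into the 0..255 range.
small_ids = [xor255(i) for i in range(1, 6)]
folded = xor255(0x01020304)  # 1 ^ 2 ^ 3 ^ 4
```

Every run of 256 consecutive ids that starts at a multiple of 256 lands on all 256 values once, so a serial id column spreads evenly over the buckets.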

Then create a functional index, like:

create index idx_job_queue_xor on job_queue using btree (xor255(id)); -- assumes your id column is an integer/serial primary key

Then we use a query like this (crawljob is the name of our own job table):

select url from crawljob where xor255(id) >=N and xor255(id)<M and status = 'blahblah' order by blahblah;

to fetch jobs. Here the numbers N and M are computed from your process_id, so that each process gets its own range of xor255 values. That way we can be sure there is no conflict
between different process_ids.
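The post does not spell out the math for N and M. One possible way, as a sketch only, is to number the consumers 0..worker_count-1 and give each an equal slice of the 0..255 range. The names partition_bounds, worker_index and worker_count are hypothetical, and how a process_id maps to a worker_index is left to the application.

```python
from typing import Tuple


def partition_bounds(worker_index: int, worker_count: int) -> Tuple[int, int]:
    """Return (N, M) so that this worker owns xor255 values N <= v < M.

    Assumption: workers are numbered 0..worker_count-1 (not from the post).
    """
    if not 1 <= worker_count <= 256:
        raise ValueError("worker_count must be between 1 and 256")
    if not 0 <= worker_index < worker_count:
        raise ValueError("worker_index must be in 0..worker_count-1")
    n = worker_index * 256 // worker_count
    m = (worker_index + 1) * 256 // worker_count
    return n, m


# With 4 consumers, each one gets 64 of the 256 xor255 values.
bounds = [partition_bounds(i, 4) for i in range(4)]
```

Each worker would then pass its own N and M into the query above. The ranges are adjacent and do not overlap, so no two workers select the same row.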

This method simply XORs the bytes of an integer to obtain a value between 0 and 255, so we can partition the whole id set into 256 parts that do not conflict with each other. That avoids the race problem for at most 256 concurrent consumer processes, and we surely could
adapt it for more processes, depending on your needs.

Hope that helps.

-laser



