Nikola Milutinovic wrote:
Hi all.
This may be trivial, but I cannot find good references for it. The
problem is this:
Suppose we have one table in PgSQL which is a job queue, each row
represents one job with several status flags, IDs,... Several
processes will attempt to access the queue and "take" their batch of
jobs, the batch will have some parameterizable size. So, the simple
idea is "select N lowest IDs that do not have a flag <in process> set
and set the flag", "then proceed with whatever it is that should be done".
Trouble is, with MVCC I don't see a way to prevent overlapping and
race conditions. Oh, sure, if I issue SELECT ... FOR UPDATE, it will lock
the rows, but, if I understand correctly, the change may not be
instantaneous and atomic, so I might get a transaction rolled back, and
then there is error handling that will lead to the ugliest
serialization I can think of. Let me clarify this, so somebody can
tell me if I got it wrong.
Imagine a queue table with 20 rows, ID: 1,...,20, status="new". Imagine
2 processes/threads (P1, P2) attempting to get 10 jobs each. How to do that?
P1: UPDATE job_queue SET process_id=$1, status='in process' WHERE id IN (
      SELECT id FROM job_queue WHERE status='new' AND id IN (
        SELECT id FROM job_queue WHERE status='new' ORDER BY id LIMIT 10));
P2: the same
P1: SELECT * FROM job_queue WHERE process_id=$1 ....
P2: SELECT * FROM job_queue WHERE process_id=$1 ....
The reason for the 2 selects is that if 2 or more processes contend
for the same set of jobs, the first one will set the status. The
second will, after P1 has released the rows, get those rows, which are
already taken. Of course, this will most likely return 0 rows for P2,
since all 10 will be taken. If I leave out the LIMIT 10 in the inner
select, I am effectively locking the entire table. Is that the way to go?
UNLOCK TABLE job_queue;
We recently used an almost pure SQL method to solve this: apply a simple
hash to "partition" the table, so that different process_ids never
conflict. We built a small xor255() function that XORs together the four
bytes of the job_queue table's ID:
create or replace function xor255(int) returns int as $$
  -- XOR the four bytes of the id together; the result is in 0..255
  select (($1>>24)&255) # (($1>>16)&255) # (($1>>8)&255) # ($1&255);
$$ language sql immutable;
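To see what the function computes, here is a rough Python sketch of the same byte-folding arithmetic. It is only an illustration, not part of our setup; the masking to 32 bits is an assumption made so that negative ids behave like a PostgreSQL integer.

```python
def xor255(n: int) -> int:
    """Fold a 32-bit integer into 0..255 by XOR-ing its four bytes."""
    # Assumption: treat n as a 32-bit pattern, like a PostgreSQL integer.
    n &= 0xFFFFFFFF
    return ((n >> 24) & 255) ^ ((n >> 16) & 255) ^ ((n >> 8) & 255) ^ (n & 255)


if __name__ == "__main__":
    # Print the bucket for a few sample ids.
    for job_id in (1, 20, 256, 257, 0x01020304):
        print(job_id, xor255(job_id))
```

Note that any id below 256 maps to itself, because its three upper bytes are zero.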
Then create a functional index on it:
create index idx_job_queue_xor on job_queue using btree (xor255(id));
-- assumes your id column is the primary key, of type integer/serial
Then we use a query like:
select url from crawljob where xor255(id) >=N and xor255(id)<M and
status = 'blahblah' order by blahblah;
to fetch jobs. Here N and M are computed from your process_id, so that
each process_id gets its own range of xor255 values and we can be sure
there is no conflict between different process_ids.
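The "math" for N and M is not spelled out above, so the following Python sketch shows one possible way to do it: split the 256 xor255 values into contiguous half-open ranges, one per worker. The names bucket_range and owner, and the idea of numbering workers from 0, are assumptions made for illustration only.

```python
def xor255(n: int) -> int:
    """Same byte-folding as the SQL function: result is in 0..255."""
    n &= 0xFFFFFFFF  # assumption: treat n as a 32-bit pattern
    return ((n >> 24) & 255) ^ ((n >> 16) & 255) ^ ((n >> 8) & 255) ^ (n & 255)


def bucket_range(worker: int, workers: int) -> tuple:
    """Return (N, M) so that this worker owns xor255 values N <= v < M."""
    if not 0 <= worker < workers <= 256:
        raise ValueError("need 0 <= worker < workers <= 256")
    # Integer division keeps the ranges contiguous and non-overlapping.
    return worker * 256 // workers, (worker + 1) * 256 // workers


def owner(job_id: int, workers: int) -> int:
    """Return the index of the worker whose range contains this job id."""
    value = xor255(job_id)
    for worker in range(workers):
        n, m = bucket_range(worker, workers)
        if n <= value < m:
            return worker
    raise AssertionError("ranges should cover 0..255")


if __name__ == "__main__":
    # Show the range each of three workers would plug into the query.
    for w in range(3):
        print(w, bucket_range(w, 3))
```

Each worker would then put its own N and M into the query above. One thing to keep in mind: with contiguous ranges and small ids the load can be very uneven. For the 20-row example with two workers, every id from 1 to 20 lands in the first worker's range, so some other assignment of values to workers may be preferable for small tables.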
This method simply XORs the bytes of an integer to obtain a value
between 0 and 255, so we can partition the whole id set into 256 parts
that do not conflict with each other. That avoids the race problems
for at most 256 concurrent consumer processes, and it could surely be
adapted for more processes, depending on your needs.
Hope that helps.