> We are doing the same (newsletter), and there is no problem locking the
> whole table for a short time with an advisory lock. The Java id-fetching
> worker "locks" the table (this does not block the table for reading or
> writing; it only blocks its Java worker brothers that are using the same
> advisory lock), fetches, let's say, 50 ids of records marked as CREATED,
> and changes their status to PROCESSING. Then several workers take the
> ids, fetch the needed data from the table independently, and process and
> update the records in parallel. We have 3 Java machines getting ids for
> 10 parallel workers, and everything works just fine.
>
> Getting the IDs is usually much, much faster than the real processing.

Strange that this thread came back up today. I actually implemented a small
example this morning in Python using Klint Gore's suggestion and was very
happy with the results. I have attached 4 files for anyone interested in
looking at a really simple example. You'll need Python plus the psycopg2
driver for this, but anyone should be able to follow this easily:

db.py     - Edit this file with your db settings.
client.py - This would be your worker process.
insert.py - This is used to enter some jobs into the jobs table.
reset.py  - This is used to reset each job back to 'REQUESTED' status.

This example assumes that you have a jobs table like the one below:

create table jobs(id serial primary key, status text not null)

First, edit db.py with your own db settings. Fire up as many copies of
client.py as you'd like, then enter some jobs into the jobs table; running
insert.py will enter 100 jobs for you. Now watch as your clients process
the jobs. Once all of the jobs have finished processing, you can run
reset.py to set every job back to 'REQUESTED' status so that the clients
start processing all over again.

I hope it is OK to attach examples! It just seems like this question comes
up often.

Jeff Peck
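P.S. For comparison with the quoted advisory-lock approach, a worker-side
fetch along those lines might look something like the sketch below. The
lock key 1234 and the name fetch_batch are made up here, and the
'CREATED'/'PROCESSING' statuses follow the quote; only workers taking the
same advisory lock block each other, while ordinary reads and writes of
the jobs table are unaffected.

import psycopg2

import db


def fetch_batch(connection, batch_size=50):
    """Claim up to batch_size 'CREATED' jobs, advisory-lock style."""
    cursor = connection.cursor()
    # Serializes only against other workers that take lock key 1234;
    # it does not block normal readers/writers of the jobs table.
    cursor.execute("SELECT pg_advisory_lock(1234)")
    cursor.execute("""
        UPDATE jobs SET status = 'PROCESSING'
        WHERE id IN (
            SELECT id FROM jobs
            WHERE status = 'CREATED'
            LIMIT %s
        )
        RETURNING id
        """, (batch_size,))
    ids = [row[0] for row in cursor.fetchall()]
    connection.commit()
    # Session-level advisory locks survive COMMIT, so release explicitly.
    cursor.execute("SELECT pg_advisory_unlock(1234)")
    connection.commit()
    return ids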
# db.py
import psycopg2

host = 'localhost'
db = 'your_db'
user = 'your_username'
password = 'your_pw'

connect_string = "dbname='%s' user='%s' host='%s' password='%s'" % (
    db, user, host, password)


def connect_db():
    try:
        connection = psycopg2.connect(connect_string)
    except Exception:
        print "Error connecting to db!"
        raise
    else:
        print "Successful connection"
        return connection
# client.py
import time

import psycopg2
import psycopg2.extensions

import db


def get_new_job_count(connection):
    """
    get_new_job_count(connection) -> int

    Returns the number of new jobs found in the jobs table.
    New jobs have a status of 'REQUESTED'.
    """
    cursor = connection.cursor()
    cursor.execute("BEGIN TRANSACTION")
    cursor.execute("""
        SELECT count(status) FROM jobs
        WHERE status = 'REQUESTED'
        """)
    count = cursor.fetchall()[0][0]
    cursor.execute("COMMIT")
    return count


def get_new_jobs(connection):
    """
    get_new_jobs(connection) -> [(id, status), ..., (id_n, status_n)]

    Returns new jobs found in the jobs table, marking them as
    'PROCESSING' on the way out. This will return a max of 10 jobs
    at a time.
    """
    jobs = []
    try:
        cursor = connection.cursor()
        cursor.execute("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE")
        cursor.execute("""
            UPDATE jobs SET status = 'PROCESSING'
            WHERE id IN (
                SELECT id FROM jobs
                WHERE status = 'REQUESTED'
                LIMIT 10
            )
            RETURNING *
            """)
        jobs = cursor.fetchall()
        cursor.execute("COMMIT")
    except psycopg2.extensions.TransactionRollbackError:
        # Another client claimed the same rows first, so our serializable
        # transaction failed. Just continue; the caller will retry
        # get_new_jobs. (COMMIT inside an aborted transaction is treated
        # as ROLLBACK by PostgreSQL.)
        cursor.execute("COMMIT")
    return jobs


def mark_complete(connection, jobs):
    """
    mark_complete(connection, jobs) -> None

    Sets the status flag for each passed-in job to 'COMPLETE'.
    """
    cursor = connection.cursor()
    sql = "UPDATE jobs SET status = 'COMPLETE' WHERE id = %(id)s"
    update_params = [{'id': job[0]} for job in jobs]
    cursor.execute("BEGIN TRANSACTION")
    cursor.executemany(sql, update_params)
    cursor.execute("COMMIT TRANSACTION")


connection = db.connect_db()
my_job_ids = []
collisions = 0
while 1:
    # First see if there are any new jobs entered into the queue.
    count = get_new_job_count(connection)
    if not count:
        if len(my_job_ids):
            print "Job queue empty. I processed these jobs:"
            my_job_ids.sort()
            print my_job_ids
            print "I processed %d jobs" % (len(my_job_ids))
            print "I had %d collisions" % (collisions)
            my_job_ids = []
            collisions = 0
        time.sleep(5)
        continue
    else:
        print "%d jobs remaining in queue" % (count)
        jobs = get_new_jobs(connection)
        if not len(jobs):
            print "None found, or my jobs were marked by another client"
            collisions += 1
        else:
            # You would do any processing needed for these jobs here.
            my_job_ids.extend([j[0] for j in jobs])
            #time.sleep(3)  # Let's pretend it takes about 3 secs to process jobs
            mark_complete(connection, jobs)
    time.sleep(1)
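In case the TransactionRollbackError handling in client.py looks
mysterious: it is what you get when two serializable transactions try to
claim the same row. Below is a minimal two-connection sketch of that
collision, assuming the jobs table above holds at least one 'REQUESTED'
row. It claims a single job by min(id) just to force the overlap, and it
uses set_isolation_level instead of the explicit BEGIN strings in
client.py; both are just choices for this sketch.

import psycopg2
import psycopg2.extensions

import db

SERIALIZABLE = psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE

a = db.connect_db()
b = db.connect_db()
a.set_isolation_level(SERIALIZABLE)
b.set_isolation_level(SERIALIZABLE)

claim = """
    UPDATE jobs SET status = 'PROCESSING'
    WHERE id = (SELECT min(id) FROM jobs
                WHERE status = 'REQUESTED')
    """

cb = b.cursor()
cb.execute("SELECT count(*) FROM jobs")  # takes B's snapshot

ca = a.cursor()
ca.execute(claim)   # A claims the job...
a.commit()          # ...and commits first

try:
    cb.execute(claim)  # same row, newer than B's snapshot -> error
    b.commit()
except psycopg2.extensions.TransactionRollbackError, e:
    print "collision:", e
    b.rollback()

This is exactly what client.py counts as a collision and simply retries.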
# insert.py
import psycopg2

import db

# This controls how many new jobs get inserted by insert_new_jobs.
how_many = 100


def insert_new_jobs(cursor):
    """
    insert_new_jobs(cursor) -> None

    Inserts `how_many` new jobs into the jobs queue table.
    """
    cursor.execute("BEGIN TRANSACTION")
    for i in range(how_many):
        cursor.execute("INSERT INTO jobs(status) VALUES ('REQUESTED')")
    cursor.execute("COMMIT TRANSACTION")


connection = db.connect_db()
cursor = connection.cursor()
insert_new_jobs(cursor)
connection.close()
# reset.py
import psycopg2

import db


def reset_job_table(cursor):
    """
    reset_job_table(cursor) -> None

    Sets each job in the jobs table back to 'REQUESTED' status.
    """
    cursor.execute("BEGIN TRANSACTION")
    cursor.execute("UPDATE jobs SET status = 'REQUESTED'")
    cursor.execute("COMMIT TRANSACTION")


connection = db.connect_db()
cursor = connection.cursor()
reset_job_table(cursor)
connection.close()