
Re: Multithreaded queue in PgSQL

> We are doing the same (a newsletter), and there is no problem locking
> the whole table for a short time with an advisory lock while the Java
> id-fetching worker runs. The advisory lock does not block reads or
> writes on the table; it only blocks the other Java workers that use
> the same advisory lock. The worker fetches, say, 50 ids of records
> marked CREATED and changes their status to PROCESSING. Several workers
> then take the ids, fetch the needed data from the table independently,
> and process and update the rows in parallel. We have 3 Java machines
> feeding ids to 10 parallel workers each, and everything works just
> fine.
>
> Getting the ids is usually much, much faster than the real processing.
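
The batching pattern described above can be sketched in plain Python with no database at all: a shared lock stands in for the advisory lock, and each worker claims a batch of ids before processing them independently outside the lock. All names here are illustrative, not part of the original example.

```python
import threading

# Job table stand-in: id -> status ('CREATED', 'PROCESSING', 'DONE')
jobs = {i: 'CREATED' for i in range(100)}
claim_lock = threading.Lock()   # plays the role of the advisory lock
BATCH = 10

def claim_batch():
    """Atomically mark up to BATCH 'CREATED' jobs as 'PROCESSING' and return their ids."""
    with claim_lock:            # only the claim step is serialized
        ids = [i for i, s in jobs.items() if s == 'CREATED'][:BATCH]
        for i in ids:
            jobs[i] = 'PROCESSING'
        return ids

def worker():
    while True:
        ids = claim_batch()
        if not ids:             # queue drained, worker exits
            break
        for i in ids:           # processing happens outside the lock, in parallel
            jobs[i] = 'DONE'

threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(all(s == 'DONE' for s in jobs.values()))
```

Because only the short claim step is serialized, workers spend almost all of their time processing in parallel, which matches the observation that getting the ids is far cheaper than the processing itself.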


Weird this thread came back up today. I actually implemented a small
example this morning in Python using Klint Gore's suggestion and was
very happy with the results. I have attached 4 files for anyone
interested in looking at a really simple example. You'll need Python
and the psycopg2 driver for this, but anyone should be able to follow
it easily:
    db.py     - Edit this file with your db settings.
    client.py - This would be your worker process.
    insert.py - Enters some jobs into the jobs table.
    reset.py  - Resets each job back to 'REQUESTED' status.

This example assumes a jobs table like the one below:
create table jobs(id serial primary key, status text not null);
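
If the jobs table grows large, a partial index can speed up the repeated lookups for 'REQUESTED' rows. This is optional and not part of the original example; the index name is illustrative:

```sql
-- optional: speeds up "WHERE status = 'REQUESTED'" scans on a big table
create index jobs_requested_idx on jobs(id) where status = 'REQUESTED';
```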

First edit db.py with your own db settings.
Fire up as many copies of client.py as you'd like.
Now enter some jobs into the jobs table. Running insert.py will enter
100 jobs for you.
Now watch as your clients process the jobs.

Once all of the jobs have finished processing, you can run reset.py to
mark all of the jobs back to 'REQUESTED' status so that the clients
start processing all over again.
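
As an aside, on PostgreSQL 9.5 or later the claim step in client.py could instead use FOR UPDATE SKIP LOCKED, which lets concurrent workers simply skip rows another transaction has already claimed rather than retrying after a serialization failure. A sketch, not part of the original example:

```sql
UPDATE jobs
SET    status = 'PROCESSING'
WHERE  id IN (
    SELECT id
    FROM   jobs
    WHERE  status = 'REQUESTED'
    LIMIT  10
    FOR UPDATE SKIP LOCKED
    )
RETURNING *;
```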

I hope it is OK to attach examples! Just seems like this question
comes up often.


Jeff Peck

--- db.py ---
import psycopg2


host = 'localhost'
db = 'your_db'
user = 'your_username'
password = 'your_pw'
connect_string = "dbname='%s' user='%s' host='%s' password='%s'" % (
    db, user, host, password
    )



def connect_db():
    try:
        connection = psycopg2.connect(connect_string)
    except Exception:
        print("Error connecting to db!")
        raise
    else:
        print("Successful connection")
    return connection

--- client.py ---
import time
import psycopg2
import psycopg2.extensions
import db


def get_new_job_count(connection):
    """ get_new_job_count(connection) -> int

    Returns the number of new jobs found in the jobs table. New jobs have
    a status of 'REQUESTED'.
    """
    cursor = connection.cursor()
    cursor.execute("""
        SELECT count(status)
        FROM   jobs
        WHERE  status = 'REQUESTED'
        """
        )
    count = cursor.fetchall()[0][0]
    connection.commit()
    return count


def get_new_jobs(connection):
    """ get_new_jobs(connection) -> [(id, status), ....., (id_n, status_n)]

    Returns new jobs found in the jobs table, at most 10 at a time.
    """
    jobs = []
    try:
        cursor = connection.cursor()
        # psycopg2 opens the transaction implicitly; SET TRANSACTION must be
        # its first statement for the SERIALIZABLE level to take effect.
        cursor.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
        cursor.execute("""
            UPDATE jobs
            SET    status = 'PROCESSING'
            WHERE  id IN(
                SELECT id
                FROM   jobs
                WHERE  status = 'REQUESTED'
                LIMIT 10
                )
            RETURNING *
            """
            )
        jobs = cursor.fetchall()
        connection.commit()
    except psycopg2.extensions.TransactionRollbackError:
        # Another client updated the same rows first. Roll back and let the
        # caller retry get_new_jobs.
        connection.rollback()
    return jobs


def mark_complete(connection, jobs):
    """ mark_complete(connection, jobs) -> None

    Sets the status flag for each passed-in job to 'COMPLETE'.
    """
    cursor = connection.cursor()
    sql = """UPDATE jobs SET status='COMPLETE' WHERE id=%(id)s"""
    update_params = [{'id': job[0]} for job in jobs]
    cursor.executemany(sql, update_params)
    connection.commit()
    

connection = db.connect_db()
my_job_ids = []
collisions = 0
while True:
    # First see if there are any new jobs entered into the queue.
    count = get_new_job_count(connection)
    if not count:
        if my_job_ids:
            print("Job queue empty. I processed these jobs:")
            my_job_ids.sort()
            print(my_job_ids)
            print("I processed %d jobs" % len(my_job_ids))
            print("I had %d collisions" % collisions)
            my_job_ids = []
            collisions = 0
        time.sleep(5)
        continue
    else:
        print("%d jobs remaining in queue" % count)
        jobs = get_new_jobs(connection)
        if not jobs:
            print("None found, or my jobs were marked by another client")
            collisions += 1
        else:
            # You would do any processing needed for these jobs here.
            my_job_ids.extend([j[0] for j in jobs])
            #time.sleep(3)   # Pretend it takes about 3 secs to process jobs
            mark_complete(connection, jobs)
        time.sleep(1)

--- insert.py ---
import psycopg2
import db


# This controls how many new jobs get inserted by insert_new_jobs function
how_many = 100


def insert_new_jobs(cursor):
    """ insert_new_jobs(cursor) -> None

    Inserts `how_many` new jobs into the jobs queue table.
    """
    for i in range(how_many):
        cursor.execute("INSERT INTO jobs(status) VALUES ('REQUESTED')")
    cursor.connection.commit()


connection = db.connect_db()
cursor = connection.cursor()
insert_new_jobs(cursor)
connection.close()

--- reset.py ---
import psycopg2
import db


def reset_job_table(cursor):
    """ reset_job_table(cursor) -> None

    Sets each job in the jobs table back to 'REQUESTED' status.
    """
    cursor.execute("UPDATE jobs SET status='REQUESTED'")
    cursor.connection.commit()


connection = db.connect_db()
cursor = connection.cursor()
reset_job_table(cursor)
connection.close()
