On Thu, 2009-07-16 at 14:13 +0000, Florian Weimer wrote: > The drawback is that some of the side effects of the INSERT occur > before the constraint check fails, so it seems to me that I still need > to perform the select. I was about to foolishly suggest: Instead of: SELECT 1 FROM x WHERE a = 4; IF NOT FOUND THEN INSERT INTO x (a,b) VALUES (4,10); END IF; trying: INSERT INTO x (a, b) SELECT 4, 10 WHERE NOT EXISTS(SELECT 1 FROM x WHERE a = 4); ... but then realised I couldn't come up with any justification for how it'd help (after all, the WHERE clause still has to be evaluated before the INSERT can proceed, there's still no predicate locking, and the statements can be evaluated concurrently) so I thought I'd test it. The test program, attached, demonstrates what I should've known in the first place. In SERIALIZABLE isolation, the above is *guaranteed* to fail every time there's conflict, because concurrent transactions cannot see changes committed by the others. So is a SELECT test then separate INSERT, by the way. In READ COMITTED you get away with it a lot of the time because the statement can see other transaction(s)' committed changes so the subquery often matches - but it's a race, and eventually you'll hit a situation where the subquery for two concurrent transactions is evaluated before either's insert is issued or at least is committed. In my test program I've managed as many as 1283 steps before two racing READ COMMITTED transactions collide. That's in a program designed to synchronize each transaction before each insert for maximum collision potential. With six racing transactions I've rarely seen more than three steps without a collision. ( I've attached the test program in case it's of any interest. It's a Python test controller that spawns slave processes which it synchronises using Pg's advisory locking. It ensures that the slaves all start each INSERT attempt together, and all finish before starting the next attempt. Signals are used for failure notification, cleanup, etc. ) Anyway, the point is that you're actually worse off in this particular situation thanks to your use of SERIALIZABLE isolation. However, READ COMMITTED just gives you a race you're likely to win most of the time instead of a guaranteed failure whenever there's a race, so it's not really a solution. Given that, it seems to me you'll have to rely on Pg's internal lower-level synchonization around unique indexes. Try the insert and see if it fails, then ROLLBACK TO SAVEPOINT (or use a PL/PgSQL exception block). As you noted, this does mean that certain side-effects may occur, including: - advancement of sequences due to nextval(...) calls - triggers that've done work that can't be rolled back, eg dblink calls, external file writes, inter-process communication etc If you really can't afford the INSERT side effects and can't redesign your code to be tolerant of them, you can always lock the table before an INSERT. If you can't afford to lock the table due to its impact on performance, you can potentially use Pg's advisory locking mechanism to protect your inserts. Eg (PL/PgSQL): PERFORM pg_advisory_lock(4); SELECT 1 FROM x WHERE a = 4; IF NOT FOUND THEN INSERT INTO x (a,b) VALUES (4,10); END IF; PERFORM pg_advisory_unlock(4); (You might want to use the two-argument form of the advisory locking calls if your IDs are INTEGER size not INT8, and use the table oid for the first argument.) If every possible INSERTer ensures it holds the lock on the id of interest before inserting, you'll be fine. Yes, it's ugly, but it preserves concurrent insert performance while eliminating failed INSERTs. A possible way to ensure that every possible INSERTer does do the right thing is to drop the INSERT privilege on the table and then use a SECURITY DEFINER function that checks the caller's rights and does the INSERT. Also: Is this really a phantom read? Your issue is not that you read a record that then vanishes or no longer matches your filter criteria; rather, it's that a record is created that matches your criteria after you tested for it. Certainly that wouldn't be possible if the concurrent transactions were actually executed serially, but does the standard actually require that this be the case? If it does, then compliant implementations would have to do predicate locking. Ouch. Does anybody do that? It seems MS-SQL implements very limited predicate locking (enough to handle your issue) but not enough to tackle aggregates or anything complex. -- Craig Ringer
#!/usr/bin/env python import psycopg2 import sys import time import os from psycopg2.extensions import ( ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_SERIALIZABLE, STATUS_BEGIN, STATUS_READY) import signal import subprocess import random ######## CONFIGURATION ########## debug = False serializable = False ################################# conn = psycopg2.connect("") if serializable: conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE) else: conn.set_isolation_level(ISOLATION_LEVEL_READ_COMMITTED) children = [] def main_slave(slave_number, master_pid): print "SLAVE[%02s] Spawned" % slave_number n = 0 # Acquire the first finish lock curs = conn.cursor() if debug: print "SLAVE[%02s] LS(1,0)" % slave_number curs.execute("SELECT pg_advisory_lock_shared(1,0);") # Register our successful start to the controller curs.execute("INSERT INTO control (slave) VALUES (%s);" % slave_number) conn.commit(); print "SLAVE[%02s] Started" % slave_number # We enter this loop holding the finish lock for n, and no other locks. while True: # Aqcuire the finish lock for the next record pre-emptively if debug: print "SLAVE[%02s] LS(1,%s)" % (slave_number, (n+1)) curs.execute("SELECT pg_advisory_lock_shared(1,%s);" % (n+1)); # Then wait on the start lock for our current record. if debug: print "SLAVE[%02s] LS(0,%s)" % (slave_number, n) curs.execute("SELECT pg_advisory_lock_shared(0,%s);" % n) # The controller let us off the leash. Race! try: curs.execute("INSERT INTO x (a, slave) SELECT %s, %s WHERE NOT EXISTS (SELECT 1 FROM x WHERE a = %s);" % (n,slave_number,n)) conn.commit(); except psycopg2.IntegrityError, e: print "SLAVE[%02s]: Integrity error inserting %s!" % (slave_number, n) print e # SIGUSR1 the master os.kill( int(master_pid), signal.SIGUSR1) # and terminate. exit(2) # Release the start lock now we're done with it. if debug: print "SLAVE[%02s] US(0,%s)" % (slave_number, n) curs.execute("SELECT pg_advisory_unlock_shared(0,%s);" % n); # Done with n. Tell the controller about our success so it can go ahead # and unlock the next record. if debug: print "SLAVE[%02s] US(1,%s)" % (slave_number, n) curs.execute("SELECT pg_advisory_unlock_shared(1,%s);" % n) n += 1 def master_start_slaves(nslaves): for n in range(0, nslaves): child = subprocess.Popen([sys.argv[0], "slave", str(n), str(os.getpid())], shell=False, stdin=None, stdout=None, stderr=None, close_fds=True) children.append(child) def master_wait_for_slaves(curs, nslaves): while True: curs.execute("SELECT count(slave) from control;") ret = curs.fetchone() if ret is not None and ret[0] == nslaves: break; else: conn.rollback() time.sleep(0.01) def master_num_alive_slaves(): alive = 0 for child in children: child.poll() if child.returncode is None: alive += 1 return alive def master_sigusr1_handler(signum, frame): """We get SIGUSR1 from a slave when the test fails. We should terminate our children and then ourselves.""" print "MASTER: Test failed, terminating children" master_killall_slaves() print "MASTER: Test failed, terminating" sys.exit(2) def master_killall_slaves(): for child in children: child.poll() if child.returncode is None: try: os.kill(child.pid, signal.SIGTERM) except: pass time.sleep(0.1) if master_num_alive_slaves(): time.sleep(1) for child in children: child.poll() if child.returncode is None: try: os.kill(child.pid, signal.SIGKILL) except: pass def main_master(): """Test controller. This keeps the inserters in lockstep by forcing them to wait on an advisory lock for the id they're about to try to insert.""" nslaves = 2 # We use 0,n advisory locks for start locks. These locks # stop the slaves starting to insert "n" until the lock is # released by the controller. # # 1,n locks are finish locks. They're acquired by the slaves # as share locks before trying to get the finish lock, and only # released once the INSERT is done. The controller must acquire # this lock exclusively before proceeding to unlock the next # start lock. signal.signal(signal.SIGUSR1, master_sigusr1_handler) curs = conn.cursor() curs.execute("TRUNCATE TABLE x;") curs.execute("TRUNCATE TABLE control;") conn.commit() n = 0; # Take the first start lock so the slaves don't race off as soon # as they're started. if debug: print "MASTER: L(0,%s)" % n curs.execute("SELECT pg_advisory_lock(0,%s);" % n) # Start slaves print "MASTER: Spawning slaves..." master_start_slaves(nslaves); print "MASTER: slaves spawned, waiting for them to report in." # Wait until they've all reported in master_wait_for_slaves(curs, nslaves); print "MASTER: Slaves have reported in. Starting test." # and start them going. State on loop entry is start lock # for "n" is held, no other locks held. while True: # Lock the record they're going to try to insert # once we let them loose on this one. This keeps # them from running ahead. if debug: print "MASTER: L(0,%s)" % (n+1) curs.execute("SELECT pg_advisory_lock(0,%s);" % (n+1)) # Unlock the current record, letting them try to # insert. if debug: print "MASTER: U(0,%s): Slaves inserting %s" % (n,n) curs.execute("SELECT pg_advisory_unlock(0,%s);" % n) # Attempt to acquire the finish lock on the record we just # let the slaves insert. We'll only be able to obtain it once # all slaves have released their finish locks. if debug: print "MASTER: L(1,%s)" % n curs.execute("SELECT pg_advisory_lock(1,%s)" % n); # Release it again, since we know they're all done. if debug: print "MASTER: U(1,%s): Slaves finished inserting %s" % (n,n) curs.execute("SELECT pg_advisory_unlock(1,%s)" % n); # and get ready for the next run. n += 1 sys.stdout.write(" %s" % n); sys.stdout.flush() if not master_num_alive_slaves() == nslaves: print "One or more slaves appear to have died. Terminating the test" master_killall_slaves() sys.exit(3) if len(sys.argv) > 1: if sys.argv[1] == "slave": main_slave(sys.argv[2], sys.argv[3]) elif sys.argv[1] == "master": main_master() else: print "Usage: %s master|slave" % sys.argv[0] sys.exit(1); else: main_master()
-- Sent via pgsql-general mailing list (pgsql-general@xxxxxxxxxxxxxx) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-general