On Wed, Aug 24, 2011 at 10:12:36PM +0100, Daniel P. Berrange wrote:
> On Tue, Aug 23, 2011 at 05:53:35PM +0200, Michal Privoznik wrote:
> > On 23.08.2011 14:23, Daniel P. Berrange wrote:
> > > On Tue, Aug 16, 2011 at 06:39:10PM +0200, Michal Privoznik wrote:
> > >> diff --git a/src/util/threadpool.c b/src/util/threadpool.c
> > >> index 8217591..ad2d249 100644
> > >> --- a/src/util/threadpool.c
> > >> +++ b/src/util/threadpool.c
> > >> @@ -185,27 +185,41 @@ void virThreadPoolFree(virThreadPoolPtr pool)
> > >>      VIR_FREE(pool);
> > >>  }
> > >>
> > >> +/*
> > >> + * @only_if_free - place job in pool iff there is
> > >> + *                 a free worker(s).
> > >> + *
> > >> + * Return: 0 on success,
> > >> + *        -1 if no free worker available but requested
> > >> + *        -2 otherwise
> > >> + */
> > >>  int virThreadPoolSendJob(virThreadPoolPtr pool,
> > >> +                         bool only_if_free,
> > >>                           void *jobData)
> > >>  {
> > >>      virThreadPoolJobPtr job;
> > >> +    int ret = -2;
> > >>
> > >>      virMutexLock(&pool->mutex);
> > >>      if (pool->quit)
> > >>          goto error;
> > >>
> > >> -    if (pool->freeWorkers == 0 &&
> > >> -        pool->nWorkers < pool->maxWorkers) {
> > >> -        if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
> > >> -            virReportOOMError();
> > >> -            goto error;
> > >> -        }
> > >> +    if (pool->freeWorkers == 0) {

If we added another counter 'pool->jobQueueDepth', changed whenever
a job is added to or removed from the queue, then I think we could get
the correct semantics by doing

   if ((pool->freeWorkers - pool->jobQueueDepth) <= 0) {
       ...
   }

> > >> +        if (pool->nWorkers < pool->maxWorkers) {
> > >> +            if (VIR_EXPAND_N(pool->workers, pool->nWorkers, 1) < 0) {
> > >> +                virReportOOMError();
> > >> +                goto error;
> > >> +            }
> > >>
> > >> -        if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
> > >> -                            true,
> > >> -                            virThreadPoolWorker,
> > >> -                            pool) < 0) {
> > >> -            pool->nWorkers--;
> > >> +            if (virThreadCreate(&pool->workers[pool->nWorkers - 1],
> > >> +                                true,
> > >> +                                virThreadPoolWorker,
> > >> +                                pool) < 0) {
> > >> +                pool->nWorkers--;
> > >> +                goto error;
> > >> +            }
> > >> +        } else if (only_if_free) {
> > >> +            ret = -1;
> > >>              goto error;
> > >>          }
> > >
> > > I don't think this check is correct, because it is only checking
> > > the free workers, against the current/max workers. It is not
> > > taking into account the possibility that there are queued jobs
> > > which have not yet been served. So it might look like there is
> > > a free worker, but there is already a pending job which could
> > > consume it.
> >
> > I don't agree. Currently we allow expanding of pool only when placing a
> > job into pool. During placing, pool is locked, so freeWorkers variable
> > cannot change. Assume freeWorkers == 0; As soon as we can't expand the
> > pool, job will wait on queue. So we may decide if we want to place job
> > or not. If freeWorkers is not zero, there can't be any job on the queue,
> > because it would be immediately taken by a free worker.
>
> The problem I'm seeing involves a sequence of two calls to
> virThreadPoolSendJob(). Most of the time we will expect
> a linear sequence of virThreadPoolSendJob calls, which will
> result in something like this set of steps:
>
>  * Initial condition: freeWorkers==1, nWorkers==1, maxWorkers==5, queuedJobs==0
>
>  * Thread 1 calls virThreadPoolSendJob()
>      1. Acquires lock
>      2. Queues job
>      3. Notifies condition
>      4. Releases lock
>
>  * Current condition: freeWorkers==1, nWorkers==1, maxWorkers==5, queuedJobs==1
>
>  * Thread 2 is a worker thread
>      1. Woken up from condition wait
>      2. Acquires lock
>      3. Decrements freeWorkers
>      4. Releases lock
>      5. Starts processing job
>
>  * Current condition: freeWorkers==0, nWorkers==1, maxWorkers==5, queuedJobs==0
>
>  * Thread 3 calls virThreadPoolSendJob()
>      1. Acquires lock
>      2. Spawns new worker thread
>      3. Queues job
>      4. Notifies condition
>      5. Releases lock
>
>  * Current condition: freeWorkers==0, nWorkers==2, maxWorkers==5, queuedJobs==1
>
>  * Thread 4 is a new worker thread
>      1. Woken up from condition wait
>      2. Acquires lock
>      3. Decrements freeWorkers
>      4. Releases lock
>      5. Starts processing job
>
>  * Final condition: freeWorkers==0, nWorkers==2, maxWorkers==5, queuedJobs==0
>
>
> But now consider what happens if two calls to virThreadPoolSendJob
> arrive concurrently. It is possible thread 3 will acquire the lock
> before thread 2 does. This results in the following set of steps:
>
>  * Initial condition: freeWorkers==1, nWorkers==1, maxWorkers==5, queuedJobs==0
>
>  * Thread 1 calls virThreadPoolSendJob()
>      1. Acquires lock
>      2. Queues job
>      3. Notifies condition
>      4. Releases lock
>
>  * Current condition: freeWorkers==1, nWorkers==1, maxWorkers==5, queuedJobs==1
>
>  * Thread 3 calls virThreadPoolSendJob()
>      1. Acquires lock
>      2. Queues job
>      3. Notifies condition
>      4. Releases lock
>
>  * Current condition: freeWorkers==1, nWorkers==1, maxWorkers==5, queuedJobs==2
>
>  * Thread 2 is a worker thread
>      1. Woken up from condition wait
>      2. Acquires lock
>      3. Decrements freeWorkers
>      4. Releases lock
>      5. Starts processing job
>
>  * Final condition: freeWorkers==0, nWorkers==1, maxWorkers==5, queuedJobs==1
>
> So we end up with 2 jobs and only one worker thread to process
> them. If that second job was a high priority job, it would be
> queued in the normal queue because 'freeWorkers' was still 1,
> even though there was a job in the queue ahead of it, so the
> effective 'freeWorkers' was 0.
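
To make the interleaving above concrete, here is a minimal single-threaded
sketch in C that replays the two sends followed by the worker wake-up, once
with the existing 'freeWorkers' check and once with the 'jobQueueDepth'
check suggested earlier in this mail. The struct and the model_* function
names are hypothetical and not part of virThreadPool; only the counter
names come from the discussion. Locking is omitted because the steps run
one after another, and spawning a thread is reduced to incrementing
nWorkers.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical model of the pool counters; NOT the libvirt struct. */
struct pool_model {
    int freeWorkers;    /* workers idle in the condition wait */
    int nWorkers;       /* workers created so far */
    int maxWorkers;     /* upper bound on nWorkers */
    int jobQueueDepth;  /* jobs queued but not yet taken (proposed counter) */
};

/*
 * Models the decision a send-job call makes while holding the lock.
 *   depth_aware == false: existing check, spawn only if freeWorkers == 0
 *   depth_aware == true:  proposed check, freeWorkers - jobQueueDepth <= 0
 * Returns true if a new worker was "spawned" (nWorkers incremented).
 */
bool model_send_job(struct pool_model *p, bool depth_aware)
{
    bool spawned = false;
    int effective_free = depth_aware
        ? p->freeWorkers - p->jobQueueDepth
        : p->freeWorkers;

    if (effective_free <= 0 && p->nWorkers < p->maxWorkers) {
        p->nWorkers++;
        spawned = true;
    }
    p->jobQueueDepth++;     /* the job is queued in either case */
    return spawned;
}

/* Models an idle worker waking up and taking one job off the queue. */
void model_idle_worker_takes_job(struct pool_model *p)
{
    if (p->freeWorkers > 0 && p->jobQueueDepth > 0) {
        p->freeWorkers--;
        p->jobQueueDepth--;
    }
    assert(p->freeWorkers >= 0 && p->jobQueueDepth >= 0);
}

/*
 * Replays the concurrent sequence: both sends happen before the idle
 * worker gets the lock. Returns how many workers exist afterwards.
 */
int model_replay_race(bool depth_aware)
{
    /* Initial condition: freeWorkers==1, nWorkers==1, maxWorkers==5 */
    struct pool_model p = { 1, 1, 5, 0 };

    model_send_job(&p, depth_aware);    /* thread 1 */
    model_send_job(&p, depth_aware);    /* thread 3 wins the lock */
    model_idle_worker_takes_job(&p);    /* thread 2 finally wakes up */
    return p.nWorkers;
}
```

In this model, model_replay_race(false) leaves a single worker with one job
still queued behind it, which matches the final condition listed above,
while model_replay_race(true) creates a second worker on the second send
because the first queued job has already claimed the only idle worker.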
>
> This is actually an existing bug in the virThreadPool code for queuing
> jobs, but it was harmless. When we start to differentiate between low
> and high priority jobs, then this bug becomes active and can result in
> a high priority job being blocked by low priority ones.

Daniel
--
|: http://berrange.com      -o-    http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org              -o-             http://virt-manager.org :|
|: http://autobuild.org       -o-         http://search.cpan.org/~danberr/ :|
|: http://entangle-photo.org       -o-       http://live.gnome.org/gtk-vnc :|

--
libvir-list mailing list
libvir-list@xxxxxxxxxx
https://www.redhat.com/mailman/listinfo/libvir-list