Use a reservation similar to ftrace to make vmbus ring buffer writes lock-free. The algorithm uses cmpxchg to atomically reserve an area in the ring buffer. Then the data is copied into the ring, and the updates to the head of the ring are ordered. Other similar implementations are FreeBSD buf_ring, and DPDK rte_ring. Signed-off-by: Stephen Hemminger <sthemmin@xxxxxxxxxxxxx> --- drivers/hv/ring_buffer.c | 192 +++++++++++++++++++++-------------------------- include/linux/hyperv.h | 1 - 2 files changed, 84 insertions(+), 109 deletions(-) diff --git a/drivers/hv/ring_buffer.c b/drivers/hv/ring_buffer.c index f29981764653..7ca1f750e037 100644 --- a/drivers/hv/ring_buffer.c +++ b/drivers/hv/ring_buffer.c @@ -77,68 +77,6 @@ static void hv_signal_on_write(u32 old_write, struct vmbus_channel *channel) vmbus_setevent(channel); } -/* Get the next write location for the specified ring buffer. */ -static inline u32 -hv_get_next_write_location(struct hv_ring_buffer_info *ring_info) -{ - u32 next = ring_info->ring_buffer->write_index; - - return next; -} - -/* Set the next write location for the specified ring buffer. */ -static inline void -hv_set_next_write_location(struct hv_ring_buffer_info *ring_info, - u32 next_write_location) -{ - ring_info->ring_buffer->write_index = next_write_location; -} - -/* Set the next read location for the specified ring buffer. */ -static inline void -hv_set_next_read_location(struct hv_ring_buffer_info *ring_info, - u32 next_read_location) -{ - ring_info->ring_buffer->read_index = next_read_location; - ring_info->priv_read_index = next_read_location; -} - -/* Get the size of the ring buffer. */ -static inline u32 -hv_get_ring_buffersize(const struct hv_ring_buffer_info *ring_info) -{ - return ring_info->ring_datasize; -} - -/* Get the read and write indices as u64 of the specified ring buffer.
*/ -static inline u64 -hv_get_ring_bufferindices(struct hv_ring_buffer_info *ring_info) -{ - return (u64)ring_info->ring_buffer->write_index << 32; -} - -/* - * Helper routine to copy from source to ring buffer. - * Assume there is enough room. Handles wrap-around in dest case only!! - */ -static u32 hv_copyto_ringbuffer( - struct hv_ring_buffer_info *ring_info, - u32 start_write_offset, - const void *src, - u32 srclen) -{ - void *ring_buffer = hv_get_ring_buffer(ring_info); - u32 ring_buffer_size = hv_get_ring_buffersize(ring_info); - - memcpy(ring_buffer + start_write_offset, src, srclen); - - start_write_offset += srclen; - if (start_write_offset >= ring_buffer_size) - start_write_offset -= ring_buffer_size; - - return start_write_offset; -} - /* Get various debug metrics for the specified ring buffer. */ void hv_ringbuffer_get_debuginfo(const struct hv_ring_buffer_info *ring_info, struct hv_ring_buffer_debug_info *debug_info) @@ -206,8 +144,6 @@ int hv_ringbuffer_init(struct hv_ring_buffer_info *ring_info, ring_info->ring_datasize = ring_info->ring_size - sizeof(struct hv_ring_buffer); - spin_lock_init(&ring_info->ring_lock); - return 0; } @@ -217,72 +153,112 @@ void hv_ringbuffer_cleanup(struct hv_ring_buffer_info *ring_info) vunmap(ring_info->ring_buffer); } -/* Write to the ring buffer. */ +/* + * Multiple producer lock-free ring buffer write + * + * There are two write locations: when no CPU is writing to ring both + * are equal. + * ring_buffer_info->priv_write_index - next writer's tail offset + * ring_buffer->write_index - reader's tail offset + * + * The write goes through three stages: + * 1. Reserve space in ring buffer for the new data. + * Writer atomically moves priv_write_index. + * 2. Copy the new data into the ring. + * 3. Update the tail of the ring (visible to host) that indicates + * next read location. Writer updates write_index + * + * This function can be safely called from softirq. 
+ */ int hv_ringbuffer_write(struct vmbus_channel *channel, const struct kvec *kv_list, u32 kv_count) { - int i; - u32 bytes_avail_towrite; - u32 totalbytes_towrite = sizeof(u64); - u32 next_write_location; - u32 old_write; - u64 prev_indices; + struct hv_ring_buffer_info *outring = &channel->outbound; + const u32 ring_size = outring->ring_datasize; + void *ring_buffer = hv_get_ring_buffer(outring); + u32 write_offset, totalbytes; + u32 prev_location, next_write_location, write_location; unsigned long flags; - struct hv_ring_buffer_info *outring_info = &channel->outbound; + u64 *prev_indices; + unsigned int i; - if (channel->rescind) + if (unlikely(channel->rescind)) return -ENODEV; + /* Compute total size of the request write */ + totalbytes = sizeof(u64); for (i = 0; i < kv_count; i++) - totalbytes_towrite += kv_list[i].iov_len; - - spin_lock_irqsave(&outring_info->ring_lock, flags); - - bytes_avail_towrite = hv_get_bytes_to_write(outring_info); + totalbytes += kv_list[i].iov_len; - /* - * If there is only room for the packet, assume it is full. - * Otherwise, the next time around, we think the ring buffer - * is empty since the read index == write index. - */ - if (bytes_avail_towrite <= totalbytes_towrite) { - spin_unlock_irqrestore(&outring_info->ring_lock, flags); - return -EAGAIN; - } + /* Don't want to allow softirq to race with this. */ + local_irq_save(flags); - /* Write to the ring buffer */ - next_write_location = hv_get_next_write_location(outring_info); + /* Reserve space in ring */ + do { + u32 read_location = READ_ONCE(outring->ring_buffer->read_index); + u32 bytes_avail; - old_write = next_write_location; + write_location = READ_ONCE(outring->priv_write_index); + bytes_avail = (write_location >= read_location) + ? 
ring_size - (write_location - read_location) + : read_location - write_location; + /* + * If insufficient space exists at this time + * it is up to caller to retry + */ + if (bytes_avail <= totalbytes) { + local_irq_restore(flags); + return -EAGAIN; + } + + /* If device is being hot-removed fail. */ + if (unlikely(channel->rescind)) { + local_irq_restore(flags); + return -ENODEV; + } + + next_write_location = write_location + totalbytes; + if (next_write_location >= ring_size) + next_write_location -= ring_size; + + /* Atomically update the next write_index */ + prev_location = cmpxchg(&outring->priv_write_index, + write_location, next_write_location); + + /* Loop until our update wins */ + } while (prev_location != write_location); + + /* Copy new data into place */ + write_offset = write_location; for (i = 0; i < kv_count; i++) { - next_write_location = hv_copyto_ringbuffer(outring_info, - next_write_location, - kv_list[i].iov_base, - kv_list[i].iov_len); + memcpy(ring_buffer + write_offset, + kv_list[i].iov_base, kv_list[i].iov_len); + + write_offset += kv_list[i].iov_len; } /* Set previous packet start */ - prev_indices = hv_get_ring_bufferindices(outring_info); - - next_write_location = hv_copyto_ringbuffer(outring_info, - next_write_location, - &prev_indices, - sizeof(u64)); + prev_indices = ring_buffer + write_offset; + *prev_indices = (u64)write_location << 32; /* Issue a full memory barrier before updating the write index */ virt_mb(); - /* Now, update the write location */ - hv_set_next_write_location(outring_info, next_write_location); - - - spin_unlock_irqrestore(&outring_info->ring_lock, flags); + /* Check in for our reservation.
Wait for our turn to update host */ + while (cmpxchg(&outring->ring_buffer->write_index, + write_location, next_write_location) + != write_location) { + cpu_relax(); - hv_signal_on_write(old_write, channel); + if (unlikely(channel->rescind)) { + local_irq_restore(flags); + return -ENODEV; + } + } - if (channel->rescind) - return -ENODEV; + hv_signal_on_write(write_location, channel); + local_irq_restore(flags); return 0; } diff --git a/include/linux/hyperv.h b/include/linux/hyperv.h index e09fc8290c2f..7a14bacf0e31 100644 --- a/include/linux/hyperv.h +++ b/include/linux/hyperv.h @@ -121,7 +121,6 @@ struct hv_ring_buffer { struct hv_ring_buffer_info { struct hv_ring_buffer *ring_buffer; u32 ring_size; /* Include the shared header */ - spinlock_t ring_lock; u32 ring_datasize; /* < ring_size */ u32 ring_data_startoffset; -- 2.11.0 _______________________________________________ devel mailing list devel@xxxxxxxxxxxxxxxxxxxxxx http://driverdev.linuxdriverproject.org/mailman/listinfo/driverdev-devel