Re: [PATCH 09/12] ring: introduce lockless ring buffer

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 




Hi Michael,

On 06/20/2018 08:38 PM, Michael S. Tsirkin wrote:
On Mon, Jun 04, 2018 at 05:55:17PM +0800, guangrong.xiao@xxxxxxxxx wrote:
From: Xiao Guangrong <xiaoguangrong@xxxxxxxxxxx>



(1) https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/include/linux/kfifo.h
(2) http://dpdk.org/doc/api/rte__ring_8h.html

Signed-off-by: Xiao Guangrong <xiaoguangrong@xxxxxxxxxxx>

So instead of all this super-optimized trickiness, how about
a simple port of ptr_ring from linux?

That one isn't lockless but it's known to outperform
most others for a single producer/single consumer case.
And with a ton of networking going on,
who said it's such a hot spot? OTOH this implementation
has more barriers which slows down each individual thread.
It's also a source of bugs.


Thank you for pointing it out.

I just quickly went through the code of ptr_ring that is very nice and
really impressive. I will consider to port it to QEMU.

Further, atomic tricks this one uses are not fair so some threads can get
completely starved while others make progress. There's also no
chance to mix aggressive polling and sleeping with this
kind of scheme, so the starved thread will consume lots of
CPU.

So I'd like to see a simple ring used, and then a patch on top
switching to this tricky one with performance comparison
along with that.


I agree with you, i will make a version that uses a lock for multiple
producers and doing incremental optimizations based on it.

---
  migration/ring.h | 265 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
  1 file changed, 265 insertions(+)
  create mode 100644 migration/ring.h

diff --git a/migration/ring.h b/migration/ring.h
new file mode 100644
index 0000000000..da9b8bdcbb
--- /dev/null
+++ b/migration/ring.h
@@ -0,0 +1,265 @@
+/*
+ * Ring Buffer
+ *
+ * Multiple producers and single consumer are supported with lock free.
+ *
+ * Copyright (c) 2018 Tencent Inc
+ *
+ * Authors:
+ *  Xiao Guangrong <xiaoguangrong@xxxxxxxxxxx>
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+
+#ifndef _RING__
+#define _RING__

Prefix Ring is too short.


Okay, will improve it.

+    atomic_set(&ring->data[index], NULL);
+
+    /*
+     * (B) smp_mb() is needed as we should read the entry out before
+     * updating ring->out as we did in __ring_get().
+     *
+     * (A) smp_wmb() is needed as we should make the entry be NULL before
+     * updating ring->out (which will make the entry be visible and usable).
+     */

I can't say I understand this all.
And the interaction of acquire/release semantics with smp_*
barriers is even scarier.


Hmm... the parallel accesses for these two indexes and the data stored
in the ring are subtle indeed. :(

+    atomic_store_release(&ring->out, ring->out + 1);
+
+    return data;
+}
+
+static inline int ring_put(Ring *ring, void *data)
+{
+    if (ring->flags & RING_MULTI_PRODUCER) {
+        return ring_mp_put(ring, data);
+    }
+    return __ring_put(ring, data);
+}
+
+static inline void *ring_get(Ring *ring)
+{
+    if (ring->flags & RING_MULTI_PRODUCER) {
+        return ring_mp_get(ring);
+    }
+    return __ring_get(ring);
+}
+#endif


A bunch of tricky barriers retries etc all over the place.  This sorely
needs *a lot of* unit tests. Where are they?

I used the code attached in this mail to test & benchmark the patches during
my development which does not dedicate for Ring, instead it is based
on the framework of compression.

Yes, test cases are useful and really needed, i will do it... :)

#include "qemu/osdep.h"

#include "libqtest.h"
#include <zlib.h>

#include "qemu/osdep.h"
#include <zlib.h>
#include "qemu/cutils.h"
#include "qemu/bitops.h"
#include "qemu/bitmap.h"
#include "qemu/main-loop.h"
#include "migration/ram.h"
#include "migration/migration.h"
#include "migration/register.h"
#include "migration/misc.h"
#include "migration/page_cache.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
#include "qapi/qapi-events-migration.h"
#include "qapi/qmp/qerror.h"
#include "trace.h"
//#include "exec/ram_addr.h"
#include "exec/target_page.h"
#include "qemu/rcu_queue.h"
#include "migration/colo.h"
#include "migration/block.h"
#include "migration/threads.h"

#include "migration/qemu-file.h"
#include "migration/threads.h"

CompressionStats compression_counters;

#define PAGE_SIZE 4096
#define PAGE_MASK ~(PAGE_SIZE - 1)

static ssize_t test_writev_buffer(void *opaque, struct iovec *iov, int iovcnt,
                                   int64_t pos)
{
    int i, size = 0;

    for (i = 0; i < iovcnt; i++) {
        size += iov[i].iov_len;
    }
    return size;
}

static int test_fclose(void *opaque)
{
    return 0;
}

static const QEMUFileOps test_write_ops = {
    .writev_buffer  = test_writev_buffer,
    .close          = test_fclose
};

QEMUFile *dest_file;

static const QEMUFileOps empty_ops = { };

static int do_compress_ram_page(QEMUFile *f, z_stream *stream, uint8_t *ram_addr,
                                ram_addr_t offset, uint8_t *source_buf)
{
    int bytes_sent = 0, blen;
    uint8_t *p = ram_addr;

    /*
     * copy it to a internal buffer to avoid it being modified by VM
     * so that we can catch up the error during compression and
     * decompression
     */
    memcpy(source_buf, p, PAGE_SIZE);
    blen = qemu_put_compression_data(f, stream, source_buf, PAGE_SIZE);
    if (blen < 0) {
        bytes_sent = 0;
        qemu_file_set_error(dest_file, blen);
        error_report("compressed data failed!");
    } else {
        printf("Compressed size %d.\n", blen);
        bytes_sent += blen;
    }

    return bytes_sent;
}

struct CompressData {
    /* filled by migration thread.*/
    uint8_t *ram_addr;
    ram_addr_t offset;

    /* filled by compress thread. */
    QEMUFile *file;
    z_stream stream;
    uint8_t *originbuf;

    ThreadRequest data;
};
typedef struct CompressData CompressData;

static ThreadRequest *compress_thread_data_init(void)
{
    CompressData *cd = g_new0(CompressData, 1);

    cd->originbuf = g_try_malloc(PAGE_SIZE);
    if (!cd->originbuf) {
        goto exit;
    }

    if (deflateInit(&cd->stream, 1) != Z_OK) {
        g_free(cd->originbuf);
        goto exit;
    }

    cd->file = qemu_fopen_ops(NULL, &empty_ops);
    return &cd->data;

exit:
    g_free(cd);
    return NULL;
}

static void compress_thread_data_fini(ThreadRequest *data)
{
    CompressData *cd = container_of(data, CompressData, data);

    qemu_fclose(cd->file);
    deflateEnd(&cd->stream);
    g_free(cd->originbuf);
    g_free(cd);
}

static void compress_thread_data_handler(ThreadRequest *data)
{
    CompressData *cd = container_of(data, CompressData, data);

    /*
     * if compression fails, it will indicate by
     * migrate_get_current()->to_dst_file.
     */
    do_compress_ram_page(cd->file, &cd->stream, cd->ram_addr, cd->offset,
                         cd->originbuf);
}

static void compress_thread_data_done(ThreadRequest *data)
{
    CompressData *cd = container_of(data, CompressData, data);
    int bytes_xmit;

    bytes_xmit = qemu_put_qemu_file(dest_file, cd->file);
    /* 8 means a header with RAM_SAVE_FLAG_CONTINUE. */
    compression_counters.reduced_size += 4096 - bytes_xmit + 8;
    compression_counters.pages++;
}

static Threads *compress_threads;

static void flush_compressed_data(void)
{
    threads_wait_done(compress_threads);
}

static void compress_threads_save_cleanup(void)
{
    if (!compress_threads) {
        return;
    }

    threads_destroy(compress_threads);
    compress_threads = NULL;
    qemu_fclose(dest_file);
    dest_file = NULL;
}

static int compress_threads_save_setup(void)
{
    dest_file = qemu_fopen_ops(NULL, &test_write_ops);
    compress_threads = threads_create(16,
                                      "compress",
                                      compress_thread_data_init,
                                      compress_thread_data_fini,
                                      compress_thread_data_handler,
                                      compress_thread_data_done);
    assert(compress_threads);
    return 0;
}

static int compress_page_with_multi_thread(uint8_t *addr)
{
    CompressData *cd;
    ThreadRequest *thread_data;
    thread_data = threads_submit_request_prepare(compress_threads);
    if (!thread_data) {
        compression_counters.busy++;
        return -1;
    }

    cd = container_of(thread_data, CompressData, data);
    cd->ram_addr = addr;
    threads_submit_request_commit(compress_threads, thread_data);
    return 1;
}

#define MEM_SIZE (30ULL << 30)
#define COUNT    5 

static void run(void)
{
    void *mem = qemu_memalign(PAGE_SIZE, MEM_SIZE);
    uint8_t *ptr = mem, *end = mem + MEM_SIZE;
    uint64_t start_time, total_time = 0, spend, total_busy = 0;
    int i;

    memset(mem, 0, MEM_SIZE);

    start_time = g_get_monotonic_time();
    for (i = 0; i < COUNT; i++) {
        ptr = mem;
	start_time = g_get_monotonic_time();
        while (ptr < end) {
            *ptr = 0x10;
            compress_page_with_multi_thread(ptr);
            ptr += PAGE_SIZE;
        }
        flush_compressed_data();
	spend = g_get_monotonic_time() - start_time;
	total_time += spend;
	printf("RUN %d: BUSY %ld Time Cost %ld.\n", i, compression_counters.busy, spend);
	total_busy += compression_counters.busy;
	compression_counters.busy = 0;
    }

    printf("AVG: BUSY %ld Time Cost %ld.\n", total_busy / COUNT, total_time / COUNT);
}

static void compare_zero_and_compression(void)
{
    ThreadRequest *data = compress_thread_data_init();
    CompressData *cd;
    uint64_t start_time, zero_time, compress_time;
    char page[PAGE_SIZE];

    if (!data) {
        printf("Init compression failed.\n");
        return;
    }

    cd = container_of(data, CompressData, data);
    cd->ram_addr = (uint8_t *)page;

    memset(page, 0, sizeof(page));
    dest_file = qemu_fopen_ops(NULL, &test_write_ops);

    start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    buffer_is_zero(page, PAGE_SIZE);
    zero_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;

    start_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    compress_thread_data_handler(data);
    compress_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time;

    printf("Zero %ld ns Compression: %ld ns.\n", zero_time, compress_time);
    compress_thread_data_fini(data);

}

static void migration_threads(void)
{
    int i;

    printf("Zero Test vs. compression.\n");
    for (i = 0; i < 10; i++) {
        compare_zero_and_compression();
    }

    printf("test migration threads.\n");
    compress_threads_save_setup();
    run();
    compress_threads_save_cleanup();
}

int main(int argc, char **argv)
{
    QTestState *s = NULL;
    int ret;

    g_test_init(&argc, &argv, NULL);

    qtest_add_func("/migration/threads", migration_threads);
    ret = g_test_run();

    if (s) {
        qtest_quit(s);
    }

    return ret;
}


[Index of Archives]     [KVM ARM]     [KVM ia64]     [KVM ppc]     [Virtualization Tools]     [Spice Development]     [Libvirt]     [Libvirt Users]     [Linux USB Devel]     [Linux Audio Users]     [Yosemite Questions]     [Linux Kernel]     [Linux SCSI]     [XFree86]

  Powered by Linux