For the save direction, this helper listens on a unix socket which QEMU connects to for multifd migration to files. For the restore direction, this helper connects to a unix socket QEMU listens at for multifd migration from files. The file descriptors are passed as command line parameters. Signed-off-by: Claudio Fontana <cfontana@xxxxxxx> --- src/libvirt_private.syms | 1 + src/util/meson.build | 13 ++ src/util/multifd-helper.c | 250 ++++++++++++++++++++++++++++++++++++++ src/util/virthread.c | 5 + src/util/virthread.h | 1 + 5 files changed, 270 insertions(+) create mode 100644 src/util/multifd-helper.c diff --git a/src/libvirt_private.syms b/src/libvirt_private.syms index 97bfca906b..5f2bee985e 100644 --- a/src/libvirt_private.syms +++ b/src/libvirt_private.syms @@ -3427,6 +3427,7 @@ virThreadCreateFull; virThreadID; virThreadIsSelf; virThreadJoin; +virThreadJoinRet; virThreadMaxName; virThreadSelf; virThreadSelfID; diff --git a/src/util/meson.build b/src/util/meson.build index 58001a1699..8ea74ff9e8 100644 --- a/src/util/meson.build +++ b/src/util/meson.build @@ -179,6 +179,12 @@ io_helper_sources = [ 'runio.h', ] +multifd_helper_sources = [ + 'multifd-helper.c', + 'runio.c', + 'runio.h', +] + virt_util_lib = static_library( 'virt_util', [ @@ -216,6 +222,13 @@ if conf.has('WITH_LIBVIRTD') dtrace_gen_headers, ], } + virt_helpers += { + 'name': 'libvirt_multifd_helper', + 'sources': [ + files(multifd_helper_sources), + dtrace_gen_headers, + ], + } endif util_inc_dir = include_directories('.') diff --git a/src/util/multifd-helper.c b/src/util/multifd-helper.c new file mode 100644 index 0000000000..37e61a3a4d --- /dev/null +++ b/src/util/multifd-helper.c @@ -0,0 +1,250 @@ +/* + * multifd-helper.c: listens on Unix socket to perform I/O to multiple files + * + * Copyright (C) 2022 SUSE LLC + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library. If not, see + * <http://www.gnu.org/licenses/>. + * + * This has been written to support QEMU multifd migration to file, + * allowing better use of cpu resources to speed up the save/restore. + */ + +#include <config.h> + +#include <unistd.h> +#include <fcntl.h> +#include <stdlib.h> +#include <sys/types.h> +#include <sys/stat.h> +#include <sys/socket.h> +#include <sys/un.h> + +#include "virthread.h" +#include "virfile.h" +#include "virerror.h" +#include "virstring.h" +#include "virgettext.h" +#include "runio.h" + +#define VIR_FROM_THIS VIR_FROM_STORAGE + +typedef struct _multiFdConnData multiFdConnData; +struct _multiFdConnData { + int clientfd; + int filefd; + int oflags; + const char *path; + virThread tid; + + off_t total; +}; + +typedef struct _multiFdThreadArgs multiFdThreadArgs; +struct _multiFdThreadArgs { + int nchannels; + multiFdConnData *conn; /* contains main fd + nchannels */ + const char *sun_path; /* unix socket name to use for the server */ + struct sockaddr_un serv_addr; + + off_t total; +}; + +static void clientThreadFunc(void *a) +{ + multiFdConnData *c = a; + c->total = runIO(c->path, c->filefd, c->oflags, c->clientfd, c->clientfd); +} + +static off_t waitClientThreads(multiFdConnData *conn, int n) +{ + int i; + off_t total = 0; + for (i = 0; i < n; i++) { + multiFdConnData *c = &conn[i]; + if (virThreadJoinRet(&c->tid) < 0) { + total = -1; + } else if (total >= 0) { + total += c->total; + } + if (VIR_CLOSE(c->clientfd) < 0) { + total = -1; + } + } + return total; +} + +static void loadThreadFunc(void *a) +{ + multiFdThreadArgs *args = a; + int i; + args->total = -1; + + for (i = 0; i < args->nchannels + 1; i++) { + /* Perform outgoing connections */ + multiFdConnData *c = &args->conn[i]; + c->clientfd = socket(AF_UNIX, SOCK_STREAM, 0); + if (c->clientfd < 0) { + virReportSystemError(errno, "%s", _("loadThread: socket() failed")); + goto cleanup; + } + if (connect(c->clientfd, (const struct sockaddr *)&args->serv_addr, + sizeof(struct sockaddr_un)) < 0) { + virReportSystemError(errno, "%s", _("loadThread: connect() failed")); + goto cleanup; + } + if (virThreadCreate(&c->tid, true, &clientThreadFunc, c) < 0) { + virReportSystemError(errno, "%s", _("loadThread: client thread creation failed")); + goto cleanup; + } + } + args->total = waitClientThreads(args->conn, args->nchannels + 1); + + cleanup: + for (i = 0; i < args->nchannels + 1; i++) { + multiFdConnData *c = &args->conn[i]; + VIR_FORCE_CLOSE(c->clientfd); + } +} + +static void saveThreadFunc(void *a) +{ + multiFdThreadArgs *args = a; + int i; + const char buf[1] = {'R'}; + int sockfd; + + if ((sockfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) { + virReportSystemError(errno, "%s", _("saveThread: socket() failed")); + return; + } + unlink(args->sun_path); + if (bind(sockfd, (struct sockaddr *)&args->serv_addr, sizeof(args->serv_addr)) < 0) { + virReportSystemError(errno, "%s", _("saveThread: bind() failed")); + goto cleanup; + } + if (listen(sockfd, args->nchannels + 1) < 0) { + virReportSystemError(errno, "%s", _("saveThread: listen() failed")); + goto cleanup; + } + + /* signal that the server is ready */ + if (safewrite(STDOUT_FILENO, &buf, 1) != 1) { + virReportSystemError(errno, "%s", _("saveThread: safewrite failed")); + goto cleanup; + } + + for (i = 0; i < args->nchannels + 1; i++) { + /* Wait for incoming connection. */ + multiFdConnData *c = &args->conn[i]; + if ((c->clientfd = accept(sockfd, NULL, NULL)) < 0) { + virReportSystemError(errno, "%s", _("saveThread: accept() failed")); + goto cleanup; + } + if (virThreadCreate(&c->tid, true, &clientThreadFunc, c) < 0) { + virReportSystemError(errno, "%s", _("saveThread: client thread creation failed")); + goto cleanup; + } + } + + args->total = waitClientThreads(args->conn, args->nchannels + 1); + + cleanup: + for (i = 0; i < args->nchannels + 1; i++) { + multiFdConnData *c = &args->conn[i]; + VIR_FORCE_CLOSE(c->clientfd); + } + if (VIR_CLOSE(sockfd) < 0) + args->total = -1; +} + +static const char *program_name; + +G_GNUC_NORETURN static void +usage(int status) +{ + if (status) { + fprintf(stderr, _("%s: try --help for more details"), program_name); + } else { + fprintf(stderr, _("Usage: %s UNIX_SOCNAME N MAINFD FD0 FD1 ... FDn"), program_name); + } + exit(status); +} + +int +main(int argc, char **argv) +{ + virThread tid; + virThreadFunc func = saveThreadFunc; + multiFdThreadArgs args = { 0 }; + int i; + + sleep(10); + + program_name = argv[0]; + + if (virGettextInitialize() < 0 || + virErrorInitialize() < 0) { + fprintf(stderr, _("%s: initialization failed"), program_name); + exit(EXIT_FAILURE); + } + + if (argc > 1 && STREQ(argv[1], "--help")) + usage(EXIT_SUCCESS); + if (argc < 4) + usage(EXIT_FAILURE); + + args.sun_path = argv[1]; + if (virStrToLong_i(argv[2], NULL, 10, &args.nchannels) < 0) + fprintf(stderr, _("%s: malformed number of channels N %s"), program_name, argv[2]); + + if (argc < 4 + args.nchannels) + usage(EXIT_FAILURE); + + args.conn = g_new0(multiFdConnData, args.nchannels + 1); + + for (i = 3; i < 3 + args.nchannels + 1; i++) { + multiFdConnData *c = &args.conn[i - 3]; + + if (virStrToLong_i(argv[i], NULL, 10, &c->filefd) < 0) { + fprintf(stderr, _("%s: malformed FD %s"), program_name, argv[i]); + usage(EXIT_FAILURE); + } +#ifndef F_GETFL +#error "multifd-helper requires F_GETFL parameter of fcntl" +#endif + c->oflags = fcntl(c->filefd, F_GETFL); + if ((c->oflags & O_ACCMODE) == O_RDONLY) { + func = loadThreadFunc; + } + } + + /* initialize server address structure */ + memset(&args.serv_addr, 0, sizeof(args.serv_addr)); + args.serv_addr.sun_family = AF_UNIX; + strncpy(args.serv_addr.sun_path, args.sun_path, sizeof(args.serv_addr.sun_path) - 1); + + if (virThreadCreate(&tid, true, func, &args) < 0) { + virReportSystemError(errno, _("%s: failed to create server thread"), program_name); + exit(EXIT_FAILURE); + } + + if (virThreadJoinRet(&tid) < 0) + exit(EXIT_FAILURE); + + if (args.total < 0) + exit(EXIT_FAILURE); + + exit(EXIT_SUCCESS); +} diff --git a/src/util/virthread.c b/src/util/virthread.c index 5422bb74fd..0f6c6a68fa 100644 --- a/src/util/virthread.c +++ b/src/util/virthread.c @@ -348,6 +348,11 @@ void virThreadJoin(virThread *thread) pthread_join(thread->thread, NULL); } +int virThreadJoinRet(virThread *thread) +{ + return pthread_join(thread->thread, NULL); +} + void virThreadCancel(virThread *thread) { pthread_cancel(thread->thread); diff --git a/src/util/virthread.h b/src/util/virthread.h index 23abe0b6c9..5cecb9bd8a 100644 --- a/src/util/virthread.h +++ b/src/util/virthread.h @@ -89,6 +89,7 @@ int virThreadCreateFull(virThread *thread, void virThreadSelf(virThread *thread); bool virThreadIsSelf(virThread *thread); void virThreadJoin(virThread *thread); +int virThreadJoinRet(virThread *thread); size_t virThreadMaxName(void); -- 2.34.1