On 15.11.18, 00:35, "Jim Fehlig" <jfehlig@xxxxxxxx> wrote:

On 11/12/18 8:12 AM, Michael Trapp wrote:
> At the vhostmd side virtio channels are Unix domain sockets from QEMU
> which are created during a VM start and removed when the VM is stopped.
>
> Basically this implementation
> - monitors a directory for new virtio channels (channel names contain the VM UUID)
> - for valid UUIDs, also known by libvirtd, it connects to the UDS
> - buffers VM/HOST metrics and handles request/response on the sockets
>
> It provides the functions
>   virtio_init()
>     init function of the virtio layer
>   virtio_run()
>     the start_routine for the virtio thread that handles the virtio-based
>     communication
>   virtio_metrics_update()
>     called for each VM/HOST metrics update to refresh the virtio internal
>     metrics buffer
>   virtio_stop()
>     stop the virtio thread
>
> ---
> Here is a brief description of the concept of the vhostmd / virtio interaction.
>
> Vhostmd calls the virtio API functions:
>
> *** virtio API ***
>
> --> virtio_init()
>     Initialize the virtio layer.
>     Called once before the virtio thread starts.
>
> --> virtio_run()
>     Start routine of the virtio thread.
>
> --> virtio_stop()
>     Set virtio_status to stop the virtio thread.
>
> --> virtio_metrics_update()
>     Add/update the metrics buffer of a VM/host.
>     It must be called for every change of VM/host metrics.
>
> *** virtio internal ***
>
> virtio internal code runs in the virtio thread - see virtio_run().
>
> Access to the mbuffers within the virtio thread is
> - read the mbuffer content
>   see vio_channel_update() -> vio_mbuffer_find()
> - check if a VM is 'known' by vhostmd
>   see vio_channel_readdir() -> vio_mbuffer_find()
> - expire mbuffers
>   see virtio_run() -> vio_mbuffer_expire()
>   The expiration timeout is >= (3 * 'vhostmd update_period').
>   A VM expires when vhostmd did not receive an update for
>   'expiration_time' seconds.
>
> The mbuffer (metrics buffer) structs of the VMs and the host are maintained in
> a btree (mbuffer_root).
> Every mbuffer access is exclusive - see mbuffer_mtx.
>
> *** tests ***
>
> So far I've tested vhostmd with virtio support in a setup with 100 Alpine VMs,
> each VM continuously polling the metrics every 5 sec, for several hours.
> To have a more dynamic test environment all VMs were stopped/started
> several times.
>
> Aside from the dependencies on the vu_buffer functions, the virtio code does not
> call any libvirt functions and can also be run/tested without vhostmd and libvirt.
> I've also checked this variant in combination with valgrind.
> But at the moment the required test code and the UDS server program are not
> part of this patch.
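To make the calling convention a bit more concrete for reviewers, here is a rough
sketch of how vhostmd is expected to drive the API above. The function names, the
socket path and the limits are only example values and not part of this patch; the
expiration period just follows the ">= 3 * update_period" rule with a 5 sec update
period, and METRIC_CONTEXT_* comes from metric.h.

    #include <pthread.h>

    #include "metric.h"
    #include "virtio.h"

    static pthread_t virtio_tid;

    static int example_virtio_start(void)
    {
        /* example values: channel directory, max. 100 channels, 15s expiration */
        if (virtio_init("/var/run/vhostmd/channel/", 100, 15) < 0)
            return -1;

        return pthread_create(&virtio_tid, NULL, virtio_run, NULL) ? -1 : 0;
    }

    /* called from the metrics gathering loop on every refresh;
     * host metrics would use METRIC_CONTEXT_HOST and uuid == NULL */
    static int example_vm_update(const char *xml, int len, const char *uuid)
    {
        return virtio_metrics_update(xml, len, uuid, METRIC_CONTEXT_VM);
    }

    static void example_virtio_stop(void)
    {
        virtio_stop();                    /* virtio_run() cleans up and returns */
        pthread_join(virtio_tid, NULL);
    }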
> > include/util.h | 4 + > include/virtio.h | 50 +++ > vhostmd/virtio.c | 900 +++++++++++++++++++++++++++++++++++++++++++++++ > 3 files changed, 954 insertions(+) > create mode 100644 include/virtio.h > create mode 100644 vhostmd/virtio.c > > diff --git a/include/util.h b/include/util.h > index c0bd19a..17ff09c 100644 > --- a/include/util.h > +++ b/include/util.h > @@ -26,8 +26,12 @@ > > #ifdef __GNUC__ > #define ATTRIBUTE_UNUSED __attribute__((unused)) > +#define ATTRIBUTE_OPTIMIZE_O0 __attribute__((optimize("O0"))) > +#define ATTRIBUTE_NOINLINE __attribute__((noinline())) > #else > #define ATTRIBUTE_UNUSED > +#define ATTRIBUTE_OPTIMIZE_O0 > +#define ATTRIBUTE_NOINLINE > #endif > > typedef enum { > diff --git a/include/virtio.h b/include/virtio.h > new file mode 100644 > index 0000000..b10dab5 > --- /dev/null > +++ b/include/virtio.h > @@ -0,0 +1,50 @@ > +/* > + * Copyright (C) 2018 SAP SE > + * > + * This library is free software; you can redistribute it and/or > + * modify it under the terms of the GNU Lesser General Public > + * License as published by the Free Software Foundation; either > + * version 2.1 of the License, or (at your option) any later version. > + * > + * This library is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU > + * Lesser General Public License for more details. > + * > + * You should have received a copy of the GNU Lesser General Public > + * License along with this library; if not, write to the Free Software > + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA > + * > + * Author: Michael Trapp <michael.trapp@xxxxxxx> > + */ > + > +#ifndef __VIRTIO_H__ > +#define __VIRTIO_H__ > + > +/* > + * Initialize virtio layer > + */ > +int virtio_init(const char *virtio_path, > + int max_channel, > + int expiration_period); > + > +/* > + * Main virtio function > + * 'start_routine' of pthread_create() > + */ > +void *virtio_run(void *arg); > + > +/* > + * Update the metrics response buffer of a VM/host > + */ > +int virtio_metrics_update(const char *buf, > + int len, > + const char *uuid, > + metric_context ctx); > + > +/* > + * Stop virtio thread > + */ > +void virtio_stop(void); > + > +#endif /* __VIRTIO_H__ */ > diff --git a/vhostmd/virtio.c b/vhostmd/virtio.c > new file mode 100644 > index 0000000..f5e6306 > --- /dev/null > +++ b/vhostmd/virtio.c > @@ -0,0 +1,900 @@ > +/* > + * Copyright (C) 2018 SAP SE > + * > + * This library is free software; you can redistribute it and/or > + * modify it under the terms of the GNU Lesser General Public > + * License as published by the Free Software Foundation; either > + * version 2.1 of the License, or (at your option) any later version. > + * > + * This library is distributed in the hope that it will be useful, > + * but WITHOUT ANY WARRANTY; without even the implied warranty of > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU > + * Lesser General Public License for more details. 
> + * > + * You should have received a copy of the GNU Lesser General Public > + * License along with this library; if not, write to the Free Software > + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA > + * > + * Author: Michael Trapp <michael.trapp@xxxxxxx> > + */ > + > +#include <config.h> > + > +#include <errno.h> > +#include <sys/un.h> > +#include <sys/socket.h> > +#include <sys/epoll.h> > +#include <unistd.h> > +#include <fcntl.h> > +#include <search.h> > +#include <dirent.h> > +#include <pthread.h> > +#include <libvirt/libvirt.h> > + > +#include "util.h" > +#include "metric.h" > +#include "virtio.h" > + > + > +#define DEFAULT_VU_BUFFER_SIZE 1024 > +#define VIRTIO_PREFIX_LEN 21UL > +#define VIRTIO_NAME_BUFLEN (VIRTIO_PREFIX_LEN + VIR_UUID_STRING_BUFLEN) > + > + > +typedef struct { > + int fd; > + char uuid[VIR_UUID_STRING_BUFLEN]; > + vu_buffer *request; > + vu_buffer *response; > +} vio_channel; > + > +typedef struct { > + char uuid[VIR_UUID_STRING_BUFLEN]; > + time_t last_update; > + vu_buffer *xml; > +} vio_mbuffer; > + > +typedef enum { > + REQ_INCOMPLETE, > + REQ_INVALID, > + REQ_GET_XML > +} REQUEST_T; > + > + > +static vu_buffer *mbuffer_host = NULL; > +static volatile void *mbuffer_root = NULL; > +static int mbuffer_max_num = 0; > +static volatile int mbuffer_count = 0; > +#ifdef ENABLE_DEBUG > +static int mbuffer_idx = 0; > +#endif > +static time_t mbuffer_exp_period = 0; > +static time_t mbuffer_exp_ts = 0; > + > +static void *channel_root = NULL; > +static char *channel_path = NULL; > +static const char *channel_prefix = "org.github.vhostmd.1."; > +static int channel_max_num = 0; > +static int channel_count = 0; > + > +static int epoll_fd = -1; > +static struct epoll_event *epoll_events = NULL; > +static const size_t max_virtio_path_len = > + sizeof(((struct sockaddr_un *) 0)->sun_path) - 1; > +static pthread_mutex_t mbuffer_mtx; > + > +static enum { > + VIRTIO_INIT, > + VIRTIO_ACTIVE, > + VIRTIO_STOP, > + VIRTIO_ERROR > +} virtio_status = VIRTIO_INIT; > + > + > +/* > + * static functions > + */ > + > +static int vio_channel_compare(const void *a, const void *b); > +static void vio_channel_delete(const void *node, VISIT which, int depth); > + > +static vio_channel *vio_channel_find(const char *uuid); > +static vio_channel *vio_channel_add(const char *uuid); > +static int vio_channel_open(vio_channel * c); > +static void vio_channel_close(vio_channel * c); > +static int vio_channel_update(vio_channel * c); > +static int vio_channel_readdir(const char *path); > +static void vio_channel_recv(vio_channel * c); > +static void vio_channel_send(vio_channel * c, uint32_t ep_event); > + > +static int vio_mbuffer_compare(const void *a, const void *b); > +static void vio_mbuffer_delete(const void *node, VISIT which, int depth); > +static void vio_mbuffer_expire(const void *node, VISIT which, int depth); > +static void **vio_mbuffer_get_root(void); > +#ifdef ENABLE_DEBUG > +static void vio_mbuffer_print(const void *node, VISIT which, int depth); > +#endif > + > +static vio_mbuffer *vio_mbuffer_find(const char *uuid); > +static vio_mbuffer *vio_mbuffer_add(const char *uuid); > +static REQUEST_T vio_check_request(vio_channel * c); > + > +static void vio_handle_io(unsigned epoll_wait_ms); > + > +int virtio_cleanup(void); > +/* > + * update response buffer of a channel > + */ > +static int vio_channel_update(vio_channel * c) > +{ > + static const char *metrics_start_str = "<metrics>\n"; > + static const char *metrics_end_str = "</metrics>\n\n"; > + > + int rc = 
0; > + vio_mbuffer *b = NULL; > + > + if (c == NULL) > + return -1; > + > + vu_buffer_erase(c->response); > + vu_buffer_add(c->response, metrics_start_str, -1); > + > + pthread_mutex_lock(&mbuffer_mtx); > + > + if (mbuffer_host->content && mbuffer_host->use) > + vu_buffer_add(c->response, mbuffer_host->content, -1); > + else > + vu_buffer_add(c->response, "host metrics not available", -1); > + > + b = vio_mbuffer_find(c->uuid); > + if (b && b->xml->use) > + vu_buffer_add(c->response, b->xml->content, -1); > + else > + rc = -1; > + > + pthread_mutex_unlock(&mbuffer_mtx); > + > + vu_buffer_add(c->response, metrics_end_str, -1); > + > +#ifdef ENABLE_DEBUG > + vu_log(VHOSTMD_DEBUG, "New response for %s (%u)\n>>>%s<<<\n", > + c->uuid, c->response->use, c->response->content); > +#endif > + return rc; > +} > + > +/* > + * close channel and free allocated buffers > + */ > +static void vio_channel_close(vio_channel * c) > +{ > + if (c != NULL) { > + channel_count--; > + > + if (c->fd >= 0) { > + vu_log(VHOSTMD_INFO, "Closed channel '%s%s%s' (%d/%d)", > + channel_path, channel_prefix, c->uuid, channel_count, > + channel_max_num); > + close(c->fd); > + } > + > + if (c->request) > + vu_buffer_delete(c->request); > + if (c->response) > + vu_buffer_delete(c->response); > + > + tdelete((const void *) c, &channel_root, vio_channel_compare); > + free(c); > + } > +} > + > +/* > + * connect channel and add the socket to the epoll desriptor > + */ > +static int vio_channel_open(vio_channel * c) > +{ > + struct sockaddr_un address; > + const size_t max_path_len = > + sizeof(((struct sockaddr_un *) 0)->sun_path) - 1; > + struct epoll_event evt; > + int len = (int) (strlen(channel_path) + VIRTIO_PREFIX_LEN + strlen(c->uuid)); > + int flags; > + > + bzero(&address, sizeof(address)); > + address.sun_family = AF_LOCAL; > + > + if (len >= (int) max_path_len) { > + vu_log(VHOSTMD_ERR, "Channel '%s%s%s' - name too long (%d/%lu)", > + channel_path, channel_prefix, c->uuid, len, max_path_len); > + return -1; > + } > + > + len = snprintf(address.sun_path, max_path_len, "%s%s%s", channel_path, > + channel_prefix, c->uuid); > + > + if (len >= (int) max_path_len || len <= 0) { > + vu_log(VHOSTMD_ERR, "Channel '%s%s%s' - name is invalid (%lu)", > + channel_path, channel_prefix, c->uuid, max_path_len); > + return -1; > + } > + > + if ((c->fd = socket(AF_LOCAL, SOCK_STREAM, 0)) < 0) { > + vu_log(VHOSTMD_ERR, "Channel '%s' - socket() failed (%s)", > + address.sun_path, strerror(errno)); > + return -1; > + } > + > + flags = fcntl(c->fd, F_GETFL, 0); > + if (flags < 0) { > + vu_log(VHOSTMD_ERR, "Channel '%s' - fcntl() failed (%s)", > + address.sun_path, strerror(errno)); > + return -1; > + } > + > + flags |= flags | O_NONBLOCK; > + if (fcntl(c->fd, F_SETFL, flags) == -1) { > + vu_log(VHOSTMD_ERR, "Channel '%s' - fcntl() failed (%s)", > + address.sun_path, strerror(errno)); > + return -1; > + } > + > + if (connect(c->fd, (struct sockaddr *) &address, > + (socklen_t) sizeof(address)) < 0) { > + vu_log(VHOSTMD_ERR, "Channel '%s' - connect() failed (%s)", > + address.sun_path, strerror(errno)); > + return -1; > + } > + > + evt.data.ptr = c; > + evt.events = EPOLLIN; > + > + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, c->fd, &evt) == -1) { > + vu_log(VHOSTMD_ERR, "Could not add channel '%s' - epoll_ctl() failed (%s)", > + address.sun_path, strerror(errno)); > + return -1; > + } > + > + vu_log(VHOSTMD_INFO, "Opened channel '%s' (%d/%d)", > + address.sun_path, channel_count, channel_max_num); > + > + return 0; > +} > + > +/* > + * lookup UDS 
sockets in the directory
> + * for valid type/name/mbuffer connect and register channel
> + */

For a long time libvirt has been able to generate a socket path for unix channels.
The standard path prefix is /var/lib/libvirt/qemu/channel/target. When a domain is
started, a subdirectory named 'domain-<domid>-<domname>' is created, in which each
unix socket is created based on the name attribute of the target element. So e.g. a
domain with id '5' and name 'foobar' containing the following channel config

  <channel type='unix'>
    <source mode='bind'/>
    <target type='virtio' name='org.qemu.guest_agent.0'/>
  </channel>
  <channel type='unix'>
    <source mode='bind'/>
    <target type='virtio' name='org.github.vhostmd.1'/>
  </channel>

will result in

  /var/lib/libvirt/qemu/channel/target/domain-5-foobar/org.qemu.guest_agent.0
  /var/lib/libvirt/qemu/channel/target/domain-5-foobar/org.github.vhostmd.1

Within the VM you have

  /dev/virtio-ports/org.qemu.guest_agent.0
  /dev/virtio-ports/org.github.vhostmd.1

For consistency with other channels like the guest agent it would be nice to not
require specifying the channel path in the source element. I realize the importance
of the uuid throughout this patch, but would it be possible to make this work using
libvirt's naming scheme? Sorry for not noticing this earlier :-(.

Regards,
Jim

That's quite interesting and, besides the fact that it integrates into the existing
naming scheme and directory structure of QEMU, it would reduce administration effort
and potential misconfiguration.

From my understanding the vu_vm.id of a VM must be unique on the host, and based on
that I can switch my internal 'index' from uuid to id and use the config you suggested.
I guess we can rely on the fact that the unix socket of a virtio channel is immediately
closed and removed from the filesystem by the 'virsh destroy' command, right?
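Just to double check that I've understood the layout correctly - the path to probe for
a running domain would then be built roughly like this (untested sketch, for
illustration only; it would of course still be subject to the sun_path length check):

    #include <stdio.h>
    #include <libvirt/libvirt.h>

    /* expected socket path for a running domain, following the
     * /var/lib/libvirt/qemu/channel/target/domain-<domid>-<domname>/ layout */
    static int vhostmd_channel_path(virDomainPtr dom, char *buf, size_t buflen)
    {
        unsigned int id = virDomainGetID(dom);      /* only valid while running */
        const char *name = virDomainGetName(dom);

        if (id == (unsigned int) -1 || name == NULL)
            return -1;

        if (snprintf(buf, buflen,
                     "/var/lib/libvirt/qemu/channel/target/domain-%u-%s/org.github.vhostmd.1",
                     id, name) >= (int) buflen)
            return -1;

        return 0;
    }

and I guess the directory scan would then iterate over the domain-* subdirectories
instead of matching the UUID in the socket name.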
Regards Michael > +static int vio_channel_readdir(const char *path) > +{ > + struct dirent *ent; > + DIR *dir = NULL; > + > + if ((dir = opendir(path)) == NULL) { > + vu_log(VHOSTMD_ERR, "opendir(%s) failed (%s)", path, strerror(errno)); > + return -1; > + } > + > + while ((ent = readdir(dir)) != NULL) { > + > + if (ent->d_type == DT_SOCK && > + strncmp(ent->d_name, channel_prefix, VIRTIO_PREFIX_LEN) == 0 && > + strnlen(ent->d_name, VIRTIO_NAME_BUFLEN) == > + (VIRTIO_NAME_BUFLEN - 1)) { > + > + const char *uuid = &ent->d_name[VIRTIO_PREFIX_LEN]; > + > + vio_channel *c = vio_channel_find(uuid); > + > + if (c == NULL) { > + if (channel_count >= channel_max_num) { > + closedir(dir); > + vu_log(VHOSTMD_ERR, "Could not add channel '%s%s%s'" > + " - too many VMs (%d/%d)", > + path, channel_prefix, uuid, channel_count, > + channel_max_num); > + return -1; > + } > + > + pthread_mutex_lock(&mbuffer_mtx); > + > + /* don't add the channel if there is no mbuffer for this VM */ > + if (vio_mbuffer_find(uuid) != NULL) { > +#ifdef ENABLE_DEBUG > + vu_log(VHOSTMD_DEBUG, "New channel %s%s\n", path, ent->d_name); > +#endif > + c = vio_channel_add(uuid); > + > + if (c == NULL) > + vu_log(VHOSTMD_ERR, "Could not add channel '%s%s'", path, > + ent->d_name); > + } > + > + pthread_mutex_unlock(&mbuffer_mtx); > + } > + } > + } > + closedir(dir); > + > + return 0; > +} > + > +/* > + * channel - btree - compare function > + */ > +static int vio_channel_compare(const void *a, const void *b) > +{ > + if (a == NULL || b == NULL) > + return 1; > + > + return strncmp(((const vio_channel *) a)->uuid, ((const vio_channel *) b)->uuid, > + (size_t) VIR_UUID_STRING_BUFLEN); > +} > + > +/* > + * channel - btree/twalk - action function > + * delete entries > + */ > +static void vio_channel_delete(const void *node, VISIT which, int depth ATTRIBUTE_UNUSED) > +{ > + if (which == endorder || which == leaf) { > + if (node) { > + struct epoll_event evt; > + vio_channel *c = *(vio_channel * const *) node; > + if (c) { > + epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt); > + vio_channel_close(c); > + } > + } > + } > +} > + > +/* > + * mbuffer - btree - compare function > + */ > +static int vio_mbuffer_compare(const void *a, const void *b) > +{ > + if (a == NULL || b == NULL) > + return 1; > + > + return strncmp(((const vio_mbuffer *) a)->uuid, ((const vio_mbuffer *) b)->uuid, > + (size_t) VIR_UUID_STRING_BUFLEN); > +} > + > +/* > + * mbuffer - btree/twalk - action function > + * delete entries > + */ > +static void vio_mbuffer_delete(const void *node, VISIT which, int depth ATTRIBUTE_UNUSED) > +{ > + if (which == endorder || which == leaf) { > + if (node) { > + vio_mbuffer *b = *(vio_mbuffer * const *) node; > + > + if (b) { > + if (b->xml) > + vu_buffer_delete(b->xml); > + tdelete((const void *) b, vio_mbuffer_get_root(), > + vio_mbuffer_compare); > + free(b); > + mbuffer_count--; > + } > + } > + } > +} > + > +/* > + * mbuffer - btree/twalk - action function > + * expire entries > + */ > +static void vio_mbuffer_expire(const void *node, VISIT which, int depth ATTRIBUTE_UNUSED) > +{ > + if (which == endorder || which == leaf) { > + if (node) { > + vio_mbuffer *b = *(vio_mbuffer * const *) node; > + > + /* remove expired mbuffer > + * a mbuffer expires when the last update is older > + * than the expiration_period > + * > + * action function does not support custom arguments > + * --> use a static variable: exp_ts > + */ > + if (b && b->last_update < mbuffer_exp_ts) { > + vio_channel *c = vio_channel_find(b->uuid); > + > + if (c != 
NULL) > + vio_channel_close(c); > + > +#ifdef ENABLE_DEBUG > + vu_log(VHOSTMD_DEBUG, "Expire mbuffer '%s' (%d/%d)", > + b->uuid, mbuffer_count, mbuffer_max_num); > +#endif > + if (b->xml) > + vu_buffer_delete(b->xml); > + tdelete((const void *) b, vio_mbuffer_get_root(), > + vio_mbuffer_compare); > + free(b); > + mbuffer_count--; > + } > + } > + } > +} > + > +/* gcc's -Wall -Werror require a cast from (volatile void *) to (void *) > + * but this discards the volatile. The function attributes * 'otimize_O0' > + * and 'noinline' should avoid any optimization for this access. > + */ > +ATTRIBUTE_OPTIMIZE_O0 > +ATTRIBUTE_NOINLINE > +static void **vio_mbuffer_get_root(void) > +{ > + return (void *) &mbuffer_root; > +} > + > +#ifdef ENABLE_DEBUG > +static void vio_mbuffer_print(const void *node, > + VISIT which, > + int depth ATTRIBUTE_UNUSED) > +{ > + if (which == endorder || which == leaf) { > + if (node) { > + vio_mbuffer *b = *(vio_mbuffer **) node; > + > + if (b) { > + vu_log(VHOSTMD_DEBUG, "\t%4d %s %lu\n", > + ++mbuffer_idx, b->uuid, b->last_update); > + } > + } > + } > +} > +#endif > + > +/* > + * lookup metrics buffer in internal btree > + */ > +static vio_mbuffer *vio_mbuffer_find(const char *uuid) > +{ > + vio_mbuffer b; > + void *p; > + > + strncpy(b.uuid, uuid, sizeof(b.uuid)); > + > + p = tfind((const void *) &b, vio_mbuffer_get_root(), vio_mbuffer_compare); > + if (p == NULL) > + return NULL; > + > + return *(vio_mbuffer **) p; > +} > + > +/* > + * add metrics buffer to internal btree > + */ > +static vio_mbuffer *vio_mbuffer_add(const char *uuid) > +{ > + vio_mbuffer *b = NULL; > + void *p = NULL; > + > + if (mbuffer_count >= mbuffer_max_num) { > + vu_log(VHOSTMD_ERR, > + "Could not add metrics buffer '%s' - too many VMs (%d/%d)", > + uuid, mbuffer_count, mbuffer_max_num); > + > +#ifdef ENABLE_DEBUG > + mbuffer_idx = 0; > + vu_log(VHOSTMD_DEBUG, "exp_ts %lu, allocated mbuffer:\n", mbuffer_exp_ts); > + twalk((const void *) *vio_mbuffer_get_root(), vio_mbuffer_print); > +#endif > + > + return NULL; > + } > + > + b = (vio_mbuffer *) calloc(1UL, sizeof(vio_mbuffer)); > + if (b == NULL) > + goto error; > + > + strncpy(b->uuid, uuid, sizeof(b->uuid)); > + b->xml = NULL; > + > + if (vu_buffer_create(&b->xml, DEFAULT_VU_BUFFER_SIZE) != 0) { > + free(b); > + goto error; > + } > + > + p = tsearch((const void *) b, vio_mbuffer_get_root(), vio_mbuffer_compare); > + if (p == NULL) { > + vu_buffer_delete(b->xml); > + free(b); > + goto error; > + } > + > + mbuffer_count++; > + return b; > + > + error: > + vu_log(VHOSTMD_ERR, "Could not add metrics buffer '%s' (%d/%d)", > + uuid, mbuffer_count, mbuffer_max_num); > + > + return NULL; > +} > + > +/* > + * lookup virtio channel in internal btree > + */ > +static vio_channel *vio_channel_find(const char *uuid) > +{ > + vio_channel c; > + void *p; > + > + strncpy(c.uuid, uuid, sizeof(c.uuid)); > + > + p = tfind((const void *) &c, &channel_root, vio_channel_compare); > + if (p == NULL) > + return NULL; > + > + return *(vio_channel **) p; > +} > + > +/* > + * add virtio channel to internal btree > + */ > +static vio_channel *vio_channel_add(const char *uuid) > +{ > + vio_channel *c = NULL; > + void *p = NULL; > + > + c = (vio_channel *) calloc(1UL, sizeof(vio_channel)); > + if (c == NULL) > + goto error; > + > + channel_count++; > + > + strncpy(c->uuid, uuid, sizeof(c->uuid)); > + c->fd = -1; > + c->request = NULL; > + c->response = NULL; > + > + p = tsearch((const void *) c, &channel_root, vio_channel_compare); > + if (p == NULL) > + goto error; > + > 
+ if (vio_channel_open(c) != 0 || > + vu_buffer_create(&c->request, 512) != 0 || > + vu_buffer_create(&c->response, DEFAULT_VU_BUFFER_SIZE) != 0) > + goto error; > + > + return c; > + > + error: > + if (c) > + vio_channel_close(c); > + > + return NULL; > +} > + > +static REQUEST_T vio_check_request(vio_channel * c) > +{ > + const char xml_req_n[] = "GET /metrics/XML\n\n"; > + const char xml_req_rn[] = "GET /metrics/XML\r\n\r\n"; > + > + if (strcmp(c->request->content, xml_req_n) == 0 || > + strcmp(c->request->content, xml_req_rn) == 0) { > + // valid request > + vu_buffer_erase(c->request); > + return REQ_GET_XML; > + } else if (c->request->use >= (c->request->size - 1) || > + strstr(c->request->content, "\n\n") || > + strstr(c->request->content, "\r\n\r\n")) { > + // invalid request -> reset buffer > + vu_buffer_erase(c->request); > + > + vu_buffer_erase(c->response); > + vu_buffer_add(c->response, "INVALID REQUEST\n\n", -1); > + return REQ_INVALID; > + } else { > + // fragment > + c->request->use = (unsigned) strnlen(c->request->content, > + (size_t) c->request->size); > + } > + > + return REQ_INCOMPLETE; > +} > + > +static void vio_channel_recv(vio_channel * c) > +{ > + struct epoll_event evt; > + ssize_t rc = 0; > + REQUEST_T req_type = REQ_INCOMPLETE; > + > + do { > + char *buf = &c->request->content[c->request->use]; > + size_t len = c->request->size - c->request->use - 1; > + > + rc = recv(c->fd, buf, len, 0); > + > + if (rc > 0) { > + req_type = vio_check_request(c); > + } > + } while (rc > 0 && req_type == REQ_INCOMPLETE); > + > + if (req_type == REQ_GET_XML) { > + if (vio_channel_update(c)) { > + // no metrics available -> close channel > + epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt); > + vio_channel_close(c); > + } else > + vio_channel_send(c, EPOLLIN); > + } else if (req_type == REQ_INVALID) > + vio_channel_send(c, EPOLLIN); > +} > + > +static void vio_channel_send(vio_channel * c, uint32_t ep_event) > +{ > + struct epoll_event evt; > + int len; > + > + while ((len = (int) (c->response->use - c->response->pos)) > 0) > + { > + char *buf = &c->response->content[c->response->pos]; > + ssize_t rc = send(c->fd, buf, (size_t) len, 0); > + > + if (rc > 0) > + c->response->pos += (unsigned) rc; > + else > + break; > + } > + > + if (ep_event == EPOLLOUT) { > + if (c->response->use <= c->response->pos) { > + // next request > + evt.data.ptr = c; > + evt.events = EPOLLIN; > + epoll_ctl(epoll_fd, EPOLL_CTL_MOD, c->fd, &evt); > + } > + } else if (ep_event == EPOLLIN) { > + if (c->response->use > c->response->pos) { > + // incomplete response > + evt.data.ptr = c; > + evt.events = EPOLLOUT; > + epoll_ctl(epoll_fd, EPOLL_CTL_MOD, c->fd, &evt); > + } > + } > +} > + > +static void vio_handle_io(unsigned epoll_wait_ms) > +{ > + int i = 0; > + uint64_t ts_end, ts_now; > + struct epoll_event evt; > + struct timespec ts; > + > + clock_gettime(CLOCK_MONOTONIC, &ts); > + ts_now = (uint64_t) (ts.tv_sec * 1000 + ts.tv_nsec / 1000000); > + ts_end = ts_now + epoll_wait_ms; > + > + while (ts_now < ts_end) { > + int wait_ms = (int) (ts_end - ts_now); > + int n = > + epoll_wait(epoll_fd, epoll_events, channel_max_num + 1, wait_ms); > + > + for (i = 0; i < n; i++) { > + vio_channel *c = (epoll_events + i)->data.ptr; > + > + if ((epoll_events + i)->events & EPOLLHUP) { > + epoll_ctl(epoll_fd, EPOLL_CTL_DEL, c->fd, &evt); > + vio_channel_close(c); > + } else if ((epoll_events + i)->events & EPOLLIN) { > + vio_channel_recv(c); > + } else if ((epoll_events + i)->events & EPOLLOUT) { > + vio_channel_send(c, 
EPOLLOUT); > + } > + } > + > + clock_gettime(CLOCK_MONOTONIC, &ts); > + ts_now = (uint64_t) (ts.tv_sec * 1000 + ts.tv_nsec / 1000000); > + } > +} > + > +/* > + * Initialize virtio layer > + */ > +int virtio_init(const char *virtio_path, > + int max_channel, > + int expiration_period) > +{ > + if (virtio_status == VIRTIO_INIT) { > + pthread_mutex_init(&mbuffer_mtx, NULL); > + > + channel_max_num = max_channel; > + mbuffer_max_num = max_channel; > + mbuffer_exp_period = expiration_period; > + > + if (mbuffer_host == NULL) > + if (vu_buffer_create (&mbuffer_host, DEFAULT_VU_BUFFER_SIZE) != 0) > + goto error; > + > + if (epoll_fd == -1) { > + > + epoll_events = calloc((size_t) (channel_max_num + 1), > + sizeof(struct epoll_event)); > + if (epoll_events == NULL) > + goto error; > + > + epoll_fd = epoll_create(1); > + if (epoll_fd == -1) > + goto error; > + > + if (virtio_path == NULL || > + (strnlen(virtio_path, max_virtio_path_len) + > + VIR_UUID_STRING_BUFLEN) > (max_virtio_path_len - 2)) { > + > + vu_log(VHOSTMD_ERR, "Invalid virtio_path"); > + goto error; > + } > + > + if (channel_path == NULL) { > + channel_path = calloc(1UL, max_virtio_path_len + 2); > + if (channel_path == NULL) > + goto error; > + > + strncpy(channel_path, virtio_path, > + max_virtio_path_len - VIR_UUID_STRING_BUFLEN); > + > + if (channel_path[strlen(channel_path) - 1] != '/') > + channel_path[strlen(channel_path)] = '/'; > + } > + } > + > + virtio_status = VIRTIO_ACTIVE; > + vu_log(VHOSTMD_INFO, > + "Virtio using path '%s', max_channels %d, expiration_time %ld", > + channel_path, channel_max_num, mbuffer_exp_period); > + } > + > + return 0; > + > + error: > + vu_log(VHOSTMD_ERR, "Virtio initialization failed"); > + virtio_status = VIRTIO_ERROR; > + > + return -1; > +} > + > +/* > + * Cleanup virtio layer > + */ > +int virtio_cleanup(void) > +{ > + if (virtio_status == VIRTIO_STOP) { > + > + if (epoll_fd != -1) { > + close(epoll_fd); > + epoll_fd = -1; > + } > + > + if (channel_root) { > + twalk(channel_root, vio_channel_delete); > + tdestroy(channel_root, free); > + channel_root = NULL; > + channel_count = 0; > + } > + > + if (channel_path) { > + free(channel_path); > + channel_path = NULL; > + } > + > + if (*vio_mbuffer_get_root()) { > + twalk((const void *) *vio_mbuffer_get_root(), vio_mbuffer_delete); > + tdestroy((void *) *vio_mbuffer_get_root(), free); > + mbuffer_root = NULL; > + mbuffer_count = 0; > + } > + > + if (mbuffer_host) { > + vu_buffer_delete((vu_buffer *) mbuffer_host); > + mbuffer_host = NULL; > + } > + > + if (epoll_events) > + free(epoll_events); > + > + pthread_mutex_destroy(&mbuffer_mtx); > + > + virtio_status = VIRTIO_INIT; > + > + return 0; > + } > + return -1; > +} > + > +/* > + * Main virtio function > + * 'start_routine' of pthread_create() > + */ > +void *virtio_run(void *arg ATTRIBUTE_UNUSED) > +{ > + if (virtio_status != VIRTIO_ACTIVE) { > + vu_log(VHOSTMD_ERR, "Virtio was not initialized"); > + return NULL; > + } > + > + while (virtio_status == VIRTIO_ACTIVE) { > + if (channel_count < channel_max_num) > + vio_channel_readdir(channel_path); > + > + vio_handle_io(3000); // process avaible requests > + > + // remove expired metrics buffers > + mbuffer_exp_ts = time(NULL) - mbuffer_exp_period; > + > + pthread_mutex_lock(&mbuffer_mtx); > + > + if (*vio_mbuffer_get_root()) > + twalk((const void *) *vio_mbuffer_get_root(), vio_mbuffer_expire); > + > + pthread_mutex_unlock(&mbuffer_mtx); > + } > + > + virtio_cleanup(); > + > + return NULL; > +} > + > +/* > + * Update the metrics response buffer 
of a VM/host
> + */
> +int virtio_metrics_update(const char *buf,
> +                          int len,
> +                          const char *uuid,
> +                          metric_context ctx)
> +{
> +    int rc = -1;
> +    vio_mbuffer *b;
> +
> +    if (buf == NULL || len == 0 || virtio_status != VIRTIO_ACTIVE ||
> +        (ctx == METRIC_CONTEXT_VM && uuid == NULL))
> +        return -1;
> +
> +    pthread_mutex_lock(&mbuffer_mtx);
> +
> +    if (ctx == METRIC_CONTEXT_HOST) {
> +        vu_buffer_erase(mbuffer_host);
> +        vu_buffer_add(mbuffer_host, buf, len);
> +#ifdef ENABLE_DEBUG
> +        vu_log(VHOSTMD_DEBUG, "New content for HOST (%u)\n>>>%s<<<\n",
> +               len, mbuffer_host->content);
> +#endif
> +        rc = 0;
> +    }
> +    else if (ctx == METRIC_CONTEXT_VM) {
> +        if ((b = vio_mbuffer_find(uuid)) == NULL) {
> +            // for a new VM create a new mbuffer
> +            b = vio_mbuffer_add(uuid);
> +        }
> +
> +        if (b != NULL) {
> +            vu_buffer_erase(b->xml);
> +            vu_buffer_add(b->xml, buf, len);
> +            // update the timestamp that mbuffer can be expired
> +            b->last_update = time(NULL);
> +#ifdef ENABLE_DEBUG
> +            vu_log(VHOSTMD_DEBUG, "New content for %s (%u)\n>>>%s<<<\n",
> +                   uuid, len, b->xml->content);
> +#endif
> +            rc = 0;
> +        }
> +    }
> +
> +    pthread_mutex_unlock(&mbuffer_mtx);
> +
> +    return rc;
> +}
> +
> +/*
> + * Stop virtio thread
> + */
> +void virtio_stop(void)
> +{
> +    if (virtio_status == VIRTIO_ACTIVE)
> +        virtio_status = VIRTIO_STOP;
> +}
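PS: For manual testing from inside a guest, a minimal reader could look roughly like
the following (untested sketch, not part of the patch; it assumes the
org.github.vhostmd.1 channel name from the example above):

    #include <stdio.h>
    #include <string.h>
    #include <fcntl.h>
    #include <unistd.h>

    int main(void)
    {
        char buf[4096];
        ssize_t n;
        int fd = open("/dev/virtio-ports/org.github.vhostmd.1", O_RDWR);

        if (fd < 0)
            return 1;

        /* request format accepted by vio_check_request() */
        if (write(fd, "GET /metrics/XML\n\n", 18) != 18)
            return 1;

        /* the response built by vio_channel_update() ends with "</metrics>\n\n" */
        while ((n = read(fd, buf, sizeof(buf) - 1)) > 0) {
            buf[n] = '\0';
            fputs(buf, stdout);
            if (strstr(buf, "</metrics>"))
                break;
        }

        close(fd);
        return 0;
    }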