Re: v0.40 released

On Sun, 15 Jan 2012, Martin Mailand wrote:
> Hi,
> is there an example of how to use it, because there is no ceph plugin for
> collectd?

Patch is attached.  This is against collectd 4.10.1, which is a bit old
(Debian squeeze).  We need to get it ported to the latest version and
submitted upstream, but haven't had the time.
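
To try it, apply the patch to a collectd 4.10.1 source tree and rebuild.
Roughly (untested; you also need the json library headers, since the plugin
links against -ljson -- on Debian something like libjson0-dev):

	cd collectd-4.10.1
	patch -p1 < ceph-plugin.patch   # the attached patch, whatever you saved it as
	autoreconf -fi                  # Makefile.am changed, so regenerate the build system
	./configure && make && make install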

The collectd config needs to look something like

LoadPlugin ceph
<Plugin ceph>
	Name "osd.0"
	SocketPath "/var/run/ceph/osd.0.asok"
</Plugin>
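
Each <Plugin ceph> block configures one daemon (each block just appends an
entry to the daemon list), so to watch several daemons you repeat the block.
An untested sketch, with made-up names and socket paths:

LoadPlugin ceph
<Plugin ceph>
	Name "osd.0"
	SocketPath "/var/run/ceph/osd.0.asok"
</Plugin>
<Plugin ceph>
	Name "mon.a"
	SocketPath "/var/run/ceph/mon.a.asok"
</Plugin>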

The SocketPath value is the daemon's admin socket.  The plugin pulls the
daemon's perf counters over that socket; you can also dump them by hand with

	ceph --admin-daemon /var/run/ceph/osd.0.asok perfcounters_dump
	ceph --admin-daemon /var/run/ceph/osd.0.asok perfcounters_schema
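
For reference, both commands return two-level JSON: a group name, then a
counter name.  The shape below is what the plugin's parser expects; the
group/counter names and the values are made up:

	# perfcounters_schema
	{ "osd": { "op_latency": { "type": 5 }, "op": { "type": 10 } } }

	# perfcounters_dump
	{ "osd": { "op_latency": { "avgcount": 42, "sum": 1.5 }, "op": 42 } }

In the schema, "type" is a bit mask: 0x4 marks a long-running average
(reported as a sum/count pair), 0x8 marks a monotonically increasing counter,
and anything else is reported as a gauge.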

sage




> 
> -martin
> 
> 
> On 14.01.2012 06:30, Sage Weil wrote:
> > * mon: expose cluster stats via admin socket (accessible via collectd
> >     plugin)
> 
> 
From ee2f8644654e972b813208690c2b9583f4ac6f6f Mon Sep 17 00:00:00 2001
From: Sage Weil <sage@xxxxxxxxxxxx>
Date: Sun, 15 Jan 2012 13:21:11 -0800
Subject: [PATCH] ceph plugin

---
 src/Makefile.am |    9 +
 src/ceph.c      |  807 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 816 insertions(+), 0 deletions(-)
 create mode 100644 src/ceph.c

diff --git a/src/Makefile.am b/src/Makefile.am
index c6b0538..47fe489 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -950,6 +950,15 @@ collectd_LDADD += "-dlopen" snmp.la
 collectd_DEPENDENCIES += snmp.la
 endif
 
+pkglib_LTLIBRARIES += ceph.la
+ceph_la_SOURCES = ceph.c
+ceph_la_LDFLAGS = -module -avoid-version
+ceph_la_CFLAGS = $(AM_CFLAGS)
+ceph_la_LIBADD = -ljson
+collectd_LDADD += "-dlopen" ceph.la
+collectd_DEPENDENCIES += ceph.la
+
+
 if BUILD_PLUGIN_SWAP
 pkglib_LTLIBRARIES += swap.la
 swap_la_SOURCES = swap.c
diff --git a/src/ceph.c b/src/ceph.c
new file mode 100644
index 0000000..f41d52a
--- /dev/null
+++ b/src/ceph.c
@@ -0,0 +1,807 @@
+/**
+ * collectd - src/ceph.c
+ * Copyright (C) 2011  New Dream Network
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; only version 2 of the License is applicable.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301 USA
+ *
+ * Authors:
+ *   Colin McCabe <cmccabe@xxxxxxxxxxxxxx>
+ **/
+
+#define _BSD_SOURCE
+
+#include "collectd.h"
+#include "common.h"
+#include "plugin.h"
+
+#include <arpa/inet.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <json/json.h>
+#include <json/json_object_private.h> /* needed for struct json_object_iter */
+#include <limits.h>
+#include <poll.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <strings.h>
+#include <sys/socket.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#define RETRY_ON_EINTR(ret, expr) \
+	while(1) { \
+		ret = expr; \
+		if (ret >= 0) \
+			break; \
+		ret = -errno; \
+		if (ret != -EINTR) \
+			break; \
+	}
+
+/** Timeout interval in seconds */
+#define CEPH_TIMEOUT_INTERVAL 1
+
+/** Maximum path length for a UNIX domain socket on this system */
+#define UNIX_DOMAIN_SOCK_PATH_MAX (sizeof(((struct sockaddr_un*)0)->sun_path))
+
+
+
+static void submit_values(const char *plugin_instance,
+			  const char *type, const char *type_instance,
+			  value_t *values, int num)
+{
+	value_list_t vl = VALUE_LIST_INIT;
+
+	DEBUG("submit_values(%s/%s/%s num=%d)",
+	      plugin_instance, type, type_instance, num);
+	vl.values = values;
+	vl.values_len = num;
+	sstrncpy (vl.host, hostname_g, sizeof (vl.host));
+	sstrncpy (vl.plugin, "ceph", sizeof (vl.plugin));
+	if (plugin_instance != NULL)
+		sstrncpy (vl.plugin_instance, plugin_instance,
+				sizeof (vl.plugin_instance));
+	sstrncpy (vl.type, type, sizeof (vl.type));
+	if (type_instance != NULL)
+		sstrncpy (vl.type_instance, type_instance,
+				sizeof (vl.type_instance));
+
+	plugin_dispatch_values (&vl);
+}
+
+
+/******* ceph_daemon *******/
+struct ceph_daemon
+{
+	/** Version of the admin_socket interface */
+	uint32_t version;
+
+	/** Path to the socket that we use to talk to the ceph daemon */
+	char asok_path[UNIX_DOMAIN_SOCK_PATH_MAX];
+
+	/** instance name **/
+	char name[DATA_MAX_NAME_LEN];
+
+	/** our types **/
+	int num_types;
+	struct data_set_s *dsets;
+	int *pc_types;
+};
+
+enum perfcounter_type_d {
+	PERFCOUNTER_LONGRUNAVG = 0x4,
+	PERFCOUNTER_COUNTER = 0x8,
+};
+
+/** Array of daemons to monitor */
+static struct ceph_daemon **g_daemons = NULL;
+
+/** Number of elements in g_daemons */
+static int g_num_daemons = 0;
+
+static void ceph_daemon_print(const struct ceph_daemon *d)
+{
+	DEBUG("name=%s, asok_path=%s", d->name, d->asok_path);
+}
+
+static void ceph_daemons_print(void)
+{
+	int i;
+	for (i = 0; i < g_num_daemons; ++i) {
+		ceph_daemon_print(g_daemons[i]);
+	}
+}
+
+static void ceph_daemon_free(struct ceph_daemon *d)
+{
+	int i;
+
+	for (i = 0; i < d->num_types; i++)
+		plugin_unregister_data_set(d->dsets[i].type);
+	sfree(d->dsets);
+	sfree(d->pc_types);
+	sfree(d);
+}
+
+static int ceph_daemon_add_ds_entry(struct ceph_daemon *d,
+				    const char *group, const char *key,
+				    int pc_type)
+{
+	struct data_set_s *dset;
+	int idx;
+	int ret;
+
+	if (strlen(key) + 1 > DATA_MAX_NAME_LEN)
+		return -ENAMETOOLONG;
+
+	idx = d->num_types++;
+	d->dsets = realloc(d->dsets, sizeof(*d->dsets) * d->num_types);
+	if (!d->dsets)
+		return -ENOMEM;
+	d->pc_types = realloc(d->pc_types, sizeof(*d->pc_types) * d->num_types);
+	if (!d->pc_types)
+		return -ENOMEM;
+
+	dset = &d->dsets[idx];
+	d->pc_types[idx] = pc_type;
+	strncpy(dset->type, key, sizeof(dset->type));
+
+	if (pc_type & PERFCOUNTER_LONGRUNAVG) {
+		dset->ds_num = 2;
+		dset->ds = malloc(sizeof(*dset->ds) * dset->ds_num);
+		if (!dset->ds)
+			return -ENOMEM;
+		dset->ds[0].type = DS_TYPE_COUNTER;
+		strncpy(dset->ds[0].name, "sum", sizeof(dset->ds[0].name));
+		dset->ds[0].min = NAN;
+		dset->ds[0].max = NAN;
+		dset->ds[1].type = DS_TYPE_COUNTER;
+		strncpy(dset->ds[1].name, "count", sizeof(dset->ds[1].name));
+		dset->ds[1].min = NAN;
+		dset->ds[1].max = NAN;
+	}
+	else {
+		dset->ds_num = 1;
+		dset->ds = malloc(sizeof(*dset->ds) * dset->ds_num);
+		if (!dset->ds)
+			return -ENOMEM;
+		if (pc_type & PERFCOUNTER_COUNTER)
+			dset->ds[0].type = DS_TYPE_COUNTER;
+		else
+			dset->ds[0].type = DS_TYPE_GAUGE;
+		strncpy(dset->ds[0].name, "value", sizeof(dset->ds[0].name));
+		dset->ds[0].min = NAN;
+		dset->ds[0].max = NAN;
+	}
+
+	ret = plugin_register_data_set(dset);
+	DEBUG("plugin_register_data_set %s type %d ds_num %d returned %d", key,
+	      pc_type, dset->ds_num, ret);
+	return ret;
+}
+
+/******* ceph_config *******/
+static int cc_handle_str(struct oconfig_item_s *item, char *dest, int dest_len)
+{
+	const char *val;
+	if (item->values_num != 1) {
+		return -ENOTSUP; 
+	}
+	if (item->values[0].type != OCONFIG_TYPE_STRING) {
+		return -ENOTSUP; 
+	}
+	val = item->values[0].value.string;
+	if (snprintf(dest, dest_len, "%s", val) > (dest_len - 1)) {
+		ERROR("ceph plugin: configuration parameter '%s' is too long.\n",
+		      item->key);
+		return -ENAMETOOLONG;
+	}
+	return 0;
+}
+
+static int ceph_config(oconfig_item_t *ci)
+{
+	int ret, i;
+	struct ceph_daemon *array, *nd, cd;
+	memset(&cd, 0, sizeof(struct ceph_daemon));
+
+	for (i = 0; i < ci->children_num; ++i) {
+		oconfig_item_t *child = ci->children + i;
+		if (strcasecmp("Name", child->key) == 0) {
+			ret = cc_handle_str(child, cd.name, DATA_MAX_NAME_LEN);
+			if (ret)
+				return ret;
+		}
+		else if (strcasecmp("SocketPath", child->key) == 0) {
+			ret = cc_handle_str(child, cd.asok_path,
+					    sizeof(cd.asok_path));
+			if (ret)
+				return ret;
+		}
+		else {
+			WARNING("ceph plugin: ignoring unknown option %s",
+				child->key);
+		}
+	}
+	if (cd.name[0] == '\0') {
+		ERROR("ceph plugin: you must configure a daemon name.\n");
+		return -EINVAL;
+	}
+	else if (cd.asok_path[0] == '\0') {
+		ERROR("ceph plugin(name=%s): you must configure an administrative "
+		      "socket path.\n", cd.name);
+		return -EINVAL;
+	}
+	else if (!((cd.asok_path[0] == '/') ||
+	      (cd.asok_path[0] == '.' && cd.asok_path[1] == '/'))) {
+		ERROR("ceph plugin(name=%s): administrative socket paths must begin with "
+		      "'/' or './' Can't parse: '%s'\n",
+		      cd.name, cd.asok_path);
+		return -EINVAL;
+	}
+
+	/* . -> _ in name */
+	for (i = 0; cd.name[i]; ++i)
+		if (cd.name[i] == '.')
+			cd.name[i] = '_';
+
+	array = realloc(g_daemons,
+			sizeof(struct ceph_daemon *) * (g_num_daemons + 1));
+	if (array == NULL) {
+		/* The positive return value here indicates that this is a
+		 * runtime error, not a configuration error.  */
+		return ENOMEM;
+	}
+	g_daemons = (struct ceph_daemon**)array;
+	nd = malloc(sizeof(struct ceph_daemon));
+	if (!nd)
+		return ENOMEM;
+	memcpy(nd, &cd, sizeof(struct ceph_daemon));
+	g_daemons[g_num_daemons++] = nd;
+	return 0;
+}
+
+/******* JSON parsing *******/
+typedef int (*node_handler_t)(void*, const char *, const char*, json_object*);
+
+static int traverse_json(const char *json,
+			 node_handler_t handler, void *handler_arg)
+{
+	struct json_object_iter ia, ib;
+	json_object *root;
+	int ret;
+
+	root = json_tokener_parse(json);
+
+	ret = -EDOM;
+	if (!root)
+		goto out;
+	if (json_object_get_type(root) != json_type_object)
+		goto out;
+
+	json_object_object_foreachC(root, ia) {
+		json_object_object_foreachC(ia.val, ib) {
+			ret = handler(handler_arg, ia.key, ib.key, ib.val);
+			if (ret < 0)
+				goto out;
+		}
+	}
+	ret = 0;
+
+out:
+	json_object_put(root);
+	return ret;
+}
+
+static int node_handler_define_schema(void *arg, 
+				      const char *group, const char *key,
+				      json_object *jo)
+{
+	struct json_object_iter iter;
+	struct ceph_daemon *d = (struct ceph_daemon *)arg;
+	int pc_type;
+
+	if (json_object_get_type(jo) != json_type_object)
+		return -EDOM;
+
+	json_object_object_foreachC(jo, iter) {
+		if (strcmp(iter.key, "type"))
+			return -EDOM;
+		if (json_object_get_type(iter.val) != json_type_int)
+			return -EDOM;
+		pc_type = json_object_get_int(iter.val);
+		DEBUG(" got type d=%s group=%s key=%s pc_type=%04x",
+		      d->name, group, key, pc_type);
+		return ceph_daemon_add_ds_entry(d, group, key, pc_type);
+	}
+	return -EDOM;
+}
+
+static int node_handler_fetch_data(void *arg, 
+				   const char *group, const char *key,
+				   json_object *jo)
+{
+	struct ceph_daemon *d = (struct ceph_daemon *)arg;
+	int idx;
+	value_t v[2];
+
+	DEBUG("fetch_data %s/%s", group, key);
+
+	for (idx = 0; idx < d->num_types; ++idx)
+		if (strcmp(d->dsets[idx].type, key) == 0)
+			break;
+	if (idx == d->num_types) {
+		DEBUG("couldn't find '%s'", key);
+		return 0;
+	}
+
+	if (d->pc_types[idx] & PERFCOUNTER_LONGRUNAVG) {
+		json_object *avgcount, *sum; 
+		if (json_object_get_type(jo) != json_type_object)
+			return -EINVAL;
+		avgcount = json_object_object_get(jo, "avgcount");
+		sum = json_object_object_get(jo, "sum");
+		if ((!avgcount) || (!sum))
+			return -EINVAL;
+		v[0].counter = json_object_get_int(sum);
+		v[1].counter = json_object_get_int(avgcount);
+		submit_values(group, key, d->name, v, 2);
+	}
+	else if (d->pc_types[idx] & PERFCOUNTER_COUNTER) {
+		/* We use json_object_get_double here because anything > 32 
+		 * bits may get truncated by json_object_get_int */
+		v[0].counter = json_object_get_double(jo);
+		submit_values(group, key, d->name, v, 1);
+	}
+	else {
+		v[0].gauge = json_object_get_double(jo);
+		submit_values(group, key, d->name, v, 1);
+	}
+	return 0;
+}
+
+
+
+/******* network I/O *******/
+enum cstate_t {
+	CSTATE_UNCONNECTED = 0,
+	CSTATE_WRITE_REQUEST,
+	CSTATE_READ_VERSION,
+	CSTATE_READ_AMT,
+	CSTATE_READ_JSON,
+};
+
+enum request_type_t {
+	ASOK_REQ_VERSION = 0,
+	ASOK_REQ_DATA = 1,
+	ASOK_REQ_SCHEMA = 2,
+	ASOK_REQ_NONE = 1000,
+};
+
+struct cconn
+{
+	/** The Ceph daemon that we're talking to */
+	struct ceph_daemon *d;
+
+	/** Request type */
+	uint32_t request_type;
+
+	/** The connection state */
+	enum cstate_t state;
+
+	/** The socket we use to talk to this daemon */ 
+	int asok;
+
+	/** The amount of data remaining to read / write. */
+	uint32_t amt;
+
+	/** Length of the JSON to read */
+	uint32_t json_len;
+
+	/** Buffer containing JSON data */
+	char *json;
+};
+
+static int cconn_connect(struct cconn *io)
+{
+	struct sockaddr_un address;
+	int flags, fd, err;
+	if (io->state != CSTATE_UNCONNECTED) {
+		ERROR("cconn_connect: io->state != CSTATE_UNCONNECTED");
+		return -EDOM;
+	}
+	fd = socket(PF_UNIX, SOCK_STREAM, 0);
+	if (fd < 0) {
+		int err = -errno;
+		ERROR("cconn_connect: socket(PF_UNIX, SOCK_STREAM, 0) failed: "
+		      "error %d", err);
+		return err;
+	}
+	memset(&address, 0, sizeof(struct sockaddr_un));
+	address.sun_family = AF_UNIX;
+	snprintf(address.sun_path, sizeof(address.sun_path),
+		 "%s", io->d->asok_path);
+	RETRY_ON_EINTR(err, connect(fd, (struct sockaddr *) &address,
+			sizeof(struct sockaddr_un)));
+	if (err < 0) {
+		ERROR("cconn_connect: connect(%d) failed: error %d", fd, err);
+		close(fd);
+		return err;
+	}
+
+	flags = fcntl(fd, F_GETFL, 0);
+	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) != 0) {
+		err = -errno;
+		ERROR("cconn_connect: fcntl(%d, O_NONBLOCK) error %d", fd, err);
+		close(fd);
+		return err;
+	}
+	io->asok = fd;
+	io->state = CSTATE_WRITE_REQUEST;
+	io->amt = 0;
+	io->json_len = 0;
+	io->json = NULL;
+	return 0;
+}
+
+static void cconn_close(struct cconn *io)
+{
+	io->state = CSTATE_UNCONNECTED;
+	if (io->asok != -1) {
+		int res;
+		RETRY_ON_EINTR(res, close(io->asok));
+	}
+	io->asok = -1;
+	io->amt = 0;
+	io->json_len = 0;
+	sfree(io->json);
+	io->json = NULL;
+}
+
+/* Process incoming JSON counter data */
+static int cconn_process_json(struct cconn *io)
+{
+	switch (io->request_type) {
+	case ASOK_REQ_DATA:
+		return traverse_json(io->json, node_handler_fetch_data, io->d);
+	case ASOK_REQ_SCHEMA:
+		return traverse_json(io->json, node_handler_define_schema,
+				     io->d);
+	default:
+		return -EDOM;
+	}
+}
+
+static int cconn_validate_revents(struct cconn *io, int revents)
+{
+	if (revents & POLLERR) {
+		ERROR("cconn_validate_revents(name=%s): got POLLERR",
+		      io->d->name);
+		return -EIO;
+	}
+	switch (io->state) {
+	case CSTATE_WRITE_REQUEST:
+		return (revents & POLLOUT) ? 0 : -EINVAL;
+	case CSTATE_READ_VERSION:
+	case CSTATE_READ_AMT:
+	case CSTATE_READ_JSON:
+		return (revents & POLLIN) ? 0 : -EINVAL;
+	default:
+		ERROR("cconn_validate_revents(name=%s) got to illegal state on line %d",
+		      io->d->name, __LINE__);
+		return -EDOM;
+	}
+}
+
+/** Handle a network event for a connection */
+static int cconn_handle_event(struct cconn *io)
+{
+	int ret;
+	switch (io->state) {
+	case CSTATE_UNCONNECTED:
+		ERROR("cconn_handle_event(name=%s) got to illegal state on line %d",
+		      io->d->name, __LINE__);
+		return -EDOM;
+	case CSTATE_WRITE_REQUEST: {
+		uint32_t cmd = htonl(io->request_type);
+		RETRY_ON_EINTR(ret, write(io->asok, ((char*)&cmd) + io->amt,
+					       sizeof(cmd) - io->amt));
+		DEBUG("cconn_handle_event(name=%s,state=%d,amt=%d,ret=%d)",
+		      io->d->name, io->state, io->amt, ret);
+		if (ret < 0)
+			return ret;
+		io->amt += ret;
+		if (io->amt >= sizeof(cmd)) {
+			io->amt = 0;
+			switch (io->request_type) {
+			case ASOK_REQ_VERSION:
+				io->state = CSTATE_READ_VERSION;
+				break;
+			default:
+				io->state = CSTATE_READ_AMT;
+				break;
+			}
+		}
+		return 0;
+	}
+	case CSTATE_READ_VERSION: {
+		RETRY_ON_EINTR(ret, read(io->asok, 
+			((char*)(&io->d->version)) + io->amt,
+			sizeof(io->d->version) - io->amt));
+		DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
+		      io->d->name, io->state, ret);
+		if (ret < 0)
+			return ret;
+		io->amt += ret;
+		if (io->amt >= sizeof(io->d->version)) {
+			io->d->version = ntohl(io->d->version);
+			if (io->d->version != 1) {
+				ERROR("cconn_handle_event(name=%s) not "
+				      "expecting version %d!", 
+				      io->d->name, io->d->version);
+				return -ENOTSUP;
+			}
+			DEBUG("cconn_handle_event(name=%s): identified as "
+			      "version %d", io->d->name, io->d->version);
+			io->amt = 0;
+			cconn_close(io);
+			io->request_type = ASOK_REQ_SCHEMA;
+		}
+		return 0;
+	}
+	case CSTATE_READ_AMT: {
+		RETRY_ON_EINTR(ret, read(io->asok, 
+			((char*)(&io->json_len)) + io->amt,
+			sizeof(io->json_len) - io->amt));
+		DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
+		      io->d->name, io->state, ret);
+		if (ret < 0)
+			return ret;
+		io->amt += ret;
+		if (io->amt >= sizeof(io->json_len)) {
+			io->json_len = ntohl(io->json_len);
+			io->amt = 0;
+			io->state = CSTATE_READ_JSON;
+			io->json = calloc(1, io->json_len + 1);
+			if (!io->json)
+				return -ENOMEM;
+		}
+		return 0;
+	}
+	case CSTATE_READ_JSON: {
+		RETRY_ON_EINTR(ret, read(io->asok, io->json + io->amt,
+					      io->json_len - io->amt));
+		DEBUG("cconn_handle_event(name=%s,state=%d,ret=%d)",
+		      io->d->name, io->state, ret);
+		if (ret < 0)
+			return ret;
+		io->amt += ret;
+		if (io->amt >= io->json_len) {
+			DEBUG("raw ceph json is %s", io->json);
+			ret = cconn_process_json(io);
+			if (ret)
+				return ret;
+			cconn_close(io);
+			io->request_type = ASOK_REQ_NONE;
+		}
+		return 0;
+	}
+	default:
+		ERROR("cconn_handle_event(name=%s) got to illegal state on "
+		      "line %d", io->d->name, __LINE__);
+		return -EDOM;
+	}
+}
+
+static int cconn_prepare(struct cconn *io, struct pollfd* fds)
+{
+	int ret;
+	if (io->request_type == ASOK_REQ_NONE) {
+		/* The request has already been serviced. */
+		return 0;
+	}
+	else if ((io->request_type == ASOK_REQ_DATA) &&
+			 (io->d->num_types == 0)) {
+		/* If there are no counters to report on, don't bother
+		 * connecting */
+		return 0;
+	}
+
+	switch (io->state) {
+	case CSTATE_UNCONNECTED:
+		ret = cconn_connect(io);
+		if (ret > 0)
+			return -ret;
+		else if (ret < 0)
+			return ret;
+		fds->fd = io->asok;
+		fds->events = POLLOUT;
+		return 1;
+	case CSTATE_WRITE_REQUEST:
+		fds->fd = io->asok;
+		fds->events = POLLOUT;
+		return 1;
+	case CSTATE_READ_VERSION:
+	case CSTATE_READ_AMT:
+	case CSTATE_READ_JSON:
+		fds->fd = io->asok;
+		fds->events = POLLIN;
+		return 1;
+	default:
+		ERROR("cconn_prepare(name=%s) got to illegal state on line %d",
+		      io->d->name, __LINE__);
+		return -EDOM;
+	}
+}
+
+/** Returns the difference between two struct timevals in milliseconds.
+ * On overflow, we return max/min int.
+ */
+static int milli_diff(const struct timeval *t1, const struct timeval *t2)
+{
+	int64_t ret;
+	int sec_diff = t1->tv_sec - t2->tv_sec;
+	int usec_diff = t1->tv_usec - t2->tv_usec;
+	ret = usec_diff / 1000;
+	ret += ((int64_t)sec_diff * 1000);
+	if (ret > INT_MAX)
+		return INT_MAX;
+	else if (ret < INT_MIN)
+		return INT_MIN;
+	return (int)ret;
+}
+
+/** This handles the actual network I/O to talk to the Ceph daemons.
+ */
+static int cconn_main_loop(uint32_t request_type)
+{
+	int i, ret, some_unreachable = 0;
+	struct timeval end_tv;
+	struct cconn io_array[g_num_daemons];
+
+	DEBUG("entering cconn_main_loop(request_type = %d)", request_type);
+
+	/* create cconn array */
+	memset(io_array, 0, sizeof(io_array));
+	for (i = 0; i < g_num_daemons; ++i) {
+		io_array[i].d = g_daemons[i];
+		io_array[i].request_type = request_type;
+		io_array[i].state = CSTATE_UNCONNECTED;
+	}
+
+	/** Calculate the time at which we should give up */
+	gettimeofday(&end_tv, NULL);
+	end_tv.tv_sec += CEPH_TIMEOUT_INTERVAL;
+
+	while (1) {
+		int nfds, diff;
+		struct timeval tv;
+		struct cconn *polled_io_array[g_num_daemons];
+		struct pollfd fds[g_num_daemons];
+		memset(fds, 0, sizeof(fds));
+		nfds = 0;
+		for (i = 0; i < g_num_daemons; ++i) {
+			struct cconn *io = io_array + i;
+			ret = cconn_prepare(io, fds + nfds);
+			if (ret < 0) {
+				WARNING("ERROR: cconn_prepare(name=%s,i=%d,st=%d)=%d",
+				      io->d->name, i, io->state, ret);
+				cconn_close(io);
+				io->request_type = ASOK_REQ_NONE;
+				some_unreachable = 1;
+			}
+			else if (ret == 1) {
+				DEBUG("did cconn_prepare(name=%s,i=%d,st=%d)",
+				      io->d->name, i, io->state);
+				polled_io_array[nfds++] = io_array + i;
+			}
+		}
+		if (nfds == 0) {
+			/* finished */
+			ret = 0;
+			DEBUG("cconn_main_loop: no more cconn to manage.");
+			goto done;
+		}
+		gettimeofday(&tv, NULL);
+		diff = milli_diff(&end_tv, &tv);
+		if (diff <= 0) {
+			/* Timed out */
+			ret = -ETIMEDOUT;
+			WARNING("ERROR: cconn_main_loop: timed out.\n");
+			goto done;
+		}
+		RETRY_ON_EINTR(ret, poll(fds, nfds, diff));
+		if (ret < 0) {
+			ERROR("poll(2) error: %d", ret);
+			goto done;
+		}
+		for (i = 0; i < nfds; ++i) {
+			struct cconn *io = polled_io_array[i];
+			int revents = fds[i].revents;
+			if (revents == 0) {
+				/* do nothing */
+			}
+			else if (cconn_validate_revents(io, revents)) {
+				WARNING("ERROR: cconn(name=%s,i=%d,st=%d): "
+					"revents validation error: "
+					"revents=0x%08x", io->d->name, i,
+					io->state, revents);
+				cconn_close(io);
+				io->request_type = ASOK_REQ_NONE;
+				some_unreachable = 1;
+			}
+			else {
+				int ret = cconn_handle_event(io);
+				if (ret) {
+					WARNING("ERROR: cconn_handle_event(name=%s,"
+						"i=%d,st=%d): error %d",
+						io->d->name, i,
+						io->state, ret);
+					cconn_close(io);
+					io->request_type = ASOK_REQ_NONE;
+					some_unreachable = 1;
+				}
+			}
+		}
+	}
+done:
+	for (i = 0; i < g_num_daemons; ++i) {
+		cconn_close(io_array + i);
+	}
+	if (some_unreachable) {
+		DEBUG("cconn_main_loop: some Ceph daemons were unreachable.");
+	}
+	else {
+		DEBUG("cconn_main_loop: reached all Ceph daemons :)");
+	}
+	return ret;
+}
+
+static int ceph_read(void)
+{
+	return cconn_main_loop(ASOK_REQ_DATA);
+}
+
+/******* lifecycle *******/
+static int ceph_init(void)
+{
+	DEBUG("ceph_init");
+	ceph_daemons_print();
+
+	return cconn_main_loop(ASOK_REQ_VERSION);
+}
+
+static int ceph_shutdown(void)
+{
+	int i;
+	for (i = 0; i < g_num_daemons; ++i) {
+		ceph_daemon_free(g_daemons[i]);
+	}
+	sfree(g_daemons);
+	g_daemons = NULL;
+	g_num_daemons = 0;
+	DEBUG("finished ceph_shutdown");
+	return 0;
+}
+
+void module_register(void)
+{
+	plugin_register_complex_config("ceph", ceph_config);
+	plugin_register_init("ceph", ceph_init);
+	plugin_register_read("ceph", ceph_read);
+	plugin_register_shutdown("ceph", ceph_shutdown);
+}
-- 
1.7.2.5

