Re: [PATCH] IPFIX output plugin

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



I'm following up on this. I'd say this is complete and I don't plan to introduce further significant changes. Maybe just small fixes. I attach two patches, tell me if you'd rather have them as a single patch.

The first patch applies the cosmetic changes suggested by Pablo.

The second patch generates and sends template records, introducing a new 'send_template' config parameter, and fixes the data record's layout, as it was previously broken. It also puts my author name in the source files together with Holger's.

The 'send_template' parameter tells whether a template record should be sent. It can have the following string values: "once" (default), "always" and "never".

With "once", a template is sent with the first record set. Subsequent sets will only contain data records as the collector is assumed to already by aware of the template (this is ok). The values "always" and "never" do what you probably already expect.


I am currently testing this with the NFCT input and Wireshark.

Place the following in ulogd.conf:

     # this will print all flows on screen
     loglevel=1

     # load NFCT and IPFIX plugins
     plugin="/lib/ulogd/ulogd_inpflow_NFCT.so"
     plugin="/lib/ulogd/ulogd_output_IPFIX.so"

     stack=ct1:NFCT,ipfix1:IPFIX

     [ct1]
     netlink_socket_buffer_size=217088
     netlink_socket_buffer_maxsize=1085440
     accept_proto_filter=tcp,sctp

     [ipfix1]
     oid=1
     host="127.0.0.1"
     #port=4739
     #send_template="once"

I am currently testing it by launching a plain NetCat listener on port 4739 (the default for IPFIX) and then running Wireshark and see that it dissects the IPFIX/NetFlow traffic correctly (obviously this relies on the Wireshark NetFlow dissector being correct).

First:

     nc -vvvv -l 127.0.0.1 4739

Then:

     sudo ulogd -vc ulogd.conf

At this point IPFIX/NetFlow traffic should be sent to NetCat (which will print garbage as it's a binary protocol). It may take some time until traffic is actually sent, as captured flows are buffered until the buffer is filled. The buffer's length can be changed with the 'mtu' config parameter in ulogd.conf (it's 512 by default).

Currently Wireshark correctly dissects all the traffic.

IPFIX collector software is unfortunately not as widespread as for other protocols.

I am looking for free/open-source IPFIX collectors out there to better test this. I've found one at [0], but if someone has a better idea just let me know. I'll try it and test it against it in the following days and will send further patches fixing bugs should problems arise.

[0] https://github.com/CESNET/ipfixcol2

On 9/4/19 0:30, Pablo Neira Ayuso wrote:
Hi Ander,

On Tue, Apr 02, 2019 at 09:27:32AM +0200, Ander Juaristi wrote:
Hi,

The attached patch provides an IPFIX output plugin for ulogd2.

This patch is functionally equivalent to that sent by Holger Eitzenberger
(Astaro) some time ago. I've reworked it to make it compile under the
current plugin framework, which has suffered some changes since then.

The current patch (being functionally equivalent) does not send IPFIX
template records.

Yes, this is missing.

This is not necessary if the collector knows the template in
advance. However I plan to add such a feature in the following days
(sending template records in the IPFIX sets), unless you tell me
it's not necessary. I have already started working on it and another
patch will follow soon.

Great, if you could send the build and send template too, that would
be good.

I would also like to take this opportunity to introduce myself as a
prospective GSoC student for Netfilter (on pending approval for gsoc13
mailing list). I approached Pablo off-list on January asking for some
pointers to undone work on Netfilter. I am interested in idea 1 and all of
its subtasks, which, I suppose, would all form part of the same project. My
intention is to start writing the GSoC proposal now, submit it before April
9 and then submit the second (definitive) patch for IPFIX some time later,
before end of April, so that you could assess my coding skills based on this
patch. Please let me know if you'd like me to proceed another way.

More comments below on this patch.

 From e4a2367ced2062ee6b00f33d890c830e702650e4 Mon Sep 17 00:00:00 2001
From: Holger Eitzenberger <heitzenberger@xxxxxxxxxx>
Date: Fri, 30 Oct 2009 11:25:52 +0100

I'd suggest you use the existing date, also place yourself as the
patch author of this. Just say that "this is based on original work
from Holger Eitzenberger" in the patch description.

Subject: [PATCH] IPFIX: Add IPFIX output plugin

Please add a description to this patch, including an example on how to
use this.

Please, tell us how you are testing this patch, we would like to see a
working version of the ipfix plugin in the tree.

IIRC,the existing plugin is incomplete, so it would be good to say
that you just decided to remove the incomplete one and provide one
that is working, if you can prove it, of course ;-)

Signed-off-by: Holger Eitzenberger <holger@xxxxxxxxxxxxxxxx>
Signed-off-by: Ander Juaristi <a@xxxxxxxxxxxx>
---
  configure.ac                      |  11 +-
  include/ulogd/ulogd.h             |   3 +
  input/flow/ulogd_inpflow_IPFIX.c  |   2 -
  output/Makefile.am                |   2 +-
  output/ipfix/Makefile.am          |  12 +
  output/ipfix/ipfix.c              | 153 +++++++++
  output/ipfix/ipfix.h              |  89 +++++
  output/ipfix/ulogd_output_IPFIX.c | 526 ++++++++++++++++++++++++++++
  output/ulogd_output_IPFIX.c       | 546 ------------------------------
  9 files changed, 794 insertions(+), 550 deletions(-)
  delete mode 100644 input/flow/ulogd_inpflow_IPFIX.c
  create mode 100644 output/ipfix/Makefile.am
  create mode 100644 output/ipfix/ipfix.c
  create mode 100644 output/ipfix/ipfix.h
  create mode 100644 output/ipfix/ulogd_output_IPFIX.c
  delete mode 100644 output/ulogd_output_IPFIX.c

diff --git a/configure.ac b/configure.ac
index 3aa0624..cd9ac7e 100644
--- a/configure.ac
+++ b/configure.ac
@@ -150,6 +150,14 @@ else
  	enable_jansson="no"
  fi
+AC_ARG_WITH([ipfix], AS_HELP_STRING([--without-ipfix], [Build without IPFIX output plugin [default=test]]))
+AM_CONDITIONAL([HAVE_IPFIX], [test "x$with_ipfix" != "xno"])
+if test "x$with_ipfix" != "xno"; then
+	enable_ipfix="yes"
+else
+	enable_ipfix="no"
+fi

I think we don't need this knob. We don't have any external library
dependency, right? If not, please remove this.

+
  AC_ARG_WITH([ulogd2libdir],
  	AS_HELP_STRING([--with-ulogd2libdir=PATH],
          [Default directory to load ulogd2 plugin from [[LIBDIR/ulogd]]]),
@@ -179,7 +187,7 @@ AC_CONFIG_FILES(include/Makefile include/ulogd/Makefile include/libipulog/Makefi
  	  input/sum/Makefile \
  	  filter/Makefile filter/raw2packet/Makefile filter/packet2flow/Makefile \
  	  output/Makefile output/pcap/Makefile output/mysql/Makefile output/pgsql/Makefile output/sqlite3/Makefile \
-	  output/dbi/Makefile \
+	  output/dbi/Makefile output/ipfix/Makefile \
  	  src/Makefile Makefile Rules.make)
  AC_OUTPUT
@@ -214,5 +222,6 @@ Ulogd configuration:
      SQLITE3 plugin:			${enable_sqlite3}
      DBI plugin:				${enable_dbi}
      JSON plugin:			${enable_jansson}
+    IPFIX plugin:                       ${enable_ipfix}
  "
  echo "You can now run 'make' and 'make install'"
diff --git a/include/ulogd/ulogd.h b/include/ulogd/ulogd.h
index 2e38195..c017085 100644
--- a/include/ulogd/ulogd.h
+++ b/include/ulogd/ulogd.h
@@ -28,6 +28,9 @@
/* types without length */
  #define ULOGD_RET_NONE		0x0000

Missing line break here.

+#define __packed		__attribute__((packed))
+#define __noreturn		__attribute__((noreturn))
+#define __cold			__attribute__((cold))

__noreturn and __cold are not used. Remove them.

  #define ULOGD_RET_INT8		0x0001
  #define ULOGD_RET_INT16		0x0002
diff --git a/input/flow/ulogd_inpflow_IPFIX.c b/input/flow/ulogd_inpflow_IPFIX.c
deleted file mode 100644
index 27ce5b2..0000000
--- a/input/flow/ulogd_inpflow_IPFIX.c
+++ /dev/null
@@ -1,2 +0,0 @@
-/* */
-
diff --git a/output/Makefile.am b/output/Makefile.am
index ff851ad..7ba8217 100644
--- a/output/Makefile.am
+++ b/output/Makefile.am
@@ -2,7 +2,7 @@ AM_CPPFLAGS = -I$(top_srcdir)/include ${LIBNETFILTER_ACCT_CFLAGS} \
                ${LIBNETFILTER_CONNTRACK_CFLAGS} ${LIBNETFILTER_LOG_CFLAGS}
  AM_CFLAGS = ${regular_CFLAGS}
-SUBDIRS= pcap mysql pgsql sqlite3 dbi
+SUBDIRS= pcap mysql pgsql sqlite3 dbi ipfix
pkglib_LTLIBRARIES = ulogd_output_LOGEMU.la ulogd_output_SYSLOG.la \
  			 ulogd_output_OPRINT.la ulogd_output_GPRINT.la \
diff --git a/output/ipfix/Makefile.am b/output/ipfix/Makefile.am
new file mode 100644
index 0000000..315f3b8
--- /dev/null
+++ b/output/ipfix/Makefile.am
@@ -0,0 +1,11 @@
+AM_CPPFLAGS = -I$(top_srcdir)/include
+AM_CFLAGS = $(regular_CFLAGS)
+
+if HAVE_IPFIX
+
+pkglib_LTLIBRARIES = ulogd_output_IPFIX.la
+
+ulogd_output_IPFIX_la_SOURCES = ulogd_output_IPFIX.c ipfix.c
+ulogd_output_IPFIX_la_LDFLAGS = -avoid-version -module
+
+endif
diff --git a/output/ipfix/ipfix.c b/output/ipfix/ipfix.c
new file mode 100644
index 0000000..d7006ca
--- /dev/null
+++ b/output/ipfix/ipfix.c
@@ -0,0 +1,153 @@
+/*
+ * ipfix.c
+ *
+ * Holger Eitzenberger, 2009.
+ */
+
+/* These forward declarations are needed since ulogd.h doesn't like to be the first */
+#include <ulogd/linuxlist.h>
+
+#define __packed		__attribute__((packed))
+#define __noreturn		__attribute__((noreturn))
+#define __cold			__attribute__((cold))

This is redefined in ulogd.h

+
+#include "ipfix.h"
+
+#include <ulogd/ulogd.h>
+#include <ulogd/common.h>
+
+struct ipfix_msg *
+ipfix_msg_alloc(size_t len, uint32_t oid)
+{
+	struct ipfix_msg *msg;
+	struct ipfix_hdr *hdr;
+
+	if (len < IPFIX_HDRLEN + IPFIX_SET_HDRLEN)
+		return NULL;
+
+	msg = malloc(sizeof(struct ipfix_msg) + len);
+	memset(msg, 0, sizeof(struct ipfix_msg));
+	msg->tail = msg->data + IPFIX_HDRLEN;
+	msg->end = msg->data + len;
+
+	hdr = ipfix_msg_hdr(msg);
+	memset(hdr, 0, IPFIX_HDRLEN);
+	hdr->version = htons(IPFIX_VERSION);
+	hdr->oid = htonl(oid);
+
+	return msg;
+}
+
+void
+ipfix_msg_free(struct ipfix_msg *msg)
+{
+	if (!msg)
+		return;
+
+	if (msg->nrecs > 0)
+		ulogd_log(ULOGD_DEBUG, "%s: %d flows have been lost\n", __func__,
+			msg->nrecs);
+
+	free(msg);
+}
+
+struct ipfix_hdr *
+ipfix_msg_hdr(const struct ipfix_msg *msg)

No need for line break, this should be fine:

struct ipfix_hdr *ipfix_msg_hdr(const struct ipfix_msg *msg)

+{
+	return (struct ipfix_hdr *)msg->data;
+}
+
+void *
+ipfix_msg_data(struct ipfix_msg *msg)

same here and so on.

+{
+	return msg->data;
+}
+
+size_t
+ipfix_msg_len(const struct ipfix_msg *msg)
+{
+	return msg->tail - msg->data;
+}
+
+struct ipfix_set_hdr *
+ipfix_msg_add_set(struct ipfix_msg *msg, uint16_t sid)
+{
+	struct ipfix_set_hdr *shdr;
+
+	if (msg->end - msg->tail < (int) IPFIX_SET_HDRLEN)
+		return NULL;
+
+	shdr = (struct ipfix_set_hdr *)msg->tail;
+	shdr->id = sid;
+	shdr->len = IPFIX_SET_HDRLEN;
+	msg->tail += IPFIX_SET_HDRLEN;
+	msg->last_set = shdr;
+	return shdr;
+}
+
+struct ipfix_set_hdr *
+ipfix_msg_get_set(const struct ipfix_msg *msg)
+{
+	return msg->last_set;
+}
+
+/**
+ * Add data record to an IPFIX message.  The data is accounted properly.
+ *
+ * @return pointer to data or %NULL if not that much space left.
+ */
+void *
+ipfix_msg_add_data(struct ipfix_msg *msg, size_t len)
+{
+	void *data;
+
+	if (!msg->last_set) {
+		ulogd_log(ULOGD_FATAL, "msg->last_set is NULL\n");
+		return NULL;
+	}
+
+	if ((ssize_t) len > msg->end - msg->tail)
+		return NULL;
+
+	data = msg->tail;
+	msg->tail += len;
+	msg->nrecs++;
+	msg->last_set->len += len;
+
+	return data;
+}
+
+/* check and dump message */
+int
+ipfix_dump_msg(const struct ipfix_msg *msg)
+{
+	const struct ipfix_hdr *hdr = ipfix_msg_hdr(msg);
+	const struct ipfix_set_hdr *shdr = (struct ipfix_set_hdr *) hdr->data;
+
+	if (ntohs(hdr->len) < IPFIX_HDRLEN) {
+		ulogd_log(ULOGD_FATAL, "Invalid IPFIX message header length\n");
+		return -1;
+	}
+	if (ipfix_msg_len(msg) != IPFIX_HDRLEN + ntohs(shdr->len)) {
+		ulogd_log(ULOGD_FATAL, "Invalid IPFIX message length\n");
+		return -1;
+	}
+
+	ulogd_log(ULOGD_DEBUG, "msg: ver=%#x len=%#x t=%#x seq=%#x oid=%d\n",
+			  ntohs(hdr->version), ntohs(hdr->len), htonl(hdr->time),
+			  ntohl(hdr->seqno), ntohl(hdr->oid));
+
+	return 0;
+}
+
+/* template management */
+size_t
+ipfix_rec_len(uint16_t sid)
+{
+	if (sid != htons(VY_IPFIX_SID)) {
+		ulogd_log(ULOGD_FATAL, "Invalid SID\n");
+		return 0;
+	}
+
+	return sizeof(struct vy_ipfix_data);
+}
diff --git a/output/ipfix/ipfix.h b/output/ipfix/ipfix.h
new file mode 100644
index 0000000..cdb5a6f
--- /dev/null
+++ b/output/ipfix/ipfix.h
@@ -0,0 +1,89 @@
+/*
+ * ipfix.h
+ *
+ * Holger Eitzenberger <holger@xxxxxxxxxxxxxxxx>, 2009.
+ */
+#ifndef IPFIX_H
+#define IPFIX_H
+
+#include <stdint.h>
+#include <netinet/in.h>
+
+
+struct ipfix_hdr {
+#define IPFIX_VERSION			0xa
+	uint16_t version;
+	uint16_t len;
+	uint32_t time;
+	uint32_t seqno;
+	uint32_t oid;				/* Observation Domain ID */
+	uint8_t data[];
+} __packed;
+
+#define IPFIX_HDRLEN	sizeof(struct ipfix_hdr)
+
+/*
+ * IDs 0-255 are reserved for Template Sets.  IDs of Data Sets are > 255.
+ */
+struct ipfix_templ_hdr {
+	uint16_t id;
+	uint16_t cnt;
+	uint8_t data[];
+} __packed;
+
+struct ipfix_set_hdr {
+#define IPFIX_SET_TEMPL			2
+#define IPFIX_SET_OPT_TEMPL		3
+	uint16_t id;
+	uint16_t len;
+	uint8_t data[];
+} __packed;
+
+#define IPFIX_SET_HDRLEN		sizeof(struct ipfix_set_hdr)
+
+struct ipfix_msg {
+	struct llist_head link;
+	uint8_t *tail;
+	uint8_t *end;
+	unsigned nrecs;
+	struct ipfix_set_hdr *last_set;
+	uint8_t data[];
+};
+
+struct vy_ipfix_data {
+	struct in_addr saddr;
+	struct in_addr daddr;
+	uint16_t ifi_in;
+	uint16_t ifi_out;
+	uint32_t packets;
+	uint32_t bytes;
+	uint32_t start;				/* Unix time */
+	uint32_t end;				/* Unix time */
+	uint16_t sport;
+	uint16_t dport;
+	uint32_t aid;				/* Application ID */
+	uint8_t l4_proto;
+	uint8_t dscp;
+	uint16_t __padding;
+} __packed;
+
+#define VY_IPFIX_SID		256
+
+#define VY_IPFIX_FLOWS		36
+#define VY_IPFIX_PKT_LEN	(IPFIX_HDRLEN + IPFIX_SET_HDRLEN \
+							 + VY_IPFIX_FLOWS * sizeof(struct vy_ipfix_data))
+
+/* template management */
+size_t ipfix_rec_len(uint16_t);
+
+/* message handling */
+struct ipfix_msg *ipfix_msg_alloc(size_t, uint32_t);
+void ipfix_msg_free(struct ipfix_msg *);
+struct ipfix_hdr *ipfix_msg_hdr(const struct ipfix_msg *);
+size_t ipfix_msg_len(const struct ipfix_msg *);
+void *ipfix_msg_data(struct ipfix_msg *);
+struct ipfix_set_hdr *ipfix_msg_add_set(struct ipfix_msg *, uint16_t);
+void *ipfix_msg_add_data(struct ipfix_msg *, size_t);
+int ipfix_dump_msg(const struct ipfix_msg *);
+
+#endif /* IPFIX_H */
diff --git a/output/ipfix/ulogd_output_IPFIX.c b/output/ipfix/ulogd_output_IPFIX.c
new file mode 100644
index 0000000..02bc21f
--- /dev/null
+++ b/output/ipfix/ulogd_output_IPFIX.c
@@ -0,0 +1,526 @@
+/*
+ * ulogd_output_IPFIX.c
+ *
+ * ulogd IPFIX Exporter plugin.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ * Holger Eitzenberger <holger@xxxxxxxxxxxxxxxx>  Astaro AG 2009
+ */
+#include <unistd.h>
+#include <time.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <ulogd/ulogd.h>
+#include <ulogd/common.h>
+
+#include "ipfix.h"
+
+#define DEFAULT_MTU		512 /* RFC 5101, 10.3.3 */
+#define DEFAULT_PORT		4739 /* RFC 5101, 10.3.4 */
+#define DEFAULT_SPORT		4740
+
+enum {
+	OID_CE = 0,
+	HOST_CE,
+	PORT_CE,
+	PROTO_CE,
+	MTU_CE,
+};
+
+#define oid_ce(x)	(x->ces[OID_CE])
+#define host_ce(x)	(x->ces[HOST_CE])
+#define port_ce(x)	(x->ces[PORT_CE])
+#define proto_ce(x)	(x->ces[PROTO_CE])
+#define mtu_ce(x)	(x->ces[MTU_CE])
+
+static const struct config_keyset ipfix_kset = {
+	.num_ces = 5,
+	.ces = {
+		{
+			.key = "oid",
+			.type = CONFIG_TYPE_INT,
+			.u.value = 0
+		},
+		{
+			.key = "host",
+			.type = CONFIG_TYPE_STRING,
+			.u.string = ""
+		},
+		{
+			.key = "port",
+			.type = CONFIG_TYPE_INT,
+			.u.value = DEFAULT_PORT
+		},
+		{
+			.key = "proto",
+			.type = CONFIG_TYPE_STRING,
+			.u.string = "tcp"
+		},
+		{
+			.key = "mtu",
+			.type = CONFIG_TYPE_INT,
+			.u.value = DEFAULT_MTU
+		}
+	}
+};
+
+struct ipfix_templ {
+	struct ipfix_templ *next;
+};
+
+struct ipfix_priv {
+	struct ulogd_fd ufd;
+	uint32_t seqno;
+	struct ipfix_msg *msg;		/* current message */
+	struct llist_head list;
+	struct ipfix_templ *templates;
+	int proto;
+	struct ulogd_timer timer;
+	struct sockaddr_in sa;
+};
+
+enum {
+	InIpSaddr = 0,
+	InIpDaddr,
+	InRawInPktCount,
+	InRawInPktLen,
+	InRawOutPktCount,
+	InRawOutPktLen,
+	InFlowStartSec,
+	InFlowStartUsec,
+	InFlowEndSec,
+	InFlowEndUsec,
+	InL4SPort,
+	InL4DPort,
+	InIpProto,
+	InCtMark
+};
+
+static struct ulogd_key ipfix_in_keys[] = {
+		[InIpSaddr] = {
+			.type = ULOGD_RET_IPADDR,
+			.name = "orig.ip.saddr"
+		},
+		[InIpDaddr] = {
+			.type = ULOGD_RET_IPADDR,
+			.name = "orig.ip.daddr"
+		},
+		[InRawInPktCount] = {
+			.type = ULOGD_RET_UINT64,
+			.name = "orig.raw.pktcount"
+		},
+		[InRawInPktLen] = {
+			.type = ULOGD_RET_UINT64,
+			.name = "orig.raw.pktlen"
+		},
+		[InRawOutPktCount] = {
+			.type = ULOGD_RET_UINT64,
+			.name = "reply.raw.pktcount"
+		},
+		[InRawOutPktLen] = {
+			.type = ULOGD_RET_UINT64,
+			.name = "reply.raw.pktlen"
+		},
+		[InFlowStartSec] = {
+			.type = ULOGD_RET_UINT32,
+			.name = "flow.start.sec"
+		},
+		[InFlowStartUsec] = {
+			.type = ULOGD_RET_UINT32,
+			.name = "flow.start.usec"
+		},
+		[InFlowEndSec] = {
+			.type = ULOGD_RET_UINT32,
+			.name = "flow.end.sec"
+		},
+		[InFlowEndUsec] = {
+			.type = ULOGD_RET_UINT32,
+			.name = "flow.end.usec"
+		},
+		[InL4SPort] = {
+			.type = ULOGD_RET_UINT16,
+			.name = "orig.l4.sport"
+		},
+		[InL4DPort] = {
+			.type = ULOGD_RET_UINT16,
+			.name = "orig.l4.dport"
+		},
+		[InIpProto] = {
+			.type = ULOGD_RET_UINT8,
+			.name = "orig.ip.protocol"
+		},
+		[InCtMark] = {
+			.type = ULOGD_RET_UINT32,
+			.name = "ct.mark"
+		}
+};
+
+/* do some polishing and enqueue it */
+static void
+enqueue_msg(struct ipfix_priv *priv, struct ipfix_msg *msg)
+{
+	struct ipfix_hdr *hdr = ipfix_msg_data(msg);
+
+	if (!msg)
+		return;
+
+	hdr->time = htonl(time(NULL));
+	hdr->seqno = htonl(priv->seqno += msg->nrecs);
+	if (msg->last_set) {
+		msg->last_set->id = htons(msg->last_set->id);
+		msg->last_set->len = htons(msg->last_set->len);
+		msg->last_set = NULL;
+	}
+	hdr->len = htons(ipfix_msg_len(msg));
+
+	llist_add(&msg->link, &priv->list);
+}
+
+/**
+ * @return %ULOGD_IRET_OK or error value
+ */
+static int
+send_msgs(struct ulogd_pluginstance *pi)
+{
+	struct ipfix_msg *msg;
+	struct llist_head *curr, *tmp;
+	struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;

Please, reverse xmas tree in variable definition for new code is preferred, ie.

	struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
	struct llist_head *curr, *tmp;
	struct ipfix_msg *msg;

ie. From larger line to shorter line.

+	int ret = ULOGD_IRET_OK;
+
+	llist_for_each_prev(curr, &priv->list) {
+		int ret;

This is shadowing the previous 'int ret' definition.

+		msg = llist_entry(curr, struct ipfix_msg, link);
+
+		ret = send(priv->ufd.fd, ipfix_msg_data(msg), ipfix_msg_len(msg), 0);
+		if (ret < 0) {
+			ulogd_log(ULOGD_ERROR, "send: %m\n");
+
+			if (errno == EAGAIN || errno == EINTR)

Socket is in blocking mode, I see not fcntl to make it enter
non-blocking mode, so I guess this is not needed.

+				goto done;
+			else
+				ret = ULOGD_IRET_ERR;
+
+			goto done;
+		}
+
+		/* TODO handle short send() for other protocols */
+		if ((size_t) ret < ipfix_msg_len(msg))
+			ulogd_log(ULOGD_ERROR, "short send: %d < %d\n",
+					ret, ipfix_msg_len(msg));
+	}
+
+	llist_for_each_safe(curr, tmp, &priv->list) {
+		msg = llist_entry(curr, struct ipfix_msg, link);
+		llist_del(curr);
+		msg->nrecs = 0;
+		ipfix_msg_free(msg);
+	}
+
+done:
+	return ret;
+}
+
+static int
+ipfix_ufd_cb(int fd, unsigned what, void *arg)
+{
+	struct ulogd_pluginstance *pi = arg;
+	struct ipfix_priv *priv = (struct ipfix_priv *) pi->private;
+	char buf[16];
+	ssize_t nread;
+
+	if (what & ULOGD_FD_READ) {
+		nread = recv(priv->ufd.fd, buf, sizeof(buf), MSG_DONTWAIT);
+		if (nread < 0) {
+			ulogd_log(ULOGD_ERROR, "recv: %m\n");
+			if (errno == EWOULDBLOCK || errno == EINTR)

Same comment here as above.

+				goto done;
+		} else if (!nread) {
+			ulogd_log(ULOGD_INFO, "connection reset by peer\n");
+			ulogd_unregister_fd(&priv->ufd);
+		} else
+			ulogd_log(ULOGD_INFO, "unexpected data (%d bytes)\n", nread);
+	}
+
+done:
+	return 0;
+}
+
+static void
+ipfix_timer_cb(struct ulogd_timer *t, void *data)
+{
+	struct ulogd_pluginstance *pi = data;
+	struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
+
+	if (priv->msg && priv->msg->nrecs > 0) {
+		enqueue_msg(priv, priv->msg);
+		priv->msg = NULL;
+

No need for empty line here above.

+		send_msgs(pi);
+	}
+}
+
+static int
+ipfix_configure(struct ulogd_pluginstance *pi, struct ulogd_pluginstance_stack *stack)
+{
+	struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
+	char addr[16];
+	int oid, port, mtu, ret;
+	char *host = NULL, *proto = NULL;

Reverse xmas tree here again.

No need to initialize proto and host to NULL.

+
+	ret = config_parse_file(pi->id, pi->config_kset);
+	if (ret < 0)
+		return ret;
+
+	oid = oid_ce(pi->config_kset).u.value;
+	host = host_ce(pi->config_kset).u.string;
+	port = port_ce(pi->config_kset).u.value;
+	proto = proto_ce(pi->config_kset).u.string;
+	mtu = mtu_ce(pi->config_kset).u.value;
+
+	if (!oid) {
+		ulogd_log(ULOGD_FATAL, "invalid Observation ID\n");
+		return ULOGD_IRET_ERR;
+	}
+	if (!host || !strcmp(host, "")) {
+		ulogd_log(ULOGD_FATAL, "no destination host specified\n");
+		return ULOGD_IRET_ERR;
+	}
+
+	if (!strcmp(proto, "udp")) {
+		priv->proto = IPPROTO_UDP;
+	} else if (!strcmp(proto, "tcp")) {
+		priv->proto = IPPROTO_TCP;
+	} else {
+		ulogd_log(ULOGD_FATAL, "unsupported protocol '%s'\n", proto);
+		return ULOGD_IRET_ERR;
+	}
+
+	memset(&priv->sa, 0, sizeof(priv->sa));
+	priv->sa.sin_family = AF_INET;
+	priv->sa.sin_port = htons(port);
+	ret = inet_pton(AF_INET, host, &priv->sa.sin_addr);
+	if (ret < 0) {
+		ulogd_log(ULOGD_FATAL, "inet_pton: %m\n");
+		return ULOGD_IRET_ERR;
+	} else if (!ret) {
+		ulogd_log(ULOGD_FATAL, "host: invalid address '%s'\n", host);
+		return ULOGD_IRET_ERR;
+	}

You can just probably simplify this to:

	ret = inet_pton(AF_INET, host, &priv->sa.sin_addr);
	if (ret <= 0) {
                 ...

to check for errors.

+
+	INIT_LLIST_HEAD(&priv->list);
+
+	ulogd_init_timer(&priv->timer, pi, ipfix_timer_cb);
+
+	ulogd_log(ULOGD_INFO, "using IPFIX Collector at %s:%d (MTU %d)\n",
+			inet_ntop(AF_INET, &priv->sa.sin_addr, addr, sizeof(addr)),
+			port, mtu);

Better align function parameters to parens, ie.

	ulogd_log(ULOGD_INFO, "using IPFIX Collector at %s:%d (MTU %d)\n",
                   inet_ntop(AF_INET, &priv->sa.sin_addr, addr, sizeof(addr)),
                   port, mtu);

Please, send a version 2.

Thanks.


>From a7ef95c7f3507270f726bf3c543f0148acc77807 Mon Sep 17 00:00:00 2001
From: Ander Juaristi <a@xxxxxxxxxxxx>
Date: Wed, 17 Apr 2019 13:43:09 +0200
Subject: [PATCH 1/2] IPFIX: Add IPFIX output plugin

Based on original work by Holger Eitzenberger <holger@xxxxxxxxxxxxxxxx>.

Signed-off-by: Ander Juaristi <a@xxxxxxxxxxxx>
---
 configure.ac                      |   2 +-
 include/ulogd/ulogd.h             |   5 +
 input/flow/ulogd_inpflow_IPFIX.c  |   2 -
 output/Makefile.am                |   2 +-
 output/ipfix/Makefile.am          |   7 +
 output/ipfix/ipfix.c              | 141 ++++++++
 output/ipfix/ipfix.h              |  89 +++++
 output/ipfix/ulogd_output_IPFIX.c | 503 +++++++++++++++++++++++++++
 output/ulogd_output_IPFIX.c       | 546 ------------------------------
 9 files changed, 747 insertions(+), 550 deletions(-)
 delete mode 100644 input/flow/ulogd_inpflow_IPFIX.c
 create mode 100644 output/ipfix/Makefile.am
 create mode 100644 output/ipfix/ipfix.c
 create mode 100644 output/ipfix/ipfix.h
 create mode 100644 output/ipfix/ulogd_output_IPFIX.c
 delete mode 100644 output/ulogd_output_IPFIX.c

diff --git a/configure.ac b/configure.ac
index 3aa0624..48b4995 100644
--- a/configure.ac
+++ b/configure.ac
@@ -179,7 +179,7 @@ AC_CONFIG_FILES(include/Makefile include/ulogd/Makefile include/libipulog/Makefi
 	  input/sum/Makefile \
 	  filter/Makefile filter/raw2packet/Makefile filter/packet2flow/Makefile \
 	  output/Makefile output/pcap/Makefile output/mysql/Makefile output/pgsql/Makefile output/sqlite3/Makefile \
-	  output/dbi/Makefile \
+	  output/dbi/Makefile output/ipfix/Makefile \
 	  src/Makefile Makefile Rules.make)
 AC_OUTPUT
 
diff --git a/include/ulogd/ulogd.h b/include/ulogd/ulogd.h
index 2e38195..1636a8c 100644
--- a/include/ulogd/ulogd.h
+++ b/include/ulogd/ulogd.h
@@ -28,6 +28,11 @@
 
 /* types without length */
 #define ULOGD_RET_NONE		0x0000
+#define __packed		__attribute__((packed))
+#define __noreturn		__attribute__((noreturn))
+#define __cold			__attribute__((cold))
+
+#define __packed		__attribute__((packed))
 
 #define ULOGD_RET_INT8		0x0001
 #define ULOGD_RET_INT16		0x0002
diff --git a/input/flow/ulogd_inpflow_IPFIX.c b/input/flow/ulogd_inpflow_IPFIX.c
deleted file mode 100644
index 27ce5b2..0000000
--- a/input/flow/ulogd_inpflow_IPFIX.c
+++ /dev/null
@@ -1,2 +0,0 @@
-/* */
-
diff --git a/output/Makefile.am b/output/Makefile.am
index ff851ad..7ba8217 100644
--- a/output/Makefile.am
+++ b/output/Makefile.am
@@ -2,7 +2,7 @@ AM_CPPFLAGS = -I$(top_srcdir)/include ${LIBNETFILTER_ACCT_CFLAGS} \
               ${LIBNETFILTER_CONNTRACK_CFLAGS} ${LIBNETFILTER_LOG_CFLAGS}
 AM_CFLAGS = ${regular_CFLAGS}
 
-SUBDIRS= pcap mysql pgsql sqlite3 dbi
+SUBDIRS= pcap mysql pgsql sqlite3 dbi ipfix
 
 pkglib_LTLIBRARIES = ulogd_output_LOGEMU.la ulogd_output_SYSLOG.la \
 			 ulogd_output_OPRINT.la ulogd_output_GPRINT.la \
diff --git a/output/ipfix/Makefile.am b/output/ipfix/Makefile.am
new file mode 100644
index 0000000..cacda26
--- /dev/null
+++ b/output/ipfix/Makefile.am
@@ -0,0 +1,7 @@
+AM_CPPFLAGS = -I$(top_srcdir)/include
+AM_CFLAGS = $(regular_CFLAGS)
+
+pkglib_LTLIBRARIES = ulogd_output_IPFIX.la
+
+ulogd_output_IPFIX_la_SOURCES = ulogd_output_IPFIX.c ipfix.c
+ulogd_output_IPFIX_la_LDFLAGS = -avoid-version -module
diff --git a/output/ipfix/ipfix.c b/output/ipfix/ipfix.c
new file mode 100644
index 0000000..60a4c7f
--- /dev/null
+++ b/output/ipfix/ipfix.c
@@ -0,0 +1,141 @@
+/*
+ * ipfix.c
+ *
+ * Holger Eitzenberger, 2009.
+ */
+
+/* These forward declarations are needed since ulogd.h doesn't like to be the first */
+#include <ulogd/linuxlist.h>
+
+#define __packed		__attribute__((packed))
+
+#include "ipfix.h"
+
+#include <ulogd/ulogd.h>
+#include <ulogd/common.h>
+
+struct ipfix_msg *ipfix_msg_alloc(size_t len, uint32_t oid)
+{
+	struct ipfix_msg *msg;
+	struct ipfix_hdr *hdr;
+
+	if (len < IPFIX_HDRLEN + IPFIX_SET_HDRLEN)
+		return NULL;
+
+	msg = malloc(sizeof(struct ipfix_msg) + len);
+	memset(msg, 0, sizeof(struct ipfix_msg));
+	msg->tail = msg->data + IPFIX_HDRLEN;
+	msg->end = msg->data + len;
+
+	hdr = ipfix_msg_hdr(msg);
+	memset(hdr, 0, IPFIX_HDRLEN);
+	hdr->version = htons(IPFIX_VERSION);
+	hdr->oid = htonl(oid);
+
+	return msg;
+}
+
+void ipfix_msg_free(struct ipfix_msg *msg)
+{
+	if (!msg)
+		return;
+
+	if (msg->nrecs > 0)
+		ulogd_log(ULOGD_DEBUG, "%s: %d flows have been lost\n", __func__,
+			msg->nrecs);
+
+	free(msg);
+}
+
+struct ipfix_hdr *ipfix_msg_hdr(const struct ipfix_msg *msg)
+{
+	return (struct ipfix_hdr *)msg->data;
+}
+
+void *ipfix_msg_data(struct ipfix_msg *msg)
+{
+	return msg->data;
+}
+
+size_t ipfix_msg_len(const struct ipfix_msg *msg)
+{
+	return msg->tail - msg->data;
+}
+
+struct ipfix_set_hdr *ipfix_msg_add_set(struct ipfix_msg *msg, uint16_t sid)
+{
+	struct ipfix_set_hdr *shdr;
+
+	if (msg->end - msg->tail < (int) IPFIX_SET_HDRLEN)
+		return NULL;
+
+	shdr = (struct ipfix_set_hdr *)msg->tail;
+	shdr->id = sid;
+	shdr->len = IPFIX_SET_HDRLEN;
+	msg->tail += IPFIX_SET_HDRLEN;
+	msg->last_set = shdr;
+	return shdr;
+}
+
+struct ipfix_set_hdr *ipfix_msg_get_set(const struct ipfix_msg *msg)
+{
+	return msg->last_set;
+}
+
+/**
+ * Add data record to an IPFIX message.  The data is accounted properly.
+ *
+ * @return pointer to data or %NULL if not that much space left.
+ */
+void *ipfix_msg_add_data(struct ipfix_msg *msg, size_t len)
+{
+	void *data;
+
+	if (!msg->last_set) {
+		ulogd_log(ULOGD_FATAL, "msg->last_set is NULL\n");
+		return NULL;
+	}
+
+	if ((ssize_t) len > msg->end - msg->tail)
+		return NULL;
+
+	data = msg->tail;
+	msg->tail += len;
+	msg->nrecs++;
+	msg->last_set->len += len;
+
+	return data;
+}
+
+/* check and dump message */
+int ipfix_dump_msg(const struct ipfix_msg *msg)
+{
+	const struct ipfix_hdr *hdr = ipfix_msg_hdr(msg);
+	const struct ipfix_set_hdr *shdr = (struct ipfix_set_hdr *) hdr->data;
+
+	if (ntohs(hdr->len) < IPFIX_HDRLEN) {
+		ulogd_log(ULOGD_FATAL, "Invalid IPFIX message header length\n");
+		return -1;
+	}
+	if (ipfix_msg_len(msg) != IPFIX_HDRLEN + ntohs(shdr->len)) {
+		ulogd_log(ULOGD_FATAL, "Invalid IPFIX message length\n");
+		return -1;
+	}
+
+	ulogd_log(ULOGD_DEBUG, "msg: ver=%#x len=%#x t=%#x seq=%#x oid=%d\n",
+			  ntohs(hdr->version), ntohs(hdr->len), htonl(hdr->time),
+			  ntohl(hdr->seqno), ntohl(hdr->oid));
+
+	return 0;
+}
+
+/* template management */
+size_t ipfix_rec_len(uint16_t sid)
+{
+	if (sid != htons(VY_IPFIX_SID)) {
+		ulogd_log(ULOGD_FATAL, "Invalid SID\n");
+		return 0;
+	}
+
+	return sizeof(struct vy_ipfix_data);
+}
diff --git a/output/ipfix/ipfix.h b/output/ipfix/ipfix.h
new file mode 100644
index 0000000..cdb5a6f
--- /dev/null
+++ b/output/ipfix/ipfix.h
@@ -0,0 +1,89 @@
+/*
+ * ipfix.h
+ *
+ * Holger Eitzenberger <holger@xxxxxxxxxxxxxxxx>, 2009.
+ */
+#ifndef IPFIX_H
+#define IPFIX_H
+
+#include <stdint.h>
+#include <netinet/in.h>
+
+
+struct ipfix_hdr {
+#define IPFIX_VERSION			0xa
+	uint16_t version;
+	uint16_t len;
+	uint32_t time;
+	uint32_t seqno;
+	uint32_t oid;				/* Observation Domain ID */
+	uint8_t data[];
+} __packed;
+
+#define IPFIX_HDRLEN	sizeof(struct ipfix_hdr)
+
+/*
+ * IDs 0-255 are reserved for Template Sets.  IDs of Data Sets are > 255.
+ */
+struct ipfix_templ_hdr {
+	uint16_t id;
+	uint16_t cnt;
+	uint8_t data[];
+} __packed;
+
+struct ipfix_set_hdr {
+#define IPFIX_SET_TEMPL			2
+#define IPFIX_SET_OPT_TEMPL		3
+	uint16_t id;
+	uint16_t len;
+	uint8_t data[];
+} __packed;
+
+#define IPFIX_SET_HDRLEN		sizeof(struct ipfix_set_hdr)
+
+struct ipfix_msg {
+	struct llist_head link;
+	uint8_t *tail;
+	uint8_t *end;
+	unsigned nrecs;
+	struct ipfix_set_hdr *last_set;
+	uint8_t data[];
+};
+
+struct vy_ipfix_data {
+	struct in_addr saddr;
+	struct in_addr daddr;
+	uint16_t ifi_in;
+	uint16_t ifi_out;
+	uint32_t packets;
+	uint32_t bytes;
+	uint32_t start;				/* Unix time */
+	uint32_t end;				/* Unix time */
+	uint16_t sport;
+	uint16_t dport;
+	uint32_t aid;				/* Application ID */
+	uint8_t l4_proto;
+	uint8_t dscp;
+	uint16_t __padding;
+} __packed;
+
+#define VY_IPFIX_SID		256
+
+#define VY_IPFIX_FLOWS		36
+#define VY_IPFIX_PKT_LEN	(IPFIX_HDRLEN + IPFIX_SET_HDRLEN \
+							 + VY_IPFIX_FLOWS * sizeof(struct vy_ipfix_data))
+
+/* template management */
+size_t ipfix_rec_len(uint16_t);
+
+/* message handling */
+struct ipfix_msg *ipfix_msg_alloc(size_t, uint32_t);
+void ipfix_msg_free(struct ipfix_msg *);
+struct ipfix_hdr *ipfix_msg_hdr(const struct ipfix_msg *);
+size_t ipfix_msg_len(const struct ipfix_msg *);
+void *ipfix_msg_data(struct ipfix_msg *);
+struct ipfix_set_hdr *ipfix_msg_add_set(struct ipfix_msg *, uint16_t);
+void *ipfix_msg_add_data(struct ipfix_msg *, size_t);
+int ipfix_dump_msg(const struct ipfix_msg *);
+
+#endif /* IPFIX_H */
diff --git a/output/ipfix/ulogd_output_IPFIX.c b/output/ipfix/ulogd_output_IPFIX.c
new file mode 100644
index 0000000..ec143b1
--- /dev/null
+++ b/output/ipfix/ulogd_output_IPFIX.c
@@ -0,0 +1,503 @@
+/*
+ * ulogd_output_IPFIX.c
+ *
+ * ulogd IPFIX Exporter plugin.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ * Holger Eitzenberger <holger@xxxxxxxxxxxxxxxx>  Astaro AG 2009
+ */
+#include <unistd.h>
+#include <time.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+#include <ulogd/ulogd.h>
+#include <ulogd/common.h>
+
+#include "ipfix.h"
+
+#define DEFAULT_MTU		512 /* RFC 5101, 10.3.3 */
+#define DEFAULT_PORT		4739 /* RFC 5101, 10.3.4 */
+#define DEFAULT_SPORT		4740
+
+enum {
+	OID_CE = 0,
+	HOST_CE,
+	PORT_CE,
+	PROTO_CE,
+	MTU_CE,
+};
+
+#define oid_ce(x)	(x->ces[OID_CE])
+#define host_ce(x)	(x->ces[HOST_CE])
+#define port_ce(x)	(x->ces[PORT_CE])
+#define proto_ce(x)	(x->ces[PROTO_CE])
+#define mtu_ce(x)	(x->ces[MTU_CE])
+
+static const struct config_keyset ipfix_kset = {
+	.num_ces = 5,
+	.ces = {
+		{
+			.key = "oid",
+			.type = CONFIG_TYPE_INT,
+			.u.value = 0
+		},
+		{
+			.key = "host",
+			.type = CONFIG_TYPE_STRING,
+			.u.string = ""
+		},
+		{
+			.key = "port",
+			.type = CONFIG_TYPE_INT,
+			.u.value = DEFAULT_PORT
+		},
+		{
+			.key = "proto",
+			.type = CONFIG_TYPE_STRING,
+			.u.string = "tcp"
+		},
+		{
+			.key = "mtu",
+			.type = CONFIG_TYPE_INT,
+			.u.value = DEFAULT_MTU
+		}
+	}
+};
+
+struct ipfix_templ {
+	struct ipfix_templ *next;
+};
+
+struct ipfix_priv {
+	struct ulogd_fd ufd;
+	uint32_t seqno;
+	struct ipfix_msg *msg;		/* current message */
+	struct llist_head list;
+	struct ipfix_templ *templates;
+	int proto;
+	struct ulogd_timer timer;
+	struct sockaddr_in sa;
+};
+
+enum {
+	InIpSaddr = 0,
+	InIpDaddr,
+	InRawInPktCount,
+	InRawInPktLen,
+	InRawOutPktCount,
+	InRawOutPktLen,
+	InFlowStartSec,
+	InFlowStartUsec,
+	InFlowEndSec,
+	InFlowEndUsec,
+	InL4SPort,
+	InL4DPort,
+	InIpProto,
+	InCtMark
+};
+
+static struct ulogd_key ipfix_in_keys[] = {
+		[InIpSaddr] = {
+			.type = ULOGD_RET_IPADDR,
+			.name = "orig.ip.saddr"
+		},
+		[InIpDaddr] = {
+			.type = ULOGD_RET_IPADDR,
+			.name = "orig.ip.daddr"
+		},
+		[InRawInPktCount] = {
+			.type = ULOGD_RET_UINT64,
+			.name = "orig.raw.pktcount"
+		},
+		[InRawInPktLen] = {
+			.type = ULOGD_RET_UINT64,
+			.name = "orig.raw.pktlen"
+		},
+		[InRawOutPktCount] = {
+			.type = ULOGD_RET_UINT64,
+			.name = "reply.raw.pktcount"
+		},
+		[InRawOutPktLen] = {
+			.type = ULOGD_RET_UINT64,
+			.name = "reply.raw.pktlen"
+		},
+		[InFlowStartSec] = {
+			.type = ULOGD_RET_UINT32,
+			.name = "flow.start.sec"
+		},
+		[InFlowStartUsec] = {
+			.type = ULOGD_RET_UINT32,
+			.name = "flow.start.usec"
+		},
+		[InFlowEndSec] = {
+			.type = ULOGD_RET_UINT32,
+			.name = "flow.end.sec"
+		},
+		[InFlowEndUsec] = {
+			.type = ULOGD_RET_UINT32,
+			.name = "flow.end.usec"
+		},
+		[InL4SPort] = {
+			.type = ULOGD_RET_UINT16,
+			.name = "orig.l4.sport"
+		},
+		[InL4DPort] = {
+			.type = ULOGD_RET_UINT16,
+			.name = "orig.l4.dport"
+		},
+		[InIpProto] = {
+			.type = ULOGD_RET_UINT8,
+			.name = "orig.ip.protocol"
+		},
+		[InCtMark] = {
+			.type = ULOGD_RET_UINT32,
+			.name = "ct.mark"
+		}
+};
+
+/* do some polishing and enqueue it */
+static void enqueue_msg(struct ipfix_priv *priv, struct ipfix_msg *msg)
+{
+	struct ipfix_hdr *hdr = ipfix_msg_data(msg);
+
+	if (!msg)
+		return;
+
+	hdr->time = htonl(time(NULL));
+	hdr->seqno = htonl(priv->seqno += msg->nrecs);
+	if (msg->last_set) {
+		msg->last_set->id = htons(msg->last_set->id);
+		msg->last_set->len = htons(msg->last_set->len);
+		msg->last_set = NULL;
+	}
+	hdr->len = htons(ipfix_msg_len(msg));
+
+	llist_add(&msg->link, &priv->list);
+}
+
+/**
+ * @return %ULOGD_IRET_OK or error value
+ */
+static int send_msgs(struct ulogd_pluginstance *pi)
+{
+	struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
+	struct llist_head *curr, *tmp;
+	struct ipfix_msg *msg;
+	int ret = ULOGD_IRET_OK, sent;
+
+	llist_for_each_prev(curr, &priv->list) {
+		msg = llist_entry(curr, struct ipfix_msg, link);
+
+		sent = send(priv->ufd.fd, ipfix_msg_data(msg), ipfix_msg_len(msg), 0);
+		if (sent < 0) {
+			ulogd_log(ULOGD_ERROR, "send: %m\n");
+			ret = ULOGD_IRET_ERR;
+			goto done;
+		}
+
+		/* TODO handle short send() for other protocols */
+		if ((size_t) sent < ipfix_msg_len(msg))
+			ulogd_log(ULOGD_ERROR, "short send: %d < %d\n",
+					sent, ipfix_msg_len(msg));
+	}
+
+	llist_for_each_safe(curr, tmp, &priv->list) {
+		msg = llist_entry(curr, struct ipfix_msg, link);
+		llist_del(curr);
+		msg->nrecs = 0;
+		ipfix_msg_free(msg);
+	}
+
+done:
+	return ret;
+}
+
+static int ipfix_ufd_cb(int fd, unsigned what, void *arg)
+{
+	struct ulogd_pluginstance *pi = arg;
+	struct ipfix_priv *priv = (struct ipfix_priv *) pi->private;
+	ssize_t nread;
+	char buf[16];
+
+	if (what & ULOGD_FD_READ) {
+		nread = recv(priv->ufd.fd, buf, sizeof(buf), MSG_DONTWAIT);
+		if (nread < 0) {
+			ulogd_log(ULOGD_ERROR, "recv: %m\n");
+		} else if (!nread) {
+			ulogd_log(ULOGD_INFO, "connection reset by peer\n");
+			ulogd_unregister_fd(&priv->ufd);
+		} else
+			ulogd_log(ULOGD_INFO, "unexpected data (%d bytes)\n", nread);
+	}
+
+	return 0;
+}
+
+static void ipfix_timer_cb(struct ulogd_timer *t, void *data)
+{
+	struct ulogd_pluginstance *pi = data;
+	struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
+
+	if (priv->msg && priv->msg->nrecs > 0) {
+		enqueue_msg(priv, priv->msg);
+		priv->msg = NULL;
+		send_msgs(pi);
+	}
+}
+
+static int ipfix_configure(struct ulogd_pluginstance *pi, struct ulogd_pluginstance_stack *stack)
+{
+	struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
+	int oid, port, mtu, ret;
+	char *host, *proto;
+	char addr[16];
+
+	ret = config_parse_file(pi->id, pi->config_kset);
+	if (ret < 0)
+		return ret;
+
+	oid = oid_ce(pi->config_kset).u.value;
+	host = host_ce(pi->config_kset).u.string;
+	port = port_ce(pi->config_kset).u.value;
+	proto = proto_ce(pi->config_kset).u.string;
+	mtu = mtu_ce(pi->config_kset).u.value;
+
+	if (!oid) {
+		ulogd_log(ULOGD_FATAL, "invalid Observation ID\n");
+		return ULOGD_IRET_ERR;
+	}
+	if (!host || !strcmp(host, "")) {
+		ulogd_log(ULOGD_FATAL, "no destination host specified\n");
+		return ULOGD_IRET_ERR;
+	}
+
+	if (!strcmp(proto, "udp")) {
+		priv->proto = IPPROTO_UDP;
+	} else if (!strcmp(proto, "tcp")) {
+		priv->proto = IPPROTO_TCP;
+	} else {
+		ulogd_log(ULOGD_FATAL, "unsupported protocol '%s'\n", proto);
+		return ULOGD_IRET_ERR;
+	}
+
+	memset(&priv->sa, 0, sizeof(priv->sa));
+	priv->sa.sin_family = AF_INET;
+	priv->sa.sin_port = htons(port);
+	ret = inet_pton(AF_INET, host, &priv->sa.sin_addr);
+	if (ret <= 0) {
+		ulogd_log(ULOGD_FATAL, "inet_pton: %m\n");
+		return ULOGD_IRET_ERR;
+	}
+
+	INIT_LLIST_HEAD(&priv->list);
+
+	ulogd_init_timer(&priv->timer, pi, ipfix_timer_cb);
+
+	ulogd_log(ULOGD_INFO, "using IPFIX Collector at %s:%d (MTU %d)\n",
+		  inet_ntop(AF_INET, &priv->sa.sin_addr, addr, sizeof(addr)),
+		  port, mtu);
+
+	return ULOGD_IRET_OK;
+}
+
+static int tcp_connect(struct ulogd_pluginstance *pi)
+{
+	struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
+	int ret = ULOGD_IRET_ERR;
+
+	if ((priv->ufd.fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
+		ulogd_log(ULOGD_FATAL, "socket: %m\n");
+		return ULOGD_IRET_ERR;
+	}
+
+	if (connect(priv->ufd.fd, (struct sockaddr *) &priv->sa, sizeof(priv->sa)) < 0) {
+		ulogd_log(ULOGD_ERROR, "connect: %m\n");
+		ret = ULOGD_IRET_ERR;
+		goto err_close;
+	}
+
+	return ULOGD_IRET_OK;
+
+err_close:
+	close(priv->ufd.fd);
+	return ret;
+}
+
+static int udp_connect(struct ulogd_pluginstance *pi)
+{
+	struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
+
+	if ((priv->ufd.fd = socket(AF_INET, SOCK_DGRAM, 0)) < 0) {
+		ulogd_log(ULOGD_FATAL, "socket: %m\n");
+		return ULOGD_IRET_ERR;
+	}
+
+	if (connect(priv->ufd.fd, (struct sockaddr *) &priv->sa, sizeof(priv->sa)) < 0) {
+		ulogd_log(ULOGD_ERROR, "connect: %m\n");
+		return ULOGD_IRET_ERR;
+	}
+
+	return 0;
+}
+
+static int ipfix_start(struct ulogd_pluginstance *pi)
+{
+	struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
+	char addr[16];
+	int port, ret;
+
+	switch (priv->proto) {
+	case IPPROTO_UDP:
+		if ((ret = udp_connect(pi)) < 0)
+			return ret;
+		break;
+	case IPPROTO_TCP:
+		if ((ret = tcp_connect(pi)) < 0)
+			return ret;
+		break;
+
+	default:
+		break;
+	}
+
+	priv->seqno = 0;
+
+	port = port_ce(pi->config_kset).u.value;
+	ulogd_log(ULOGD_INFO, "connected to %s:%d\n",
+			inet_ntop(AF_INET, &priv->sa.sin_addr, addr, sizeof(addr)),
+			port);
+
+	/* Register the socket FD */
+	priv->ufd.when = ULOGD_FD_READ;
+	priv->ufd.cb = ipfix_ufd_cb;
+	priv->ufd.data = pi;
+
+	if (ulogd_register_fd(&priv->ufd) < 0)
+		return ULOGD_IRET_ERR;
+
+	/* Add a 1 second timer */
+	ulogd_add_timer(&priv->timer, 1);
+
+	return ULOGD_IRET_OK;
+}
+
+static int ipfix_stop(struct ulogd_pluginstance *pi)
+{
+	struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
+
+	ulogd_unregister_fd(&priv->ufd);
+	close(priv->ufd.fd);
+	priv->ufd.fd = -1;
+
+	ulogd_del_timer(&priv->timer);
+
+	ipfix_msg_free(priv->msg);
+	priv->msg = NULL;
+
+	return 0;
+}
+
+static int ipfix_interp(struct ulogd_pluginstance *pi)
+{
+	struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
+	struct vy_ipfix_data *data;
+	int oid, mtu, ret;
+	char addr[16];
+
+	if (!(GET_FLAGS(pi->input.keys, InIpSaddr) & ULOGD_RETF_VALID))
+		return ULOGD_IRET_OK;
+
+	oid = oid_ce(pi->config_kset).u.value;
+	mtu = mtu_ce(pi->config_kset).u.value;
+
+again:
+	if (!priv->msg) {
+		priv->msg = ipfix_msg_alloc(mtu, oid);
+		if (!priv->msg) {
+			/* just drop this flow */
+			ulogd_log(ULOGD_ERROR, "out of memory, dropping flow\n");
+			return ULOGD_IRET_OK;
+		}
+		ipfix_msg_add_set(priv->msg, VY_IPFIX_SID);
+	}
+
+	data = ipfix_msg_add_data(priv->msg, sizeof(struct vy_ipfix_data));
+	if (!data) {
+		enqueue_msg(priv, priv->msg);
+		priv->msg = NULL;
+		/* can't loop because the next will definitely succeed */
+		goto again;
+	}
+
+	data->ifi_in = data->ifi_out = 0;
+
+	data->saddr.s_addr = ikey_get_u32(&pi->input.keys[InIpSaddr]);
+	data->daddr.s_addr = ikey_get_u32(&pi->input.keys[InIpDaddr]);
+
+	data->packets = htonl((uint32_t) (ikey_get_u64(&pi->input.keys[InRawInPktCount])
+						+ ikey_get_u64(&pi->input.keys[InRawOutPktCount])));
+	data->bytes = htonl((uint32_t) (ikey_get_u64(&pi->input.keys[InRawInPktLen])
+						+ ikey_get_u64(&pi->input.keys[InRawOutPktLen])));
+
+	data->start = htonl(ikey_get_u32(&pi->input.keys[InFlowStartSec]));
+	data->end = htonl(ikey_get_u32(&pi->input.keys[InFlowEndSec]));
+
+	if (GET_FLAGS(pi->input.keys, InL4SPort) & ULOGD_RETF_VALID) {
+		data->sport = htons(ikey_get_u16(&pi->input.keys[InL4SPort]));
+		data->dport = htons(ikey_get_u16(&pi->input.keys[InL4DPort]));
+	}
+
+	data->aid = 0;
+	if (GET_FLAGS(pi->input.keys, InCtMark) & ULOGD_RETF_VALID)
+		data->aid = htonl(ikey_get_u32(&pi->input.keys[InCtMark]));
+
+	data->l4_proto = ikey_get_u8(&pi->input.keys[InIpProto]);
+	data->__padding = 0;
+
+	ulogd_log(ULOGD_DEBUG, "Got new packet (packets = %u, bytes = %u, flow = (%u, %u), saddr = %s, daddr = %s, sport = %u, dport = %u)\n",
+			ntohl(data->packets), ntohl(data->bytes), ntohl(data->start), ntohl(data->end),
+			inet_ntop(AF_INET, &data->saddr.s_addr, addr, sizeof(addr)),
+			inet_ntop(AF_INET, &data->daddr.s_addr, addr, sizeof(addr)),
+			ntohs(data->sport), ntohs(data->dport));
+
+	if ((ret = send_msgs(pi)) < 0)
+		return ret;
+
+	return ULOGD_IRET_OK;
+}
+
+static struct ulogd_plugin ipfix_plugin = {
+	.name = "IPFIX",
+	.input = {
+		.keys = ipfix_in_keys,
+		.num_keys = ARRAY_SIZE(ipfix_in_keys),
+		.type = ULOGD_DTYPE_PACKET | ULOGD_DTYPE_FLOW | ULOGD_DTYPE_SUM
+	},
+	.output = {
+		.type = ULOGD_DTYPE_SINK
+	},
+	.config_kset = (struct config_keyset *) &ipfix_kset,
+	.priv_size = sizeof(struct ipfix_priv),
+	.configure = ipfix_configure,
+	.start = ipfix_start,
+	.stop = ipfix_stop,
+	.interp = ipfix_interp,
+	.version = VERSION,
+};
+
+void __attribute__ ((constructor)) init(void);
+
+void init(void)
+{
+	ulogd_register_plugin(&ipfix_plugin);
+}
diff --git a/output/ulogd_output_IPFIX.c b/output/ulogd_output_IPFIX.c
deleted file mode 100644
index 62f1d60..0000000
--- a/output/ulogd_output_IPFIX.c
+++ /dev/null
@@ -1,546 +0,0 @@
-/* ulogd_output_IPFIX.c
- *
- * ulogd output plugin for IPFIX
- *
- * This target produces a file which looks the same like the syslog-entries
- * of the LOG target.
- *
- * (C) 2005 by Harald Welte <laforge@xxxxxxxxxxxx>
- *
- *  This program is free software; you can redistribute it and/or modify
- *  it under the terms of the GNU General Public License version 2 
- *  as published by the Free Software Foundation
- *
- *  This program is distributed in the hope that it will be useful,
- *  but WITHOUT ANY WARRANTY; without even the implied warranty of
- *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *  GNU General Public License for more details.
- *
- *  You should have received a copy of the GNU General Public License
- *  along with this program; if not, write to the Free Software
- *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
- *
- * TODO:
- * - where to get a useable <sctp.h> for linux ?
- * - implement PR-SCTP (no api definition in draft sockets api)
- *
- */
-
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <string.h>
-#include <errno.h>
-
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netdb.h>
-
-#include <ulogd/linuxlist.h>
-
-#ifdef IPPROTO_SCTP
-/* temporarily disable sctp until we know which headers to use */
-#undef IPPROTO_SCTP
-#endif
-
-#ifdef IPPROTO_SCTP
-typedef uint32_t sctp_assoc_t;
-
-/* glibc doesn't yet have this, as defined by
- * draft-ietf-tsvwg-sctpsocket-11.txt */
-struct sctp_sndrcvinfo {
-	uint16_t	sinfo_stream;
-	uint16_t	sinfo_ssn;
-	uint16_t	sinfo_flags;
-	uint32_t	sinfo_ppid;
-	uint32_t	sinfo_context;
-	uint32_t	sinfo_timetolive;
-	uint32_t	sinfo_tsn;
-	uint32_t	sinfo_cumtsn;
-	sctp_assoc_t	sinfo_assoc_id;
-};
-#endif
-
-#include <ulogd/ulogd.h>
-#include <ulogd/conffile.h>
-#include <ulogd/linuxlist.h>
-#include <ulogd/ipfix_protocol.h>
-
-#define IPFIX_DEFAULT_TCPUDP_PORT	4739
-
-/* bitmask stuff */
-struct bitmask {
-	int size_bits;
-	char *buf;
-};
-
-#define SIZE_OCTETS(x)	((x/8)+1)
-
-void bitmask_clear(struct bitmask *bm)
-{
-	memset(bm->buf, 0, SIZE_OCTETS(bm->size_bits));
-}
-
-struct bitmask *bitmask_alloc(unsigned int num_bits)
-{
-	struct bitmask *bm;
-	unsigned int size_octets = SIZE_OCTETS(num_bits);
-
-	bm = malloc(sizeof(*bm) + size_octets);
-	if (!bm)
-		return NULL;
-
-	bm->size_bits = num_bits;
-	bm->buf = (void *)bm + sizeof(*bm);
-
-	bitmask_clear(bm);
-
-	return bm;
-}
-
-void bitmask_free(struct bitmask *bm)
-{
-	free(bm);
-}
-
-int bitmask_set_bit_to(struct bitmask *bm, unsigned int bits, int to)
-{
-	unsigned int byte = bits / 8;
-	unsigned int bit = bits % 8;
-	unsigned char *ptr;
-
-	if (byte > SIZE_OCTETS(bm->size_bits))
-		return -EINVAL;
-
-	if (to == 0)
-		bm->buf[byte] &= ~(1 << bit);
-	else
-		bm->buf[byte] |= (1 << bit);
-
-	return 0;
-}
-
-#define bitmask_clear_bit(bm, bit) \
-	bitmask_set_bit_to(bm, bit, 0)
-
-#define bitmask_set_bit(bm, bit) \
-	bitmask_set_bit_to(bm, bit, 1)
-
-int bitmasks_equal(const struct bitmask *bm1, const struct bitmask *bm2)
-{
-	if (bm1->size_bits != bm2->size_bits)
-		return -1;
-
-	if (!memcmp(bm1->buf, bm2->buf, SIZE_OCTETS(bm1->size_bits)))
-		return 1;
-	else
-		return 0;
-}
-
-struct bitmask *bitmask_dup(const struct bitmask *bm_orig)
-{
-	struct bitmask *bm_new;
-	int size = sizeof(*bm_new) + SIZE_OCTETS(bm_orig->size_bits);
-
-	bm_new = malloc(size);
-	if (!bm_new)
-		return NULL;
-
-	memcpy(bm_new, bm_orig, size);
-
-	return bm_new;
-}
-
-static struct config_keyset ipfix_kset = {
-	.num_ces = 3,
-	.ces = {
-		{
-			.key 	 = "host",
-			.type	 = CONFIG_TYPE_STRING,
-			.options = CONFIG_OPT_NONE,
-		},
-		{
-			.key	 = "port",
-			.type	 = CONFIG_TYPE_STRING,
-			.options = CONFIG_OPT_NONE,
-			.u	 = { .string = "4739" },
-		},
-		{
-			.key	 = "protocol",
-			.type	 = CONFIG_TYPE_STRING,
-			.options = CONFIG_OPT_NONE,
-			.u	= { .string = "udp" },
-		},
-	},
-};
-
-#define host_ce(x)	(x->ces[0])
-#define port_ce(x)	(x->ces[1])
-#define proto_ce(x)	(x->ces[2])
-
-struct ipfix_template {
-	struct ipfix_templ_rec_hdr hdr;
-	char buf[0];
-};
-
-struct ulogd_ipfix_template {
-	struct llist_head list;
-	struct bitmask *bitmask;
-	unsigned int total_length;	/* length of the DATA */
-	char *tmpl_cur;		/* cursor into current template position */
-	struct ipfix_template tmpl;
-};
-
-struct ipfix_instance {
-	int fd;		/* socket that we use for sending IPFIX data */
-	int sock_type;	/* type (SOCK_*) */
-	int sock_proto;	/* protocol (IPPROTO_*) */
-
-	struct llist_head template_list;
-
-	struct ipfix_template *tmpl;
-	unsigned int tmpl_len;
-
-	struct bitmask *valid_bitmask;	/* bitmask of valid keys */
-
-	unsigned int total_length;	/* total size of all data elements */
-};
-
-#define ULOGD_IPFIX_TEMPL_BASE 1024
-static uint16_t next_template_id = ULOGD_IPFIX_TEMPL_BASE;
-
-/* Build the IPFIX template from the input keys */
-struct ulogd_ipfix_template *
-build_template_for_bitmask(struct ulogd_pluginstance *upi,
-			   struct bitmask *bm)
-{
-	struct ipfix_instance *ii = (struct ipfix_instance *) &upi->private;
-	struct ipfix_templ_rec_hdr *rhdr;
-	struct ulogd_ipfix_template *tmpl;
-	unsigned int i, j;
-	int size = sizeof(struct ulogd_ipfix_template)
-		   + (upi->input.num_keys * sizeof(struct ipfix_vendor_field));
-
-	tmpl = malloc(size);
-	if (!tmpl)
-		return NULL;
-	memset(tmpl, 0, size);
-
-	tmpl->bitmask = bitmask_dup(bm);
-	if (!tmpl->bitmask) {
-		free(tmpl);
-		return NULL;
-	}
-
-	/* initialize template header */
-	tmpl->tmpl.hdr.templ_id = htons(next_template_id++);
-
-	tmpl->tmpl_cur = tmpl->tmpl.buf;
-
-	tmpl->total_length = 0;
-
-	for (i = 0, j = 0; i < upi->input.num_keys; i++) {
-		struct ulogd_key *key = &upi->input.keys[i];
-		int length = ulogd_key_size(key);
-
-		if (!(key->u.source->flags & ULOGD_RETF_VALID))
-			continue;
-
-		if (length < 0 || length > 0xfffe) {
-			ulogd_log(ULOGD_INFO, "ignoring key `%s' because "
-				  "it has an ipfix incompatible length\n",
-				  key->name);
-			continue;
-		}
-
-		if (key->ipfix.field_id == 0) {
-			ulogd_log(ULOGD_INFO, "ignoring key `%s' because "
-				  "it has no field_id\n", key->name);
-			continue;
-		}
-
-		if (key->ipfix.vendor == IPFIX_VENDOR_IETF) {
-			struct ipfix_ietf_field *field = 
-				(struct ipfix_ietf_field *) tmpl->tmpl_cur;
-
-			field->type = htons(key->ipfix.field_id | 0x8000000);
-			field->length = htons(length);
-			tmpl->tmpl_cur += sizeof(*field);
-		} else {
-			struct ipfix_vendor_field *field =
-				(struct ipfix_vendor_field *) tmpl->tmpl_cur;
-
-			field->enterprise_num = htonl(key->ipfix.vendor);
-			field->type = htons(key->ipfix.field_id);
-			field->length = htons(length);
-			tmpl->tmpl_cur += sizeof(*field);
-		}
-		tmpl->total_length += length;
-		j++;
-	}
-
-	tmpl->tmpl.hdr.field_count = htons(j);
-
-	return tmpl;
-}
-
-
-
-static struct ulogd_ipfix_template *
-find_template_for_bitmask(struct ulogd_pluginstance *upi,
-			  struct bitmask *bm)
-{
-	struct ipfix_instance *ii = (struct ipfix_instance *) &upi->private;
-	struct ulogd_ipfix_template *tmpl;
-	
-	/* FIXME: this can be done more efficient! */
-	llist_for_each_entry(tmpl, &ii->template_list, list) {
-		if (bitmasks_equal(bm, tmpl->bitmask))
-			return tmpl;
-	}
-	return NULL;
-}
-
-static int output_ipfix(struct ulogd_pluginstance *upi)
-{
-	struct ipfix_instance *ii = (struct ipfix_instance *) &upi->private;
-	struct ulogd_ipfix_template *template;
-	unsigned int total_size;
-	int i;
-
-	/* FIXME: it would be more cache efficient if the IS_VALID
-	 * flags would be a separate bitmask outside of the array.
-	 * ulogd core could very easily flush it after every packet,
-	 * too. */
-
-	bitmask_clear(ii->valid_bitmask);
-
-	for (i = 0; i < upi->input.num_keys; i++) {
-		struct ulogd_key *key = upi->input.keys[i].u.source;
-
-		if (key->flags & ULOGD_RETF_VALID)
-			bitmask_set_bit(ii->valid_bitmask, i);
-	}
-	
-	/* lookup template ID for this bitmask */
-	template = find_template_for_bitmask(upi, ii->valid_bitmask);
-	if (!template) {
-		ulogd_log(ULOGD_INFO, "building new template\n");
-		template = build_template_for_bitmask(upi, ii->valid_bitmask);
-		if (!template) {
-			ulogd_log(ULOGD_ERROR, "can't build new template!\n");
-			return ULOGD_IRET_ERR;
-		}
-		llist_add(&template->list, &ii->template_list);
-	}
-	
-	total_size = template->total_length;
-
-	/* decide if it's time to retransmit our template and (optionally)
-	 * prepend it into the to-be-sent IPFIX message */
-	if (0 /* FIXME */) {
-		/* add size of template */
-		//total_size += (template->tmpl_cur - (void *)&template->tmpl);
-		total_size += sizeof(template->tmpl);
-	}
-
-	return ULOGD_IRET_OK;
-}
-
-static int open_connect_socket(struct ulogd_pluginstance *pi)
-{
-	struct ipfix_instance *ii = (struct ipfix_instance *) &pi->private;
-	struct addrinfo hint, *res, *resave;
-	int ret;
-
-	memset(&hint, 0, sizeof(hint));
-	hint.ai_socktype = ii->sock_type;
-	hint.ai_protocol = ii->sock_proto;
-	hint.ai_flags = AI_ADDRCONFIG;
-
-	ret = getaddrinfo(host_ce(pi->config_kset).u.string,
-			  port_ce(pi->config_kset).u.string,
-			  &hint, &res);
-	if (ret != 0) {
-		ulogd_log(ULOGD_ERROR, "can't resolve host/service: %s\n",
-			  gai_strerror(ret));
-		return -1;
-	}
-
-	resave = res;
-
-	for (; res; res = res->ai_next) {
-		ii->fd = socket(res->ai_family, res->ai_socktype,
-				res->ai_protocol);
-		if (ii->fd < 0) {
-			switch (errno) {
-			case EACCES:
-			case EAFNOSUPPORT:
-			case EINVAL:
-			case EPROTONOSUPPORT:
-				/* try next result */
-				continue;
-			default:
-				ulogd_log(ULOGD_ERROR, "error: %s\n",
-					  strerror(errno));
-				break;
-			}
-		}
-
-#ifdef IPPROTO_SCTP
-		/* Set the number of SCTP output streams */
-		if (res->ai_protocol == IPPROTO_SCTP) {
-			struct sctp_initmsg initmsg;
-			int ret; 
-			memset(&initmsg, 0, sizeof(initmsg));
-			initmsg.sinit_num_ostreams = 2;
-			ret = setsockopt(ii->fd, IPPROTO_SCTP, SCTP_INITMSG,
-					 &initmsg, sizeof(initmsg));
-			if (ret < 0) {
-				ulogd_log(ULOGD_ERROR, "cannot set number of"
-					  "sctp streams: %s\n",
-					  strerror(errno));
-				close(ii->fd);
-				freeaddrinfo(resave);
-				return ret;
-			}
-		}
-#endif
-
-		if (connect(ii->fd, res->ai_addr, res->ai_addrlen) != 0) {
-			close(ii->fd);
-			/* try next result */
-			continue;
-		}
-
-		/* if we reach this, we have a working connection */
-		ulogd_log(ULOGD_NOTICE, "connection established\n");
-		freeaddrinfo(resave);
-		return 0;
-	}
-
-	freeaddrinfo(resave);
-	return -1;
-}
-
-static int start_ipfix(struct ulogd_pluginstance *pi)
-{
-	struct ipfix_instance *ii = (struct ipfix_instance *) &pi->private;
-	int ret;
-
-	ulogd_log(ULOGD_DEBUG, "starting ipfix\n");
-
-	ii->valid_bitmask = bitmask_alloc(pi->input.num_keys);
-	if (!ii->valid_bitmask)
-		return -ENOMEM;
-
-	INIT_LLIST_HEAD(&ii->template_list);
-
-	ret = open_connect_socket(pi);
-	if (ret < 0)
-		goto out_bm_free;
-
-	return 0;
-
-out_bm_free:
-	bitmask_free(ii->valid_bitmask);
-	ii->valid_bitmask = NULL;
-
-	return ret;
-}
-
-static int stop_ipfix(struct ulogd_pluginstance *pi) 
-{
-	struct ipfix_instance *ii = (struct ipfix_instance *) &pi->private;
-
-	close(ii->fd);
-
-	bitmask_free(ii->valid_bitmask);
-	ii->valid_bitmask = NULL;
-
-	return 0;
-}
-
-static void signal_handler_ipfix(struct ulogd_pluginstance *pi, int signal)
-{
-	struct ipfix_instance *li = (struct ipfix_instance *) &pi->private;
-
-	switch (signal) {
-	case SIGHUP:
-		ulogd_log(ULOGD_NOTICE, "ipfix: reopening connection\n");
-		stop_ipfix(pi);
-		start_ipfix(pi);
-		break;
-	default:
-		break;
-	}
-}
-	
-static int configure_ipfix(struct ulogd_pluginstance *pi,
-			    struct ulogd_pluginstance_stack *stack)
-{
-	struct ipfix_instance *ii = (struct ipfix_instance *) &pi->private;
-	char *proto_str = proto_ce(pi->config_kset).u.string;
-	int ret;
-
-	/* FIXME: error handling */
-	ulogd_log(ULOGD_DEBUG, "parsing config file section %s\n", pi->id);
-	ret = config_parse_file(pi->id, pi->config_kset);
-	if (ret < 0)
-		return ret;
-
-	/* determine underlying protocol */
-	if (!strcasecmp(proto_str, "udp")) {
-		ii->sock_type = SOCK_DGRAM;
-		ii->sock_proto = IPPROTO_UDP;
-	} else if (!strcasecmp(proto_str, "tcp")) {
-		ii->sock_type = SOCK_STREAM;
-		ii->sock_proto = IPPROTO_TCP;
-#ifdef IPPROTO_SCTP
-	} else if (!strcasecmp(proto_str, "sctp")) {
-		ii->sock_type = SOCK_SEQPACKET;
-		ii->sock_proto = IPPROTO_SCTP;
-#endif
-#ifdef _HAVE_DCCP
-	} else if (!strcasecmp(proto_str, "dccp")) {
-		ii->sock_type = SOCK_SEQPACKET;
-		ii->sock_proto = IPPROTO_DCCP;
-#endif
-	} else {
-		ulogd_log(ULOGD_ERROR, "unknown protocol `%s'\n",
-			  proto_ce(pi->config_kset));
-		return -EINVAL;
-	}
-
-	/* postpone address lookup to ->start() time, since we want to 
-	 * re-lookup an address on SIGHUP */
-
-	return ulogd_wildcard_inputkeys(pi);
-}
-
-static struct ulogd_plugin ipfix_plugin = { 
-	.name = "IPFIX",
-	.input = {
-		.type = ULOGD_DTYPE_PACKET | ULOGD_DTYPE_FLOW, 
-	},
-	.output = {
-		.type = ULOGD_DTYPE_SINK,
-	},
-	.config_kset 	= &ipfix_kset,
-	.priv_size 	= sizeof(struct ipfix_instance),
-
-	.configure	= &configure_ipfix,
-	.start	 	= &start_ipfix,
-	.stop	 	= &stop_ipfix,
-
-	.interp 	= &output_ipfix, 
-	.signal 	= &signal_handler_ipfix,
-	.version	= VERSION,
-};
-
-void __attribute__ ((constructor)) init(void);
-
-void init(void)
-{
-	ulogd_register_plugin(&ipfix_plugin);
-}
-- 
2.17.1

>From 592c7a55f52a8c73c51c5929fdef97bb7ef42c93 Mon Sep 17 00:00:00 2001
From: Ander Juaristi <a@xxxxxxxxxxxx>
Date: Wed, 17 Apr 2019 13:35:37 +0200
Subject: [PATCH 2/2] IPFIX: Introduce template record support

This commit adds the ability to send template records
to the remote collector.

In addition, it also introduces a new
configuration parameter 'send_template', which tells when template
records should be sent. It accepts the following string values:

 - "once": Send the template record only the first time (might be coalesced
    with data records).
 - "always": Send the template record always, with every data record that is sent
    to the collector (multiple data records might be sent together).
 - "never": Assume the collector knows the schema already. Do not send template records.

If omitted, the default value for 'send_template' is "once".

Signed-off-by: Ander Juaristi <a@xxxxxxxxxxxx>
---
 include/ulogd/ipfix_protocol.h    |  1 +
 output/ipfix/ipfix.c              | 97 ++++++++++++++++++++++++++++++-
 output/ipfix/ipfix.h              | 22 +++----
 output/ipfix/ulogd_output_IPFIX.c | 56 ++++++++++--------
 4 files changed, 139 insertions(+), 37 deletions(-)

diff --git a/include/ulogd/ipfix_protocol.h b/include/ulogd/ipfix_protocol.h
index aef47f0..01dd96a 100644
--- a/include/ulogd/ipfix_protocol.h
+++ b/include/ulogd/ipfix_protocol.h
@@ -129,6 +129,7 @@ enum {
 	/* reserved */
 	IPFIX_fragmentOffsetIPv4	= 88,
 	/* reserved */
+	IPFIX_applicationId		= 95,
 	IPFIX_bgpNextAdjacentAsNumber	= 128,
 	IPFIX_bgpPrevAdjacentAsNumber	= 129,
 	IPFIX_exporterIPv4Address	= 130,
diff --git a/output/ipfix/ipfix.c b/output/ipfix/ipfix.c
index 60a4c7f..4bb432a 100644
--- a/output/ipfix/ipfix.c
+++ b/output/ipfix/ipfix.c
@@ -2,6 +2,7 @@
  * ipfix.c
  *
  * Holger Eitzenberger, 2009.
+ * Ander Juaristi, 2019
  */
 
 /* These forward declarations are needed since ulogd.h doesn't like to be the first */
@@ -13,25 +14,107 @@
 
 #include <ulogd/ulogd.h>
 #include <ulogd/common.h>
+#include <ulogd/ipfix_protocol.h>
+
+struct ipfix_templ_elem {
+	uint16_t id;
+	uint16_t len;
+};
+
+struct ipfix_templ {
+	unsigned int num_templ_elements;
+	struct ipfix_templ_elem templ_elements[];
+};
+
+/* Template fields modeled after vy_ipfix_data */
+static const struct ipfix_templ template = {
+	.num_templ_elements = 10,
+	.templ_elements = {
+		{
+			.id = IPFIX_sourceIPv4Address,
+			.len = sizeof(uint32_t)
+		},
+		{
+			.id = IPFIX_destinationIPv4Address,
+			.len = sizeof(uint32_t)
+		},
+		{
+			.id = IPFIX_packetTotalCount,
+			.len = sizeof(uint32_t)
+		},
+		{
+			.id = IPFIX_octetTotalCount,
+			.len = sizeof(uint32_t)
+		},
+		{
+			.id = IPFIX_flowStartSeconds,
+			.len = sizeof(uint32_t)
+		},
+		{
+			.id = IPFIX_flowEndSeconds,
+			.len = sizeof(uint32_t)
+		},
+		{
+			.id = IPFIX_sourceTransportPort,
+			.len = sizeof(uint16_t)
+		},
+		{
+			.id = IPFIX_destinationTransportPort,
+			.len = sizeof(uint16_t)
+		},
+		{
+			.id = IPFIX_protocolIdentifier,
+			.len = sizeof(uint8_t)
+		},
+		{
+			.id = IPFIX_applicationId,
+			.len = sizeof(uint32_t)
+		}
+	}
+};
 
-struct ipfix_msg *ipfix_msg_alloc(size_t len, uint32_t oid)
+struct ipfix_msg *ipfix_msg_alloc(size_t len, uint32_t oid, int tid)
 {
 	struct ipfix_msg *msg;
 	struct ipfix_hdr *hdr;
+	struct ipfix_templ_hdr *templ_hdr;
+	struct ipfix_templ_elem *elem;
+	unsigned int i = 0;
 
-	if (len < IPFIX_HDRLEN + IPFIX_SET_HDRLEN)
+	if ((tid > 0 && len < IPFIX_HDRLEN + IPFIX_TEMPL_HDRLEN(template.num_templ_elements) + IPFIX_SET_HDRLEN) ||
+	    (len < IPFIX_HDRLEN + IPFIX_SET_HDRLEN))
 		return NULL;
 
 	msg = malloc(sizeof(struct ipfix_msg) + len);
 	memset(msg, 0, sizeof(struct ipfix_msg));
-	msg->tail = msg->data + IPFIX_HDRLEN;
+	msg->tid = tid;
 	msg->end = msg->data + len;
+	msg->tail = msg->data + IPFIX_HDRLEN;
+	if (tid > 0)
+		msg->tail += IPFIX_TEMPL_HDRLEN(template.num_templ_elements);
 
+	/* Initialize message header */
 	hdr = ipfix_msg_hdr(msg);
 	memset(hdr, 0, IPFIX_HDRLEN);
 	hdr->version = htons(IPFIX_VERSION);
 	hdr->oid = htonl(oid);
 
+	if (tid > 0) {
+		/* Initialize template record header */
+		templ_hdr = ipfix_msg_templ_hdr(msg);
+		templ_hdr->sid = htons(2);
+		templ_hdr->tid = htons(tid);
+		templ_hdr->len = htons(IPFIX_TEMPL_HDRLEN(template.num_templ_elements));
+		templ_hdr->cnt = htons(template.num_templ_elements);
+
+		while (i < template.num_templ_elements) {
+			elem = (struct ipfix_templ_elem *) &templ_hdr->data[i * 4];
+			elem->id = htons(template.templ_elements[i].id);
+			elem->len = htons(template.templ_elements[i].len);
+			i++;
+		}
+	}
+
 	return msg;
 }
 
@@ -47,6 +130,14 @@ void ipfix_msg_free(struct ipfix_msg *msg)
 	free(msg);
 }
 
+struct ipfix_templ_hdr *ipfix_msg_templ_hdr(const struct ipfix_msg *msg)
+{
+	if (msg->tid > 0)
+		return (struct ipfix_templ_hdr *) (msg->data + IPFIX_HDRLEN);
+
+	return NULL;
+}
+
 struct ipfix_hdr *ipfix_msg_hdr(const struct ipfix_msg *msg)
 {
 	return (struct ipfix_hdr *)msg->data;
diff --git a/output/ipfix/ipfix.h b/output/ipfix/ipfix.h
index cdb5a6f..93945fb 100644
--- a/output/ipfix/ipfix.h
+++ b/output/ipfix/ipfix.h
@@ -2,6 +2,7 @@
  * ipfix.h
  *
  * Holger Eitzenberger <holger@xxxxxxxxxxxxxxxx>, 2009.
+ * Ander Juaristi <a@xxxxxxxxxxxx>, 2019
  */
 #ifndef IPFIX_H
 #define IPFIX_H
@@ -20,17 +21,21 @@ struct ipfix_hdr {
 	uint8_t data[];
 } __packed;
 
-#define IPFIX_HDRLEN	sizeof(struct ipfix_hdr)
+#define IPFIX_HDRLEN		sizeof(struct ipfix_hdr)
 
 /*
  * IDs 0-255 are reserved for Template Sets.  IDs of Data Sets are > 255.
  */
 struct ipfix_templ_hdr {
-	uint16_t id;
+	uint16_t sid;
+	uint16_t len;
+	uint16_t tid;
 	uint16_t cnt;
 	uint8_t data[];
 } __packed;
 
+#define IPFIX_TEMPL_HDRLEN(nfields)	sizeof(struct ipfix_templ_hdr) + (sizeof(uint16_t) * 2 * nfields)
+
 struct ipfix_set_hdr {
 #define IPFIX_SET_TEMPL			2
 #define IPFIX_SET_OPT_TEMPL		3
@@ -46,6 +51,7 @@ struct ipfix_msg {
 	uint8_t *tail;
 	uint8_t *end;
 	unsigned nrecs;
+	int tid;
 	struct ipfix_set_hdr *last_set;
 	uint8_t data[];
 };
@@ -53,18 +59,14 @@ struct ipfix_msg {
 struct vy_ipfix_data {
 	struct in_addr saddr;
 	struct in_addr daddr;
-	uint16_t ifi_in;
-	uint16_t ifi_out;
 	uint32_t packets;
 	uint32_t bytes;
 	uint32_t start;				/* Unix time */
 	uint32_t end;				/* Unix time */
 	uint16_t sport;
 	uint16_t dport;
-	uint32_t aid;				/* Application ID */
 	uint8_t l4_proto;
-	uint8_t dscp;
-	uint16_t __padding;
+	uint32_t aid;				/* Application ID */
 } __packed;
 
 #define VY_IPFIX_SID		256
@@ -73,13 +75,11 @@ struct vy_ipfix_data {
 #define VY_IPFIX_PKT_LEN	(IPFIX_HDRLEN + IPFIX_SET_HDRLEN \
 							 + VY_IPFIX_FLOWS * sizeof(struct vy_ipfix_data))
 
-/* template management */
-size_t ipfix_rec_len(uint16_t);
-
 /* message handling */
-struct ipfix_msg *ipfix_msg_alloc(size_t, uint32_t);
+struct ipfix_msg *ipfix_msg_alloc(size_t, uint32_t, int);
 void ipfix_msg_free(struct ipfix_msg *);
 struct ipfix_hdr *ipfix_msg_hdr(const struct ipfix_msg *);
+struct ipfix_templ_hdr *ipfix_msg_templ_hdr(const struct ipfix_msg *);
 size_t ipfix_msg_len(const struct ipfix_msg *);
 void *ipfix_msg_data(struct ipfix_msg *);
 struct ipfix_set_hdr *ipfix_msg_add_set(struct ipfix_msg *, uint16_t);
diff --git a/output/ipfix/ulogd_output_IPFIX.c b/output/ipfix/ulogd_output_IPFIX.c
index ec143b1..5b59003 100644
--- a/output/ipfix/ulogd_output_IPFIX.c
+++ b/output/ipfix/ulogd_output_IPFIX.c
@@ -3,6 +3,9 @@
  *
  * ulogd IPFIX Exporter plugin.
  *
+ * (C) 2009 by Holger Eitzenberger <holger@xxxxxxxxxxxxxxxx>, Astaro AG
+ * (C) 2019 by Ander Juaristi <a@xxxxxxxxxxxx>
+ *
  * This program is distributed in the hope that it will be useful,
  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
@@ -11,8 +14,6 @@
  * You should have received a copy of the GNU General Public License
  * along with this program; if not, write to the Free Software
  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
- *
- * Holger Eitzenberger <holger@xxxxxxxxxxxxxxxx>  Astaro AG 2009
  */
 #include <unistd.h>
 #include <time.h>
@@ -28,6 +29,7 @@
 #define DEFAULT_MTU		512 /* RFC 5101, 10.3.3 */
 #define DEFAULT_PORT		4739 /* RFC 5101, 10.3.4 */
 #define DEFAULT_SPORT		4740
+#define DEFAULT_SEND_TEMPLATE	"once"
 
 enum {
 	OID_CE = 0,
@@ -35,16 +37,18 @@ enum {
 	PORT_CE,
 	PROTO_CE,
 	MTU_CE,
+	SEND_TEMPLATE_CE
 };
 
-#define oid_ce(x)	(x->ces[OID_CE])
-#define host_ce(x)	(x->ces[HOST_CE])
-#define port_ce(x)	(x->ces[PORT_CE])
-#define proto_ce(x)	(x->ces[PROTO_CE])
-#define mtu_ce(x)	(x->ces[MTU_CE])
+#define oid_ce(x)		(x->ces[OID_CE])
+#define host_ce(x)		(x->ces[HOST_CE])
+#define port_ce(x)		(x->ces[PORT_CE])
+#define proto_ce(x)		(x->ces[PROTO_CE])
+#define mtu_ce(x)		(x->ces[MTU_CE])
+#define send_template_ce(x)	(x->ces[SEND_TEMPLATE_CE])
 
 static const struct config_keyset ipfix_kset = {
-	.num_ces = 5,
+	.num_ces = 6,
 	.ces = {
 		{
 			.key = "oid",
@@ -70,20 +74,21 @@ static const struct config_keyset ipfix_kset = {
 			.key = "mtu",
 			.type = CONFIG_TYPE_INT,
 			.u.value = DEFAULT_MTU
+		},
+		{
+			.key = "send_template",
+			.type = CONFIG_TYPE_STRING,
+			.u.string = DEFAULT_SEND_TEMPLATE
 		}
 	}
 };
 
-struct ipfix_templ {
-	struct ipfix_templ *next;
-};
-
 struct ipfix_priv {
 	struct ulogd_fd ufd;
 	uint32_t seqno;
 	struct ipfix_msg *msg;		/* current message */
 	struct llist_head list;
-	struct ipfix_templ *templates;
+	int tid;
 	int proto;
 	struct ulogd_timer timer;
 	struct sockaddr_in sa;
@@ -258,8 +263,8 @@ static void ipfix_timer_cb(struct ulogd_timer *t, void *data)
 static int ipfix_configure(struct ulogd_pluginstance *pi, struct ulogd_pluginstance_stack *stack)
 {
 	struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
+	char *host, *proto, *send_template;
 	int oid, port, mtu, ret;
-	char *host, *proto;
 	char addr[16];
 
 	ret = config_parse_file(pi->id, pi->config_kset);
@@ -271,6 +276,7 @@ static int ipfix_configure(struct ulogd_pluginstance *pi, struct ulogd_pluginsta
 	port = port_ce(pi->config_kset).u.value;
 	proto = proto_ce(pi->config_kset).u.string;
 	mtu = mtu_ce(pi->config_kset).u.value;
+	send_template = send_template_ce(pi->config_kset).u.string;
 
 	if (!oid) {
 		ulogd_log(ULOGD_FATAL, "invalid Observation ID\n");
@@ -303,6 +309,8 @@ static int ipfix_configure(struct ulogd_pluginstance *pi, struct ulogd_pluginsta
 
 	ulogd_init_timer(&priv->timer, pi, ipfix_timer_cb);
 
+	priv->tid = (strcmp(send_template, "never") ? VY_IPFIX_SID : -1);
+
 	ulogd_log(ULOGD_INFO, "using IPFIX Collector at %s:%d (MTU %d)\n",
 		  inet_ntop(AF_INET, &priv->sa.sin_addr, addr, sizeof(addr)),
 		  port, mtu);
@@ -410,25 +418,30 @@ static int ipfix_stop(struct ulogd_pluginstance *pi)
 static int ipfix_interp(struct ulogd_pluginstance *pi)
 {
 	struct ipfix_priv *priv = (struct ipfix_priv *) &pi->private;
+	char saddr[16], daddr[16], *send_template;
 	struct vy_ipfix_data *data;
 	int oid, mtu, ret;
-	char addr[16];
 
 	if (!(GET_FLAGS(pi->input.keys, InIpSaddr) & ULOGD_RETF_VALID))
 		return ULOGD_IRET_OK;
 
 	oid = oid_ce(pi->config_kset).u.value;
 	mtu = mtu_ce(pi->config_kset).u.value;
+	send_template = send_template_ce(pi->config_kset).u.string;
 
 again:
 	if (!priv->msg) {
-		priv->msg = ipfix_msg_alloc(mtu, oid);
+		priv->msg = ipfix_msg_alloc(mtu, oid, priv->tid);
 		if (!priv->msg) {
 			/* just drop this flow */
 			ulogd_log(ULOGD_ERROR, "out of memory, dropping flow\n");
 			return ULOGD_IRET_OK;
 		}
 		ipfix_msg_add_set(priv->msg, VY_IPFIX_SID);
+
+		/* template sent - do not send it again the next time */
+		if (priv->tid == VY_IPFIX_SID && strcmp(send_template, "once") == 0)
+			priv->tid = -1;
 	}
 
 	data = ipfix_msg_add_data(priv->msg, sizeof(struct vy_ipfix_data));
@@ -439,8 +452,6 @@ again:
 		goto again;
 	}
 
-	data->ifi_in = data->ifi_out = 0;
-
 	data->saddr.s_addr = ikey_get_u32(&pi->input.keys[InIpSaddr]);
 	data->daddr.s_addr = ikey_get_u32(&pi->input.keys[InIpDaddr]);
 
@@ -462,13 +473,12 @@ again:
 		data->aid = htonl(ikey_get_u32(&pi->input.keys[InCtMark]));
 
 	data->l4_proto = ikey_get_u8(&pi->input.keys[InIpProto]);
-	data->__padding = 0;
 
 	ulogd_log(ULOGD_DEBUG, "Got new packet (packets = %u, bytes = %u, flow = (%u, %u), saddr = %s, daddr = %s, sport = %u, dport = %u)\n",
-			ntohl(data->packets), ntohl(data->bytes), ntohl(data->start), ntohl(data->end),
-			inet_ntop(AF_INET, &data->saddr.s_addr, addr, sizeof(addr)),
-			inet_ntop(AF_INET, &data->daddr.s_addr, addr, sizeof(addr)),
-			ntohs(data->sport), ntohs(data->dport));
+		  ntohl(data->packets), ntohl(data->bytes), ntohl(data->start), ntohl(data->end),
+		  inet_ntop(AF_INET, &data->saddr.s_addr, saddr, sizeof(saddr)),
+		  inet_ntop(AF_INET, &data->daddr.s_addr, daddr, sizeof(daddr)),
+		  ntohs(data->sport), ntohs(data->dport));
 
 	if ((ret = send_msgs(pi)) < 0)
 		return ret;
-- 
2.17.1


[Index of Archives]     [Netfitler Users]     [Berkeley Packet Filter]     [LARTC]     [Bugtraq]     [Yosemite Forum]

  Powered by Linux