This was very helpful. However, I don't think I'm quite at the end of this path yet. I have the following rules in the mangle table of my iptables:

iptables -t mangle -A PREROUTING -p tcp --dport 9999 -j NFQUEUE --queue-balance 0:1
iptables -t mangle -A PREROUTING -p tcp --sport 9999 -j NFQUEUE --queue-balance 0:1

I create two threads. Each thread does an nfq_open(); so far so good: thread 1 has its own nfq_handle and thread 2 has its own nfq_handle. Thread 1 does an nfq_create_queue() on queue 0, and thread 2 does an nfq_create_queue() on queue 1. Each thread then opens a netlink handle, independently, from the handle returned by nfq_open(), and each thread ends up with an independent fd. When I run, only one thread receives traffic (queue 0 on thread #1). I am driving it with, for example, iperf -P 8 (8 simultaneous threads/connections).

Frankly, I must be missing something, since I don't see any relationship between the result of nfq_create_queue() (which presumably binds to the queue number given in its second argument) and the file descriptor we get from nfnl_fd() (unless a side effect of that routine is to bind the queue ID to the nfq_handle!). I can sort of see how nfq_handle_packet() makes that association (presumably the queue number is squirreled away in some piece of state that finds the right callback, etc.). I think I've misunderstood the basic structure, especially with respect to what should live in each thread. Or perhaps my complete naivete about iptables means I've screwed up that configuration. So either I've set my stuff up wrong (you can see the code below), the iptables rules aren't really distributing packets across the queues, or some of both.

-- Mike

Configuration being run:
  kernel:             2.6.32-358.6.1.el6.x86_64
  libmnl              1.0.3
  libnfnetlink        1.0.1
  libnetfilter_queue  1.0.2

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <ctype.h>
#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/udp.h>
#include <netinet/tcp.h>
#include <pthread.h>
#include <fcntl.h>
#include <errno.h>
#include <inttypes.h>

extern "C" {
#include <linux/netfilter.h>   /* Defines verdicts (NF_ACCEPT, etc) */
#include <libnetfilter_queue/libnetfilter_queue.h>
}

.... uninteresting stuff eliminated ....

struct queueStuff {
    int queue;
    int maxqueue;
    queueStuff(int i, int m) : queue(i), maxqueue(m) {}
};

static int Callback(nfq_q_handle *myQueue, struct nfgenmsg *msg,
                    nfq_data *pkt, void *cbData)
{
    uint32_t id = 0;
    nfqnl_msg_packet_hdr *header;
    queueStuff *p = (queueStuff *) cbData;

    if ((header = nfq_get_msg_packet_hdr(pkt))) {
        id = ntohl(header->packet_id);
    }

    // Print the payload; in copy meta mode, only headers will be included;
    // in copy packet mode, whole packet will be returned.
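    // (My reading of the library, not verified: with NFQNL_COPY_PACKET the
    // buffer handed back by nfq_get_payload() starts at the IP header --
    // there is no link-layer header to skip -- and its return value is the
    // number of bytes actually copied, capped by the range passed to
    // nfq_set_mode(), or -1 on error.)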
    unsigned char *pktData;
    int len = nfq_get_payload(pkt, &pktData);

    numPackets++;

    ip *ipp = (ip *) pktData;
    if (!ipp) numNull++;

    if (ipp && ipp->ip_p == IPPROTO_TCP) {
        tcphdr *tcpp = (tcphdr *)(pktData + sizeof(ip));
        uint32_t headerlen = ipp->ip_hl * 4 + tcpp->doff * 4;
        uint32_t paylen = ntohs(ipp->ip_len) - headerlen;
        if (tcpp->fin) numFin++;
        if (tcpp->syn) numSyn++;
        if (tcpp->rst) numRst++;
        if (tcpp->psh) numPsh++;
        if (tcpp->ack) numAck++;
        if (tcpp->urg) numUrg++;
        if (!tcpp->fin && !tcpp->syn && !tcpp->rst &&
            !tcpp->psh && !tcpp->ack && !tcpp->urg) numNone++;
        totalAmount += paylen;
    }

    if ((numPackets % 5000) == 0) {
        printf("Queue %d: %d packets, %d NULL, %d fin, %d syn, %d rst, %d psh, %d ack, %d urg, %d none -- %lld bytes\n",
               p->queue, numPackets, numNull, numFin, numSyn, numRst,
               numPsh, numAck, numUrg, numNone, totalAmount);
        fflush(stdout);
    }

#if 0
    printf("Packet data for %d / id = %d\n", p->queue, id);
    Dump(pktData);
#endif

    // For this program we'll always accept the packet...
    return nfq_set_verdict(myQueue, id, NF_ACCEPT, 0, NULL);

    // end Callback
}

void *processQueue(void *q)
{
    queueStuff *qs = (queueStuff *) q;

    struct nfq_handle *nfqHandle = NULL;
    nfqHandle = nfq_open();
    if (!nfqHandle) {
        perror("nfq_open");
        exit(1);
    }

    printf("Unbinding...nobody seems sure why to do this\n");
    if (nfq_unbind_pf(nfqHandle, AF_INET) < 0) {
        perror("nfq_unbind_pf");
        exit(1);
    }

    printf("Binding to process IP packets\n");
    if (nfq_bind_pf(nfqHandle, AF_INET) < 0) {
        perror("nfq_bind_pf");
        exit(1);
    }

    printf("Creating netfilter handle %d\n", qs->queue);

    struct nfq_q_handle *myQueue = NULL;
    struct nfnl_handle *netlinkHandle = NULL;
    int fd = -1;
    ssize_t res;
    char buf[4096];

    printf("Installing queue %d\n", qs->queue);
    if (!(myQueue = nfq_create_queue(nfqHandle, qs->queue, &Callback, q))) {
        perror("nfq_create_queue");
        exit(1);
    }
    printf("Myqueue for %d is %p\n", qs->queue, myQueue);
    fflush(stdout);

    // Turn on packet copy mode ... NOTE: only copy_packet really works.
    int whatToCopy = NFQNL_COPY_PACKET;
    // int whatToCopy = NFQNL_COPY_META;

    // A little more than the standard header...
    // int sizeToCopy = sizeof(ip) + sizeof(tcphdr) + 10;
    int sizeToCopy = 10000;

    if (nfq_set_mode(myQueue, whatToCopy, sizeToCopy) < 0) {
        perror("nfq_set_mode");
        exit(1);
    }
    printf("Set mode for %d\n", qs->queue);
    fflush(stdout);

    if (nfq_set_queue_maxlen(myQueue, qs->maxqueue) < 0) {
        printf("Couldn't set queue max len to %d.\n", qs->maxqueue);
    } else {
        printf("Set queue length to %d packets\n", qs->maxqueue);
        fflush(stdout);
    }

    netlinkHandle = nfq_nfnlh(nfqHandle);
    if (!netlinkHandle) {
        perror("nfq_nfnlh");
        exit(1);
    }
    printf("Got netlink handle for %d\n", qs->queue);
    fflush(stdout);

    nfnl_rcvbufsiz(netlinkHandle, qs->maxqueue * 1500);
    printf("Set recv buffer size to %d\n", qs->maxqueue * 1500);
    fflush(stdout);

    fd = nfnl_fd(netlinkHandle);
    printf("Queue #%d: fd = %d\n", qs->queue, fd);
    fflush(stdout);

    int opt = 1;
    setsockopt(fd, SOL_NETLINK, NETLINK_BROADCAST_SEND_ERROR, &opt, sizeof(int));
    setsockopt(fd, SOL_NETLINK, NETLINK_NO_ENOBUFS, &opt, sizeof(int));
    printf("Ignoring buffer overflows...folklore\n");
    fflush(stdout);

    while ((res = recv(fd, buf, sizeof(buf), 0)) && res >= 0) {
        // I am not totally sure why a callback mechanism is used
        // rather than just handling it directly here, but that
        // seems to be the convention...
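        // (Best I can tell from reading the library source -- treat this as
        // my assumption, not gospel: nfq_handle_packet() parses the netlink
        // message, pulls the queue number out of the nfnetlink header
        // (res_id), looks up the nfq_q_handle that nfq_create_queue()
        // registered on this nfqHandle for that number, and invokes its
        // callback. On the kernel side, queue N appears to be tied to
        // whichever netlink socket issued the bind for N, which is why each
        // thread's private fd should only ever see its own queue.)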
        nfq_handle_packet(nfqHandle, buf, res);

    // end while receiving traffic
    }

    perror("recv");

    nfq_destroy_queue(myQueue);
    nfq_close(nfqHandle);
}

int main(int argc, char *argv[])
{
    int numQueues = 1;
    if (argc == 2) numQueues = atoi(argv[1]);

    int maxqueue = 10000;
    if (argc == 3) maxqueue = atoi(argv[2]);

    pthread_t theThreads[100];

    for (int i = 0; i < numQueues; i++) {
        pthread_create(&theThreads[i], NULL, processQueue,
                       new queueStuff(i, maxqueue));
        sleep(1);
    }
    for (int i = 0; i < numQueues; i++) {
        pthread_join(theThreads[i], NULL);
    }
    return 0;
}

What I see:

# Generated by iptables-save v1.4.7 on Sat Jul 13 15:13:14 2013
*mangle
:PREROUTING ACCEPT [1307:101481]
:INPUT ACCEPT [588339:1228894985]
:FORWARD ACCEPT [0:0]
:OUTPUT ACCEPT [297758:15642665]
:POSTROUTING ACCEPT [297758:15642665]
-A PREROUTING -p tcp -m tcp --dport 9999 -j NFQUEUE --queue-balance 0:1
-A PREROUTING -p tcp -m tcp --sport 9999 -j NFQUEUE --queue-balance 0:1
COMMIT
# Completed on Sat Jul 13 15:13:14 2013
# Generated by iptables-save v1.4.7 on Sat Jul 13 15:13:14 2013
*filter
:INPUT ACCEPT [588350:1228895761]
:FORWARD ACCEPT [0:0]
:OUTPUT ACCEPT [297769:15644033]
-A INPUT -i bond0 -p tcp -m tcp --dport 80 -m state --state NEW,ESTABLISHED -j ACCEPT
-A OUTPUT -o bond0 -p tcp -m tcp --sport 80 -m state --state ESTABLISHED -j ACCEPT
COMMIT
# Completed on Sat Jul 13 15:13:14 2013

TBL205-ROOT>> iptables-save
TBL205-ROOT>> ./t4 2
Unbinding...nobody seems sure why to do this
Binding to process IP packets
Creating netfilter handle 0
Installing queue 0
Myqueue for 0 is 0x7f95f8000b30
Set mode for 0
Set queue length to 10000 packets
Got netlink handle for 0
Set recv buffer size to 15000000
Queue #0: fd = 3
Ignoring buffer overflows...folklore
Unbinding...nobody seems sure why to do this
Binding to process IP packets
Creating netfilter handle 1
Installing queue 1
Myqueue for 1 is 0x7f95f0000b30
Set mode for 1
Set queue length to 10000 packets
Got netlink handle for 1
Set recv buffer size to 15000000
Queue #1: fd = 4
Ignoring buffer overflows...folklore
Queue 0: 5000 packets, 0 NULL, 0 fin, 1 syn, 0 rst, 76 psh, 4999 ack, 0 urg, 0 none -- 9954356 bytes
Queue 0: 10000 packets, 0 NULL, 0 fin, 1 syn, 0 rst, 83 psh, 9999 ack, 0 urg, 0 none -- 20194356 bytes
Queue 0: 15000 packets, 0 NULL, 0 fin, 1 syn, 0 rst, 87 psh, 14999 ack, 0 urg, 0 none -- 30434356 bytes
Queue 0: 20000 packets, 0 NULL, 0 fin, 1 syn, 0 rst, 90 psh, 19999 ack, 0 urg, 0 none -- 40674356 bytes
Queue 0: 25000 packets, 0 NULL, 0 fin, 1 syn, 0 rst, 92 psh, 24999 ack, 0 urg, 0 none -- 50914356 bytes
Queue 0: 30000 packets, 0 NULL, 0 fin, 1 syn, 0 rst, 94 psh, 29999 ack, 0 urg, 0 none -- 61154356 bytes
Queue 0: 35000 packets, 0 NULL, 0 fin, 1 syn, 0 rst, 96 psh, 34999 ack, 0 urg, 0 none -- 71394356 bytes
Queue 0: 40000 packets, 0 NULL, 0 fin, 1 syn, 0 rst, 97 psh, 39999 ack, 0 urg, 0 none -- 81634356 bytes
Queue 0: 45000 packets, 0 NULL, 0 fin, 1 syn, 0 rst, 99 psh, 44999 ack, 0 urg, 0 none -- 91874356 bytes
Queue 0: 50000 packets, 0 NULL, 0 fin, 1 syn, 0 rst, 101 psh, 49999 ack, 0 urg, 0 none -- 102114356 bytes
Queue 0: 55000 packets, 0 NULL, 0 fin, 1 syn, 0 rst, 103 psh, 54999 ack, 0 urg, 0 none -- 112354356 bytes
Queue 0: 60000 packets, 0 NULL, 0 fin, 1 syn, 0 rst, 105 psh, 59999 ack, 0 urg, 0 none -- 122594356 bytes
Queue 0: 65000 packets, 0 NULL, 0 fin, 1 syn, 0 rst, 107 psh, 64999 ack, 0 urg, 0 none -- 132834356 bytes
Queue 0: 70000 packets, 0 NULL, 0 fin, 1 syn, 0 rst, 109 psh, 69999 ack, 0 urg, 0 none -- 143074356 bytes
Queue 0: 75000 packets, 0 NULL, 0 fin, 1 syn, 0 rst, 111 psh, 74999 ack, 0 urg, 0 none -- 153314356 bytes
Queue 0: 80000 packets, 0 NULL, 0 fin, 1 syn, 0 rst, 113 psh, 79999 ack, 0 urg, 0 none -- 163554356 bytes
Queue 0: 85000 packets, 0 NULL, 0 fin, 1 syn, 0 rst, 115 psh, 84999 ack, 0 urg, 0 none -- 173794356 bytes
Queue 0: 90000 packets, 0 NULL, 0 fin, 1 syn, 0 rst, 117 psh, 89999 ack, 0 urg, 0 none -- 184034356 bytes
Queue 0: 95000 packets, 0 NULL, 0 fin, 1 syn, 0 rst, 119 psh, 94999 ack, 0 urg, 0 none -- 194274356 bytes
Queue 0: 100000 packets, 0 NULL, 0 fin, 1 syn, 0 rst, 121 psh, 99999 ack, 0 urg, 0 none -- 204514356 bytes
Queue 0: 105000 packets, 0 NULL, 0 fin, 1 syn, 0 rst, 122 psh, 104999 ack, 0 urg, 0 none -- 214754356 bytes
Queue 0: 110000 packets, 0 NULL, 1 fin, 9 syn, 0 rst, 263 psh, 109991 ack, 0 urg, 0 none -- 222752512 bytes
Queue 0: 115000 packets, 0 NULL, 1 fin, 9 syn, 0 rst, 381 psh, 114991 ack, 0 urg, 0 none -- 232954624 bytes
Queue 0: 120000 packets, 0 NULL, 1 fin, 9 syn, 0 rst, 439 psh, 119991 ack, 0 urg, 0 none -- 243194624 bytes
Queue 0: 125000 packets, 0 NULL, 1 fin, 9 syn, 0 rst, 472 psh, 124991 ack, 0 urg, 0 none -- 253434624 bytes
Queue 0: 130000 packets, 0 NULL, 1 fin, 9 syn, 0 rst, 494 psh, 129991 ack, 0 urg, 0 none -- 263674624 bytes
Queue 0: 135000 packets, 0 NULL, 1 fin, 9 syn, 0 rst, 507 psh, 134991 ack, 0 urg, 0 none -- 273914624 bytes
Queue 0: 140000 packets, 0 NULL, 1 fin, 9 syn, 0 rst, 519 psh, 139991 ack, 0 urg, 0 none -- 284154624 bytes
Queue 0: 145000 packets, 0 NULL, 1 fin, 9 syn, 0 rst, 528 psh, 144991 ack, 0 urg, 0 none -- 294394624 bytes
Queue 0: 150000 packets, 0 NULL, 1 fin, 9 syn, 0 rst, 537 psh, 149991 ack, 0 urg, 0 none -- 304634624 bytes
Queue 0: 155000 packets, 0 NULL, 1 fin, 9 syn, 0 rst, 545 psh, 154991 ack, 0 urg, 0 none -- 314874624 bytes

On Fri, Jul 12, 2013 at 10:47 AM, Eric Leblond <eric@xxxxxxxxx> wrote:
> Hi,
>
> On Friday 12 July 2013 at 09:23 -0400, Michael Kilian wrote:
>> I am a little confused about using the netfilter APIs with multiple
>> threads to access multiple queues, presumably to get better
>> performance.
>>
>> Is there documentation on what is thread-safe? How about what can be
>> run in parallel?
>
>
>
>> From what I can discern from the API structure, you have a single fd to
>> the NFQUEUE infrastructure (from nfq_open). All packets flow over
>> that.
>
> The fd is per-queue, so you can multithread by using multiple recv
> threads, one per fd.
>
> You can find a multithreaded source code example here:
>
> https://redmine.openinfosecfoundation.org/projects/suricata/repository/revisions/master/entry/src/source-nfq.c
>
> and documentation here:
>
> https://home.regit.org/netfilter-en/using-nfqueue-and-libnetfilter_queue/
>>
>> Question #1: If I have n threads performing recv's on this fd, am I
>> going to get complete packets atomically? Are multiple simultaneous
>> recv's allowed? Or must this be handled with a single thread (and
>> doesn't this inherently limit the scalability of this)?
>
> One single thread per fd is the way to go.
>
>> Question #2: If I have multiple queues (for example, using
>> --queue-balance 0:3), it appears that we only see the multiple queues
>> in the callback (which I get to by registering the queue number and
>> the callback with the single nfqHandle using nfq_create_queue). So is
>> the flow: single-threaded access to the nfqHandle, but once I get to
>> the callback I can handle different nfq_q_handles in different
>> threads? This seems to only allow packet processing, verdict
>> rendering, etc. to be multi-threaded, but not the initial packet read.
>> Is this correct?
>>
>> Question #3: It appears that --queue-balance 0:3 must relate to the
>> nfq_q_handles, not the nfq_handle. Is this right?
>>
>> Ultimately I worry about how to get 10-20 Gbps through this interface
>> if I end up single-threaded on the initial receive (see Question #1).
>> Is there a programming paradigm that allows the recv's to be handled
>> in multiple threads, across multiple CPUs? For example, in "normal"
>> socket programming, content is distributed across the CPUs,
>> connections can be accepted in parallel, etc. It would seem to me to
>> be logical to allow the multiple queues specified with
>> --queue-balance to be treated completely independently in the threads.
>> And while I am no kernel expert, don't most NICs now deliver the
>> packets across cores today (they certainly can distribute the
>> interrupts across cores)?
>
> The latest kernels have cpu fanout support, which allows you to stay on
> the same CPU for a single queue. This way you can load-balance flows on
> your network card and have the packet stay on the same core for
> processing (if your userspace program is bound to the correct cpu).
>
> BR,
>
>>
>> -- Mike Kilian
>> --
>> To unsubscribe from this list: send the line "unsubscribe netfilter" in
>> the body of a message to majordomo@xxxxxxxxxxxxxxx
>> More majordomo info at http://vger.kernel.org/majordomo-info.html
>
--
To unsubscribe from this list: send the line "unsubscribe netfilter" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html