Add support for capturing from multiple interfaces.
authorMichael Tüxen <tuexen@fh-muenster.de>
Mon, 16 May 2011 21:56:12 +0000 (21:56 -0000)
committerMichael Tüxen <tuexen@fh-muenster.de>
Mon, 16 May 2011 21:56:12 +0000 (21:56 -0000)
This patch is basedon work done by Irene Ruengeler.

This feature is considered experimental at the moment.
However, you need to use the -t command line option
to use the feature. When not providing it, the old
method will be used.

svn path=/trunk/; revision=37191

dumpcap.c

index d86dd1eac539cc1fa9ccd95c62641173bc4a11cd..22bc416b868036f101e3f3380f33a88f18c7fe21 100644 (file)
--- a/dumpcap.c
+++ b/dumpcap.c
@@ -139,6 +139,8 @@ FILE *debug_log;   /* for logging debug messages to  */
 #define USE_THREADS
 #endif
 
+static GAsyncQueue *pcap_queue;
+
 static gboolean capture_child = FALSE; /* FALSE: standalone call, TRUE: this is an Wireshark capture child */
 #ifdef _WIN32
 static gchar *sig_pipe_name = NULL;
@@ -213,6 +215,12 @@ typedef enum {
     INITFILTER_OTHER_ERROR
 } initfilter_status_t;
 
+typedef struct _pcap_queue_element {
+    guint              interface_id;
+    struct pcap_pkthdr phdr;
+    u_char             *pd;
+} pcap_queue_element;
+
 typedef struct _pcap_options {
     pcap_t         *pcap_h;
 #ifdef MUST_DO_SELECT
@@ -318,6 +326,8 @@ static gboolean need_timeout_workaround;
 #endif
 static const char *cap_pipe_err_str;
 
+#define WRITER_THREAD_TIMEOUT 100000 /* usecs */
+
 static void
 console_log_handler(const char *log_domain, GLogLevelFlags log_level,
                     const char *message, gpointer user_data _U_);
@@ -325,9 +335,12 @@ console_log_handler(const char *log_domain, GLogLevelFlags log_level,
 /* capture related options */
 static capture_options global_capture_opts;
 static gboolean quiet = FALSE;
+static gboolean use_threads = FALSE;
 
-static void capture_loop_packet_cb(u_char *user, const struct pcap_pkthdr *phdr,
-                                   const u_char *pd);
+static void capture_loop_write_packet_cb(u_char *user, const struct pcap_pkthdr *phdr,
+                                         const u_char *pd);
+static void capture_loop_queue_packet_cb(u_char *user, const struct pcap_pkthdr *phdr,
+                                         const u_char *pd);
 static void capture_loop_get_errmsg(char *errmsg, int errmsglen, const char *fname,
                                     int err, gboolean is_close);
 
@@ -404,6 +417,7 @@ print_usage(gboolean print_ver)
     fprintf(output, "  -n                       use pcapng format instead of pcap\n");
     /*fprintf(output, "\n");*/
     fprintf(output, "Miscellaneous:\n");
+    fprintf(output, "  -t                       use a separate thread per interface\n");
     fprintf(output, "  -q                       don't report packet capture counts\n");
     fprintf(output, "  -v                       print version information and exit\n");
     fprintf(output, "  -h                       display this help and exit\n");
@@ -2114,8 +2128,11 @@ cap_pipe_dispatch(loop_data *ld, pcap_options *pcap_opts, guchar *data, char *er
         phdr.caplen = pcap_opts->cap_pipe_rechdr.hdr.incl_len;
         phdr.len = pcap_opts->cap_pipe_rechdr.hdr.orig_len;
 
-        capture_loop_packet_cb((u_char *)ld, &phdr, data);
-
+        if (use_threads) {
+            capture_loop_queue_packet_cb((u_char *)&pcap_opts->interface_id, &phdr, data);
+        } else {
+            capture_loop_write_packet_cb((u_char *)&pcap_opts->interface_id, &phdr, data);
+        }
         pcap_opts->cap_pipe_state = STATE_EXPECT_REC_HDR;
         return 1;
 
@@ -2212,6 +2229,13 @@ capture_loop_open_input(capture_options *capture_opts, loop_data *ld,
         return FALSE;
     }
 #endif
+    if ((use_threads == FALSE) &&
+        (capture_opts->ifaces->len > 1)) {
+        g_snprintf(errmsg, errmsg_len,
+                   "Using threads is required for capturing on mulitple interfaces! Use the -t option.");
+        return FALSE;
+    }
+
     for (i = 0; i < capture_opts->ifaces->len; i++) {
         interface_opts = g_array_index(capture_opts->ifaces, interface_options, i);
         pcap_opts.pcap_h = NULL;
@@ -2564,7 +2588,7 @@ capture_loop_close_output(capture_options *capture_opts, loop_data *ld, int *err
  * indefinitely.
  */
 static int
-capture_loop_dispatch(capture_options *capture_opts _U_, loop_data *ld,
+capture_loop_dispatch(loop_data *ld,
                       char *errmsg, int errmsg_len, pcap_options *pcap_opts)
 {
     int       inpkts;
@@ -2634,8 +2658,13 @@ capture_loop_dispatch(capture_options *capture_opts _U_, loop_data *ld,
                  * processing immediately, rather than processing all packets
                  * in a batch before quitting.
                  */
-                inpkts = pcap_dispatch(pcap_opts->pcap_h, 1, capture_loop_packet_cb,
+                if (use_threads) { 
+                    inpkts = pcap_dispatch(pcap_opts->pcap_h, 1, capture_loop_queue_packet_cb,
                                        (u_char *)&(pcap_opts->interface_id));
+                } else {
+                    inpkts = pcap_dispatch(pcap_opts->pcap_h, 1, capture_loop_write_packet_cb,
+                                       (u_char *)&(pcap_opts->interface_id));
+                }
                 if (inpkts < 0) {
                     if (inpkts == -1) {
                         /* Error, rather than pcap_breakloop(). */
@@ -2667,9 +2696,17 @@ capture_loop_dispatch(capture_options *capture_opts _U_, loop_data *ld,
              * after processing packets.  We therefore process only one packet
              * at a time, so that we can check the pipe after every packet.
              */
-            inpkts = pcap_dispatch(pcap_opts->pcap_h, 1, capture_loop_packet_cb, (u_char *) ld);
+            if (use_threads) { 
+                inpkts = pcap_dispatch(pcap_opts->pcap_h, 1, capture_loop_queue_packet_cb, (u_char *)&pcap_opts->interface_id);
+            } else {
+                inpkts = pcap_dispatch(pcap_opts->pcap_h, 1, capture_loop_write_packet_cb, (u_char *)&pcap_opts->interface_id);
+            }
 #else
-            inpkts = pcap_dispatch(pcap_opts->pcap_h, -1, capture_loop_packet_cb, (u_char *) ld);
+            if (use_threads) { 
+                inpkts = pcap_dispatch(pcap_opts->pcap_h, -1, capture_loop_queue_packet_cb, (u_char *)&pcap_opts->interface_id);
+            } else {
+                inpkts = pcap_dispatch(pcap_opts->pcap_h, -1, capture_loop_write_packet_cb, (u_char *)&pcap_opts->interface_id);
+            }
 #endif
             if (inpkts < 0) {
                 if (inpkts == -1) {
@@ -2701,8 +2738,13 @@ capture_loop_dispatch(capture_options *capture_opts _U_, loop_data *ld,
 
                 in = 0;
                 while(ld->go &&
-                      (in = pcap_next_ex(pcap_opts->pcap_h, &pkt_header, &pkt_data)) == 1)
-                    capture_loop_packet_cb( (u_char *) ld, pkt_header, pkt_data);
+                      (in = pcap_next_ex(pcap_opts->pcap_h, &pkt_header, &pkt_data)) == 1) {
+                    if (use_threads) {
+                        capture_loop_queue_packet_cb((u_char *)&pcap_opts->interface_id, pkt_header, pkt_data);
+                    } else {
+                        capture_loop_write_packet_cb((u_char *)&pcap_opts->interface_id, pkt_header, pkt_data);
+                    }
+                }
 
                 if(in < 0) {
                     pcap_opts->pcap_err = TRUE;
@@ -2928,6 +2970,30 @@ do_file_switch_or_stop(capture_options *capture_opts,
     return TRUE;
 }
 
+static void *
+pcap_read_handler(void* arg)
+{
+    pcap_options pcap_opts;
+    guint interface_id;
+    char errmsg[MSG_MAX_LENGTH+1];
+
+    interface_id = *(guint *)arg;
+    g_free(arg);
+    pcap_opts = g_array_index(global_ld.pcaps, pcap_options, interface_id);
+
+    g_log(LOG_DOMAIN_CAPTURE_CHILD, G_LOG_LEVEL_INFO, "Started thread for interface %d.",
+          interface_id);
+
+    while (global_ld.go) {
+        /* dispatch incoming packets */
+        capture_loop_dispatch(&global_ld, errmsg, sizeof(errmsg), &pcap_opts);
+    }
+    g_log(LOG_DOMAIN_CAPTURE_CHILD, G_LOG_LEVEL_INFO, "Stopped thread for interface %d.",
+          interface_id);
+    g_thread_exit(NULL);
+    return (NULL);
+}
+
 /* Do the low-level work of a capture.
    Returns TRUE if it succeeds, FALSE otherwise. */
 static gboolean
@@ -3071,11 +3137,46 @@ capture_loop_start(capture_options *capture_opts, gboolean *stats_known, struct
 
     /* WOW, everything is prepared! */
     /* please fasten your seat belts, we will enter now the actual capture loop */
+    if (use_threads) {
+        pcap_queue = g_async_queue_new();
+        for (i = 0; i < global_ld.pcaps->len; i++) {
+            gint *interface_id;
+
+            interface_id = (gint *)g_malloc(sizeof(guint));
+            *interface_id = i;
+            pcap_opts = g_array_index(global_ld.pcaps, pcap_options, i);
+            pcap_opts.tid = g_thread_create(pcap_read_handler, interface_id, TRUE, NULL);
+            global_ld.pcaps = g_array_remove_index(global_ld.pcaps, i);
+            g_array_insert_val(global_ld.pcaps, i, pcap_opts);
+        }
+    }
     while (global_ld.go) {
         /* dispatch incoming packets */
-        inpkts = capture_loop_dispatch(capture_opts, &global_ld, errmsg,
-                                       sizeof(errmsg), &pcap_opts);
-
+        if (use_threads) {
+            GTimeVal    write_thread_time;
+            pcap_queue_element *queue_element;
+
+            g_get_current_time(&write_thread_time);
+            g_time_val_add(&write_thread_time, WRITER_THREAD_TIMEOUT);
+            queue_element = g_async_queue_timed_pop(pcap_queue, &write_thread_time);
+            if (queue_element) {
+                g_log(LOG_DOMAIN_CAPTURE_CHILD, G_LOG_LEVEL_INFO,
+                      "Dequeued a packet of length %d captured on interface %d.",
+                      queue_element->phdr.caplen,queue_element->interface_id);
+
+                capture_loop_write_packet_cb((u_char *)&queue_element->interface_id,
+                                             &queue_element->phdr,
+                                             queue_element->pd);
+                g_free(queue_element->pd);
+                g_free(queue_element);
+                inpkts = 1;
+            } else {
+                inpkts = 0;
+            }
+        } else {
+            inpkts = capture_loop_dispatch(&global_ld, errmsg,
+                                           sizeof(errmsg), &pcap_opts);
+        }
 #ifdef SIGINFO
         /* Were we asked to print packet counts by the SIGINFO handler? */
         if (global_ld.report_packet_count) {
@@ -3161,6 +3262,33 @@ capture_loop_start(capture_options *capture_opts, gboolean *stats_known, struct
     }
 
     g_log(LOG_DOMAIN_CAPTURE_CHILD, G_LOG_LEVEL_INFO, "Capture loop stopping ...");
+    if (use_threads) {
+        pcap_queue_element *queue_element;
+
+        for (i = 0; i < global_ld.pcaps->len; i++) {
+            pcap_opts = g_array_index(global_ld.pcaps, pcap_options, i);
+            g_log(LOG_DOMAIN_CAPTURE_CHILD, G_LOG_LEVEL_INFO, "Waiting for thread of interface %u...",
+                  pcap_opts.interface_id);
+            g_thread_join(pcap_opts.tid);
+            g_log(LOG_DOMAIN_CAPTURE_CHILD, G_LOG_LEVEL_INFO, "Thread of interface %u terminated.",
+                  pcap_opts.interface_id);
+        }
+        while ((queue_element = g_async_queue_try_pop(pcap_queue))) {
+            g_log(LOG_DOMAIN_CAPTURE_CHILD, G_LOG_LEVEL_INFO,
+                  "Dequeued a packet of length %d captured on interface %d.",
+                  queue_element->phdr.caplen,queue_element->interface_id);
+            capture_loop_write_packet_cb((u_char *)&queue_element->interface_id,
+                                         &queue_element->phdr,
+                                         queue_element->pd);
+            g_free(queue_element->pd);
+            g_free(queue_element);
+            global_ld.inpkts_to_sync_pipe += 1;
+            if (capture_opts->output_to_pipe) {
+                libpcap_dump_flush(global_ld.pdh, NULL);
+            }
+        }
+    }
+
 
     /* delete stop conditions */
     if (cnd_file_duration != NULL)
@@ -3378,42 +3506,70 @@ capture_loop_get_errmsg(char *errmsg, int errmsglen, const char *fname,
 
 /* one packet was captured, process it */
 static void
-capture_loop_packet_cb(u_char *user, const struct pcap_pkthdr *phdr,
-                       const u_char *pd)
+capture_loop_write_packet_cb(u_char *user, const struct pcap_pkthdr *phdr,
+                             const u_char *pd)
 {
-    loop_data *ld = (loop_data *) (void *) user;
+    guint interface_id = *(guint *) (void *) user;
     int err;
 
     /* We may be called multiple times from pcap_dispatch(); if we've set
        the "stop capturing" flag, ignore this packet, as we're not
        supposed to be saving any more packets. */
-    if (!ld->go)
+    if (!global_ld.go)
         return;
 
-    if (ld->pdh) {
+    if (global_ld.pdh) {
         gboolean successful;
 
         /* We're supposed to write the packet to a file; do so.
            If this fails, set "ld->go" to FALSE, to stop the capture, and set
            "ld->err" to the error. */
         if (global_capture_opts.use_pcapng) {
-            successful = libpcap_write_enhanced_packet_block(ld->pdh, phdr, 0, pd, &ld->bytes_written, &err);
+            successful = libpcap_write_enhanced_packet_block(global_ld.pdh, phdr, interface_id, pd, &global_ld.bytes_written, &err);
         } else {
-            successful = libpcap_write_packet(ld->pdh, phdr, pd, &ld->bytes_written, &err);
+            successful = libpcap_write_packet(global_ld.pdh, phdr, pd, &global_ld.bytes_written, &err);
         }
         if (!successful) {
-            ld->go = FALSE;
-            ld->err = err;
+            global_ld.go = FALSE;
+            global_ld.err = err;
         } else {
-            ld->packet_count++;
+            g_log(LOG_DOMAIN_CAPTURE_CHILD, G_LOG_LEVEL_INFO,
+                  "Wrote a packet of length %d captured on interface %u.",
+                   phdr->caplen, interface_id);
+            global_ld.packet_count++;
             /* if the user told us to stop after x packets, do we already have enough? */
-            if ((ld->packet_max > 0) && (ld->packet_count >= ld->packet_max)) {
-                ld->go = FALSE;
+            if ((global_ld.packet_max > 0) && (global_ld.packet_count >= global_ld.packet_max)) {
+                global_ld.go = FALSE;
             }
         }
     }
 }
 
+/* one packet was captured, queue it */
+static void
+capture_loop_queue_packet_cb(u_char *user, const struct pcap_pkthdr *phdr,
+                             const u_char *pd)
+{
+    pcap_queue_element *queue_element;
+    guint interface_id;
+
+    /* We may be called multiple times from pcap_dispatch(); if we've set
+       the "stop capturing" flag, ignore this packet, as we're not
+       supposed to be saving any more packets. */
+    if (!global_ld.go)
+        return;
+
+    interface_id = *(guint *) (void *) user;
+    queue_element = (pcap_queue_element *)g_malloc(sizeof(pcap_queue_element));
+    queue_element->interface_id = interface_id;
+    queue_element->phdr = *phdr;
+    queue_element->pd = (u_char *)g_malloc(phdr->caplen);
+    memcpy(queue_element->pd, pd, phdr->caplen);
+    g_async_queue_push(pcap_queue, queue_element);
+    g_log(LOG_DOMAIN_CAPTURE_CHILD, G_LOG_LEVEL_INFO,
+          "Queued a packet of length %d captured on interface %u.",
+          phdr->caplen, interface_id);
+}
 
 /* And now our feature presentation... [ fade to music ] */
 int
@@ -3500,7 +3656,7 @@ main(int argc, char *argv[])
 #define OPTSTRING_d ""
 #endif
 
-#define OPTSTRING "a:" OPTSTRING_A "b:" OPTSTRING_B "c:" OPTSTRING_d "Df:ghi:" OPTSTRING_I "L" OPTSTRING_m "Mnpq" OPTSTRING_r "Ss:" OPTSTRING_u "vw:y:Z:"
+#define OPTSTRING "a:" OPTSTRING_A "b:" OPTSTRING_B "c:" OPTSTRING_d "Df:ghi:" OPTSTRING_I "L" OPTSTRING_m "Mnpq" OPTSTRING_r "Ss:t" OPTSTRING_u "vw:y:Z:"
 
 #ifdef DEBUG_CHILD_DUMPCAP
     if ((debug_log = ws_fopen("dumpcap_debug_log.tmp","w")) == NULL) {
@@ -3819,6 +3975,9 @@ main(int argc, char *argv[])
             quiet = TRUE;
             break;
 
+        case 't':
+            use_threads = TRUE;
+            break;
             /*** all non capture option specific ***/
         case 'D':        /* Print a list of capture devices and exit */
             list_interfaces = TRUE;