#define USE_THREADS
#endif
+static GAsyncQueue *pcap_queue;
+
static gboolean capture_child = FALSE; /* FALSE: standalone call, TRUE: this is an Wireshark capture child */
#ifdef _WIN32
static gchar *sig_pipe_name = NULL;
INITFILTER_OTHER_ERROR
} initfilter_status_t;
+typedef struct _pcap_queue_element {
+ guint interface_id;
+ struct pcap_pkthdr phdr;
+ u_char *pd;
+} pcap_queue_element;
+
typedef struct _pcap_options {
pcap_t *pcap_h;
#ifdef MUST_DO_SELECT
#endif
static const char *cap_pipe_err_str;
+#define WRITER_THREAD_TIMEOUT 100000 /* usecs */
+
static void
console_log_handler(const char *log_domain, GLogLevelFlags log_level,
const char *message, gpointer user_data _U_);
/* capture related options */
static capture_options global_capture_opts;
static gboolean quiet = FALSE;
+static gboolean use_threads = FALSE;
-static void capture_loop_packet_cb(u_char *user, const struct pcap_pkthdr *phdr,
- const u_char *pd);
+static void capture_loop_write_packet_cb(u_char *user, const struct pcap_pkthdr *phdr,
+ const u_char *pd);
+static void capture_loop_queue_packet_cb(u_char *user, const struct pcap_pkthdr *phdr,
+ const u_char *pd);
static void capture_loop_get_errmsg(char *errmsg, int errmsglen, const char *fname,
int err, gboolean is_close);
fprintf(output, " -n use pcapng format instead of pcap\n");
/*fprintf(output, "\n");*/
fprintf(output, "Miscellaneous:\n");
+ fprintf(output, " -t use a separate thread per interface\n");
fprintf(output, " -q don't report packet capture counts\n");
fprintf(output, " -v print version information and exit\n");
fprintf(output, " -h display this help and exit\n");
phdr.caplen = pcap_opts->cap_pipe_rechdr.hdr.incl_len;
phdr.len = pcap_opts->cap_pipe_rechdr.hdr.orig_len;
- capture_loop_packet_cb((u_char *)ld, &phdr, data);
-
+ if (use_threads) {
+ capture_loop_queue_packet_cb((u_char *)&pcap_opts->interface_id, &phdr, data);
+ } else {
+ capture_loop_write_packet_cb((u_char *)&pcap_opts->interface_id, &phdr, data);
+ }
pcap_opts->cap_pipe_state = STATE_EXPECT_REC_HDR;
return 1;
return FALSE;
}
#endif
+ if ((use_threads == FALSE) &&
+ (capture_opts->ifaces->len > 1)) {
+ g_snprintf(errmsg, errmsg_len,
+ "Using threads is required for capturing on mulitple interfaces! Use the -t option.");
+ return FALSE;
+ }
+
for (i = 0; i < capture_opts->ifaces->len; i++) {
interface_opts = g_array_index(capture_opts->ifaces, interface_options, i);
pcap_opts.pcap_h = NULL;
* indefinitely.
*/
static int
-capture_loop_dispatch(capture_options *capture_opts _U_, loop_data *ld,
+capture_loop_dispatch(loop_data *ld,
char *errmsg, int errmsg_len, pcap_options *pcap_opts)
{
int inpkts;
* processing immediately, rather than processing all packets
* in a batch before quitting.
*/
- inpkts = pcap_dispatch(pcap_opts->pcap_h, 1, capture_loop_packet_cb,
+ if (use_threads) {
+ inpkts = pcap_dispatch(pcap_opts->pcap_h, 1, capture_loop_queue_packet_cb,
(u_char *)&(pcap_opts->interface_id));
+ } else {
+ inpkts = pcap_dispatch(pcap_opts->pcap_h, 1, capture_loop_write_packet_cb,
+ (u_char *)&(pcap_opts->interface_id));
+ }
if (inpkts < 0) {
if (inpkts == -1) {
/* Error, rather than pcap_breakloop(). */
* after processing packets. We therefore process only one packet
* at a time, so that we can check the pipe after every packet.
*/
- inpkts = pcap_dispatch(pcap_opts->pcap_h, 1, capture_loop_packet_cb, (u_char *) ld);
+ if (use_threads) {
+ inpkts = pcap_dispatch(pcap_opts->pcap_h, 1, capture_loop_queue_packet_cb, (u_char *)&pcap_opts->interface_id);
+ } else {
+ inpkts = pcap_dispatch(pcap_opts->pcap_h, 1, capture_loop_write_packet_cb, (u_char *)&pcap_opts->interface_id);
+ }
#else
- inpkts = pcap_dispatch(pcap_opts->pcap_h, -1, capture_loop_packet_cb, (u_char *) ld);
+ if (use_threads) {
+ inpkts = pcap_dispatch(pcap_opts->pcap_h, -1, capture_loop_queue_packet_cb, (u_char *)&pcap_opts->interface_id);
+ } else {
+ inpkts = pcap_dispatch(pcap_opts->pcap_h, -1, capture_loop_write_packet_cb, (u_char *)&pcap_opts->interface_id);
+ }
#endif
if (inpkts < 0) {
if (inpkts == -1) {
in = 0;
while(ld->go &&
- (in = pcap_next_ex(pcap_opts->pcap_h, &pkt_header, &pkt_data)) == 1)
- capture_loop_packet_cb( (u_char *) ld, pkt_header, pkt_data);
+ (in = pcap_next_ex(pcap_opts->pcap_h, &pkt_header, &pkt_data)) == 1) {
+ if (use_threads) {
+ capture_loop_queue_packet_cb((u_char *)&pcap_opts->interface_id, pkt_header, pkt_data);
+ } else {
+ capture_loop_write_packet_cb((u_char *)&pcap_opts->interface_id, pkt_header, pkt_data);
+ }
+ }
if(in < 0) {
pcap_opts->pcap_err = TRUE;
return TRUE;
}
+static void *
+pcap_read_handler(void* arg)
+{
+ pcap_options pcap_opts;
+ guint interface_id;
+ char errmsg[MSG_MAX_LENGTH+1];
+
+ interface_id = *(guint *)arg;
+ g_free(arg);
+ pcap_opts = g_array_index(global_ld.pcaps, pcap_options, interface_id);
+
+ g_log(LOG_DOMAIN_CAPTURE_CHILD, G_LOG_LEVEL_INFO, "Started thread for interface %d.",
+ interface_id);
+
+ while (global_ld.go) {
+ /* dispatch incoming packets */
+ capture_loop_dispatch(&global_ld, errmsg, sizeof(errmsg), &pcap_opts);
+ }
+ g_log(LOG_DOMAIN_CAPTURE_CHILD, G_LOG_LEVEL_INFO, "Stopped thread for interface %d.",
+ interface_id);
+ g_thread_exit(NULL);
+ return (NULL);
+}
+
/* Do the low-level work of a capture.
Returns TRUE if it succeeds, FALSE otherwise. */
static gboolean
/* WOW, everything is prepared! */
/* please fasten your seat belts, we will enter now the actual capture loop */
+ if (use_threads) {
+ pcap_queue = g_async_queue_new();
+ for (i = 0; i < global_ld.pcaps->len; i++) {
+ gint *interface_id;
+
+ interface_id = (gint *)g_malloc(sizeof(guint));
+ *interface_id = i;
+ pcap_opts = g_array_index(global_ld.pcaps, pcap_options, i);
+ pcap_opts.tid = g_thread_create(pcap_read_handler, interface_id, TRUE, NULL);
+ global_ld.pcaps = g_array_remove_index(global_ld.pcaps, i);
+ g_array_insert_val(global_ld.pcaps, i, pcap_opts);
+ }
+ }
while (global_ld.go) {
/* dispatch incoming packets */
- inpkts = capture_loop_dispatch(capture_opts, &global_ld, errmsg,
- sizeof(errmsg), &pcap_opts);
-
+ if (use_threads) {
+ GTimeVal write_thread_time;
+ pcap_queue_element *queue_element;
+
+ g_get_current_time(&write_thread_time);
+ g_time_val_add(&write_thread_time, WRITER_THREAD_TIMEOUT);
+ queue_element = g_async_queue_timed_pop(pcap_queue, &write_thread_time);
+ if (queue_element) {
+ g_log(LOG_DOMAIN_CAPTURE_CHILD, G_LOG_LEVEL_INFO,
+ "Dequeued a packet of length %d captured on interface %d.",
+ queue_element->phdr.caplen,queue_element->interface_id);
+
+ capture_loop_write_packet_cb((u_char *)&queue_element->interface_id,
+ &queue_element->phdr,
+ queue_element->pd);
+ g_free(queue_element->pd);
+ g_free(queue_element);
+ inpkts = 1;
+ } else {
+ inpkts = 0;
+ }
+ } else {
+ inpkts = capture_loop_dispatch(&global_ld, errmsg,
+ sizeof(errmsg), &pcap_opts);
+ }
#ifdef SIGINFO
/* Were we asked to print packet counts by the SIGINFO handler? */
if (global_ld.report_packet_count) {
}
g_log(LOG_DOMAIN_CAPTURE_CHILD, G_LOG_LEVEL_INFO, "Capture loop stopping ...");
+ if (use_threads) {
+ pcap_queue_element *queue_element;
+
+ for (i = 0; i < global_ld.pcaps->len; i++) {
+ pcap_opts = g_array_index(global_ld.pcaps, pcap_options, i);
+ g_log(LOG_DOMAIN_CAPTURE_CHILD, G_LOG_LEVEL_INFO, "Waiting for thread of interface %u...",
+ pcap_opts.interface_id);
+ g_thread_join(pcap_opts.tid);
+ g_log(LOG_DOMAIN_CAPTURE_CHILD, G_LOG_LEVEL_INFO, "Thread of interface %u terminated.",
+ pcap_opts.interface_id);
+ }
+ while ((queue_element = g_async_queue_try_pop(pcap_queue))) {
+ g_log(LOG_DOMAIN_CAPTURE_CHILD, G_LOG_LEVEL_INFO,
+ "Dequeued a packet of length %d captured on interface %d.",
+ queue_element->phdr.caplen,queue_element->interface_id);
+ capture_loop_write_packet_cb((u_char *)&queue_element->interface_id,
+ &queue_element->phdr,
+ queue_element->pd);
+ g_free(queue_element->pd);
+ g_free(queue_element);
+ global_ld.inpkts_to_sync_pipe += 1;
+ if (capture_opts->output_to_pipe) {
+ libpcap_dump_flush(global_ld.pdh, NULL);
+ }
+ }
+ }
+
/* delete stop conditions */
if (cnd_file_duration != NULL)
/* one packet was captured, process it */
static void
-capture_loop_packet_cb(u_char *user, const struct pcap_pkthdr *phdr,
- const u_char *pd)
+capture_loop_write_packet_cb(u_char *user, const struct pcap_pkthdr *phdr,
+ const u_char *pd)
{
- loop_data *ld = (loop_data *) (void *) user;
+ guint interface_id = *(guint *) (void *) user;
int err;
/* We may be called multiple times from pcap_dispatch(); if we've set
the "stop capturing" flag, ignore this packet, as we're not
supposed to be saving any more packets. */
- if (!ld->go)
+ if (!global_ld.go)
return;
- if (ld->pdh) {
+ if (global_ld.pdh) {
gboolean successful;
/* We're supposed to write the packet to a file; do so.
If this fails, set "ld->go" to FALSE, to stop the capture, and set
"ld->err" to the error. */
if (global_capture_opts.use_pcapng) {
- successful = libpcap_write_enhanced_packet_block(ld->pdh, phdr, 0, pd, &ld->bytes_written, &err);
+ successful = libpcap_write_enhanced_packet_block(global_ld.pdh, phdr, interface_id, pd, &global_ld.bytes_written, &err);
} else {
- successful = libpcap_write_packet(ld->pdh, phdr, pd, &ld->bytes_written, &err);
+ successful = libpcap_write_packet(global_ld.pdh, phdr, pd, &global_ld.bytes_written, &err);
}
if (!successful) {
- ld->go = FALSE;
- ld->err = err;
+ global_ld.go = FALSE;
+ global_ld.err = err;
} else {
- ld->packet_count++;
+ g_log(LOG_DOMAIN_CAPTURE_CHILD, G_LOG_LEVEL_INFO,
+ "Wrote a packet of length %d captured on interface %u.",
+ phdr->caplen, interface_id);
+ global_ld.packet_count++;
/* if the user told us to stop after x packets, do we already have enough? */
- if ((ld->packet_max > 0) && (ld->packet_count >= ld->packet_max)) {
- ld->go = FALSE;
+ if ((global_ld.packet_max > 0) && (global_ld.packet_count >= global_ld.packet_max)) {
+ global_ld.go = FALSE;
}
}
}
}
+/* one packet was captured, queue it */
+static void
+capture_loop_queue_packet_cb(u_char *user, const struct pcap_pkthdr *phdr,
+ const u_char *pd)
+{
+ pcap_queue_element *queue_element;
+ guint interface_id;
+
+ /* We may be called multiple times from pcap_dispatch(); if we've set
+ the "stop capturing" flag, ignore this packet, as we're not
+ supposed to be saving any more packets. */
+ if (!global_ld.go)
+ return;
+
+ interface_id = *(guint *) (void *) user;
+ queue_element = (pcap_queue_element *)g_malloc(sizeof(pcap_queue_element));
+ queue_element->interface_id = interface_id;
+ queue_element->phdr = *phdr;
+ queue_element->pd = (u_char *)g_malloc(phdr->caplen);
+ memcpy(queue_element->pd, pd, phdr->caplen);
+ g_async_queue_push(pcap_queue, queue_element);
+ g_log(LOG_DOMAIN_CAPTURE_CHILD, G_LOG_LEVEL_INFO,
+ "Queued a packet of length %d captured on interface %u.",
+ phdr->caplen, interface_id);
+}
/* And now our feature presentation... [ fade to music ] */
int
#define OPTSTRING_d ""
#endif
-#define OPTSTRING "a:" OPTSTRING_A "b:" OPTSTRING_B "c:" OPTSTRING_d "Df:ghi:" OPTSTRING_I "L" OPTSTRING_m "Mnpq" OPTSTRING_r "Ss:" OPTSTRING_u "vw:y:Z:"
+#define OPTSTRING "a:" OPTSTRING_A "b:" OPTSTRING_B "c:" OPTSTRING_d "Df:ghi:" OPTSTRING_I "L" OPTSTRING_m "Mnpq" OPTSTRING_r "Ss:t" OPTSTRING_u "vw:y:Z:"
#ifdef DEBUG_CHILD_DUMPCAP
if ((debug_log = ws_fopen("dumpcap_debug_log.tmp","w")) == NULL) {
quiet = TRUE;
break;
+ case 't':
+ use_threads = TRUE;
+ break;
/*** all non capture option specific ***/
case 'D': /* Print a list of capture devices and exit */
list_interfaces = TRUE;