Re: [GIT] Experimental threaded udev

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Mon, Jun 1, 2009 at 15:57, Kay Sievers <kay.sievers@xxxxxxxx> wrote:

>> The code looked like you freed the worker, but left the event RUNNING, and
>> it would never be released.  I would delete the event instead, just like the
>> old system.
>>
>> I haven't read V2 yet though, maybe you fixed it.
>
> I just set it back to QUEUED for now. Not sure if droppin git or
> re-trying it a few times would be better.

Version 3, which should clean up events with a worker that died. Also
kills all workers if the config has changed.

I guess we should find out if we go for rtsignals, a pipe and/or
signalfd. Any ideas?

Would be good to start with something in a repository, instead of
sending huge patches, I guess.

It's so much faster than the current clone per event, seems we want to
have something like that in a released version.

Thanks,
Kay
diff --git a/udev/lib/libudev-monitor.c b/udev/lib/libudev-monitor.c
index 395a4d2..54c9576 100644
--- a/udev/lib/libudev-monitor.c
+++ b/udev/lib/libudev-monitor.c
@@ -32,7 +32,6 @@ struct udev_monitor {
 	int refcount;
 	int sock;
 	struct sockaddr_nl snl;
-	struct sockaddr_nl snl_peer;
 	struct sockaddr_un sun;
 	socklen_t addrlen;
 	struct udev_list_node filter_subsystem_list;
@@ -171,8 +170,8 @@ struct udev_monitor *udev_monitor_new_from_netlink(struct udev *udev, const char
 		return NULL;
 
 	if (name == NULL)
-		return NULL;
-	if (strcmp(name, "kernel") == 0)
+		group = 0;
+	else if (strcmp(name, "kernel") == 0)
 		group = UDEV_MONITOR_KERNEL;
 	else if (strcmp(name, "udev") == 0)
 		group = UDEV_MONITOR_UDEV;
@@ -193,8 +192,6 @@ struct udev_monitor *udev_monitor_new_from_netlink(struct udev *udev, const char
 
 	udev_monitor->snl.nl_family = AF_NETLINK;
 	udev_monitor->snl.nl_groups = group;
-	udev_monitor->snl_peer.nl_family = AF_NETLINK;
-	udev_monitor->snl_peer.nl_groups = UDEV_MONITOR_UDEV;
 
 	dbg(udev, "monitor %p created with NETLINK_KOBJECT_UEVENT (%u)\n", udev_monitor, group);
 	return udev_monitor;
@@ -434,7 +431,6 @@ struct udev_device *udev_monitor_receive_device(struct udev_monitor *udev_monito
 	struct iovec iov;
 	char cred_msg[CMSG_SPACE(sizeof(struct ucred))];
 	struct cmsghdr *cmsg;
-	struct sockaddr_nl snl;
 	struct ucred *cred;
 	char buf[8192];
 	ssize_t buflen;
@@ -459,11 +455,6 @@ retry:
 	smsg.msg_control = cred_msg;
 	smsg.msg_controllen = sizeof(cred_msg);
 
-	if (udev_monitor->snl.nl_family != 0) {
-		smsg.msg_name = &snl;
-		smsg.msg_namelen = sizeof(snl);
-	}
-
 	buflen = recvmsg(udev_monitor->sock, &smsg, 0);
 	if (buflen < 0) {
 		if (errno != EINTR)
@@ -476,20 +467,6 @@ retry:
 		return NULL;
 	}
 
-	if (udev_monitor->snl.nl_family != 0) {
-		if (snl.nl_groups == 0) {
-			info(udev_monitor->udev, "unicast netlink message ignored\n");
-			return NULL;
-		}
-		if (snl.nl_groups == UDEV_MONITOR_KERNEL) {
-			if (snl.nl_pid > 0) {
-				info(udev_monitor->udev, "multicast kernel netlink message from pid %d ignored\n", snl.nl_pid);
-				return NULL;
-			}
-			is_kernel = 1;
-		}
-	}
-
 	cmsg = CMSG_FIRSTHDR(&smsg);
 	if (cmsg == NULL || cmsg->cmsg_type != SCM_CREDENTIALS) {
 		info(udev_monitor->udev, "no sender credentials received, message ignored\n");
@@ -621,7 +598,7 @@ retry:
 	return udev_device;
 }
 
-int udev_monitor_send_device(struct udev_monitor *udev_monitor, struct udev_device *udev_device)
+int udev_monitor_send_device(struct udev_monitor *udev_monitor, struct udev_device *udev_device, pid_t pid)
 {
 	struct msghdr smsg;
 	struct iovec iov[2];
@@ -660,6 +637,7 @@ int udev_monitor_send_device(struct udev_monitor *udev_monitor, struct udev_devi
 	} else if (udev_monitor->snl.nl_family != 0) {
 		const char *val;
 		struct udev_monitor_netlink_header nlh;
+		struct sockaddr_nl snl_peer;
 
 
 		/* add versioned header */
@@ -680,11 +658,18 @@ int udev_monitor_send_device(struct udev_monitor *udev_monitor, struct udev_devi
 		iov[1].iov_base = (char *)buf;
 		iov[1].iov_len = blen;
 
+		/* we will always get ECONNREFUSED when sending to the muticast group */
+		memset(&snl_peer, 0x00, sizeof(struct sockaddr_nl));
+		snl_peer.nl_family = AF_NETLINK;
+		if (pid > 0)
+			snl_peer.nl_pid = pid;
+		else
+			snl_peer.nl_groups = UDEV_MONITOR_UDEV;
+
 		memset(&smsg, 0x00, sizeof(struct msghdr));
 		smsg.msg_iov = iov;
 		smsg.msg_iovlen = 2;
-		/* no destination besides the muticast group, we will always get ECONNREFUSED */
-		smsg.msg_name = &udev_monitor->snl_peer;
+		smsg.msg_name = &snl_peer;
 		smsg.msg_namelen = sizeof(struct sockaddr_nl);
 	} else {
 		return -1;
diff --git a/udev/lib/libudev-private.h b/udev/lib/libudev-private.h
index 3eb3d79..3019920 100644
--- a/udev/lib/libudev-private.h
+++ b/udev/lib/libudev-private.h
@@ -86,7 +86,7 @@ int udev_device_delete_db(struct udev_device *udev_device);
 int udev_device_rename_db(struct udev_device *udev_device, const char *devpath);
 
 /* libudev-monitor - netlink/unix socket communication  */
-int udev_monitor_send_device(struct udev_monitor *udev_monitor, struct udev_device *udev_device);
+int udev_monitor_send_device(struct udev_monitor *udev_monitor, struct udev_device *udev_device, pid_t pid);
 int udev_monitor_set_receive_buffer_size(struct udev_monitor *udev_monitor, int size);
 
 /* libudev-ctrl - daemon runtime setup */
diff --git a/udev/udev-event.c b/udev/udev-event.c
index d521251..8ab262a 100644
--- a/udev/udev-event.c
+++ b/udev/udev-event.c
@@ -734,18 +734,13 @@ int udev_event_execute_run(struct udev_event *event)
 			monitor = udev_monitor_new_from_socket(event->udev, &cmd[strlen("socket:")]);
 			if (monitor == NULL)
 				continue;
-			udev_monitor_send_device(monitor, event->dev);
+			udev_monitor_send_device(monitor, event->dev, 0);
 			udev_monitor_unref(monitor);
 		} else {
 			char program[UTIL_PATH_SIZE];
 			char **envp;
 
 			udev_event_apply_format(event, cmd, program, sizeof(program));
-			if (event->trace)
-				fprintf(stderr, "run  %s (%llu) '%s'\n",
-				       udev_device_get_syspath(event->dev),
-				       udev_device_get_seqnum(event->dev),
-				       program);
 			envp = udev_device_get_properties_envp(event->dev);
 			if (util_run_program(event->udev, program, envp, NULL, 0, NULL) != 0) {
 				if (!udev_list_entry_get_flag(list_entry))
diff --git a/udev/udev.h b/udev/udev.h
index 8f2c1c6..ed29c4b 100644
--- a/udev/udev.h
+++ b/udev/udev.h
@@ -53,7 +53,6 @@ static inline void logging_close(void)
 }
 
 struct udev_event {
-	struct udev_list_node node;
 	struct udev *udev;
 	struct udev_device *dev;
 	struct udev_device *dev_parent;
@@ -64,10 +63,6 @@ struct udev_event {
 	uid_t uid;
 	gid_t gid;
 	struct udev_list_node run_list;
-	pid_t pid;
-	int exitstatus;
-	time_t queue_time;
-	unsigned long long int delaying_seqnum;
 	unsigned int group_final:1;
 	unsigned int owner_final:1;
 	unsigned int mode_final:1;
@@ -76,7 +71,6 @@ struct udev_event {
 	unsigned int run_final:1;
 	unsigned int ignore_device:1;
 	unsigned int inotify_watch:1;
-	unsigned int trace:1;
 };
 
 struct udev_watch {
diff --git a/udev/udevd.c b/udev/udevd.c
index 37b547a..8890ee0 100644
--- a/udev/udevd.c
+++ b/udev/udevd.c
@@ -44,8 +44,7 @@
 #define UDEVD_PRIORITY			-4
 #define UDEV_PRIORITY			-2
 
-/* maximum limit of forked childs */
-#define UDEVD_MAX_CHILDS		256
+#define SIGRT_WORKER			SIGRTMIN+1
 
 static int debug;
 
@@ -61,34 +60,75 @@ static void log_fn(struct udev *udev, int priority,
 	}
 }
 
-static void reap_sigchilds(void);
-
 static int debug_trace;
 static struct udev_rules *rules;
 static struct udev_queue_export *udev_queue_export;
 static struct udev_ctrl *udev_ctrl;
-static struct udev_monitor *kernel_monitor;
-static volatile sig_atomic_t sigchilds_waiting;
+static struct udev_monitor *monitor;
+static pid_t main_pid;
+static volatile sig_atomic_t event_finished;
+static volatile sig_atomic_t worker_dead;
 static volatile sig_atomic_t udev_exit;
 static volatile sig_atomic_t reload_config;
 static volatile sig_atomic_t signal_received;
-static volatile pid_t settle_pid;
-static int run_exec_q;
+static pid_t settle_pid;
 static int stop_exec_q;
 static int max_childs;
-static int childs;
+static volatile int childs;
 static struct udev_list_node event_list;
+static struct udev_list_node worker_list;
+static volatile sig_atomic_t worker_exit;
+
+enum event_state {
+	EVENT_UNDEF,
+	EVENT_QUEUED,
+	EVENT_RUNNING,
+	EVENT_FINISHED,
+};
+
+struct event {
+	struct udev_list_node node;
+	struct udev *udev;
+	struct udev_device *dev;
+	enum event_state state;
+	int exitstatus;
+	unsigned long long int delaying_seqnum;
+};
 
-static struct udev_event *node_to_event(struct udev_list_node *node)
+static struct event *node_to_event(struct udev_list_node *node)
 {
 	char *event;
 
 	event = (char *)node;
-	event -= offsetof(struct udev_event, node);
-	return (struct udev_event *)event;
+	event -= offsetof(struct event, node);
+	return (struct event *)event;
+}
+
+enum worker_state {
+	WORKER_UNDEF,
+	WORKER_RUNNING,
+	WORKER_IDLE,
+	WORKER_KILLED,
+	WORKER_DEAD,
+};
+
+struct worker {
+	struct udev_list_node node;
+	pid_t pid;
+	enum worker_state state;
+	struct event *event;
+};
+
+static struct worker *node_to_worker(struct udev_list_node *node)
+{
+	char *worker;
+
+	worker = (char *)node;
+	worker -= offsetof(struct worker, node);
+	return (struct worker *)worker;
 }
 
-static void event_queue_delete(struct udev_event *event)
+static void event_queue_delete(struct event *event)
 {
 	udev_list_node_remove(&event->node);
 
@@ -99,125 +139,219 @@ static void event_queue_delete(struct udev_event *event)
 		udev_queue_export_device_finished(udev_queue_export, event->dev);
 
 	udev_device_unref(event->dev);
-	udev_event_unref(event);
+	free(event);
 }
 
 static void event_sig_handler(int signum)
 {
-	if (signum == SIGALRM)
+	switch (signum) {
+	case SIGALRM:
 		_exit(1);
+		break;
+	case SIGTERM:
+		worker_exit = 1;
+		break;
+	}
 }
 
-static void event_fork(struct udev_event *event)
+static void worker_new(struct event *event)
 {
+	struct worker *worker;
 	pid_t pid;
 	struct sigaction act;
-	int err;
-
-#if 0
-	/* single process, no forking, just for testing/profiling */
-	err = udev_event_execute_rules(event, rules);
-	if (err == 0 && !event->ignore_device && udev_get_run(event->udev))
-		udev_event_execute_run(event);
-	info(event->udev, "seq %llu exit with %i\n", udev_device_get_seqnum(event->dev), err);
-	event_queue_delete(event);
-	return;
-#endif
+	sigset_t mask;
 
-	if (debug_trace) {
-		event->trace = 1;
-		fprintf(stderr, "fork %s (%llu)\n",
-		       udev_device_get_syspath(event->dev),
-		       udev_device_get_seqnum(event->dev));
-	}
+	worker = calloc(1, sizeof(struct worker));
+	if (worker == NULL)
+		return;
+
+	/* block WORKER signals, until we joined the list with our new pid */
+	sigemptyset(&mask);
+	sigaddset(&mask, SIGRT_WORKER);
+	sigprocmask(SIG_BLOCK, &mask, NULL);
+
+	event->state = EVENT_RUNNING;
 
 	pid = fork();
 	switch (pid) {
-	case 0:
-		/* child */
+	case 0: {
+		struct udev_device *dev;
+
 		udev_queue_export_unref(udev_queue_export);
 		udev_ctrl_unref(udev_ctrl);
+		udev_monitor_unref(monitor);
 		logging_close();
 		logging_init("udevd-event");
 		setpriority(PRIO_PROCESS, 0, UDEV_PRIORITY);
 
+		/* re-open socket to listen to udevd only, and send back libudev events */
+		monitor = udev_monitor_new_from_netlink(event->udev, NULL);
+		if (monitor == NULL)
+			_exit(2);
+		udev_monitor_enable_receiving(monitor);
+
 		/* set signal handlers */
 		memset(&act, 0x00, sizeof(act));
 		act.sa_handler = event_sig_handler;
 		sigemptyset (&act.sa_mask);
 		act.sa_flags = 0;
+		sigaction(SIGTERM, &act, NULL);
 		sigaction(SIGALRM, &act, NULL);
 
 		/* reset to default */
 		act.sa_handler = SIG_DFL;
 		sigaction(SIGINT, &act, NULL);
-		sigaction(SIGTERM, &act, NULL);
 		sigaction(SIGCHLD, &act, NULL);
 		sigaction(SIGHUP, &act, NULL);
+		sigaction(SIGRT_WORKER, &act, NULL);
 
-		/* set timeout to prevent hanging processes */
-		alarm(UDEV_EVENT_TIMEOUT);
+		/* initial device */
+		dev = event->dev;
 
-		/* apply rules, create node, symlinks */
-		err = udev_event_execute_rules(event, rules);
+		while (!worker_exit) {
+			struct udev_event *udev_event;
+			union sigval sigval;
+			int err;
 
-		/* rules may change/disable the timeout */
-		if (udev_device_get_event_timeout(event->dev) >= 0)
-			alarm(udev_device_get_event_timeout(event->dev));
+			udev_event = udev_event_new(dev);
+			if (udev_event == NULL)
+				_exit(3);
 
-		/* execute RUN= */
-		if (err == 0 && !event->ignore_device && udev_get_run(event->udev))
-			udev_event_execute_run(event);
+			/* set timeout to prevent hanging processes */
+			alarm(UDEV_EVENT_TIMEOUT);
 
-		/* apply/restore inotify watch */
-		if (err == 0 && event->inotify_watch) {
-			udev_watch_begin(event->udev, event->dev);
-			udev_device_update_db(event->dev);
-		}
+			/* apply rules, create node, symlinks */
+			err = udev_event_execute_rules(udev_event, rules);
+
+			/* rules may change/disable the timeout */
+			if (udev_device_get_event_timeout(dev) >= 0)
+				alarm(udev_device_get_event_timeout(dev));
+
+			/* execute RUN= */
+			if (err == 0 && !udev_event->ignore_device && udev_get_run(udev_event->udev))
+				udev_event_execute_run(udev_event);
+
+			/* reset alarm */
+			alarm(0);
+
+			/* apply/restore inotify watch */
+			if (err == 0 && udev_event->inotify_watch) {
+				udev_watch_begin(udev_event->udev, dev);
+				udev_device_update_db(dev);
+			}
+
+			/* send processed event back to libudev listeners */
+			udev_monitor_send_device(monitor, dev, 0);
+
+			info(event->udev, "seq %llu finished with %i\n", udev_device_get_seqnum(dev), err);
+			udev_device_unref(dev);
+			udev_event_unref(udev_event);
 
-		/* send processed event back to the kernel netlink socket */
-		udev_monitor_send_device(kernel_monitor, event->dev);
+			/* send back the result of the event execution */
+			sigval.sival_int = err;
+			/* FIXME: handle EAGAIN */
+			sigqueue(main_pid, SIGRT_WORKER, sigval);
 
-		info(event->udev, "seq %llu exit with %i\n", udev_device_get_seqnum(event->dev), err);
+			/* wait for more device messages from udevd */
+			do
+				dev = udev_monitor_receive_device(monitor);
+			while (!worker_exit && dev == NULL);
+		}
+
+		udev_monitor_unref(monitor);
 		logging_close();
-		if (err != 0)
-			exit(1);
 		exit(0);
+	}
 	case -1:
+		event->state = EVENT_QUEUED;
+		free(worker);
 		err(event->udev, "fork of child failed: %m\n");
-		event_queue_delete(event);
 		break;
 	default:
-		/* get SIGCHLD in main loop */
-		info(event->udev, "seq %llu forked, pid [%d], '%s' '%s', %ld seconds old\n",
-		     udev_device_get_seqnum(event->dev),
-		     pid,
-		     udev_device_get_action(event->dev),
-		     udev_device_get_subsystem(event->dev),
-		     time(NULL) - event->queue_time);
-		event->pid = pid;
+		worker->pid = pid;
+		worker->event = event;
+		worker->state = WORKER_RUNNING;
+		udev_list_node_append(&worker->node, &worker_list);
 		childs++;
+		break;
+	}
+
+	sigprocmask(SIG_UNBLOCK, &mask, NULL);
+}
+
+static void event_run(struct event *event)
+{
+	struct udev_list_node *loop;
+
+	udev_list_node_foreach(loop, &worker_list) {
+		struct worker *worker = node_to_worker(loop);
+		ssize_t count;
+
+		if (worker->state != WORKER_IDLE)
+			continue;
+
+		worker->event = event;
+		worker->state = WORKER_RUNNING;
+		event->state = EVENT_RUNNING;
+		count = udev_monitor_send_device(monitor, event->dev, worker->pid);
+		if (count < 0) {
+			err(event->udev, "worker [%u] did not accept message, kill it\n", worker->pid);
+			event->state = EVENT_QUEUED;
+			worker->state = WORKER_KILLED;
+			kill(worker->pid, SIGKILL);
+			continue;
+		}
+		return;
+	}
+
+	if (childs >= max_childs) {
+		info(event->udev, "maximum number (%i) of childs reached\n", childs);
+		return;
 	}
+
+	/* start new worker and pass initial device */
+	worker_new(event);
 }
 
-static void event_queue_insert(struct udev_event *event)
+static void event_queue_insert(struct udev_device *dev)
 {
-	event->queue_time = time(NULL);
+	struct event *event;
+
+	event = calloc(1, sizeof(struct event));
+	if (event == NULL)
+		return;
 
-	udev_queue_export_device_queued(udev_queue_export, event->dev);
-	info(event->udev, "seq %llu queued, '%s' '%s'\n", udev_device_get_seqnum(event->dev),
-	     udev_device_get_action(event->dev), udev_device_get_subsystem(event->dev));
+	event->udev = udev_device_get_udev(dev);
+	event->dev = dev;
+	udev_queue_export_device_queued(udev_queue_export, dev);
+	info(event->udev, "seq %llu queued, '%s' '%s'\n", udev_device_get_seqnum(dev),
+	     udev_device_get_action(dev), udev_device_get_subsystem(dev));
 
+	event->state = EVENT_QUEUED;
 	udev_list_node_append(&event->node, &event_list);
-	run_exec_q = 1;
 
 	/* run all events with a timeout set immediately */
-	if (udev_device_get_timeout(event->dev) > 0) {
-		event_fork(event);
+	if (udev_device_get_timeout(dev) > 0) {
+		worker_new(event);
 		return;
 	}
 }
 
+static void worker_kill_idle(void)
+{
+	struct udev_list_node *loop;
+
+	udev_list_node_foreach(loop, &worker_list) {
+		struct worker *worker = node_to_worker(loop);
+
+		if (worker->state != WORKER_IDLE)
+			continue;
+
+		worker->state = WORKER_KILLED;
+		kill(worker->pid, SIGTERM);
+	}
+}
+
 static int mem_size_mb(void)
 {
 	FILE *f;
@@ -265,13 +399,13 @@ static int compare_devpath(const char *running, const char *waiting)
 }
 
 /* lookup event for identical, parent, child device */
-static int devpath_busy(struct udev_event *event)
+static int devpath_busy(struct event *event)
 {
 	struct udev_list_node *loop;
 
 	/* check if queue contains events we depend on */
 	udev_list_node_foreach(loop, &event_list) {
-		struct udev_event *loop_event = node_to_event(loop);
+		struct event *loop_event = node_to_event(loop);
 
 		/* we already found a later event, earlier can not block us, no need to check again */
 		if (udev_device_get_seqnum(loop_event->dev) < event->delaying_seqnum)
@@ -312,42 +446,37 @@ static void event_queue_manager(struct udev *udev)
 	struct udev_list_node *tmp;
 
 start_over:
-	if (udev_list_is_empty(&event_list)) {
-		if (childs > 0) {
-			err(udev, "event list empty, but childs count is %i", childs);
-			childs = 0;
-		}
-		return;
-	}
-
 	udev_list_node_foreach_safe(loop, tmp, &event_list) {
-		struct udev_event *loop_event = node_to_event(loop);
+		struct event *event = node_to_event(loop);
 
-		if (childs >= max_childs) {
-			info(udev, "maximum number (%i) of childs reached\n", childs);
-			break;
+		/* cleanup finished events */
+		if (event->state == EVENT_FINISHED) {
+			event_queue_delete(event);
+			continue;
 		}
 
-		if (loop_event->pid != 0)
+		if (stop_exec_q)
+			continue;
+
+		/* skip running events */
+		if (event->state != EVENT_QUEUED)
 			continue;
 
 		/* do not start event if parent or child event is still running */
-		if (devpath_busy(loop_event) != 0) {
+		if (devpath_busy(event) != 0) {
 			dbg(udev, "delay seq %llu (%s)\n",
-			    udev_device_get_seqnum(loop_event->dev),
-			    udev_device_get_devpath(loop_event->dev));
+			    udev_device_get_seqnum(event->dev),
+			    udev_device_get_devpath(event->dev));
 			continue;
 		}
 
-		event_fork(loop_event);
-		dbg(udev, "moved seq %llu to running list\n", udev_device_get_seqnum(loop_event->dev));
+		event_run(event);
+	}
 
-		/* retry if events finished in the meantime */
-		if (sigchilds_waiting) {
-			sigchilds_waiting = 0;
-			reap_sigchilds();
-			goto start_over;
-		}
+	/* keep the incoming queue small, retry if events finished in the meantime */
+	if (event_finished) {
+		event_finished = 0;
+		goto start_over;
 	}
 }
 
@@ -480,69 +609,70 @@ static int handle_inotify(struct udev *udev)
 	return 0;
 }
 
-static void sig_handler(int signum)
+static void sig_handler(int signum, siginfo_t *info, void *ucontext)
 {
 	switch (signum) {
 		case SIGINT:
 		case SIGTERM:
 			udev_exit = 1;
-			break;
+			return;
 		case SIGCHLD:
-			/* set flag, then write to pipe if needed */
-			sigchilds_waiting = 1;
-			break;
-		case SIGHUP:
-			reload_config = 1;
-			break;
-	}
+			while (1) {
+				pid_t pid;
+				struct udev_list_node *loop;
 
-	signal_received = 1;
-}
+				pid = waitpid(-1, NULL, WNOHANG);
+				if (pid <= 0)
+					break;
 
-static void udev_done(int pid, int exitstatus)
-{
-	struct udev_list_node *loop;
+				udev_list_node_foreach(loop, &worker_list) {
+					struct worker *worker = node_to_worker(loop);
 
-	/* find event associated with pid and delete it */
-	udev_list_node_foreach(loop, &event_list) {
-		struct udev_event *loop_event = node_to_event(loop);
-
-		if (loop_event->pid == pid) {
-			info(loop_event->udev, "seq %llu cleanup, pid [%d], status %i, %ld seconds old\n",
-			     udev_device_get_seqnum(loop_event->dev), loop_event->pid,
-			     exitstatus, time(NULL) - loop_event->queue_time);
-			loop_event->exitstatus = exitstatus;
-			if (debug_trace)
-				fprintf(stderr, "exit %s (%llu)\n",
-				       udev_device_get_syspath(loop_event->dev),
-				       udev_device_get_seqnum(loop_event->dev));
-			event_queue_delete(loop_event);
-			childs--;
-
-			/* there may be dependent events waiting */
-			run_exec_q = 1;
-			return;
-		}
-	}
-}
+					if (worker->pid != pid)
+						continue;
 
-static void reap_sigchilds(void)
-{
-	pid_t pid;
-	int status;
+					/* finish event, if worker died unexpectedly */
+					if (worker->event != NULL) {
+						worker->event->exitstatus = 1;
+						worker->event->state = EVENT_FINISHED;
+					}
 
-	while (1) {
-		pid = waitpid(-1, &status, WNOHANG);
-		if (pid <= 0)
-			break;
-		if (WIFEXITED(status))
-			status = WEXITSTATUS(status);
-		else if (WIFSIGNALED(status))
-			status = WTERMSIG(status) + 128;
-		else
-			status = 0;
-		udev_done(pid, status);
+					worker->state = WORKER_DEAD;
+					childs--;
+					break;
+				}
+			}
+			worker_dead = 1;
+			return;
+		case SIGHUP:
+			signal_received = 1;
+			reload_config = 1;
+			return;
+		default:
+			if (signum == SIGRT_WORKER) {
+				struct udev_list_node *loop;
+
+				/* lookup worker who sent the signal */
+				udev_list_node_foreach(loop, &worker_list) {
+					struct worker *worker = node_to_worker(loop);
+
+					if (worker->pid != info->si_pid)
+						continue;
+
+					/* worker returned */
+					worker->event->exitstatus = info->si_value.sival_int;
+					worker->event->state = EVENT_FINISHED;
+					worker->event = NULL;
+					worker->state = WORKER_IDLE;
+					event_finished = 1;
+					break;
+				}
+				return;
+			}
+		break;
 	}
+
+	signal_received = 1;
 }
 
 static void startup_log(struct udev *udev)
@@ -677,21 +807,24 @@ int main(int argc, char *argv[])
 		goto exit;
 	}
 
-	kernel_monitor = udev_monitor_new_from_netlink(udev, "kernel");
-	if (kernel_monitor == NULL || udev_monitor_enable_receiving(kernel_monitor) < 0) {
+	monitor = udev_monitor_new_from_netlink(udev, "kernel");
+	if (monitor == NULL || udev_monitor_enable_receiving(monitor) < 0) {
 		fprintf(stderr, "error initializing netlink socket\n");
 		err(udev, "error initializing netlink socket\n");
 		rc = 3;
 		goto exit;
 	}
-	udev_monitor_set_receive_buffer_size(kernel_monitor, 128*1024*1024);
+	udev_monitor_set_receive_buffer_size(monitor, 128*1024*1024);
 
 	rules = udev_rules_new(udev, resolve_names);
 	if (rules == NULL) {
 		err(udev, "error reading rules\n");
 		goto exit;
 	}
+
 	udev_list_init(&event_list);
+	udev_list_init(&worker_list);
+
 	udev_queue_export = udev_queue_export_new(udev);
 	if (udev_queue_export == NULL) {
 		err(udev, "error creating queue file\n");
@@ -704,19 +837,19 @@ int main(int argc, char *argv[])
 		pid = fork();
 		switch (pid) {
 		case 0:
-			dbg(udev, "daemonized fork running\n");
 			break;
 		case -1:
 			err(udev, "fork of daemon failed: %m\n");
 			rc = 4;
 			goto exit;
 		default:
-			dbg(udev, "child [%u] running, parent exits\n", pid);
 			rc = 0;
 			goto exit;
 		}
 	}
 
+	main_pid = getpid();
+
 	/* redirect std{out,err} */
 	if (!debug && !debug_trace) {
 		dup2(fd, STDIN_FILENO);
@@ -746,13 +879,14 @@ int main(int argc, char *argv[])
 
 	/* set signal handlers */
 	memset(&act, 0x00, sizeof(struct sigaction));
-	act.sa_handler = sig_handler;
+	act.sa_sigaction = sig_handler;
 	sigemptyset(&act.sa_mask);
-	act.sa_flags = SA_RESTART;
+	act.sa_flags = SA_RESTART | SA_SIGINFO;
 	sigaction(SIGINT, &act, NULL);
 	sigaction(SIGTERM, &act, NULL);
 	sigaction(SIGCHLD, &act, NULL);
 	sigaction(SIGHUP, &act, NULL);
+	sigaction(SIGRT_WORKER, &act, NULL);
 
 	/* watch rules directory */
 	udev_watch_init(udev);
@@ -782,10 +916,11 @@ int main(int argc, char *argv[])
 		max_childs = 1;
 	} else {
 		int memsize = mem_size_mb();
+
 		if (memsize > 0)
 			max_childs = 128 + (memsize / 4);
 		else
-			max_childs = UDEVD_MAX_CHILDS;
+			max_childs = 256;
 	}
 	/* possibly overwrite maximum limit of executed events */
 	value = getenv("UDEVD_MAX_CHILDS");
@@ -797,12 +932,15 @@ int main(int argc, char *argv[])
 		sigset_t blocked_mask, orig_mask;
 		struct pollfd pfd[4];
 		struct pollfd *ctrl_poll, *monitor_poll, *inotify_poll = NULL;
+		const struct timespec ts = { 10, 0 };
+		const struct timespec *timeout;
 		int nfds = 0;
 		int fdcount;
 
 		sigfillset(&blocked_mask);
 		sigprocmask(SIG_SETMASK, &blocked_mask, &orig_mask);
 		if (signal_received) {
+			signal_received = 0;
 			sigprocmask(SIG_SETMASK, &orig_mask, NULL);
 			goto handle_signals;
 		}
@@ -812,7 +950,7 @@ int main(int argc, char *argv[])
 		ctrl_poll->events = POLLIN;
 
 		monitor_poll = &pfd[nfds++];
-		monitor_poll->fd = udev_monitor_get_fd(kernel_monitor);
+		monitor_poll->fd = udev_monitor_get_fd(monitor);
 		monitor_poll->events = POLLIN;
 
 		if (inotify_fd >= 0) {
@@ -821,14 +959,20 @@ int main(int argc, char *argv[])
 			inotify_poll->events = POLLIN;
 		}
 
-		fdcount = ppoll(pfd, nfds, NULL, &orig_mask);
+		/* set timeout to kill idle workers */
+		if (udev_list_is_empty(&event_list) && !udev_list_is_empty(&worker_list))
+			timeout = &ts;
+		else
+			timeout = NULL;
+
+		fdcount = ppoll(pfd, nfds, timeout, &orig_mask);
 		sigprocmask(SIG_SETMASK, &orig_mask, NULL);
-		if (fdcount < 0) {
-			if (errno == EINTR)
-				goto handle_signals;
-			err(udev, "error in select: %m\n");
+		if (fdcount < 0 && errno != EINTR)
 			continue;
-		}
+
+		/* timeout or config changed - kill idle workers */
+		if (fdcount == 0)
+			worker_kill_idle();
 
 		/* get control message */
 		if (ctrl_poll->revents & POLLIN)
@@ -838,16 +982,11 @@ int main(int argc, char *argv[])
 		if (monitor_poll->revents & POLLIN) {
 			struct udev_device *dev;
 
-			dev = udev_monitor_receive_device(kernel_monitor);
-			if (dev != NULL) {
-				struct udev_event *event;
-
-				event = udev_event_new(dev);
-				if (event != NULL)
-					event_queue_insert(event);
-				else
-					udev_device_unref(dev);
-			}
+			dev = udev_monitor_receive_device(monitor);
+			if (dev != NULL)
+				event_queue_insert(dev);
+			else
+				udev_device_unref(dev);
 		}
 
 		/* rules directory inotify watch */
@@ -855,13 +994,12 @@ int main(int argc, char *argv[])
 			handle_inotify(udev);
 
 handle_signals:
-		signal_received = 0;
-
 		/* rules changed, set by inotify or a HUP signal */
 		if (reload_config) {
 			struct udev_rules *rules_new;
 
 			reload_config = 0;
+			worker_kill_idle();
 			rules_new = udev_rules_new(udev, resolve_names);
 			if (rules_new != NULL) {
 				udev_rules_unref(rules);
@@ -869,32 +1007,41 @@ handle_signals:
 			}
 		}
 
-		if (sigchilds_waiting) {
-			sigchilds_waiting = 0;
-			reap_sigchilds();
-		}
+		/* cleanup killed worker */
+		if (worker_dead) {
+			struct udev_list_node *loop, *tmp;
+
+			udev_list_node_foreach_safe(loop, tmp, &worker_list) {
+				struct worker *worker = node_to_worker(loop);
+
+				if (worker->state != WORKER_DEAD)
+					continue;
 
-		if (run_exec_q) {
-			run_exec_q = 0;
-			if (!stop_exec_q)
-				event_queue_manager(udev);
+				udev_list_node_remove(&worker->node);
+				free(worker);
+			}
+			worker_dead = 0;
 		}
 
 		if (settle_pid > 0) {
 			kill(settle_pid, SIGUSR1);
 			settle_pid = 0;
 		}
+
+		if (!udev_list_is_empty(&event_list))
+			event_queue_manager(udev);
 	}
+
 	udev_queue_export_cleanup(udev_queue_export);
 	rc = 0;
+	killpg(0, SIGTERM);
 exit:
-
 	udev_queue_export_unref(udev_queue_export);
 	udev_rules_unref(rules);
 	udev_ctrl_unref(udev_ctrl);
 	if (inotify_fd >= 0)
 		close(inotify_fd);
-	udev_monitor_unref(kernel_monitor);
+	udev_monitor_unref(monitor);
 	udev_selinux_exit(udev);
 	udev_unref(udev);
 	logging_close();

[Index of Archives]     [Linux Kernel]     [Linux DVB]     [Asterisk Internet PBX]     [DCCP]     [Netdev]     [X.org]     [Util Linux NG]     [Fedora Women]     [ALSA Devel]     [Linux USB]

  Powered by Linux