/*
 * linux/kernel/workqueue.c
 *
 * Generic mechanism for defining kernel helper threads for running
 * arbitrary tasks in process context.
 *
 * Started by Ingo Molnar, Copyright (C) 2002
 *
 * Derived from the taskqueue/keventd code by:
 *
 *   David Woodhouse <dwmw2@infradead.org>
 *   Andrew Morton <andrewm@uow.edu.au>
 *   Kai Petzke <wpp@marie.physik.tu-berlin.de>
 *   Theodore Ts'o <tytso@mit.edu>
 *
 * Made to use alloc_percpu by Christoph Lameter <clameter@sgi.com>.
 */

#include <linux/module.h>
#include <linux/kernel.h>
#include <linux/sched.h>
#include <linux/init.h>
#include <linux/signal.h>
#include <linux/completion.h>
#include <linux/workqueue.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
#include <linux/hardirq.h>
#include <linux/mempolicy.h>
#include <linux/freezer.h>
#include <linux/kallsyms.h>
#include <linux/debug_locks.h>

/*
 * The per-CPU workqueue (if single thread, we always use the first
 * possible cpu).
 *
 * The sequence counters are for flush_scheduled_work(). It wants to wait
 * until all currently-scheduled works are completed, but it doesn't
 * want to be livelocked by new, incoming ones. So it waits until
 * remove_sequence is >= the insert_sequence which pertained when
 * flush_scheduled_work() was called.
 */
struct cpu_workqueue_struct {

        spinlock_t lock;

        long remove_sequence;   /* Least-recently added (next to run) */
        long insert_sequence;   /* Next to add */

        struct list_head worklist;
        wait_queue_head_t more_work;
        wait_queue_head_t work_done;

        struct workqueue_struct *wq;
        struct task_struct *thread;

        int run_depth;          /* Detect run_workqueue() recursion depth */

        int freezeable;         /* Freeze the thread during suspend */
} ____cacheline_aligned;

/*
 * The externally visible workqueue abstraction is an array of
 * per-CPU workqueues:
 */
struct workqueue_struct {
        struct cpu_workqueue_struct *cpu_wq;
        const char *name;
        struct list_head list;  /* Empty if single thread */
};

/* All the per-cpu workqueues on the system, for hotplug cpu to add/remove
   threads to each one as cpus come/go. */
static DEFINE_MUTEX(workqueue_mutex);
static LIST_HEAD(workqueues);

static int singlethread_cpu;

/* If it's single threaded, it isn't in the list of workqueues. */
static inline int is_single_threaded(struct workqueue_struct *wq)
{
        return list_empty(&wq->list);
}

/*
 * Set the workqueue on which a work item is to be run
 * - Must *only* be called if the pending flag is set
 */
static inline void set_wq_data(struct work_struct *work, void *wq)
{
        unsigned long new;

        BUG_ON(!work_pending(work));

        new = (unsigned long) wq | (1UL << WORK_STRUCT_PENDING);
        new |= WORK_STRUCT_FLAG_MASK & *work_data_bits(work);
        atomic_long_set(&work->data, new);
}

static inline void *get_wq_data(struct work_struct *work)
{
        return (void *) (atomic_long_read(&work->data) & WORK_STRUCT_WQ_DATA_MASK);
}

static int __run_work(struct cpu_workqueue_struct *cwq, struct work_struct *work)
{
        int ret = 0;
        unsigned long flags;

        spin_lock_irqsave(&cwq->lock, flags);
        /*
         * We need to re-validate the work info after we've gotten
         * the cpu_workqueue lock. We can run the work now iff:
         *
         *  - the wq_data still matches the cpu_workqueue_struct
         *  - AND the work is still marked pending
         *  - AND the work is still on a list (which will be this
         *    workqueue_struct list)
         *
         * All these conditions are important, because we
         * need to protect against the work being run right
         * now on another CPU (all but the last one might be
         * true if it's currently running and has not been
         * released yet, for example).
         */
        if (get_wq_data(work) == cwq
            && work_pending(work)
            && !list_empty(&work->entry)) {
                work_func_t f = work->func;
                list_del_init(&work->entry);
                spin_unlock_irqrestore(&cwq->lock, flags);

                if (!test_bit(WORK_STRUCT_NOAUTOREL, work_data_bits(work)))
                        work_release(work);
                f(work);

                spin_lock_irqsave(&cwq->lock, flags);
                cwq->remove_sequence++;
                wake_up(&cwq->work_done);
                ret = 1;
        }
        spin_unlock_irqrestore(&cwq->lock, flags);
        return ret;
}

/**
 * run_scheduled_work - run scheduled work synchronously
 * @work: work to run
 *
 * This checks if the work was pending, and runs it
 * synchronously if so. It returns a boolean to indicate
 * whether it had any scheduled work to run or not.
 *
 * NOTE! This _only_ works for normal work_structs. You
 * CANNOT use this for delayed work, because the wq data
 * for delayed work will not point properly to the per-
 * CPU workqueue struct, but will change!
 */
int fastcall run_scheduled_work(struct work_struct *work)
{
        for (;;) {
                struct cpu_workqueue_struct *cwq;

                if (!work_pending(work))
                        return 0;
                if (list_empty(&work->entry))
                        return 0;
                /* NOTE! This depends intimately on __queue_work! */
                cwq = get_wq_data(work);
                if (!cwq)
                        return 0;
                if (__run_work(cwq, work))
                        return 1;
        }
}
EXPORT_SYMBOL(run_scheduled_work);
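
/*
 * Illustrative sketch (not part of this file): how a caller might combine
 * schedule_work() with run_scheduled_work() to force a pending work item
 * to run synchronously.  The my_* names are hypothetical; note the
 * kernel-doc above -- this only works for plain work_structs, never for
 * delayed work.
 *
 *      static void my_handler(struct work_struct *work)
 *      {
 *              ... runs in process context ...
 *      }
 *      static DECLARE_WORK(my_work, my_handler);
 *
 *      schedule_work(&my_work);
 *      ...
 *      if (run_scheduled_work(&my_work))
 *              printk(KERN_DEBUG "my_work was still pending and ran here\n");
 */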

/* Preempt must be disabled. */
static void __queue_work(struct cpu_workqueue_struct *cwq,
                         struct work_struct *work)
{
        unsigned long flags;

        spin_lock_irqsave(&cwq->lock, flags);
        set_wq_data(work, cwq);
        list_add_tail(&work->entry, &cwq->worklist);
        cwq->insert_sequence++;
        wake_up(&cwq->more_work);
        spin_unlock_irqrestore(&cwq->lock, flags);
}

/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns 0 if @work was already on a queue, non-zero otherwise.
 *
 * We queue the work to the CPU on which it was submitted, but there is no
 * guarantee that it will be processed by that CPU.
 */
int fastcall queue_work(struct workqueue_struct *wq, struct work_struct *work)
{
        int ret = 0, cpu = get_cpu();

        if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
                if (unlikely(is_single_threaded(wq)))
                        cpu = singlethread_cpu;
                BUG_ON(!list_empty(&work->entry));
                __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
                ret = 1;
        }
        put_cpu();
        return ret;
}
EXPORT_SYMBOL_GPL(queue_work);
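
/*
 * Illustrative sketch (not part of this file): queueing work on a dedicated
 * workqueue instead of keventd.  my_wq, my_work and my_handler are
 * hypothetical; create_workqueue() is the usual <linux/workqueue.h> wrapper
 * around __create_workqueue() below.
 *
 *      static struct workqueue_struct *my_wq;
 *      static DECLARE_WORK(my_work, my_handler);
 *
 *      my_wq = create_workqueue("my_wq");
 *      if (!my_wq)
 *              return -ENOMEM;
 *      queue_work(my_wq, &my_work);    returns 0 if it was already queued
 */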

void delayed_work_timer_fn(unsigned long __data)
{
        struct delayed_work *dwork = (struct delayed_work *)__data;
        struct workqueue_struct *wq = get_wq_data(&dwork->work);
        int cpu = smp_processor_id();

        if (unlikely(is_single_threaded(wq)))
                cpu = singlethread_cpu;

        __queue_work(per_cpu_ptr(wq->cpu_wq, cpu), &dwork->work);
}

/**
 * queue_delayed_work - queue work on a workqueue after delay
 * @wq: workqueue to use
 * @dwork: delayable work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */
int fastcall queue_delayed_work(struct workqueue_struct *wq,
                        struct delayed_work *dwork, unsigned long delay)
{
        int ret = 0;
        struct timer_list *timer = &dwork->timer;
        struct work_struct *work = &dwork->work;

        timer_stats_timer_set_start_info(timer);
        if (delay == 0)
                return queue_work(wq, work);

        if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
                BUG_ON(timer_pending(timer));
                BUG_ON(!list_empty(&work->entry));

                /* This stores wq for the moment, for the timer_fn */
                set_wq_data(work, wq);
                timer->expires = jiffies + delay;
                timer->data = (unsigned long)dwork;
                timer->function = delayed_work_timer_fn;
                add_timer(timer);
                ret = 1;
        }
        return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work);
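
/*
 * Illustrative sketch (not part of this file): deferring work by roughly one
 * second on a private workqueue.  my_wq, my_dwork and my_handler are
 * hypothetical; DECLARE_DELAYED_WORK() and cancel_delayed_work() come from
 * <linux/workqueue.h>.
 *
 *      static DECLARE_DELAYED_WORK(my_dwork, my_handler);
 *
 *      queue_delayed_work(my_wq, &my_dwork, HZ);       runs ~1s later
 *      ...
 *      if (!cancel_delayed_work(&my_dwork))
 *              flush_workqueue(my_wq);                 it may already be running
 */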

/**
 * queue_delayed_work_on - queue work on specific CPU after delay
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @dwork: delayable work to queue
 * @delay: number of jiffies to wait before queueing
 *
 * Returns 0 if @dwork was already on a queue, non-zero otherwise.
 */
int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
                        struct delayed_work *dwork, unsigned long delay)
{
        int ret = 0;
        struct timer_list *timer = &dwork->timer;
        struct work_struct *work = &dwork->work;

        if (!test_and_set_bit(WORK_STRUCT_PENDING, work_data_bits(work))) {
                BUG_ON(timer_pending(timer));
                BUG_ON(!list_empty(&work->entry));

                /* This stores wq for the moment, for the timer_fn */
                set_wq_data(work, wq);
                timer->expires = jiffies + delay;
                timer->data = (unsigned long)dwork;
                timer->function = delayed_work_timer_fn;
                add_timer_on(timer, cpu);
                ret = 1;
        }
        return ret;
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);

static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
        unsigned long flags;

        /*
         * Keep taking off work from the queue until
         * done.
         */
        spin_lock_irqsave(&cwq->lock, flags);
        cwq->run_depth++;
        if (cwq->run_depth > 3) {
                /* morton gets to eat his hat */
                printk("%s: recursion depth exceeded: %d\n",
                        __FUNCTION__, cwq->run_depth);
                dump_stack();
        }
        while (!list_empty(&cwq->worklist)) {
                struct work_struct *work = list_entry(cwq->worklist.next,
                                                struct work_struct, entry);
                work_func_t f = work->func;

                list_del_init(cwq->worklist.next);
                spin_unlock_irqrestore(&cwq->lock, flags);

                BUG_ON(get_wq_data(work) != cwq);
                if (!test_bit(WORK_STRUCT_NOAUTOREL, work_data_bits(work)))
                        work_release(work);
                f(work);

                if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
                        printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
                                        "%s/0x%08x/%d\n",
                                        current->comm, preempt_count(),
                                        current->pid);
                        printk(KERN_ERR "    last function: ");
                        print_symbol("%s\n", (unsigned long)f);
                        debug_show_held_locks(current);
                        dump_stack();
                }

                spin_lock_irqsave(&cwq->lock, flags);
                cwq->remove_sequence++;
                wake_up(&cwq->work_done);
        }
        cwq->run_depth--;
        spin_unlock_irqrestore(&cwq->lock, flags);
}

static int worker_thread(void *__cwq)
{
        struct cpu_workqueue_struct *cwq = __cwq;
        DECLARE_WAITQUEUE(wait, current);
        struct k_sigaction sa;
        sigset_t blocked;

        if (!cwq->freezeable)
                current->flags |= PF_NOFREEZE;

        set_user_nice(current, -5);

        /* Block and flush all signals */
        sigfillset(&blocked);
        sigprocmask(SIG_BLOCK, &blocked, NULL);
        flush_signals(current);

        /*
         * We inherited MPOL_INTERLEAVE from the booting kernel.
         * Set MPOL_DEFAULT to ensure node local allocations.
         */
        numa_default_policy();

        /* SIG_IGN makes children autoreap: see do_notify_parent(). */
        sa.sa.sa_handler = SIG_IGN;
        sa.sa.sa_flags = 0;
        siginitset(&sa.sa.sa_mask, sigmask(SIGCHLD));
        do_sigaction(SIGCHLD, &sa, (struct k_sigaction *)0);

        set_current_state(TASK_INTERRUPTIBLE);
        while (!kthread_should_stop()) {
                if (cwq->freezeable)
                        try_to_freeze();

                add_wait_queue(&cwq->more_work, &wait);
                if (list_empty(&cwq->worklist))
                        schedule();
                else
                        __set_current_state(TASK_RUNNING);
                remove_wait_queue(&cwq->more_work, &wait);

                if (!list_empty(&cwq->worklist))
                        run_workqueue(cwq);
                set_current_state(TASK_INTERRUPTIBLE);
        }
        __set_current_state(TASK_RUNNING);
        return 0;
}

static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
        if (cwq->thread == current) {
                /*
                 * Probably keventd trying to flush its own queue. So simply run
                 * it by hand rather than deadlocking.
                 */
                run_workqueue(cwq);
        } else {
                DEFINE_WAIT(wait);
                long sequence_needed;

                spin_lock_irq(&cwq->lock);
                sequence_needed = cwq->insert_sequence;

                while (sequence_needed - cwq->remove_sequence > 0) {
                        prepare_to_wait(&cwq->work_done, &wait,
                                        TASK_UNINTERRUPTIBLE);
                        spin_unlock_irq(&cwq->lock);
                        schedule();
                        spin_lock_irq(&cwq->lock);
                }
                finish_wait(&cwq->work_done, &wait);
                spin_unlock_irq(&cwq->lock);
        }
}

/**
 * flush_workqueue - ensure that any scheduled work has run to completion.
 * @wq: workqueue to flush
 *
 * Forces execution of the workqueue and blocks until its completion.
 * This is typically used in driver shutdown handlers.
 *
 * This function will sample each workqueue's current insert_sequence number and
 * will sleep until the head sequence is greater than or equal to that. This
 * means that we sleep until all works which were queued on entry have been
 * handled, but we are not livelocked by new incoming ones.
 *
 * This function used to run the workqueues itself. Now we just wait for the
 * helper threads to do it.
 */
void fastcall flush_workqueue(struct workqueue_struct *wq)
{
        might_sleep();

        if (is_single_threaded(wq)) {
                /* Always use first cpu's area. */
                flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
        } else {
                int cpu;

                mutex_lock(&workqueue_mutex);
                for_each_online_cpu(cpu)
                        flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
                mutex_unlock(&workqueue_mutex);
        }
}
EXPORT_SYMBOL_GPL(flush_workqueue);
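
/*
 * Illustrative sketch (not part of this file): a typical driver teardown
 * path that flushes its queue before freeing the data its handlers touch.
 * my_wq, struct my_dev and my_remove are hypothetical.
 *
 *      static void my_remove(struct my_dev *dev)
 *      {
 *              ...
 *              flush_workqueue(my_wq);         wait for queued handlers
 *              kfree(dev->buffer);             now safe to free their data
 *      }
 */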

static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
                                                   int cpu, int freezeable)
{
        struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
        struct task_struct *p;

        spin_lock_init(&cwq->lock);
        cwq->wq = wq;
        cwq->thread = NULL;
        cwq->insert_sequence = 0;
        cwq->remove_sequence = 0;
        cwq->freezeable = freezeable;
        INIT_LIST_HEAD(&cwq->worklist);
        init_waitqueue_head(&cwq->more_work);
        init_waitqueue_head(&cwq->work_done);

        if (is_single_threaded(wq))
                p = kthread_create(worker_thread, cwq, "%s", wq->name);
        else
                p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
        if (IS_ERR(p))
                return NULL;
        cwq->thread = p;
        return p;
}

struct workqueue_struct *__create_workqueue(const char *name,
                                            int singlethread, int freezeable)
{
        int cpu, destroy = 0;
        struct workqueue_struct *wq;
        struct task_struct *p;

        wq = kzalloc(sizeof(*wq), GFP_KERNEL);
        if (!wq)
                return NULL;

        wq->cpu_wq = alloc_percpu(struct cpu_workqueue_struct);
        if (!wq->cpu_wq) {
                kfree(wq);
                return NULL;
        }

        wq->name = name;
        mutex_lock(&workqueue_mutex);
        if (singlethread) {
                INIT_LIST_HEAD(&wq->list);
                p = create_workqueue_thread(wq, singlethread_cpu, freezeable);
                if (!p)
                        destroy = 1;
                else
                        wake_up_process(p);
        } else {
                list_add(&wq->list, &workqueues);
                for_each_online_cpu(cpu) {
                        p = create_workqueue_thread(wq, cpu, freezeable);
                        if (p) {
                                kthread_bind(p, cpu);
                                wake_up_process(p);
                        } else
                                destroy = 1;
                }
        }
        mutex_unlock(&workqueue_mutex);

        /*
         * Was there any error during startup? If yes then clean up:
         */
        if (destroy) {
                destroy_workqueue(wq);
                wq = NULL;
        }
        return wq;
}
EXPORT_SYMBOL_GPL(__create_workqueue);
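
/*
 * Illustrative sketch (not part of this file): __create_workqueue() is
 * normally reached through the wrapper macros in <linux/workqueue.h>,
 * roughly:
 *
 *      wq  = create_workqueue("my_wq");                one thread per CPU
 *      wq2 = create_singlethread_workqueue("my_st");   single thread
 *      ...
 *      destroy_workqueue(wq);
 *
 * "my_wq"/"my_st" are hypothetical names; the wrappers fill in the
 * singlethread/freezeable flags for you.
 */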

static void cleanup_workqueue_thread(struct workqueue_struct *wq, int cpu)
{
        struct cpu_workqueue_struct *cwq;
        unsigned long flags;
        struct task_struct *p;

        cwq = per_cpu_ptr(wq->cpu_wq, cpu);
        spin_lock_irqsave(&cwq->lock, flags);
        p = cwq->thread;
        cwq->thread = NULL;
        spin_unlock_irqrestore(&cwq->lock, flags);
        if (p)
                kthread_stop(p);
}

/**
 * destroy_workqueue - safely terminate a workqueue
 * @wq: target workqueue
 *
 * Safely destroy a workqueue. All work currently pending will be done first.
 */
void destroy_workqueue(struct workqueue_struct *wq)
{
        int cpu;

        flush_workqueue(wq);

        /* We don't need the distraction of CPUs appearing and vanishing. */
        mutex_lock(&workqueue_mutex);
        if (is_single_threaded(wq))
                cleanup_workqueue_thread(wq, singlethread_cpu);
        else {
                for_each_online_cpu(cpu)
                        cleanup_workqueue_thread(wq, cpu);
                list_del(&wq->list);
        }
        mutex_unlock(&workqueue_mutex);
        free_percpu(wq->cpu_wq);
        kfree(wq);
}
EXPORT_SYMBOL_GPL(destroy_workqueue);

static struct workqueue_struct *keventd_wq;

/**
 * schedule_work - put work task in global workqueue
 * @work: job to be done
 *
 * This puts a job in the kernel-global workqueue.
 */
int fastcall schedule_work(struct work_struct *work)
{
        return queue_work(keventd_wq, work);
}
EXPORT_SYMBOL(schedule_work);
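
/*
 * Illustrative sketch (not part of this file): the common pattern of
 * embedding a work_struct in a driver structure and recovering it with
 * container_of() in the handler.  struct my_dev and my_work_handler are
 * hypothetical.
 *
 *      struct my_dev {
 *              struct work_struct work;
 *              ...
 *      };
 *
 *      static void my_work_handler(struct work_struct *work)
 *      {
 *              struct my_dev *dev = container_of(work, struct my_dev, work);
 *              ... process context, may sleep ...
 *      }
 *
 *      INIT_WORK(&dev->work, my_work_handler);
 *      schedule_work(&dev->work);              e.g. from an interrupt handler
 */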

/**
 * schedule_delayed_work - put work task in global workqueue after delay
 * @dwork: job to be done
 * @delay: number of jiffies to wait or 0 for immediate execution
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue.
 */
int fastcall schedule_delayed_work(struct delayed_work *dwork,
                                   unsigned long delay)
{
        timer_stats_timer_set_start_info(&dwork->timer);
        return queue_delayed_work(keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work);
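
/*
 * Illustrative sketch (not part of this file): a self-rearming poll using
 * the kernel-global queue.  my_dwork and my_poll are hypothetical; see
 * cancel_rearming_delayed_work() below for how to stop such a work reliably.
 *
 *      static void my_poll(struct work_struct *work)
 *      {
 *              ... do the periodic check ...
 *              schedule_delayed_work(&my_dwork, HZ);   rearm for ~1s later
 *      }
 *      static DECLARE_DELAYED_WORK(my_dwork, my_poll);
 *
 *      schedule_delayed_work(&my_dwork, HZ);           kick it off
 */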

/**
 * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
 * @cpu: cpu to use
 * @dwork: job to be done
 * @delay: number of jiffies to wait
 *
 * After waiting for a given time this puts a job in the kernel-global
 * workqueue on the specified CPU.
 */
int schedule_delayed_work_on(int cpu,
                        struct delayed_work *dwork, unsigned long delay)
{
        return queue_delayed_work_on(cpu, keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work_on);

/**
 * schedule_on_each_cpu - call a function on each online CPU from keventd
 * @func: the function to call
 *
 * Returns zero on success.
 * Returns -ve errno on failure.
 *
 * Appears to be racy against CPU hotplug.
 *
 * schedule_on_each_cpu() is very slow.
 */
int schedule_on_each_cpu(work_func_t func)
{
        int cpu;
        struct work_struct *works;

        works = alloc_percpu(struct work_struct);
        if (!works)
                return -ENOMEM;

        mutex_lock(&workqueue_mutex);
        for_each_online_cpu(cpu) {
                struct work_struct *work = per_cpu_ptr(works, cpu);

                INIT_WORK(work, func);
                set_bit(WORK_STRUCT_PENDING, work_data_bits(work));
                __queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu), work);
        }
        mutex_unlock(&workqueue_mutex);
        flush_workqueue(keventd_wq);
        free_percpu(works);
        return 0;
}

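/*
 * Illustrative sketch (not part of this file): running a function once on
 * every online CPU and waiting for all of them, e.g. to drain per-cpu
 * caches.  my_drain_cpu is hypothetical; the handler runs in the keventd
 * thread of whichever CPU it was queued on.
 *
 *      static void my_drain_cpu(struct work_struct *unused)
 *      {
 *              ... operate on this CPU's per-cpu data ...
 *      }
 *
 *      if (schedule_on_each_cpu(my_drain_cpu))
 *              printk(KERN_WARNING "per-cpu drain failed\n");
 */
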
void flush_scheduled_work(void)
{
        flush_workqueue(keventd_wq);
}
EXPORT_SYMBOL(flush_scheduled_work);

/**
 * cancel_rearming_delayed_workqueue - reliably kill off a delayed work whose handler rearms the delayed work.
 * @wq:   the controlling workqueue structure
 * @dwork: the delayed work struct
 */
void cancel_rearming_delayed_workqueue(struct workqueue_struct *wq,
                                       struct delayed_work *dwork)
{
        while (!cancel_delayed_work(dwork))
                flush_workqueue(wq);
}
EXPORT_SYMBOL(cancel_rearming_delayed_workqueue);

/**
 * cancel_rearming_delayed_work - reliably kill off a delayed keventd work whose handler rearms the delayed work.
 * @dwork: the delayed work struct
 */
void cancel_rearming_delayed_work(struct delayed_work *dwork)
{
        cancel_rearming_delayed_workqueue(keventd_wq, dwork);
}
EXPORT_SYMBOL(cancel_rearming_delayed_work);
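
/*
 * Illustrative sketch (not part of this file): stopping the self-rearming
 * poll shown above from module exit.  my_dwork and my_exit are hypothetical;
 * a plain cancel_delayed_work() would not be enough, because the handler may
 * be rearming the work at that very moment.
 *
 *      static void my_exit(void)
 *      {
 *              cancel_rearming_delayed_work(&my_dwork);
 *              ... no handler is running or pending from here on ...
 *      }
 */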

/**
 * execute_in_process_context - reliably execute the routine with user context
 * @fn:         the function to execute
 * @ew:         guaranteed storage for the execute work structure (must
 *              be available when the work executes)
 *
 * Executes the function immediately if process context is available,
 * otherwise schedules the function for delayed execution.
 *
 * Returns:     0 - function was executed
 *              1 - function was scheduled for execution
 */
int execute_in_process_context(work_func_t fn, struct execute_work *ew)
{
        if (!in_interrupt()) {
                fn(&ew->work);
                return 0;
        }

        INIT_WORK(&ew->work, fn);
        schedule_work(&ew->work);

        return 1;
}
EXPORT_SYMBOL_GPL(execute_in_process_context);
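
/*
 * Illustrative sketch (not part of this file): callers provide the
 * execute_work storage themselves, typically embedded in a longer-lived
 * object, so it is still around if the call gets deferred.  struct my_obj
 * and my_release are hypothetical.
 *
 *      struct my_obj {
 *              struct execute_work ew;
 *              ...
 *      };
 *
 *      static void my_release(struct work_struct *work)
 *      {
 *              struct my_obj *obj = container_of(work, struct my_obj, ew.work);
 *              kfree(obj);
 *      }
 *
 *      execute_in_process_context(my_release, &obj->ew);
 */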

int keventd_up(void)
{
        return keventd_wq != NULL;
}

int current_is_keventd(void)
{
        struct cpu_workqueue_struct *cwq;
        int cpu = smp_processor_id();   /* preempt-safe: keventd is per-cpu */
        int ret = 0;

        BUG_ON(!keventd_wq);

        cwq = per_cpu_ptr(keventd_wq->cpu_wq, cpu);
        if (current == cwq->thread)
                ret = 1;

        return ret;

}

/* Take the work from this (downed) CPU. */
static void take_over_work(struct workqueue_struct *wq, unsigned int cpu)
{
        struct cpu_workqueue_struct *cwq = per_cpu_ptr(wq->cpu_wq, cpu);
        struct list_head list;
        struct work_struct *work;

        spin_lock_irq(&cwq->lock);
        list_replace_init(&cwq->worklist, &list);

        while (!list_empty(&list)) {
                printk("Taking work for %s\n", wq->name);
                work = list_entry(list.next, struct work_struct, entry);
                list_del(&work->entry);
                __queue_work(per_cpu_ptr(wq->cpu_wq, smp_processor_id()), work);
        }
        spin_unlock_irq(&cwq->lock);
}

/* We're holding the cpucontrol mutex here */
static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
                                            unsigned long action,
                                            void *hcpu)
{
        unsigned int hotcpu = (unsigned long)hcpu;
        struct workqueue_struct *wq;

        switch (action) {
        case CPU_UP_PREPARE:
                mutex_lock(&workqueue_mutex);
                /* Create a new workqueue thread for it. */
                list_for_each_entry(wq, &workqueues, list) {
                        if (!create_workqueue_thread(wq, hotcpu, 0)) {
                                printk("workqueue for %i failed\n", hotcpu);
                                return NOTIFY_BAD;
                        }
                }
                break;

        case CPU_ONLINE:
                /* Kick off worker threads. */
                list_for_each_entry(wq, &workqueues, list) {
                        struct cpu_workqueue_struct *cwq;

                        cwq = per_cpu_ptr(wq->cpu_wq, hotcpu);
                        kthread_bind(cwq->thread, hotcpu);
                        wake_up_process(cwq->thread);
                }
                mutex_unlock(&workqueue_mutex);
                break;

        case CPU_UP_CANCELED:
                list_for_each_entry(wq, &workqueues, list) {
                        if (!per_cpu_ptr(wq->cpu_wq, hotcpu)->thread)
                                continue;
                        /* Unbind so it can run. */
                        kthread_bind(per_cpu_ptr(wq->cpu_wq, hotcpu)->thread,
                                     any_online_cpu(cpu_online_map));
                        cleanup_workqueue_thread(wq, hotcpu);
                }
                mutex_unlock(&workqueue_mutex);
                break;

        case CPU_DOWN_PREPARE:
                mutex_lock(&workqueue_mutex);
                break;

        case CPU_DOWN_FAILED:
                mutex_unlock(&workqueue_mutex);
                break;

        case CPU_DEAD:
                list_for_each_entry(wq, &workqueues, list)
                        cleanup_workqueue_thread(wq, hotcpu);
                list_for_each_entry(wq, &workqueues, list)
                        take_over_work(wq, hotcpu);
                mutex_unlock(&workqueue_mutex);
                break;
        }

        return NOTIFY_OK;
}

void init_workqueues(void)
{
        singlethread_cpu = first_cpu(cpu_possible_map);
        hotcpu_notifier(workqueue_cpu_callback, 0);
        keventd_wq = create_workqueue("events");
        BUG_ON(!keventd_wq);
}