fs/btrfs/async-thread.c  (Btrfs: reduce worker thread spin_lock_irq hold times)
/*
 * Copyright (C) 2007 Oracle.  All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License v2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 */

#include <linux/kthread.h>
#include <linux/list.h>
#include <linux/spinlock.h>
#include <linux/freezer.h>
#include "async-thread.h"

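/*
 * bits in btrfs_work->flags: QUEUED is set while the item sits on a
 * pending list, DONE once its func has run, ORDER_DONE once its
 * ordered_func has run, and HIGH_PRIO routes it to the prio lists.
 */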
#define WORK_QUEUED_BIT 0
#define WORK_DONE_BIT 1
#define WORK_ORDER_DONE_BIT 2
#define WORK_HIGH_PRIO_BIT 3

/*
 * container for the kthread task pointer and the list of pending work
 * One of these is allocated per thread.
 */
struct btrfs_worker_thread {
	/* pool we belong to */
	struct btrfs_workers *workers;

	/* list of struct btrfs_work that are waiting for service */
	struct list_head pending;
	struct list_head prio_pending;

	/* list of worker threads from struct btrfs_workers */
	struct list_head worker_list;

	/* kthread */
	struct task_struct *task;

	/* number of things on the pending list */
	atomic_t num_pending;

	/* reference counter for this struct */
	atomic_t refs;

	unsigned long sequence;

	/* protects the pending list. */
	spinlock_t lock;

	/* set to non-zero when this thread is already awake and kicking */
	int working;

	/* are we currently idle */
	int idle;
};

/*
 * helper function to move a thread onto the idle list after it
 * has finished some requests.
 */
static void check_idle_worker(struct btrfs_worker_thread *worker)
{
	if (!worker->idle && atomic_read(&worker->num_pending) <
	    worker->workers->idle_thresh / 2) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 1;
		list_move(&worker->worker_list, &worker->workers->idle_list);
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

/*
 * helper function to move a thread off the idle list after new
 * pending work is added.
 */
static void check_busy_worker(struct btrfs_worker_thread *worker)
{
	if (worker->idle && atomic_read(&worker->num_pending) >=
	    worker->workers->idle_thresh) {
		unsigned long flags;
		spin_lock_irqsave(&worker->workers->lock, flags);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
		spin_unlock_irqrestore(&worker->workers->lock, flags);
	}
}

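/*
 * helper to start an extra worker that was requested while the
 * queueing code could not start one itself (find_worker sets
 * atomic_start_pending in that case).
 */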
static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
{
	struct btrfs_workers *workers = worker->workers;
	unsigned long flags;

	rmb();
	if (!workers->atomic_start_pending)
		return;

	spin_lock_irqsave(&workers->lock, flags);
	if (!workers->atomic_start_pending)
		goto out;

	workers->atomic_start_pending = 0;
	if (workers->num_workers >= workers->max_workers)
		goto out;

	spin_unlock_irqrestore(&workers->lock, flags);
	btrfs_start_workers(workers, 1);
	return;

out:
	spin_unlock_irqrestore(&workers->lock, flags);
}

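/*
 * for ordered work queues, run the ordered_func and ordered_free
 * callbacks in the same order the items were queued, once each item's
 * main func has finished.
 */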
static noinline int run_ordered_completions(struct btrfs_workers *workers,
					    struct btrfs_work *work)
{
	if (!workers->ordered)
		return 0;

	set_bit(WORK_DONE_BIT, &work->flags);

	spin_lock(&workers->order_lock);

	while (1) {
		if (!list_empty(&workers->prio_order_list)) {
			work = list_entry(workers->prio_order_list.next,
					  struct btrfs_work, order_list);
		} else if (!list_empty(&workers->order_list)) {
			work = list_entry(workers->order_list.next,
					  struct btrfs_work, order_list);
		} else {
			break;
		}
		if (!test_bit(WORK_DONE_BIT, &work->flags))
			break;

		/* we are going to call the ordered done function, but
		 * we leave the work item on the list as a barrier so
		 * that later work items that are done don't have their
		 * functions called before this one returns
		 */
		if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
			break;

		spin_unlock(&workers->order_lock);

		work->ordered_func(work);

		/* now take the lock again and call the freeing code */
		spin_lock(&workers->order_lock);
		list_del(&work->order_list);
		work->ordered_free(work);
	}

	spin_unlock(&workers->order_lock);
	return 0;
}

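/* drop a reference on a worker, freeing it once the last ref is gone */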
static void put_worker(struct btrfs_worker_thread *worker)
{
	if (atomic_dec_and_test(&worker->refs))
		kfree(worker);
}

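/*
 * called after a worker has been idle for a long time.  If the pool
 * still has other threads and this one has nothing queued, pull it
 * out of the pool so it can exit and be freed.
 */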
static int try_worker_shutdown(struct btrfs_worker_thread *worker)
{
	int freeit = 0;

	spin_lock_irq(&worker->lock);
	spin_lock(&worker->workers->lock);
	if (worker->workers->num_workers > 1 &&
	    worker->idle &&
	    !worker->working &&
	    !list_empty(&worker->worker_list) &&
	    list_empty(&worker->prio_pending) &&
	    list_empty(&worker->pending)) {
		freeit = 1;
		list_del_init(&worker->worker_list);
		worker->workers->num_workers--;
	}
	spin_unlock(&worker->workers->lock);
	spin_unlock_irq(&worker->lock);

	if (freeit)
		put_worker(worker);
	return freeit;
}

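/*
 * grab the next work item for a worker.  Items are taken from local
 * list heads that are refilled from the worker's pending lists, so
 * worker->lock is only taken when the local lists run dry.
 */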
static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
					struct list_head *prio_head,
					struct list_head *head)
{
	struct btrfs_work *work = NULL;
	struct list_head *cur = NULL;

	if (!list_empty(prio_head))
		cur = prio_head->next;

	smp_mb();
	if (!list_empty(&worker->prio_pending))
		goto refill;

	if (!list_empty(head))
		cur = head->next;

	if (cur)
		goto out;

refill:
	spin_lock_irq(&worker->lock);
	list_splice_tail_init(&worker->prio_pending, prio_head);
	list_splice_tail_init(&worker->pending, head);

	if (!list_empty(prio_head))
		cur = prio_head->next;
	else if (!list_empty(head))
		cur = head->next;
	spin_unlock_irq(&worker->lock);

	if (!cur)
		goto out_fail;

out:
	work = list_entry(cur, struct btrfs_work, list);

out_fail:
	return work;
}

/*
 * main loop for servicing work items
 */
static int worker_loop(void *arg)
{
	struct btrfs_worker_thread *worker = arg;
	struct list_head head;
	struct list_head prio_head;
	struct btrfs_work *work;

	INIT_LIST_HEAD(&head);
	INIT_LIST_HEAD(&prio_head);

	do {
again:
		while (1) {
			work = get_next_work(worker, &prio_head, &head);
			if (!work)
				break;

			list_del(&work->list);
			clear_bit(WORK_QUEUED_BIT, &work->flags);

			work->worker = worker;

			work->func(work);

			atomic_dec(&worker->num_pending);
			/*
			 * unless this is an ordered work queue,
			 * 'work' was probably freed by func above.
			 */
			run_ordered_completions(worker->workers, work);

			check_pending_worker_creates(worker);
		}

		spin_lock_irq(&worker->lock);
		check_idle_worker(worker);

		if (freezing(current)) {
			worker->working = 0;
			spin_unlock_irq(&worker->lock);
			refrigerator();
		} else {
			spin_unlock_irq(&worker->lock);
			if (!kthread_should_stop()) {
				cpu_relax();
				/*
				 * we've dropped the lock, did someone else
				 * jump in?
				 */
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				/*
				 * this short schedule allows more work to
				 * come in without the queue functions
				 * needing to go through wake_up_process()
				 *
				 * worker->working is still 1, so nobody
				 * is going to try and wake us up
				 */
				schedule_timeout(1);
				smp_mb();
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending))
					continue;

				if (kthread_should_stop())
					break;

				/* still no more work? sleep for real */
				spin_lock_irq(&worker->lock);
				set_current_state(TASK_INTERRUPTIBLE);
				if (!list_empty(&worker->pending) ||
				    !list_empty(&worker->prio_pending)) {
					spin_unlock_irq(&worker->lock);
					goto again;
				}

				/*
				 * this makes sure we get a wakeup when someone
				 * adds something new to the queue
				 */
				worker->working = 0;
				spin_unlock_irq(&worker->lock);

				if (!kthread_should_stop()) {
					schedule_timeout(HZ * 120);
					if (!worker->working &&
					    try_worker_shutdown(worker)) {
						return 0;
					}
				}
			}
			__set_current_state(TASK_RUNNING);
		}
	} while (!kthread_should_stop());
	return 0;
}

/*
 * this will wait for all the worker threads to shutdown
 */
int btrfs_stop_workers(struct btrfs_workers *workers)
{
	struct list_head *cur;
	struct btrfs_worker_thread *worker;
	int can_stop;

	spin_lock_irq(&workers->lock);
	list_splice_init(&workers->idle_list, &workers->worker_list);
	while (!list_empty(&workers->worker_list)) {
		cur = workers->worker_list.next;
		worker = list_entry(cur, struct btrfs_worker_thread,
				    worker_list);

		atomic_inc(&worker->refs);
		workers->num_workers -= 1;
		if (!list_empty(&worker->worker_list)) {
			list_del_init(&worker->worker_list);
			put_worker(worker);
			can_stop = 1;
		} else
			can_stop = 0;
		spin_unlock_irq(&workers->lock);
		if (can_stop)
			kthread_stop(worker->task);
		spin_lock_irq(&workers->lock);
		put_worker(worker);
	}
	spin_unlock_irq(&workers->lock);
	return 0;
}

/*
 * simple init on struct btrfs_workers
 */
void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
{
	workers->num_workers = 0;
	INIT_LIST_HEAD(&workers->worker_list);
	INIT_LIST_HEAD(&workers->idle_list);
	INIT_LIST_HEAD(&workers->order_list);
	INIT_LIST_HEAD(&workers->prio_order_list);
	spin_lock_init(&workers->lock);
	spin_lock_init(&workers->order_lock);
	workers->max_workers = max;
	workers->idle_thresh = 32;
	workers->name = name;
	workers->ordered = 0;
	workers->atomic_start_pending = 0;
	workers->atomic_worker_start = 0;
}

/*
 * starts new worker threads.  This does not enforce the max worker
 * count in case you need to temporarily go past it.
 */
int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
{
	struct btrfs_worker_thread *worker;
	int ret = 0;
	int i;

	for (i = 0; i < num_workers; i++) {
		worker = kzalloc(sizeof(*worker), GFP_NOFS);
		if (!worker) {
			ret = -ENOMEM;
			goto fail;
		}

		INIT_LIST_HEAD(&worker->pending);
		INIT_LIST_HEAD(&worker->prio_pending);
		INIT_LIST_HEAD(&worker->worker_list);
		spin_lock_init(&worker->lock);

		atomic_set(&worker->num_pending, 0);
		atomic_set(&worker->refs, 1);
		worker->workers = workers;
		worker->task = kthread_run(worker_loop, worker,
					   "btrfs-%s-%d", workers->name,
					   workers->num_workers + i);
		if (IS_ERR(worker->task)) {
			ret = PTR_ERR(worker->task);
			kfree(worker);
			goto fail;
		}
		spin_lock_irq(&workers->lock);
		list_add_tail(&worker->worker_list, &workers->idle_list);
		worker->idle = 1;
		workers->num_workers++;
		spin_unlock_irq(&workers->lock);
	}
	return 0;
fail:
	btrfs_stop_workers(workers);
	return ret;
}

/*
 * run through the list and find a worker thread that doesn't have a lot
 * to do right now.  This can return null if we aren't yet at the thread
 * count limit and all of the threads are busy.
 */
static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	struct list_head *next;
	int enforce_min = workers->num_workers < workers->max_workers;

	/*
	 * if we find an idle thread, don't move it to the end of the
	 * idle list.  This improves the chance that the next submission
	 * will reuse the same thread, and maybe catch it while it is still
	 * working
	 */
	if (!list_empty(&workers->idle_list)) {
		next = workers->idle_list.next;
		worker = list_entry(next, struct btrfs_worker_thread,
				    worker_list);
		return worker;
	}
	if (enforce_min || list_empty(&workers->worker_list))
		return NULL;

	/*
	 * if we pick a busy task, move the task to the end of the list.
	 * hopefully this will keep things somewhat evenly balanced.
	 * Do the move in batches based on the sequence number.  This groups
	 * requests submitted at roughly the same time onto the same worker.
	 */
	next = workers->worker_list.next;
	worker = list_entry(next, struct btrfs_worker_thread, worker_list);
	atomic_inc(&worker->num_pending);
	worker->sequence++;

	if (worker->sequence % workers->idle_thresh == 0)
		list_move_tail(next, &workers->worker_list);
	return worker;
}

/*
 * selects a worker thread to take the next job.  This will either find
 * an idle worker, start a new worker up to the max count, or just return
 * one of the existing busy workers.
 */
static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	struct list_head *fallback;

again:
	spin_lock_irqsave(&workers->lock, flags);
	worker = next_worker(workers);

	if (!worker) {
		if (workers->num_workers >= workers->max_workers) {
			goto fallback;
		} else if (workers->atomic_worker_start) {
			workers->atomic_start_pending = 1;
			goto fallback;
		} else {
			spin_unlock_irqrestore(&workers->lock, flags);
			/* we're below the limit, start another worker */
			btrfs_start_workers(workers, 1);
			goto again;
		}
	}
	spin_unlock_irqrestore(&workers->lock, flags);
	return worker;

fallback:
	fallback = NULL;
	/*
	 * we have failed to find any workers, just
	 * return the first one we can find.
	 */
	if (!list_empty(&workers->worker_list))
		fallback = workers->worker_list.next;
	if (!list_empty(&workers->idle_list))
		fallback = workers->idle_list.next;
	BUG_ON(!fallback);
	worker = list_entry(fallback,
		  struct btrfs_worker_thread, worker_list);
	spin_unlock_irqrestore(&workers->lock, flags);
	return worker;
}

/*
 * btrfs_requeue_work just puts the work item back on the tail of the list
 * it was taken from.  It is intended for use with long running work functions
 * that make some progress and want to give the cpu up for others.
 */
int btrfs_requeue_work(struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker = work->worker;
	unsigned long flags;
	int wake = 0;

	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		goto out;

	spin_lock_irqsave(&worker->lock, flags);
	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	atomic_inc(&worker->num_pending);

	/* by definition we're busy, take ourselves off the idle
	 * list
	 */
	if (worker->idle) {
		spin_lock(&worker->workers->lock);
		worker->idle = 0;
		list_move_tail(&worker->worker_list,
			       &worker->workers->worker_list);
		spin_unlock(&worker->workers->lock);
	}
	if (!worker->working) {
		wake = 1;
		worker->working = 1;
	}

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);
out:
	return 0;
}

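/* mark a work item as high priority so it goes on the prio lists */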
void btrfs_set_work_high_prio(struct btrfs_work *work)
{
	set_bit(WORK_HIGH_PRIO_BIT, &work->flags);
}

/*
 * places a struct btrfs_work into the pending queue of one of the kthreads
 */
int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
{
	struct btrfs_worker_thread *worker;
	unsigned long flags;
	int wake = 0;

	/* don't requeue something already on a list */
	if (test_and_set_bit(WORK_QUEUED_BIT, &work->flags))
		goto out;

	worker = find_worker(workers);
	if (workers->ordered) {
		/*
		 * you're not allowed to do ordered queues from an
		 * interrupt handler
		 */
		spin_lock(&workers->order_lock);
		if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
			list_add_tail(&work->order_list,
				      &workers->prio_order_list);
		} else {
			list_add_tail(&work->order_list, &workers->order_list);
		}
		spin_unlock(&workers->order_lock);
	} else {
		INIT_LIST_HEAD(&work->order_list);
	}

	spin_lock_irqsave(&worker->lock, flags);

	if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags))
		list_add_tail(&work->list, &worker->prio_pending);
	else
		list_add_tail(&work->list, &worker->pending);
	atomic_inc(&worker->num_pending);
	check_busy_worker(worker);

	/*
	 * avoid calling into wake_up_process if this thread has already
	 * been kicked
	 */
	if (!worker->working)
		wake = 1;
	worker->working = 1;

	if (wake)
		wake_up_process(worker->task);
	spin_unlock_irqrestore(&worker->lock, flags);

out:
	return 0;
}
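
/*
 * Example (sketch): how a caller typically drives this pool.  The names
 * my_work_func, do_something_useful and the pool size of 8 are made up
 * for illustration; error handling is omitted.  A work item's func runs
 * exactly once in a worker thread and is usually responsible for
 * freeing the item.
 *
 *	static void my_work_func(struct btrfs_work *work)
 *	{
 *		do_something_useful(work);
 *		kfree(work);
 *	}
 *
 *	struct btrfs_workers pool;
 *	struct btrfs_work *work;
 *
 *	btrfs_init_workers(&pool, "example", 8);
 *	btrfs_start_workers(&pool, 1);
 *
 *	work = kzalloc(sizeof(*work), GFP_NOFS);
 *	work->func = my_work_func;
 *	btrfs_queue_worker(&pool, work);
 *
 *	btrfs_stop_workers(&pool);
 */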