OLD | NEW |
| (Empty) |
1 // Copyright (c) 2011 The Native Client Authors. All rights reserved. | |
2 // Use of this source code is governed by a BSD-style license that can be | |
3 // found in the LICENSE file. | |
4 | |
5 #ifndef SHARED_QUEUE_H | |
6 #define SHARED_QUEUE_H | |
7 | |
8 #include <pthread.h> | |
9 #include <cassert> | |
10 #include <deque> | |
11 | |
12 #include "thread_safe_ref_count.h" | |
13 | |
14 namespace { | |
15 | |
16 // This file provides a queue that uses a mutex and condition variable so that | |
17 // one thread can put pointers into the queue and another thread can pull items | |
18 // out of the queue. | |
19 | |
20 const int kPthreadMutexSuccess = 0; | |
21 | |
22 // Specifies whether we want to wait for the queue. | |
23 enum QueueWaitingFlag { | |
24 kWait = 0, | |
25 kDontWait | |
26 }; | |
27 | |
28 // Indicates if we got an item, did not wait, or if the queue was cancelled. | |
29 enum QueueGetResult { | |
30 kReturnedItem = 0, | |
31 kDidNotWait = 1, | |
32 kQueueWasCancelled | |
33 }; | |
34 | |
35 // LockingQueue contains a collection of <T>, such as a collection of | |
36 // objects or pointers. The Push() method is used to add items to the | |
37 // queue in a thread-safe manner. The GetItem() is used to retrieve | |
38 // items from the queue in a thread-safe manner. | |
39 template <class T> | |
40 class LockingQueue { | |
41 public: | |
42 LockingQueue() : quit_(false) { | |
43 int result = pthread_mutex_init(&queue_mutex_, NULL); | |
44 assert(result == 0); | |
45 result = pthread_cond_init(&queue_condition_var_, NULL); | |
46 assert(result == 0); | |
47 } | |
48 ~LockingQueue() { | |
49 pthread_mutex_destroy(&queue_mutex_); | |
50 } | |
51 | |
52 // The producer (who instantiates the queue) calls this to tell the | |
53 // consumer that the queue is no longer being used. | |
54 void CancelQueue() { | |
55 ScopedLock scoped_mutex(&queue_mutex_); | |
56 quit_ = true; | |
57 // Signal the condition var so that if a thread is waiting in | |
58 // GetItem the thread will wake up and see that the queue has | |
59 // been cancelled. | |
60 pthread_cond_signal(&queue_condition_var_); | |
61 } | |
62 | |
63 // The consumer calls this to see if the queue has been cancelled by | |
64 // the producer. If so, the thread should not call GetItem and may | |
65 // need to terminate -- i.e. in a case where the producer created | |
66 // the consumer thread. | |
67 bool IsCancelled() { | |
68 ScopedLock scoped_mutex(&queue_mutex_); | |
69 return quit_; | |
70 } | |
71 | |
72 // Grabs the mutex and pushes a new item to the end of the queue if the | |
73 // queue is not full. Signals the condition variable so that a thread | |
74 // that is waiting will wake up and grab the item. | |
75 void Push(const T& item) { | |
76 ScopedLock scoped_mutex(&queue_mutex_); | |
77 the_queue_.push_back(item); | |
78 pthread_cond_signal(&queue_condition_var_); | |
79 } | |
80 | |
81 // Tries to pop the front element from the queue; returns an enum: | |
82 // kReturnedItem if an item is returned in |item_ptr|, | |
83 // kDidNotWait if |wait| was kDontWait and the queue was empty, | |
84 // kQueueWasCancelled if the producer called CancelQueue(). | |
85 // If |wait| is kWait, GetItem will wait to return until the queue | |
86 // contains an item (unless the queue is cancelled). | |
87 QueueGetResult GetItem(T* item_ptr, QueueWaitingFlag wait) { | |
88 // Because we use both pthread_mutex_lock and pthread_cond_wait, | |
89 // we directly use the mutex instead of using ScopedLock. | |
90 ScopedLock scoped_mutex(&queue_mutex_); | |
91 // Use a while loop to get an item. If the user does not want to wait, | |
92 // we will exit from the loop anyway, unlocking the mutex. | |
93 // If the user does want to wait, we will wait for pthread_cond_wait, | |
94 // and the while loop will check is_empty_no_locking() one more | |
95 // time so that a spurious wake-up of pthread_cond_wait is handled. | |
96 // If |quit_| has been set, break out of the loop. | |
97 while (!quit_ && is_empty_no_locking()) { | |
98 // If user doesn't want to wait, return... | |
99 if (kDontWait == wait) { | |
100 return kDidNotWait; | |
101 } | |
102 // Wait for signal to occur. | |
103 pthread_cond_wait(&queue_condition_var_, &queue_mutex_); | |
104 } | |
105 // Check to see if quit_ woke us up | |
106 if (quit_) { | |
107 return kQueueWasCancelled; | |
108 } | |
109 | |
110 // At this point, the queue was either not empty or, if it was empty, | |
111 // we called pthread_cond_wait (which released the mutex, waited for the | |
112 // signal to occur, and then atomically reacquired the mutex). | |
113 // Thus, if we are here, the queue cannot be empty because we either | |
114 // had the mutex and verified it was not empty, or we waited for the | |
115 // producer to put an item in and signal a single thread (us). | |
116 T& item = the_queue_.front(); | |
117 *item_ptr = item; | |
118 the_queue_.pop_front(); | |
119 return kReturnedItem; | |
120 } | |
121 | |
122 private: | |
123 std::deque<T> the_queue_; | |
124 bool quit_; | |
125 pthread_mutex_t queue_mutex_; | |
126 pthread_cond_t queue_condition_var_; | |
127 | |
128 // This is used by methods that already have the lock. | |
129 bool is_empty_no_locking() const { | |
130 return the_queue_.empty(); | |
131 } | |
132 }; | |
133 | |
134 } // end of unnamed namespace | |
135 | |
136 #endif // SHARED_QUEUE_H | |
137 | |
OLD | NEW |