File: | pr/Linux4.19_x86_64_gcc_glibc_PTH_64_DBG.OBJ/pr/src/misc/../../../../pr/src/misc/prtpool.c |
Warning: | line 329, column 34 Dereference of null pointer |
Press '?' to see keyboard shortcuts
Keyboard shortcuts:
1 | /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ | |||
2 | /* This Source Code Form is subject to the terms of the Mozilla Public | |||
3 | * License, v. 2.0. If a copy of the MPL was not distributed with this | |||
4 | * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ | |||
5 | ||||
6 | #include "nspr.h" | |||
7 | ||||
8 | /* | |||
9 | * Thread pools | |||
10 | * Thread pools create and manage threads to provide support for | |||
11 | * scheduling jobs onto one or more threads. | |||
12 | * | |||
13 | */ | |||
14 | #ifdef OPT_WINNT | |||
15 | #include <windows.h> | |||
16 | #endif | |||
17 | ||||
18 | /* | |||
19 | * worker thread | |||
20 | */ | |||
21 | typedef struct wthread { | |||
22 | PRCList links; | |||
23 | PRThread *thread; | |||
24 | } wthread; | |||
25 | ||||
26 | /* | |||
27 | * queue of timer jobs | |||
28 | */ | |||
29 | typedef struct timer_jobq { | |||
30 | PRCList list; | |||
31 | PRLock *lock; | |||
32 | PRCondVar *cv; | |||
33 | PRInt32 cnt; | |||
34 | PRCList wthreads; | |||
35 | } timer_jobq; | |||
36 | ||||
37 | /* | |||
38 | * queue of jobs | |||
39 | */ | |||
40 | typedef struct tp_jobq { | |||
41 | PRCList list; | |||
42 | PRInt32 cnt; | |||
43 | PRLock *lock; | |||
44 | PRCondVar *cv; | |||
45 | PRCList wthreads; | |||
46 | #ifdef OPT_WINNT | |||
47 | HANDLE nt_completion_port; | |||
48 | #endif | |||
49 | } tp_jobq; | |||
50 | ||||
51 | /* | |||
52 | * queue of IO jobs | |||
53 | */ | |||
54 | typedef struct io_jobq { | |||
55 | PRCList list; | |||
56 | PRPollDesc *pollfds; | |||
57 | PRInt32 npollfds; | |||
58 | PRJob **polljobs; | |||
59 | PRLock *lock; | |||
60 | PRInt32 cnt; | |||
61 | PRFileDesc *notify_fd; | |||
62 | PRCList wthreads; | |||
63 | } io_jobq; | |||
64 | ||||
65 | /* | |||
66 | * Threadpool | |||
67 | */ | |||
68 | struct PRThreadPool { | |||
69 | PRInt32 init_threads; | |||
70 | PRInt32 max_threads; | |||
71 | PRInt32 current_threads; | |||
72 | PRInt32 idle_threads; | |||
73 | PRUint32 stacksize; | |||
74 | tp_jobq jobq; | |||
75 | io_jobq ioq; | |||
76 | timer_jobq timerq; | |||
77 | PRLock *join_lock; /* used with jobp->join_cv */ | |||
78 | PRCondVar *shutdown_cv; | |||
79 | PRBool shutdown; | |||
80 | }; | |||
81 | ||||
82 | typedef enum io_op_type | |||
83 | { JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type; | |||
84 | ||||
85 | #ifdef OPT_WINNT | |||
86 | typedef struct NT_notifier { | |||
87 | OVERLAPPED overlapped; /* must be first */ | |||
88 | PRJob *jobp; | |||
89 | } NT_notifier; | |||
90 | #endif | |||
91 | ||||
92 | struct PRJob { | |||
93 | PRCList links; /* for linking jobs */ | |||
94 | PRBool on_ioq; /* job on ioq */ | |||
95 | PRBool on_timerq; /* job on timerq */ | |||
96 | PRJobFn job_func; | |||
97 | void *job_arg; | |||
98 | PRCondVar *join_cv; | |||
99 | PRBool join_wait; /* == PR_TRUE, when waiting to join */ | |||
100 | PRCondVar *cancel_cv; /* for cancelling IO jobs */ | |||
101 | PRBool cancel_io; /* for cancelling IO jobs */ | |||
102 | PRThreadPool *tpool; /* back pointer to thread pool */ | |||
103 | PRJobIoDesc *iod; | |||
104 | io_op_type io_op; | |||
105 | PRInt16 io_poll_flags; | |||
106 | PRNetAddr *netaddr; | |||
107 | PRIntervalTime timeout; /* relative value */ | |||
108 | PRIntervalTime absolute; | |||
109 | #ifdef OPT_WINNT | |||
110 | NT_notifier nt_notifier; | |||
111 | #endif | |||
112 | }; | |||
113 | ||||
114 | #define JOB_LINKS_PTR(_qp)((PRJob *) ((char *) (_qp) - __builtin_offsetof(PRJob, links) )) \ | |||
115 | ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links)__builtin_offsetof(PRJob, links))) | |||
116 | ||||
117 | #define WTHREAD_LINKS_PTR(_qp)((wthread *) ((char *) (_qp) - __builtin_offsetof(wthread, links ))) \ | |||
118 | ((wthread *) ((char *) (_qp) - offsetof(wthread, links)__builtin_offsetof(wthread, links))) | |||
119 | ||||
120 | #define JOINABLE_JOB(_jobp)(((void*)0) != (_jobp)->join_cv) (NULL((void*)0) != (_jobp)->join_cv) | |||
121 | ||||
122 | #define JOIN_NOTIFY(_jobp)do { PR_Lock(_jobp->tpool->join_lock); _jobp->join_wait = 0; PR_NotifyCondVar(_jobp->join_cv); PR_Unlock(_jobp-> tpool->join_lock); } while (0) \ | |||
123 | PR_BEGIN_MACROdo { \ | |||
124 | PR_Lock(_jobp->tpool->join_lock); \ | |||
125 | _jobp->join_wait = PR_FALSE0; \ | |||
126 | PR_NotifyCondVar(_jobp->join_cv); \ | |||
127 | PR_Unlock(_jobp->tpool->join_lock); \ | |||
128 | PR_END_MACRO} while (0) | |||
129 | ||||
130 | #define CANCEL_IO_JOB(jobp)do { jobp->cancel_io = 0; jobp->on_ioq = 0; do { (& jobp->links)->prev->next = (&jobp->links)-> next; (&jobp->links)->next->prev = (&jobp-> links)->prev; (&jobp->links)->next = (&jobp-> links); (&jobp->links)->prev = (&jobp->links ); } while (0); tp->ioq.cnt--; PR_NotifyCondVar(jobp->cancel_cv ); } while (0) \ | |||
131 | PR_BEGIN_MACROdo { \ | |||
132 | jobp->cancel_io = PR_FALSE0; \ | |||
133 | jobp->on_ioq = PR_FALSE0; \ | |||
134 | PR_REMOVE_AND_INIT_LINK(&jobp->links)do { (&jobp->links)->prev->next = (&jobp-> links)->next; (&jobp->links)->next->prev = (& jobp->links)->prev; (&jobp->links)->next = (& jobp->links); (&jobp->links)->prev = (&jobp-> links); } while (0); \ | |||
135 | tp->ioq.cnt--; \ | |||
136 | PR_NotifyCondVar(jobp->cancel_cv); \ | |||
137 | PR_END_MACRO} while (0) | |||
138 | ||||
139 | static void delete_job(PRJob *jobp); | |||
140 | static PRThreadPool * alloc_threadpool(void); | |||
141 | static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp); | |||
142 | static void notify_ioq(PRThreadPool *tp); | |||
143 | static void notify_timerq(PRThreadPool *tp); | |||
144 | ||||
145 | /* | |||
146 | * locks are acquired in the following order | |||
147 | * | |||
148 | * tp->ioq.lock,tp->timerq.lock | |||
149 | * | | |||
150 | * V | |||
151 | * tp->jobq->lock | |||
152 | */ | |||
153 | ||||
154 | /* | |||
155 | * worker thread function | |||
156 | */ | |||
157 | static void wstart(void *arg) | |||
158 | { | |||
159 | PRThreadPool *tp = (PRThreadPool *) arg; | |||
160 | PRCList *head; | |||
161 | ||||
162 | /* | |||
163 | * execute jobs until shutdown | |||
164 | */ | |||
165 | while (!tp->shutdown) { | |||
166 | PRJob *jobp; | |||
167 | #ifdef OPT_WINNT | |||
168 | BOOL rv; | |||
169 | DWORD unused, shutdown; | |||
170 | LPOVERLAPPED olp; | |||
171 | ||||
172 | PR_Lock(tp->jobq.lock); | |||
173 | tp->idle_threads++; | |||
174 | PR_Unlock(tp->jobq.lock); | |||
175 | rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port, | |||
176 | &unused, &shutdown, &olp, INFINITE); | |||
177 | ||||
178 | PR_ASSERT(rv)((rv)?((void)0):PR_Assert("rv","../../../../pr/src/misc/prtpool.c" ,178)); | |||
179 | if (shutdown) { | |||
180 | break; | |||
181 | } | |||
182 | jobp = ((NT_notifier *) olp)->jobp; | |||
183 | PR_Lock(tp->jobq.lock); | |||
184 | tp->idle_threads--; | |||
185 | tp->jobq.cnt--; | |||
186 | PR_Unlock(tp->jobq.lock); | |||
187 | #else | |||
188 | ||||
189 | PR_Lock(tp->jobq.lock); | |||
190 | while (PR_CLIST_IS_EMPTY(&tp->jobq.list)((&tp->jobq.list)->next == (&tp->jobq.list)) && (!tp->shutdown)) { | |||
191 | tp->idle_threads++; | |||
192 | PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT0xffffffffUL); | |||
193 | tp->idle_threads--; | |||
194 | } | |||
195 | if (tp->shutdown) { | |||
196 | PR_Unlock(tp->jobq.lock); | |||
197 | break; | |||
198 | } | |||
199 | head = PR_LIST_HEAD(&tp->jobq.list)(&tp->jobq.list)->next; | |||
200 | /* | |||
201 | * remove job from queue | |||
202 | */ | |||
203 | PR_REMOVE_AND_INIT_LINK(head)do { (head)->prev->next = (head)->next; (head)->next ->prev = (head)->prev; (head)->next = (head); (head) ->prev = (head); } while (0); | |||
204 | tp->jobq.cnt--; | |||
205 | jobp = JOB_LINKS_PTR(head)((PRJob *) ((char *) (head) - __builtin_offsetof(PRJob, links ))); | |||
206 | PR_Unlock(tp->jobq.lock); | |||
207 | #endif | |||
208 | ||||
209 | jobp->job_func(jobp->job_arg); | |||
210 | if (!JOINABLE_JOB(jobp)(((void*)0) != (jobp)->join_cv)) { | |||
211 | delete_job(jobp); | |||
212 | } else { | |||
213 | JOIN_NOTIFY(jobp)do { PR_Lock(jobp->tpool->join_lock); jobp->join_wait = 0; PR_NotifyCondVar(jobp->join_cv); PR_Unlock(jobp-> tpool->join_lock); } while (0); | |||
214 | } | |||
215 | } | |||
216 | PR_Lock(tp->jobq.lock); | |||
217 | tp->current_threads--; | |||
218 | PR_Unlock(tp->jobq.lock); | |||
219 | } | |||
220 | ||||
221 | /* | |||
222 | * add a job to the work queue | |||
223 | */ | |||
224 | static void | |||
225 | add_to_jobq(PRThreadPool *tp, PRJob *jobp) | |||
226 | { | |||
227 | /* | |||
228 | * add to jobq | |||
229 | */ | |||
230 | #ifdef OPT_WINNT | |||
231 | PR_Lock(tp->jobq.lock); | |||
232 | tp->jobq.cnt++; | |||
233 | PR_Unlock(tp->jobq.lock); | |||
234 | /* | |||
235 | * notify worker thread(s) | |||
236 | */ | |||
237 | PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0, | |||
238 | FALSE, &jobp->nt_notifier.overlapped); | |||
239 | #else | |||
240 | PR_Lock(tp->jobq.lock); | |||
241 | PR_APPEND_LINK(&jobp->links,&tp->jobq.list)do { (&jobp->links)->next = (&tp->jobq.list) ; (&jobp->links)->prev = (&tp->jobq.list)-> prev; (&tp->jobq.list)->prev->next = (&jobp-> links); (&tp->jobq.list)->prev = (&jobp->links ); } while (0); | |||
242 | tp->jobq.cnt++; | |||
243 | if ((tp->idle_threads < tp->jobq.cnt) && | |||
244 | (tp->current_threads < tp->max_threads)) { | |||
245 | wthread *wthrp; | |||
246 | /* | |||
247 | * increment thread count and unlock the jobq lock | |||
248 | */ | |||
249 | tp->current_threads++; | |||
250 | PR_Unlock(tp->jobq.lock); | |||
251 | /* create new worker thread */ | |||
252 | wthrp = PR_NEWZAP(wthread)((wthread*)PR_Calloc(1, sizeof(wthread))); | |||
253 | if (wthrp) { | |||
254 | wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart, | |||
255 | tp, PR_PRIORITY_NORMAL, | |||
256 | PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize); | |||
257 | if (NULL((void*)0) == wthrp->thread) { | |||
258 | PR_DELETE(wthrp){ PR_Free(wthrp); (wthrp) = ((void*)0); }; /* this sets wthrp to NULL */ | |||
259 | } | |||
260 | } | |||
261 | PR_Lock(tp->jobq.lock); | |||
262 | if (NULL((void*)0) == wthrp) { | |||
263 | tp->current_threads--; | |||
264 | } else { | |||
265 | PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads)do { (&wthrp->links)->next = (&tp->jobq.wthreads ); (&wthrp->links)->prev = (&tp->jobq.wthreads )->prev; (&tp->jobq.wthreads)->prev->next = ( &wthrp->links); (&tp->jobq.wthreads)->prev = (&wthrp->links); } while (0); | |||
266 | } | |||
267 | } | |||
268 | /* | |||
269 | * wakeup a worker thread | |||
270 | */ | |||
271 | PR_NotifyCondVar(tp->jobq.cv); | |||
272 | PR_Unlock(tp->jobq.lock); | |||
273 | #endif | |||
274 | } | |||
275 | ||||
276 | /* | |||
277 | * io worker thread function | |||
278 | */ | |||
279 | static void io_wstart(void *arg) | |||
280 | { | |||
281 | PRThreadPool *tp = (PRThreadPool *) arg; | |||
282 | int pollfd_cnt, pollfds_used; | |||
283 | int rv; | |||
284 | PRCList *qp, *nextqp; | |||
285 | PRPollDesc *pollfds = NULL((void*)0); | |||
| ||||
286 | PRJob **polljobs = NULL((void*)0); | |||
287 | int poll_timeout; | |||
288 | PRIntervalTime now; | |||
289 | ||||
290 | /* | |||
291 | * scan io_jobq | |||
292 | * construct poll list | |||
293 | * call PR_Poll | |||
294 | * for all fds, for which poll returns true, move the job to | |||
295 | * jobq and wakeup worker thread. | |||
296 | */ | |||
297 | while (!tp->shutdown) { | |||
298 | PRJob *jobp; | |||
299 | ||||
300 | pollfd_cnt = tp->ioq.cnt + 10; | |||
301 | if (pollfd_cnt > tp->ioq.npollfds) { | |||
302 | ||||
303 | /* | |||
304 | * re-allocate pollfd array if the current one is not large | |||
305 | * enough | |||
306 | */ | |||
307 | if (NULL((void*)0) != tp->ioq.pollfds) { | |||
308 | PR_Free(tp->ioq.pollfds); | |||
309 | } | |||
310 | tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt * | |||
311 | (sizeof(PRPollDesc) + sizeof(PRJob *))); | |||
312 | PR_ASSERT(NULL != tp->ioq.pollfds)((((void*)0) != tp->ioq.pollfds)?((void)0):PR_Assert("NULL != tp->ioq.pollfds" ,"../../../../pr/src/misc/prtpool.c",312)); | |||
313 | /* | |||
314 | * array of pollfds | |||
315 | */ | |||
316 | pollfds = tp->ioq.pollfds; | |||
317 | tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]); | |||
318 | /* | |||
319 | * parallel array of jobs | |||
320 | */ | |||
321 | polljobs = tp->ioq.polljobs; | |||
322 | tp->ioq.npollfds = pollfd_cnt; | |||
323 | } | |||
324 | ||||
325 | pollfds_used = 0; | |||
326 | /* | |||
327 | * add the notify fd; used for unblocking io thread(s) | |||
328 | */ | |||
329 | pollfds[pollfds_used].fd = tp->ioq.notify_fd; | |||
| ||||
330 | pollfds[pollfds_used].in_flags = PR_POLL_READ0x1; | |||
331 | pollfds[pollfds_used].out_flags = 0; | |||
332 | polljobs[pollfds_used] = NULL((void*)0); | |||
333 | pollfds_used++; | |||
334 | /* | |||
335 | * fill in the pollfd array | |||
336 | */ | |||
337 | PR_Lock(tp->ioq.lock); | |||
338 | for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { | |||
339 | nextqp = qp->next; | |||
340 | jobp = JOB_LINKS_PTR(qp)((PRJob *) ((char *) (qp) - __builtin_offsetof(PRJob, links)) ); | |||
341 | if (jobp->cancel_io) { | |||
342 | CANCEL_IO_JOB(jobp)do { jobp->cancel_io = 0; jobp->on_ioq = 0; do { (& jobp->links)->prev->next = (&jobp->links)-> next; (&jobp->links)->next->prev = (&jobp-> links)->prev; (&jobp->links)->next = (&jobp-> links); (&jobp->links)->prev = (&jobp->links ); } while (0); tp->ioq.cnt--; PR_NotifyCondVar(jobp->cancel_cv ); } while (0); | |||
343 | continue; | |||
344 | } | |||
345 | if (pollfds_used == (pollfd_cnt)) { | |||
346 | break; | |||
347 | } | |||
348 | pollfds[pollfds_used].fd = jobp->iod->socket; | |||
349 | pollfds[pollfds_used].in_flags = jobp->io_poll_flags; | |||
350 | pollfds[pollfds_used].out_flags = 0; | |||
351 | polljobs[pollfds_used] = jobp; | |||
352 | ||||
353 | pollfds_used++; | |||
354 | } | |||
355 | if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)((&tp->ioq.list)->next == (&tp->ioq.list))) { | |||
356 | qp = tp->ioq.list.next; | |||
357 | jobp = JOB_LINKS_PTR(qp)((PRJob *) ((char *) (qp) - __builtin_offsetof(PRJob, links)) ); | |||
358 | if (PR_INTERVAL_NO_TIMEOUT0xffffffffUL == jobp->timeout) { | |||
359 | poll_timeout = PR_INTERVAL_NO_TIMEOUT0xffffffffUL; | |||
360 | } | |||
361 | else if (PR_INTERVAL_NO_WAIT0UL == jobp->timeout) { | |||
362 | poll_timeout = PR_INTERVAL_NO_WAIT0UL; | |||
363 | } | |||
364 | else { | |||
365 | poll_timeout = jobp->absolute - PR_IntervalNow(); | |||
366 | if (poll_timeout <= 0) { /* already timed out */ | |||
367 | poll_timeout = PR_INTERVAL_NO_WAIT0UL; | |||
368 | } | |||
369 | } | |||
370 | } else { | |||
371 | poll_timeout = PR_INTERVAL_NO_TIMEOUT0xffffffffUL; | |||
372 | } | |||
373 | PR_Unlock(tp->ioq.lock); | |||
374 | ||||
375 | /* | |||
376 | * XXXX | |||
377 | * should retry if more jobs have been added to the queue? | |||
378 | * | |||
379 | */ | |||
380 | PR_ASSERT(pollfds_used <= pollfd_cnt)((pollfds_used <= pollfd_cnt)?((void)0):PR_Assert("pollfds_used <= pollfd_cnt" ,"../../../../pr/src/misc/prtpool.c",380)); | |||
381 | rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout); | |||
382 | ||||
383 | if (tp->shutdown) { | |||
384 | break; | |||
385 | } | |||
386 | ||||
387 | if (rv > 0) { | |||
388 | /* | |||
389 | * at least one io event is set | |||
390 | */ | |||
391 | PRStatus rval_status; | |||
392 | PRInt32 index; | |||
393 | ||||
394 | PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd)((pollfds[0].fd == tp->ioq.notify_fd)?((void)0):PR_Assert( "pollfds[0].fd == tp->ioq.notify_fd","../../../../pr/src/misc/prtpool.c" ,394)); | |||
395 | /* | |||
396 | * reset the pollable event, if notified | |||
397 | */ | |||
398 | if (pollfds[0].out_flags & PR_POLL_READ0x1) { | |||
399 | rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd); | |||
400 | PR_ASSERT(PR_SUCCESS == rval_status)((PR_SUCCESS == rval_status)?((void)0):PR_Assert("PR_SUCCESS == rval_status" ,"../../../../pr/src/misc/prtpool.c",400)); | |||
401 | } | |||
402 | ||||
403 | for(index = 1; index < (pollfds_used); index++) { | |||
404 | PRInt16 events = pollfds[index].in_flags; | |||
405 | PRInt16 revents = pollfds[index].out_flags; | |||
406 | jobp = polljobs[index]; | |||
407 | ||||
408 | if ((revents & PR_POLL_NVAL0x10) || /* busted in all cases */ | |||
409 | (revents & PR_POLL_ERR0x8) || | |||
410 | ((events & PR_POLL_WRITE0x2) && | |||
411 | (revents & PR_POLL_HUP0x20))) { /* write op & hup */ | |||
412 | PR_Lock(tp->ioq.lock); | |||
413 | if (jobp->cancel_io) { | |||
414 | CANCEL_IO_JOB(jobp)do { jobp->cancel_io = 0; jobp->on_ioq = 0; do { (& jobp->links)->prev->next = (&jobp->links)-> next; (&jobp->links)->next->prev = (&jobp-> links)->prev; (&jobp->links)->next = (&jobp-> links); (&jobp->links)->prev = (&jobp->links ); } while (0); tp->ioq.cnt--; PR_NotifyCondVar(jobp->cancel_cv ); } while (0); | |||
415 | PR_Unlock(tp->ioq.lock); | |||
416 | continue; | |||
417 | } | |||
418 | PR_REMOVE_AND_INIT_LINK(&jobp->links)do { (&jobp->links)->prev->next = (&jobp-> links)->next; (&jobp->links)->next->prev = (& jobp->links)->prev; (&jobp->links)->next = (& jobp->links); (&jobp->links)->prev = (&jobp-> links); } while (0); | |||
419 | tp->ioq.cnt--; | |||
420 | jobp->on_ioq = PR_FALSE0; | |||
421 | PR_Unlock(tp->ioq.lock); | |||
422 | ||||
423 | /* set error */ | |||
424 | if (PR_POLL_NVAL0x10 & revents) { | |||
425 | jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR(-5999L); | |||
426 | } | |||
427 | else if (PR_POLL_HUP0x20 & revents) { | |||
428 | jobp->iod->error = PR_CONNECT_RESET_ERROR(-5961L); | |||
429 | } | |||
430 | else { | |||
431 | jobp->iod->error = PR_IO_ERROR(-5991L); | |||
432 | } | |||
433 | ||||
434 | /* | |||
435 | * add to jobq | |||
436 | */ | |||
437 | add_to_jobq(tp, jobp); | |||
438 | } else if (revents) { | |||
439 | /* | |||
440 | * add to jobq | |||
441 | */ | |||
442 | PR_Lock(tp->ioq.lock); | |||
443 | if (jobp->cancel_io) { | |||
444 | CANCEL_IO_JOB(jobp)do { jobp->cancel_io = 0; jobp->on_ioq = 0; do { (& jobp->links)->prev->next = (&jobp->links)-> next; (&jobp->links)->next->prev = (&jobp-> links)->prev; (&jobp->links)->next = (&jobp-> links); (&jobp->links)->prev = (&jobp->links ); } while (0); tp->ioq.cnt--; PR_NotifyCondVar(jobp->cancel_cv ); } while (0); | |||
445 | PR_Unlock(tp->ioq.lock); | |||
446 | continue; | |||
447 | } | |||
448 | PR_REMOVE_AND_INIT_LINK(&jobp->links)do { (&jobp->links)->prev->next = (&jobp-> links)->next; (&jobp->links)->next->prev = (& jobp->links)->prev; (&jobp->links)->next = (& jobp->links); (&jobp->links)->prev = (&jobp-> links); } while (0); | |||
449 | tp->ioq.cnt--; | |||
450 | jobp->on_ioq = PR_FALSE0; | |||
451 | PR_Unlock(tp->ioq.lock); | |||
452 | ||||
453 | if (jobp->io_op == JOB_IO_CONNECT) { | |||
454 | if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS) { | |||
455 | jobp->iod->error = 0; | |||
456 | } | |||
457 | else { | |||
458 | jobp->iod->error = PR_GetError(); | |||
459 | } | |||
460 | } else { | |||
461 | jobp->iod->error = 0; | |||
462 | } | |||
463 | ||||
464 | add_to_jobq(tp, jobp); | |||
465 | } | |||
466 | } | |||
467 | } | |||
468 | /* | |||
469 | * timeout processing | |||
470 | */ | |||
471 | now = PR_IntervalNow(); | |||
472 | PR_Lock(tp->ioq.lock); | |||
473 | for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) { | |||
474 | nextqp = qp->next; | |||
475 | jobp = JOB_LINKS_PTR(qp)((PRJob *) ((char *) (qp) - __builtin_offsetof(PRJob, links)) ); | |||
476 | if (jobp->cancel_io) { | |||
477 | CANCEL_IO_JOB(jobp)do { jobp->cancel_io = 0; jobp->on_ioq = 0; do { (& jobp->links)->prev->next = (&jobp->links)-> next; (&jobp->links)->next->prev = (&jobp-> links)->prev; (&jobp->links)->next = (&jobp-> links); (&jobp->links)->prev = (&jobp->links ); } while (0); tp->ioq.cnt--; PR_NotifyCondVar(jobp->cancel_cv ); } while (0); | |||
478 | continue; | |||
479 | } | |||
480 | if (PR_INTERVAL_NO_TIMEOUT0xffffffffUL == jobp->timeout) { | |||
481 | break; | |||
482 | } | |||
483 | if ((PR_INTERVAL_NO_WAIT0UL != jobp->timeout) && | |||
484 | ((PRInt32)(jobp->absolute - now) > 0)) { | |||
485 | break; | |||
486 | } | |||
487 | PR_REMOVE_AND_INIT_LINK(&jobp->links)do { (&jobp->links)->prev->next = (&jobp-> links)->next; (&jobp->links)->next->prev = (& jobp->links)->prev; (&jobp->links)->next = (& jobp->links); (&jobp->links)->prev = (&jobp-> links); } while (0); | |||
488 | tp->ioq.cnt--; | |||
489 | jobp->on_ioq = PR_FALSE0; | |||
490 | jobp->iod->error = PR_IO_TIMEOUT_ERROR(-5990L); | |||
491 | add_to_jobq(tp, jobp); | |||
492 | } | |||
493 | PR_Unlock(tp->ioq.lock); | |||
494 | } | |||
495 | } | |||
496 | ||||
497 | /* | |||
498 | * timer worker thread function | |||
499 | */ | |||
500 | static void timer_wstart(void *arg) | |||
501 | { | |||
502 | PRThreadPool *tp = (PRThreadPool *) arg; | |||
503 | PRCList *qp; | |||
504 | PRIntervalTime timeout; | |||
505 | PRIntervalTime now; | |||
506 | ||||
507 | /* | |||
508 | * call PR_WaitCondVar with minimum value of all timeouts | |||
509 | */ | |||
510 | while (!tp->shutdown) { | |||
511 | PRJob *jobp; | |||
512 | ||||
513 | PR_Lock(tp->timerq.lock); | |||
514 | if (PR_CLIST_IS_EMPTY(&tp->timerq.list)((&tp->timerq.list)->next == (&tp->timerq.list ))) { | |||
515 | timeout = PR_INTERVAL_NO_TIMEOUT0xffffffffUL; | |||
516 | } else { | |||
517 | PRCList *qp; | |||
518 | ||||
519 | qp = tp->timerq.list.next; | |||
520 | jobp = JOB_LINKS_PTR(qp)((PRJob *) ((char *) (qp) - __builtin_offsetof(PRJob, links)) ); | |||
521 | ||||
522 | timeout = jobp->absolute - PR_IntervalNow(); | |||
523 | if (timeout <= 0) { | |||
524 | timeout = PR_INTERVAL_NO_WAIT0UL; /* already timed out */ | |||
525 | } | |||
526 | } | |||
527 | if (PR_INTERVAL_NO_WAIT0UL != timeout) { | |||
528 | PR_WaitCondVar(tp->timerq.cv, timeout); | |||
529 | } | |||
530 | if (tp->shutdown) { | |||
531 | PR_Unlock(tp->timerq.lock); | |||
532 | break; | |||
533 | } | |||
534 | /* | |||
535 | * move expired-timer jobs to jobq | |||
536 | */ | |||
537 | now = PR_IntervalNow(); | |||
538 | while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)((&tp->timerq.list)->next == (&tp->timerq.list ))) { | |||
539 | qp = tp->timerq.list.next; | |||
540 | jobp = JOB_LINKS_PTR(qp)((PRJob *) ((char *) (qp) - __builtin_offsetof(PRJob, links)) ); | |||
541 | ||||
542 | if ((PRInt32)(jobp->absolute - now) > 0) { | |||
543 | break; | |||
544 | } | |||
545 | /* | |||
546 | * job timed out | |||
547 | */ | |||
548 | PR_REMOVE_AND_INIT_LINK(&jobp->links)do { (&jobp->links)->prev->next = (&jobp-> links)->next; (&jobp->links)->next->prev = (& jobp->links)->prev; (&jobp->links)->next = (& jobp->links); (&jobp->links)->prev = (&jobp-> links); } while (0); | |||
549 | tp->timerq.cnt--; | |||
550 | jobp->on_timerq = PR_FALSE0; | |||
551 | add_to_jobq(tp, jobp); | |||
552 | } | |||
553 | PR_Unlock(tp->timerq.lock); | |||
554 | } | |||
555 | } | |||
556 | ||||
557 | static void | |||
558 | delete_threadpool(PRThreadPool *tp) | |||
559 | { | |||
560 | if (NULL((void*)0) != tp) { | |||
561 | if (NULL((void*)0) != tp->shutdown_cv) { | |||
562 | PR_DestroyCondVar(tp->shutdown_cv); | |||
563 | } | |||
564 | if (NULL((void*)0) != tp->jobq.cv) { | |||
565 | PR_DestroyCondVar(tp->jobq.cv); | |||
566 | } | |||
567 | if (NULL((void*)0) != tp->jobq.lock) { | |||
568 | PR_DestroyLock(tp->jobq.lock); | |||
569 | } | |||
570 | if (NULL((void*)0) != tp->join_lock) { | |||
571 | PR_DestroyLock(tp->join_lock); | |||
572 | } | |||
573 | #ifdef OPT_WINNT | |||
574 | if (NULL((void*)0) != tp->jobq.nt_completion_port) { | |||
575 | CloseHandle(tp->jobq.nt_completion_port); | |||
576 | } | |||
577 | #endif | |||
578 | /* Timer queue */ | |||
579 | if (NULL((void*)0) != tp->timerq.cv) { | |||
580 | PR_DestroyCondVar(tp->timerq.cv); | |||
581 | } | |||
582 | if (NULL((void*)0) != tp->timerq.lock) { | |||
583 | PR_DestroyLock(tp->timerq.lock); | |||
584 | } | |||
585 | ||||
586 | if (NULL((void*)0) != tp->ioq.lock) { | |||
587 | PR_DestroyLock(tp->ioq.lock); | |||
588 | } | |||
589 | if (NULL((void*)0) != tp->ioq.pollfds) { | |||
590 | PR_Free(tp->ioq.pollfds); | |||
591 | } | |||
592 | if (NULL((void*)0) != tp->ioq.notify_fd) { | |||
593 | PR_DestroyPollableEvent(tp->ioq.notify_fd); | |||
594 | } | |||
595 | PR_Free(tp); | |||
596 | } | |||
597 | return; | |||
598 | } | |||
599 | ||||
600 | static PRThreadPool * | |||
601 | alloc_threadpool(void) | |||
602 | { | |||
603 | PRThreadPool *tp; | |||
604 | ||||
605 | tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp))(PR_Calloc(1, (sizeof(*tp)))); | |||
606 | if (NULL((void*)0) == tp) { | |||
607 | goto failed; | |||
608 | } | |||
609 | tp->jobq.lock = PR_NewLock(); | |||
610 | if (NULL((void*)0) == tp->jobq.lock) { | |||
611 | goto failed; | |||
612 | } | |||
613 | tp->jobq.cv = PR_NewCondVar(tp->jobq.lock); | |||
614 | if (NULL((void*)0) == tp->jobq.cv) { | |||
615 | goto failed; | |||
616 | } | |||
617 | tp->join_lock = PR_NewLock(); | |||
618 | if (NULL((void*)0) == tp->join_lock) { | |||
619 | goto failed; | |||
620 | } | |||
621 | #ifdef OPT_WINNT | |||
622 | tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, | |||
623 | NULL((void*)0), 0, 0); | |||
624 | if (NULL((void*)0) == tp->jobq.nt_completion_port) { | |||
625 | goto failed; | |||
626 | } | |||
627 | #endif | |||
628 | ||||
629 | tp->ioq.lock = PR_NewLock(); | |||
630 | if (NULL((void*)0) == tp->ioq.lock) { | |||
631 | goto failed; | |||
632 | } | |||
633 | ||||
634 | /* Timer queue */ | |||
635 | ||||
636 | tp->timerq.lock = PR_NewLock(); | |||
637 | if (NULL((void*)0) == tp->timerq.lock) { | |||
638 | goto failed; | |||
639 | } | |||
640 | tp->timerq.cv = PR_NewCondVar(tp->timerq.lock); | |||
641 | if (NULL((void*)0) == tp->timerq.cv) { | |||
642 | goto failed; | |||
643 | } | |||
644 | ||||
645 | tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock); | |||
646 | if (NULL((void*)0) == tp->shutdown_cv) { | |||
647 | goto failed; | |||
648 | } | |||
649 | tp->ioq.notify_fd = PR_NewPollableEvent(); | |||
650 | if (NULL((void*)0) == tp->ioq.notify_fd) { | |||
651 | goto failed; | |||
652 | } | |||
653 | return tp; | |||
654 | failed: | |||
655 | delete_threadpool(tp); | |||
656 | PR_SetError(PR_OUT_OF_MEMORY_ERROR(-6000L), 0); | |||
657 | return NULL((void*)0); | |||
658 | } | |||
659 | ||||
660 | /* Create thread pool */ | |||
661 | PR_IMPLEMENT(PRThreadPool *)__attribute__((visibility("default"))) PRThreadPool * | |||
662 | PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads, | |||
663 | PRUint32 stacksize) | |||
664 | { | |||
665 | PRThreadPool *tp; | |||
666 | PRThread *thr; | |||
667 | int i; | |||
668 | wthread *wthrp; | |||
669 | ||||
670 | tp = alloc_threadpool(); | |||
671 | if (NULL((void*)0) == tp) { | |||
672 | return NULL((void*)0); | |||
673 | } | |||
674 | ||||
675 | tp->init_threads = initial_threads; | |||
676 | tp->max_threads = max_threads; | |||
677 | tp->stacksize = stacksize; | |||
678 | PR_INIT_CLIST(&tp->jobq.list)do { (&tp->jobq.list)->next = (&tp->jobq.list ); (&tp->jobq.list)->prev = (&tp->jobq.list) ; } while (0); | |||
679 | PR_INIT_CLIST(&tp->ioq.list)do { (&tp->ioq.list)->next = (&tp->ioq.list) ; (&tp->ioq.list)->prev = (&tp->ioq.list); } while (0); | |||
680 | PR_INIT_CLIST(&tp->timerq.list)do { (&tp->timerq.list)->next = (&tp->timerq .list); (&tp->timerq.list)->prev = (&tp->timerq .list); } while (0); | |||
681 | PR_INIT_CLIST(&tp->jobq.wthreads)do { (&tp->jobq.wthreads)->next = (&tp->jobq .wthreads); (&tp->jobq.wthreads)->prev = (&tp-> jobq.wthreads); } while (0); | |||
682 | PR_INIT_CLIST(&tp->ioq.wthreads)do { (&tp->ioq.wthreads)->next = (&tp->ioq.wthreads ); (&tp->ioq.wthreads)->prev = (&tp->ioq.wthreads ); } while (0); | |||
683 | PR_INIT_CLIST(&tp->timerq.wthreads)do { (&tp->timerq.wthreads)->next = (&tp->timerq .wthreads); (&tp->timerq.wthreads)->prev = (&tp ->timerq.wthreads); } while (0); | |||
684 | tp->shutdown = PR_FALSE0; | |||
685 | ||||
686 | PR_Lock(tp->jobq.lock); | |||
687 | for(i=0; i < initial_threads; ++i) { | |||
688 | ||||
689 | thr = PR_CreateThread(PR_USER_THREAD, wstart, | |||
690 | tp, PR_PRIORITY_NORMAL, | |||
691 | PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize); | |||
692 | PR_ASSERT(thr)((thr)?((void)0):PR_Assert("thr","../../../../pr/src/misc/prtpool.c" ,692)); | |||
693 | wthrp = PR_NEWZAP(wthread)((wthread*)PR_Calloc(1, sizeof(wthread))); | |||
694 | PR_ASSERT(wthrp)((wthrp)?((void)0):PR_Assert("wthrp","../../../../pr/src/misc/prtpool.c" ,694)); | |||
695 | wthrp->thread = thr; | |||
696 | PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads)do { (&wthrp->links)->next = (&tp->jobq.wthreads ); (&wthrp->links)->prev = (&tp->jobq.wthreads )->prev; (&tp->jobq.wthreads)->prev->next = ( &wthrp->links); (&tp->jobq.wthreads)->prev = (&wthrp->links); } while (0); | |||
697 | } | |||
698 | tp->current_threads = initial_threads; | |||
699 | ||||
700 | thr = PR_CreateThread(PR_USER_THREAD, io_wstart, | |||
701 | tp, PR_PRIORITY_NORMAL, | |||
702 | PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); | |||
703 | PR_ASSERT(thr)((thr)?((void)0):PR_Assert("thr","../../../../pr/src/misc/prtpool.c" ,703)); | |||
704 | wthrp = PR_NEWZAP(wthread)((wthread*)PR_Calloc(1, sizeof(wthread))); | |||
705 | PR_ASSERT(wthrp)((wthrp)?((void)0):PR_Assert("wthrp","../../../../pr/src/misc/prtpool.c" ,705)); | |||
706 | wthrp->thread = thr; | |||
707 | PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads)do { (&wthrp->links)->next = (&tp->ioq.wthreads ); (&wthrp->links)->prev = (&tp->ioq.wthreads )->prev; (&tp->ioq.wthreads)->prev->next = (& wthrp->links); (&tp->ioq.wthreads)->prev = (& wthrp->links); } while (0); | |||
708 | ||||
709 | thr = PR_CreateThread(PR_USER_THREAD, timer_wstart, | |||
710 | tp, PR_PRIORITY_NORMAL, | |||
711 | PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize); | |||
712 | PR_ASSERT(thr)((thr)?((void)0):PR_Assert("thr","../../../../pr/src/misc/prtpool.c" ,712)); | |||
713 | wthrp = PR_NEWZAP(wthread)((wthread*)PR_Calloc(1, sizeof(wthread))); | |||
714 | PR_ASSERT(wthrp)((wthrp)?((void)0):PR_Assert("wthrp","../../../../pr/src/misc/prtpool.c" ,714)); | |||
715 | wthrp->thread = thr; | |||
716 | PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads)do { (&wthrp->links)->next = (&tp->timerq.wthreads ); (&wthrp->links)->prev = (&tp->timerq.wthreads )->prev; (&tp->timerq.wthreads)->prev->next = (&wthrp->links); (&tp->timerq.wthreads)->prev = (&wthrp->links); } while (0); | |||
717 | ||||
718 | PR_Unlock(tp->jobq.lock); | |||
719 | return tp; | |||
720 | } | |||
721 | ||||
722 | static void | |||
723 | delete_job(PRJob *jobp) | |||
724 | { | |||
725 | if (NULL((void*)0) != jobp) { | |||
726 | if (NULL((void*)0) != jobp->join_cv) { | |||
727 | PR_DestroyCondVar(jobp->join_cv); | |||
728 | jobp->join_cv = NULL((void*)0); | |||
729 | } | |||
730 | if (NULL((void*)0) != jobp->cancel_cv) { | |||
731 | PR_DestroyCondVar(jobp->cancel_cv); | |||
732 | jobp->cancel_cv = NULL((void*)0); | |||
733 | } | |||
734 | PR_DELETE(jobp){ PR_Free(jobp); (jobp) = ((void*)0); }; | |||
735 | } | |||
736 | } | |||
737 | ||||
738 | static PRJob * | |||
739 | alloc_job(PRBool joinable, PRThreadPool *tp) | |||
740 | { | |||
741 | PRJob *jobp; | |||
742 | ||||
743 | jobp = PR_NEWZAP(PRJob)((PRJob*)PR_Calloc(1, sizeof(PRJob))); | |||
744 | if (NULL((void*)0) == jobp) { | |||
745 | goto failed; | |||
746 | } | |||
747 | if (joinable) { | |||
748 | jobp->join_cv = PR_NewCondVar(tp->join_lock); | |||
749 | jobp->join_wait = PR_TRUE1; | |||
750 | if (NULL((void*)0) == jobp->join_cv) { | |||
751 | goto failed; | |||
752 | } | |||
753 | } else { | |||
754 | jobp->join_cv = NULL((void*)0); | |||
755 | } | |||
756 | #ifdef OPT_WINNT | |||
757 | jobp->nt_notifier.jobp = jobp; | |||
758 | #endif | |||
759 | return jobp; | |||
760 | failed: | |||
761 | delete_job(jobp); | |||
762 | PR_SetError(PR_OUT_OF_MEMORY_ERROR(-6000L), 0); | |||
763 | return NULL((void*)0); | |||
764 | } | |||
765 | ||||
766 | /* queue a job */ | |||
767 | PR_IMPLEMENT(PRJob *)__attribute__((visibility("default"))) PRJob * | |||
768 | PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable) | |||
769 | { | |||
770 | PRJob *jobp; | |||
771 | ||||
772 | jobp = alloc_job(joinable, tpool); | |||
773 | if (NULL((void*)0) == jobp) { | |||
774 | return NULL((void*)0); | |||
775 | } | |||
776 | ||||
777 | jobp->job_func = fn; | |||
778 | jobp->job_arg = arg; | |||
779 | jobp->tpool = tpool; | |||
780 | ||||
781 | add_to_jobq(tpool, jobp); | |||
782 | return jobp; | |||
783 | } | |||
784 | ||||
785 | /* queue a job, when a socket is readable or writeable */ | |||
786 | static PRJob * | |||
787 | queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, | |||
788 | PRBool joinable, io_op_type op) | |||
789 | { | |||
790 | PRJob *jobp; | |||
791 | PRIntervalTime now; | |||
792 | ||||
793 | jobp = alloc_job(joinable, tpool); | |||
794 | if (NULL((void*)0) == jobp) { | |||
795 | return NULL((void*)0); | |||
796 | } | |||
797 | ||||
798 | /* | |||
799 | * Add a new job to io_jobq | |||
800 | * wakeup io worker thread | |||
801 | */ | |||
802 | ||||
803 | jobp->job_func = fn; | |||
804 | jobp->job_arg = arg; | |||
805 | jobp->tpool = tpool; | |||
806 | jobp->iod = iod; | |||
807 | if (JOB_IO_READ == op) { | |||
808 | jobp->io_op = JOB_IO_READ; | |||
809 | jobp->io_poll_flags = PR_POLL_READ0x1; | |||
810 | } else if (JOB_IO_WRITE == op) { | |||
811 | jobp->io_op = JOB_IO_WRITE; | |||
812 | jobp->io_poll_flags = PR_POLL_WRITE0x2; | |||
813 | } else if (JOB_IO_ACCEPT == op) { | |||
814 | jobp->io_op = JOB_IO_ACCEPT; | |||
815 | jobp->io_poll_flags = PR_POLL_READ0x1; | |||
816 | } else if (JOB_IO_CONNECT == op) { | |||
817 | jobp->io_op = JOB_IO_CONNECT; | |||
818 | jobp->io_poll_flags = PR_POLL_WRITE0x2|PR_POLL_EXCEPT0x4; | |||
819 | } else { | |||
820 | delete_job(jobp); | |||
821 | PR_SetError(PR_INVALID_ARGUMENT_ERROR(-5987L), 0); | |||
822 | return NULL((void*)0); | |||
823 | } | |||
824 | ||||
825 | jobp->timeout = iod->timeout; | |||
826 | if ((PR_INTERVAL_NO_TIMEOUT0xffffffffUL == iod->timeout) || | |||
827 | (PR_INTERVAL_NO_WAIT0UL == iod->timeout)) { | |||
828 | jobp->absolute = iod->timeout; | |||
829 | } else { | |||
830 | now = PR_IntervalNow(); | |||
831 | jobp->absolute = now + iod->timeout; | |||
832 | } | |||
833 | ||||
834 | ||||
835 | PR_Lock(tpool->ioq.lock); | |||
836 | ||||
837 | if (PR_CLIST_IS_EMPTY(&tpool->ioq.list)((&tpool->ioq.list)->next == (&tpool->ioq.list )) || | |||
838 | (PR_INTERVAL_NO_TIMEOUT0xffffffffUL == iod->timeout)) { | |||
839 | PR_APPEND_LINK(&jobp->links,&tpool->ioq.list)do { (&jobp->links)->next = (&tpool->ioq.list ); (&jobp->links)->prev = (&tpool->ioq.list) ->prev; (&tpool->ioq.list)->prev->next = (& jobp->links); (&tpool->ioq.list)->prev = (&jobp ->links); } while (0); | |||
840 | } else if (PR_INTERVAL_NO_WAIT0UL == iod->timeout) { | |||
841 | PR_INSERT_LINK(&jobp->links,&tpool->ioq.list)do { (&jobp->links)->next = (&tpool->ioq.list )->next; (&jobp->links)->prev = (&tpool-> ioq.list); (&tpool->ioq.list)->next->prev = (& jobp->links); (&tpool->ioq.list)->next = (&jobp ->links); } while (0); | |||
842 | } else { | |||
843 | PRCList *qp; | |||
844 | PRJob *tmp_jobp; | |||
845 | /* | |||
846 | * insert into the timeout-sorted ioq | |||
847 | */ | |||
848 | for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list; | |||
849 | qp = qp->prev) { | |||
850 | tmp_jobp = JOB_LINKS_PTR(qp)((PRJob *) ((char *) (qp) - __builtin_offsetof(PRJob, links)) ); | |||
851 | if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { | |||
852 | break; | |||
853 | } | |||
854 | } | |||
855 | PR_INSERT_AFTER(&jobp->links,qp)do { (&jobp->links)->next = (qp)->next; (&jobp ->links)->prev = (qp); (qp)->next->prev = (&jobp ->links); (qp)->next = (&jobp->links); } while ( 0); | |||
856 | } | |||
857 | ||||
858 | jobp->on_ioq = PR_TRUE1; | |||
859 | tpool->ioq.cnt++; | |||
860 | /* | |||
861 | * notify io worker thread(s) | |||
862 | */ | |||
863 | PR_Unlock(tpool->ioq.lock); | |||
864 | notify_ioq(tpool); | |||
865 | return jobp; | |||
866 | } | |||
867 | ||||
868 | /* queue a job, when a socket is readable */ | |||
869 | PR_IMPLEMENT(PRJob *)__attribute__((visibility("default"))) PRJob * | |||
870 | PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg, | |||
871 | PRBool joinable) | |||
872 | { | |||
873 | return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ)); | |||
874 | } | |||
875 | ||||
876 | /* queue a job, when a socket is writeable */ | |||
877 | PR_IMPLEMENT(PRJob *)__attribute__((visibility("default"))) PRJob * | |||
878 | PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg, | |||
879 | PRBool joinable) | |||
880 | { | |||
881 | return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE)); | |||
882 | } | |||
883 | ||||
884 | ||||
885 | /* queue a job, when a socket has a pending connection */ | |||
886 | PR_IMPLEMENT(PRJob *)__attribute__((visibility("default"))) PRJob * | |||
887 | PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, | |||
888 | void * arg, PRBool joinable) | |||
889 | { | |||
890 | return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT)); | |||
891 | } | |||
892 | ||||
893 | /* queue a job, when a socket can be connected */ | |||
894 | PR_IMPLEMENT(PRJob *)__attribute__((visibility("default"))) PRJob * | |||
895 | PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod, | |||
896 | const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable) | |||
897 | { | |||
898 | PRStatus rv; | |||
899 | PRErrorCode err; | |||
900 | ||||
901 | rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT0UL); | |||
902 | if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR(-5934L))) { | |||
903 | /* connection pending */ | |||
904 | return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT)); | |||
905 | } | |||
906 | /* | |||
907 | * connection succeeded or failed; add to jobq right away | |||
908 | */ | |||
909 | if (rv == PR_FAILURE) { | |||
910 | iod->error = err; | |||
911 | } | |||
912 | else { | |||
913 | iod->error = 0; | |||
914 | } | |||
915 | return(PR_QueueJob(tpool, fn, arg, joinable)); | |||
916 | ||||
917 | } | |||
918 | ||||
919 | /* queue a job, when a timer expires */ | |||
920 | PR_IMPLEMENT(PRJob *)__attribute__((visibility("default"))) PRJob * | |||
921 | PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout, | |||
922 | PRJobFn fn, void * arg, PRBool joinable) | |||
923 | { | |||
924 | PRIntervalTime now; | |||
925 | PRJob *jobp; | |||
926 | ||||
927 | if (PR_INTERVAL_NO_TIMEOUT0xffffffffUL == timeout) { | |||
928 | PR_SetError(PR_INVALID_ARGUMENT_ERROR(-5987L), 0); | |||
929 | return NULL((void*)0); | |||
930 | } | |||
931 | if (PR_INTERVAL_NO_WAIT0UL == timeout) { | |||
932 | /* | |||
933 | * no waiting; add to jobq right away | |||
934 | */ | |||
935 | return(PR_QueueJob(tpool, fn, arg, joinable)); | |||
936 | } | |||
937 | jobp = alloc_job(joinable, tpool); | |||
938 | if (NULL((void*)0) == jobp) { | |||
939 | return NULL((void*)0); | |||
940 | } | |||
941 | ||||
942 | /* | |||
943 | * Add a new job to timer_jobq | |||
944 | * wakeup timer worker thread | |||
945 | */ | |||
946 | ||||
947 | jobp->job_func = fn; | |||
948 | jobp->job_arg = arg; | |||
949 | jobp->tpool = tpool; | |||
950 | jobp->timeout = timeout; | |||
951 | ||||
952 | now = PR_IntervalNow(); | |||
953 | jobp->absolute = now + timeout; | |||
954 | ||||
955 | ||||
956 | PR_Lock(tpool->timerq.lock); | |||
957 | jobp->on_timerq = PR_TRUE1; | |||
958 | if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)((&tpool->timerq.list)->next == (&tpool->timerq .list))) { | |||
959 | PR_APPEND_LINK(&jobp->links,&tpool->timerq.list)do { (&jobp->links)->next = (&tpool->timerq. list); (&jobp->links)->prev = (&tpool->timerq .list)->prev; (&tpool->timerq.list)->prev->next = (&jobp->links); (&tpool->timerq.list)->prev = (&jobp->links); } while (0); | |||
960 | } | |||
961 | else { | |||
962 | PRCList *qp; | |||
963 | PRJob *tmp_jobp; | |||
964 | /* | |||
965 | * insert into the sorted timer jobq | |||
966 | */ | |||
967 | for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list; | |||
968 | qp = qp->prev) { | |||
969 | tmp_jobp = JOB_LINKS_PTR(qp)((PRJob *) ((char *) (qp) - __builtin_offsetof(PRJob, links)) ); | |||
970 | if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) { | |||
971 | break; | |||
972 | } | |||
973 | } | |||
974 | PR_INSERT_AFTER(&jobp->links,qp)do { (&jobp->links)->next = (qp)->next; (&jobp ->links)->prev = (qp); (qp)->next->prev = (&jobp ->links); (qp)->next = (&jobp->links); } while ( 0); | |||
975 | } | |||
976 | tpool->timerq.cnt++; | |||
977 | /* | |||
978 | * notify timer worker thread(s) | |||
979 | */ | |||
980 | notify_timerq(tpool); | |||
981 | PR_Unlock(tpool->timerq.lock); | |||
982 | return jobp; | |||
983 | } | |||
984 | ||||
985 | static void | |||
986 | notify_timerq(PRThreadPool *tp) | |||
987 | { | |||
988 | /* | |||
989 | * wakeup the timer thread(s) | |||
990 | */ | |||
991 | PR_NotifyCondVar(tp->timerq.cv); | |||
992 | } | |||
993 | ||||
994 | static void | |||
995 | notify_ioq(PRThreadPool *tp) | |||
996 | { | |||
997 | PRStatus rval_status; | |||
998 | ||||
999 | /* | |||
1000 | * wakeup the io thread(s) | |||
1001 | */ | |||
1002 | rval_status = PR_SetPollableEvent(tp->ioq.notify_fd); | |||
1003 | PR_ASSERT(PR_SUCCESS == rval_status)((PR_SUCCESS == rval_status)?((void)0):PR_Assert("PR_SUCCESS == rval_status" ,"../../../../pr/src/misc/prtpool.c",1003)); | |||
1004 | } | |||
1005 | ||||
1006 | /* | |||
1007 | * cancel a job | |||
1008 | * | |||
1009 | * XXXX: is this needed? likely to be removed | |||
1010 | */ | |||
1011 | PR_IMPLEMENT(PRStatus)__attribute__((visibility("default"))) PRStatus | |||
1012 | PR_CancelJob(PRJob *jobp) { | |||
1013 | ||||
1014 | PRStatus rval = PR_FAILURE; | |||
1015 | PRThreadPool *tp; | |||
1016 | ||||
1017 | if (jobp->on_timerq) { | |||
1018 | /* | |||
1019 | * now, check again while holding the timerq lock | |||
1020 | */ | |||
1021 | tp = jobp->tpool; | |||
1022 | PR_Lock(tp->timerq.lock); | |||
1023 | if (jobp->on_timerq) { | |||
1024 | jobp->on_timerq = PR_FALSE0; | |||
1025 | PR_REMOVE_AND_INIT_LINK(&jobp->links)do { (&jobp->links)->prev->next = (&jobp-> links)->next; (&jobp->links)->next->prev = (& jobp->links)->prev; (&jobp->links)->next = (& jobp->links); (&jobp->links)->prev = (&jobp-> links); } while (0); | |||
1026 | tp->timerq.cnt--; | |||
1027 | PR_Unlock(tp->timerq.lock); | |||
1028 | if (!JOINABLE_JOB(jobp)(((void*)0) != (jobp)->join_cv)) { | |||
1029 | delete_job(jobp); | |||
1030 | } else { | |||
1031 | JOIN_NOTIFY(jobp)do { PR_Lock(jobp->tpool->join_lock); jobp->join_wait = 0; PR_NotifyCondVar(jobp->join_cv); PR_Unlock(jobp-> tpool->join_lock); } while (0); | |||
1032 | } | |||
1033 | rval = PR_SUCCESS; | |||
1034 | } else { | |||
1035 | PR_Unlock(tp->timerq.lock); | |||
1036 | } | |||
1037 | } else if (jobp->on_ioq) { | |||
1038 | /* | |||
1039 | * now, check again while holding the ioq lock | |||
1040 | */ | |||
1041 | tp = jobp->tpool; | |||
1042 | PR_Lock(tp->ioq.lock); | |||
1043 | if (jobp->on_ioq) { | |||
1044 | jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock); | |||
1045 | if (NULL((void*)0) == jobp->cancel_cv) { | |||
1046 | PR_Unlock(tp->ioq.lock); | |||
1047 | PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR(-5974L), 0); | |||
1048 | return PR_FAILURE; | |||
1049 | } | |||
1050 | /* | |||
1051 | * mark job 'cancelled' and notify io thread(s) | |||
1052 | * XXXX: | |||
1053 | * this assumes there is only one io thread; when there | |||
1054 | * are multiple threads, the io thread processing this job | |||
1055 | * must be notified. | |||
1056 | */ | |||
1057 | jobp->cancel_io = PR_TRUE1; | |||
1058 | PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */ | |||
1059 | notify_ioq(tp); | |||
1060 | PR_Lock(tp->ioq.lock); | |||
1061 | while (jobp->cancel_io) { | |||
1062 | PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT0xffffffffUL); | |||
1063 | } | |||
1064 | PR_Unlock(tp->ioq.lock); | |||
1065 | PR_ASSERT(!jobp->on_ioq)((!jobp->on_ioq)?((void)0):PR_Assert("!jobp->on_ioq","../../../../pr/src/misc/prtpool.c" ,1065)); | |||
1066 | if (!JOINABLE_JOB(jobp)(((void*)0) != (jobp)->join_cv)) { | |||
1067 | delete_job(jobp); | |||
1068 | } else { | |||
1069 | JOIN_NOTIFY(jobp)do { PR_Lock(jobp->tpool->join_lock); jobp->join_wait = 0; PR_NotifyCondVar(jobp->join_cv); PR_Unlock(jobp-> tpool->join_lock); } while (0); | |||
1070 | } | |||
1071 | rval = PR_SUCCESS; | |||
1072 | } else { | |||
1073 | PR_Unlock(tp->ioq.lock); | |||
1074 | } | |||
1075 | } | |||
1076 | if (PR_FAILURE == rval) { | |||
1077 | PR_SetError(PR_INVALID_STATE_ERROR(-5931L), 0); | |||
1078 | } | |||
1079 | return rval; | |||
1080 | } | |||
1081 | ||||
1082 | /* join a job, wait until completion */ | |||
1083 | PR_IMPLEMENT(PRStatus)__attribute__((visibility("default"))) PRStatus | |||
1084 | PR_JoinJob(PRJob *jobp) | |||
1085 | { | |||
1086 | if (!JOINABLE_JOB(jobp)(((void*)0) != (jobp)->join_cv)) { | |||
1087 | PR_SetError(PR_INVALID_ARGUMENT_ERROR(-5987L), 0); | |||
1088 | return PR_FAILURE; | |||
1089 | } | |||
1090 | PR_Lock(jobp->tpool->join_lock); | |||
1091 | while(jobp->join_wait) { | |||
1092 | PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT0xffffffffUL); | |||
1093 | } | |||
1094 | PR_Unlock(jobp->tpool->join_lock); | |||
1095 | delete_job(jobp); | |||
1096 | return PR_SUCCESS; | |||
1097 | } | |||
1098 | ||||
1099 | /* shutdown threadpool */ | |||
1100 | PR_IMPLEMENT(PRStatus)__attribute__((visibility("default"))) PRStatus | |||
1101 | PR_ShutdownThreadPool(PRThreadPool *tpool) | |||
1102 | { | |||
1103 | PRStatus rval = PR_SUCCESS; | |||
1104 | ||||
1105 | PR_Lock(tpool->jobq.lock); | |||
1106 | tpool->shutdown = PR_TRUE1; | |||
1107 | PR_NotifyAllCondVar(tpool->shutdown_cv); | |||
1108 | PR_Unlock(tpool->jobq.lock); | |||
1109 | ||||
1110 | return rval; | |||
1111 | } | |||
1112 | ||||
1113 | /* | |||
1114 | * join thread pool | |||
1115 | * wait for termination of worker threads | |||
1116 | * reclaim threadpool resources | |||
1117 | */ | |||
1118 | PR_IMPLEMENT(PRStatus)__attribute__((visibility("default"))) PRStatus | |||
1119 | PR_JoinThreadPool(PRThreadPool *tpool) | |||
1120 | { | |||
1121 | PRStatus rval = PR_SUCCESS; | |||
1122 | PRCList *head; | |||
1123 | PRStatus rval_status; | |||
1124 | ||||
1125 | PR_Lock(tpool->jobq.lock); | |||
1126 | while (!tpool->shutdown) { | |||
1127 | PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT0xffffffffUL); | |||
1128 | } | |||
1129 | ||||
1130 | /* | |||
1131 | * wakeup worker threads | |||
1132 | */ | |||
1133 | #ifdef OPT_WINNT | |||
1134 | /* | |||
1135 | * post shutdown notification for all threads | |||
1136 | */ | |||
1137 | { | |||
1138 | int i; | |||
1139 | for(i=0; i < tpool->current_threads; i++) { | |||
1140 | PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0, | |||
1141 | TRUE, NULL((void*)0)); | |||
1142 | } | |||
1143 | } | |||
1144 | #else | |||
1145 | PR_NotifyAllCondVar(tpool->jobq.cv); | |||
1146 | #endif | |||
1147 | ||||
1148 | /* | |||
1149 | * wakeup io thread(s) | |||
1150 | */ | |||
1151 | notify_ioq(tpool); | |||
1152 | ||||
1153 | /* | |||
1154 | * wakeup timer thread(s) | |||
1155 | */ | |||
1156 | PR_Lock(tpool->timerq.lock); | |||
1157 | notify_timerq(tpool); | |||
1158 | PR_Unlock(tpool->timerq.lock); | |||
1159 | ||||
1160 | while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)((&tpool->jobq.wthreads)->next == (&tpool->jobq .wthreads))) { | |||
1161 | wthread *wthrp; | |||
1162 | ||||
1163 | head = PR_LIST_HEAD(&tpool->jobq.wthreads)(&tpool->jobq.wthreads)->next; | |||
1164 | PR_REMOVE_AND_INIT_LINK(head)do { (head)->prev->next = (head)->next; (head)->next ->prev = (head)->prev; (head)->next = (head); (head) ->prev = (head); } while (0); | |||
1165 | PR_Unlock(tpool->jobq.lock); | |||
1166 | wthrp = WTHREAD_LINKS_PTR(head)((wthread *) ((char *) (head) - __builtin_offsetof(wthread, links ))); | |||
1167 | rval_status = PR_JoinThread(wthrp->thread); | |||
1168 | PR_ASSERT(PR_SUCCESS == rval_status)((PR_SUCCESS == rval_status)?((void)0):PR_Assert("PR_SUCCESS == rval_status" ,"../../../../pr/src/misc/prtpool.c",1168)); | |||
1169 | PR_DELETE(wthrp){ PR_Free(wthrp); (wthrp) = ((void*)0); }; | |||
1170 | PR_Lock(tpool->jobq.lock); | |||
1171 | } | |||
1172 | PR_Unlock(tpool->jobq.lock); | |||
1173 | while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)((&tpool->ioq.wthreads)->next == (&tpool->ioq .wthreads))) { | |||
1174 | wthread *wthrp; | |||
1175 | ||||
1176 | head = PR_LIST_HEAD(&tpool->ioq.wthreads)(&tpool->ioq.wthreads)->next; | |||
1177 | PR_REMOVE_AND_INIT_LINK(head)do { (head)->prev->next = (head)->next; (head)->next ->prev = (head)->prev; (head)->next = (head); (head) ->prev = (head); } while (0); | |||
1178 | wthrp = WTHREAD_LINKS_PTR(head)((wthread *) ((char *) (head) - __builtin_offsetof(wthread, links ))); | |||
1179 | rval_status = PR_JoinThread(wthrp->thread); | |||
1180 | PR_ASSERT(PR_SUCCESS == rval_status)((PR_SUCCESS == rval_status)?((void)0):PR_Assert("PR_SUCCESS == rval_status" ,"../../../../pr/src/misc/prtpool.c",1180)); | |||
1181 | PR_DELETE(wthrp){ PR_Free(wthrp); (wthrp) = ((void*)0); }; | |||
1182 | } | |||
1183 | ||||
1184 | while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)((&tpool->timerq.wthreads)->next == (&tpool-> timerq.wthreads))) { | |||
1185 | wthread *wthrp; | |||
1186 | ||||
1187 | head = PR_LIST_HEAD(&tpool->timerq.wthreads)(&tpool->timerq.wthreads)->next; | |||
1188 | PR_REMOVE_AND_INIT_LINK(head)do { (head)->prev->next = (head)->next; (head)->next ->prev = (head)->prev; (head)->next = (head); (head) ->prev = (head); } while (0); | |||
1189 | wthrp = WTHREAD_LINKS_PTR(head)((wthread *) ((char *) (head) - __builtin_offsetof(wthread, links ))); | |||
1190 | rval_status = PR_JoinThread(wthrp->thread); | |||
1191 | PR_ASSERT(PR_SUCCESS == rval_status)((PR_SUCCESS == rval_status)?((void)0):PR_Assert("PR_SUCCESS == rval_status" ,"../../../../pr/src/misc/prtpool.c",1191)); | |||
1192 | PR_DELETE(wthrp){ PR_Free(wthrp); (wthrp) = ((void*)0); }; | |||
1193 | } | |||
1194 | ||||
1195 | /* | |||
1196 | * Delete queued jobs | |||
1197 | */ | |||
1198 | while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)((&tpool->jobq.list)->next == (&tpool->jobq. list))) { | |||
1199 | PRJob *jobp; | |||
1200 | ||||
1201 | head = PR_LIST_HEAD(&tpool->jobq.list)(&tpool->jobq.list)->next; | |||
1202 | PR_REMOVE_AND_INIT_LINK(head)do { (head)->prev->next = (head)->next; (head)->next ->prev = (head)->prev; (head)->next = (head); (head) ->prev = (head); } while (0); | |||
1203 | jobp = JOB_LINKS_PTR(head)((PRJob *) ((char *) (head) - __builtin_offsetof(PRJob, links ))); | |||
1204 | tpool->jobq.cnt--; | |||
1205 | delete_job(jobp); | |||
1206 | } | |||
1207 | ||||
1208 | /* delete io jobs */ | |||
1209 | while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)((&tpool->ioq.list)->next == (&tpool->ioq.list ))) { | |||
1210 | PRJob *jobp; | |||
1211 | ||||
1212 | head = PR_LIST_HEAD(&tpool->ioq.list)(&tpool->ioq.list)->next; | |||
1213 | PR_REMOVE_AND_INIT_LINK(head)do { (head)->prev->next = (head)->next; (head)->next ->prev = (head)->prev; (head)->next = (head); (head) ->prev = (head); } while (0); | |||
1214 | tpool->ioq.cnt--; | |||
1215 | jobp = JOB_LINKS_PTR(head)((PRJob *) ((char *) (head) - __builtin_offsetof(PRJob, links ))); | |||
1216 | delete_job(jobp); | |||
1217 | } | |||
1218 | ||||
1219 | /* delete timer jobs */ | |||
1220 | while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)((&tpool->timerq.list)->next == (&tpool->timerq .list))) { | |||
1221 | PRJob *jobp; | |||
1222 | ||||
1223 | head = PR_LIST_HEAD(&tpool->timerq.list)(&tpool->timerq.list)->next; | |||
1224 | PR_REMOVE_AND_INIT_LINK(head)do { (head)->prev->next = (head)->next; (head)->next ->prev = (head)->prev; (head)->next = (head); (head) ->prev = (head); } while (0); | |||
1225 | tpool->timerq.cnt--; | |||
1226 | jobp = JOB_LINKS_PTR(head)((PRJob *) ((char *) (head) - __builtin_offsetof(PRJob, links ))); | |||
1227 | delete_job(jobp); | |||
1228 | } | |||
1229 | ||||
1230 | PR_ASSERT(0 == tpool->jobq.cnt)((0 == tpool->jobq.cnt)?((void)0):PR_Assert("0 == tpool->jobq.cnt" ,"../../../../pr/src/misc/prtpool.c",1230)); | |||
1231 | PR_ASSERT(0 == tpool->ioq.cnt)((0 == tpool->ioq.cnt)?((void)0):PR_Assert("0 == tpool->ioq.cnt" ,"../../../../pr/src/misc/prtpool.c",1231)); | |||
1232 | PR_ASSERT(0 == tpool->timerq.cnt)((0 == tpool->timerq.cnt)?((void)0):PR_Assert("0 == tpool->timerq.cnt" ,"../../../../pr/src/misc/prtpool.c",1232)); | |||
1233 | ||||
1234 | delete_threadpool(tpool); | |||
1235 | return rval; | |||
1236 | } |