Bug Summary

File:pr/Linux4.19_x86_64_gcc_glibc_PTH_64_DBG.OBJ/pr/src/misc/../../../../pr/src/misc/prtpool.c
Warning:line 329, column 34
Dereference of null pointer

Annotated Source Code

Press '?' to see keyboard shortcuts

clang -cc1 -cc1 -triple x86_64-pc-linux-gnu -analyze -disable-free -clear-ast-before-backend -disable-llvm-verifier -discard-value-names -main-file-name prtpool.c -analyzer-checker=core -analyzer-checker=apiModeling -analyzer-checker=unix -analyzer-checker=deadcode -analyzer-checker=security.insecureAPI.UncheckedReturn -analyzer-checker=security.insecureAPI.getpw -analyzer-checker=security.insecureAPI.gets -analyzer-checker=security.insecureAPI.mktemp -analyzer-checker=security.insecureAPI.mkstemp -analyzer-checker=security.insecureAPI.vfork -analyzer-checker=nullability.NullPassedToNonnull -analyzer-checker=nullability.NullReturnedFromNonnull -analyzer-output plist -w -setup-static-analyzer -analyzer-config-compatibility-mode=true -mrelocation-model pic -pic-level 2 -fhalf-no-semantic-interposition -mframe-pointer=all -fmath-errno -ffp-contract=on -fno-rounding-math -mconstructor-aliases -funwind-tables=2 -target-cpu x86-64 -tune-cpu generic -debugger-tuning=gdb -fdebug-compilation-dir=/var/lib/jenkins/workspace/nss-scan-build/nspr/Linux4.19_x86_64_gcc_glibc_PTH_64_DBG.OBJ/pr/src/misc -fcoverage-compilation-dir=/var/lib/jenkins/workspace/nss-scan-build/nspr/Linux4.19_x86_64_gcc_glibc_PTH_64_DBG.OBJ/pr/src/misc -resource-dir /usr/lib/llvm-18/lib/clang/18 -U NDEBUG -D DEBUG_jenkins -D PACKAGE_NAME="" -D PACKAGE_TARNAME="" -D PACKAGE_VERSION="" -D PACKAGE_STRING="" -D PACKAGE_BUGREPORT="" -D PACKAGE_URL="" -D DEBUG=1 -D HAVE_VISIBILITY_HIDDEN_ATTRIBUTE=1 -D HAVE_VISIBILITY_PRAGMA=1 -D XP_UNIX=1 -D _GNU_SOURCE=1 -D HAVE_FCNTL_FILE_LOCKING=1 -D HAVE_POINTER_LOCALTIME_R=1 -D LINUX=1 -D HAVE_DLADDR=1 -D HAVE_GETTID=1 -D HAVE_LCHOWN=1 -D HAVE_SETPRIORITY=1 -D HAVE_STRERROR=1 -D HAVE_SYSCALL=1 -D HAVE_SECURE_GETENV=1 -D _REENTRANT=1 -D FORCE_PR_LOG -D _PR_PTHREADS -U HAVE_CVAR_BUILT_ON_SEM -D _NSPR_BUILD_ -I /var/lib/jenkins/workspace/nss-scan-build/nss/../dist/Linux4.19_x86_64_gcc_glibc_PTH_64_DBG.OBJ/include -I ../../../../pr/include -I ../../../../pr/include/private -internal-isystem /usr/lib/llvm-18/lib/clang/18/include -internal-isystem /usr/local/include -internal-isystem /usr/lib/gcc/x86_64-linux-gnu/14/../../../../x86_64-linux-gnu/include -internal-externc-isystem /usr/include/x86_64-linux-gnu -internal-externc-isystem /include -internal-externc-isystem /usr/include -ferror-limit 19 -fvisibility=hidden -fgnuc-version=4.2.1 -fno-inline -analyzer-output=html -analyzer-config stable-report-filename=true -faddrsig -D__GCC_HAVE_DWARF2_CFI_ASM=1 -o /tmp/scan-build-2024-05-18-082241-28900-1 -x c ../../../../pr/src/misc/prtpool.c
1/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2/* This Source Code Form is subject to the terms of the Mozilla Public
3 * License, v. 2.0. If a copy of the MPL was not distributed with this
4 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
5
6#include "nspr.h"
7
8/*
9 * Thread pools
10 * Thread pools create and manage threads to provide support for
11 * scheduling jobs onto one or more threads.
12 *
13 */
14#ifdef OPT_WINNT
15#include <windows.h>
16#endif
17
18/*
19 * worker thread
20 */
21typedef struct wthread {
22 PRCList links;
23 PRThread *thread;
24} wthread;
25
26/*
27 * queue of timer jobs
28 */
29typedef struct timer_jobq {
30 PRCList list;
31 PRLock *lock;
32 PRCondVar *cv;
33 PRInt32 cnt;
34 PRCList wthreads;
35} timer_jobq;
36
37/*
38 * queue of jobs
39 */
40typedef struct tp_jobq {
41 PRCList list;
42 PRInt32 cnt;
43 PRLock *lock;
44 PRCondVar *cv;
45 PRCList wthreads;
46#ifdef OPT_WINNT
47 HANDLE nt_completion_port;
48#endif
49} tp_jobq;
50
51/*
52 * queue of IO jobs
53 */
54typedef struct io_jobq {
55 PRCList list;
56 PRPollDesc *pollfds;
57 PRInt32 npollfds;
58 PRJob **polljobs;
59 PRLock *lock;
60 PRInt32 cnt;
61 PRFileDesc *notify_fd;
62 PRCList wthreads;
63} io_jobq;
64
65/*
66 * Threadpool
67 */
68struct PRThreadPool {
69 PRInt32 init_threads;
70 PRInt32 max_threads;
71 PRInt32 current_threads;
72 PRInt32 idle_threads;
73 PRUint32 stacksize;
74 tp_jobq jobq;
75 io_jobq ioq;
76 timer_jobq timerq;
77 PRLock *join_lock; /* used with jobp->join_cv */
78 PRCondVar *shutdown_cv;
79 PRBool shutdown;
80};
81
82typedef enum io_op_type
83{ JOB_IO_READ, JOB_IO_WRITE, JOB_IO_CONNECT, JOB_IO_ACCEPT } io_op_type;
84
85#ifdef OPT_WINNT
86typedef struct NT_notifier {
87 OVERLAPPED overlapped; /* must be first */
88 PRJob *jobp;
89} NT_notifier;
90#endif
91
92struct PRJob {
93 PRCList links; /* for linking jobs */
94 PRBool on_ioq; /* job on ioq */
95 PRBool on_timerq; /* job on timerq */
96 PRJobFn job_func;
97 void *job_arg;
98 PRCondVar *join_cv;
99 PRBool join_wait; /* == PR_TRUE, when waiting to join */
100 PRCondVar *cancel_cv; /* for cancelling IO jobs */
101 PRBool cancel_io; /* for cancelling IO jobs */
102 PRThreadPool *tpool; /* back pointer to thread pool */
103 PRJobIoDesc *iod;
104 io_op_type io_op;
105 PRInt16 io_poll_flags;
106 PRNetAddr *netaddr;
107 PRIntervalTime timeout; /* relative value */
108 PRIntervalTime absolute;
109#ifdef OPT_WINNT
110 NT_notifier nt_notifier;
111#endif
112};
113
114#define JOB_LINKS_PTR(_qp)((PRJob *) ((char *) (_qp) - __builtin_offsetof(PRJob, links)
))
\
115 ((PRJob *) ((char *) (_qp) - offsetof(PRJob, links)__builtin_offsetof(PRJob, links)))
116
117#define WTHREAD_LINKS_PTR(_qp)((wthread *) ((char *) (_qp) - __builtin_offsetof(wthread, links
)))
\
118 ((wthread *) ((char *) (_qp) - offsetof(wthread, links)__builtin_offsetof(wthread, links)))
119
120#define JOINABLE_JOB(_jobp)(((void*)0) != (_jobp)->join_cv) (NULL((void*)0) != (_jobp)->join_cv)
121
122#define JOIN_NOTIFY(_jobp)do { PR_Lock(_jobp->tpool->join_lock); _jobp->join_wait
= 0; PR_NotifyCondVar(_jobp->join_cv); PR_Unlock(_jobp->
tpool->join_lock); } while (0)
\
123 PR_BEGIN_MACROdo { \
124 PR_Lock(_jobp->tpool->join_lock); \
125 _jobp->join_wait = PR_FALSE0; \
126 PR_NotifyCondVar(_jobp->join_cv); \
127 PR_Unlock(_jobp->tpool->join_lock); \
128 PR_END_MACRO} while (0)
129
130#define CANCEL_IO_JOB(jobp)do { jobp->cancel_io = 0; jobp->on_ioq = 0; do { (&
jobp->links)->prev->next = (&jobp->links)->
next; (&jobp->links)->next->prev = (&jobp->
links)->prev; (&jobp->links)->next = (&jobp->
links); (&jobp->links)->prev = (&jobp->links
); } while (0); tp->ioq.cnt--; PR_NotifyCondVar(jobp->cancel_cv
); } while (0)
\
131 PR_BEGIN_MACROdo { \
132 jobp->cancel_io = PR_FALSE0; \
133 jobp->on_ioq = PR_FALSE0; \
134 PR_REMOVE_AND_INIT_LINK(&jobp->links)do { (&jobp->links)->prev->next = (&jobp->
links)->next; (&jobp->links)->next->prev = (&
jobp->links)->prev; (&jobp->links)->next = (&
jobp->links); (&jobp->links)->prev = (&jobp->
links); } while (0)
; \
135 tp->ioq.cnt--; \
136 PR_NotifyCondVar(jobp->cancel_cv); \
137 PR_END_MACRO} while (0)
138
139static void delete_job(PRJob *jobp);
140static PRThreadPool * alloc_threadpool(void);
141static PRJob * alloc_job(PRBool joinable, PRThreadPool *tp);
142static void notify_ioq(PRThreadPool *tp);
143static void notify_timerq(PRThreadPool *tp);
144
145/*
146 * locks are acquired in the following order
147 *
148 * tp->ioq.lock,tp->timerq.lock
149 * |
150 * V
151 * tp->jobq->lock
152 */
153
154/*
155 * worker thread function
156 */
157static void wstart(void *arg)
158{
159 PRThreadPool *tp = (PRThreadPool *) arg;
160 PRCList *head;
161
162 /*
163 * execute jobs until shutdown
164 */
165 while (!tp->shutdown) {
166 PRJob *jobp;
167#ifdef OPT_WINNT
168 BOOL rv;
169 DWORD unused, shutdown;
170 LPOVERLAPPED olp;
171
172 PR_Lock(tp->jobq.lock);
173 tp->idle_threads++;
174 PR_Unlock(tp->jobq.lock);
175 rv = GetQueuedCompletionStatus(tp->jobq.nt_completion_port,
176 &unused, &shutdown, &olp, INFINITE);
177
178 PR_ASSERT(rv)((rv)?((void)0):PR_Assert("rv","../../../../pr/src/misc/prtpool.c"
,178))
;
179 if (shutdown) {
180 break;
181 }
182 jobp = ((NT_notifier *) olp)->jobp;
183 PR_Lock(tp->jobq.lock);
184 tp->idle_threads--;
185 tp->jobq.cnt--;
186 PR_Unlock(tp->jobq.lock);
187#else
188
189 PR_Lock(tp->jobq.lock);
190 while (PR_CLIST_IS_EMPTY(&tp->jobq.list)((&tp->jobq.list)->next == (&tp->jobq.list)) && (!tp->shutdown)) {
191 tp->idle_threads++;
192 PR_WaitCondVar(tp->jobq.cv, PR_INTERVAL_NO_TIMEOUT0xffffffffUL);
193 tp->idle_threads--;
194 }
195 if (tp->shutdown) {
196 PR_Unlock(tp->jobq.lock);
197 break;
198 }
199 head = PR_LIST_HEAD(&tp->jobq.list)(&tp->jobq.list)->next;
200 /*
201 * remove job from queue
202 */
203 PR_REMOVE_AND_INIT_LINK(head)do { (head)->prev->next = (head)->next; (head)->next
->prev = (head)->prev; (head)->next = (head); (head)
->prev = (head); } while (0)
;
204 tp->jobq.cnt--;
205 jobp = JOB_LINKS_PTR(head)((PRJob *) ((char *) (head) - __builtin_offsetof(PRJob, links
)))
;
206 PR_Unlock(tp->jobq.lock);
207#endif
208
209 jobp->job_func(jobp->job_arg);
210 if (!JOINABLE_JOB(jobp)(((void*)0) != (jobp)->join_cv)) {
211 delete_job(jobp);
212 } else {
213 JOIN_NOTIFY(jobp)do { PR_Lock(jobp->tpool->join_lock); jobp->join_wait
= 0; PR_NotifyCondVar(jobp->join_cv); PR_Unlock(jobp->
tpool->join_lock); } while (0)
;
214 }
215 }
216 PR_Lock(tp->jobq.lock);
217 tp->current_threads--;
218 PR_Unlock(tp->jobq.lock);
219}
220
221/*
222 * add a job to the work queue
223 */
224static void
225add_to_jobq(PRThreadPool *tp, PRJob *jobp)
226{
227 /*
228 * add to jobq
229 */
230#ifdef OPT_WINNT
231 PR_Lock(tp->jobq.lock);
232 tp->jobq.cnt++;
233 PR_Unlock(tp->jobq.lock);
234 /*
235 * notify worker thread(s)
236 */
237 PostQueuedCompletionStatus(tp->jobq.nt_completion_port, 0,
238 FALSE, &jobp->nt_notifier.overlapped);
239#else
240 PR_Lock(tp->jobq.lock);
241 PR_APPEND_LINK(&jobp->links,&tp->jobq.list)do { (&jobp->links)->next = (&tp->jobq.list)
; (&jobp->links)->prev = (&tp->jobq.list)->
prev; (&tp->jobq.list)->prev->next = (&jobp->
links); (&tp->jobq.list)->prev = (&jobp->links
); } while (0)
;
242 tp->jobq.cnt++;
243 if ((tp->idle_threads < tp->jobq.cnt) &&
244 (tp->current_threads < tp->max_threads)) {
245 wthread *wthrp;
246 /*
247 * increment thread count and unlock the jobq lock
248 */
249 tp->current_threads++;
250 PR_Unlock(tp->jobq.lock);
251 /* create new worker thread */
252 wthrp = PR_NEWZAP(wthread)((wthread*)PR_Calloc(1, sizeof(wthread)));
253 if (wthrp) {
254 wthrp->thread = PR_CreateThread(PR_USER_THREAD, wstart,
255 tp, PR_PRIORITY_NORMAL,
256 PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,tp->stacksize);
257 if (NULL((void*)0) == wthrp->thread) {
258 PR_DELETE(wthrp){ PR_Free(wthrp); (wthrp) = ((void*)0); }; /* this sets wthrp to NULL */
259 }
260 }
261 PR_Lock(tp->jobq.lock);
262 if (NULL((void*)0) == wthrp) {
263 tp->current_threads--;
264 } else {
265 PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads)do { (&wthrp->links)->next = (&tp->jobq.wthreads
); (&wthrp->links)->prev = (&tp->jobq.wthreads
)->prev; (&tp->jobq.wthreads)->prev->next = (
&wthrp->links); (&tp->jobq.wthreads)->prev =
(&wthrp->links); } while (0)
;
266 }
267 }
268 /*
269 * wakeup a worker thread
270 */
271 PR_NotifyCondVar(tp->jobq.cv);
272 PR_Unlock(tp->jobq.lock);
273#endif
274}
275
276/*
277 * io worker thread function
278 */
279static void io_wstart(void *arg)
280{
281 PRThreadPool *tp = (PRThreadPool *) arg;
282 int pollfd_cnt, pollfds_used;
283 int rv;
284 PRCList *qp, *nextqp;
285 PRPollDesc *pollfds = NULL((void*)0);
1
'pollfds' initialized to a null pointer value
286 PRJob **polljobs = NULL((void*)0);
287 int poll_timeout;
288 PRIntervalTime now;
289
290 /*
291 * scan io_jobq
292 * construct poll list
293 * call PR_Poll
294 * for all fds, for which poll returns true, move the job to
295 * jobq and wakeup worker thread.
296 */
297 while (!tp->shutdown) {
2
Assuming field 'shutdown' is 0
3
Loop condition is true. Entering loop body
298 PRJob *jobp;
299
300 pollfd_cnt = tp->ioq.cnt + 10;
301 if (pollfd_cnt > tp->ioq.npollfds) {
4
Assuming 'pollfd_cnt' is <= field 'npollfds'
5
Taking false branch
302
303 /*
304 * re-allocate pollfd array if the current one is not large
305 * enough
306 */
307 if (NULL((void*)0) != tp->ioq.pollfds) {
308 PR_Free(tp->ioq.pollfds);
309 }
310 tp->ioq.pollfds = (PRPollDesc *) PR_Malloc(pollfd_cnt *
311 (sizeof(PRPollDesc) + sizeof(PRJob *)));
312 PR_ASSERT(NULL != tp->ioq.pollfds)((((void*)0) != tp->ioq.pollfds)?((void)0):PR_Assert("NULL != tp->ioq.pollfds"
,"../../../../pr/src/misc/prtpool.c",312))
;
313 /*
314 * array of pollfds
315 */
316 pollfds = tp->ioq.pollfds;
317 tp->ioq.polljobs = (PRJob **) (&tp->ioq.pollfds[pollfd_cnt]);
318 /*
319 * parallel array of jobs
320 */
321 polljobs = tp->ioq.polljobs;
322 tp->ioq.npollfds = pollfd_cnt;
323 }
324
325 pollfds_used = 0;
326 /*
327 * add the notify fd; used for unblocking io thread(s)
328 */
329 pollfds[pollfds_used].fd = tp->ioq.notify_fd;
6
Dereference of null pointer
330 pollfds[pollfds_used].in_flags = PR_POLL_READ0x1;
331 pollfds[pollfds_used].out_flags = 0;
332 polljobs[pollfds_used] = NULL((void*)0);
333 pollfds_used++;
334 /*
335 * fill in the pollfd array
336 */
337 PR_Lock(tp->ioq.lock);
338 for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
339 nextqp = qp->next;
340 jobp = JOB_LINKS_PTR(qp)((PRJob *) ((char *) (qp) - __builtin_offsetof(PRJob, links))
)
;
341 if (jobp->cancel_io) {
342 CANCEL_IO_JOB(jobp)do { jobp->cancel_io = 0; jobp->on_ioq = 0; do { (&
jobp->links)->prev->next = (&jobp->links)->
next; (&jobp->links)->next->prev = (&jobp->
links)->prev; (&jobp->links)->next = (&jobp->
links); (&jobp->links)->prev = (&jobp->links
); } while (0); tp->ioq.cnt--; PR_NotifyCondVar(jobp->cancel_cv
); } while (0)
;
343 continue;
344 }
345 if (pollfds_used == (pollfd_cnt)) {
346 break;
347 }
348 pollfds[pollfds_used].fd = jobp->iod->socket;
349 pollfds[pollfds_used].in_flags = jobp->io_poll_flags;
350 pollfds[pollfds_used].out_flags = 0;
351 polljobs[pollfds_used] = jobp;
352
353 pollfds_used++;
354 }
355 if (!PR_CLIST_IS_EMPTY(&tp->ioq.list)((&tp->ioq.list)->next == (&tp->ioq.list))) {
356 qp = tp->ioq.list.next;
357 jobp = JOB_LINKS_PTR(qp)((PRJob *) ((char *) (qp) - __builtin_offsetof(PRJob, links))
)
;
358 if (PR_INTERVAL_NO_TIMEOUT0xffffffffUL == jobp->timeout) {
359 poll_timeout = PR_INTERVAL_NO_TIMEOUT0xffffffffUL;
360 }
361 else if (PR_INTERVAL_NO_WAIT0UL == jobp->timeout) {
362 poll_timeout = PR_INTERVAL_NO_WAIT0UL;
363 }
364 else {
365 poll_timeout = jobp->absolute - PR_IntervalNow();
366 if (poll_timeout <= 0) { /* already timed out */
367 poll_timeout = PR_INTERVAL_NO_WAIT0UL;
368 }
369 }
370 } else {
371 poll_timeout = PR_INTERVAL_NO_TIMEOUT0xffffffffUL;
372 }
373 PR_Unlock(tp->ioq.lock);
374
375 /*
376 * XXXX
377 * should retry if more jobs have been added to the queue?
378 *
379 */
380 PR_ASSERT(pollfds_used <= pollfd_cnt)((pollfds_used <= pollfd_cnt)?((void)0):PR_Assert("pollfds_used <= pollfd_cnt"
,"../../../../pr/src/misc/prtpool.c",380))
;
381 rv = PR_Poll(tp->ioq.pollfds, pollfds_used, poll_timeout);
382
383 if (tp->shutdown) {
384 break;
385 }
386
387 if (rv > 0) {
388 /*
389 * at least one io event is set
390 */
391 PRStatus rval_status;
392 PRInt32 index;
393
394 PR_ASSERT(pollfds[0].fd == tp->ioq.notify_fd)((pollfds[0].fd == tp->ioq.notify_fd)?((void)0):PR_Assert(
"pollfds[0].fd == tp->ioq.notify_fd","../../../../pr/src/misc/prtpool.c"
,394))
;
395 /*
396 * reset the pollable event, if notified
397 */
398 if (pollfds[0].out_flags & PR_POLL_READ0x1) {
399 rval_status = PR_WaitForPollableEvent(tp->ioq.notify_fd);
400 PR_ASSERT(PR_SUCCESS == rval_status)((PR_SUCCESS == rval_status)?((void)0):PR_Assert("PR_SUCCESS == rval_status"
,"../../../../pr/src/misc/prtpool.c",400))
;
401 }
402
403 for(index = 1; index < (pollfds_used); index++) {
404 PRInt16 events = pollfds[index].in_flags;
405 PRInt16 revents = pollfds[index].out_flags;
406 jobp = polljobs[index];
407
408 if ((revents & PR_POLL_NVAL0x10) || /* busted in all cases */
409 (revents & PR_POLL_ERR0x8) ||
410 ((events & PR_POLL_WRITE0x2) &&
411 (revents & PR_POLL_HUP0x20))) { /* write op & hup */
412 PR_Lock(tp->ioq.lock);
413 if (jobp->cancel_io) {
414 CANCEL_IO_JOB(jobp)do { jobp->cancel_io = 0; jobp->on_ioq = 0; do { (&
jobp->links)->prev->next = (&jobp->links)->
next; (&jobp->links)->next->prev = (&jobp->
links)->prev; (&jobp->links)->next = (&jobp->
links); (&jobp->links)->prev = (&jobp->links
); } while (0); tp->ioq.cnt--; PR_NotifyCondVar(jobp->cancel_cv
); } while (0)
;
415 PR_Unlock(tp->ioq.lock);
416 continue;
417 }
418 PR_REMOVE_AND_INIT_LINK(&jobp->links)do { (&jobp->links)->prev->next = (&jobp->
links)->next; (&jobp->links)->next->prev = (&
jobp->links)->prev; (&jobp->links)->next = (&
jobp->links); (&jobp->links)->prev = (&jobp->
links); } while (0)
;
419 tp->ioq.cnt--;
420 jobp->on_ioq = PR_FALSE0;
421 PR_Unlock(tp->ioq.lock);
422
423 /* set error */
424 if (PR_POLL_NVAL0x10 & revents) {
425 jobp->iod->error = PR_BAD_DESCRIPTOR_ERROR(-5999L);
426 }
427 else if (PR_POLL_HUP0x20 & revents) {
428 jobp->iod->error = PR_CONNECT_RESET_ERROR(-5961L);
429 }
430 else {
431 jobp->iod->error = PR_IO_ERROR(-5991L);
432 }
433
434 /*
435 * add to jobq
436 */
437 add_to_jobq(tp, jobp);
438 } else if (revents) {
439 /*
440 * add to jobq
441 */
442 PR_Lock(tp->ioq.lock);
443 if (jobp->cancel_io) {
444 CANCEL_IO_JOB(jobp)do { jobp->cancel_io = 0; jobp->on_ioq = 0; do { (&
jobp->links)->prev->next = (&jobp->links)->
next; (&jobp->links)->next->prev = (&jobp->
links)->prev; (&jobp->links)->next = (&jobp->
links); (&jobp->links)->prev = (&jobp->links
); } while (0); tp->ioq.cnt--; PR_NotifyCondVar(jobp->cancel_cv
); } while (0)
;
445 PR_Unlock(tp->ioq.lock);
446 continue;
447 }
448 PR_REMOVE_AND_INIT_LINK(&jobp->links)do { (&jobp->links)->prev->next = (&jobp->
links)->next; (&jobp->links)->next->prev = (&
jobp->links)->prev; (&jobp->links)->next = (&
jobp->links); (&jobp->links)->prev = (&jobp->
links); } while (0)
;
449 tp->ioq.cnt--;
450 jobp->on_ioq = PR_FALSE0;
451 PR_Unlock(tp->ioq.lock);
452
453 if (jobp->io_op == JOB_IO_CONNECT) {
454 if (PR_GetConnectStatus(&pollfds[index]) == PR_SUCCESS) {
455 jobp->iod->error = 0;
456 }
457 else {
458 jobp->iod->error = PR_GetError();
459 }
460 } else {
461 jobp->iod->error = 0;
462 }
463
464 add_to_jobq(tp, jobp);
465 }
466 }
467 }
468 /*
469 * timeout processing
470 */
471 now = PR_IntervalNow();
472 PR_Lock(tp->ioq.lock);
473 for (qp = tp->ioq.list.next; qp != &tp->ioq.list; qp = nextqp) {
474 nextqp = qp->next;
475 jobp = JOB_LINKS_PTR(qp)((PRJob *) ((char *) (qp) - __builtin_offsetof(PRJob, links))
)
;
476 if (jobp->cancel_io) {
477 CANCEL_IO_JOB(jobp)do { jobp->cancel_io = 0; jobp->on_ioq = 0; do { (&
jobp->links)->prev->next = (&jobp->links)->
next; (&jobp->links)->next->prev = (&jobp->
links)->prev; (&jobp->links)->next = (&jobp->
links); (&jobp->links)->prev = (&jobp->links
); } while (0); tp->ioq.cnt--; PR_NotifyCondVar(jobp->cancel_cv
); } while (0)
;
478 continue;
479 }
480 if (PR_INTERVAL_NO_TIMEOUT0xffffffffUL == jobp->timeout) {
481 break;
482 }
483 if ((PR_INTERVAL_NO_WAIT0UL != jobp->timeout) &&
484 ((PRInt32)(jobp->absolute - now) > 0)) {
485 break;
486 }
487 PR_REMOVE_AND_INIT_LINK(&jobp->links)do { (&jobp->links)->prev->next = (&jobp->
links)->next; (&jobp->links)->next->prev = (&
jobp->links)->prev; (&jobp->links)->next = (&
jobp->links); (&jobp->links)->prev = (&jobp->
links); } while (0)
;
488 tp->ioq.cnt--;
489 jobp->on_ioq = PR_FALSE0;
490 jobp->iod->error = PR_IO_TIMEOUT_ERROR(-5990L);
491 add_to_jobq(tp, jobp);
492 }
493 PR_Unlock(tp->ioq.lock);
494 }
495}
496
497/*
498 * timer worker thread function
499 */
500static void timer_wstart(void *arg)
501{
502 PRThreadPool *tp = (PRThreadPool *) arg;
503 PRCList *qp;
504 PRIntervalTime timeout;
505 PRIntervalTime now;
506
507 /*
508 * call PR_WaitCondVar with minimum value of all timeouts
509 */
510 while (!tp->shutdown) {
511 PRJob *jobp;
512
513 PR_Lock(tp->timerq.lock);
514 if (PR_CLIST_IS_EMPTY(&tp->timerq.list)((&tp->timerq.list)->next == (&tp->timerq.list
))
) {
515 timeout = PR_INTERVAL_NO_TIMEOUT0xffffffffUL;
516 } else {
517 PRCList *qp;
518
519 qp = tp->timerq.list.next;
520 jobp = JOB_LINKS_PTR(qp)((PRJob *) ((char *) (qp) - __builtin_offsetof(PRJob, links))
)
;
521
522 timeout = jobp->absolute - PR_IntervalNow();
523 if (timeout <= 0) {
524 timeout = PR_INTERVAL_NO_WAIT0UL; /* already timed out */
525 }
526 }
527 if (PR_INTERVAL_NO_WAIT0UL != timeout) {
528 PR_WaitCondVar(tp->timerq.cv, timeout);
529 }
530 if (tp->shutdown) {
531 PR_Unlock(tp->timerq.lock);
532 break;
533 }
534 /*
535 * move expired-timer jobs to jobq
536 */
537 now = PR_IntervalNow();
538 while (!PR_CLIST_IS_EMPTY(&tp->timerq.list)((&tp->timerq.list)->next == (&tp->timerq.list
))
) {
539 qp = tp->timerq.list.next;
540 jobp = JOB_LINKS_PTR(qp)((PRJob *) ((char *) (qp) - __builtin_offsetof(PRJob, links))
)
;
541
542 if ((PRInt32)(jobp->absolute - now) > 0) {
543 break;
544 }
545 /*
546 * job timed out
547 */
548 PR_REMOVE_AND_INIT_LINK(&jobp->links)do { (&jobp->links)->prev->next = (&jobp->
links)->next; (&jobp->links)->next->prev = (&
jobp->links)->prev; (&jobp->links)->next = (&
jobp->links); (&jobp->links)->prev = (&jobp->
links); } while (0)
;
549 tp->timerq.cnt--;
550 jobp->on_timerq = PR_FALSE0;
551 add_to_jobq(tp, jobp);
552 }
553 PR_Unlock(tp->timerq.lock);
554 }
555}
556
557static void
558delete_threadpool(PRThreadPool *tp)
559{
560 if (NULL((void*)0) != tp) {
561 if (NULL((void*)0) != tp->shutdown_cv) {
562 PR_DestroyCondVar(tp->shutdown_cv);
563 }
564 if (NULL((void*)0) != tp->jobq.cv) {
565 PR_DestroyCondVar(tp->jobq.cv);
566 }
567 if (NULL((void*)0) != tp->jobq.lock) {
568 PR_DestroyLock(tp->jobq.lock);
569 }
570 if (NULL((void*)0) != tp->join_lock) {
571 PR_DestroyLock(tp->join_lock);
572 }
573#ifdef OPT_WINNT
574 if (NULL((void*)0) != tp->jobq.nt_completion_port) {
575 CloseHandle(tp->jobq.nt_completion_port);
576 }
577#endif
578 /* Timer queue */
579 if (NULL((void*)0) != tp->timerq.cv) {
580 PR_DestroyCondVar(tp->timerq.cv);
581 }
582 if (NULL((void*)0) != tp->timerq.lock) {
583 PR_DestroyLock(tp->timerq.lock);
584 }
585
586 if (NULL((void*)0) != tp->ioq.lock) {
587 PR_DestroyLock(tp->ioq.lock);
588 }
589 if (NULL((void*)0) != tp->ioq.pollfds) {
590 PR_Free(tp->ioq.pollfds);
591 }
592 if (NULL((void*)0) != tp->ioq.notify_fd) {
593 PR_DestroyPollableEvent(tp->ioq.notify_fd);
594 }
595 PR_Free(tp);
596 }
597 return;
598}
599
600static PRThreadPool *
601alloc_threadpool(void)
602{
603 PRThreadPool *tp;
604
605 tp = (PRThreadPool *) PR_CALLOC(sizeof(*tp))(PR_Calloc(1, (sizeof(*tp))));
606 if (NULL((void*)0) == tp) {
607 goto failed;
608 }
609 tp->jobq.lock = PR_NewLock();
610 if (NULL((void*)0) == tp->jobq.lock) {
611 goto failed;
612 }
613 tp->jobq.cv = PR_NewCondVar(tp->jobq.lock);
614 if (NULL((void*)0) == tp->jobq.cv) {
615 goto failed;
616 }
617 tp->join_lock = PR_NewLock();
618 if (NULL((void*)0) == tp->join_lock) {
619 goto failed;
620 }
621#ifdef OPT_WINNT
622 tp->jobq.nt_completion_port = CreateIoCompletionPort(INVALID_HANDLE_VALUE,
623 NULL((void*)0), 0, 0);
624 if (NULL((void*)0) == tp->jobq.nt_completion_port) {
625 goto failed;
626 }
627#endif
628
629 tp->ioq.lock = PR_NewLock();
630 if (NULL((void*)0) == tp->ioq.lock) {
631 goto failed;
632 }
633
634 /* Timer queue */
635
636 tp->timerq.lock = PR_NewLock();
637 if (NULL((void*)0) == tp->timerq.lock) {
638 goto failed;
639 }
640 tp->timerq.cv = PR_NewCondVar(tp->timerq.lock);
641 if (NULL((void*)0) == tp->timerq.cv) {
642 goto failed;
643 }
644
645 tp->shutdown_cv = PR_NewCondVar(tp->jobq.lock);
646 if (NULL((void*)0) == tp->shutdown_cv) {
647 goto failed;
648 }
649 tp->ioq.notify_fd = PR_NewPollableEvent();
650 if (NULL((void*)0) == tp->ioq.notify_fd) {
651 goto failed;
652 }
653 return tp;
654failed:
655 delete_threadpool(tp);
656 PR_SetError(PR_OUT_OF_MEMORY_ERROR(-6000L), 0);
657 return NULL((void*)0);
658}
659
660/* Create thread pool */
661PR_IMPLEMENT(PRThreadPool *)__attribute__((visibility("default"))) PRThreadPool *
662PR_CreateThreadPool(PRInt32 initial_threads, PRInt32 max_threads,
663 PRUint32 stacksize)
664{
665 PRThreadPool *tp;
666 PRThread *thr;
667 int i;
668 wthread *wthrp;
669
670 tp = alloc_threadpool();
671 if (NULL((void*)0) == tp) {
672 return NULL((void*)0);
673 }
674
675 tp->init_threads = initial_threads;
676 tp->max_threads = max_threads;
677 tp->stacksize = stacksize;
678 PR_INIT_CLIST(&tp->jobq.list)do { (&tp->jobq.list)->next = (&tp->jobq.list
); (&tp->jobq.list)->prev = (&tp->jobq.list)
; } while (0)
;
679 PR_INIT_CLIST(&tp->ioq.list)do { (&tp->ioq.list)->next = (&tp->ioq.list)
; (&tp->ioq.list)->prev = (&tp->ioq.list); }
while (0)
;
680 PR_INIT_CLIST(&tp->timerq.list)do { (&tp->timerq.list)->next = (&tp->timerq
.list); (&tp->timerq.list)->prev = (&tp->timerq
.list); } while (0)
;
681 PR_INIT_CLIST(&tp->jobq.wthreads)do { (&tp->jobq.wthreads)->next = (&tp->jobq
.wthreads); (&tp->jobq.wthreads)->prev = (&tp->
jobq.wthreads); } while (0)
;
682 PR_INIT_CLIST(&tp->ioq.wthreads)do { (&tp->ioq.wthreads)->next = (&tp->ioq.wthreads
); (&tp->ioq.wthreads)->prev = (&tp->ioq.wthreads
); } while (0)
;
683 PR_INIT_CLIST(&tp->timerq.wthreads)do { (&tp->timerq.wthreads)->next = (&tp->timerq
.wthreads); (&tp->timerq.wthreads)->prev = (&tp
->timerq.wthreads); } while (0)
;
684 tp->shutdown = PR_FALSE0;
685
686 PR_Lock(tp->jobq.lock);
687 for(i=0; i < initial_threads; ++i) {
688
689 thr = PR_CreateThread(PR_USER_THREAD, wstart,
690 tp, PR_PRIORITY_NORMAL,
691 PR_GLOBAL_THREAD, PR_JOINABLE_THREAD,stacksize);
692 PR_ASSERT(thr)((thr)?((void)0):PR_Assert("thr","../../../../pr/src/misc/prtpool.c"
,692))
;
693 wthrp = PR_NEWZAP(wthread)((wthread*)PR_Calloc(1, sizeof(wthread)));
694 PR_ASSERT(wthrp)((wthrp)?((void)0):PR_Assert("wthrp","../../../../pr/src/misc/prtpool.c"
,694))
;
695 wthrp->thread = thr;
696 PR_APPEND_LINK(&wthrp->links, &tp->jobq.wthreads)do { (&wthrp->links)->next = (&tp->jobq.wthreads
); (&wthrp->links)->prev = (&tp->jobq.wthreads
)->prev; (&tp->jobq.wthreads)->prev->next = (
&wthrp->links); (&tp->jobq.wthreads)->prev =
(&wthrp->links); } while (0)
;
697 }
698 tp->current_threads = initial_threads;
699
700 thr = PR_CreateThread(PR_USER_THREAD, io_wstart,
701 tp, PR_PRIORITY_NORMAL,
702 PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
703 PR_ASSERT(thr)((thr)?((void)0):PR_Assert("thr","../../../../pr/src/misc/prtpool.c"
,703))
;
704 wthrp = PR_NEWZAP(wthread)((wthread*)PR_Calloc(1, sizeof(wthread)));
705 PR_ASSERT(wthrp)((wthrp)?((void)0):PR_Assert("wthrp","../../../../pr/src/misc/prtpool.c"
,705))
;
706 wthrp->thread = thr;
707 PR_APPEND_LINK(&wthrp->links, &tp->ioq.wthreads)do { (&wthrp->links)->next = (&tp->ioq.wthreads
); (&wthrp->links)->prev = (&tp->ioq.wthreads
)->prev; (&tp->ioq.wthreads)->prev->next = (&
wthrp->links); (&tp->ioq.wthreads)->prev = (&
wthrp->links); } while (0)
;
708
709 thr = PR_CreateThread(PR_USER_THREAD, timer_wstart,
710 tp, PR_PRIORITY_NORMAL,
711 PR_GLOBAL_THREAD,PR_JOINABLE_THREAD,stacksize);
712 PR_ASSERT(thr)((thr)?((void)0):PR_Assert("thr","../../../../pr/src/misc/prtpool.c"
,712))
;
713 wthrp = PR_NEWZAP(wthread)((wthread*)PR_Calloc(1, sizeof(wthread)));
714 PR_ASSERT(wthrp)((wthrp)?((void)0):PR_Assert("wthrp","../../../../pr/src/misc/prtpool.c"
,714))
;
715 wthrp->thread = thr;
716 PR_APPEND_LINK(&wthrp->links, &tp->timerq.wthreads)do { (&wthrp->links)->next = (&tp->timerq.wthreads
); (&wthrp->links)->prev = (&tp->timerq.wthreads
)->prev; (&tp->timerq.wthreads)->prev->next =
(&wthrp->links); (&tp->timerq.wthreads)->prev
= (&wthrp->links); } while (0)
;
717
718 PR_Unlock(tp->jobq.lock);
719 return tp;
720}
721
722static void
723delete_job(PRJob *jobp)
724{
725 if (NULL((void*)0) != jobp) {
726 if (NULL((void*)0) != jobp->join_cv) {
727 PR_DestroyCondVar(jobp->join_cv);
728 jobp->join_cv = NULL((void*)0);
729 }
730 if (NULL((void*)0) != jobp->cancel_cv) {
731 PR_DestroyCondVar(jobp->cancel_cv);
732 jobp->cancel_cv = NULL((void*)0);
733 }
734 PR_DELETE(jobp){ PR_Free(jobp); (jobp) = ((void*)0); };
735 }
736}
737
738static PRJob *
739alloc_job(PRBool joinable, PRThreadPool *tp)
740{
741 PRJob *jobp;
742
743 jobp = PR_NEWZAP(PRJob)((PRJob*)PR_Calloc(1, sizeof(PRJob)));
744 if (NULL((void*)0) == jobp) {
745 goto failed;
746 }
747 if (joinable) {
748 jobp->join_cv = PR_NewCondVar(tp->join_lock);
749 jobp->join_wait = PR_TRUE1;
750 if (NULL((void*)0) == jobp->join_cv) {
751 goto failed;
752 }
753 } else {
754 jobp->join_cv = NULL((void*)0);
755 }
756#ifdef OPT_WINNT
757 jobp->nt_notifier.jobp = jobp;
758#endif
759 return jobp;
760failed:
761 delete_job(jobp);
762 PR_SetError(PR_OUT_OF_MEMORY_ERROR(-6000L), 0);
763 return NULL((void*)0);
764}
765
766/* queue a job */
767PR_IMPLEMENT(PRJob *)__attribute__((visibility("default"))) PRJob *
768PR_QueueJob(PRThreadPool *tpool, PRJobFn fn, void *arg, PRBool joinable)
769{
770 PRJob *jobp;
771
772 jobp = alloc_job(joinable, tpool);
773 if (NULL((void*)0) == jobp) {
774 return NULL((void*)0);
775 }
776
777 jobp->job_func = fn;
778 jobp->job_arg = arg;
779 jobp->tpool = tpool;
780
781 add_to_jobq(tpool, jobp);
782 return jobp;
783}
784
785/* queue a job, when a socket is readable or writeable */
786static PRJob *
787queue_io_job(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
788 PRBool joinable, io_op_type op)
789{
790 PRJob *jobp;
791 PRIntervalTime now;
792
793 jobp = alloc_job(joinable, tpool);
794 if (NULL((void*)0) == jobp) {
795 return NULL((void*)0);
796 }
797
798 /*
799 * Add a new job to io_jobq
800 * wakeup io worker thread
801 */
802
803 jobp->job_func = fn;
804 jobp->job_arg = arg;
805 jobp->tpool = tpool;
806 jobp->iod = iod;
807 if (JOB_IO_READ == op) {
808 jobp->io_op = JOB_IO_READ;
809 jobp->io_poll_flags = PR_POLL_READ0x1;
810 } else if (JOB_IO_WRITE == op) {
811 jobp->io_op = JOB_IO_WRITE;
812 jobp->io_poll_flags = PR_POLL_WRITE0x2;
813 } else if (JOB_IO_ACCEPT == op) {
814 jobp->io_op = JOB_IO_ACCEPT;
815 jobp->io_poll_flags = PR_POLL_READ0x1;
816 } else if (JOB_IO_CONNECT == op) {
817 jobp->io_op = JOB_IO_CONNECT;
818 jobp->io_poll_flags = PR_POLL_WRITE0x2|PR_POLL_EXCEPT0x4;
819 } else {
820 delete_job(jobp);
821 PR_SetError(PR_INVALID_ARGUMENT_ERROR(-5987L), 0);
822 return NULL((void*)0);
823 }
824
825 jobp->timeout = iod->timeout;
826 if ((PR_INTERVAL_NO_TIMEOUT0xffffffffUL == iod->timeout) ||
827 (PR_INTERVAL_NO_WAIT0UL == iod->timeout)) {
828 jobp->absolute = iod->timeout;
829 } else {
830 now = PR_IntervalNow();
831 jobp->absolute = now + iod->timeout;
832 }
833
834
835 PR_Lock(tpool->ioq.lock);
836
837 if (PR_CLIST_IS_EMPTY(&tpool->ioq.list)((&tpool->ioq.list)->next == (&tpool->ioq.list
))
||
838 (PR_INTERVAL_NO_TIMEOUT0xffffffffUL == iod->timeout)) {
839 PR_APPEND_LINK(&jobp->links,&tpool->ioq.list)do { (&jobp->links)->next = (&tpool->ioq.list
); (&jobp->links)->prev = (&tpool->ioq.list)
->prev; (&tpool->ioq.list)->prev->next = (&
jobp->links); (&tpool->ioq.list)->prev = (&jobp
->links); } while (0)
;
840 } else if (PR_INTERVAL_NO_WAIT0UL == iod->timeout) {
841 PR_INSERT_LINK(&jobp->links,&tpool->ioq.list)do { (&jobp->links)->next = (&tpool->ioq.list
)->next; (&jobp->links)->prev = (&tpool->
ioq.list); (&tpool->ioq.list)->next->prev = (&
jobp->links); (&tpool->ioq.list)->next = (&jobp
->links); } while (0)
;
842 } else {
843 PRCList *qp;
844 PRJob *tmp_jobp;
845 /*
846 * insert into the timeout-sorted ioq
847 */
848 for (qp = tpool->ioq.list.prev; qp != &tpool->ioq.list;
849 qp = qp->prev) {
850 tmp_jobp = JOB_LINKS_PTR(qp)((PRJob *) ((char *) (qp) - __builtin_offsetof(PRJob, links))
)
;
851 if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
852 break;
853 }
854 }
855 PR_INSERT_AFTER(&jobp->links,qp)do { (&jobp->links)->next = (qp)->next; (&jobp
->links)->prev = (qp); (qp)->next->prev = (&jobp
->links); (qp)->next = (&jobp->links); } while (
0)
;
856 }
857
858 jobp->on_ioq = PR_TRUE1;
859 tpool->ioq.cnt++;
860 /*
861 * notify io worker thread(s)
862 */
863 PR_Unlock(tpool->ioq.lock);
864 notify_ioq(tpool);
865 return jobp;
866}
867
868/* queue a job, when a socket is readable */
869PR_IMPLEMENT(PRJob *)__attribute__((visibility("default"))) PRJob *
870PR_QueueJob_Read(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn, void * arg,
871 PRBool joinable)
872{
873 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_READ));
874}
875
876/* queue a job, when a socket is writeable */
877PR_IMPLEMENT(PRJob *)__attribute__((visibility("default"))) PRJob *
878PR_QueueJob_Write(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,void * arg,
879 PRBool joinable)
880{
881 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_WRITE));
882}
883
884
885/* queue a job, when a socket has a pending connection */
886PR_IMPLEMENT(PRJob *)__attribute__((visibility("default"))) PRJob *
887PR_QueueJob_Accept(PRThreadPool *tpool, PRJobIoDesc *iod, PRJobFn fn,
888 void * arg, PRBool joinable)
889{
890 return (queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_ACCEPT));
891}
892
893/* queue a job, when a socket can be connected */
894PR_IMPLEMENT(PRJob *)__attribute__((visibility("default"))) PRJob *
895PR_QueueJob_Connect(PRThreadPool *tpool, PRJobIoDesc *iod,
896 const PRNetAddr *addr, PRJobFn fn, void * arg, PRBool joinable)
897{
898 PRStatus rv;
899 PRErrorCode err;
900
901 rv = PR_Connect(iod->socket, addr, PR_INTERVAL_NO_WAIT0UL);
902 if ((rv == PR_FAILURE) && ((err = PR_GetError()) == PR_IN_PROGRESS_ERROR(-5934L))) {
903 /* connection pending */
904 return(queue_io_job(tpool, iod, fn, arg, joinable, JOB_IO_CONNECT));
905 }
906 /*
907 * connection succeeded or failed; add to jobq right away
908 */
909 if (rv == PR_FAILURE) {
910 iod->error = err;
911 }
912 else {
913 iod->error = 0;
914 }
915 return(PR_QueueJob(tpool, fn, arg, joinable));
916
917}
918
919/* queue a job, when a timer expires */
920PR_IMPLEMENT(PRJob *)__attribute__((visibility("default"))) PRJob *
921PR_QueueJob_Timer(PRThreadPool *tpool, PRIntervalTime timeout,
922 PRJobFn fn, void * arg, PRBool joinable)
923{
924 PRIntervalTime now;
925 PRJob *jobp;
926
927 if (PR_INTERVAL_NO_TIMEOUT0xffffffffUL == timeout) {
928 PR_SetError(PR_INVALID_ARGUMENT_ERROR(-5987L), 0);
929 return NULL((void*)0);
930 }
931 if (PR_INTERVAL_NO_WAIT0UL == timeout) {
932 /*
933 * no waiting; add to jobq right away
934 */
935 return(PR_QueueJob(tpool, fn, arg, joinable));
936 }
937 jobp = alloc_job(joinable, tpool);
938 if (NULL((void*)0) == jobp) {
939 return NULL((void*)0);
940 }
941
942 /*
943 * Add a new job to timer_jobq
944 * wakeup timer worker thread
945 */
946
947 jobp->job_func = fn;
948 jobp->job_arg = arg;
949 jobp->tpool = tpool;
950 jobp->timeout = timeout;
951
952 now = PR_IntervalNow();
953 jobp->absolute = now + timeout;
954
955
956 PR_Lock(tpool->timerq.lock);
957 jobp->on_timerq = PR_TRUE1;
958 if (PR_CLIST_IS_EMPTY(&tpool->timerq.list)((&tpool->timerq.list)->next == (&tpool->timerq
.list))
) {
959 PR_APPEND_LINK(&jobp->links,&tpool->timerq.list)do { (&jobp->links)->next = (&tpool->timerq.
list); (&jobp->links)->prev = (&tpool->timerq
.list)->prev; (&tpool->timerq.list)->prev->next
= (&jobp->links); (&tpool->timerq.list)->prev
= (&jobp->links); } while (0)
;
960 }
961 else {
962 PRCList *qp;
963 PRJob *tmp_jobp;
964 /*
965 * insert into the sorted timer jobq
966 */
967 for (qp = tpool->timerq.list.prev; qp != &tpool->timerq.list;
968 qp = qp->prev) {
969 tmp_jobp = JOB_LINKS_PTR(qp)((PRJob *) ((char *) (qp) - __builtin_offsetof(PRJob, links))
)
;
970 if ((PRInt32)(jobp->absolute - tmp_jobp->absolute) >= 0) {
971 break;
972 }
973 }
974 PR_INSERT_AFTER(&jobp->links,qp)do { (&jobp->links)->next = (qp)->next; (&jobp
->links)->prev = (qp); (qp)->next->prev = (&jobp
->links); (qp)->next = (&jobp->links); } while (
0)
;
975 }
976 tpool->timerq.cnt++;
977 /*
978 * notify timer worker thread(s)
979 */
980 notify_timerq(tpool);
981 PR_Unlock(tpool->timerq.lock);
982 return jobp;
983}
984
985static void
986notify_timerq(PRThreadPool *tp)
987{
988 /*
989 * wakeup the timer thread(s)
990 */
991 PR_NotifyCondVar(tp->timerq.cv);
992}
993
994static void
995notify_ioq(PRThreadPool *tp)
996{
997 PRStatus rval_status;
998
999 /*
1000 * wakeup the io thread(s)
1001 */
1002 rval_status = PR_SetPollableEvent(tp->ioq.notify_fd);
1003 PR_ASSERT(PR_SUCCESS == rval_status)((PR_SUCCESS == rval_status)?((void)0):PR_Assert("PR_SUCCESS == rval_status"
,"../../../../pr/src/misc/prtpool.c",1003))
;
1004}
1005
1006/*
1007 * cancel a job
1008 *
1009 * XXXX: is this needed? likely to be removed
1010 */
1011PR_IMPLEMENT(PRStatus)__attribute__((visibility("default"))) PRStatus
1012PR_CancelJob(PRJob *jobp) {
1013
1014 PRStatus rval = PR_FAILURE;
1015 PRThreadPool *tp;
1016
1017 if (jobp->on_timerq) {
1018 /*
1019 * now, check again while holding the timerq lock
1020 */
1021 tp = jobp->tpool;
1022 PR_Lock(tp->timerq.lock);
1023 if (jobp->on_timerq) {
1024 jobp->on_timerq = PR_FALSE0;
1025 PR_REMOVE_AND_INIT_LINK(&jobp->links)do { (&jobp->links)->prev->next = (&jobp->
links)->next; (&jobp->links)->next->prev = (&
jobp->links)->prev; (&jobp->links)->next = (&
jobp->links); (&jobp->links)->prev = (&jobp->
links); } while (0)
;
1026 tp->timerq.cnt--;
1027 PR_Unlock(tp->timerq.lock);
1028 if (!JOINABLE_JOB(jobp)(((void*)0) != (jobp)->join_cv)) {
1029 delete_job(jobp);
1030 } else {
1031 JOIN_NOTIFY(jobp)do { PR_Lock(jobp->tpool->join_lock); jobp->join_wait
= 0; PR_NotifyCondVar(jobp->join_cv); PR_Unlock(jobp->
tpool->join_lock); } while (0)
;
1032 }
1033 rval = PR_SUCCESS;
1034 } else {
1035 PR_Unlock(tp->timerq.lock);
1036 }
1037 } else if (jobp->on_ioq) {
1038 /*
1039 * now, check again while holding the ioq lock
1040 */
1041 tp = jobp->tpool;
1042 PR_Lock(tp->ioq.lock);
1043 if (jobp->on_ioq) {
1044 jobp->cancel_cv = PR_NewCondVar(tp->ioq.lock);
1045 if (NULL((void*)0) == jobp->cancel_cv) {
1046 PR_Unlock(tp->ioq.lock);
1047 PR_SetError(PR_INSUFFICIENT_RESOURCES_ERROR(-5974L), 0);
1048 return PR_FAILURE;
1049 }
1050 /*
1051 * mark job 'cancelled' and notify io thread(s)
1052 * XXXX:
1053 * this assumes there is only one io thread; when there
1054 * are multiple threads, the io thread processing this job
1055 * must be notified.
1056 */
1057 jobp->cancel_io = PR_TRUE1;
1058 PR_Unlock(tp->ioq.lock); /* release, reacquire ioq lock */
1059 notify_ioq(tp);
1060 PR_Lock(tp->ioq.lock);
1061 while (jobp->cancel_io) {
1062 PR_WaitCondVar(jobp->cancel_cv, PR_INTERVAL_NO_TIMEOUT0xffffffffUL);
1063 }
1064 PR_Unlock(tp->ioq.lock);
1065 PR_ASSERT(!jobp->on_ioq)((!jobp->on_ioq)?((void)0):PR_Assert("!jobp->on_ioq","../../../../pr/src/misc/prtpool.c"
,1065))
;
1066 if (!JOINABLE_JOB(jobp)(((void*)0) != (jobp)->join_cv)) {
1067 delete_job(jobp);
1068 } else {
1069 JOIN_NOTIFY(jobp)do { PR_Lock(jobp->tpool->join_lock); jobp->join_wait
= 0; PR_NotifyCondVar(jobp->join_cv); PR_Unlock(jobp->
tpool->join_lock); } while (0)
;
1070 }
1071 rval = PR_SUCCESS;
1072 } else {
1073 PR_Unlock(tp->ioq.lock);
1074 }
1075 }
1076 if (PR_FAILURE == rval) {
1077 PR_SetError(PR_INVALID_STATE_ERROR(-5931L), 0);
1078 }
1079 return rval;
1080}
1081
1082/* join a job, wait until completion */
1083PR_IMPLEMENT(PRStatus)__attribute__((visibility("default"))) PRStatus
1084PR_JoinJob(PRJob *jobp)
1085{
1086 if (!JOINABLE_JOB(jobp)(((void*)0) != (jobp)->join_cv)) {
1087 PR_SetError(PR_INVALID_ARGUMENT_ERROR(-5987L), 0);
1088 return PR_FAILURE;
1089 }
1090 PR_Lock(jobp->tpool->join_lock);
1091 while(jobp->join_wait) {
1092 PR_WaitCondVar(jobp->join_cv, PR_INTERVAL_NO_TIMEOUT0xffffffffUL);
1093 }
1094 PR_Unlock(jobp->tpool->join_lock);
1095 delete_job(jobp);
1096 return PR_SUCCESS;
1097}
1098
1099/* shutdown threadpool */
1100PR_IMPLEMENT(PRStatus)__attribute__((visibility("default"))) PRStatus
1101PR_ShutdownThreadPool(PRThreadPool *tpool)
1102{
1103 PRStatus rval = PR_SUCCESS;
1104
1105 PR_Lock(tpool->jobq.lock);
1106 tpool->shutdown = PR_TRUE1;
1107 PR_NotifyAllCondVar(tpool->shutdown_cv);
1108 PR_Unlock(tpool->jobq.lock);
1109
1110 return rval;
1111}
1112
1113/*
1114 * join thread pool
1115 * wait for termination of worker threads
1116 * reclaim threadpool resources
1117 */
1118PR_IMPLEMENT(PRStatus)__attribute__((visibility("default"))) PRStatus
1119PR_JoinThreadPool(PRThreadPool *tpool)
1120{
1121 PRStatus rval = PR_SUCCESS;
1122 PRCList *head;
1123 PRStatus rval_status;
1124
1125 PR_Lock(tpool->jobq.lock);
1126 while (!tpool->shutdown) {
1127 PR_WaitCondVar(tpool->shutdown_cv, PR_INTERVAL_NO_TIMEOUT0xffffffffUL);
1128 }
1129
1130 /*
1131 * wakeup worker threads
1132 */
1133#ifdef OPT_WINNT
1134 /*
1135 * post shutdown notification for all threads
1136 */
1137 {
1138 int i;
1139 for(i=0; i < tpool->current_threads; i++) {
1140 PostQueuedCompletionStatus(tpool->jobq.nt_completion_port, 0,
1141 TRUE, NULL((void*)0));
1142 }
1143 }
1144#else
1145 PR_NotifyAllCondVar(tpool->jobq.cv);
1146#endif
1147
1148 /*
1149 * wakeup io thread(s)
1150 */
1151 notify_ioq(tpool);
1152
1153 /*
1154 * wakeup timer thread(s)
1155 */
1156 PR_Lock(tpool->timerq.lock);
1157 notify_timerq(tpool);
1158 PR_Unlock(tpool->timerq.lock);
1159
1160 while (!PR_CLIST_IS_EMPTY(&tpool->jobq.wthreads)((&tpool->jobq.wthreads)->next == (&tpool->jobq
.wthreads))
) {
1161 wthread *wthrp;
1162
1163 head = PR_LIST_HEAD(&tpool->jobq.wthreads)(&tpool->jobq.wthreads)->next;
1164 PR_REMOVE_AND_INIT_LINK(head)do { (head)->prev->next = (head)->next; (head)->next
->prev = (head)->prev; (head)->next = (head); (head)
->prev = (head); } while (0)
;
1165 PR_Unlock(tpool->jobq.lock);
1166 wthrp = WTHREAD_LINKS_PTR(head)((wthread *) ((char *) (head) - __builtin_offsetof(wthread, links
)))
;
1167 rval_status = PR_JoinThread(wthrp->thread);
1168 PR_ASSERT(PR_SUCCESS == rval_status)((PR_SUCCESS == rval_status)?((void)0):PR_Assert("PR_SUCCESS == rval_status"
,"../../../../pr/src/misc/prtpool.c",1168))
;
1169 PR_DELETE(wthrp){ PR_Free(wthrp); (wthrp) = ((void*)0); };
1170 PR_Lock(tpool->jobq.lock);
1171 }
1172 PR_Unlock(tpool->jobq.lock);
1173 while (!PR_CLIST_IS_EMPTY(&tpool->ioq.wthreads)((&tpool->ioq.wthreads)->next == (&tpool->ioq
.wthreads))
) {
1174 wthread *wthrp;
1175
1176 head = PR_LIST_HEAD(&tpool->ioq.wthreads)(&tpool->ioq.wthreads)->next;
1177 PR_REMOVE_AND_INIT_LINK(head)do { (head)->prev->next = (head)->next; (head)->next
->prev = (head)->prev; (head)->next = (head); (head)
->prev = (head); } while (0)
;
1178 wthrp = WTHREAD_LINKS_PTR(head)((wthread *) ((char *) (head) - __builtin_offsetof(wthread, links
)))
;
1179 rval_status = PR_JoinThread(wthrp->thread);
1180 PR_ASSERT(PR_SUCCESS == rval_status)((PR_SUCCESS == rval_status)?((void)0):PR_Assert("PR_SUCCESS == rval_status"
,"../../../../pr/src/misc/prtpool.c",1180))
;
1181 PR_DELETE(wthrp){ PR_Free(wthrp); (wthrp) = ((void*)0); };
1182 }
1183
1184 while (!PR_CLIST_IS_EMPTY(&tpool->timerq.wthreads)((&tpool->timerq.wthreads)->next == (&tpool->
timerq.wthreads))
) {
1185 wthread *wthrp;
1186
1187 head = PR_LIST_HEAD(&tpool->timerq.wthreads)(&tpool->timerq.wthreads)->next;
1188 PR_REMOVE_AND_INIT_LINK(head)do { (head)->prev->next = (head)->next; (head)->next
->prev = (head)->prev; (head)->next = (head); (head)
->prev = (head); } while (0)
;
1189 wthrp = WTHREAD_LINKS_PTR(head)((wthread *) ((char *) (head) - __builtin_offsetof(wthread, links
)))
;
1190 rval_status = PR_JoinThread(wthrp->thread);
1191 PR_ASSERT(PR_SUCCESS == rval_status)((PR_SUCCESS == rval_status)?((void)0):PR_Assert("PR_SUCCESS == rval_status"
,"../../../../pr/src/misc/prtpool.c",1191))
;
1192 PR_DELETE(wthrp){ PR_Free(wthrp); (wthrp) = ((void*)0); };
1193 }
1194
1195 /*
1196 * Delete queued jobs
1197 */
1198 while (!PR_CLIST_IS_EMPTY(&tpool->jobq.list)((&tpool->jobq.list)->next == (&tpool->jobq.
list))
) {
1199 PRJob *jobp;
1200
1201 head = PR_LIST_HEAD(&tpool->jobq.list)(&tpool->jobq.list)->next;
1202 PR_REMOVE_AND_INIT_LINK(head)do { (head)->prev->next = (head)->next; (head)->next
->prev = (head)->prev; (head)->next = (head); (head)
->prev = (head); } while (0)
;
1203 jobp = JOB_LINKS_PTR(head)((PRJob *) ((char *) (head) - __builtin_offsetof(PRJob, links
)))
;
1204 tpool->jobq.cnt--;
1205 delete_job(jobp);
1206 }
1207
1208 /* delete io jobs */
1209 while (!PR_CLIST_IS_EMPTY(&tpool->ioq.list)((&tpool->ioq.list)->next == (&tpool->ioq.list
))
) {
1210 PRJob *jobp;
1211
1212 head = PR_LIST_HEAD(&tpool->ioq.list)(&tpool->ioq.list)->next;
1213 PR_REMOVE_AND_INIT_LINK(head)do { (head)->prev->next = (head)->next; (head)->next
->prev = (head)->prev; (head)->next = (head); (head)
->prev = (head); } while (0)
;
1214 tpool->ioq.cnt--;
1215 jobp = JOB_LINKS_PTR(head)((PRJob *) ((char *) (head) - __builtin_offsetof(PRJob, links
)))
;
1216 delete_job(jobp);
1217 }
1218
1219 /* delete timer jobs */
1220 while (!PR_CLIST_IS_EMPTY(&tpool->timerq.list)((&tpool->timerq.list)->next == (&tpool->timerq
.list))
) {
1221 PRJob *jobp;
1222
1223 head = PR_LIST_HEAD(&tpool->timerq.list)(&tpool->timerq.list)->next;
1224 PR_REMOVE_AND_INIT_LINK(head)do { (head)->prev->next = (head)->next; (head)->next
->prev = (head)->prev; (head)->next = (head); (head)
->prev = (head); } while (0)
;
1225 tpool->timerq.cnt--;
1226 jobp = JOB_LINKS_PTR(head)((PRJob *) ((char *) (head) - __builtin_offsetof(PRJob, links
)))
;
1227 delete_job(jobp);
1228 }
1229
1230 PR_ASSERT(0 == tpool->jobq.cnt)((0 == tpool->jobq.cnt)?((void)0):PR_Assert("0 == tpool->jobq.cnt"
,"../../../../pr/src/misc/prtpool.c",1230))
;
1231 PR_ASSERT(0 == tpool->ioq.cnt)((0 == tpool->ioq.cnt)?((void)0):PR_Assert("0 == tpool->ioq.cnt"
,"../../../../pr/src/misc/prtpool.c",1231))
;
1232 PR_ASSERT(0 == tpool->timerq.cnt)((0 == tpool->timerq.cnt)?((void)0):PR_Assert("0 == tpool->timerq.cnt"
,"../../../../pr/src/misc/prtpool.c",1232))
;
1233
1234 delete_threadpool(tpool);
1235 return rval;
1236}