]> git.imager.perl.org - poe-xs-queue-array.git/blame - queue.c
0.006 release
[poe-xs-queue-array.git] / queue.c
CommitLineData
a0e4f61f
TC
1#include "EXTERN.h"\r
2#include "perl.h"\r
3#include "XSUB.h"\r
4\r
5#include "queue.h"\r
1bbcbbe4 6#include "alloc.h"\r
a0e4f61f 7\r
c50a32c9
TC
/* Debug tracing: to enable it, define DEBUG(x) as x instead. */
/*#define DEBUG(x) x*/
#define DEBUG(x)

/* Consistency checks in pq_move_items(): to enable them, define
   DEBUG_ERR(x) as x instead. */
/*#define DEBUG_ERR(x) x*/
#define DEBUG_ERR(x)

/* Define KEEP_STATS to 1 (here or on the compiler command line) to
   count the lookups made by pq_find_item().  It gets an explicit
   default so that "#if KEEP_STATS" never tests an undefined macro. */
#ifndef KEEP_STATS
#define KEEP_STATS 0
#endif

#if KEEP_STATS
#define STATS(x) x
#else
#define STATS(x)
#endif

/* number of entries allocated for a newly created queue */
#define PQ_START_SIZE 10

/* values for the at_end parameter of pq_realloc() */
#define AT_START 0
#define AT_END 1

/* non-zero: look ids up by scanning the queue instead of using the
   id hash */
#define STUPID_IDS 0

/* queues holding at least this many entries are searched with a
   binary search rather than a linear scan */
#define LARGE_QUEUE_SIZE 50
28\r
a0e4f61f
TC
29/*\r
30We store the queue in a similar way to the way perl deals with arrays,\r
31we keep a block of memory, but the first element may or may not be in use,\r
32depending on the pattern of usage.\r
33\r
34There's 3 value controlling usage of the array:\r
35\r
36 - alloc - the number of elements allocated in total\r
37 - start - the first element in use in the array\r
38 - end - one past the end of the last element in the array\r
39\r
40This has the properties that:\r
41\r
42 start == 0 - no space at the front\r
43 end == alloc - no space at the end\r
44 end - start - number of elements in the queue\r
45\r
46We use a perl hash (HV *) to store the mapping from ids to priorities.\r
47\r
48*/\r
49struct poe_queue_tag {\r
50 /* the first entry in use */\r
51 int start;\r
52\r
53 /* 1 past the last entry in use, hence end - start is the number of \r
54 entries in the queue */\r
55 int end;\r
56\r
57 /* the total number of entries allocated */\r
58 int alloc;\r
59\r
60 /* used to generate item ids */\r
61 pq_id_t queue_seq;\r
62\r
63 /* used to track in use item ids */\r
64 HV *ids;\r
65\r
66 /* the actual entries */\r
67 pq_entry *entries;\r
d5e34ea9
TC
68\r
69#if KEEP_STATS\r
70 int total_finds;\r
71 int binary_finds;\r
72#endif\r
a0e4f61f
TC
73};\r
74\r
75/*\r
76poe_create - create a new queue object.\r
77\r
78No parameters. returns the new queue object.\r
79\r
80*/\r
81poe_queue *\r
82pq_create(void) {\r
1bbcbbe4 83 poe_queue *pq = mymalloc(sizeof(poe_queue));\r
a0e4f61f
TC
84 \r
85 if (pq == NULL)\r
86 croak("Out of memory");\r
87 pq->start = 0;\r
88 pq->end = 0;\r
89 pq->alloc = PQ_START_SIZE;\r
90 pq->queue_seq = 0;\r
91 pq->ids = newHV();\r
1bbcbbe4
TC
92 pq->entries = mymalloc(sizeof(pq_entry) * PQ_START_SIZE);\r
93 memset(pq->entries, 0, sizeof(pq_entry) * PQ_START_SIZE);\r
a0e4f61f
TC
94 if (pq->entries == NULL)\r
95 croak("Out of memory");\r
96\r
d5e34ea9
TC
97#if KEEP_STATS\r
98 pq->total_finds = pq->binary_finds = 0;\r
99#endif\r
100\r
a0e4f61f
TC
101 DEBUG( fprintf(stderr, "pq_create() => %p\n", pq) );\r
102\r
103 return pq;\r
104}\r
105\r
106/*\r
107pq_delete - release the queue object.\r
108\r
109This also releases one reference from each SV in the queue.\r
110\r
111*/\r
112void\r
113pq_delete(poe_queue *pq) {\r
114 int i;\r
115\r
116 DEBUG( fprintf(stderr, "pq_delete(%p)\n", pq) );\r
117 if (pq->end > pq->start) {\r
118 for (i = pq->start; i < pq->end; ++i) {\r
119 SvREFCNT_dec(pq->entries[i].payload);\r
120 }\r
121 }\r
122 SvREFCNT_dec((SV *)pq->ids);\r
123 pq->ids = NULL;\r
124 if (pq->entries)\r
1bbcbbe4 125 myfree(pq->entries);\r
a0e4f61f 126 pq->entries = NULL;\r
1bbcbbe4 127 myfree(pq);\r
a0e4f61f
TC
128}\r
129\r
130/*\r
131pq_new_id - generate a new item id.\r
132\r
133Internal use only.\r
134\r
135This, the following 3 functions and pq_create, pq_delete, should be\r
136all that needs to be modified if we change hash implementations.\r
137\r
138*/\r
139static\r
140pq_id_t\r
141pq_new_id(poe_queue *pq, pq_priority_t priority) {\r
cb6fa8eb
TC
142#if STUPID_IDS\r
143 int seq;\r
144 int i;\r
145 int found;\r
146 \r
147 do {\r
148 seq = ++pq->queue_seq;\r
149 found = 0;\r
150 for (i = pq->start; i < pq->end; ++i) {\r
151 if (pq->entries[i].id == seq) {\r
152 found = 1;\r
153 break;\r
154 }\r
155 }\r
156 } while (found);\r
157\r
158 return seq;\r
159#else\r
d5e34ea9 160 pq_id_t seq = ++pq->queue_seq;;\r
a0e4f61f 161\r
d5e34ea9 162 while (hv_exists(pq->ids, (char *)&seq, sizeof(seq))) {\r
a0e4f61f 163 seq = ++pq->queue_seq;\r
a0e4f61f 164 }\r
d5e34ea9 165 hv_store(pq->ids, (char *)&seq, sizeof(seq), newSVnv(priority), 0);\r
cb6fa8eb 166#endif\r
a0e4f61f
TC
167\r
168 return seq;\r
169}\r
170\r
171/*\r
172pq_release_id - releases an id for future use.\r
173*/\r
174static\r
175void\r
176pq_release_id(poe_queue *pq, pq_id_t id) {\r
cb6fa8eb
TC
177#if STUPID_IDS\r
178#else\r
d5e34ea9 179 hv_delete(pq->ids, (char *)&id, sizeof(id), 0);\r
cb6fa8eb 180#endif\r
a0e4f61f
TC
181}\r
182\r
183/*\r
184pq_item_priority - get the priority of an item given it's id\r
185*/\r
186static\r
187int\r
188pq_item_priority(poe_queue *pq, pq_id_t id, pq_priority_t *priority) {\r
cb6fa8eb
TC
189#if STUPID_IDS\r
190 int i;\r
191\r
192 for (i = pq->start; i < pq->end; ++i) {\r
193 if (pq->entries[i].id == id) {\r
194 *priority = pq->entries[i].priority;\r
195 return 1;\r
196 }\r
197 }\r
198\r
199 return 0;\r
200#else\r
d5e34ea9 201 SV **entry = hv_fetch(pq->ids, (char *)&id, sizeof(id), 0);\r
a0e4f61f 202\r
d5e34ea9 203 if (!entry || !*entry)\r
a0e4f61f
TC
204 return 0;\r
205\r
d5e34ea9 206 *priority = SvNV(*entry);\r
a0e4f61f
TC
207\r
208 return 1;\r
cb6fa8eb 209#endif\r
a0e4f61f
TC
210}\r
211\r
212/*\r
213pq_set_id_priority - set the priority of an item in the id hash\r
214*/\r
215static\r
216void\r
217pq_set_id_priority(poe_queue *pq, pq_id_t id, pq_priority_t new_priority) {\r
cb6fa8eb
TC
218#if STUPID_IDS\r
219 /* nothing to do, caller set it in the array */\r
220#else\r
d5e34ea9 221 SV **entry = hv_fetch(pq->ids, (char *)&id, sizeof(id), 0);\r
a0e4f61f 222\r
d5e34ea9 223 if (!entry && !*entry)\r
a0e4f61f
TC
224 croak("pq_set_priority: id not found");\r
225\r
d5e34ea9 226 sv_setnv(*entry, new_priority);\r
cb6fa8eb 227#endif\r
a0e4f61f
TC
228}\r
229\r
c50a32c9
TC
230/*\r
231pq_move_items - moves items around.\r
232\r
233This encapsulates the old calls to memmove(), providing a single place\r
234to add error checking.\r
235*/\r
236static void\r
237pq_move_items(poe_queue *pq, int target, int src, int count) {\r
238\r
239 DEBUG_ERR(\r
240 {\r
241 int die = 0;\r
242 if (src < pq->start) {\r
243 fprintf(stderr, "src %d less than start %d\n", src, pq->start);\r
244 ++die;\r
245 }\r
246 if (src + count > pq->end) {\r
247 fprintf(stderr, "src %d + count %d beyond end %d\n", src, count, pq->end);\r
248 ++die;\r
249 }\r
250 if (target < 0) {\r
251 fprintf(stderr, "target %d < 0\n", target);\r
252 ++die;\r
253 }\r
254 if (target + count > pq->alloc) {\r
800342e1 255 fprintf(stderr, "target %d + count %d > alloc %d\n", target, count, pq->alloc);\r
c50a32c9
TC
256 ++die;\r
257 }\r
258 if (die) *(char *)0 = '\0';\r
259 }\r
260 )\r
261 memmove(pq->entries + target, pq->entries + src, count * sizeof(pq_entry));\r
262}\r
263\r
a0e4f61f
TC
264/*\r
265pq_realloc - make space at the front of back of the queue.\r
266\r
267This adjusts the queue to allow insertion of a single item at the\r
268front or the back of the queue.\r
269\r
270If the queue has 33% or more space available we simple adjust the\r
271position of the in-use items within the array. We try not to push the\r
272items right up against the opposite end of the array, since we might\r
273need to insert items there too.\r
274\r
275If the queue has less than 33% space available we allocate another 50%\r
276space. We then only move the queue elements if we need space at the\r
277front, since the reallocation has just opened up a huge space at the\r
278back. Since we're reallocating exponentially larger sizes we should\r
279have a constant time cost on reallocation per queue item stored (but\r
280other costs are going to be higher.)\r
281\r
282*/\r
283static\r
284void\r
285pq_realloc(poe_queue *pq, int at_end) {\r
286 int count = pq->end - pq->start;\r
287\r
288 DEBUG( fprintf(stderr, "pq_realloc((%d, %d, %d), %d)\n", pq->start, pq->end, pq->alloc, at_end) );\r
a0e4f61f
TC
289 if (count * 3 / 2 < pq->alloc) {\r
290 /* 33 % or more space available, use some of it */\r
291 int new_start;\r
292\r
293 if (at_end) {\r
294 new_start = (pq->alloc - count) / 3;\r
295 }\r
296 else {\r
297 new_start = (pq->alloc - count) * 2 / 3;\r
298 }\r
299 DEBUG( fprintf(stderr, " moving start to %d\n", new_start) );\r
c50a32c9 300 pq_move_items(pq, new_start, pq->start, count);\r
a0e4f61f
TC
301 pq->start = new_start;\r
302 pq->end = new_start + count;\r
303 }\r
304 else {\r
305 int new_alloc = pq->alloc * 3 / 2;\r
1bbcbbe4 306 pq->entries = myrealloc(pq->entries, sizeof(pq_entry) * new_alloc);\r
a0e4f61f
TC
307 pq->alloc = new_alloc;\r
308\r
309 if (!pq->entries)\r
310 croak("Out of memory");\r
311\r
312 DEBUG( fprintf(stderr, " - expanding to %d entries\n", new_alloc) );\r
313\r
314 if (!at_end) {\r
315 int new_start = (new_alloc - count) * 2 / 3;\r
316 DEBUG( fprintf(stderr, " moving start to %d\n", new_start) );\r
c50a32c9 317 pq_move_items(pq, new_start, pq->start, count);\r
a0e4f61f
TC
318 pq->start = new_start;\r
319 pq->end = new_start + count;\r
320 }\r
321 }\r
a0e4f61f
TC
322 DEBUG( fprintf(stderr, " final: %d %d %d\n", pq->start, pq->end, pq->alloc) );\r
323}\r
324\r
325/*\r
326pq_insertion_point - figure out where to insert an item with the given\r
327priority\r
328\r
329Internal.\r
330*/\r
331static\r
332int\r
333pq_insertion_point(poe_queue *pq, pq_priority_t priority) {\r
d5e34ea9
TC
334 if (pq->end - pq->start < LARGE_QUEUE_SIZE) {\r
335 int i = pq->end;\r
336 while (i > pq->start &&\r
337 priority < pq->entries[i-1].priority) {\r
338 --i;\r
339 }\r
340 return i;\r
a0e4f61f 341 }\r
d5e34ea9
TC
342 else {\r
343 int lower = pq->start;\r
344 int upper = pq->end - 1;\r
345 while (1) {\r
346 int midpoint = (lower + upper) >> 1;\r
a0e4f61f 347\r
d5e34ea9
TC
348 if (upper < lower)\r
349 return lower;\r
350 \r
351 if (priority < pq->entries[midpoint].priority) {\r
352 upper = midpoint - 1;\r
353 continue;\r
354 }\r
355 if (priority > pq->entries[midpoint].priority) {\r
356 lower = midpoint + 1;\r
357 continue;\r
358 }\r
359 while (midpoint < pq->end &&\r
360 priority == pq->entries[midpoint].priority) {\r
361 ++midpoint;\r
362 }\r
363 return midpoint;\r
364 }\r
365 }\r
a0e4f61f
TC
366}\r
367\r
368int\r
369pq_enqueue(poe_queue *pq, pq_priority_t priority, SV *payload) {\r
370 int fill_at;\r
371 pq_id_t id = pq_new_id(pq, priority);\r
372\r
373 DEBUG( fprintf(stderr, "pq_enqueue(%f, %p)\n", priority, payload) );\r
374 if (pq->start == pq->end) {\r
375 DEBUG( fprintf(stderr, " - on empty queue\n") );\r
376 /* allow room at front and back for new entries */\r
377 pq->start = pq->alloc / 3;\r
378 pq->end = pq->start + 1;\r
379 fill_at = pq->start;\r
380 }\r
381 else if (priority >= pq->entries[pq->end-1].priority) {\r
382 DEBUG( fprintf(stderr, " - at the end\n") );\r
383 if (pq->end == pq->alloc)\r
384 /* past the end - need to realloc or make some space */\r
385 pq_realloc(pq, AT_END);\r
386 \r
387 fill_at = pq->end;\r
388 ++pq->end;\r
389 }\r
390 else if (priority < pq->entries[pq->start].priority) {\r
391 DEBUG( fprintf(stderr, " - at the front\n") );\r
392 if (pq->start == 0)\r
393 /* no space at the front, make some */\r
394 pq_realloc(pq, AT_START);\r
395\r
396 --pq->start;\r
397 fill_at = pq->start;\r
398 }\r
399 else {\r
400 int i;\r
401 DEBUG( fprintf(stderr, " - in the middle\n") );\r
402 i = pq_insertion_point(pq, priority);\r
403 \r
404 /* if we're near the end we want to push entries up, otherwise down */\r
405 if (i - pq->start > (pq->end - pq->start) / 2) {\r
406 DEBUG( fprintf(stderr, " - closer to the back (%d -> [ %d %d ])\n",\r
407 i, pq->start, pq->end) );\r
408 /* make sure we have space, this might end up copying twice, \r
409 but too bad for now */\r
410 if (pq->end == pq->alloc) {\r
411 int old_start = pq->start;\r
412 pq_realloc(pq, AT_END);\r
413 i += pq->start - old_start;\r
414 }\r
415 \r
c50a32c9 416 pq_move_items(pq, i+1, i, pq->end - i);\r
a0e4f61f
TC
417 ++pq->end;\r
418 fill_at = i;\r
419 }\r
420 else {\r
421 DEBUG( fprintf(stderr, " - closer to the front (%d -> [ %d %d ])\n",\r
422 i, pq->start, pq->end) );\r
423 if (pq->start == 0) {\r
424 pq_realloc(pq, AT_START);\r
425 i += pq->start;\r
426 }\r
c50a32c9 427 pq_move_items(pq, pq->start-1, pq->start, i - pq->start);\r
a0e4f61f
TC
428 --pq->start;\r
429 fill_at = i-1;\r
430 }\r
431 }\r
432 pq->entries[fill_at].priority = priority;\r
433 pq->entries[fill_at].id = id;\r
434 pq->entries[fill_at].payload = newSVsv(payload);\r
435\r
436 return id;\r
437}\r
438\r
439/*\r
440 Note: it's up to the caller to release the SV. The XS code does this \r
441 by making it mortal.\r
442*/\r
443int\r
444pq_dequeue_next(poe_queue *pq, pq_priority_t *priority, pq_id_t *id, SV **payload) {\r
445 pq_entry *entry;\r
446 /* the caller needs to release the payload (somehow) */\r
447 if (pq->start == pq->end)\r
448 return 0;\r
449\r
450 entry = pq->entries + pq->start++;\r
451 *priority = entry->priority;\r
452 *id = entry->id;\r
453 *payload = entry->payload;\r
454 pq_release_id(pq, entry->id);\r
455\r
456 return 1;\r
457}\r
458\r
459int\r
460pq_get_next_priority(poe_queue *pq, pq_priority_t *priority) {\r
461 if (pq->start == pq->end)\r
462 return 0;\r
463\r
464 *priority = pq->entries[pq->start].priority;\r
465 return 1;\r
466}\r
467\r
468int\r
469pq_get_item_count(poe_queue *pq) {\r
470 return pq->end - pq->start;\r
471}\r
472\r
473/*\r
474pq_test_filter - the XS magic involved in passing the payload to a\r
475filter function.\r
476*/\r
477static\r
478int\r
479pq_test_filter(pq_entry *entry, SV *filter) {\r
480 /* man perlcall for the magic here */\r
481 dSP;\r
482 int count;\r
483 SV *result_sv;\r
484 int result;\r
485\r
486 ENTER;\r
487 SAVETMPS;\r
488 PUSHMARK(SP);\r
489 XPUSHs(sv_2mortal(newSVsv(entry->payload)));\r
490 PUTBACK;\r
491\r
492 count = call_sv(filter, G_SCALAR);\r
493\r
494 SPAGAIN;\r
495\r
496 if (count != 1) \r
497 croak("got other than 1 value in scalar context");\r
498\r
499 result_sv = POPs;\r
500 result = SvTRUE(result_sv);\r
501\r
502 PUTBACK;\r
503 FREETMPS;\r
504 LEAVE;\r
505\r
506 return result;\r
507}\r
508\r
509/*\r
510pq_find_item - search for an item we know is there.\r
511\r
512Internal.\r
513*/\r
514static\r
515int\r
516pq_find_item(poe_queue *pq, pq_id_t id, pq_priority_t priority) {\r
517 int i;\r
518\r
d5e34ea9
TC
519 STATS(++pq->total_finds);\r
520 if (pq->end - pq->start < LARGE_QUEUE_SIZE) {\r
521 for (i = pq->start; i < pq->end; ++i) {\r
522 if (pq->entries[i].id == id)\r
523 return i;\r
524 }\r
525 DEBUG(fprintf(stderr, "pq_find_item %d => %f\n", id, priority) );\r
526 croak("Internal inconsistency: event should have been found");\r
527 }\r
528\r
529 /* try a binary search */\r
530 /* simply translated from the perl */\r
531 STATS(++pq->binary_finds);\r
532 {\r
533 int lower = pq->start;\r
534 int upper = pq->end - 1;\r
535 int linear_point;\r
536 while (1) {\r
537 int midpoint = (upper + lower) >> 1;\r
538 if (upper < lower) {\r
539 croak("Internal inconsistency, priorities out of order");\r
540 }\r
541 if (priority < pq->entries[midpoint].priority) {\r
542 upper = midpoint - 1;\r
543 continue;\r
544 }\r
545 if (priority > pq->entries[midpoint].priority) {\r
546 lower = midpoint + 1;\r
547 continue;\r
548 }\r
549 linear_point = midpoint;\r
550 while (linear_point >= pq->start &&\r
551 priority == pq->entries[linear_point].priority) {\r
552 if (pq->entries[linear_point].id == id)\r
553 return linear_point;\r
554 --linear_point;\r
555 }\r
556 linear_point = midpoint;\r
557 while ( (++linear_point < pq->end) &&\r
558 priority == pq->entries[linear_point].priority) {\r
559 if (pq->entries[linear_point].id == id)\r
560 return linear_point;\r
561 }\r
562\r
563 croak("internal inconsistency: event should have been found");\r
564 }\r
a0e4f61f 565 }\r
a0e4f61f
TC
566}\r
567\r
568int\r
569pq_remove_item(poe_queue *pq, pq_id_t id, SV *filter, pq_entry *removed) {\r
570 pq_priority_t priority;\r
571 int index;\r
572\r
573 if (!pq_item_priority(pq, id, &priority)) {\r
574 errno = ESRCH;\r
575 return 0;\r
576 }\r
577\r
578 index = pq_find_item(pq, id, priority);\r
579\r
580 if (!pq_test_filter(pq->entries + index, filter)) {\r
581 errno = EPERM;\r
582 return 0;\r
583 }\r
584\r
585 *removed = pq->entries[index];\r
586 pq_release_id(pq, id);\r
587 if (index == pq->start) {\r
588 ++pq->start;\r
589 }\r
590 else if (index == pq->end - 1) {\r
591 --pq->end;\r
592 }\r
593 else {\r
c50a32c9 594 pq_move_items(pq, index, index+1, pq->end - index - 1);\r
a0e4f61f
TC
595 --pq->end;\r
596 }\r
c50a32c9
TC
597 DEBUG( fprintf(stderr, "removed (%d, %p (%d))\n", id, removed->payload,\r
598 SvREFCNT(removed->payload)) );\r
a0e4f61f
TC
599\r
600 return 1;\r
601}\r
602\r
603int\r
604pq_remove_items(poe_queue *pq, SV *filter, int max_count, pq_entry **entries) {\r
605 int in_index, out_index;\r
606 int remove_count = 0;\r
607 \r
608 *entries = NULL;\r
609 if (pq->start == pq->end)\r
610 return 0;\r
611\r
1bbcbbe4 612 *entries = mymalloc(sizeof(pq_entry) * (pq->end - pq->start));\r
a0e4f61f
TC
613 if (!*entries)\r
614 croak("Out of memory");\r
615 \r
616 in_index = out_index = pq->start;\r
617 while (in_index < pq->end && remove_count < max_count) {\r
618 if (pq_test_filter(pq->entries + in_index, filter)) {\r
619 pq_release_id(pq, pq->entries[in_index].id);\r
620 (*entries)[remove_count++] = pq->entries[in_index++];\r
621 }\r
622 else {\r
623 pq->entries[out_index++] = pq->entries[in_index++];\r
624 }\r
625 }\r
626 while (in_index < pq->end) {\r
627 pq->entries[out_index++] = pq->entries[in_index++];\r
628 }\r
629 pq->end = out_index;\r
630 \r
631 return remove_count;\r
632}\r
633\r
634/*\r
635We need to keep the following 2 functions in sync (or combine the\r
636common code.)\r
637*/\r
638int\r
639pq_set_priority(poe_queue *pq, pq_id_t id, SV *filter, pq_priority_t new_priority) {\r
640 pq_priority_t old_priority;\r
641 int index, insert_at;\r
642\r
643 if (!pq_item_priority(pq, id, &old_priority)) {\r
644 errno = ESRCH;\r
645 return 0;\r
646 }\r
647\r
648 index = pq_find_item(pq, id, old_priority);\r
649\r
650 if (!pq_test_filter(pq->entries + index, filter)) {\r
651 errno = EPERM;\r
652 return 0;\r
653 }\r
654\r
655 DEBUG( fprintf(stderr, " - index %d oldp %f newp %f\n", index, old_priority, new_priority) );\r
656\r
657 if (pq->end - pq->start == 1) {\r
658 DEBUG( fprintf(stderr, " -- one item\n") );\r
659 /* only the one item anyway */\r
660 pq->entries[pq->start].priority = new_priority;\r
661 }\r
662 else {\r
663 insert_at = pq_insertion_point(pq, new_priority);\r
664 DEBUG( fprintf(stderr, " - new index %d\n", insert_at) );\r
665 /* the item is still in the queue, so either side of it means it\r
666 won't move */\r
667 if (insert_at == index || insert_at == index+1) {\r
668 DEBUG( fprintf(stderr, " -- change in place\n") );\r
669 pq->entries[index].priority = new_priority;\r
670 }\r
671 else {\r
672 pq_entry saved = pq->entries[index];\r
673 saved.priority = new_priority;\r
674\r
675 if (insert_at < index) {\r
676 DEBUG( fprintf(stderr, " - insert_at < index\n") );\r
c50a32c9 677 pq_move_items(pq, insert_at + 1, insert_at, index - insert_at);\r
a0e4f61f
TC
678 pq->entries[insert_at] = saved;\r
679 }\r
680 else {\r
681 DEBUG( fprintf(stderr, " - insert_at > index\n") );\r
682 --insert_at;\r
c50a32c9 683 pq_move_items(pq, index, index + 1, insert_at - index);\r
a0e4f61f
TC
684 pq->entries[insert_at] = saved;\r
685 }\r
686 }\r
687 }\r
688\r
689 pq_set_id_priority(pq, id, new_priority);\r
690\r
691 return 1; \r
692}\r
693\r
694int\r
695pq_adjust_priority(poe_queue *pq, pq_id_t id, SV *filter, double delta, pq_priority_t *priority) {\r
696 pq_priority_t old_priority, new_priority;\r
697 int index, insert_at;\r
698\r
699 DEBUG( fprintf(stderr, "pq_adjust_priority(..., %d, %p, %f, ...)\n", id, filter, delta) );\r
700\r
701 if (!pq_item_priority(pq, id, &old_priority)) {\r
702 errno = ESRCH;\r
703 return 0;\r
704 }\r
705\r
706 index = pq_find_item(pq, id, old_priority);\r
707\r
708 if (!pq_test_filter(pq->entries + index, filter)) {\r
709 errno = EPERM;\r
710 return 0;\r
711 }\r
712\r
713 new_priority = old_priority + delta;\r
714\r
715 DEBUG( fprintf(stderr, " - index %d oldp %f newp %f\n", index, old_priority, new_priority) );\r
716\r
717 if (pq->end - pq->start == 1) {\r
718 DEBUG( fprintf(stderr, " -- one item\n") );\r
719 /* only the one item anyway */\r
720 pq->entries[pq->start].priority = new_priority;\r
721 }\r
722 else {\r
723 insert_at = pq_insertion_point(pq, new_priority);\r
724 DEBUG( fprintf(stderr, " - new index %d\n", insert_at) );\r
725 /* the item is still in the queue, so either side of it means it\r
726 won't move */\r
727 if (insert_at == index || insert_at == index+1) {\r
728 DEBUG( fprintf(stderr, " -- change in place\n") );\r
729 pq->entries[index].priority = new_priority;\r
730 }\r
731 else {\r
732 pq_entry saved = pq->entries[index];\r
733 saved.priority = new_priority;\r
734\r
735 if (insert_at < index) {\r
736 DEBUG( fprintf(stderr, " - insert_at < index\n") );\r
c50a32c9 737 pq_move_items(pq, insert_at + 1, insert_at, index - insert_at);\r
a0e4f61f
TC
738 pq->entries[insert_at] = saved;\r
739 }\r
740 else {\r
741 DEBUG( fprintf(stderr, " - insert_at > index\n") );\r
742 --insert_at;\r
c50a32c9 743 pq_move_items(pq, index, index + 1, insert_at - index);\r
a0e4f61f
TC
744 pq->entries[insert_at] = saved;\r
745 }\r
746 }\r
747 }\r
748\r
749 pq_set_id_priority(pq, id, new_priority);\r
750 *priority = new_priority;\r
751\r
752 return 1; \r
753}\r
754\r
755int\r
756pq_peek_items(poe_queue *pq, SV *filter, int max_count, pq_entry **items) {\r
757 int count = 0;\r
758 int i;\r
759\r
760 *items = NULL;\r
761 if (pq->end == pq->start)\r
762 return 0;\r
763\r
1bbcbbe4 764 *items = mymalloc(sizeof(pq_entry) * (pq->end - pq->start));\r
a0e4f61f
TC
765 for (i = pq->start; i < pq->end; ++i) {\r
766 if (pq_test_filter(pq->entries + i, filter)) {\r
767 (*items)[count++] = pq->entries[i];\r
768 }\r
769 }\r
770 if (!count) {\r
1bbcbbe4 771 myfree(*items);\r
a0e4f61f
TC
772 *items = NULL;\r
773 }\r
774\r
775 return count;\r
776}\r
777\r
778/*\r
779pq_dump - dump the internals of the queue structure.\r
780*/\r
781void\r
782pq_dump(poe_queue *pq) {\r
783 int i;\r
784 HE *he;\r
785\r
b51e0be1 786 fprintf(stderr, "poe_queue\n");\r
a0e4f61f
TC
787 fprintf(stderr, " start: %d\n", pq->start);\r
788 fprintf(stderr, " end: %d\n", pq->end);\r
789 fprintf(stderr, " alloc: %d\n", pq->alloc);\r
790 fprintf(stderr, " seq: %d\n", pq->queue_seq);\r
b51e0be1
TC
791 fprintf(stderr, " **Queue Entries:\n"\r
792 " index: id priority SV\n");\r
a0e4f61f
TC
793 for (i = pq->start; i < pq->end; ++i) {\r
794 pq_entry *entry = pq->entries + i;\r
795 fprintf(stderr, " %5d: %5d %8f %p (%u)\n", i, entry->id, entry->priority,\r
796 entry->payload, (unsigned)SvREFCNT(entry->payload));\r
797 }\r
b51e0be1 798 fprintf(stderr, " **Hash entries:\n");\r
a0e4f61f
TC
799 hv_iterinit(pq->ids);\r
800 while ((he = hv_iternext(pq->ids)) != NULL) {\r
801 STRLEN len;\r
d5e34ea9
TC
802 fprintf(stderr, " %d => %f\n", *(pq_id_t *)HePV(he, len), SvNV(hv_iterval(pq->ids, he)));\r
803 }\r
804}\r
805\r
806/*\r
807pq_verify - basic verification of the structure of the queue\r
808\r
809For now check for duplicate ids in sequence.\r
810*/\r
811void\r
812pq_verify(poe_queue *pq) {\r
813 int i;\r
814 int lastid;\r
815 int found_err = 0;\r
816\r
817 if (pq->start != pq->end) {\r
818 lastid = pq->entries[pq->start].id;\r
819 i = pq->start + 1;\r
820 while (i < pq->end) {\r
821 if (pq->entries[i].id == lastid) {\r
822 fprintf(stderr, "Duplicate id %d at %d\n", lastid, i);\r
823 ++found_err;\r
824 }\r
825 ++i;\r
826 }\r
827 }\r
828 if (found_err) {\r
829 pq_dump(pq);\r
830 exit(1);\r
a0e4f61f
TC
831 }\r
832}\r
70aaf253
TC
833\r
/*
pq__set_errno_queue - assign a value to errno

Exists for testing purposes only: all it does is store value in
errno.
*/
void
pq__set_errno_queue(int value) {
  errno = value;
}
843\r