update MANIFEST
[poe-xs-queue-array.git] / queue.c
1 #include "EXTERN.h"\r
2 #include "perl.h"\r
3 #include "XSUB.h"\r
4 \r
5 #include "queue.h"\r
6 #include "alloc.h"\r
7 \r
8 /*#define DEBUG(x) x*/\r
9 #define DEBUG(x)\r
10 /*#define DEBUG_ERR(x) x*/\r
11 #define DEBUG_ERR(x)\r
12 \r
13 /*#define KEEP_STATS 1*/\r
14 \r
15 #if KEEP_STATS\r
16 #define STATS(x) x\r
17 #else\r
18 #define STATS(x)\r
19 #endif\r
20 \r
21 #define PQ_START_SIZE 10\r
22 #define AT_START 0\r
23 #define AT_END 1\r
24 \r
25 #define STUPID_IDS 0\r
26 \r
27 #define LARGE_QUEUE_SIZE 50\r
28 \r
29 /*\r
30 We store the queue in a similar way to the way perl deals with arrays,\r
31 we keep a block of memory, but the first element may or may not be in use,\r
32 depending on the pattern of usage.\r
33 \r
34 There's 3 value controlling usage of the array:\r
35 \r
36   - alloc - the number of elements allocated in total\r
37   - start - the first element in use in the array\r
38   - end - one past the end of the last element in the array\r
39 \r
40 This has the properties that:\r
41 \r
42   start == 0 - no space at the front\r
43   end == alloc - no space at the end\r
44   end - start - number of elements in the queue\r
45 \r
46 We use a perl hash (HV *) to store the mapping from ids to priorities.\r
47 \r
48 */\r
49 struct poe_queue_tag {\r
50   /* the first entry in use */\r
51   int start;\r
52 \r
53   /* 1 past the last entry in use, hence end - start is the number of \r
54      entries in the queue */\r
55   int end;\r
56 \r
57   /* the total number of entries allocated */\r
58   int alloc;\r
59 \r
60   /* used to generate item ids */\r
61   pq_id_t queue_seq;\r
62 \r
63   /* used to track in use item ids */\r
64   HV *ids;\r
65 \r
66   /* the actual entries */\r
67   pq_entry *entries;\r
68 \r
69 #if KEEP_STATS\r
70   int total_finds;\r
71   int binary_finds;\r
72 #endif\r
73 };\r
74 \r
75 /*\r
76 poe_create - create a new queue object.\r
77 \r
78 No parameters.  returns the new queue object.\r
79 \r
80 */\r
81 poe_queue *\r
82 pq_create(void) {\r
83   poe_queue *pq = mymalloc(sizeof(poe_queue));\r
84   \r
85   if (pq == NULL)\r
86     croak("Out of memory");\r
87   pq->start = 0;\r
88   pq->end = 0;\r
89   pq->alloc = PQ_START_SIZE;\r
90   pq->queue_seq = 0;\r
91   pq->ids = newHV();\r
92   pq->entries = mymalloc(sizeof(pq_entry) * PQ_START_SIZE);\r
93   memset(pq->entries, 0, sizeof(pq_entry) * PQ_START_SIZE);\r
94   if (pq->entries == NULL)\r
95     croak("Out of memory");\r
96 \r
97 #if KEEP_STATS\r
98   pq->total_finds = pq->binary_finds = 0;\r
99 #endif\r
100 \r
101   DEBUG( fprintf(stderr, "pq_create() => %p\n", pq) );\r
102 \r
103   return pq;\r
104 }\r
105 \r
106 /*\r
107 pq_delete - release the queue object.\r
108 \r
109 This also releases one reference from each SV in the queue.\r
110 \r
111 */\r
112 void\r
113 pq_delete(poe_queue *pq) {\r
114   int i;\r
115 \r
116   DEBUG( fprintf(stderr, "pq_delete(%p)\n", pq) );\r
117   if (pq->end > pq->start) {\r
118     for (i = pq->start; i < pq->end; ++i) {\r
119       SvREFCNT_dec(pq->entries[i].payload);\r
120     }\r
121   }\r
122   SvREFCNT_dec((SV *)pq->ids);\r
123   pq->ids = NULL;\r
124   if (pq->entries)\r
125     myfree(pq->entries);\r
126   pq->entries = NULL;\r
127   myfree(pq);\r
128 }\r
129 \r
130 /*\r
131 pq_new_id - generate a new item id.\r
132 \r
133 Internal use only.\r
134 \r
135 This, the following 3 functions and pq_create, pq_delete, should be\r
136 all that needs to be modified if we change hash implementations.\r
137 \r
138 */\r
139 static\r
140 pq_id_t\r
141 pq_new_id(poe_queue *pq, pq_priority_t priority) {\r
142 #if STUPID_IDS\r
143   int seq;\r
144   int i;\r
145   int found;\r
146   \r
147   do {\r
148     seq = ++pq->queue_seq;\r
149     found = 0;\r
150     for (i = pq->start; i < pq->end; ++i) {\r
151       if (pq->entries[i].id == seq) {\r
152         found = 1;\r
153         break;\r
154       }\r
155     }\r
156   } while (found);\r
157 \r
158   return seq;\r
159 #else\r
160   pq_id_t seq = ++pq->queue_seq;;\r
161 \r
162   while (hv_exists(pq->ids, (char *)&seq, sizeof(seq))) {\r
163     seq = ++pq->queue_seq;\r
164   }\r
165   hv_store(pq->ids, (char *)&seq, sizeof(seq), newSVnv(priority), 0);\r
166 #endif\r
167 \r
168   return seq;\r
169 }\r
170 \r
171 /*\r
172 pq_release_id - releases an id for future use.\r
173 */\r
174 static\r
175 void\r
176 pq_release_id(poe_queue *pq, pq_id_t id) {\r
177 #if STUPID_IDS\r
178 #else\r
179   hv_delete(pq->ids, (char *)&id, sizeof(id), 0);\r
180 #endif\r
181 }\r
182 \r
183 /*\r
184 pq_item_priority - get the priority of an item given it's id\r
185 */\r
186 static\r
187 int\r
188 pq_item_priority(poe_queue *pq, pq_id_t id, pq_priority_t *priority) {\r
189 #if STUPID_IDS\r
190   int i;\r
191 \r
192   for (i = pq->start; i < pq->end; ++i) {\r
193     if (pq->entries[i].id == id) {\r
194       *priority = pq->entries[i].priority;\r
195       return 1;\r
196     }\r
197   }\r
198 \r
199   return 0;\r
200 #else\r
201   SV **entry = hv_fetch(pq->ids, (char *)&id, sizeof(id), 0);\r
202 \r
203   if (!entry || !*entry)\r
204     return 0;\r
205 \r
206   *priority = SvNV(*entry);\r
207 \r
208   return 1;\r
209 #endif\r
210 }\r
211 \r
212 /*\r
213 pq_set_id_priority - set the priority of an item in the id hash\r
214 */\r
215 static\r
216 void\r
217 pq_set_id_priority(poe_queue *pq, pq_id_t id, pq_priority_t new_priority) {\r
218 #if STUPID_IDS\r
219   /* nothing to do, caller set it in the array */\r
220 #else\r
221   SV **entry = hv_fetch(pq->ids, (char *)&id, sizeof(id), 0);\r
222 \r
223   if (!entry && !*entry)\r
224     croak("pq_set_priority: id not found");\r
225 \r
226   sv_setnv(*entry, new_priority);\r
227 #endif\r
228 }\r
229 \r
230 /*\r
231 pq_move_items - moves items around.\r
232 \r
233 This encapsulates the old calls to memmove(), providing a single place\r
234 to add error checking.\r
235 */\r
236 static void\r
237 pq_move_items(poe_queue *pq, int target, int src, int count) {\r
238 \r
239   DEBUG_ERR(\r
240   {\r
241     int die = 0;\r
242     if (src < pq->start) {\r
243       fprintf(stderr, "src %d less than start %d\n", src, pq->start);\r
244       ++die;\r
245     }\r
246     if (src + count > pq->end) {\r
247       fprintf(stderr, "src %d + count %d beyond end %d\n", src, count, pq->end);\r
248       ++die;\r
249     }\r
250     if (target < 0) {\r
251       fprintf(stderr, "target %d < 0\n", target);\r
252       ++die;\r
253     }\r
254     if (target + count > pq->alloc) {\r
255       fprintf(stderr, "target %d + count %d > alloc %d\n", target, count, pq->alloc);\r
256       ++die;\r
257     }\r
258     if (die) *(char *)0 = '\0';\r
259   }\r
260   )\r
261   memmove(pq->entries + target, pq->entries + src, count * sizeof(pq_entry));\r
262 }\r
263 \r
264 /*\r
265 pq_realloc - make space at the front of back of the queue.\r
266 \r
267 This adjusts the queue to allow insertion of a single item at the\r
268 front or the back of the queue.\r
269 \r
270 If the queue has 33% or more space available we simple adjust the\r
271 position of the in-use items within the array.  We try not to push the\r
272 items right up against the opposite end of the array, since we might\r
273 need to insert items there too.\r
274 \r
275 If the queue has less than 33% space available we allocate another 50%\r
276 space.  We then only move the queue elements if we need space at the\r
277 front, since the reallocation has just opened up a huge space at the\r
278 back.  Since we're reallocating exponentially larger sizes we should\r
279 have a constant time cost on reallocation per queue item stored (but\r
280 other costs are going to be higher.)\r
281 \r
282 */\r
283 static\r
284 void\r
285 pq_realloc(poe_queue *pq, int at_end) {\r
286   int count = pq->end - pq->start;\r
287 \r
288   DEBUG( fprintf(stderr, "pq_realloc((%d, %d, %d), %d)\n", pq->start, pq->end, pq->alloc, at_end) );\r
289   if (count * 3 / 2 < pq->alloc) {\r
290     /* 33 % or more space available, use some of it */\r
291     int new_start;\r
292 \r
293     if (at_end) {\r
294       new_start = (pq->alloc - count) / 3;\r
295     }\r
296     else {\r
297       new_start = (pq->alloc - count) * 2 / 3;\r
298     }\r
299     DEBUG( fprintf(stderr, "  moving start to %d\n", new_start) );\r
300     pq_move_items(pq, new_start, pq->start, count);\r
301     pq->start = new_start;\r
302     pq->end = new_start + count;\r
303   }\r
304   else {\r
305     int new_alloc = pq->alloc * 3 / 2;\r
306     pq->entries = myrealloc(pq->entries, sizeof(pq_entry) * new_alloc);\r
307     pq->alloc = new_alloc;\r
308 \r
309     if (!pq->entries)\r
310       croak("Out of memory");\r
311 \r
312     DEBUG( fprintf(stderr, "  - expanding to %d entries\n", new_alloc) );\r
313 \r
314     if (!at_end) {\r
315       int new_start = (new_alloc - count) * 2 / 3;\r
316       DEBUG( fprintf(stderr, "  moving start to %d\n", new_start) );\r
317       pq_move_items(pq, new_start, pq->start, count);\r
318       pq->start = new_start;\r
319       pq->end = new_start + count;\r
320     }\r
321   }\r
322   DEBUG( fprintf(stderr, "  final: %d %d %d\n", pq->start, pq->end, pq->alloc) );\r
323 }\r
324 \r
325 /*\r
326 pq_insertion_point - figure out where to insert an item with the given\r
327 priority\r
328 \r
329 Internal.\r
330 */\r
331 static\r
332 int\r
333 pq_insertion_point(poe_queue *pq, pq_priority_t priority) {\r
334   if (pq->end - pq->start < LARGE_QUEUE_SIZE) {\r
335     int i = pq->end;\r
336     while (i > pq->start &&\r
337            priority < pq->entries[i-1].priority) {\r
338       --i;\r
339     }\r
340     return i;\r
341   }\r
342   else {\r
343     int lower = pq->start;\r
344     int upper = pq->end - 1;\r
345     while (1) {\r
346       int midpoint = (lower + upper) >> 1;\r
347 \r
348       if (upper < lower)\r
349         return lower;\r
350       \r
351       if (priority < pq->entries[midpoint].priority) {\r
352         upper = midpoint - 1;\r
353         continue;\r
354       }\r
355       if (priority > pq->entries[midpoint].priority) {\r
356         lower = midpoint + 1;\r
357         continue;\r
358       }\r
359       while (midpoint < pq->end &&\r
360              priority == pq->entries[midpoint].priority) {\r
361         ++midpoint;\r
362       }\r
363       return midpoint;\r
364     }\r
365   }\r
366 }\r
367 \r
368 int\r
369 pq_enqueue(poe_queue *pq, pq_priority_t priority, SV *payload) {\r
370   int fill_at;\r
371   pq_id_t id = pq_new_id(pq, priority);\r
372 \r
373   DEBUG( fprintf(stderr, "pq_enqueue(%f, %p)\n", priority, payload) );\r
374   if (pq->start == pq->end) {\r
375     DEBUG( fprintf(stderr, "  - on empty queue\n") );\r
376     /* allow room at front and back for new entries */\r
377     pq->start = pq->alloc / 3;\r
378     pq->end = pq->start + 1;\r
379     fill_at = pq->start;\r
380   }\r
381   else if (priority >= pq->entries[pq->end-1].priority) {\r
382     DEBUG( fprintf(stderr, "  - at the end\n") );\r
383     if (pq->end == pq->alloc)\r
384       /* past the end - need to realloc or make some space */\r
385       pq_realloc(pq, AT_END);\r
386     \r
387     fill_at = pq->end;\r
388     ++pq->end;\r
389   }\r
390   else if (priority < pq->entries[pq->start].priority) {\r
391     DEBUG( fprintf(stderr, "  - at the front\n") );\r
392     if (pq->start == 0)\r
393       /* no space at the front, make some */\r
394       pq_realloc(pq, AT_START);\r
395 \r
396     --pq->start;\r
397     fill_at = pq->start;\r
398   }\r
399   else {\r
400     int i;\r
401     DEBUG( fprintf(stderr, "  - in the middle\n") );\r
402     i = pq_insertion_point(pq, priority);\r
403     \r
404     /* if we're near the end we want to push entries up, otherwise down */\r
405     if (i - pq->start > (pq->end - pq->start) / 2) {\r
406       DEBUG( fprintf(stderr, "    - closer to the back (%d -> [ %d %d ])\n",\r
407                      i, pq->start, pq->end) );\r
408       /* make sure we have space, this might end up copying twice, \r
409          but too bad for now */\r
410       if (pq->end == pq->alloc) {\r
411         int old_start = pq->start;\r
412         pq_realloc(pq, AT_END);\r
413         i += pq->start - old_start;\r
414       }\r
415       \r
416       pq_move_items(pq, i+1, i, pq->end - i);\r
417       ++pq->end;\r
418       fill_at = i;\r
419     }\r
420     else {\r
421       DEBUG( fprintf(stderr, "    - closer to the front (%d -> [ %d %d ])\n",\r
422                      i, pq->start, pq->end) );\r
423       if (pq->start == 0) {\r
424         pq_realloc(pq, AT_START);\r
425         i += pq->start;\r
426       }\r
427       pq_move_items(pq, pq->start-1, pq->start, i - pq->start);\r
428       --pq->start;\r
429       fill_at = i-1;\r
430     }\r
431   }\r
432   pq->entries[fill_at].priority = priority;\r
433   pq->entries[fill_at].id = id;\r
434   pq->entries[fill_at].payload = newSVsv(payload);\r
435 \r
436   return id;\r
437 }\r
438 \r
439 /*\r
440   Note: it's up to the caller to release the SV.  The XS code does this \r
441   by making it mortal.\r
442 */\r
443 int\r
444 pq_dequeue_next(poe_queue *pq, pq_priority_t *priority, pq_id_t *id, SV **payload) {\r
445   pq_entry *entry;\r
446   /* the caller needs to release the payload (somehow) */\r
447   if (pq->start == pq->end)\r
448     return 0;\r
449 \r
450   entry = pq->entries + pq->start++;\r
451   *priority = entry->priority;\r
452   *id = entry->id;\r
453   *payload = entry->payload;\r
454   pq_release_id(pq, entry->id);\r
455 \r
456   return 1;\r
457 }\r
458 \r
459 int\r
460 pq_get_next_priority(poe_queue *pq, pq_priority_t *priority) {\r
461   if (pq->start == pq->end)\r
462     return 0;\r
463 \r
464   *priority = pq->entries[pq->start].priority;\r
465   return 1;\r
466 }\r
467 \r
468 int\r
469 pq_get_item_count(poe_queue *pq) {\r
470   return pq->end - pq->start;\r
471 }\r
472 \r
473 /*\r
474 pq_test_filter - the XS magic involved in passing the payload to a\r
475 filter function.\r
476 */\r
477 static\r
478 int\r
479 pq_test_filter(pq_entry *entry, SV *filter) {\r
480   /* man perlcall for the magic here */\r
481   dSP;\r
482   int count;\r
483   SV *result_sv;\r
484   int result;\r
485 \r
486   ENTER;\r
487   SAVETMPS;\r
488   PUSHMARK(SP);\r
489   XPUSHs(sv_2mortal(newSVsv(entry->payload)));\r
490   PUTBACK;\r
491 \r
492   count = call_sv(filter, G_SCALAR);\r
493 \r
494   SPAGAIN;\r
495 \r
496   if (count != 1) \r
497     croak("got other than 1 value in scalar context");\r
498 \r
499   result_sv = POPs;\r
500   result = SvTRUE(result_sv);\r
501 \r
502   PUTBACK;\r
503   FREETMPS;\r
504   LEAVE;\r
505 \r
506   return result;\r
507 }\r
508 \r
509 /*\r
510 pq_find_item - search for an item we know is there.\r
511 \r
512 Internal.\r
513 */\r
514 static\r
515 int\r
516 pq_find_item(poe_queue *pq, pq_id_t id, pq_priority_t priority) {\r
517   int i;\r
518 \r
519   STATS(++pq->total_finds);\r
520   if (pq->end - pq->start < LARGE_QUEUE_SIZE) {\r
521     for (i = pq->start; i < pq->end; ++i) {\r
522       if (pq->entries[i].id == id)\r
523         return i;\r
524     }\r
525     DEBUG(fprintf(stderr, "pq_find_item %d => %f\n", id, priority) );\r
526     croak("Internal inconsistency: event should have been found");\r
527   }\r
528 \r
529   /* try a binary search */\r
530   /* simply translated from the perl */\r
531   STATS(++pq->binary_finds);\r
532   {\r
533     int lower = pq->start;\r
534     int upper = pq->end - 1;\r
535     int linear_point;\r
536     while (1) {\r
537       int midpoint = (upper + lower) >> 1;\r
538       if (upper < lower) {\r
539         croak("Internal inconsistency, priorities out of order");\r
540       }\r
541       if (priority < pq->entries[midpoint].priority) {\r
542         upper = midpoint - 1;\r
543         continue;\r
544       }\r
545       if (priority > pq->entries[midpoint].priority) {\r
546         lower = midpoint + 1;\r
547         continue;\r
548       }\r
549       linear_point = midpoint;\r
550       while (linear_point >= pq->start &&\r
551              priority == pq->entries[linear_point].priority) {\r
552         if (pq->entries[linear_point].id == id)\r
553           return linear_point;\r
554         --linear_point;\r
555       }\r
556       linear_point = midpoint;\r
557       while ( (++linear_point < pq->end) &&\r
558               priority == pq->entries[linear_point].priority) {\r
559         if (pq->entries[linear_point].id == id)\r
560           return linear_point;\r
561       }\r
562 \r
563       croak("internal inconsistency: event should have been found");\r
564     }\r
565   }\r
566 }\r
567 \r
568 int\r
569 pq_remove_item(poe_queue *pq, pq_id_t id, SV *filter, pq_entry *removed) {\r
570   pq_priority_t priority;\r
571   int index;\r
572 \r
573   if (!pq_item_priority(pq, id, &priority)) {\r
574     errno = ESRCH;\r
575     return 0;\r
576   }\r
577 \r
578   index = pq_find_item(pq, id, priority);\r
579 \r
580   if (!pq_test_filter(pq->entries + index, filter)) {\r
581     errno = EPERM;\r
582     return 0;\r
583   }\r
584 \r
585   *removed = pq->entries[index];\r
586   pq_release_id(pq, id);\r
587   if (index == pq->start) {\r
588     ++pq->start;\r
589   }\r
590   else if (index == pq->end - 1) {\r
591     --pq->end;\r
592   }\r
593   else {\r
594     pq_move_items(pq, index, index+1, pq->end - index - 1);\r
595     --pq->end;\r
596   }\r
597   DEBUG( fprintf(stderr, "removed (%d, %p (%d))\n", id, removed->payload,\r
598                  SvREFCNT(removed->payload)) );\r
599 \r
600   return 1;\r
601 }\r
602 \r
603 int\r
604 pq_remove_items(poe_queue *pq, SV *filter, int max_count, pq_entry **entries) {\r
605   int in_index, out_index;\r
606   int remove_count = 0;\r
607   \r
608   *entries = NULL;\r
609   if (pq->start == pq->end)\r
610     return 0;\r
611 \r
612   *entries = mymalloc(sizeof(pq_entry) * (pq->end - pq->start));\r
613   if (!*entries)\r
614     croak("Out of memory");\r
615   \r
616   in_index = out_index = pq->start;\r
617   while (in_index < pq->end && remove_count < max_count) {\r
618     if (pq_test_filter(pq->entries + in_index, filter)) {\r
619       pq_release_id(pq, pq->entries[in_index].id);\r
620       (*entries)[remove_count++] = pq->entries[in_index++];\r
621     }\r
622     else {\r
623       pq->entries[out_index++] = pq->entries[in_index++];\r
624     }\r
625   }\r
626   while (in_index < pq->end) {\r
627     pq->entries[out_index++] = pq->entries[in_index++];\r
628   }\r
629   pq->end = out_index;\r
630   \r
631   return remove_count;\r
632 }\r
633 \r
634 /*\r
635 We need to keep the following 2 functions in sync (or combine the\r
636 common code.)\r
637 */\r
638 int\r
639 pq_set_priority(poe_queue *pq, pq_id_t id, SV *filter, pq_priority_t new_priority) {\r
640   pq_priority_t old_priority;\r
641   int index, insert_at;\r
642 \r
643   if (!pq_item_priority(pq, id, &old_priority)) {\r
644     errno = ESRCH;\r
645     return 0;\r
646   }\r
647 \r
648   index = pq_find_item(pq, id, old_priority);\r
649 \r
650   if (!pq_test_filter(pq->entries + index, filter)) {\r
651     errno = EPERM;\r
652     return 0;\r
653   }\r
654 \r
655   DEBUG( fprintf(stderr, " - index %d  oldp %f newp %f\n", index, old_priority, new_priority) );\r
656 \r
657   if (pq->end - pq->start == 1) {\r
658     DEBUG( fprintf(stderr, "   -- one item\n") );\r
659     /* only the one item anyway */\r
660     pq->entries[pq->start].priority = new_priority;\r
661   }\r
662   else {\r
663     insert_at = pq_insertion_point(pq, new_priority);\r
664     DEBUG( fprintf(stderr, "   - new index %d\n", insert_at) );\r
665     /* the item is still in the queue, so either side of it means it\r
666        won't move */\r
667     if (insert_at == index || insert_at == index+1) {\r
668       DEBUG( fprintf(stderr, "   -- change in place\n") );\r
669       pq->entries[index].priority = new_priority;\r
670     }\r
671     else {\r
672       pq_entry saved = pq->entries[index];\r
673       saved.priority = new_priority;\r
674 \r
675       if (insert_at < index) {\r
676         DEBUG( fprintf(stderr, "  - insert_at < index\n") );\r
677         pq_move_items(pq, insert_at + 1, insert_at, index - insert_at);\r
678         pq->entries[insert_at] = saved;\r
679       }\r
680       else {\r
681         DEBUG( fprintf(stderr, "  - insert_at > index\n") );\r
682         --insert_at;\r
683         pq_move_items(pq, index, index + 1, insert_at - index);\r
684         pq->entries[insert_at] = saved;\r
685       }\r
686     }\r
687   }\r
688 \r
689   pq_set_id_priority(pq, id, new_priority);\r
690 \r
691   return 1;  \r
692 }\r
693 \r
694 int\r
695 pq_adjust_priority(poe_queue *pq, pq_id_t id, SV *filter, double delta, pq_priority_t *priority) {\r
696   pq_priority_t old_priority, new_priority;\r
697   int index, insert_at;\r
698 \r
699   DEBUG( fprintf(stderr, "pq_adjust_priority(..., %d, %p, %f, ...)\n", id, filter, delta) );\r
700 \r
701   if (!pq_item_priority(pq, id, &old_priority)) {\r
702     errno = ESRCH;\r
703     return 0;\r
704   }\r
705 \r
706   index = pq_find_item(pq, id, old_priority);\r
707 \r
708   if (!pq_test_filter(pq->entries + index, filter)) {\r
709     errno = EPERM;\r
710     return 0;\r
711   }\r
712 \r
713   new_priority = old_priority + delta;\r
714 \r
715   DEBUG( fprintf(stderr, " - index %d  oldp %f newp %f\n", index, old_priority, new_priority) );\r
716 \r
717   if (pq->end - pq->start == 1) {\r
718     DEBUG( fprintf(stderr, "   -- one item\n") );\r
719     /* only the one item anyway */\r
720     pq->entries[pq->start].priority = new_priority;\r
721   }\r
722   else {\r
723     insert_at = pq_insertion_point(pq, new_priority);\r
724     DEBUG( fprintf(stderr, "   - new index %d\n", insert_at) );\r
725     /* the item is still in the queue, so either side of it means it\r
726        won't move */\r
727     if (insert_at == index || insert_at == index+1) {\r
728       DEBUG( fprintf(stderr, "   -- change in place\n") );\r
729       pq->entries[index].priority = new_priority;\r
730     }\r
731     else {\r
732       pq_entry saved = pq->entries[index];\r
733       saved.priority = new_priority;\r
734 \r
735       if (insert_at < index) {\r
736         DEBUG( fprintf(stderr, "  - insert_at < index\n") );\r
737         pq_move_items(pq, insert_at + 1, insert_at, index - insert_at);\r
738         pq->entries[insert_at] = saved;\r
739       }\r
740       else {\r
741         DEBUG( fprintf(stderr, "  - insert_at > index\n") );\r
742         --insert_at;\r
743         pq_move_items(pq, index, index + 1, insert_at - index);\r
744         pq->entries[insert_at] = saved;\r
745       }\r
746     }\r
747   }\r
748 \r
749   pq_set_id_priority(pq, id, new_priority);\r
750   *priority = new_priority;\r
751 \r
752   return 1;  \r
753 }\r
754 \r
755 int\r
756 pq_peek_items(poe_queue *pq, SV *filter, int max_count, pq_entry **items) {\r
757   int count = 0;\r
758   int i;\r
759 \r
760   *items = NULL;\r
761   if (pq->end == pq->start)\r
762     return 0;\r
763 \r
764   *items = mymalloc(sizeof(pq_entry) * (pq->end - pq->start));\r
765   for (i = pq->start; i < pq->end; ++i) {\r
766     if (pq_test_filter(pq->entries + i, filter)) {\r
767       (*items)[count++] = pq->entries[i];\r
768     }\r
769   }\r
770   if (!count) {\r
771     myfree(*items);\r
772     *items = NULL;\r
773   }\r
774 \r
775   return count;\r
776 }\r
777 \r
778 /*\r
779 pq_dump - dump the internals of the queue structure.\r
780 */\r
781 void\r
782 pq_dump(poe_queue *pq) {\r
783   int i;\r
784   HE *he;\r
785 \r
786   fprintf(stderr, "poe_queue\n");\r
787   fprintf(stderr, "  start: %d\n", pq->start);\r
788   fprintf(stderr, "    end: %d\n", pq->end);\r
789   fprintf(stderr, "  alloc: %d\n", pq->alloc);\r
790   fprintf(stderr, "    seq: %d\n", pq->queue_seq);\r
791   fprintf(stderr, "  **Queue Entries:\n"\r
792          "      index:   id  priority    SV\n");\r
793   for (i = pq->start; i < pq->end; ++i) {\r
794     pq_entry *entry = pq->entries + i;\r
795     fprintf(stderr, "      %5d: %5d %8f  %p (%u)\n", i, entry->id, entry->priority,\r
796            entry->payload, (unsigned)SvREFCNT(entry->payload));\r
797   }\r
798   fprintf(stderr, "  **Hash entries:\n");\r
799   hv_iterinit(pq->ids);\r
800   while ((he = hv_iternext(pq->ids)) != NULL) {\r
801     STRLEN len;\r
802     fprintf(stderr, "   %d => %f\n", *(pq_id_t *)HePV(he, len), SvNV(hv_iterval(pq->ids, he)));\r
803   }\r
804 }\r
805 \r
806 /*\r
807 pq_verify - basic verification of the structure of the queue\r
808 \r
809 For now check for duplicate ids in sequence.\r
810 */\r
811 void\r
812 pq_verify(poe_queue *pq) {\r
813   int i;\r
814   int lastid;\r
815   int found_err = 0;\r
816 \r
817   if (pq->start != pq->end) {\r
818     lastid = pq->entries[pq->start].id;\r
819     i = pq->start + 1;\r
820     while (i < pq->end) {\r
821       if (pq->entries[i].id == lastid) {\r
822         fprintf(stderr, "Duplicate id %d at %d\n", lastid, i);\r
823         ++found_err;\r
824       }\r
825       ++i;\r
826     }\r
827   }\r
828   if (found_err) {\r
829     pq_dump(pq);\r
830     exit(1);\r
831   }\r
832 }\r
833 \r
834 /*\r
835 pq__set_errno_queue - set errno\r
836 \r
837 This just sets errno for testing purposes.\r
838 */\r
839 void\r
840 pq__set_errno_queue(int value) {\r
841   errno = value;\r
842 }\r
843 \r