initial release
[poe-xs-queue-array.git] / Array.xs
1 #include "EXTERN.h"
2 #include "perl.h"
3 #include "XSUB.h"
4 #include <string.h> /* for memmove() mostly */
5 #include <errno.h> /* errno values */
6
7 /*#define DEBUG(x) x*/
8 #define DEBUG(x)
9
10 typedef unsigned pq_id_t;
11 typedef double pq_priority_t;
12
13 #define PQ_START_SIZE 10
14 #define AT_START 0
15 #define AT_END 1
16
17 pq_id_t queue_seq;
18
19 /* an entry in the queue */
20 typedef struct {
21   pq_priority_t priority;
22   pq_id_t id;
23   SV *payload;
24 } pq_entry;
25
26 /*
27 We store the queue in a similar way to the way perl deals with arrays,
28 we keep a block of memory, but the first element may or may not be in use,
29 depending on the pattern of usage.
30
31 There's 3 value controlling usage of the array:
32
33   - alloc - the number of elements allocated in total
34   - start - the first element in use in the array
35   - end - one past the end of the last element in the array
36
37 This has the properties that:
38
39   start == 0 - no space at the front
40   end == alloc - no space at the end
41   end - start - number of elements in the queue
42
43 We use a perl hash (HV *) to store the mapping from ids to priorities.
44
45 */
46 typedef struct {
47   /* the first entry in use */
48   int start;
49
50   /* 1 past the last entry in use, hence end - start is the number of 
51      entries in the queue */
52   int end;
53
54   /* the total number of entries allocated */
55   int alloc;
56
57   /* used to generate item ids */
58   pq_id_t queue_seq;
59
60   /* used to track in use item ids */
61   HV *ids;
62
63   /* the actual entries */
64   pq_entry *entries;
65 } poe_queue;
66
67 /*
68 poe_create - create a new queue object.
69
70 No parameters.  returns the new queue object.
71
72 */
73 poe_queue *
74 pq_create(void) {
75   poe_queue *pq = malloc(sizeof(poe_queue));
76   
77   if (pq == NULL)
78     croak("Out of memory");
79   pq->start = 0;
80   pq->end = 0;
81   pq->alloc = PQ_START_SIZE;
82   pq->queue_seq = 0;
83   pq->ids = newHV();
84   pq->entries = calloc(sizeof(pq_entry), PQ_START_SIZE);
85   if (pq->entries == NULL)
86     croak("Out of memory");
87
88   DEBUG( fprintf(stderr, "pq_create() => %p\n", pq) );
89
90   return pq;
91 }
92
93 /*
94 pq_delete - release the queue object.
95
96 This also releases one reference from each SV in the queue.
97
98 */
99 void
100 pq_delete(poe_queue *pq) {
101   int i;
102
103   DEBUG( fprintf(stderr, "pq_delete(%p)\n", pq) );
104   if (pq->end > pq->start) {
105     for (i = pq->start; i < pq->end; ++i) {
106       SvREFCNT_dec(pq->entries[i].payload);
107     }
108   }
109   SvREFCNT_dec((SV *)pq->ids);
110   pq->ids = NULL;
111   if (pq->entries)
112     free(pq->entries);
113   pq->entries = NULL;
114   free(pq);
115 }
116
117 /*
118 pq_new_id - generate a new item id.
119
120 Internal use only.
121
122 This, the following 3 functions and pq_create, pq_delete, should be
123 all that needs to be modified if we change hash implementations.
124
125 */
126 static
127 pq_id_t
128 pq_new_id(poe_queue *pq, pq_priority_t priority) {
129   int seq = ++pq->queue_seq;
130   SV *index = newSViv(seq);
131
132   while (hv_exists_ent(pq->ids, index, 0)) {
133     seq = ++pq->queue_seq;
134     sv_setiv(index, seq);
135   }
136   hv_store_ent(pq->ids, index, newSVnv(priority), 0);
137
138   return seq;
139 }
140
141 /*
142 pq_release_id - releases an id for future use.
143 */
144 static
145 void
146 pq_release_id(poe_queue *pq, pq_id_t id) {
147   SV *id_sv = sv_2mortal(newSViv(id));
148
149   hv_delete_ent(pq->ids, id_sv, 0, 0);
150 }
151
152 /*
153 pq_item_priority - get the priority of an item given it's id
154 */
155 static
156 int
157 pq_item_priority(poe_queue *pq, pq_id_t id, pq_priority_t *priority) {
158   HE *entry = hv_fetch_ent(pq->ids, sv_2mortal(newSViv(id)), 0, 0);
159
160   if (!entry)
161     return 0;
162
163   *priority = SvNV(HeVAL(entry));
164
165   return 1;
166 }
167
168 /*
169 pq_set_id_priority - set the priority of an item in the id hash
170 */
171 static
172 void
173 pq_set_id_priority(poe_queue *pq, pq_id_t id, pq_priority_t new_priority) {
174   HE *entry = hv_fetch_ent(pq->ids, sv_2mortal(newSViv(id)), 0, 0);
175
176   if (!entry)
177     croak("pq_set_priority: id not found");
178
179   sv_setnv(HeVAL(entry), new_priority);
180 }
181
182 /*
183 pq_realloc - make space at the front of back of the queue.
184
185 This adjusts the queue to allow insertion of a single item at the
186 front or the back of the queue.
187
188 If the queue has 33% or more space available we simple adjust the
189 position of the in-use items within the array.  We try not to push the
190 items right up against the opposite end of the array, since we might
191 need to insert items there too.
192
193 If the queue has less than 33% space available we allocate another 50%
194 space.  We then only move the queue elements if we need space at the
195 front, since the reallocation has just opened up a huge space at the
196 back.  Since we're reallocating exponentially larger sizes we should
197 have a constant time cost on reallocation per queue item stored (but
198 other costs are going to be higher.)
199
200 */
201 static
202 void
203 pq_realloc(poe_queue *pq, int at_end) {
204   int count = pq->end - pq->start;
205
206   DEBUG( fprintf(stderr, "pq_realloc((%d, %d, %d), %d)\n", pq->start, pq->end, pq->alloc, at_end) );
207   if (count * 3 / 2 < pq->alloc) {
208     /* 33 % or more space available, use some of it */
209     int new_start;
210
211     if (at_end) {
212       new_start = (pq->alloc - count) / 3;
213     }
214     else {
215       new_start = (pq->alloc - count) * 2 / 3;
216     }
217     DEBUG( fprintf(stderr, "  moving start to %d\n", new_start) );
218     memmove(pq->entries + new_start, pq->entries + pq->start,
219             count * sizeof(pq_entry));
220     pq->start = new_start;
221     pq->end = new_start + count;
222   }
223   else {
224     int new_alloc = pq->alloc * 3 / 2;
225     pq->entries = realloc(pq->entries, sizeof(pq_entry) * new_alloc);
226     pq->alloc = new_alloc;
227
228     if (!pq->entries)
229       croak("Out of memory");
230
231     DEBUG( fprintf(stderr, "  - expanding to %d entries\n", new_alloc) );
232
233     if (!at_end) {
234       int new_start = (new_alloc - count) * 2 / 3;
235       DEBUG( fprintf(stderr, "  moving start to %d\n", new_start) );
236       memmove(pq->entries + new_start, pq->entries + pq->start,
237               count * sizeof(pq_entry));
238       pq->start = new_start;
239       pq->end = new_start + count;
240     }
241   }
242   DEBUG( fprintf(stderr, "  final: %d %d %d\n", pq->start, pq->end, pq->alloc) );
243 }
244
245 /*
246 pq_insertion_point - figure out where to insert an item with the given
247 priority
248
249 Internal.
250 */
251 static
252 int
253 pq_insertion_point(poe_queue *pq, pq_priority_t priority) {
254   /* for now this is just a linear search, later we should make it 
255      binary */
256   int i = pq->end;
257   while (i > pq->start &&
258          priority < pq->entries[i-1].priority) {
259     --i;
260   }
261
262   return i;
263 }
264
265 int
266 pq_enqueue(poe_queue *pq, pq_priority_t priority, SV *payload) {
267   int fill_at;
268   pq_id_t id = pq_new_id(pq, priority);
269
270   DEBUG( fprintf(stderr, "pq_enqueue(%f, %p)\n", priority, payload) );
271   if (pq->start == pq->end) {
272     DEBUG( fprintf(stderr, "  - on empty queue\n") );
273     /* allow room at front and back for new entries */
274     pq->start = pq->alloc / 3;
275     pq->end = pq->start + 1;
276     fill_at = pq->start;
277   }
278   else if (priority >= pq->entries[pq->end-1].priority) {
279     DEBUG( fprintf(stderr, "  - at the end\n") );
280     if (pq->end == pq->alloc)
281       /* past the end - need to realloc or make some space */
282       pq_realloc(pq, AT_END);
283     
284     fill_at = pq->end;
285     ++pq->end;
286   }
287   else if (priority < pq->entries[pq->start].priority) {
288     DEBUG( fprintf(stderr, "  - at the front\n") );
289     if (pq->start == 0)
290       /* no space at the front, make some */
291       pq_realloc(pq, AT_START);
292
293     --pq->start;
294     fill_at = pq->start;
295   }
296   else {
297     int i;
298     DEBUG( fprintf(stderr, "  - in the middle\n") );
299     i = pq_insertion_point(pq, priority);
300     
301     /* if we're near the end we want to push entries up, otherwise down */
302     if (i - pq->start > (pq->end - pq->start) / 2) {
303       DEBUG( fprintf(stderr, "    - closer to the back (%d -> [ %d %d ])\n",
304                      i, pq->start, pq->end) );
305       /* make sure we have space, this might end up copying twice, 
306          but too bad for now */
307       if (pq->end == pq->alloc) {
308         int old_start = pq->start;
309         pq_realloc(pq, AT_END);
310         i += pq->start - old_start;
311       }
312       
313       memmove(pq->entries + i + 1, pq->entries + i, (pq->end - i) * sizeof(pq_entry));
314       ++pq->end;
315       fill_at = i;
316     }
317     else {
318       DEBUG( fprintf(stderr, "    - closer to the front (%d -> [ %d %d ])\n",
319                      i, pq->start, pq->end) );
320       if (pq->start == 0) {
321         pq_realloc(pq, AT_START);
322         i += pq->start;
323       }
324       memmove(pq->entries + pq->start - 1, pq->entries + pq->start,
325              (i - pq->start) * sizeof(pq_entry));
326       --pq->start;
327       fill_at = i-1;
328     }
329   }
330   pq->entries[fill_at].priority = priority;
331   pq->entries[fill_at].id = id;
332   pq->entries[fill_at].payload = newSVsv(payload);
333
334   return id;
335 }
336
337 /*
338   Note: it's up to the caller to release the SV.  The XS code does this 
339   by making it mortal.
340 */
341 int
342 pq_dequeue_next(poe_queue *pq, pq_priority_t *priority, pq_id_t *id, SV **payload) {
343   pq_entry *entry;
344   /* the caller needs to release the payload (somehow) */
345   if (pq->start == pq->end)
346     return 0;
347
348   entry = pq->entries + pq->start++;
349   *priority = entry->priority;
350   *id = entry->id;
351   *payload = entry->payload;
352   pq_release_id(pq, entry->id);
353
354   return 1;
355 }
356
357 int
358 pq_get_next_priority(poe_queue *pq, pq_priority_t *priority) {
359   if (pq->start == pq->end)
360     return 0;
361
362   *priority = pq->entries[pq->start].priority;
363   return 1;
364 }
365
366 int
367 pq_get_item_count(poe_queue *pq) {
368   return pq->end - pq->start;
369 }
370
371 /*
372 pq_test_filter - the XS magic involved in passing the payload to a
373 filter function.
374 */
375 static
376 int
377 pq_test_filter(pq_entry *entry, SV *filter) {
378   /* man perlcall for the magic here */
379   dSP;
380   int count;
381   SV *result_sv;
382   int result;
383
384   ENTER;
385   SAVETMPS;
386   PUSHMARK(SP);
387   XPUSHs(sv_2mortal(newSVsv(entry->payload)));
388   PUTBACK;
389
390   count = call_sv(filter, G_SCALAR);
391
392   SPAGAIN;
393
394   if (count != 1) 
395     croak("got other than 1 value in scalar context");
396
397   result_sv = POPs;
398   result = SvTRUE(result_sv);
399
400   PUTBACK;
401   FREETMPS;
402   LEAVE;
403
404   return result;
405 }
406
407 /*
408 pq_find_item - search for an item we know is there.
409
410 Internal.
411 */
412 static
413 int
414 pq_find_item(poe_queue *pq, pq_id_t id, pq_priority_t priority) {
415   int i;
416
417   for (i = pq->start; i < pq->end; ++i) {
418     if (pq->entries[i].id == id)
419       return i;
420   }
421   DEBUG(fprintf(stderr, "pq_find_item %d => %f\n", id, priority) );
422   croak("Internal inconsistency: event should have been found");
423 }
424
425 int
426 pq_remove_item(poe_queue *pq, pq_id_t id, SV *filter, pq_entry *removed) {
427   pq_priority_t priority;
428   int index;
429
430   if (!pq_item_priority(pq, id, &priority)) {
431     errno = ESRCH;
432     return 0;
433   }
434
435   index = pq_find_item(pq, id, priority);
436
437   if (!pq_test_filter(pq->entries + index, filter)) {
438     errno = EPERM;
439     return 0;
440   }
441
442   *removed = pq->entries[index];
443   pq_release_id(pq, id);
444   if (index == pq->start) {
445     ++pq->start;
446   }
447   else if (index == pq->end - 1) {
448     --pq->end;
449   }
450   else {
451     memmove(pq->entries + index, pq->entries + index + 1,
452             sizeof(pq_entry) * (pq->end - index - 1));
453     --pq->end;
454   }
455
456   return 1;
457 }
458
459 int
460 pq_remove_items(poe_queue *pq, SV *filter, int max_count, pq_entry **entries) {
461   int in_index, out_index;
462   int remove_count = 0;
463   
464   *entries = NULL;
465   if (pq->start == pq->end)
466     return 0;
467
468   *entries = malloc(sizeof(pq_entry) * (pq->end - pq->start));
469   if (!*entries)
470     croak("Out of memory");
471   
472   in_index = out_index = pq->start;
473   while (in_index < pq->end && remove_count < max_count) {
474     if (pq_test_filter(pq->entries + in_index, filter)) {
475       pq_release_id(pq, pq->entries[in_index].id);
476       (*entries)[remove_count++] = pq->entries[in_index++];
477     }
478     else {
479       pq->entries[out_index++] = pq->entries[in_index++];
480     }
481   }
482   while (in_index < pq->end) {
483     pq->entries[out_index++] = pq->entries[in_index++];
484   }
485   pq->end = out_index;
486   
487   return remove_count;
488 }
489
490 /*
491 We need to keep the following 2 functions in sync (or combine the
492 common code.)
493 */
494 int
495 pq_set_priority(poe_queue *pq, pq_id_t id, SV *filter, pq_priority_t new_priority) {
496   pq_priority_t old_priority;
497   int index, insert_at;
498
499   if (!pq_item_priority(pq, id, &old_priority)) {
500     errno = ESRCH;
501     return 0;
502   }
503
504   index = pq_find_item(pq, id, old_priority);
505
506   if (!pq_test_filter(pq->entries + index, filter)) {
507     errno = EPERM;
508     return 0;
509   }
510
511   DEBUG( fprintf(stderr, " - index %d  oldp %f newp %f\n", index, old_priority, new_priority) );
512
513   if (pq->end - pq->start == 1) {
514     DEBUG( fprintf(stderr, "   -- one item\n") );
515     /* only the one item anyway */
516     pq->entries[pq->start].priority = new_priority;
517   }
518   else {
519     insert_at = pq_insertion_point(pq, new_priority);
520     DEBUG( fprintf(stderr, "   - new index %d\n", insert_at) );
521     /* the item is still in the queue, so either side of it means it
522        won't move */
523     if (insert_at == index || insert_at == index+1) {
524       DEBUG( fprintf(stderr, "   -- change in place\n") );
525       pq->entries[index].priority = new_priority;
526     }
527     else {
528       pq_entry saved = pq->entries[index];
529       saved.priority = new_priority;
530
531       if (insert_at < index) {
532         DEBUG( fprintf(stderr, "  - insert_at < index\n") );
533         memmove(pq->entries + insert_at + 1, pq->entries + insert_at,
534                 sizeof(pq_entry) * (index - insert_at));
535         pq->entries[insert_at] = saved;
536       }
537       else {
538         DEBUG( fprintf(stderr, "  - insert_at > index\n") );
539         --insert_at;
540         memmove(pq->entries + index, pq->entries + index + 1,
541                 sizeof(pq_entry) * (insert_at - index));
542         pq->entries[insert_at] = saved;
543       }
544     }
545   }
546
547   pq_set_id_priority(pq, id, new_priority);
548
549   return 1;  
550 }
551
552 int
553 pq_adjust_priority(poe_queue *pq, pq_id_t id, SV *filter, double delta, pq_priority_t *priority) {
554   pq_priority_t old_priority, new_priority;
555   int index, insert_at;
556
557   DEBUG( fprintf(stderr, "pq_adjust_priority(..., %d, %p, %f, ...)\n", id, filter, delta) );
558
559   if (!pq_item_priority(pq, id, &old_priority)) {
560     errno = ESRCH;
561     return 0;
562   }
563
564   index = pq_find_item(pq, id, old_priority);
565
566   if (!pq_test_filter(pq->entries + index, filter)) {
567     errno = EPERM;
568     return 0;
569   }
570
571   new_priority = old_priority + delta;
572
573   DEBUG( fprintf(stderr, " - index %d  oldp %f newp %f\n", index, old_priority, new_priority) );
574
575   if (pq->end - pq->start == 1) {
576     DEBUG( fprintf(stderr, "   -- one item\n") );
577     /* only the one item anyway */
578     pq->entries[pq->start].priority = new_priority;
579   }
580   else {
581     insert_at = pq_insertion_point(pq, new_priority);
582     DEBUG( fprintf(stderr, "   - new index %d\n", insert_at) );
583     /* the item is still in the queue, so either side of it means it
584        won't move */
585     if (insert_at == index || insert_at == index+1) {
586       DEBUG( fprintf(stderr, "   -- change in place\n") );
587       pq->entries[index].priority = new_priority;
588     }
589     else {
590       pq_entry saved = pq->entries[index];
591       saved.priority = new_priority;
592
593       if (insert_at < index) {
594         DEBUG( fprintf(stderr, "  - insert_at < index\n") );
595         memmove(pq->entries + insert_at + 1, pq->entries + insert_at,
596                 sizeof(pq_entry) * (index - insert_at));
597         pq->entries[insert_at] = saved;
598       }
599       else {
600         DEBUG( fprintf(stderr, "  - insert_at > index\n") );
601         --insert_at;
602         memmove(pq->entries + index, pq->entries + index + 1,
603                 sizeof(pq_entry) * (insert_at - index));
604         pq->entries[insert_at] = saved;
605       }
606     }
607   }
608
609   pq_set_id_priority(pq, id, new_priority);
610   *priority = new_priority;
611
612   return 1;  
613 }
614
615 int
616 pq_peek_items(poe_queue *pq, SV *filter, int max_count, pq_entry **items) {
617   int count = 0;
618   int i;
619
620   *items = NULL;
621   if (pq->end == pq->start)
622     return 0;
623
624   *items = malloc(sizeof(pq_entry) * (pq->end - pq->start));
625   for (i = pq->start; i < pq->end; ++i) {
626     if (pq_test_filter(pq->entries + i, filter)) {
627       (*items)[count++] = pq->entries[i];
628     }
629   }
630   if (!count) {
631     free(*items);
632     *items = NULL;
633   }
634
635   return count;
636 }
637
638 /*
639 pq_dump - dump the internals of the queue structure.
640 */
641 void
642 pq_dump(poe_queue *pq) {
643   int i;
644   HE *he;
645
646   printf("poe_queue\n");
647   printf("  start: %d\n", pq->start);
648   printf("    end: %d\n", pq->end);
649   printf("  alloc: %d\n", pq->alloc);
650   printf("    seq: %d\n", pq->queue_seq);
651   printf("  **Queue Entries:\n"
652          "      index:   id  priority    SV\n");
653   for (i = pq->start; i < pq->end; ++i) {
654     pq_entry *entry = pq->entries + i;
655     printf("      %5d: %5d %8f  %p (%u)\n", i, entry->id, entry->priority,
656            entry->payload, (unsigned)SvREFCNT(entry->payload));
657   }
658   printf("  **Hash entries:\n");
659   hv_iterinit(pq->ids);
660   while ((he = hv_iternext(pq->ids)) != NULL) {
661     STRLEN len;
662     printf("   %s => %f\n", HePV(he, len), SvNV(hv_iterval(pq->ids, he)));
663   }
664 }
665
666 /* this typedef lets the standard T_PTROBJ typemap handle the
667 conversion between perl class and C type and back again */
668 typedef poe_queue *POE__XS__Queue__Array;
669
670 /* This gives us our new method and correct destruction */
671 #define pq_new(class) pq_create()
672 #define pq_DESTROY(pq) pq_delete(pq)
673
674 MODULE = POE::XS::Queue::Array  PACKAGE = POE::XS::Queue::Array PREFIX = pq_
675
676 PROTOTYPES: DISABLE
677
678 POE::XS::Queue::Array
679 pq_new(class)
680
681 void
682 pq_DESTROY(pq)
683         POE::XS::Queue::Array pq
684
685 int
686 pq_enqueue(pq, priority, payload)
687      POE::XS::Queue::Array pq
688      double priority
689      SV *payload
690      
691 void
692 pq_dequeue_next(pq)
693         POE::XS::Queue::Array pq
694       PREINIT:
695         pq_priority_t priority;
696         pq_id_t id;
697         SV *payload;
698       PPCODE:
699         if (pq_dequeue_next(pq, &priority, &id, &payload)) {
700           EXTEND(SP, 3);
701           PUSHs(sv_2mortal(newSVnv(priority)));
702           PUSHs(sv_2mortal(newSViv(id)));
703           PUSHs(sv_2mortal(payload));
704         }
705
706 SV *
707 pq_get_next_priority(pq)
708         POE::XS::Queue::Array pq
709       PREINIT:
710         pq_priority_t priority;
711       CODE:
712         if (pq_get_next_priority(pq, &priority)) {
713           RETVAL = newSVnv(priority); /* XS will mortalize this for us */
714         }
715         else {
716           RETVAL = &PL_sv_undef;
717         }
718       OUTPUT:
719         RETVAL
720
721 int
722 pq_get_item_count(pq)
723         POE::XS::Queue::Array pq
724
725 void
726 pq_remove_item(pq, id, filter)
727         POE::XS::Queue::Array pq
728         int id
729         SV *filter
730       PREINIT:
731         pq_entry removed;
732       PPCODE:
733         if (pq_remove_item(pq, id, filter, &removed)) {
734           EXTEND(SP, 3);
735           PUSHs(sv_2mortal(newSVnv(removed.priority)));
736           PUSHs(sv_2mortal(newSViv(removed.id)));
737           PUSHs(sv_2mortal(removed.payload));
738         }
739
740 void
741 pq_remove_items(pq, filter, ...)
742         POE::XS::Queue::Array pq
743         SV *filter
744       PREINIT:
745         int max_count;
746         pq_entry *removed_entries = NULL;
747         int removed_count;
748         int i;
749       PPCODE:
750         if (items > 2)
751           max_count = SvIV(ST(2));
752         else
753           max_count = pq_get_item_count(pq);
754         removed_count = pq_remove_items(pq, filter, max_count, 
755                                         &removed_entries);
756         if (removed_count) {
757           EXTEND(SP, removed_count);
758           for (i = 0; i < removed_count; ++i) {
759             pq_entry *entry = removed_entries + i;
760             AV *av = newAV();
761             SV *rv;
762             av_extend(av, 2);
763             av_store(av, 0, newSVnv(entry->priority));
764             av_store(av, 1, newSViv(entry->id));
765             av_store(av, 2, entry->payload);
766             rv = newRV_noinc((SV *)av);
767             PUSHs(sv_2mortal(rv));
768           }
769         }
770         if (removed_entries)
771           free(removed_entries);
772
773 void
774 pq_adjust_priority(pq, id, filter, delta)
775         POE::XS::Queue::Array pq
776         int id
777         SV *filter
778         double delta
779       PREINIT:
780         pq_priority_t new_priority;
781       PPCODE:
782         if (pq_adjust_priority(pq, id, filter, delta, &new_priority)) {
783           EXTEND(SP, 1);
784           PUSHs(sv_2mortal(newSVnv(new_priority)));
785         }
786
787 void
788 pq_set_priority(pq, id, filter, new_priority)
789         POE::XS::Queue::Array pq
790         int id
791         SV *filter
792         double new_priority
793       PPCODE:
794         if (pq_set_priority(pq, id, filter, new_priority)) {
795           EXTEND(SP, 1);
796           PUSHs(sv_2mortal(newSVnv(new_priority)));
797         }
798
799 void
800 pq_peek_items(pq, filter, ...)
801         POE::XS::Queue::Array pq
802         SV *filter
803       PREINIT:
804         pq_entry *ret_items;
805         int count, i;
806         int max_count;
807       PPCODE:
808         if (items == 3)
809           max_count = SvIV(ST(2));
810         else
811           max_count = pq_get_item_count(pq);
812         count = pq_peek_items(pq, filter, max_count, &ret_items);
813         if (count) {
814           EXTEND(SP, count);
815           for (i = 0; i < count; ++i) {
816             pq_entry *entry = ret_items + i;
817             AV *av = newAV();
818             SV *rv;
819             av_extend(av, 2);
820             av_store(av, 0, newSVnv(entry->priority));
821             av_store(av, 1, newSViv(entry->id));
822             av_store(av, 2, newSVsv(entry->payload));
823             rv = newRV_noinc((SV *)av);
824             PUSHs(sv_2mortal(rv));
825           }
826           free(ret_items);
827         }
828
829 void
830 pq_dump(pq)
831         POE::XS::Queue::Array pq