initial release
[poe-xs-queue-array.git] / Array.xs
CommitLineData
e38f8ec4
TC
1#include "EXTERN.h"
2#include "perl.h"
3#include "XSUB.h"
4#include <string.h> /* for memmove() mostly */
5#include <errno.h> /* errno values */
6
/* Debug tracing: enable the first definition instead to make DEBUG(x)
   expand to x; as defined below DEBUG(x) expands to nothing. */
/*#define DEBUG(x) x*/
#define DEBUG(x)

/* type of the ids handed out for queued items */
typedef unsigned pq_id_t;
/* type of item priorities */
typedef double pq_priority_t;

/* number of entries allocated for a new queue by pq_create() */
#define PQ_START_SIZE 10
/* values for the at_end argument of pq_realloc() */
#define AT_START 0
#define AT_END 1

/* NOTE(review): not referenced by any code visible in this file - each
   queue carries its own queue_seq member.  Confirm nothing else links
   against it before removing. */
pq_id_t queue_seq;
18
/* an entry in the queue */
typedef struct {
  /* priority of the item; the entries array is kept ordered on this */
  pq_priority_t priority;
  /* id allocated for the item by pq_new_id() */
  pq_id_t id;
  /* the queue's own copy of the payload, made with newSVsv() in
     pq_enqueue() */
  SV *payload;
} pq_entry;
25
/*
We store the queue in a similar way to the way perl deals with arrays:
we keep a block of memory, but the first element may or may not be in use,
depending on the pattern of usage.

There are 3 values controlling usage of the array:

 - alloc - the number of elements allocated in total
 - start - the first element in use in the array
 - end - one past the end of the last element in the array

This has the properties that:

  start == 0 - no space at the front
  end == alloc - no space at the end
  end - start - number of elements in the queue

We use a perl hash (HV *) to store the mapping from ids to priorities.

*/
typedef struct {
  /* the first entry in use */
  int start;

  /* 1 past the last entry in use, hence end - start is the number of
     entries in the queue */
  int end;

  /* the total number of entries allocated */
  int alloc;

  /* used to generate item ids */
  pq_id_t queue_seq;

  /* used to track in use item ids */
  HV *ids;

  /* the actual entries */
  pq_entry *entries;
} poe_queue;
66
67/*
68poe_create - create a new queue object.
69
70No parameters. returns the new queue object.
71
72*/
73poe_queue *
74pq_create(void) {
75 poe_queue *pq = malloc(sizeof(poe_queue));
76
77 if (pq == NULL)
78 croak("Out of memory");
79 pq->start = 0;
80 pq->end = 0;
81 pq->alloc = PQ_START_SIZE;
82 pq->queue_seq = 0;
83 pq->ids = newHV();
84 pq->entries = calloc(sizeof(pq_entry), PQ_START_SIZE);
85 if (pq->entries == NULL)
86 croak("Out of memory");
87
88 DEBUG( fprintf(stderr, "pq_create() => %p\n", pq) );
89
90 return pq;
91}
92
93/*
94pq_delete - release the queue object.
95
96This also releases one reference from each SV in the queue.
97
98*/
99void
100pq_delete(poe_queue *pq) {
101 int i;
102
103 DEBUG( fprintf(stderr, "pq_delete(%p)\n", pq) );
104 if (pq->end > pq->start) {
105 for (i = pq->start; i < pq->end; ++i) {
106 SvREFCNT_dec(pq->entries[i].payload);
107 }
108 }
109 SvREFCNT_dec((SV *)pq->ids);
110 pq->ids = NULL;
111 if (pq->entries)
112 free(pq->entries);
113 pq->entries = NULL;
114 free(pq);
115}
116
117/*
118pq_new_id - generate a new item id.
119
120Internal use only.
121
122This, the following 3 functions and pq_create, pq_delete, should be
123all that needs to be modified if we change hash implementations.
124
125*/
126static
127pq_id_t
128pq_new_id(poe_queue *pq, pq_priority_t priority) {
129 int seq = ++pq->queue_seq;
130 SV *index = newSViv(seq);
131
132 while (hv_exists_ent(pq->ids, index, 0)) {
133 seq = ++pq->queue_seq;
134 sv_setiv(index, seq);
135 }
136 hv_store_ent(pq->ids, index, newSVnv(priority), 0);
137
138 return seq;
139}
140
141/*
142pq_release_id - releases an id for future use.
143*/
144static
145void
146pq_release_id(poe_queue *pq, pq_id_t id) {
147 SV *id_sv = sv_2mortal(newSViv(id));
148
149 hv_delete_ent(pq->ids, id_sv, 0, 0);
150}
151
152/*
153pq_item_priority - get the priority of an item given it's id
154*/
155static
156int
157pq_item_priority(poe_queue *pq, pq_id_t id, pq_priority_t *priority) {
158 HE *entry = hv_fetch_ent(pq->ids, sv_2mortal(newSViv(id)), 0, 0);
159
160 if (!entry)
161 return 0;
162
163 *priority = SvNV(HeVAL(entry));
164
165 return 1;
166}
167
168/*
169pq_set_id_priority - set the priority of an item in the id hash
170*/
171static
172void
173pq_set_id_priority(poe_queue *pq, pq_id_t id, pq_priority_t new_priority) {
174 HE *entry = hv_fetch_ent(pq->ids, sv_2mortal(newSViv(id)), 0, 0);
175
176 if (!entry)
177 croak("pq_set_priority: id not found");
178
179 sv_setnv(HeVAL(entry), new_priority);
180}
181
182/*
183pq_realloc - make space at the front of back of the queue.
184
185This adjusts the queue to allow insertion of a single item at the
186front or the back of the queue.
187
188If the queue has 33% or more space available we simple adjust the
189position of the in-use items within the array. We try not to push the
190items right up against the opposite end of the array, since we might
191need to insert items there too.
192
193If the queue has less than 33% space available we allocate another 50%
194space. We then only move the queue elements if we need space at the
195front, since the reallocation has just opened up a huge space at the
196back. Since we're reallocating exponentially larger sizes we should
197have a constant time cost on reallocation per queue item stored (but
198other costs are going to be higher.)
199
200*/
201static
202void
203pq_realloc(poe_queue *pq, int at_end) {
204 int count = pq->end - pq->start;
205
206 DEBUG( fprintf(stderr, "pq_realloc((%d, %d, %d), %d)\n", pq->start, pq->end, pq->alloc, at_end) );
207 if (count * 3 / 2 < pq->alloc) {
208 /* 33 % or more space available, use some of it */
209 int new_start;
210
211 if (at_end) {
212 new_start = (pq->alloc - count) / 3;
213 }
214 else {
215 new_start = (pq->alloc - count) * 2 / 3;
216 }
217 DEBUG( fprintf(stderr, " moving start to %d\n", new_start) );
218 memmove(pq->entries + new_start, pq->entries + pq->start,
219 count * sizeof(pq_entry));
220 pq->start = new_start;
221 pq->end = new_start + count;
222 }
223 else {
224 int new_alloc = pq->alloc * 3 / 2;
225 pq->entries = realloc(pq->entries, sizeof(pq_entry) * new_alloc);
226 pq->alloc = new_alloc;
227
228 if (!pq->entries)
229 croak("Out of memory");
230
231 DEBUG( fprintf(stderr, " - expanding to %d entries\n", new_alloc) );
232
233 if (!at_end) {
234 int new_start = (new_alloc - count) * 2 / 3;
235 DEBUG( fprintf(stderr, " moving start to %d\n", new_start) );
236 memmove(pq->entries + new_start, pq->entries + pq->start,
237 count * sizeof(pq_entry));
238 pq->start = new_start;
239 pq->end = new_start + count;
240 }
241 }
242 DEBUG( fprintf(stderr, " final: %d %d %d\n", pq->start, pq->end, pq->alloc) );
243}
244
245/*
246pq_insertion_point - figure out where to insert an item with the given
247priority
248
249Internal.
250*/
251static
252int
253pq_insertion_point(poe_queue *pq, pq_priority_t priority) {
254 /* for now this is just a linear search, later we should make it
255 binary */
256 int i = pq->end;
257 while (i > pq->start &&
258 priority < pq->entries[i-1].priority) {
259 --i;
260 }
261
262 return i;
263}
264
265int
266pq_enqueue(poe_queue *pq, pq_priority_t priority, SV *payload) {
267 int fill_at;
268 pq_id_t id = pq_new_id(pq, priority);
269
270 DEBUG( fprintf(stderr, "pq_enqueue(%f, %p)\n", priority, payload) );
271 if (pq->start == pq->end) {
272 DEBUG( fprintf(stderr, " - on empty queue\n") );
273 /* allow room at front and back for new entries */
274 pq->start = pq->alloc / 3;
275 pq->end = pq->start + 1;
276 fill_at = pq->start;
277 }
278 else if (priority >= pq->entries[pq->end-1].priority) {
279 DEBUG( fprintf(stderr, " - at the end\n") );
280 if (pq->end == pq->alloc)
281 /* past the end - need to realloc or make some space */
282 pq_realloc(pq, AT_END);
283
284 fill_at = pq->end;
285 ++pq->end;
286 }
287 else if (priority < pq->entries[pq->start].priority) {
288 DEBUG( fprintf(stderr, " - at the front\n") );
289 if (pq->start == 0)
290 /* no space at the front, make some */
291 pq_realloc(pq, AT_START);
292
293 --pq->start;
294 fill_at = pq->start;
295 }
296 else {
297 int i;
298 DEBUG( fprintf(stderr, " - in the middle\n") );
299 i = pq_insertion_point(pq, priority);
300
301 /* if we're near the end we want to push entries up, otherwise down */
302 if (i - pq->start > (pq->end - pq->start) / 2) {
303 DEBUG( fprintf(stderr, " - closer to the back (%d -> [ %d %d ])\n",
304 i, pq->start, pq->end) );
305 /* make sure we have space, this might end up copying twice,
306 but too bad for now */
307 if (pq->end == pq->alloc) {
308 int old_start = pq->start;
309 pq_realloc(pq, AT_END);
310 i += pq->start - old_start;
311 }
312
313 memmove(pq->entries + i + 1, pq->entries + i, (pq->end - i) * sizeof(pq_entry));
314 ++pq->end;
315 fill_at = i;
316 }
317 else {
318 DEBUG( fprintf(stderr, " - closer to the front (%d -> [ %d %d ])\n",
319 i, pq->start, pq->end) );
320 if (pq->start == 0) {
321 pq_realloc(pq, AT_START);
322 i += pq->start;
323 }
324 memmove(pq->entries + pq->start - 1, pq->entries + pq->start,
325 (i - pq->start) * sizeof(pq_entry));
326 --pq->start;
327 fill_at = i-1;
328 }
329 }
330 pq->entries[fill_at].priority = priority;
331 pq->entries[fill_at].id = id;
332 pq->entries[fill_at].payload = newSVsv(payload);
333
334 return id;
335}
336
337/*
338 Note: it's up to the caller to release the SV. The XS code does this
339 by making it mortal.
340*/
341int
342pq_dequeue_next(poe_queue *pq, pq_priority_t *priority, pq_id_t *id, SV **payload) {
343 pq_entry *entry;
344 /* the caller needs to release the payload (somehow) */
345 if (pq->start == pq->end)
346 return 0;
347
348 entry = pq->entries + pq->start++;
349 *priority = entry->priority;
350 *id = entry->id;
351 *payload = entry->payload;
352 pq_release_id(pq, entry->id);
353
354 return 1;
355}
356
357int
358pq_get_next_priority(poe_queue *pq, pq_priority_t *priority) {
359 if (pq->start == pq->end)
360 return 0;
361
362 *priority = pq->entries[pq->start].priority;
363 return 1;
364}
365
366int
367pq_get_item_count(poe_queue *pq) {
368 return pq->end - pq->start;
369}
370
371/*
372pq_test_filter - the XS magic involved in passing the payload to a
373filter function.
374*/
375static
376int
377pq_test_filter(pq_entry *entry, SV *filter) {
378 /* man perlcall for the magic here */
379 dSP;
380 int count;
381 SV *result_sv;
382 int result;
383
384 ENTER;
385 SAVETMPS;
386 PUSHMARK(SP);
387 XPUSHs(sv_2mortal(newSVsv(entry->payload)));
388 PUTBACK;
389
390 count = call_sv(filter, G_SCALAR);
391
392 SPAGAIN;
393
394 if (count != 1)
395 croak("got other than 1 value in scalar context");
396
397 result_sv = POPs;
398 result = SvTRUE(result_sv);
399
400 PUTBACK;
401 FREETMPS;
402 LEAVE;
403
404 return result;
405}
406
407/*
408pq_find_item - search for an item we know is there.
409
410Internal.
411*/
412static
413int
414pq_find_item(poe_queue *pq, pq_id_t id, pq_priority_t priority) {
415 int i;
416
417 for (i = pq->start; i < pq->end; ++i) {
418 if (pq->entries[i].id == id)
419 return i;
420 }
421 DEBUG(fprintf(stderr, "pq_find_item %d => %f\n", id, priority) );
422 croak("Internal inconsistency: event should have been found");
423}
424
425int
426pq_remove_item(poe_queue *pq, pq_id_t id, SV *filter, pq_entry *removed) {
427 pq_priority_t priority;
428 int index;
429
430 if (!pq_item_priority(pq, id, &priority)) {
431 errno = ESRCH;
432 return 0;
433 }
434
435 index = pq_find_item(pq, id, priority);
436
437 if (!pq_test_filter(pq->entries + index, filter)) {
438 errno = EPERM;
439 return 0;
440 }
441
442 *removed = pq->entries[index];
443 pq_release_id(pq, id);
444 if (index == pq->start) {
445 ++pq->start;
446 }
447 else if (index == pq->end - 1) {
448 --pq->end;
449 }
450 else {
451 memmove(pq->entries + index, pq->entries + index + 1,
452 sizeof(pq_entry) * (pq->end - index - 1));
453 --pq->end;
454 }
455
456 return 1;
457}
458
459int
460pq_remove_items(poe_queue *pq, SV *filter, int max_count, pq_entry **entries) {
461 int in_index, out_index;
462 int remove_count = 0;
463
464 *entries = NULL;
465 if (pq->start == pq->end)
466 return 0;
467
468 *entries = malloc(sizeof(pq_entry) * (pq->end - pq->start));
469 if (!*entries)
470 croak("Out of memory");
471
472 in_index = out_index = pq->start;
473 while (in_index < pq->end && remove_count < max_count) {
474 if (pq_test_filter(pq->entries + in_index, filter)) {
475 pq_release_id(pq, pq->entries[in_index].id);
476 (*entries)[remove_count++] = pq->entries[in_index++];
477 }
478 else {
479 pq->entries[out_index++] = pq->entries[in_index++];
480 }
481 }
482 while (in_index < pq->end) {
483 pq->entries[out_index++] = pq->entries[in_index++];
484 }
485 pq->end = out_index;
486
487 return remove_count;
488}
489
490/*
491We need to keep the following 2 functions in sync (or combine the
492common code.)
493*/
494int
495pq_set_priority(poe_queue *pq, pq_id_t id, SV *filter, pq_priority_t new_priority) {
496 pq_priority_t old_priority;
497 int index, insert_at;
498
499 if (!pq_item_priority(pq, id, &old_priority)) {
500 errno = ESRCH;
501 return 0;
502 }
503
504 index = pq_find_item(pq, id, old_priority);
505
506 if (!pq_test_filter(pq->entries + index, filter)) {
507 errno = EPERM;
508 return 0;
509 }
510
511 DEBUG( fprintf(stderr, " - index %d oldp %f newp %f\n", index, old_priority, new_priority) );
512
513 if (pq->end - pq->start == 1) {
514 DEBUG( fprintf(stderr, " -- one item\n") );
515 /* only the one item anyway */
516 pq->entries[pq->start].priority = new_priority;
517 }
518 else {
519 insert_at = pq_insertion_point(pq, new_priority);
520 DEBUG( fprintf(stderr, " - new index %d\n", insert_at) );
521 /* the item is still in the queue, so either side of it means it
522 won't move */
523 if (insert_at == index || insert_at == index+1) {
524 DEBUG( fprintf(stderr, " -- change in place\n") );
525 pq->entries[index].priority = new_priority;
526 }
527 else {
528 pq_entry saved = pq->entries[index];
529 saved.priority = new_priority;
530
531 if (insert_at < index) {
532 DEBUG( fprintf(stderr, " - insert_at < index\n") );
533 memmove(pq->entries + insert_at + 1, pq->entries + insert_at,
534 sizeof(pq_entry) * (index - insert_at));
535 pq->entries[insert_at] = saved;
536 }
537 else {
538 DEBUG( fprintf(stderr, " - insert_at > index\n") );
539 --insert_at;
540 memmove(pq->entries + index, pq->entries + index + 1,
541 sizeof(pq_entry) * (insert_at - index));
542 pq->entries[insert_at] = saved;
543 }
544 }
545 }
546
547 pq_set_id_priority(pq, id, new_priority);
548
549 return 1;
550}
551
552int
553pq_adjust_priority(poe_queue *pq, pq_id_t id, SV *filter, double delta, pq_priority_t *priority) {
554 pq_priority_t old_priority, new_priority;
555 int index, insert_at;
556
557 DEBUG( fprintf(stderr, "pq_adjust_priority(..., %d, %p, %f, ...)\n", id, filter, delta) );
558
559 if (!pq_item_priority(pq, id, &old_priority)) {
560 errno = ESRCH;
561 return 0;
562 }
563
564 index = pq_find_item(pq, id, old_priority);
565
566 if (!pq_test_filter(pq->entries + index, filter)) {
567 errno = EPERM;
568 return 0;
569 }
570
571 new_priority = old_priority + delta;
572
573 DEBUG( fprintf(stderr, " - index %d oldp %f newp %f\n", index, old_priority, new_priority) );
574
575 if (pq->end - pq->start == 1) {
576 DEBUG( fprintf(stderr, " -- one item\n") );
577 /* only the one item anyway */
578 pq->entries[pq->start].priority = new_priority;
579 }
580 else {
581 insert_at = pq_insertion_point(pq, new_priority);
582 DEBUG( fprintf(stderr, " - new index %d\n", insert_at) );
583 /* the item is still in the queue, so either side of it means it
584 won't move */
585 if (insert_at == index || insert_at == index+1) {
586 DEBUG( fprintf(stderr, " -- change in place\n") );
587 pq->entries[index].priority = new_priority;
588 }
589 else {
590 pq_entry saved = pq->entries[index];
591 saved.priority = new_priority;
592
593 if (insert_at < index) {
594 DEBUG( fprintf(stderr, " - insert_at < index\n") );
595 memmove(pq->entries + insert_at + 1, pq->entries + insert_at,
596 sizeof(pq_entry) * (index - insert_at));
597 pq->entries[insert_at] = saved;
598 }
599 else {
600 DEBUG( fprintf(stderr, " - insert_at > index\n") );
601 --insert_at;
602 memmove(pq->entries + index, pq->entries + index + 1,
603 sizeof(pq_entry) * (insert_at - index));
604 pq->entries[insert_at] = saved;
605 }
606 }
607 }
608
609 pq_set_id_priority(pq, id, new_priority);
610 *priority = new_priority;
611
612 return 1;
613}
614
615int
616pq_peek_items(poe_queue *pq, SV *filter, int max_count, pq_entry **items) {
617 int count = 0;
618 int i;
619
620 *items = NULL;
621 if (pq->end == pq->start)
622 return 0;
623
624 *items = malloc(sizeof(pq_entry) * (pq->end - pq->start));
625 for (i = pq->start; i < pq->end; ++i) {
626 if (pq_test_filter(pq->entries + i, filter)) {
627 (*items)[count++] = pq->entries[i];
628 }
629 }
630 if (!count) {
631 free(*items);
632 *items = NULL;
633 }
634
635 return count;
636}
637
638/*
639pq_dump - dump the internals of the queue structure.
640*/
641void
642pq_dump(poe_queue *pq) {
643 int i;
644 HE *he;
645
646 printf("poe_queue\n");
647 printf(" start: %d\n", pq->start);
648 printf(" end: %d\n", pq->end);
649 printf(" alloc: %d\n", pq->alloc);
650 printf(" seq: %d\n", pq->queue_seq);
651 printf(" **Queue Entries:\n"
652 " index: id priority SV\n");
653 for (i = pq->start; i < pq->end; ++i) {
654 pq_entry *entry = pq->entries + i;
655 printf(" %5d: %5d %8f %p (%u)\n", i, entry->id, entry->priority,
656 entry->payload, (unsigned)SvREFCNT(entry->payload));
657 }
658 printf(" **Hash entries:\n");
659 hv_iterinit(pq->ids);
660 while ((he = hv_iternext(pq->ids)) != NULL) {
661 STRLEN len;
662 printf(" %s => %f\n", HePV(he, len), SvNV(hv_iterval(pq->ids, he)));
663 }
664}
665
/* this typedef lets the standard T_PTROBJ typemap handle the
conversion between perl class and C type and back again */
typedef poe_queue *POE__XS__Queue__Array;

/* This gives us our new method and correct destruction: with the pq_
   prefix stripped by the MODULE line below these become the new() and
   DESTROY() methods.  Note pq_new() ignores its class argument. */
#define pq_new(class) pq_create()
#define pq_DESTROY(pq) pq_delete(pq)
673
674MODULE = POE::XS::Queue::Array PACKAGE = POE::XS::Queue::Array PREFIX = pq_
675
676PROTOTYPES: DISABLE
677
# Constructor.  pq_new(class) is a macro that expands to pq_create().
POE::XS::Queue::Array
pq_new(class)
680
# Destructor.  pq_DESTROY(pq) is a macro that expands to pq_delete(pq).
void
pq_DESTROY(pq)
    POE::XS::Queue::Array pq
684
# Queues a copy of payload at the given priority; returns the new item id.
int
pq_enqueue(pq, priority, payload)
    POE::XS::Queue::Array pq
    double priority
    SV *payload
690
# Removes the first item and returns (priority, id, payload), or an
# empty list when the queue is empty.
void
pq_dequeue_next(pq)
    POE::XS::Queue::Array pq
  PREINIT:
    pq_priority_t priority;
    pq_id_t id;
    SV *payload;
  PPCODE:
    if (pq_dequeue_next(pq, &priority, &id, &payload)) {
      EXTEND(SP, 3);
      PUSHs(sv_2mortal(newSVnv(priority)));
      PUSHs(sv_2mortal(newSViv(id)));
      /* the queue handed its reference to us; mortalize it so it is
         released once the caller is done with it */
      PUSHs(sv_2mortal(payload));
    }
705
# Returns the priority of the first item, or undef when the queue is
# empty.
SV *
pq_get_next_priority(pq)
    POE::XS::Queue::Array pq
  PREINIT:
    pq_priority_t priority;
  CODE:
    if (pq_get_next_priority(pq, &priority)) {
      RETVAL = newSVnv(priority); /* XS will mortalize this for us */
    }
    else {
      RETVAL = &PL_sv_undef;
    }
  OUTPUT:
    RETVAL
720
# Returns the number of items in the queue.
int
pq_get_item_count(pq)
    POE::XS::Queue::Array pq
724
# Removes the item with the given id when the filter accepts it and
# returns (priority, id, payload).  Returns an empty list on failure;
# the C function sets errno (ESRCH or EPERM) in that case.
void
pq_remove_item(pq, id, filter)
    POE::XS::Queue::Array pq
    int id
    SV *filter
  PREINIT:
    pq_entry removed;
  PPCODE:
    if (pq_remove_item(pq, id, filter, &removed)) {
      EXTEND(SP, 3);
      PUSHs(sv_2mortal(newSVnv(removed.priority)));
      PUSHs(sv_2mortal(newSViv(removed.id)));
      /* the queue's reference to the payload is now ours to release */
      PUSHs(sv_2mortal(removed.payload));
    }
739
# Removes the items accepted by the filter, up to an optional maximum
# count given as the third argument (default: the current item count).
# Returns a list of [ priority, id, payload ] array references.
void
pq_remove_items(pq, filter, ...)
    POE::XS::Queue::Array pq
    SV *filter
  PREINIT:
    int max_count;
    pq_entry *removed_entries = NULL;
    int removed_count;
    int i;
  PPCODE:
    if (items > 2)
      max_count = SvIV(ST(2));
    else
      max_count = pq_get_item_count(pq);
    removed_count = pq_remove_items(pq, filter, max_count,
                                    &removed_entries);
    if (removed_count) {
      EXTEND(SP, removed_count);
      for (i = 0; i < removed_count; ++i) {
        pq_entry *entry = removed_entries + i;
        AV *av = newAV();
        SV *rv;
        av_extend(av, 2);
        av_store(av, 0, newSVnv(entry->priority));
        av_store(av, 1, newSViv(entry->id));
        /* the payload SV itself (not a copy) goes into the array,
           which takes over the reference the queue held */
        av_store(av, 2, entry->payload);
        rv = newRV_noinc((SV *)av);
        PUSHs(sv_2mortal(rv));
      }
    }
    /* the array is allocated even when nothing was removed */
    if (removed_entries)
      free(removed_entries);
772
# Adds delta to the priority of the item with the given id when the
# filter accepts it.  Returns the new priority, or an empty list on
# failure (the C function sets errno to ESRCH or EPERM).
void
pq_adjust_priority(pq, id, filter, delta)
    POE::XS::Queue::Array pq
    int id
    SV *filter
    double delta
  PREINIT:
    pq_priority_t new_priority;
  PPCODE:
    if (pq_adjust_priority(pq, id, filter, delta, &new_priority)) {
      EXTEND(SP, 1);
      PUSHs(sv_2mortal(newSVnv(new_priority)));
    }
786
# Sets the priority of the item with the given id when the filter
# accepts it.  Returns the new priority, or an empty list on failure
# (the C function sets errno to ESRCH or EPERM).
void
pq_set_priority(pq, id, filter, new_priority)
    POE::XS::Queue::Array pq
    int id
    SV *filter
    double new_priority
  PPCODE:
    if (pq_set_priority(pq, id, filter, new_priority)) {
      EXTEND(SP, 1);
      PUSHs(sv_2mortal(newSVnv(new_priority)));
    }
798
# Returns a list of [ priority, id, payload ] array references for the
# items accepted by the filter, leaving the queue unchanged.  The
# payloads are copies.  An optional maximum count may be given as the
# third argument (default: the current item count); it is detected
# with the same "items > 2" test pq_remove_items uses, so it is no
# longer dropped when extra arguments follow it.
void
pq_peek_items(pq, filter, ...)
    POE::XS::Queue::Array pq
    SV *filter
  PREINIT:
    pq_entry *ret_items = NULL;
    int count, i;
    int max_count;
  PPCODE:
    if (items > 2)
      max_count = SvIV(ST(2));
    else
      max_count = pq_get_item_count(pq);
    count = pq_peek_items(pq, filter, max_count, &ret_items);
    if (count) {
      EXTEND(SP, count);
      for (i = 0; i < count; ++i) {
        pq_entry *entry = ret_items + i;
        AV *av = newAV();
        SV *rv;
        av_extend(av, 2);
        av_store(av, 0, newSVnv(entry->priority));
        av_store(av, 1, newSViv(entry->id));
        /* the queue still owns the payload, so return a copy */
        av_store(av, 2, newSVsv(entry->payload));
        rv = newRV_noinc((SV *)av);
        PUSHs(sv_2mortal(rv));
      }
      /* pq_peek_items() only leaves an array allocated when count > 0 */
      free(ret_items);
    }
828
# Prints the queue internals to stdout using the C pq_dump().
void
pq_dump(pq)
    POE::XS::Queue::Array pq