4 #include <string.h> /* for memmove() mostly */
5 #include <errno.h> /* errno values */
/* Identifier assigned to each queued item (pq_enqueue obtains one from
   pq_new_id). */
10 typedef unsigned pq_id_t;
/* Priority value used to order the queue entries. */
11 typedef double pq_priority_t;
/* Number of entries allocated for a newly created queue. */
13 #define PQ_START_SIZE 10
19 /* an entry in the queue */
/* ordering key: pq_enqueue and pq_insertion_point compare against this
   field when placing a new entry */
21 pq_priority_t priority;
27 We store the queue in a similar way to the way perl deals with arrays,
28 we keep a block of memory, but the first element may or may not be in use,
29 depending on the pattern of usage.
31 There are 3 values controlling usage of the array:
33 - alloc - the number of elements allocated in total
34 - start - the first element in use in the array
35 - end - one past the end of the last element in the array
37 This has the properties that:
39 start == 0 - no space at the front
40 end == alloc - no space at the end
41 end - start - number of elements in the queue
43 We use a perl hash (HV *) to store the mapping from ids to priorities.
47 /* the first entry in use */
50 /* 1 past the last entry in use, hence end - start is the number of
51 entries in the queue */
54 /* the total number of entries allocated */
57 /* used to generate item ids */
60 /* used to track in use item ids */
63 /* the actual entries */
68 pq_create - create a new queue object.
70 No parameters. Returns the new queue object.
/* Allocate the queue header.  croak() does not return, so the
   out-of-memory paths below perform no cleanup of their own. */
75 poe_queue *pq = malloc(sizeof(poe_queue));
78 croak("Out of memory");
/* the entries array starts out with PQ_START_SIZE slots */
81 pq->alloc = PQ_START_SIZE;
/* NOTE(review): calloc() is conventionally called as (count, size); the
   arguments are swapped here, which still requests the same number of
   zeroed bytes. */
84 pq->entries = calloc(sizeof(pq_entry), PQ_START_SIZE);
/* NOTE(review): pq itself is not released before croaking on this path */
85 if (pq->entries == NULL)
86 croak("Out of memory");
88 DEBUG( fprintf(stderr, "pq_create() => %p\n", pq) );
94 pq_delete - release the queue object.
96 This also releases one reference from each SV in the queue.
/* pq_delete: drops the queue's reference on every payload still stored
   and on the id hash. */
100 pq_delete(poe_queue *pq) {
103 DEBUG( fprintf(stderr, "pq_delete(%p)\n", pq) );
/* only walk the array when at least one entry is in use */
104 if (pq->end > pq->start) {
105 for (i = pq->start; i < pq->end; ++i) {
/* each live entry owns the SV created for it by pq_enqueue */
106 SvREFCNT_dec(pq->entries[i].payload);
/* release the hash that maps ids to priorities */
109 SvREFCNT_dec((SV *)pq->ids);
118 pq_new_id - generate a new item id.
122 This, the following 3 functions and pq_create, pq_delete, should be
123 all that needs to be modified if we change hash implementations.
/* pq_new_id: choose an id that is not currently in pq->ids and record
   the item's priority under it. */
128 pq_new_id(poe_queue *pq, pq_priority_t priority) {
/* NOTE(review): seq is a signed int although ids are handled as pq_id_t
   (unsigned) by the callers */
129 int seq = ++pq->queue_seq;
/* SV used as the hash key for the candidate id */
130 SV *index = newSViv(seq);
/* keep advancing the counter until the candidate is not already a key
   in the hash (presumably only possible once the counter has wrapped) */
132 while (hv_exists_ent(pq->ids, index, 0)) {
133 seq = ++pq->queue_seq;
134 sv_setiv(index, seq);
/* the stored value is a new NV holding the priority; pq_item_priority
   reads it back with SvNV */
136 hv_store_ent(pq->ids, index, newSVnv(priority), 0);
142 pq_release_id - releases an id for future use.
/* pq_release_id: remove id from the hash of in-use ids. */
146 pq_release_id(poe_queue *pq, pq_id_t id) {
/* temporary key SV; made mortal so perl reclaims it later */
147 SV *id_sv = sv_2mortal(newSViv(id));
/* the return value of the delete is not used */
149 hv_delete_ent(pq->ids, id_sv, 0, 0);
153 pq_item_priority - get the priority of an item given its id
/* pq_item_priority: look id up in pq->ids and store the recorded
   priority through *priority.  Callers test the return value for
   falsity before using *priority. */
157 pq_item_priority(poe_queue *pq, pq_id_t id, pq_priority_t *priority) {
/* a mortal temporary SV serves as the lookup key */
158 HE *entry = hv_fetch_ent(pq->ids, sv_2mortal(newSViv(id)), 0, 0);
/* the hash value is the NV written by pq_new_id / pq_set_id_priority */
163 *priority = SvNV(HeVAL(entry));
169 pq_set_id_priority - set the priority of an item in the id hash
/* pq_set_id_priority: overwrite the priority recorded for id in
   pq->ids. */
173 pq_set_id_priority(poe_queue *pq, pq_id_t id, pq_priority_t new_priority) {
174 HE *entry = hv_fetch_ent(pq->ids, sv_2mortal(newSViv(id)), 0, 0);
/* NOTE(review): the message names pq_set_priority, not this function */
177 croak("pq_set_priority: id not found");
/* update the stored NV in place rather than replacing the hash entry */
179 sv_setnv(HeVAL(entry), new_priority);
183 pq_realloc - make space at the front or back of the queue.
185 This adjusts the queue to allow insertion of a single item at the
186 front or the back of the queue.
188 If the queue has 33% or more space available we simply adjust the
189 position of the in-use items within the array. We try not to push the
190 items right up against the opposite end of the array, since we might
191 need to insert items there too.
193 If the queue has less than 33% space available we allocate another 50%
194 space. We then only move the queue elements if we need space at the
195 front, since the reallocation has just opened up a huge space at the
196 back. Since we're reallocating exponentially larger sizes we should
197 have a constant time cost on reallocation per queue item stored (but
198 other costs are going to be higher.)
/* pq_realloc: open up room for one more entry.  Callers pass AT_END or
   AT_START as at_end to say which end of the array has run out. */
203 pq_realloc(poe_queue *pq, int at_end) {
/* number of entries currently in use */
204 int count = pq->end - pq->start;
206 DEBUG( fprintf(stderr, "pq_realloc((%d, %d, %d), %d)\n", pq->start, pq->end, pq->alloc, at_end) );
/* integer arithmetic: true when count * 3 / 2 (rounded down) is still
   below the allocated size */
207 if (count * 3 / 2 < pq->alloc) {
208 /* 33 % or more space available, use some of it */
/* candidate start leaving one third of the free slots in front ... */
212 new_start = (pq->alloc - count) / 3;
/* ... or two thirds of them in front (presumably selected by at_end) */
215 new_start = (pq->alloc - count) * 2 / 3;
217 DEBUG( fprintf(stderr, " moving start to %d\n", new_start) );
/* old and new positions may overlap, hence memmove rather than memcpy */
218 memmove(pq->entries + new_start, pq->entries + pq->start,
219 count * sizeof(pq_entry));
220 pq->start = new_start;
221 pq->end = new_start + count;
/* grow the allocation by 50% */
224 int new_alloc = pq->alloc * 3 / 2;
/* NOTE(review): the realloc() result is stored straight into
   pq->entries and pq->alloc is updated before the out-of-memory croak,
   so a failed realloc loses the only pointer to the old block. */
225 pq->entries = realloc(pq->entries, sizeof(pq_entry) * new_alloc);
226 pq->alloc = new_alloc;
229 croak("Out of memory");
231 DEBUG( fprintf(stderr, " - expanding to %d entries\n", new_alloc) );
/* new start leaving two thirds of the enlarged free space in front */
234 int new_start = (new_alloc - count) * 2 / 3;
235 DEBUG( fprintf(stderr, " moving start to %d\n", new_start) );
236 memmove(pq->entries + new_start, pq->entries + pq->start,
237 count * sizeof(pq_entry));
238 pq->start = new_start;
239 pq->end = new_start + count;
242 DEBUG( fprintf(stderr, " final: %d %d %d\n", pq->start, pq->end, pq->alloc) );
246 pq_insertion_point - figure out where to insert an item with the given
/* pq_insertion_point: the loop below continues only while priority is
   strictly less than the entry before index i, so the chosen index
   falls after any entries whose priority equals the new one. */
253 pq_insertion_point(poe_queue *pq, pq_priority_t priority) {
254 /* for now this is just a linear search, later we should make it
257 while (i > pq->start &&
258 priority < pq->entries[i-1].priority) {
/* pq_enqueue: store a copy of payload in the queue at the position
   dictated by priority. */
266 pq_enqueue(poe_queue *pq, pq_priority_t priority, SV *payload) {
/* the id is allocated (and its priority recorded) before the entry is
   placed in the array */
268 pq_id_t id = pq_new_id(pq, priority);
270 DEBUG( fprintf(stderr, "pq_enqueue(%f, %p)\n", priority, payload) );
/* case 1: the queue is empty */
271 if (pq->start == pq->end) {
272 DEBUG( fprintf(stderr, " - on empty queue\n") );
273 /* allow room at front and back for new entries */
274 pq->start = pq->alloc / 3;
275 pq->end = pq->start + 1;
/* case 2: not lower than the last entry - append; the >= means an item
   with an equal priority goes after the existing one */
278 else if (priority >= pq->entries[pq->end-1].priority) {
279 DEBUG( fprintf(stderr, " - at the end\n") );
280 if (pq->end == pq->alloc)
281 /* past the end - need to realloc or make some space */
282 pq_realloc(pq, AT_END);
/* case 3: strictly lower than the first entry - prepend */
287 else if (priority < pq->entries[pq->start].priority) {
288 DEBUG( fprintf(stderr, " - at the front\n") );
290 /* no space at the front, make some */
291 pq_realloc(pq, AT_START);
298 DEBUG( fprintf(stderr, " - in the middle\n") );
/* case 4: somewhere in between - find the slot, then shift whichever
   side of it is shorter */
299 i = pq_insertion_point(pq, priority);
301 /* if we're near the end we want to push entries up, otherwise down */
302 if (i - pq->start > (pq->end - pq->start) / 2) {
303 DEBUG( fprintf(stderr, " - closer to the back (%d -> [ %d %d ])\n",
304 i, pq->start, pq->end) );
305 /* make sure we have space, this might end up copying twice,
306 but too bad for now */
307 if (pq->end == pq->alloc) {
/* pq_realloc may relocate the entries, so i is rebased by however far
   start moved */
308 int old_start = pq->start;
309 pq_realloc(pq, AT_END);
310 i += pq->start - old_start;
/* shift entries [i, end) up by one slot */
313 memmove(pq->entries + i + 1, pq->entries + i, (pq->end - i) * sizeof(pq_entry));
318 DEBUG( fprintf(stderr, " - closer to the front (%d -> [ %d %d ])\n",
319 i, pq->start, pq->end) );
320 if (pq->start == 0) {
321 pq_realloc(pq, AT_START);
/* shift entries [start, i) down by one slot */
324 memmove(pq->entries + pq->start - 1, pq->entries + pq->start,
325 (i - pq->start) * sizeof(pq_entry));
/* fill in the chosen slot */
330 pq->entries[fill_at].priority = priority;
331 pq->entries[fill_at].id = id;
/* the queue keeps its own new SV copied from the caller's payload */
332 pq->entries[fill_at].payload = newSVsv(payload);
338 Note: it's up to the caller to release the SV. The XS code does this
/* pq_dequeue_next: take the entry at the front of the queue and hand
   its fields back through the out parameters. */
342 pq_dequeue_next(poe_queue *pq, pq_priority_t *priority, pq_id_t *id, SV **payload) {
344 /* the caller needs to release the payload (somehow) */
/* nothing to dequeue when the queue is empty */
345 if (pq->start == pq->end)
/* the post-increment removes the head by advancing start past it */
348 entry = pq->entries + pq->start++;
349 *priority = entry->priority;
/* the stored SV pointer is passed on as-is, not copied */
351 *payload = entry->payload;
/* the id is no longer in use once the item has left the queue */
352 pq_release_id(pq, entry->id);
/* pq_get_next_priority: report the priority of the entry at the front
   of the queue without removing it. */
358 pq_get_next_priority(poe_queue *pq, pq_priority_t *priority) {
/* an empty queue has no next priority */
359 if (pq->start == pq->end)
362 *priority = pq->entries[pq->start].priority;
/* pq_get_item_count: number of entries currently in the queue. */
367 pq_get_item_count(poe_queue *pq) {
/* end is one past the last entry in use, so the difference is the count */
368 return pq->end - pq->start;
372 pq_test_filter - the XS magic involved in passing the payload to a
/* pq_test_filter: call the perl callback filter with the entry's
   payload and reduce its single return value to a truth value. */
377 pq_test_filter(pq_entry *entry, SV *filter) {
378 /* man perlcall for the magic here */
/* the callback receives a mortal copy, not the SV held by the queue */
387 XPUSHs(sv_2mortal(newSVsv(entry->payload)));
/* scalar context */
390 count = call_sv(filter, G_SCALAR);
395 croak("got other than 1 value in scalar context");
/* perl truthiness of the returned value */
398 result = SvTRUE(result_sv);
408 pq_find_item - search for an item we know is there.
/* pq_find_item: linear scan of the entries in use for the one whose id
   matches. */
414 pq_find_item(poe_queue *pq, pq_id_t id, pq_priority_t priority) {
417 for (i = pq->start; i < pq->end; ++i) {
418 if (pq->entries[i].id == id)
421 DEBUG(fprintf(stderr, "pq_find_item %d => %f\n", id, priority) );
/* callers only ask for ids they have already found in the id hash, so a
   miss is reported as an internal error */
422 croak("Internal inconsistency: event should have been found");
/* pq_remove_item: remove the item with the given id, subject to the
   filter callback, copying the removed entry to *removed. */
426 pq_remove_item(poe_queue *pq, pq_id_t id, SV *filter, pq_entry *removed) {
427 pq_priority_t priority;
/* the id hash is consulted first */
430 if (!pq_item_priority(pq, id, &priority)) {
/* then the entry's slot in the array is located */
435 index = pq_find_item(pq, id, priority);
/* the filter callback is consulted before anything is modified */
437 if (!pq_test_filter(pq->entries + index, filter)) {
/* struct copy; the payload pointer goes to the caller along with it */
442 *removed = pq->entries[index];
443 pq_release_id(pq, id);
/* the first and last entries are special-cased */
444 if (index == pq->start) {
447 else if (index == pq->end - 1) {
/* otherwise close the gap by moving the following entries down one */
451 memmove(pq->entries + index, pq->entries + index + 1,
452 sizeof(pq_entry) * (pq->end - index - 1));
/* pq_remove_items: remove up to max_count entries accepted by the
   filter callback, collecting them in a malloc'ed array stored through
   *entries. */
460 pq_remove_items(poe_queue *pq, SV *filter, int max_count, pq_entry **entries) {
/* in_index reads entries, out_index writes the ones being kept */
461 int in_index, out_index;
462 int remove_count = 0;
465 if (pq->start == pq->end)
/* sized for the worst case in which every entry is removed */
468 *entries = malloc(sizeof(pq_entry) * (pq->end - pq->start));
470 croak("Out of memory");
472 in_index = out_index = pq->start;
/* filtering stops as soon as max_count entries have been removed */
473 while (in_index < pq->end && remove_count < max_count) {
474 if (pq_test_filter(pq->entries + in_index, filter)) {
475 pq_release_id(pq, pq->entries[in_index].id);
/* accepted by the filter: move the entry into the result array */
476 (*entries)[remove_count++] = pq->entries[in_index++];
/* kept entries are compacted towards the front in place */
479 pq->entries[out_index++] = pq->entries[in_index++];
/* whatever is left is kept without consulting the filter */
482 while (in_index < pq->end) {
483 pq->entries[out_index++] = pq->entries[in_index++];
491 We need to keep the following 2 functions in sync (or combine the
/* pq_set_priority: give an existing item a new priority, repositioning
   it in the entries array and updating the id hash to match. */
495 pq_set_priority(poe_queue *pq, pq_id_t id, SV *filter, pq_priority_t new_priority) {
496 pq_priority_t old_priority;
/* index: current slot of the item; insert_at: slot chosen for the new
   priority */
497 int index, insert_at;
/* the id hash is consulted first */
499 if (!pq_item_priority(pq, id, &old_priority)) {
504 index = pq_find_item(pq, id, old_priority);
/* the filter callback is consulted before anything is modified */
506 if (!pq_test_filter(pq->entries + index, filter)) {
511 DEBUG( fprintf(stderr, " - index %d oldp %f newp %f\n", index, old_priority, new_priority) );
513 if (pq->end - pq->start == 1) {
514 DEBUG( fprintf(stderr, " -- one item\n") );
515 /* only the one item anyway */
516 pq->entries[pq->start].priority = new_priority;
/* computed while the item itself is still in the array */
519 insert_at = pq_insertion_point(pq, new_priority);
520 DEBUG( fprintf(stderr, " - new index %d\n", insert_at) );
521 /* the item is still in the queue, so either side of it means it
523 if (insert_at == index || insert_at == index+1) {
524 DEBUG( fprintf(stderr, " -- change in place\n") );
525 pq->entries[index].priority = new_priority;
528 pq_entry saved = pq->entries[index];
529 saved.priority = new_priority;
531 if (insert_at < index) {
532 DEBUG( fprintf(stderr, " - insert_at < index\n") );
533 memmove(pq->entries + insert_at + 1, pq->entries + insert_at,
534 sizeof(pq_entry) * (index - insert_at));
535 pq->entries[insert_at] = saved;
538 DEBUG( fprintf(stderr, " - insert_at > index\n") );
540 memmove(pq->entries + index, pq->entries + index + 1,
541 sizeof(pq_entry) * (insert_at - index));
542 pq->entries[insert_at] = saved;
547 pq_set_id_priority(pq, id, new_priority);
/* pq_adjust_priority: add delta to an existing item's priority,
   repositioning it in the entries array and updating the id hash; the
   resulting priority is stored through *priority. */
553 pq_adjust_priority(poe_queue *pq, pq_id_t id, SV *filter, double delta, pq_priority_t *priority) {
554 pq_priority_t old_priority, new_priority;
/* index: current slot of the item; insert_at: slot chosen for the new
   priority */
555 int index, insert_at;
557 DEBUG( fprintf(stderr, "pq_adjust_priority(..., %d, %p, %f, ...)\n", id, filter, delta) );
/* the id hash is consulted first */
559 if (!pq_item_priority(pq, id, &old_priority)) {
564 index = pq_find_item(pq, id, old_priority);
/* the filter callback is consulted before anything is modified */
566 if (!pq_test_filter(pq->entries + index, filter)) {
/* the new priority is relative to the one recorded in the id hash */
571 new_priority = old_priority + delta;
573 DEBUG( fprintf(stderr, " - index %d oldp %f newp %f\n", index, old_priority, new_priority) );
575 if (pq->end - pq->start == 1) {
576 DEBUG( fprintf(stderr, " -- one item\n") );
577 /* only the one item anyway */
578 pq->entries[pq->start].priority = new_priority;
/* computed while the item itself is still in the array */
581 insert_at = pq_insertion_point(pq, new_priority);
582 DEBUG( fprintf(stderr, " - new index %d\n", insert_at) );
583 /* the item is still in the queue, so either side of it means it
585 if (insert_at == index || insert_at == index+1) {
586 DEBUG( fprintf(stderr, " -- change in place\n") );
587 pq->entries[index].priority = new_priority;
590 pq_entry saved = pq->entries[index];
591 saved.priority = new_priority;
593 if (insert_at < index) {
594 DEBUG( fprintf(stderr, " - insert_at < index\n") );
595 memmove(pq->entries + insert_at + 1, pq->entries + insert_at,
596 sizeof(pq_entry) * (index - insert_at));
597 pq->entries[insert_at] = saved;
600 DEBUG( fprintf(stderr, " - insert_at > index\n") );
602 memmove(pq->entries + index, pq->entries + index + 1,
603 sizeof(pq_entry) * (insert_at - index));
604 pq->entries[insert_at] = saved;
609 pq_set_id_priority(pq, id, new_priority);
610 *priority = new_priority;
/* pq_peek_items: collect copies of the entries accepted by the filter
   callback into a malloc'ed array stored through *items, leaving the
   queue itself unchanged. */
616 pq_peek_items(poe_queue *pq, SV *filter, int max_count, pq_entry **items) {
621 if (pq->end == pq->start)
/* NOTE(review): unlike pq_remove_items, the malloc() result is used by
   the loop below without being checked for NULL. */
624 *items = malloc(sizeof(pq_entry) * (pq->end - pq->start));
625 for (i = pq->start; i < pq->end; ++i) {
626 if (pq_test_filter(pq->entries + i, filter)) {
/* struct copy only: the payload pointer still refers to the SV held by
   the queue */
627 (*items)[count++] = pq->entries[i];
639 pq_dump - dump the internals of the queue structure.
/* pq_dump: print the queue's bookkeeping values, every entry in use and
   the contents of the id hash to stdout. */
642 pq_dump(poe_queue *pq) {
646 printf("poe_queue\n");
647 printf(" start: %d\n", pq->start);
648 printf(" end: %d\n", pq->end);
649 printf(" alloc: %d\n", pq->alloc);
/* NOTE(review): %d is used here and for entry->id below; ids are
   handled as pq_id_t (unsigned) elsewhere - confirm the field types */
650 printf(" seq: %d\n", pq->queue_seq);
651 printf(" **Queue Entries:\n"
652 " index: id priority SV\n");
653 for (i = pq->start; i < pq->end; ++i) {
654 pq_entry *entry = pq->entries + i;
/* the last column is the payload SV's current reference count */
655 printf(" %5d: %5d %8f %p (%u)\n", i, entry->id, entry->priority,
656 entry->payload, (unsigned)SvREFCNT(entry->payload));
658 printf(" **Hash entries:\n");
/* walk the id hash, printing each key with its stored priority */
659 hv_iterinit(pq->ids);
660 while ((he = hv_iternext(pq->ids)) != NULL) {
662 printf(" %s => %f\n", HePV(he, len), SvNV(hv_iterval(pq->ids, he)));
666 /* this typedef lets the standard T_PTROBJ typemap handle the
667 conversion between perl class and C type and back again */
668 typedef poe_queue *POE__XS__Queue__Array;
670 /* This gives us our new method and correct destruction */
/* the class argument is accepted but not used */
671 #define pq_new(class) pq_create()
/* destruction forwards directly to pq_delete */
672 #define pq_DESTROY(pq) pq_delete(pq)
674 MODULE = POE::XS::Queue::Array PACKAGE = POE::XS::Queue::Array PREFIX = pq_
678 POE::XS::Queue::Array
683 POE::XS::Queue::Array pq
686 pq_enqueue(pq, priority, payload)
687 POE::XS::Queue::Array pq
693 POE::XS::Queue::Array pq
695 pq_priority_t priority;
/* on success three values are pushed: priority, id, payload */
699 if (pq_dequeue_next(pq, &priority, &id, &payload)) {
701 PUSHs(sv_2mortal(newSVnv(priority)));
702 PUSHs(sv_2mortal(newSViv(id)));
/* the SV that was held by the queue is handed to perl as a mortal */
703 PUSHs(sv_2mortal(payload));
707 pq_get_next_priority(pq)
708 POE::XS::Queue::Array pq
710 pq_priority_t priority;
712 if (pq_get_next_priority(pq, &priority)) {
713 RETVAL = newSVnv(priority); /* XS will mortalize this for us */
/* no next priority available: return undef */
716 RETVAL = &PL_sv_undef;
722 pq_get_item_count(pq)
723 POE::XS::Queue::Array pq
726 pq_remove_item(pq, id, filter)
727 POE::XS::Queue::Array pq
/* on success three values are pushed: priority, id, payload */
733 if (pq_remove_item(pq, id, filter, &removed)) {
735 PUSHs(sv_2mortal(newSVnv(removed.priority)));
736 PUSHs(sv_2mortal(newSViv(removed.id)));
/* the removed entry's SV is handed to perl as a mortal */
737 PUSHs(sv_2mortal(removed.payload));
741 pq_remove_items(pq, filter, ...)
742 POE::XS::Queue::Array pq
746 pq_entry *removed_entries = NULL;
/* max_count comes from the third argument, or else defaults to the
   number of items in the queue */
751 max_count = SvIV(ST(2));
753 max_count = pq_get_item_count(pq);
754 removed_count = pq_remove_items(pq, filter, max_count,
757 EXTEND(SP, removed_count);
758 for (i = 0; i < removed_count; ++i) {
759 pq_entry *entry = removed_entries + i;
/* each removed item is returned as a reference to an array holding
   priority, id and payload */
763 av_store(av, 0, newSVnv(entry->priority));
764 av_store(av, 1, newSViv(entry->id));
/* the SV taken out of the queue is stored directly, without a copy */
765 av_store(av, 2, entry->payload);
766 rv = newRV_noinc((SV *)av);
767 PUSHs(sv_2mortal(rv));
/* releases the array allocated by pq_remove_items */
771 free(removed_entries);
774 pq_adjust_priority(pq, id, filter, delta)
775 POE::XS::Queue::Array pq
780 pq_priority_t new_priority;
/* on success the adjusted priority is pushed */
782 if (pq_adjust_priority(pq, id, filter, delta, &new_priority)) {
784 PUSHs(sv_2mortal(newSVnv(new_priority)));
788 pq_set_priority(pq, id, filter, new_priority)
789 POE::XS::Queue::Array pq
/* on success the priority that was set is pushed */
794 if (pq_set_priority(pq, id, filter, new_priority)) {
796 PUSHs(sv_2mortal(newSVnv(new_priority)));
800 pq_peek_items(pq, filter, ...)
801 POE::XS::Queue::Array pq
/* max_count comes from the third argument, or else defaults to the
   number of items in the queue */
809 max_count = SvIV(ST(2));
811 max_count = pq_get_item_count(pq);
812 count = pq_peek_items(pq, filter, max_count, &ret_items);
815 for (i = 0; i < count; ++i) {
816 pq_entry *entry = ret_items + i;
/* each peeked item is returned as a reference to an array holding
   priority, id and payload */
820 av_store(av, 0, newSVnv(entry->priority));
821 av_store(av, 1, newSViv(entry->id));
/* the item stays in the queue, so perl receives a copy of its payload */
822 av_store(av, 2, newSVsv(entry->payload));
823 rv = newRV_noinc((SV *)av);
824 PUSHs(sv_2mortal(rv));
831 POE::XS::Queue::Array pq