--- /dev/null
+#include "EXTERN.h"
+#include "perl.h"
+#include "XSUB.h"
+#include <string.h> /* for memmove() mostly */
+#include <errno.h> /* errno values */
+
+/* Debug tracing: swap the two definitions below to have DEBUG(x) expand
+ to its argument instead of to nothing. */
+/*#define DEBUG(x) x*/
+#define DEBUG(x)
+
+/* item identifier type */
+typedef unsigned pq_id_t;
+/* item priority type */
+typedef double pq_priority_t;
+
+/* number of entries allocated for a newly created queue */
+#define PQ_START_SIZE 10
+/* values for the at_end parameter of pq_realloc() */
+#define AT_START 0
+#define AT_END 1
+
+/* NOTE(review): none of the functions visible here reference this
+ global; ids are generated from the queue_seq member of poe_queue. */
+pq_id_t queue_seq;
+
+/* an entry in the queue */
+typedef struct {
+ /* sort key; pq_enqueue() keeps the entries array ordered by this */
+ pq_priority_t priority;
+ /* id allocated by pq_new_id() when the entry was enqueued */
+ pq_id_t id;
+ /* copy of the caller's SV made by pq_enqueue(); the queue holds one
+ reference to it until the entry is dequeued, removed or deleted */
+ SV *payload;
+} pq_entry;
+
+/*
+We store the queue in a similar way to the way perl deals with arrays,
+we keep a block of memory, but the first element may or may not be in use,
+depending on the pattern of usage.
+
+There are 3 values controlling usage of the array:
+
+ - alloc - the number of elements allocated in total
+ - start - the first element in use in the array
+ - end - one past the end of the last element in the array
+
+This has the properties that:
+
+ start == 0 - no space at the front
+ end == alloc - no space at the end
+ end - start - number of elements in the queue
+
+We use a perl hash (HV *) to store the mapping from ids to priorities.
+
+*/
+typedef struct {
+ /* the first entry in use */
+ int start;
+
+ /* 1 past the last entry in use, hence end - start is the number of
+ entries in the queue */
+ int end;
+
+ /* the total number of entries allocated */
+ int alloc;
+
+ /* used to generate item ids */
+ pq_id_t queue_seq;
+
+ /* used to track in use item ids; maps each id to the priority of the
+ item that holds it */
+ HV *ids;
+
+ /* the actual entries, kept ordered by priority in [start, end) */
+ pq_entry *entries;
+} poe_queue;
+
+/*
+pq_create - create a new queue object.
+
+No parameters.  Returns the new, empty queue object with room for
+PQ_START_SIZE entries.
+
+Croaks with "Out of memory" if either allocation fails; nothing is
+left allocated in that case.
+
+*/
+poe_queue *
+pq_create(void) {
+  poe_queue *pq = malloc(sizeof *pq);
+
+  if (pq == NULL)
+    croak("Out of memory");
+
+  /* allocate the entry array before the id hash is created, so a
+     failure here only has the queue structure itself to give back */
+  pq->entries = calloc(PQ_START_SIZE, sizeof *pq->entries);
+  if (pq->entries == NULL) {
+    free(pq);
+    croak("Out of memory");
+  }
+
+  pq->start = 0;
+  pq->end = 0;
+  pq->alloc = PQ_START_SIZE;
+  pq->queue_seq = 0;
+  pq->ids = newHV();
+
+  DEBUG( fprintf(stderr, "pq_create() => %p\n", pq) );
+
+  return pq;
+}
+
+/*
+pq_delete - release the queue object.
+
+Drops one reference from the payload SV of every entry still queued,
+drops the queue's reference to the id hash, then frees the entry array
+and the queue structure itself.
+
+*/
+void
+pq_delete(poe_queue *pq) {
+  int pos = pq->start;
+
+  DEBUG( fprintf(stderr, "pq_delete(%p)\n", pq) );
+
+  /* walk the in-use range [start, end) */
+  while (pos < pq->end) {
+    SvREFCNT_dec(pq->entries[pos].payload);
+    ++pos;
+  }
+
+  SvREFCNT_dec((SV *)pq->ids);
+  pq->ids = NULL;
+
+  if (pq->entries != NULL)
+    free(pq->entries);
+  pq->entries = NULL;
+
+  free(pq);
+}
+
+/*
+pq_new_id - generate a new item id.
+
+Internal use only.
+
+Advances the queue's sequence counter until it reaches an id that is
+not present in the id hash, records priority under that id and returns
+the id.
+
+This, the following 3 functions and pq_create, pq_delete, should be
+all that needs to be modified if we change hash implementations.
+
+*/
+static
+pq_id_t
+pq_new_id(poe_queue *pq, pq_priority_t priority) {
+  /* keep the id in pq_id_t so the key SV is built from the same value
+     that pq_release_id() and the lookup functions pass to newSViv() */
+  pq_id_t seq = ++pq->queue_seq;
+  SV *index = newSViv(seq);
+  SV *priority_sv;
+
+  while (hv_exists_ent(pq->ids, index, 0)) {
+    seq = ++pq->queue_seq;
+    sv_setiv(index, seq);
+  }
+
+  priority_sv = newSVnv(priority);
+  /* hv_store_ent() only takes over our reference to the value when it
+     succeeds; on failure the value is still ours to release */
+  if (!hv_store_ent(pq->ids, index, priority_sv, 0))
+    SvREFCNT_dec(priority_sv);
+
+  /* the hash does not keep the key SV, so release it here rather than
+     leaking one SV per id generated */
+  SvREFCNT_dec(index);
+
+  return seq;
+}
+
+/*
+pq_release_id - releases an id for future use.
+
+Deletes the id's entry from the id hash.  The key SV is mortal, so
+perl frees it later.
+*/
+static
+void
+pq_release_id(poe_queue *pq, pq_id_t id) {
+  hv_delete_ent(pq->ids, sv_2mortal(newSViv(id)), 0, 0);
+}
+
+/*
+pq_item_priority - get the priority of an item given its id
+
+Returns 1 and stores the priority in *priority when the id is in the
+id hash; returns 0 and leaves *priority alone otherwise.
+*/
+static
+int
+pq_item_priority(poe_queue *pq, pq_id_t id, pq_priority_t *priority) {
+  SV *key = sv_2mortal(newSViv(id));
+  HE *found = hv_fetch_ent(pq->ids, key, 0, 0);
+
+  if (found) {
+    *priority = SvNV(HeVAL(found));
+    return 1;
+  }
+
+  return 0;
+}
+
+/*
+pq_set_id_priority - set the priority of an item in the id hash
+
+Croaks if the id is not in the hash.
+*/
+static
+void
+pq_set_id_priority(poe_queue *pq, pq_id_t id, pq_priority_t new_priority) {
+  SV *key = sv_2mortal(newSViv(id));
+  HE *found = hv_fetch_ent(pq->ids, key, 0, 0);
+
+  if (found == NULL)
+    croak("pq_set_priority: id not found");
+
+  sv_setnv(HeVAL(found), new_priority);
+}
+
+/*
+pq_realloc - make space at the front or back of the queue.
+
+This adjusts the queue to allow insertion of a single item at the
+front or the back of the queue.  at_end is AT_END when the space is
+needed at the back and AT_START when it is needed at the front.
+
+If the queue has 33% or more space available we simply adjust the
+position of the in-use items within the array.  We try not to push the
+items right up against the opposite end of the array, since we might
+need to insert items there too.
+
+If the queue has less than 33% space available we allocate another 50%
+space.  We then only move the queue elements if we need space at the
+front, since the reallocation has just opened up a huge space at the
+back.  Since we're reallocating exponentially larger sizes we should
+have a constant time cost on reallocation per queue item stored (but
+other costs are going to be higher.)
+
+Croaks with "Out of memory" if the array cannot be grown; the queue is
+left unchanged in that case.
+
+*/
+static
+void
+pq_realloc(poe_queue *pq, int at_end) {
+  int count = pq->end - pq->start;
+
+  DEBUG( fprintf(stderr, "pq_realloc((%d, %d, %d), %d)\n", pq->start, pq->end, pq->alloc, at_end) );
+  if (count * 3 / 2 < pq->alloc) {
+    /* 33 % or more space available, use some of it */
+    int new_start;
+
+    if (at_end) {
+      new_start = (pq->alloc - count) / 3;
+    }
+    else {
+      new_start = (pq->alloc - count) * 2 / 3;
+    }
+    DEBUG( fprintf(stderr, " moving start to %d\n", new_start) );
+    memmove(pq->entries + new_start, pq->entries + pq->start,
+            count * sizeof(pq_entry));
+    pq->start = new_start;
+    pq->end = new_start + count;
+  }
+  else {
+    int new_alloc = pq->alloc * 3 / 2;
+    /* keep the old pointer until realloc() is known to have worked, so
+       a failure neither leaks the entries nor leaves the queue pointing
+       at NULL with a non-empty range */
+    pq_entry *grown = realloc(pq->entries, sizeof(pq_entry) * new_alloc);
+
+    if (!grown)
+      croak("Out of memory");
+
+    pq->entries = grown;
+    pq->alloc = new_alloc;
+
+    DEBUG( fprintf(stderr, " - expanding to %d entries\n", new_alloc) );
+
+    if (!at_end) {
+      int new_start = (new_alloc - count) * 2 / 3;
+      DEBUG( fprintf(stderr, " moving start to %d\n", new_start) );
+      memmove(pq->entries + new_start, pq->entries + pq->start,
+              count * sizeof(pq_entry));
+      pq->start = new_start;
+      pq->end = new_start + count;
+    }
+  }
+  DEBUG( fprintf(stderr, " final: %d %d %d\n", pq->start, pq->end, pq->alloc) );
+}
+
+/*
+pq_insertion_point - figure out where to insert an item with the given
+priority
+
+Internal.
+
+Scans backwards from the end of the queue and returns the index just
+past the last entry whose priority the new priority is not less than,
+or start if there is no such entry.
+*/
+static
+int
+pq_insertion_point(poe_queue *pq, pq_priority_t priority) {
+  /* for now this is just a linear search, later we should make it
+     binary */
+  int pos;
+
+  for (pos = pq->end; pos > pq->start; --pos) {
+    if (!(priority < pq->entries[pos-1].priority))
+      break;
+  }
+
+  return pos;
+}
+
+/*
+pq_enqueue - add an item to the queue.
+
+Allocates a new id for the item, stores a copy of payload (made with
+newSVsv()) at the position that keeps the entries ordered by priority,
+and returns the new id. An item whose priority equals that of items
+already queued is placed after them.
+*/
+int
+pq_enqueue(poe_queue *pq, pq_priority_t priority, SV *payload) {
+ int fill_at;
+ pq_id_t id = pq_new_id(pq, priority);
+
+ DEBUG( fprintf(stderr, "pq_enqueue(%f, %p)\n", priority, payload) );
+ if (pq->start == pq->end) {
+ DEBUG( fprintf(stderr, " - on empty queue\n") );
+ /* allow room at front and back for new entries */
+ pq->start = pq->alloc / 3;
+ pq->end = pq->start + 1;
+ fill_at = pq->start;
+ }
+ else if (priority >= pq->entries[pq->end-1].priority) {
+ DEBUG( fprintf(stderr, " - at the end\n") );
+ if (pq->end == pq->alloc)
+ /* past the end - need to realloc or make some space */
+ pq_realloc(pq, AT_END);
+
+ fill_at = pq->end;
+ ++pq->end;
+ }
+ else if (priority < pq->entries[pq->start].priority) {
+ DEBUG( fprintf(stderr, " - at the front\n") );
+ if (pq->start == 0)
+ /* no space at the front, make some */
+ pq_realloc(pq, AT_START);
+
+ --pq->start;
+ fill_at = pq->start;
+ }
+ else {
+ int i;
+ DEBUG( fprintf(stderr, " - in the middle\n") );
+ i = pq_insertion_point(pq, priority);
+
+ /* if we're near the end we want to push entries up, otherwise down */
+ if (i - pq->start > (pq->end - pq->start) / 2) {
+ DEBUG( fprintf(stderr, " - closer to the back (%d -> [ %d %d ])\n",
+ i, pq->start, pq->end) );
+ /* make sure we have space, this might end up copying twice,
+ but too bad for now */
+ if (pq->end == pq->alloc) {
+ int old_start = pq->start;
+ pq_realloc(pq, AT_END);
+ /* pq_realloc() may have moved the entries within the array; shift
+ i by the same distance so it still names the same entry */
+ i += pq->start - old_start;
+ }
+
+ /* open a one entry gap at i by moving [i, end) up */
+ memmove(pq->entries + i + 1, pq->entries + i, (pq->end - i) * sizeof(pq_entry));
+ ++pq->end;
+ fill_at = i;
+ }
+ else {
+ DEBUG( fprintf(stderr, " - closer to the front (%d -> [ %d %d ])\n",
+ i, pq->start, pq->end) );
+ if (pq->start == 0) {
+ pq_realloc(pq, AT_START);
+ /* start was 0 before the call, so the new start is the distance
+ the entries moved */
+ i += pq->start;
+ }
+ /* open a one entry gap at i-1 by moving [start, i) down */
+ memmove(pq->entries + pq->start - 1, pq->entries + pq->start,
+ (i - pq->start) * sizeof(pq_entry));
+ --pq->start;
+ fill_at = i-1;
+ }
+ }
+ pq->entries[fill_at].priority = priority;
+ pq->entries[fill_at].id = id;
+ /* the queue keeps its own copy of the payload */
+ pq->entries[fill_at].payload = newSVsv(payload);
+
+ return id;
+}
+
+/*
+pq_dequeue_next - remove the first item from the queue.
+
+Returns 0 if the queue is empty.  Otherwise stores the item's priority,
+id and payload through the pointers supplied, releases the id and
+returns 1.
+
+Note: it's up to the caller to release the SV.  The XS code does this
+by making it mortal.
+*/
+int
+pq_dequeue_next(poe_queue *pq, pq_priority_t *priority, pq_id_t *id, SV **payload) {
+  pq_entry *head;
+
+  if (pq->start == pq->end)
+    return 0;
+
+  head = &pq->entries[pq->start];
+  ++pq->start;
+
+  *priority = head->priority;
+  *id = head->id;
+  /* the caller needs to release the payload (somehow) */
+  *payload = head->payload;
+  pq_release_id(pq, head->id);
+
+  return 1;
+}
+
+/*
+pq_get_next_priority - fetch the priority of the first item without
+removing it.
+
+Returns 1 and sets *priority if the queue has any items, 0 otherwise.
+*/
+int
+pq_get_next_priority(poe_queue *pq, pq_priority_t *priority) {
+  if (pq->end != pq->start) {
+    *priority = pq->entries[pq->start].priority;
+    return 1;
+  }
+
+  return 0;
+}
+
+/*
+pq_get_item_count - the number of items currently in the queue.
+*/
+int
+pq_get_item_count(poe_queue *pq) {
+  int in_use = pq->end - pq->start;
+
+  return in_use;
+}
+
+/*
+pq_test_filter - the XS magic involved in passing the payload to a
+filter function.
+
+Calls filter in scalar context with a mortal copy of the entry's
+payload as its only argument and returns the truth value of what it
+returned. Croaks if the call does not produce exactly one value.
+*/
+static
+int
+pq_test_filter(pq_entry *entry, SV *filter) {
+ /* man perlcall for the magic here */
+ dSP;
+ int count;
+ SV *result_sv;
+ int result;
+
+ /* open a scope so the temporaries made for the call are freed by the
+ FREETMPS/LEAVE below */
+ ENTER;
+ SAVETMPS;
+ PUSHMARK(SP);
+ /* the filter is handed a copy, not the SV stored in the queue */
+ XPUSHs(sv_2mortal(newSVsv(entry->payload)));
+ PUTBACK;
+
+ count = call_sv(filter, G_SCALAR);
+
+ SPAGAIN;
+
+ if (count != 1)
+ croak("got other than 1 value in scalar context");
+
+ result_sv = POPs;
+ /* read the truth value before FREETMPS runs */
+ result = SvTRUE(result_sv);
+
+ PUTBACK;
+ FREETMPS;
+ LEAVE;
+
+ return result;
+}
+
+/*
+pq_find_item - search for an item we know is there.
+
+Internal.
+
+Returns the index in the entries array of the entry with the given id.
+Croaks if no entry has that id.  The priority is only used for debug
+output.
+*/
+static
+int
+pq_find_item(poe_queue *pq, pq_id_t id, pq_priority_t priority) {
+  int pos = pq->start;
+
+  while (pos < pq->end) {
+    if (pq->entries[pos].id == id)
+      return pos;
+    ++pos;
+  }
+
+  DEBUG(fprintf(stderr, "pq_find_item %d => %f\n", id, priority) );
+  croak("Internal inconsistency: event should have been found");
+}
+
+/*
+pq_remove_item - remove a single item, given its id.
+
+Returns 0 with errno set to ESRCH if the id is not in the queue, or 0
+with errno set to EPERM if the filter does not return true for the
+item.  Otherwise copies the entry to *removed, takes it out of the
+queue and returns 1; the payload reference then belongs to the caller.
+*/
+int
+pq_remove_item(poe_queue *pq, pq_id_t id, SV *filter, pq_entry *removed) {
+  pq_priority_t found_priority;
+  int slot;
+
+  if (!pq_item_priority(pq, id, &found_priority)) {
+    errno = ESRCH;
+    return 0;
+  }
+
+  slot = pq_find_item(pq, id, found_priority);
+
+  if (!pq_test_filter(&pq->entries[slot], filter)) {
+    errno = EPERM;
+    return 0;
+  }
+
+  *removed = pq->entries[slot];
+  pq_release_id(pq, id);
+
+  /* removing the first entry just means the queue starts later */
+  if (slot == pq->start) {
+    ++pq->start;
+    return 1;
+  }
+
+  /* anywhere else the tail, if there is one, closes the gap */
+  if (slot != pq->end - 1) {
+    memmove(pq->entries + slot, pq->entries + slot + 1,
+            sizeof(pq_entry) * (pq->end - slot - 1));
+  }
+  --pq->end;
+
+  return 1;
+}
+
+/*
+pq_remove_items - remove up to max_count items accepted by the filter.
+
+Returns the number of items removed.  *entries is set to NULL when the
+queue is empty; otherwise it is set to a malloc()ed array holding the
+removed entries, which the caller must free() even when the count
+returned is zero.  The payload references of the removed entries pass
+to the caller.
+*/
+int
+pq_remove_items(poe_queue *pq, SV *filter, int max_count, pq_entry **entries) {
+  int src, dst;
+  int taken = 0;
+
+  *entries = NULL;
+  if (pq->start == pq->end)
+    return 0;
+
+  *entries = malloc(sizeof(pq_entry) * (pq->end - pq->start));
+  if (!*entries)
+    croak("Out of memory");
+
+  /* one pass: entries that are removed go to the output array, the
+     rest are packed down towards the start of the queue.  Once
+     max_count items have been taken the filter is no longer called. */
+  dst = pq->start;
+  for (src = pq->start; src < pq->end; ++src) {
+    if (taken < max_count && pq_test_filter(pq->entries + src, filter)) {
+      pq_release_id(pq, pq->entries[src].id);
+      (*entries)[taken++] = pq->entries[src];
+    }
+    else {
+      pq->entries[dst++] = pq->entries[src];
+    }
+  }
+  pq->end = dst;
+
+  return taken;
+}
+
+/*
+pq_reposition_item - give the entry at index a new priority and move
+it to where that priority belongs in the entries array.
+
+Internal.  This is the code pq_set_priority() and pq_adjust_priority()
+have in common.  It only updates the entries array; the caller still
+has to record the new priority in the id hash.
+*/
+static
+void
+pq_reposition_item(poe_queue *pq, int index, pq_priority_t new_priority) {
+  int insert_at;
+
+  if (pq->end - pq->start == 1) {
+    DEBUG( fprintf(stderr, " -- one item\n") );
+    /* only the one item anyway */
+    pq->entries[pq->start].priority = new_priority;
+    return;
+  }
+
+  insert_at = pq_insertion_point(pq, new_priority);
+  DEBUG( fprintf(stderr, " - new index %d\n", insert_at) );
+  /* the item is still in the queue, so either side of it means it
+     won't move */
+  if (insert_at == index || insert_at == index+1) {
+    DEBUG( fprintf(stderr, " -- change in place\n") );
+    pq->entries[index].priority = new_priority;
+  }
+  else {
+    pq_entry saved = pq->entries[index];
+    saved.priority = new_priority;
+
+    if (insert_at < index) {
+      DEBUG( fprintf(stderr, " - insert_at < index\n") );
+      /* move [insert_at, index) up one to open the new slot */
+      memmove(pq->entries + insert_at + 1, pq->entries + insert_at,
+              sizeof(pq_entry) * (index - insert_at));
+      pq->entries[insert_at] = saved;
+    }
+    else {
+      DEBUG( fprintf(stderr, " - insert_at > index\n") );
+      /* the item's old slot goes away, so its target moves down one */
+      --insert_at;
+      memmove(pq->entries + index, pq->entries + index + 1,
+              sizeof(pq_entry) * (insert_at - index));
+      pq->entries[insert_at] = saved;
+    }
+  }
+}
+
+/*
+pq_set_priority - set the priority of an item to an absolute value.
+
+Returns 0 with errno set to ESRCH if the id is not in the queue, or 0
+with errno set to EPERM if the filter does not return true for the
+item.  Otherwise moves the item to its new position, updates the id
+hash and returns 1.
+*/
+int
+pq_set_priority(poe_queue *pq, pq_id_t id, SV *filter, pq_priority_t new_priority) {
+  pq_priority_t old_priority;
+  int index;
+
+  if (!pq_item_priority(pq, id, &old_priority)) {
+    errno = ESRCH;
+    return 0;
+  }
+
+  index = pq_find_item(pq, id, old_priority);
+
+  if (!pq_test_filter(pq->entries + index, filter)) {
+    errno = EPERM;
+    return 0;
+  }
+
+  DEBUG( fprintf(stderr, " - index %d oldp %f newp %f\n", index, old_priority, new_priority) );
+
+  pq_reposition_item(pq, index, new_priority);
+  pq_set_id_priority(pq, id, new_priority);
+
+  return 1;
+}
+
+/*
+pq_adjust_priority - change the priority of an item by delta.
+
+Fails in the same way as pq_set_priority().  On success the item is
+moved to its new position, the id hash is updated, the new priority is
+stored in *priority and 1 is returned.
+*/
+int
+pq_adjust_priority(poe_queue *pq, pq_id_t id, SV *filter, double delta, pq_priority_t *priority) {
+  pq_priority_t old_priority, new_priority;
+  int index;
+
+  DEBUG( fprintf(stderr, "pq_adjust_priority(..., %d, %p, %f, ...)\n", id, filter, delta) );
+
+  if (!pq_item_priority(pq, id, &old_priority)) {
+    errno = ESRCH;
+    return 0;
+  }
+
+  index = pq_find_item(pq, id, old_priority);
+
+  if (!pq_test_filter(pq->entries + index, filter)) {
+    errno = EPERM;
+    return 0;
+  }
+
+  new_priority = old_priority + delta;
+
+  DEBUG( fprintf(stderr, " - index %d oldp %f newp %f\n", index, old_priority, new_priority) );
+
+  pq_reposition_item(pq, index, new_priority);
+  pq_set_id_priority(pq, id, new_priority);
+  *priority = new_priority;
+
+  return 1;
+}
+
+/*
+pq_peek_items - return copies of the entries accepted by the filter,
+without removing them from the queue.
+
+At most max_count entries are returned; a max_count of zero or less
+means no limit.  Once the limit has been reached the filter is not
+called for the remaining entries.
+
+Returns the number of entries found.  When that is non-zero *items is
+a malloc()ed array the caller must free(); otherwise *items is NULL.
+The payload pointers in the array still belong to the queue.
+
+Croaks with "Out of memory" if the array cannot be allocated.
+*/
+int
+pq_peek_items(poe_queue *pq, SV *filter, int max_count, pq_entry **items) {
+  int count = 0;
+  int limit;
+  int i;
+
+  *items = NULL;
+  if (pq->end == pq->start)
+    return 0;
+
+  limit = pq->end - pq->start;
+  *items = malloc(sizeof(pq_entry) * limit);
+  if (!*items)
+    croak("Out of memory");
+
+  if (max_count > 0 && max_count < limit)
+    limit = max_count;
+
+  for (i = pq->start; i < pq->end && count < limit; ++i) {
+    if (pq_test_filter(pq->entries + i, filter)) {
+      (*items)[count++] = pq->entries[i];
+    }
+  }
+  if (!count) {
+    free(*items);
+    *items = NULL;
+  }
+
+  return count;
+}
+
+/*
+pq_dump - dump the internals of the queue structure.
+
+Prints the bookkeeping fields, every entry in the queue and every
+id => priority pair in the id hash to stdout.
+*/
+void
+pq_dump(poe_queue *pq) {
+  int i;
+  HE *he;
+
+  printf("poe_queue\n");
+  printf(" start: %d\n", pq->start);
+  printf(" end: %d\n", pq->end);
+  printf(" alloc: %d\n", pq->alloc);
+  /* queue_seq and the entry ids are unsigned, so print them as such */
+  printf(" seq: %u\n", pq->queue_seq);
+  printf(" **Queue Entries:\n"
+         " index: id priority SV\n");
+  for (i = pq->start; i < pq->end; ++i) {
+    pq_entry *entry = pq->entries + i;
+    /* %p expects a void pointer */
+    printf(" %5d: %5u %8f %p (%u)\n", i, entry->id, entry->priority,
+           (void *)entry->payload, (unsigned)SvREFCNT(entry->payload));
+  }
+  printf(" **Hash entries:\n");
+  hv_iterinit(pq->ids);
+  while ((he = hv_iternext(pq->ids)) != NULL) {
+    STRLEN len;
+    /* an NV is not necessarily a double, so convert it for %f */
+    printf(" %s => %f\n", HePV(he, len),
+           (double)SvNV(hv_iterval(pq->ids, he)));
+  }
+}
+
+/* this typedef lets the standard T_PTROBJ typemap handle the
+conversion between perl class and C type and back again */
+typedef poe_queue *POE__XS__Queue__Array;
+
+/* This gives us our new method and correct destruction: the class
+argument of new is ignored, and DESTROY releases the queue with
+pq_delete() */
+#define pq_new(class) pq_create()
+#define pq_DESTROY(pq) pq_delete(pq)
+
+MODULE = POE::XS::Queue::Array PACKAGE = POE::XS::Queue::Array PREFIX = pq_
+
+PROTOTYPES: DISABLE
+
+POE::XS::Queue::Array
+pq_new(class)
+
+void
+pq_DESTROY(pq)
+ POE::XS::Queue::Array pq
+
+int
+pq_enqueue(pq, priority, payload)
+ POE::XS::Queue::Array pq
+ double priority
+ SV *payload
+
+void
+pq_dequeue_next(pq)
+ POE::XS::Queue::Array pq
+ PREINIT:
+ pq_priority_t priority;
+ pq_id_t id;
+ SV *payload;
+ PPCODE:
+ /* returns (priority, id, payload) for the first item, or an empty
+ list when the queue is empty */
+ if (pq_dequeue_next(pq, &priority, &id, &payload)) {
+ EXTEND(SP, 3);
+ PUSHs(sv_2mortal(newSVnv(priority)));
+ PUSHs(sv_2mortal(newSViv(id)));
+ /* the payload is the SV the queue held; making it mortal is how
+ the reference pq_dequeue_next() leaves to its caller is released */
+ PUSHs(sv_2mortal(payload));
+ }
+
+SV *
+pq_get_next_priority(pq)
+ POE::XS::Queue::Array pq
+ PREINIT:
+ pq_priority_t priority;
+ CODE:
+ /* returns the priority of the first item, or undef when the queue
+ is empty */
+ if (pq_get_next_priority(pq, &priority)) {
+ RETVAL = newSVnv(priority); /* XS will mortalize this for us */
+ }
+ else {
+ RETVAL = &PL_sv_undef;
+ }
+ OUTPUT:
+ RETVAL
+
+int
+pq_get_item_count(pq)
+ POE::XS::Queue::Array pq
+
+void
+pq_remove_item(pq, id, filter)
+ POE::XS::Queue::Array pq
+ int id
+ SV *filter
+ PREINIT:
+ pq_entry removed;
+ PPCODE:
+ /* returns (priority, id, payload) for the removed item; on failure
+ returns an empty list and pq_remove_item() has set errno */
+ if (pq_remove_item(pq, id, filter, &removed)) {
+ EXTEND(SP, 3);
+ PUSHs(sv_2mortal(newSVnv(removed.priority)));
+ PUSHs(sv_2mortal(newSViv(removed.id)));
+ /* the entry is no longer in the queue, so its payload reference
+ is ours to hand on as a mortal */
+ PUSHs(sv_2mortal(removed.payload));
+ }
+
+void
+pq_remove_items(pq, filter, ...)
+ POE::XS::Queue::Array pq
+ SV *filter
+ PREINIT:
+ int max_count;
+ pq_entry *removed_entries = NULL;
+ int removed_count;
+ int i;
+ PPCODE:
+ /* an optional third argument limits how many items are removed; the
+ default is the number of items currently in the queue */
+ if (items > 2)
+ max_count = SvIV(ST(2));
+ else
+ max_count = pq_get_item_count(pq);
+ removed_count = pq_remove_items(pq, filter, max_count,
+ &removed_entries);
+ if (removed_count) {
+ EXTEND(SP, removed_count);
+ /* each removed item is returned as a reference to an array of
+ [ priority, id, payload ] */
+ for (i = 0; i < removed_count; ++i) {
+ pq_entry *entry = removed_entries + i;
+ AV *av = newAV();
+ SV *rv;
+ av_extend(av, 2);
+ av_store(av, 0, newSVnv(entry->priority));
+ av_store(av, 1, newSViv(entry->id));
+ /* the array takes over the reference the queue held */
+ av_store(av, 2, entry->payload);
+ rv = newRV_noinc((SV *)av);
+ PUSHs(sv_2mortal(rv));
+ }
+ }
+ /* pq_remove_items() can leave the array allocated even when nothing
+ was removed */
+ if (removed_entries)
+ free(removed_entries);
+
+void
+pq_adjust_priority(pq, id, filter, delta)
+ POE::XS::Queue::Array pq
+ int id
+ SV *filter
+ double delta
+ PREINIT:
+ pq_priority_t new_priority;
+ PPCODE:
+ /* returns the item's new priority; on failure returns an empty list
+ and pq_adjust_priority() has set errno */
+ if (pq_adjust_priority(pq, id, filter, delta, &new_priority)) {
+ EXTEND(SP, 1);
+ PUSHs(sv_2mortal(newSVnv(new_priority)));
+ }
+
+void
+pq_set_priority(pq, id, filter, new_priority)
+ POE::XS::Queue::Array pq
+ int id
+ SV *filter
+ double new_priority
+ PPCODE:
+ /* returns the priority that was set; on failure returns an empty
+ list and pq_set_priority() has set errno */
+ if (pq_set_priority(pq, id, filter, new_priority)) {
+ EXTEND(SP, 1);
+ PUSHs(sv_2mortal(newSVnv(new_priority)));
+ }
+
+void
+pq_peek_items(pq, filter, ...)
+ POE::XS::Queue::Array pq
+ SV *filter
+ PREINIT:
+ pq_entry *ret_items;
+ int count, i;
+ int max_count;
+ PPCODE:
+ /* the optional count is only read when exactly three arguments are
+ passed; otherwise the current item count is used */
+ if (items == 3)
+ max_count = SvIV(ST(2));
+ else
+ max_count = pq_get_item_count(pq);
+ count = pq_peek_items(pq, filter, max_count, &ret_items);
+ if (count) {
+ EXTEND(SP, count);
+ /* each item is returned as a reference to an array of
+ [ priority, id, payload ] */
+ for (i = 0; i < count; ++i) {
+ pq_entry *entry = ret_items + i;
+ AV *av = newAV();
+ SV *rv;
+ av_extend(av, 2);
+ av_store(av, 0, newSVnv(entry->priority));
+ av_store(av, 1, newSViv(entry->id));
+ /* the item stays in the queue, so the caller gets a copy of the
+ payload rather than the queue's own SV */
+ av_store(av, 2, newSVsv(entry->payload));
+ rv = newRV_noinc((SV *)av);
+ PUSHs(sv_2mortal(rv));
+ }
+ /* pq_peek_items() only leaves the array allocated when it returns
+ a non-zero count */
+ free(ret_items);
+ }
+
+void
+pq_dump(pq)
+ POE::XS::Queue::Array pq
--- /dev/null
+#!/usr/bin/perl -w
+# $Id: 01_array.t,v 1.1 2004/09/04 22:50:38 rcaputo Exp $
+
+# Tests basic queue operations.
+# copied from the POE distribution and modified to use
+# POE::XS::Queue::Array instead of POE::Queue::Array.
+#
+# modified a couple of tests to use is_deeply() instead of ok(eq_array ...)
+# during debugging.
+
+use strict;
+
+use lib qw(./mylib);
+
+use Test::More tests => 2047;
+
+sub POE::Kernel::ASSERT_DEFAULT () { 1 }
+sub POE::Kernel::TRACE_DEFAULT () { 1 }
+sub POE::Kernel::TRACE_FILENAME () { "./test-output.err" }
+
+use POSIX qw(EPERM ESRCH);
+
+BEGIN { use_ok("POE::XS::Queue::Array") }
+
+my $q = POE::XS::Queue::Array->new();
+
+# A new queue has no items and nothing to dequeue.
+ok($q->get_item_count == 0, "queue begins empty");
+ok(!defined($q->dequeue_next), "can't dequeue from empty queue");
+
+# Ids are expected to count up from 1 in enqueue order, whatever the
+# priority.
+ok($q->enqueue(1, "one") == 1, "first enqueue has id 1");
+ok($q->enqueue(3, "tre") == 2, "second enqueue has id 2");
+ok($q->enqueue(2, "two") == 3, "third enqueue has id 3");
+
+# Items are expected back as (priority, id, payload), lowest priority
+# first.
+is_deeply(
+ [$q->dequeue_next()], [1, 1, "one"],
+ "event one dequeued correctly"
+);
+
+is_deeply(
+ [$q->dequeue_next()], [2, 3, "two"],
+ "event two dequeued correctly"
+);
+
+ok(
+ eq_array( [$q->dequeue_next()], [3, 2, "tre"] ),
+ "event three dequeued correctly"
+);
+
+ok(
+ eq_array( [$q->dequeue_next()], [] ),
+ "empty queue marker dequeued correctly"
+);
+
+# Enqueue five events out of priority order; they should be given ids
+# 4 to 8 in the order listed (a=4, c=5, e=6, b=7, d=8).
+{ my @events = (
+ [ a => 1 ],
+ [ c => 3 ],
+ [ e => 5 ],
+ [ b => 2 ],
+ [ d => 4 ],
+ );
+
+ my $base_event_id = 4;
+ enqueue_events(\@events, $base_event_id);
+}
+
+# Filter callbacks that accept everything and nothing.  Not constants.
+sub always_ok { return 1 }
+sub never_ok { return 0 }
+
+# Remove events by id; each removal should return the removed event as
+# (priority, id, payload).
+ok(
+ eq_array( [$q->remove_item(7, \&always_ok)], [2, 7, "b"] ),
+ "removed event b by its ID"
+);
+
+ok(
+ eq_array( [$q->remove_item(5, \&always_ok)], [3, 5, "c"] ),
+ "removed event c by its ID"
+);
+
+ok(
+ eq_array( [$q->remove_item(8, \&always_ok)], [4, 8, "d"] ),
+ "removed event d by its ID"
+);
+
+# A filter that rejects the event should leave it queued and set $! to
+# EPERM.
+$! = 0;
+ok(
+ ( eq_array( [$q->remove_item(6, \&never_ok )], [] ) &&
+ $! == EPERM
+ ),
+ "didn't have permission to remove event e"
+);
+
+# Event 8 was removed above, so asking again should set $! to ESRCH.
+$! = 0;
+ok(
+ ( eq_array( [$q->remove_item(8, \&always_ok)], [] ) &&
+ $! == ESRCH
+ ),
+ "couldn't remove nonexistent event d"
+);
+
+# Only a and e should be left.
+ok(
+ eq_array( [$q->dequeue_next()], [1, 4, "a"] ),
+ "dequeued event a correctly"
+);
+
+ok(
+ eq_array( [$q->dequeue_next()], [5, 6, "e"] ),
+ "dequeued event e correctly"
+);
+
+ok(
+ eq_array( [$q->dequeue_next()], [] ),
+ "empty queue marker dequeued correctly"
+);
+
+# Refill the queue with six events, expected to get ids 9 to 14.
+{ my @events = (
+ [ a => 1 ],
+ [ c => 3 ],
+ [ e => 5 ],
+ [ b => 2 ],
+ [ d => 4 ],
+ [ f => 6 ],
+ );
+
+ my $base_event_id = 9;
+ enqueue_events(\@events, $base_event_id);
+}
+
+ok($q->get_item_count() == 6, "queue contains six events");
+
+# Filters on the event payload: a, c and e count as odd letters; b, d
+# and f count as even letters.
+sub odd_letters {
+  my ($payload) = @_;
+  return $payload =~ /[ace]/;
+}
+sub even_letters {
+  my ($payload) = @_;
+  return $payload =~ /[bdf]/;
+}
+
+# remove_items() should return each removed event as an array reference
+# of [ priority, id, payload ], in priority order.
+{ my @items = $q->remove_items(\&odd_letters, 3);
+ my @target = (
+ [ 1, 9, "a" ],
+ [ 3, 10, "c" ],
+ [ 5, 11, "e" ],
+ );
+
+ ok(eq_array(\@items, \@target), "removed odd letters from queue");
+ ok($q->get_item_count() == 3, "leaving three events");
+}
+
+{ my @items = $q->remove_items(\&odd_letters, 3);
+ my @target;
+
+ ok(eq_array(\@items, \@target), "no more odd letters to remove");
+}
+
+{ my @items = $q->remove_items(\&even_letters, 3);
+ my @target = (
+ [ 2, 12, "b" ],
+ [ 4, 13, "d" ],
+ [ 6, 14, "f" ],
+ );
+
+ is_deeply(\@items, \@target, "removed even letters from queue");
+ ok($q->get_item_count() == 0, "leaving the queue empty");
+}
+
+# Six more events, already in priority order, expected to get ids 15
+# to 20.
+{ my @events = (
+ [ a => 10 ],
+ [ b => 20 ],
+ [ c => 30 ],
+ [ d => 40 ],
+ [ e => 50 ],
+ [ f => 60 ],
+ );
+
+ my $base_event_id = 15;
+ enqueue_events(\@events, $base_event_id);
+}
+
+ok($q->get_item_count() == 6, "leaving six events in the queue");
+
+# peek_items() is called without a count here.
+{ my @items = $q->peek_items(\&even_letters);
+ my @target = (
+ [ 20, 16, "b" ],
+ [ 40, 18, "d" ],
+ [ 60, 20, "f" ],
+ );
+
+ ok(eq_array(\@items, \@target), "found even letters in queue");
+}
+
+# Move e (id 19) down from 50 and b (id 16) up from 20 so that both end
+# up with priority 35.
+ok(
+ $q->adjust_priority(19, \&always_ok, -15) == 35,
+ "adjusted event e priority by -15"
+);
+
+ok(
+ $q->adjust_priority(16, \&always_ok, +15) == 35,
+ "adjusted event b priority by +15"
+);
+
+{ my @items = $q->remove_items(\&always_ok);
+ my @target = (
+ [ 10, 15, "a" ],
+ [ 30, 17, "c" ],
+ [ 35, 19, "e" ], # e got there first
+ [ 35, 16, "b" ], # b got there second
+ [ 40, 18, "d" ],
+ [ 60, 20, "f" ],
+ );
+
+ ok(eq_array(\@items, \@target), "colliding priorities are FIFO");
+}
+
+ok($q->get_item_count() == 0, "full queue removal leaves zero events");
+
+### Large Queue Tests. The only functions that use large queues are
+### enqueue(), adjust_priority(), and set_priority(). Large queues
+### are over ~500 elements.
+
+# Generate a list of events in random priority order.
+
+# Returns the integers 0 .. N-1 in random order, where N is the first
+# argument.  Returns an empty list when N is zero.
+sub shuffled_list {
+  my $limit = shift() - 1;
+  my @list = (0..$limit);
+
+  # Swap each element, from the last down to the second, with a randomly
+  # chosen element at or below it.  Comparing against zero, rather than
+  # testing the truth of --$i, keeps an empty list from being walked
+  # with negative indices.
+  for (my $i = $#list; $i > 0; --$i) {
+    my $j = int rand($i+1);
+    @list[$i,$j] = @list[$j,$i];
+  }
+  @list;
+}
+
+# Filters on the numeric value of the event payload.
+sub is_even {
+  my ($value) = @_;
+  return !($value % 2);
+}
+sub is_odd {
+  my ($value) = @_;
+  return $value % 2;
+}
+
+# Drains the queue, running one test per event: the priorities must be
+# strictly increasing, and each event's priority must differ from its
+# payload by $target_diff (payload minus priority for negative
+# priorities, priority minus payload otherwise).
+sub verify_queue {
+  my ($target_diff) = @_;
+
+  my $previous_priority = -999999;
+
+  while (my ($pri, $id, $item) = $q->dequeue_next()) {
+    my $diff = ($pri < 0) ? $item - $pri : $pri - $item;
+
+    ok(
+      ($pri > $previous_priority) && ($diff == $target_diff),
+      "$item - $pri == $diff (should be $target_diff)"
+    );
+
+    $previous_priority = $pri;
+  }
+}
+
+# Enqueue all the events, then adjust their priorities. The
+# even-numbered events have their priorities reduced by 1000; the odd
+# ones have their priorities increased by 1000.
+
+# 10 x 100 events are enqueued in shuffled order; each event's payload
+# is the same value as its priority.
+{ my @ids;
+ for my $major (shuffled_list(10)) {
+ for my $minor (shuffled_list(100)) {
+ my $priority = sprintf("%2d%02d", $major, $minor);
+ push @ids, $q->enqueue($priority, $priority);
+ }
+ }
+
+ # Every id is tried with both filters; the filter decides which of
+ # the two adjustments applies to an event.
+ foreach my $id (@ids) { $q->adjust_priority($id, \&is_even, -1000); }
+ foreach my $id (@ids) { $q->adjust_priority($id, \&is_odd, 1000); }
+}
+
+# Verify that the queue remains in order, and that the adjusted
+# priorities are correct.
+
+print "!!!!!!!! 1\n";
+verify_queue(1000);
+
+# Now set priorities to absolute values. The values are each event's
+# original priority plus 500, set for the even-numbered events first
+# and then for the odd ones.
+
+{ my @id_recs;
+ for my $major (shuffled_list(10)) {
+ for my $minor (shuffled_list(100)) {
+ my $priority = sprintf("%2d%02d", $major, $minor);
+ push @id_recs, [ $q->enqueue($priority, $priority), $priority ];
+ }
+ }
+
+ foreach my $id_rec (@id_recs) {
+ my ($id, $pri) = @$id_rec;
+ $q->set_priority($id, \&is_even, $pri + 500);
+ }
+
+ foreach my $id_rec (@id_recs) {
+ my ($id, $pri) = @$id_rec;
+ $q->set_priority($id, \&is_odd, $pri + 500);
+ }
+
+ # Every event should now sit 500 above its payload value.
+ verify_queue(500);
+}
+
+### Helper functions.
+
+# Enqueues each [ payload, priority ] pair in @$events, running one test
+# per event that the ids returned count up from $id.
+sub enqueue_events {
+  my ($events, $id) = @_;
+  for my $event (@$events) {
+    my ($ev, $prio) = @$event;
+    ok($q->enqueue($prio, $ev) == $id++, "enqueued event $ev correctly");
+  }
+}