#include "EXTERN.h" #include "perl.h" #include "XSUB.h" #include /* for memmove() mostly */ #include /* errno values */ /*#define DEBUG(x) x*/ #define DEBUG(x) typedef unsigned pq_id_t; typedef double pq_priority_t; #define PQ_START_SIZE 10 #define AT_START 0 #define AT_END 1 pq_id_t queue_seq; /* an entry in the queue */ typedef struct { pq_priority_t priority; pq_id_t id; SV *payload; } pq_entry; /* We store the queue in a similar way to the way perl deals with arrays, we keep a block of memory, but the first element may or may not be in use, depending on the pattern of usage. There's 3 value controlling usage of the array: - alloc - the number of elements allocated in total - start - the first element in use in the array - end - one past the end of the last element in the array This has the properties that: start == 0 - no space at the front end == alloc - no space at the end end - start - number of elements in the queue We use a perl hash (HV *) to store the mapping from ids to priorities. */ typedef struct { /* the first entry in use */ int start; /* 1 past the last entry in use, hence end - start is the number of entries in the queue */ int end; /* the total number of entries allocated */ int alloc; /* used to generate item ids */ pq_id_t queue_seq; /* used to track in use item ids */ HV *ids; /* the actual entries */ pq_entry *entries; } poe_queue; /* poe_create - create a new queue object. No parameters. returns the new queue object. */ poe_queue * pq_create(void) { poe_queue *pq = malloc(sizeof(poe_queue)); if (pq == NULL) croak("Out of memory"); pq->start = 0; pq->end = 0; pq->alloc = PQ_START_SIZE; pq->queue_seq = 0; pq->ids = newHV(); pq->entries = calloc(sizeof(pq_entry), PQ_START_SIZE); if (pq->entries == NULL) croak("Out of memory"); DEBUG( fprintf(stderr, "pq_create() => %p\n", pq) ); return pq; } /* pq_delete - release the queue object. This also releases one reference from each SV in the queue. */ void pq_delete(poe_queue *pq) { int i; DEBUG( fprintf(stderr, "pq_delete(%p)\n", pq) ); if (pq->end > pq->start) { for (i = pq->start; i < pq->end; ++i) { SvREFCNT_dec(pq->entries[i].payload); } } SvREFCNT_dec((SV *)pq->ids); pq->ids = NULL; if (pq->entries) free(pq->entries); pq->entries = NULL; free(pq); } /* pq_new_id - generate a new item id. Internal use only. This, the following 3 functions and pq_create, pq_delete, should be all that needs to be modified if we change hash implementations. */ static pq_id_t pq_new_id(poe_queue *pq, pq_priority_t priority) { int seq = ++pq->queue_seq; SV *index = newSViv(seq); while (hv_exists_ent(pq->ids, index, 0)) { seq = ++pq->queue_seq; sv_setiv(index, seq); } hv_store_ent(pq->ids, index, newSVnv(priority), 0); return seq; } /* pq_release_id - releases an id for future use. */ static void pq_release_id(poe_queue *pq, pq_id_t id) { SV *id_sv = sv_2mortal(newSViv(id)); hv_delete_ent(pq->ids, id_sv, 0, 0); } /* pq_item_priority - get the priority of an item given it's id */ static int pq_item_priority(poe_queue *pq, pq_id_t id, pq_priority_t *priority) { HE *entry = hv_fetch_ent(pq->ids, sv_2mortal(newSViv(id)), 0, 0); if (!entry) return 0; *priority = SvNV(HeVAL(entry)); return 1; } /* pq_set_id_priority - set the priority of an item in the id hash */ static void pq_set_id_priority(poe_queue *pq, pq_id_t id, pq_priority_t new_priority) { HE *entry = hv_fetch_ent(pq->ids, sv_2mortal(newSViv(id)), 0, 0); if (!entry) croak("pq_set_priority: id not found"); sv_setnv(HeVAL(entry), new_priority); } /* pq_realloc - make space at the front of back of the queue. This adjusts the queue to allow insertion of a single item at the front or the back of the queue. If the queue has 33% or more space available we simple adjust the position of the in-use items within the array. We try not to push the items right up against the opposite end of the array, since we might need to insert items there too. If the queue has less than 33% space available we allocate another 50% space. We then only move the queue elements if we need space at the front, since the reallocation has just opened up a huge space at the back. Since we're reallocating exponentially larger sizes we should have a constant time cost on reallocation per queue item stored (but other costs are going to be higher.) */ static void pq_realloc(poe_queue *pq, int at_end) { int count = pq->end - pq->start; DEBUG( fprintf(stderr, "pq_realloc((%d, %d, %d), %d)\n", pq->start, pq->end, pq->alloc, at_end) ); if (count * 3 / 2 < pq->alloc) { /* 33 % or more space available, use some of it */ int new_start; if (at_end) { new_start = (pq->alloc - count) / 3; } else { new_start = (pq->alloc - count) * 2 / 3; } DEBUG( fprintf(stderr, " moving start to %d\n", new_start) ); memmove(pq->entries + new_start, pq->entries + pq->start, count * sizeof(pq_entry)); pq->start = new_start; pq->end = new_start + count; } else { int new_alloc = pq->alloc * 3 / 2; pq->entries = realloc(pq->entries, sizeof(pq_entry) * new_alloc); pq->alloc = new_alloc; if (!pq->entries) croak("Out of memory"); DEBUG( fprintf(stderr, " - expanding to %d entries\n", new_alloc) ); if (!at_end) { int new_start = (new_alloc - count) * 2 / 3; DEBUG( fprintf(stderr, " moving start to %d\n", new_start) ); memmove(pq->entries + new_start, pq->entries + pq->start, count * sizeof(pq_entry)); pq->start = new_start; pq->end = new_start + count; } } DEBUG( fprintf(stderr, " final: %d %d %d\n", pq->start, pq->end, pq->alloc) ); } /* pq_insertion_point - figure out where to insert an item with the given priority Internal. */ static int pq_insertion_point(poe_queue *pq, pq_priority_t priority) { /* for now this is just a linear search, later we should make it binary */ int i = pq->end; while (i > pq->start && priority < pq->entries[i-1].priority) { --i; } return i; } int pq_enqueue(poe_queue *pq, pq_priority_t priority, SV *payload) { int fill_at; pq_id_t id = pq_new_id(pq, priority); DEBUG( fprintf(stderr, "pq_enqueue(%f, %p)\n", priority, payload) ); if (pq->start == pq->end) { DEBUG( fprintf(stderr, " - on empty queue\n") ); /* allow room at front and back for new entries */ pq->start = pq->alloc / 3; pq->end = pq->start + 1; fill_at = pq->start; } else if (priority >= pq->entries[pq->end-1].priority) { DEBUG( fprintf(stderr, " - at the end\n") ); if (pq->end == pq->alloc) /* past the end - need to realloc or make some space */ pq_realloc(pq, AT_END); fill_at = pq->end; ++pq->end; } else if (priority < pq->entries[pq->start].priority) { DEBUG( fprintf(stderr, " - at the front\n") ); if (pq->start == 0) /* no space at the front, make some */ pq_realloc(pq, AT_START); --pq->start; fill_at = pq->start; } else { int i; DEBUG( fprintf(stderr, " - in the middle\n") ); i = pq_insertion_point(pq, priority); /* if we're near the end we want to push entries up, otherwise down */ if (i - pq->start > (pq->end - pq->start) / 2) { DEBUG( fprintf(stderr, " - closer to the back (%d -> [ %d %d ])\n", i, pq->start, pq->end) ); /* make sure we have space, this might end up copying twice, but too bad for now */ if (pq->end == pq->alloc) { int old_start = pq->start; pq_realloc(pq, AT_END); i += pq->start - old_start; } memmove(pq->entries + i + 1, pq->entries + i, (pq->end - i) * sizeof(pq_entry)); ++pq->end; fill_at = i; } else { DEBUG( fprintf(stderr, " - closer to the front (%d -> [ %d %d ])\n", i, pq->start, pq->end) ); if (pq->start == 0) { pq_realloc(pq, AT_START); i += pq->start; } memmove(pq->entries + pq->start - 1, pq->entries + pq->start, (i - pq->start) * sizeof(pq_entry)); --pq->start; fill_at = i-1; } } pq->entries[fill_at].priority = priority; pq->entries[fill_at].id = id; pq->entries[fill_at].payload = newSVsv(payload); return id; } /* Note: it's up to the caller to release the SV. The XS code does this by making it mortal. */ int pq_dequeue_next(poe_queue *pq, pq_priority_t *priority, pq_id_t *id, SV **payload) { pq_entry *entry; /* the caller needs to release the payload (somehow) */ if (pq->start == pq->end) return 0; entry = pq->entries + pq->start++; *priority = entry->priority; *id = entry->id; *payload = entry->payload; pq_release_id(pq, entry->id); return 1; } int pq_get_next_priority(poe_queue *pq, pq_priority_t *priority) { if (pq->start == pq->end) return 0; *priority = pq->entries[pq->start].priority; return 1; } int pq_get_item_count(poe_queue *pq) { return pq->end - pq->start; } /* pq_test_filter - the XS magic involved in passing the payload to a filter function. */ static int pq_test_filter(pq_entry *entry, SV *filter) { /* man perlcall for the magic here */ dSP; int count; SV *result_sv; int result; ENTER; SAVETMPS; PUSHMARK(SP); XPUSHs(sv_2mortal(newSVsv(entry->payload))); PUTBACK; count = call_sv(filter, G_SCALAR); SPAGAIN; if (count != 1) croak("got other than 1 value in scalar context"); result_sv = POPs; result = SvTRUE(result_sv); PUTBACK; FREETMPS; LEAVE; return result; } /* pq_find_item - search for an item we know is there. Internal. */ static int pq_find_item(poe_queue *pq, pq_id_t id, pq_priority_t priority) { int i; for (i = pq->start; i < pq->end; ++i) { if (pq->entries[i].id == id) return i; } DEBUG(fprintf(stderr, "pq_find_item %d => %f\n", id, priority) ); croak("Internal inconsistency: event should have been found"); } int pq_remove_item(poe_queue *pq, pq_id_t id, SV *filter, pq_entry *removed) { pq_priority_t priority; int index; if (!pq_item_priority(pq, id, &priority)) { errno = ESRCH; return 0; } index = pq_find_item(pq, id, priority); if (!pq_test_filter(pq->entries + index, filter)) { errno = EPERM; return 0; } *removed = pq->entries[index]; pq_release_id(pq, id); if (index == pq->start) { ++pq->start; } else if (index == pq->end - 1) { --pq->end; } else { memmove(pq->entries + index, pq->entries + index + 1, sizeof(pq_entry) * (pq->end - index - 1)); --pq->end; } return 1; } int pq_remove_items(poe_queue *pq, SV *filter, int max_count, pq_entry **entries) { int in_index, out_index; int remove_count = 0; *entries = NULL; if (pq->start == pq->end) return 0; *entries = malloc(sizeof(pq_entry) * (pq->end - pq->start)); if (!*entries) croak("Out of memory"); in_index = out_index = pq->start; while (in_index < pq->end && remove_count < max_count) { if (pq_test_filter(pq->entries + in_index, filter)) { pq_release_id(pq, pq->entries[in_index].id); (*entries)[remove_count++] = pq->entries[in_index++]; } else { pq->entries[out_index++] = pq->entries[in_index++]; } } while (in_index < pq->end) { pq->entries[out_index++] = pq->entries[in_index++]; } pq->end = out_index; return remove_count; } /* We need to keep the following 2 functions in sync (or combine the common code.) */ int pq_set_priority(poe_queue *pq, pq_id_t id, SV *filter, pq_priority_t new_priority) { pq_priority_t old_priority; int index, insert_at; if (!pq_item_priority(pq, id, &old_priority)) { errno = ESRCH; return 0; } index = pq_find_item(pq, id, old_priority); if (!pq_test_filter(pq->entries + index, filter)) { errno = EPERM; return 0; } DEBUG( fprintf(stderr, " - index %d oldp %f newp %f\n", index, old_priority, new_priority) ); if (pq->end - pq->start == 1) { DEBUG( fprintf(stderr, " -- one item\n") ); /* only the one item anyway */ pq->entries[pq->start].priority = new_priority; } else { insert_at = pq_insertion_point(pq, new_priority); DEBUG( fprintf(stderr, " - new index %d\n", insert_at) ); /* the item is still in the queue, so either side of it means it won't move */ if (insert_at == index || insert_at == index+1) { DEBUG( fprintf(stderr, " -- change in place\n") ); pq->entries[index].priority = new_priority; } else { pq_entry saved = pq->entries[index]; saved.priority = new_priority; if (insert_at < index) { DEBUG( fprintf(stderr, " - insert_at < index\n") ); memmove(pq->entries + insert_at + 1, pq->entries + insert_at, sizeof(pq_entry) * (index - insert_at)); pq->entries[insert_at] = saved; } else { DEBUG( fprintf(stderr, " - insert_at > index\n") ); --insert_at; memmove(pq->entries + index, pq->entries + index + 1, sizeof(pq_entry) * (insert_at - index)); pq->entries[insert_at] = saved; } } } pq_set_id_priority(pq, id, new_priority); return 1; } int pq_adjust_priority(poe_queue *pq, pq_id_t id, SV *filter, double delta, pq_priority_t *priority) { pq_priority_t old_priority, new_priority; int index, insert_at; DEBUG( fprintf(stderr, "pq_adjust_priority(..., %d, %p, %f, ...)\n", id, filter, delta) ); if (!pq_item_priority(pq, id, &old_priority)) { errno = ESRCH; return 0; } index = pq_find_item(pq, id, old_priority); if (!pq_test_filter(pq->entries + index, filter)) { errno = EPERM; return 0; } new_priority = old_priority + delta; DEBUG( fprintf(stderr, " - index %d oldp %f newp %f\n", index, old_priority, new_priority) ); if (pq->end - pq->start == 1) { DEBUG( fprintf(stderr, " -- one item\n") ); /* only the one item anyway */ pq->entries[pq->start].priority = new_priority; } else { insert_at = pq_insertion_point(pq, new_priority); DEBUG( fprintf(stderr, " - new index %d\n", insert_at) ); /* the item is still in the queue, so either side of it means it won't move */ if (insert_at == index || insert_at == index+1) { DEBUG( fprintf(stderr, " -- change in place\n") ); pq->entries[index].priority = new_priority; } else { pq_entry saved = pq->entries[index]; saved.priority = new_priority; if (insert_at < index) { DEBUG( fprintf(stderr, " - insert_at < index\n") ); memmove(pq->entries + insert_at + 1, pq->entries + insert_at, sizeof(pq_entry) * (index - insert_at)); pq->entries[insert_at] = saved; } else { DEBUG( fprintf(stderr, " - insert_at > index\n") ); --insert_at; memmove(pq->entries + index, pq->entries + index + 1, sizeof(pq_entry) * (insert_at - index)); pq->entries[insert_at] = saved; } } } pq_set_id_priority(pq, id, new_priority); *priority = new_priority; return 1; } int pq_peek_items(poe_queue *pq, SV *filter, int max_count, pq_entry **items) { int count = 0; int i; *items = NULL; if (pq->end == pq->start) return 0; *items = malloc(sizeof(pq_entry) * (pq->end - pq->start)); for (i = pq->start; i < pq->end; ++i) { if (pq_test_filter(pq->entries + i, filter)) { (*items)[count++] = pq->entries[i]; } } if (!count) { free(*items); *items = NULL; } return count; } /* pq_dump - dump the internals of the queue structure. */ void pq_dump(poe_queue *pq) { int i; HE *he; printf("poe_queue\n"); printf(" start: %d\n", pq->start); printf(" end: %d\n", pq->end); printf(" alloc: %d\n", pq->alloc); printf(" seq: %d\n", pq->queue_seq); printf(" **Queue Entries:\n" " index: id priority SV\n"); for (i = pq->start; i < pq->end; ++i) { pq_entry *entry = pq->entries + i; printf(" %5d: %5d %8f %p (%u)\n", i, entry->id, entry->priority, entry->payload, (unsigned)SvREFCNT(entry->payload)); } printf(" **Hash entries:\n"); hv_iterinit(pq->ids); while ((he = hv_iternext(pq->ids)) != NULL) { STRLEN len; printf(" %s => %f\n", HePV(he, len), SvNV(hv_iterval(pq->ids, he))); } } /* this typedef lets the standard T_PTROBJ typemap handle the conversion between perl class and C type and back again */ typedef poe_queue *POE__XS__Queue__Array; /* This gives us our new method and correct destruction */ #define pq_new(class) pq_create() #define pq_DESTROY(pq) pq_delete(pq) MODULE = POE::XS::Queue::Array PACKAGE = POE::XS::Queue::Array PREFIX = pq_ PROTOTYPES: DISABLE POE::XS::Queue::Array pq_new(class) void pq_DESTROY(pq) POE::XS::Queue::Array pq int pq_enqueue(pq, priority, payload) POE::XS::Queue::Array pq double priority SV *payload void pq_dequeue_next(pq) POE::XS::Queue::Array pq PREINIT: pq_priority_t priority; pq_id_t id; SV *payload; PPCODE: if (pq_dequeue_next(pq, &priority, &id, &payload)) { EXTEND(SP, 3); PUSHs(sv_2mortal(newSVnv(priority))); PUSHs(sv_2mortal(newSViv(id))); PUSHs(sv_2mortal(payload)); } SV * pq_get_next_priority(pq) POE::XS::Queue::Array pq PREINIT: pq_priority_t priority; CODE: if (pq_get_next_priority(pq, &priority)) { RETVAL = newSVnv(priority); /* XS will mortalize this for us */ } else { RETVAL = &PL_sv_undef; } OUTPUT: RETVAL int pq_get_item_count(pq) POE::XS::Queue::Array pq void pq_remove_item(pq, id, filter) POE::XS::Queue::Array pq int id SV *filter PREINIT: pq_entry removed; PPCODE: if (pq_remove_item(pq, id, filter, &removed)) { EXTEND(SP, 3); PUSHs(sv_2mortal(newSVnv(removed.priority))); PUSHs(sv_2mortal(newSViv(removed.id))); PUSHs(sv_2mortal(removed.payload)); } void pq_remove_items(pq, filter, ...) POE::XS::Queue::Array pq SV *filter PREINIT: int max_count; pq_entry *removed_entries = NULL; int removed_count; int i; PPCODE: if (items > 2) max_count = SvIV(ST(2)); else max_count = pq_get_item_count(pq); removed_count = pq_remove_items(pq, filter, max_count, &removed_entries); if (removed_count) { EXTEND(SP, removed_count); for (i = 0; i < removed_count; ++i) { pq_entry *entry = removed_entries + i; AV *av = newAV(); SV *rv; av_extend(av, 2); av_store(av, 0, newSVnv(entry->priority)); av_store(av, 1, newSViv(entry->id)); av_store(av, 2, entry->payload); rv = newRV_noinc((SV *)av); PUSHs(sv_2mortal(rv)); } } if (removed_entries) free(removed_entries); void pq_adjust_priority(pq, id, filter, delta) POE::XS::Queue::Array pq int id SV *filter double delta PREINIT: pq_priority_t new_priority; PPCODE: if (pq_adjust_priority(pq, id, filter, delta, &new_priority)) { EXTEND(SP, 1); PUSHs(sv_2mortal(newSVnv(new_priority))); } void pq_set_priority(pq, id, filter, new_priority) POE::XS::Queue::Array pq int id SV *filter double new_priority PPCODE: if (pq_set_priority(pq, id, filter, new_priority)) { EXTEND(SP, 1); PUSHs(sv_2mortal(newSVnv(new_priority))); } void pq_peek_items(pq, filter, ...) POE::XS::Queue::Array pq SV *filter PREINIT: pq_entry *ret_items; int count, i; int max_count; PPCODE: if (items == 3) max_count = SvIV(ST(2)); else max_count = pq_get_item_count(pq); count = pq_peek_items(pq, filter, max_count, &ret_items); if (count) { EXTEND(SP, count); for (i = 0; i < count; ++i) { pq_entry *entry = ret_items + i; AV *av = newAV(); SV *rv; av_extend(av, 2); av_store(av, 0, newSVnv(entry->priority)); av_store(av, 1, newSViv(entry->id)); av_store(av, 2, newSVsv(entry->payload)); rv = newRV_noinc((SV *)av); PUSHs(sv_2mortal(rv)); } free(ret_items); } void pq_dump(pq) POE::XS::Queue::Array pq