initial release v0.001
authorTony Cook <tony@develop=help.com>
Tue, 21 Mar 2006 00:12:52 +0000 (00:12 +0000)
committerTony Cook <tony@develop=help.com>
Tue, 21 Mar 2006 00:12:52 +0000 (00:12 +0000)
Array.pm [new file with mode: 0644]
Array.xs [new file with mode: 0644]
Changes [new file with mode: 0644]
MANIFEST [new file with mode: 0644]
META.yml [new file with mode: 0644]
Makefile.PL [new file with mode: 0644]
README [new file with mode: 0644]
bench1.perl [new file with mode: 0644]
bench2.perl [new file with mode: 0644]
t/01_array.t [new file with mode: 0644]
typemap [new file with mode: 0644]

diff --git a/Array.pm b/Array.pm
new file mode 100644 (file)
index 0000000..388dd48
--- /dev/null
+++ b/Array.pm
@@ -0,0 +1,101 @@
+package POE::XS::Queue::Array;
+use strict;
+use vars qw(@ISA $VERSION);
+use POE::Queue;
+
+@ISA = qw(POE::Queue);
+
+BEGIN {
+  require Exporter;
+  @ISA = qw(Exporter);
+  $VERSION = '0.001';
+  eval {
+    # try XSLoader first, DynaLoader has annoying baggage
+    require XSLoader;
+    XSLoader::load('POE::XS::Queue::Array' => $VERSION);
+    1;
+  } or do {
+    require DynaLoader;
+    push @ISA, 'DynaLoader';
+    bootstrap POE::XS::Queue::Array $VERSION;
+  }
+}
+
+# lifted from POE::Queue::Array
+sub ITEM_PRIORITY () { 0 }
+sub ITEM_ID       () { 1 }
+sub ITEM_PAYLOAD  () { 2 }
+
+sub import {
+  my $package = caller();
+  no strict 'refs';
+  *{ $package . '::ITEM_PRIORITY' } = \&ITEM_PRIORITY;
+  *{ $package . '::ITEM_ID'       } = \&ITEM_ID;
+  *{ $package . '::ITEM_PAYLOAD'  } = \&ITEM_PAYLOAD;
+}
+
+# everything else is XS
+1;
+
+__END__
+
+=head1 NAME
+
+POE::XS::Queue::Array - an XS implementation of POE::Queue::Array.
+
+=head1 SYNOPSIS
+
+See POE::Queue.
+
+=head1 DESCRIPTION
+
+This class is an implementation of the abstract POE::Queue interface.
+It implements a priority queue using C, with an XS interface supplied.
+
+The current implementation could use some optimization, especially for
+large queues.
+
+Please see the POE::Queue documentation, which explainsthis one's
+functions, features, and behavior.
+
+=head1 SEE ALSO
+
+POE, POE::Queue, POE::Queue::Array
+
+=head1 BUGS
+
+None known.
+
+Some possible improvements include:
+
+=over
+
+=item *
+
+use binary searches for large queues
+
+=item *
+
+use a B-Tree for the queue (not a binary tree, a B-Tree), though this
+would require a module rename.
+
+=item *
+
+use a custom hash instead of a HV for the id to priority mapping,
+either glib's hash or convert to C++ and use the STL map.
+
+=item *
+
+some of the XS code could be optimized to do less work in scalar
+context, pq_remove_items and pq_peek_items could avoid building all
+those array refs.
+
+=back
+
+=head1 AUTHOR
+
+Tony Cook <tonyc@cpan.org>
+
+=cut
+
+
diff --git a/Array.xs b/Array.xs
new file mode 100644 (file)
index 0000000..98a75f1
--- /dev/null
+++ b/Array.xs
@@ -0,0 +1,831 @@
+#include "EXTERN.h"
+#include "perl.h"
+#include "XSUB.h"
+#include <string.h> /* for memmove() mostly */
+#include <errno.h> /* errno values */
+
+/*#define DEBUG(x) x*/
+#define DEBUG(x)
+
+typedef unsigned pq_id_t;
+typedef double pq_priority_t;
+
+#define PQ_START_SIZE 10
+#define AT_START 0
+#define AT_END 1
+
+pq_id_t queue_seq;
+
+/* an entry in the queue */
+typedef struct {
+  pq_priority_t priority;
+  pq_id_t id;
+  SV *payload;
+} pq_entry;
+
+/*
+We store the queue in a similar way to the way perl deals with arrays,
+we keep a block of memory, but the first element may or may not be in use,
+depending on the pattern of usage.
+
+There's 3 value controlling usage of the array:
+
+  - alloc - the number of elements allocated in total
+  - start - the first element in use in the array
+  - end - one past the end of the last element in the array
+
+This has the properties that:
+
+  start == 0 - no space at the front
+  end == alloc - no space at the end
+  end - start - number of elements in the queue
+
+We use a perl hash (HV *) to store the mapping from ids to priorities.
+
+*/
+typedef struct {
+  /* the first entry in use */
+  int start;
+
+  /* 1 past the last entry in use, hence end - start is the number of 
+     entries in the queue */
+  int end;
+
+  /* the total number of entries allocated */
+  int alloc;
+
+  /* used to generate item ids */
+  pq_id_t queue_seq;
+
+  /* used to track in use item ids */
+  HV *ids;
+
+  /* the actual entries */
+  pq_entry *entries;
+} poe_queue;
+
+/*
+poe_create - create a new queue object.
+
+No parameters.  returns the new queue object.
+
+*/
+poe_queue *
+pq_create(void) {
+  poe_queue *pq = malloc(sizeof(poe_queue));
+  
+  if (pq == NULL)
+    croak("Out of memory");
+  pq->start = 0;
+  pq->end = 0;
+  pq->alloc = PQ_START_SIZE;
+  pq->queue_seq = 0;
+  pq->ids = newHV();
+  pq->entries = calloc(sizeof(pq_entry), PQ_START_SIZE);
+  if (pq->entries == NULL)
+    croak("Out of memory");
+
+  DEBUG( fprintf(stderr, "pq_create() => %p\n", pq) );
+
+  return pq;
+}
+
+/*
+pq_delete - release the queue object.
+
+This also releases one reference from each SV in the queue.
+
+*/
+void
+pq_delete(poe_queue *pq) {
+  int i;
+
+  DEBUG( fprintf(stderr, "pq_delete(%p)\n", pq) );
+  if (pq->end > pq->start) {
+    for (i = pq->start; i < pq->end; ++i) {
+      SvREFCNT_dec(pq->entries[i].payload);
+    }
+  }
+  SvREFCNT_dec((SV *)pq->ids);
+  pq->ids = NULL;
+  if (pq->entries)
+    free(pq->entries);
+  pq->entries = NULL;
+  free(pq);
+}
+
+/*
+pq_new_id - generate a new item id.
+
+Internal use only.
+
+This, the following 3 functions and pq_create, pq_delete, should be
+all that needs to be modified if we change hash implementations.
+
+*/
+static
+pq_id_t
+pq_new_id(poe_queue *pq, pq_priority_t priority) {
+  int seq = ++pq->queue_seq;
+  SV *index = newSViv(seq);
+
+  while (hv_exists_ent(pq->ids, index, 0)) {
+    seq = ++pq->queue_seq;
+    sv_setiv(index, seq);
+  }
+  hv_store_ent(pq->ids, index, newSVnv(priority), 0);
+
+  return seq;
+}
+
+/*
+pq_release_id - releases an id for future use.
+*/
+static
+void
+pq_release_id(poe_queue *pq, pq_id_t id) {
+  SV *id_sv = sv_2mortal(newSViv(id));
+
+  hv_delete_ent(pq->ids, id_sv, 0, 0);
+}
+
+/*
+pq_item_priority - get the priority of an item given it's id
+*/
+static
+int
+pq_item_priority(poe_queue *pq, pq_id_t id, pq_priority_t *priority) {
+  HE *entry = hv_fetch_ent(pq->ids, sv_2mortal(newSViv(id)), 0, 0);
+
+  if (!entry)
+    return 0;
+
+  *priority = SvNV(HeVAL(entry));
+
+  return 1;
+}
+
+/*
+pq_set_id_priority - set the priority of an item in the id hash
+*/
+static
+void
+pq_set_id_priority(poe_queue *pq, pq_id_t id, pq_priority_t new_priority) {
+  HE *entry = hv_fetch_ent(pq->ids, sv_2mortal(newSViv(id)), 0, 0);
+
+  if (!entry)
+    croak("pq_set_priority: id not found");
+
+  sv_setnv(HeVAL(entry), new_priority);
+}
+
+/*
+pq_realloc - make space at the front of back of the queue.
+
+This adjusts the queue to allow insertion of a single item at the
+front or the back of the queue.
+
+If the queue has 33% or more space available we simple adjust the
+position of the in-use items within the array.  We try not to push the
+items right up against the opposite end of the array, since we might
+need to insert items there too.
+
+If the queue has less than 33% space available we allocate another 50%
+space.  We then only move the queue elements if we need space at the
+front, since the reallocation has just opened up a huge space at the
+back.  Since we're reallocating exponentially larger sizes we should
+have a constant time cost on reallocation per queue item stored (but
+other costs are going to be higher.)
+
+*/
+static
+void
+pq_realloc(poe_queue *pq, int at_end) {
+  int count = pq->end - pq->start;
+
+  DEBUG( fprintf(stderr, "pq_realloc((%d, %d, %d), %d)\n", pq->start, pq->end, pq->alloc, at_end) );
+  if (count * 3 / 2 < pq->alloc) {
+    /* 33 % or more space available, use some of it */
+    int new_start;
+
+    if (at_end) {
+      new_start = (pq->alloc - count) / 3;
+    }
+    else {
+      new_start = (pq->alloc - count) * 2 / 3;
+    }
+    DEBUG( fprintf(stderr, "  moving start to %d\n", new_start) );
+    memmove(pq->entries + new_start, pq->entries + pq->start,
+           count * sizeof(pq_entry));
+    pq->start = new_start;
+    pq->end = new_start + count;
+  }
+  else {
+    int new_alloc = pq->alloc * 3 / 2;
+    pq->entries = realloc(pq->entries, sizeof(pq_entry) * new_alloc);
+    pq->alloc = new_alloc;
+
+    if (!pq->entries)
+      croak("Out of memory");
+
+    DEBUG( fprintf(stderr, "  - expanding to %d entries\n", new_alloc) );
+
+    if (!at_end) {
+      int new_start = (new_alloc - count) * 2 / 3;
+      DEBUG( fprintf(stderr, "  moving start to %d\n", new_start) );
+      memmove(pq->entries + new_start, pq->entries + pq->start,
+             count * sizeof(pq_entry));
+      pq->start = new_start;
+      pq->end = new_start + count;
+    }
+  }
+  DEBUG( fprintf(stderr, "  final: %d %d %d\n", pq->start, pq->end, pq->alloc) );
+}
+
+/*
+pq_insertion_point - figure out where to insert an item with the given
+priority
+
+Internal.
+*/
+static
+int
+pq_insertion_point(poe_queue *pq, pq_priority_t priority) {
+  /* for now this is just a linear search, later we should make it 
+     binary */
+  int i = pq->end;
+  while (i > pq->start &&
+         priority < pq->entries[i-1].priority) {
+    --i;
+  }
+
+  return i;
+}
+
+int
+pq_enqueue(poe_queue *pq, pq_priority_t priority, SV *payload) {
+  int fill_at;
+  pq_id_t id = pq_new_id(pq, priority);
+
+  DEBUG( fprintf(stderr, "pq_enqueue(%f, %p)\n", priority, payload) );
+  if (pq->start == pq->end) {
+    DEBUG( fprintf(stderr, "  - on empty queue\n") );
+    /* allow room at front and back for new entries */
+    pq->start = pq->alloc / 3;
+    pq->end = pq->start + 1;
+    fill_at = pq->start;
+  }
+  else if (priority >= pq->entries[pq->end-1].priority) {
+    DEBUG( fprintf(stderr, "  - at the end\n") );
+    if (pq->end == pq->alloc)
+      /* past the end - need to realloc or make some space */
+      pq_realloc(pq, AT_END);
+    
+    fill_at = pq->end;
+    ++pq->end;
+  }
+  else if (priority < pq->entries[pq->start].priority) {
+    DEBUG( fprintf(stderr, "  - at the front\n") );
+    if (pq->start == 0)
+      /* no space at the front, make some */
+      pq_realloc(pq, AT_START);
+
+    --pq->start;
+    fill_at = pq->start;
+  }
+  else {
+    int i;
+    DEBUG( fprintf(stderr, "  - in the middle\n") );
+    i = pq_insertion_point(pq, priority);
+    
+    /* if we're near the end we want to push entries up, otherwise down */
+    if (i - pq->start > (pq->end - pq->start) / 2) {
+      DEBUG( fprintf(stderr, "    - closer to the back (%d -> [ %d %d ])\n",
+                     i, pq->start, pq->end) );
+      /* make sure we have space, this might end up copying twice, 
+        but too bad for now */
+      if (pq->end == pq->alloc) {
+        int old_start = pq->start;
+       pq_realloc(pq, AT_END);
+        i += pq->start - old_start;
+      }
+      
+      memmove(pq->entries + i + 1, pq->entries + i, (pq->end - i) * sizeof(pq_entry));
+      ++pq->end;
+      fill_at = i;
+    }
+    else {
+      DEBUG( fprintf(stderr, "    - closer to the front (%d -> [ %d %d ])\n",
+                     i, pq->start, pq->end) );
+      if (pq->start == 0) {
+       pq_realloc(pq, AT_START);
+       i += pq->start;
+      }
+      memmove(pq->entries + pq->start - 1, pq->entries + pq->start,
+            (i - pq->start) * sizeof(pq_entry));
+      --pq->start;
+      fill_at = i-1;
+    }
+  }
+  pq->entries[fill_at].priority = priority;
+  pq->entries[fill_at].id = id;
+  pq->entries[fill_at].payload = newSVsv(payload);
+
+  return id;
+}
+
+/*
+  Note: it's up to the caller to release the SV.  The XS code does this 
+  by making it mortal.
+*/
+int
+pq_dequeue_next(poe_queue *pq, pq_priority_t *priority, pq_id_t *id, SV **payload) {
+  pq_entry *entry;
+  /* the caller needs to release the payload (somehow) */
+  if (pq->start == pq->end)
+    return 0;
+
+  entry = pq->entries + pq->start++;
+  *priority = entry->priority;
+  *id = entry->id;
+  *payload = entry->payload;
+  pq_release_id(pq, entry->id);
+
+  return 1;
+}
+
+int
+pq_get_next_priority(poe_queue *pq, pq_priority_t *priority) {
+  if (pq->start == pq->end)
+    return 0;
+
+  *priority = pq->entries[pq->start].priority;
+  return 1;
+}
+
+int
+pq_get_item_count(poe_queue *pq) {
+  return pq->end - pq->start;
+}
+
+/*
+pq_test_filter - the XS magic involved in passing the payload to a
+filter function.
+*/
+static
+int
+pq_test_filter(pq_entry *entry, SV *filter) {
+  /* man perlcall for the magic here */
+  dSP;
+  int count;
+  SV *result_sv;
+  int result;
+
+  ENTER;
+  SAVETMPS;
+  PUSHMARK(SP);
+  XPUSHs(sv_2mortal(newSVsv(entry->payload)));
+  PUTBACK;
+
+  count = call_sv(filter, G_SCALAR);
+
+  SPAGAIN;
+
+  if (count != 1) 
+    croak("got other than 1 value in scalar context");
+
+  result_sv = POPs;
+  result = SvTRUE(result_sv);
+
+  PUTBACK;
+  FREETMPS;
+  LEAVE;
+
+  return result;
+}
+
+/*
+pq_find_item - search for an item we know is there.
+
+Internal.
+*/
+static
+int
+pq_find_item(poe_queue *pq, pq_id_t id, pq_priority_t priority) {
+  int i;
+
+  for (i = pq->start; i < pq->end; ++i) {
+    if (pq->entries[i].id == id)
+      return i;
+  }
+  DEBUG(fprintf(stderr, "pq_find_item %d => %f\n", id, priority) );
+  croak("Internal inconsistency: event should have been found");
+}
+
+int
+pq_remove_item(poe_queue *pq, pq_id_t id, SV *filter, pq_entry *removed) {
+  pq_priority_t priority;
+  int index;
+
+  if (!pq_item_priority(pq, id, &priority)) {
+    errno = ESRCH;
+    return 0;
+  }
+
+  index = pq_find_item(pq, id, priority);
+
+  if (!pq_test_filter(pq->entries + index, filter)) {
+    errno = EPERM;
+    return 0;
+  }
+
+  *removed = pq->entries[index];
+  pq_release_id(pq, id);
+  if (index == pq->start) {
+    ++pq->start;
+  }
+  else if (index == pq->end - 1) {
+    --pq->end;
+  }
+  else {
+    memmove(pq->entries + index, pq->entries + index + 1,
+           sizeof(pq_entry) * (pq->end - index - 1));
+    --pq->end;
+  }
+
+  return 1;
+}
+
+int
+pq_remove_items(poe_queue *pq, SV *filter, int max_count, pq_entry **entries) {
+  int in_index, out_index;
+  int remove_count = 0;
+  
+  *entries = NULL;
+  if (pq->start == pq->end)
+    return 0;
+
+  *entries = malloc(sizeof(pq_entry) * (pq->end - pq->start));
+  if (!*entries)
+    croak("Out of memory");
+  
+  in_index = out_index = pq->start;
+  while (in_index < pq->end && remove_count < max_count) {
+    if (pq_test_filter(pq->entries + in_index, filter)) {
+      pq_release_id(pq, pq->entries[in_index].id);
+      (*entries)[remove_count++] = pq->entries[in_index++];
+    }
+    else {
+      pq->entries[out_index++] = pq->entries[in_index++];
+    }
+  }
+  while (in_index < pq->end) {
+    pq->entries[out_index++] = pq->entries[in_index++];
+  }
+  pq->end = out_index;
+  
+  return remove_count;
+}
+
+/*
+We need to keep the following 2 functions in sync (or combine the
+common code.)
+*/
+int
+pq_set_priority(poe_queue *pq, pq_id_t id, SV *filter, pq_priority_t new_priority) {
+  pq_priority_t old_priority;
+  int index, insert_at;
+
+  if (!pq_item_priority(pq, id, &old_priority)) {
+    errno = ESRCH;
+    return 0;
+  }
+
+  index = pq_find_item(pq, id, old_priority);
+
+  if (!pq_test_filter(pq->entries + index, filter)) {
+    errno = EPERM;
+    return 0;
+  }
+
+  DEBUG( fprintf(stderr, " - index %d  oldp %f newp %f\n", index, old_priority, new_priority) );
+
+  if (pq->end - pq->start == 1) {
+    DEBUG( fprintf(stderr, "   -- one item\n") );
+    /* only the one item anyway */
+    pq->entries[pq->start].priority = new_priority;
+  }
+  else {
+    insert_at = pq_insertion_point(pq, new_priority);
+    DEBUG( fprintf(stderr, "   - new index %d\n", insert_at) );
+    /* the item is still in the queue, so either side of it means it
+       won't move */
+    if (insert_at == index || insert_at == index+1) {
+      DEBUG( fprintf(stderr, "   -- change in place\n") );
+      pq->entries[index].priority = new_priority;
+    }
+    else {
+      pq_entry saved = pq->entries[index];
+      saved.priority = new_priority;
+
+      if (insert_at < index) {
+        DEBUG( fprintf(stderr, "  - insert_at < index\n") );
+        memmove(pq->entries + insert_at + 1, pq->entries + insert_at,
+               sizeof(pq_entry) * (index - insert_at));
+        pq->entries[insert_at] = saved;
+      }
+      else {
+        DEBUG( fprintf(stderr, "  - insert_at > index\n") );
+       --insert_at;
+       memmove(pq->entries + index, pq->entries + index + 1,
+               sizeof(pq_entry) * (insert_at - index));
+        pq->entries[insert_at] = saved;
+      }
+    }
+  }
+
+  pq_set_id_priority(pq, id, new_priority);
+
+  return 1;  
+}
+
+int
+pq_adjust_priority(poe_queue *pq, pq_id_t id, SV *filter, double delta, pq_priority_t *priority) {
+  pq_priority_t old_priority, new_priority;
+  int index, insert_at;
+
+  DEBUG( fprintf(stderr, "pq_adjust_priority(..., %d, %p, %f, ...)\n", id, filter, delta) );
+
+  if (!pq_item_priority(pq, id, &old_priority)) {
+    errno = ESRCH;
+    return 0;
+  }
+
+  index = pq_find_item(pq, id, old_priority);
+
+  if (!pq_test_filter(pq->entries + index, filter)) {
+    errno = EPERM;
+    return 0;
+  }
+
+  new_priority = old_priority + delta;
+
+  DEBUG( fprintf(stderr, " - index %d  oldp %f newp %f\n", index, old_priority, new_priority) );
+
+  if (pq->end - pq->start == 1) {
+    DEBUG( fprintf(stderr, "   -- one item\n") );
+    /* only the one item anyway */
+    pq->entries[pq->start].priority = new_priority;
+  }
+  else {
+    insert_at = pq_insertion_point(pq, new_priority);
+    DEBUG( fprintf(stderr, "   - new index %d\n", insert_at) );
+    /* the item is still in the queue, so either side of it means it
+       won't move */
+    if (insert_at == index || insert_at == index+1) {
+      DEBUG( fprintf(stderr, "   -- change in place\n") );
+      pq->entries[index].priority = new_priority;
+    }
+    else {
+      pq_entry saved = pq->entries[index];
+      saved.priority = new_priority;
+
+      if (insert_at < index) {
+        DEBUG( fprintf(stderr, "  - insert_at < index\n") );
+        memmove(pq->entries + insert_at + 1, pq->entries + insert_at,
+               sizeof(pq_entry) * (index - insert_at));
+        pq->entries[insert_at] = saved;
+      }
+      else {
+        DEBUG( fprintf(stderr, "  - insert_at > index\n") );
+       --insert_at;
+       memmove(pq->entries + index, pq->entries + index + 1,
+               sizeof(pq_entry) * (insert_at - index));
+        pq->entries[insert_at] = saved;
+      }
+    }
+  }
+
+  pq_set_id_priority(pq, id, new_priority);
+  *priority = new_priority;
+
+  return 1;  
+}
+
+int
+pq_peek_items(poe_queue *pq, SV *filter, int max_count, pq_entry **items) {
+  int count = 0;
+  int i;
+
+  *items = NULL;
+  if (pq->end == pq->start)
+    return 0;
+
+  *items = malloc(sizeof(pq_entry) * (pq->end - pq->start));
+  for (i = pq->start; i < pq->end; ++i) {
+    if (pq_test_filter(pq->entries + i, filter)) {
+      (*items)[count++] = pq->entries[i];
+    }
+  }
+  if (!count) {
+    free(*items);
+    *items = NULL;
+  }
+
+  return count;
+}
+
+/*
+pq_dump - dump the internals of the queue structure.
+*/
+void
+pq_dump(poe_queue *pq) {
+  int i;
+  HE *he;
+
+  printf("poe_queue\n");
+  printf("  start: %d\n", pq->start);
+  printf("    end: %d\n", pq->end);
+  printf("  alloc: %d\n", pq->alloc);
+  printf("    seq: %d\n", pq->queue_seq);
+  printf("  **Queue Entries:\n"
+         "      index:   id  priority    SV\n");
+  for (i = pq->start; i < pq->end; ++i) {
+    pq_entry *entry = pq->entries + i;
+    printf("      %5d: %5d %8f  %p (%u)\n", i, entry->id, entry->priority,
+          entry->payload, (unsigned)SvREFCNT(entry->payload));
+  }
+  printf("  **Hash entries:\n");
+  hv_iterinit(pq->ids);
+  while ((he = hv_iternext(pq->ids)) != NULL) {
+    STRLEN len;
+    printf("   %s => %f\n", HePV(he, len), SvNV(hv_iterval(pq->ids, he)));
+  }
+}
+
+/* this typedef lets the standard T_PTROBJ typemap handle the
+conversion between perl class and C type and back again */
+typedef poe_queue *POE__XS__Queue__Array;
+
+/* This gives us our new method and correct destruction */
+#define pq_new(class) pq_create()
+#define pq_DESTROY(pq) pq_delete(pq)
+
+MODULE = POE::XS::Queue::Array  PACKAGE = POE::XS::Queue::Array PREFIX = pq_
+
+PROTOTYPES: DISABLE
+
+POE::XS::Queue::Array
+pq_new(class)
+
+void
+pq_DESTROY(pq)
+       POE::XS::Queue::Array pq
+
+int
+pq_enqueue(pq, priority, payload)
+     POE::XS::Queue::Array pq
+     double priority
+     SV *payload
+     
+void
+pq_dequeue_next(pq)
+       POE::XS::Queue::Array pq
+      PREINIT:
+       pq_priority_t priority;
+       pq_id_t id;
+       SV *payload;
+      PPCODE:
+       if (pq_dequeue_next(pq, &priority, &id, &payload)) {
+         EXTEND(SP, 3);
+         PUSHs(sv_2mortal(newSVnv(priority)));
+         PUSHs(sv_2mortal(newSViv(id)));
+         PUSHs(sv_2mortal(payload));
+       }
+
+SV *
+pq_get_next_priority(pq)
+       POE::XS::Queue::Array pq
+      PREINIT:
+       pq_priority_t priority;
+      CODE:
+       if (pq_get_next_priority(pq, &priority)) {
+          RETVAL = newSVnv(priority); /* XS will mortalize this for us */
+       }
+       else {
+         RETVAL = &PL_sv_undef;
+       }
+      OUTPUT:
+       RETVAL
+
+int
+pq_get_item_count(pq)
+       POE::XS::Queue::Array pq
+
+void
+pq_remove_item(pq, id, filter)
+       POE::XS::Queue::Array pq
+       int id
+       SV *filter
+      PREINIT:
+       pq_entry removed;
+      PPCODE:
+       if (pq_remove_item(pq, id, filter, &removed)) {
+         EXTEND(SP, 3);
+         PUSHs(sv_2mortal(newSVnv(removed.priority)));
+         PUSHs(sv_2mortal(newSViv(removed.id)));
+         PUSHs(sv_2mortal(removed.payload));
+        }
+
+void
+pq_remove_items(pq, filter, ...)
+       POE::XS::Queue::Array pq
+       SV *filter
+      PREINIT:
+       int max_count;
+       pq_entry *removed_entries = NULL;
+       int removed_count;
+       int i;
+      PPCODE:
+       if (items > 2)
+          max_count = SvIV(ST(2));
+        else
+          max_count = pq_get_item_count(pq);
+       removed_count = pq_remove_items(pq, filter, max_count, 
+                                        &removed_entries);
+        if (removed_count) {
+         EXTEND(SP, removed_count);
+          for (i = 0; i < removed_count; ++i) {
+           pq_entry *entry = removed_entries + i;
+           AV *av = newAV();
+           SV *rv;
+           av_extend(av, 2);
+           av_store(av, 0, newSVnv(entry->priority));
+           av_store(av, 1, newSViv(entry->id));
+           av_store(av, 2, entry->payload);
+           rv = newRV_noinc((SV *)av);
+           PUSHs(sv_2mortal(rv));
+          }
+       }
+       if (removed_entries)
+          free(removed_entries);
+
+void
+pq_adjust_priority(pq, id, filter, delta)
+       POE::XS::Queue::Array pq
+       int id
+       SV *filter
+       double delta
+      PREINIT:
+        pq_priority_t new_priority;
+      PPCODE:
+        if (pq_adjust_priority(pq, id, filter, delta, &new_priority)) {
+         EXTEND(SP, 1);
+         PUSHs(sv_2mortal(newSVnv(new_priority)));
+       }
+
+void
+pq_set_priority(pq, id, filter, new_priority)
+       POE::XS::Queue::Array pq
+       int id
+       SV *filter
+       double new_priority
+      PPCODE:
+        if (pq_set_priority(pq, id, filter, new_priority)) {
+         EXTEND(SP, 1);
+         PUSHs(sv_2mortal(newSVnv(new_priority)));
+       }
+
+void
+pq_peek_items(pq, filter, ...)
+       POE::XS::Queue::Array pq
+       SV *filter
+      PREINIT:
+        pq_entry *ret_items;
+        int count, i;
+       int max_count;
+      PPCODE:
+        if (items == 3)
+          max_count = SvIV(ST(2));
+        else
+          max_count = pq_get_item_count(pq);
+        count = pq_peek_items(pq, filter, max_count, &ret_items);
+        if (count) {
+          EXTEND(SP, count);
+          for (i = 0; i < count; ++i) {
+           pq_entry *entry = ret_items + i;
+           AV *av = newAV();
+           SV *rv;
+           av_extend(av, 2);
+           av_store(av, 0, newSVnv(entry->priority));
+           av_store(av, 1, newSViv(entry->id));
+           av_store(av, 2, newSVsv(entry->payload));
+           rv = newRV_noinc((SV *)av);
+           PUSHs(sv_2mortal(rv));
+         }
+          free(ret_items);
+       }
+
+void
+pq_dump(pq)
+       POE::XS::Queue::Array pq
diff --git a/Changes b/Changes
new file mode 100644 (file)
index 0000000..ace56c5
--- /dev/null
+++ b/Changes
@@ -0,0 +1,4 @@
+Revision history for Perl extension POE::XS::Queue::Array.
+
+0.001
+ - initial release
diff --git a/MANIFEST b/MANIFEST
new file mode 100644 (file)
index 0000000..1983c7c
--- /dev/null
+++ b/MANIFEST
@@ -0,0 +1,11 @@
+Array.pm
+Array.xs
+Changes
+MANIFEST
+META.yml                                 Module meta-data (added by MakeMaker)
+Makefile.PL
+README
+bench1.perl    based on code in POE's svn tree
+bench2.perl    my own benchmarks
+t/01_array.t
+typemap
diff --git a/META.yml b/META.yml
new file mode 100644 (file)
index 0000000..a879a44
--- /dev/null
+++ b/META.yml
@@ -0,0 +1,11 @@
+# http://module-build.sourceforge.net/META-spec.html
+#XXXXXXX This is a prototype!!!  It will change in the future!!! XXXXX#
+name:         POE-XS-Queue-Array
+version:      0.001
+version_from: Array.pm
+installdirs:  site
+requires:
+    POE:                           0.29
+
+distribution_type: module
+generated_by: ExtUtils::MakeMaker version 6.17
diff --git a/Makefile.PL b/Makefile.PL
new file mode 100644 (file)
index 0000000..9f7b605
--- /dev/null
@@ -0,0 +1,19 @@
+use ExtUtils::MakeMaker;
+
+my %opts = 
+  (
+   NAME => 'POE::XS::Queue::Array',
+   VERSION_FROM => 'Array.pm',
+   OBJECT => 'Array.o',
+   PREREQ_PM => {
+                'POE'    => 0.29,
+               },
+  );
+if ($ExtUtils::MakeMaker::VERSION > 6.06) {
+  $opts{AUTHOR} = 'Tony Cook <tonyc@cpan.org>';
+  $opts{ABSTRACT} = 'XS version of POE::Queue::Array';
+}
+
+WriteMakefile(%opts);
+
+
diff --git a/README b/README
new file mode 100644 (file)
index 0000000..5a48fd3
--- /dev/null
+++ b/README
@@ -0,0 +1,19 @@
+POE::XS::Queue::Array is an implementation of the POE::Queue XS interface.
+
+Unlike POE, you need a C compiler to install this module.
+
+To install do:
+
+  perl Makefile.PL
+  make
+  make test
+  make install  # you may need to be root for this step
+
+This module should be considered experimental for now, there are
+optimizations that could be performed that aren't, see perldoc
+POE::XS::Queue::Array for details.
+
+This program is free software; you can redistribute it and/or
+modify it under the same terms as Perl itself.
+
+Copyright (c) 2006 Anthony Cook.
diff --git a/bench1.perl b/bench1.perl
new file mode 100644 (file)
index 0000000..9d82380
--- /dev/null
@@ -0,0 +1,113 @@
+#!/usr/bin/perl
+use warnings;
+use strict;
+use blib;
+
+$|=1;
+
+use POE::Queue::Array;
+use POE::XS::Queue::Array;
+
+# The sequence length should be at least as many items as there are
+# priorities.
+
+sub MAX_PRIORITIES  () { 200 }
+sub SEQUENCE_LENGTH () { 5000 }
+
+die if SEQUENCE_LENGTH < MAX_PRIORITIES;
+
+# Fisher-Yates shuffle them, for extra yummy randomness.  Use srand
+# with the same seed each time so every @seq list represents different
+# lengths of the same "random" sequence.
+
+my @seq;
+sub build_list {
+  my $priorities = shift;
+  my $factor = SEQUENCE_LENGTH / $priorities;
+
+  @seq = map { [ int($_ / $factor), $_ ] } (0..(SEQUENCE_LENGTH-1));
+
+  { srand(1);
+    my $i = @seq;
+    while (--$i) {
+      my $j = int rand($i+1);
+      @seq[$i,$j] = @seq[$j,$i];
+    }
+  }
+}
+
+# Run through the list for a number of benchmarks.  Each benchmark has
+# a different number of priorities.
+
+for my $priorities (1..MAX_PRIORITIES) {
+
+  build_list($priorities);
+
+  # One for each queue implementation.
+  for my $impl (qw(POE::Queue::Array POE::XS::Queue::Array)) {
+
+    my $queue = $impl->new();
+
+    ### Plain enqueue/dequeue.
+
+    my ($begin_usr, $begin_sys) = (times)[0,1];
+    $queue->enqueue(@$_) for @seq;
+    my ($cease_usr, $cease_sys) = (times)[0,1];
+
+    my $elapsed = ($cease_usr - $begin_usr) + ($cease_sys - $begin_sys);
+
+    print( join( "\t",
+                 $priorities,
+                 $impl, "enqueue-plain",
+                 $elapsed/SEQUENCE_LENGTH, # Time per operation.
+               ),
+           "\n"
+         );
+
+    ($begin_usr, $begin_sys) = (times)[0,1];
+    1 while $queue->dequeue_next;
+    ($cease_usr, $cease_sys) = (times)[0,1];
+
+    $elapsed = ($cease_usr - $begin_usr) + ($cease_sys - $begin_sys);
+
+    print( join( "\t",
+                 $priorities,
+                 $impl, "dequeue-plain",
+                 $elapsed/SEQUENCE_LENGTH, # Time per operation.
+               ),
+           "\n"
+         );
+
+    ### Next-priority enqueue/dequeue.  The enqueue is actually just a
+    ### plain one, but we get to see the effect of internal data
+    ### structure freeing tradeoffs.
+
+#     ($begin_usr, $begin_sys) = (times)[0,1];
+#     $queue->enqueue(@$_) for @seq;
+#     ($cease_usr, $cease_sys) = (times)[0,1];
+
+#     $elapsed = ($cease_usr - $begin_usr) + ($cease_sys - $begin_sys);
+
+#     print( join( "\t",
+#                  $priorities,
+#                  $impl, "enqueue-np",
+#                  $elapsed/SEQUENCE_LENGTH, # Time per operation.
+#                ),
+#            "\n"
+#          );
+
+#     ($begin_usr, $begin_sys) = (times)[0,1];
+#     1 while scalar(@{$queue->dequeue_next_priority});
+#     ($cease_usr, $cease_sys) = (times)[0,1];
+
+#     $elapsed = ($cease_usr - $begin_usr) + ($cease_sys - $begin_sys);
+
+#     print( join( "\t",
+#                  $priorities,
+#                  $impl, "dequeue-np",
+#                  $elapsed/SEQUENCE_LENGTH, # Time per operation.
+#                ),
+#            "\n"
+#          );
+  }
+}
diff --git a/bench2.perl b/bench2.perl
new file mode 100644 (file)
index 0000000..57fccef
--- /dev/null
@@ -0,0 +1,80 @@
+#!perl -w
+use strict;
+use blib;
+use POE::XS::Queue::Array ();
+use POE::Queue::Array ();
+use Benchmark;
+
+# 1000 items to queue
+my @items = map [ $_, $_ ], map rand(1000), 1..1000;
+
+# test queues for timing adjust_priority
+my %adjust;
+my %adjust_ids;
+my @adjust_val;
+for my $impl (qw(POE::XS::Queue::Array POE::Queue::Array)) {
+  my $queue = $impl->new;
+
+  my @ids = map $queue->enqueue(@$_), @items;
+
+  $adjust{$impl} = $queue;
+  $adjust_ids{$impl} = \@ids;
+}
+for my $index (0..999) {
+  $adjust_val[$index] = rand(100) - 50;
+}
+
+timethese(-10,
+         {
+          xs_big => sub { big('POE::XS::Queue::Array') },
+          perl_big => sub { big('POE::Queue::Array') },
+          xs_enqueue => sub { enqueue('POE::XS::Queue::Array') },
+          perl_enqueue => sub { enqueue('POE::Queue::Array') },
+          xs_adjust => sub { adjust('POE::XS::Queue::Array') },
+          perl_adjust => sub { adjust('POE::Queue::Array') },
+         });
+
+# does general queue work
+sub big {
+  my $class = shift;
+
+  my $queue = $class->new;
+
+  my @ids = map $queue->enqueue(@$_), @items;
+
+  for my $id (@ids[1..100]) {
+    $queue->adjust_priority($id, sub { 1 }, -5);
+  }
+  my %remove = map { $_ => 1 } @ids[-100..-1];
+  $queue->remove_items(sub { $remove{$_[0]} });
+
+  for my $id (@ids[-200..-101]) {
+    $queue->remove_item($id, sub { 1 });
+  }
+
+  $queue->remove_items(sub { 0 });
+
+  $queue->dequeue_next while $queue->get_item_count;
+}
+
+# enqueue a bunch
+sub enqueue {
+  my $class = shift;
+
+  my $queue = $class->new;
+
+  my @ids = map $queue->enqueue(@$_), @items;
+}
+
+# adjust the priorities on a bunch of items
+sub adjust {
+  my $class = shift;
+
+  my $queue = $adjust{$class};
+
+  my $index = 0;
+  for my $id (@{$adjust_ids{$class}}) {
+    $queue->adjust_priority($id, sub { 1 }, $adjust_val[$index]);
+    ++$index;
+  }
+}
diff --git a/t/01_array.t b/t/01_array.t
new file mode 100644 (file)
index 0000000..9a5c60b
--- /dev/null
@@ -0,0 +1,308 @@
+#!/usr/bin/perl -w
+# $Id: 01_array.t,v 1.1 2004/09/04 22:50:38 rcaputo Exp $
+
+# Tests basic queue operations.
+# copied from the POE distribution and modified to use
+# POE::XS::Queue::Array instead of POE::Queue::Array.
+# 
+# modified a couple of tests to use is_deeply() instead of ok(eq_array ...)
+# during debugging.
+
+use strict;
+
+use lib qw(./mylib);
+
+use Test::More tests => 2047;
+
+sub POE::Kernel::ASSERT_DEFAULT () { 1 }
+sub POE::Kernel::TRACE_DEFAULT  () { 1 }
+sub POE::Kernel::TRACE_FILENAME () { "./test-output.err" }
+
+use POSIX qw(EPERM ESRCH);
+
+BEGIN { use_ok("POE::XS::Queue::Array") }
+
+my $q = POE::XS::Queue::Array->new();
+
+ok($q->get_item_count == 0, "queue begins empty");
+ok(!defined($q->dequeue_next), "can't dequeue from empty queue");
+
+ok($q->enqueue(1, "one") == 1, "first enqueue has id 1");
+ok($q->enqueue(3, "tre") == 2, "second enqueue has id 2");
+ok($q->enqueue(2, "two") == 3, "third enqueue has id 3");
+
+is_deeply(
+  [$q->dequeue_next()], [1, 1, "one"],
+  "event one dequeued correctly"
+);
+
+is_deeply(
+  [$q->dequeue_next()], [2, 3, "two"],
+  "event two dequeued correctly"
+);
+
+ok(
+  eq_array( [$q->dequeue_next()], [3, 2, "tre"] ),
+  "event three dequeued correctly"
+);
+
+ok(
+  eq_array( [$q->dequeue_next()], [] ),
+  "empty queue marker dequeued correctly"
+);
+
+{ my @events = (
+    [ a => 1 ],
+    [ c => 3 ],
+    [ e => 5 ],
+    [ b => 2 ],
+    [ d => 4 ],
+  );
+
+  my $base_event_id = 4;
+  enqueue_events(\@events, $base_event_id);
+}
+
+# Not constants.
+sub always_ok { 1 }
+sub never_ok  { 0 }
+
+ok(
+  eq_array( [$q->remove_item(7, \&always_ok)], [2, 7, "b"] ),
+  "removed event b by its ID"
+);
+
+ok(
+  eq_array( [$q->remove_item(5, \&always_ok)], [3, 5, "c"] ),
+  "removed event c by its ID"
+);
+
+ok(
+  eq_array( [$q->remove_item(8, \&always_ok)], [4, 8, "d"] ),
+  "removed event d by its ID"
+);
+
+$! = 0;
+ok(
+  ( eq_array( [$q->remove_item(6, \&never_ok )], [] ) &&
+    $! == EPERM
+  ),
+  "didn't have permission to remove event e"
+);
+
+$! = 0;
+ok(
+  ( eq_array( [$q->remove_item(8, \&always_ok)], [] ) &&
+    $! == ESRCH
+  ),
+  "couldn't remove nonexistent event d"
+);
+
+ok(
+  eq_array( [$q->dequeue_next()], [1, 4, "a"] ),
+  "dequeued event a correctly"
+);
+
+ok(
+  eq_array( [$q->dequeue_next()], [5, 6, "e"] ),
+  "dequeued event e correctly"
+);
+
+ok(
+  eq_array( [$q->dequeue_next()], [] ),
+  "empty queue marker dequeued correctly"
+);
+
+{ my @events = (
+    [ a => 1 ],
+    [ c => 3 ],
+    [ e => 5 ],
+    [ b => 2 ],
+    [ d => 4 ],
+    [ f => 6 ],
+  );
+
+  my $base_event_id = 9;
+  enqueue_events(\@events, $base_event_id);
+}
+
+ok($q->get_item_count() == 6, "queue contains six events");
+
+sub odd_letters  { $_[0] =~ /[ace]/ }
+sub even_letters { $_[0] =~ /[bdf]/ }
+
+{ my @items = $q->remove_items(\&odd_letters, 3);
+  my @target = (
+    [ 1,  9, "a" ],
+    [ 3, 10, "c" ],
+    [ 5, 11, "e" ],
+  );
+
+  ok(eq_array(\@items, \@target), "removed odd letters from queue");
+  ok($q->get_item_count() == 3, "leaving three events");
+}
+
+{ my @items = $q->remove_items(\&odd_letters, 3);
+  my @target;
+
+  ok(eq_array(\@items, \@target), "no more odd letters to remove");
+}
+
+{ my @items = $q->remove_items(\&even_letters, 3);
+  my @target = (
+    [ 2, 12, "b" ],
+    [ 4, 13, "d" ],
+    [ 6, 14, "f" ],
+  );
+
+  is_deeply(\@items, \@target, "removed even letters from queue");
+  ok($q->get_item_count() == 0, "leaving the queue empty");
+}
+
+{ my @events = (
+    [ a => 10 ],
+    [ b => 20 ],
+    [ c => 30 ],
+    [ d => 40 ],
+    [ e => 50 ],
+    [ f => 60 ],
+  );
+
+  my $base_event_id = 15;
+  enqueue_events(\@events, $base_event_id);
+}
+
+ok($q->get_item_count() == 6, "leaving six events in the queue");
+
+{ my @items = $q->peek_items(\&even_letters);
+  my @target = (
+    [ 20, 16, "b" ],
+    [ 40, 18, "d" ],
+    [ 60, 20, "f" ],
+  );
+
+  ok(eq_array(\@items, \@target), "found even letters in queue");
+}
+
+ok(
+  $q->adjust_priority(19, \&always_ok, -15) == 35,
+  "adjusted event e priority by -15"
+);
+
+ok(
+  $q->adjust_priority(16, \&always_ok, +15) == 35,
+  "adjusted event b priority by +15"
+);
+
+{ my @items = $q->remove_items(\&always_ok);
+  my @target = (
+    [ 10, 15, "a" ],
+    [ 30, 17, "c" ],
+    [ 35, 19, "e" ], # e got there first
+    [ 35, 16, "b" ], # b got there second
+    [ 40, 18, "d" ],
+    [ 60, 20, "f" ],
+  );
+
+  ok(eq_array(\@items, \@target), "colliding priorities are FIFO");
+}
+
+ok($q->get_item_count() == 0, "full queue removal leaves zero events");
+
+### Large Queue Tests.  The only functions that use large queues are
+### enqueue(), adjust_priority(), and set_priority().  Large queues
+### are over ~500 elements.
+
+# Generate a list of events in random priority order.
+
+sub shuffled_list {
+  my $limit = shift() - 1;
+  my @list = (0..$limit);
+  my $i = @list;
+  while (--$i) {
+    my $j = int rand($i+1);
+    @list[$i,$j] = @list[$j,$i];
+  }
+  @list;
+}
+
+sub is_even { !($_[0] % 2) }
+sub is_odd  {   $_[0] % 2  }
+
+sub verify_queue {
+  my $target_diff = shift;
+
+  my $low_priority = -999999;
+
+  while (my ($pri, $id, $item) = $q->dequeue_next()) {
+    my $diff;
+    if ($pri < 0) {
+      $diff = $item - $pri;
+    }
+    else {
+      $diff = $pri - $item;
+    }
+
+    ok(
+      ($pri > $low_priority) && ($diff == $target_diff),
+      "$item - $pri == $diff (should be $target_diff)"
+    );
+
+    $low_priority = $pri;
+  }
+}
+
+# Enqueue all the events, then adjust their priorities.  The
+# even-numbered events have their priorities reduced by 1000; the odd
+# ones have their priorities increased by 1000.
+
+{ my @ids;
+  for my $major (shuffled_list(10)) {
+    for my $minor (shuffled_list(100)) {
+      my $priority = sprintf("%2d%02d", $major, $minor);
+      push @ids, $q->enqueue($priority, $priority);
+    }
+  }
+
+  foreach my $id (@ids) { $q->adjust_priority($id, \&is_even, -1000); }
+  foreach my $id (@ids) { $q->adjust_priority($id, \&is_odd,   1000); }
+}
+
+# Verify that the queue remains in order, and that the adjusted
+# priorities are correct.
+
+print "!!!!!!!! 1\n";
+verify_queue(1000);
+
+# Now set priorities to absolute values.  The values are
+
+{ my @id_recs;
+  for my $major (shuffled_list(10)) {
+    for my $minor (shuffled_list(100)) {
+      my $priority = sprintf("%2d%02d", $major, $minor);
+      push @id_recs, [ $q->enqueue($priority, $priority), $priority ];
+    }
+  }
+
+  foreach my $id_rec (@id_recs) {
+    my ($id, $pri) = @$id_rec;
+    $q->set_priority($id, \&is_even, $pri + 500);
+  }
+
+  foreach my $id_rec (@id_recs) {
+    my ($id, $pri) = @$id_rec;
+    $q->set_priority($id, \&is_odd, $pri + 500);
+  }
+
+  verify_queue(500);
+}
+
+### Helper functions.
+
+sub enqueue_events {
+  my ($events, $id) = @_;
+  foreach (@$events) {
+    my ($ev, $prio) = @$_;
+    ok($q->enqueue($prio, $ev) == $id++, "enqueued event $ev correctly");
+  }
+}
diff --git a/typemap b/typemap
new file mode 100644 (file)
index 0000000..c348f2d
--- /dev/null
+++ b/typemap
@@ -0,0 +1 @@
+POE::XS::Queue::Array  T_PTROBJ