]> git.imager.perl.org - poe-xs-queue-array.git/commitdiff
thread support for people running in hell
authorTony Cook <tony@develop=help.com>
Fri, 13 Mar 2009 08:24:33 +0000 (08:24 +0000)
committerTony Cook <tony@develop=help.com>
Fri, 13 Mar 2009 08:24:33 +0000 (08:24 +0000)
Array.xs
queue.c
queue.h
t/04_thread.t [new file with mode: 0644]

index 6d515aad98e11aa35a44226734beedf5d18db2a2..8a64c64ff6b18dbe10e0fa1dbdc36a402d3fffc3 100644 (file)
--- a/Array.xs
+++ b/Array.xs
 conversion between perl class and C type and back again */
 typedef poe_queue *POE__XS__Queue__Array;
 
-/* This gives us our new method and correct destruction */
-#define pq_new(class) pq_create()
+/* This gives us correct destruction */
 #define pq_DESTROY(pq) pq_delete(pq)
 
+#ifdef USE_ITHREADS
+
+#define INDEX_AV "POE::XS::Queue::Array::__obj_index"
+
+
+void
+index_object(SV *pq_sv) {
+  SV *copy;
+  IV i;
+  AV *av = get_av(INDEX_AV, 1);
+  IV len = av_len(av);
+  
+  copy = newSVsv(pq_sv);
+
+  /* put it in our store */
+  for (i = 0; i <= len; ++i) {
+    SV **entry = av_fetch(av, i, 0);
+    if (!entry || !SvOK(*entry)) {
+      SV **stored  = av_store(av, i, copy);
+      if (stored) {
+        sv_rvweaken(*stored);
+       return;
+      }
+    }
+  }
+
+  /* add it to the end */
+  {
+    SV **stored = av_store(av, len+1, copy);
+    if (stored) {
+      sv_rvweaken(*stored);
+      return;
+    }
+    else {
+      SvREFCNT_dec(copy);
+      croak("Cannot store weak copy at index %d in @" INDEX_AV, len+1);
+    }
+  }
+}
+
+#else
+#define index_object(obj)
+#endif
+
 MODULE = POE::XS::Queue::Array  PACKAGE = POE::XS::Queue::Array PREFIX = pq_
 
 PROTOTYPES: DISABLE
 
-POE::XS::Queue::Array
+# I hate ithreads, this was:
+#
+#  POE::XS::Queue::Array
+#  pq_new(class)
+
+SV *
 pq_new(class)
+      PREINIT:
+        poe_queue *pq;
+      CODE:
+       pq = pq_create();
+        RETVAL = NEWSV(1, 0);
+       sv_setref_pv(RETVAL, "POE::XS::Queue::Array", (void *)pq);
+        index_object(RETVAL);
+      OUTPUT:
+       RETVAL
 
 void
 pq_DESTROY(pq)
@@ -188,3 +245,63 @@ pq__set_errno_xs(value)
 void
 pq__set_errno_queue(value)
        int value
+
+#ifdef USE_ITHREADS
+
+# here be dragons and way too deep a magic
+
+void
+pq_CLONE(...)
+      PREINIT:
+       IV i;
+        AV *av = get_av(INDEX_AV, 1);
+        IV len = av_len(av);
+        CLONE_PARAMS params;
+      CODE:
+        /*fprintf(stderr, "cloning queues\n");*/
+       params.flags = CLONEf_KEEP_PTR_TABLE;
+       params.stashes = NULL;
+       params.proto_perl = NULL;
+       for (i = 0; i <= len; ++i) {
+          IV tmp;
+          poe_queue *pq, *dup_pq;
+          SV *rv;
+          SV **psv = av_fetch(av, i, 0);
+          if (psv && SvOK(*psv)) {
+           /* find the actual object and dupe the queue object */
+            rv = SvRV(*psv);
+            tmp = SvIV((SV *)rv);
+           pq = INT2PTR(poe_queue *, tmp);
+           dup_pq = pq_clone(pq);
+            sv_setiv(rv, PTR2IV(dup_pq));
+          }
+       }
+
+
+int
+pq__active_refs()
+      PREINIT:
+        AV *av = get_av(INDEX_AV, 1);
+        IV len = av_len(av);
+        IV i;
+      CODE:
+        RETVAL = 0;
+       for (i = 0; i <= len; ++i) {
+          SV **psv = av_fetch(av, i, 0);
+         if (psv && SvOK(*psv)) 
+            ++RETVAL;
+       }
+      OUTPUT:
+       RETVAL
+
+int
+pq__ref_store_size()
+      PREINIT:
+        AV *av = get_av(INDEX_AV, 1);
+      CODE:
+        RETVAL = av_len(av)+1;
+      OUTPUT:
+       RETVAL
+
+
+#endif
diff --git a/queue.c b/queue.c
index f56140fc2b7a7864e3fe2a67ca3dfad93de73dfc..bdbbaa56fbbbc0871cec5d47eeea6603bff0a873 100644 (file)
--- a/queue.c
+++ b/queue.c
@@ -1,6 +1,7 @@
 #include "EXTERN.h"\r
 #include "perl.h"\r
 #include "XSUB.h"\r
+#include "embed.h"\r
 \r
 #include "queue.h"\r
 #include "alloc.h"\r
@@ -116,6 +117,8 @@ pq_delete(poe_queue *pq) {
   DEBUG( fprintf(stderr, "pq_delete(%p)\n", pq) );\r
   if (pq->end > pq->start) {\r
     for (i = pq->start; i < pq->end; ++i) {\r
+      DEBUG( fprintf(stderr, "  %d: %p (%u)\n", i, pq->entries[i].payload, SvREFCNT(pq->entries[i].payload)) );\r
+      DEBUG( sv_dump(pq->entries[i].payload) );\r
       SvREFCNT_dec(pq->entries[i].payload);\r
     }\r
   }\r
@@ -783,19 +786,20 @@ pq_dump(poe_queue *pq) {
   int i;\r
   HE *he;\r
 \r
-  fprintf(stderr, "poe_queue\n");\r
+  fprintf(stderr, "poe_queue %p\n", pq);\r
   fprintf(stderr, "  start: %d\n", pq->start);\r
   fprintf(stderr, "    end: %d\n", pq->end);\r
   fprintf(stderr, "  alloc: %d\n", pq->alloc);\r
   fprintf(stderr, "    seq: %d\n", pq->queue_seq);\r
-  fprintf(stderr, "  **Queue Entries:\n"\r
-         "      index:   id  priority    SV\n");\r
+  fprintf(stderr, "  **Queue Entries (base %p):\n"\r
+         "      index:   id  priority    SV\n", pq->entries);\r
   for (i = pq->start; i < pq->end; ++i) {\r
     pq_entry *entry = pq->entries + i;\r
     fprintf(stderr, "      %5d: %5d %8f  %p (%u)\n", i, entry->id, entry->priority,\r
           entry->payload, (unsigned)SvREFCNT(entry->payload));\r
+      DEBUG( sv_dump(entry->payload) );\r
   }\r
-  fprintf(stderr, "  **Hash entries:\n");\r
+  fprintf(stderr, "  **Hash entries (%p):\n", pq->ids);\r
   hv_iterinit(pq->ids);\r
   while ((he = hv_iternext(pq->ids)) != NULL) {\r
     STRLEN len;\r
@@ -841,3 +845,42 @@ pq__set_errno_queue(int value) {
   errno = value;\r
 }\r
 \r
+#ifdef USE_ITHREADS\r
+\r
+/*\r
+pq_clone - clone a queue (typically for a thread)\r
+\r
+*/\r
+poe_queue *\r
+pq_clone(poe_queue *pq) {\r
+  poe_queue *result = mymalloc(sizeof(poe_queue));\r
+  pq_entry const *from;\r
+  pq_entry *to;\r
+  int i;\r
+  CLONE_PARAMS params;\r
+\r
+  if (pq == NULL)\r
+    croak("Out of memory");\r
+\r
+  /* this isn't documented <shrug> */\r
+  params.flags = CLONEf_KEEP_PTR_TABLE;\r
+  params.stashes = NULL;\r
+  params.proto_perl = NULL;\r
+\r
+  *result = *pq;\r
+  result->ids = (HV *)SvREFCNT_inc(sv_dup((SV *)pq->ids, &params));\r
+  result->entries = mymalloc(sizeof(pq_entry) * pq->alloc);\r
+  from = pq->entries + pq->start;\r
+  to = result->entries + pq->start;\r
+  for (i = pq->start; i < pq->end; ++i) {\r
+    *to = *from;\r
+    to->payload = SvREFCNT_inc(sv_dup(from->payload, &params));\r
+  }\r
+\r
+  DEBUG( fprintf(stderr, "pq_clone(%p) => %p\n", pq, result) );\r
+\r
+  return result;\r
+}\r
+\r
+#endif\r
+\r
diff --git a/queue.h b/queue.h
index 62a43286f8cc29ca8bccd5d192b2c6d881dd583b..004c753b8a7942f5d4e46ecd38aa2ed965e94ffc 100644 (file)
--- a/queue.h
+++ b/queue.h
@@ -37,6 +37,10 @@ pq_peek_items(poe_queue *pq, SV *filter, int max_count, pq_entry **items);
 extern void pq_dump(poe_queue *pq);\r
 extern void pq_verify(poe_queue *pq);\r
 \r
+#ifdef USE_ITHREADS\r
+extern poe_queue *pq_clone(poe_queue *pq);\r
+#endif\r
+\r
 extern void\r
 pq__set_errno_queue(int value);\r
 \r
diff --git a/t/04_thread.t b/t/04_thread.t
new file mode 100644 (file)
index 0000000..4f83ebb
--- /dev/null
@@ -0,0 +1,107 @@
+#!perl -w
+use strict;
+use threads;
+use Test::More;
+use Config;
+use POE::XS::Queue::Array;
+
+$| =1;
+
+$Config{useithreads} && $Config{useithreads} eq 'define'
+  or plan skip_all => "No threads to test against";
+
+plan tests => 28;
+
+# check the weak ref logic
+{
+  is(POE::XS::Queue::Array::_active_refs(), 0, "start with none active");
+  my $q1 = POE::XS::Queue::Array->new;
+  is(POE::XS::Queue::Array::_active_refs(), 1, "one active");
+  my $q2 = POE::XS::Queue::Array->new;
+  is(POE::XS::Queue::Array::_active_refs(), 2, "two active");
+  undef $q1;
+  is(POE::XS::Queue::Array::_active_refs(), 1, "destroy one - one active");
+  undef $q2;
+  is(POE::XS::Queue::Array::_active_refs(), 0, "destroyed both - none active");
+}
+
+{
+  # simple clone check
+  my $q1 = POE::XS::Queue::Array->new;
+  my $first_id = $q1->enqueue(100, 101);
+  #print STDERR $q1;
+  #$q1->dump;
+  my $thread = threads->create
+    (
+     sub {
+       #print STDERR $q1;
+       #$q1->dump;
+       my $second_id =$q1->enqueue(200, 201);
+       is($second_id, 2, "check id of new item");
+       my ($pri, $id, $pay) = $q1->dequeue_next;
+       is($pri, 100, "check item queued first");
+       is($id, $first_id, "check id");
+       is($pay, 101, "check payload");
+
+       ($pri, $id, $pay) = $q1->dequeue_next;
+       is($pri, 200, "check item queued second");
+       is($id, $second_id, "check id");
+       is($pay, 201, "check payload");
+       is($q1->get_item_count, 0, "should be empty");
+     }
+    );
+  $thread->join;
+  is($q1->get_item_count, 1, "only one item");
+}
+
+{
+  # more complex clone check
+  package Obj;
+  our $created = 0;
+  our $destroyed = 0;
+  sub new {
+    ++$created;
+    my ($class, $id) = @_;
+    print "# create $id in thread ", threads->tid, "\n";
+    return bless \$id, $class;
+  }
+  sub id {
+    ${$_[0]};
+  }
+  sub DESTROY { 
+    my $self = shift;
+    print "# destroy $$self in thread ", threads->tid, "\n";
+    ++$destroyed;
+  }
+
+  package main;
+
+  my $q1 = POE::XS::Queue::Array->new;
+  my $first_id = $q1->enqueue(100, Obj->new(101));
+  my $thread = threads->create
+    (
+     sub {
+       my $second_id = $q1->enqueue(200, Obj->new(201));
+       is($second_id, 2, "check id of new item");
+       my ($pri, $id, $pay) = $q1->dequeue_next;
+       is($pri, 100, "check item queued first");
+       is($id, $first_id, "check id");
+       is($pay->id, 101, "check payload");
+       
+       ($pri, $id, $pay) = $q1->dequeue_next;
+       is($pri, 200, "check item queued second");
+       is($id, $second_id, "check id");
+       is($pay->id, 201, "check payload");
+       is($q1->get_item_count, 0, "should be empty");
+       undef $pay;
+       is($Obj::created, 2, "2 objects created in thread");
+       is($Obj::destroyed, 2, "2 objects destroyed in thread");
+     }
+    );
+  $thread->join;
+  is($q1->get_item_count, 1, "only 1 item left");
+  is($Obj::created, 1, "1 objects created in main");
+  is($Obj::destroyed, 0, "no objects destroyed in main");
+  undef $q1;
+  is($Obj::destroyed, 1, "1 objects destroyed in main after destroying queue");
+}