revert threads changes, they didn't work with pseudo-fork and it
authorTony Cook <tony@develop=help.com>
Sun, 15 Mar 2009 00:11:10 +0000 (00:11 +0000)
committerTony Cook <tony@develop=help.com>
Sun, 15 Mar 2009 00:11:10 +0000 (00:11 +0000)
doesn't seem possible to make it work, we'll try the quick and easy
way

Array.xs
queue.c
queue.h
t/04_thread.t [deleted file]

index 8a64c64..6d515aa 100644 (file)
--- a/Array.xs
+++ b/Array.xs
 conversion between perl class and C type and back again */
 typedef poe_queue *POE__XS__Queue__Array;
 
-/* This gives us correct destruction */
+/* This gives us our new method and correct destruction */
+#define pq_new(class) pq_create()
 #define pq_DESTROY(pq) pq_delete(pq)
 
-#ifdef USE_ITHREADS
-
-#define INDEX_AV "POE::XS::Queue::Array::__obj_index"
-
-
-void
-index_object(SV *pq_sv) {
-  SV *copy;
-  IV i;
-  AV *av = get_av(INDEX_AV, 1);
-  IV len = av_len(av);
-  
-  copy = newSVsv(pq_sv);
-
-  /* put it in our store */
-  for (i = 0; i <= len; ++i) {
-    SV **entry = av_fetch(av, i, 0);
-    if (!entry || !SvOK(*entry)) {
-      SV **stored  = av_store(av, i, copy);
-      if (stored) {
-        sv_rvweaken(*stored);
-       return;
-      }
-    }
-  }
-
-  /* add it to the end */
-  {
-    SV **stored = av_store(av, len+1, copy);
-    if (stored) {
-      sv_rvweaken(*stored);
-      return;
-    }
-    else {
-      SvREFCNT_dec(copy);
-      croak("Cannot store weak copy at index %d in @" INDEX_AV, len+1);
-    }
-  }
-}
-
-#else
-#define index_object(obj)
-#endif
-
 MODULE = POE::XS::Queue::Array  PACKAGE = POE::XS::Queue::Array PREFIX = pq_
 
 PROTOTYPES: DISABLE
 
-# I hate ithreads, this was:
-#
-#  POE::XS::Queue::Array
-#  pq_new(class)
-
-SV *
+POE::XS::Queue::Array
 pq_new(class)
-      PREINIT:
-        poe_queue *pq;
-      CODE:
-       pq = pq_create();
-        RETVAL = NEWSV(1, 0);
-       sv_setref_pv(RETVAL, "POE::XS::Queue::Array", (void *)pq);
-        index_object(RETVAL);
-      OUTPUT:
-       RETVAL
 
 void
 pq_DESTROY(pq)
@@ -245,63 +188,3 @@ pq__set_errno_xs(value)
 void
 pq__set_errno_queue(value)
        int value
-
-#ifdef USE_ITHREADS
-
-# here be dragons and way too deep a magic
-
-void
-pq_CLONE(...)
-      PREINIT:
-       IV i;
-        AV *av = get_av(INDEX_AV, 1);
-        IV len = av_len(av);
-        CLONE_PARAMS params;
-      CODE:
-        /*fprintf(stderr, "cloning queues\n");*/
-       params.flags = CLONEf_KEEP_PTR_TABLE;
-       params.stashes = NULL;
-       params.proto_perl = NULL;
-       for (i = 0; i <= len; ++i) {
-          IV tmp;
-          poe_queue *pq, *dup_pq;
-          SV *rv;
-          SV **psv = av_fetch(av, i, 0);
-          if (psv && SvOK(*psv)) {
-           /* find the actual object and dupe the queue object */
-            rv = SvRV(*psv);
-            tmp = SvIV((SV *)rv);
-           pq = INT2PTR(poe_queue *, tmp);
-           dup_pq = pq_clone(pq);
-            sv_setiv(rv, PTR2IV(dup_pq));
-          }
-       }
-
-
-int
-pq__active_refs()
-      PREINIT:
-        AV *av = get_av(INDEX_AV, 1);
-        IV len = av_len(av);
-        IV i;
-      CODE:
-        RETVAL = 0;
-       for (i = 0; i <= len; ++i) {
-          SV **psv = av_fetch(av, i, 0);
-         if (psv && SvOK(*psv)) 
-            ++RETVAL;
-       }
-      OUTPUT:
-       RETVAL
-
-int
-pq__ref_store_size()
-      PREINIT:
-        AV *av = get_av(INDEX_AV, 1);
-      CODE:
-        RETVAL = av_len(av)+1;
-      OUTPUT:
-       RETVAL
-
-
-#endif
diff --git a/queue.c b/queue.c
index bdbbaa5..f56140f 100644 (file)
--- a/queue.c
+++ b/queue.c
@@ -1,7 +1,6 @@
 #include "EXTERN.h"\r
 #include "perl.h"\r
 #include "XSUB.h"\r
-#include "embed.h"\r
 \r
 #include "queue.h"\r
 #include "alloc.h"\r
@@ -117,8 +116,6 @@ pq_delete(poe_queue *pq) {
   DEBUG( fprintf(stderr, "pq_delete(%p)\n", pq) );\r
   if (pq->end > pq->start) {\r
     for (i = pq->start; i < pq->end; ++i) {\r
-      DEBUG( fprintf(stderr, "  %d: %p (%u)\n", i, pq->entries[i].payload, SvREFCNT(pq->entries[i].payload)) );\r
-      DEBUG( sv_dump(pq->entries[i].payload) );\r
       SvREFCNT_dec(pq->entries[i].payload);\r
     }\r
   }\r
@@ -786,20 +783,19 @@ pq_dump(poe_queue *pq) {
   int i;\r
   HE *he;\r
 \r
-  fprintf(stderr, "poe_queue %p\n", pq);\r
+  fprintf(stderr, "poe_queue\n");\r
   fprintf(stderr, "  start: %d\n", pq->start);\r
   fprintf(stderr, "    end: %d\n", pq->end);\r
   fprintf(stderr, "  alloc: %d\n", pq->alloc);\r
   fprintf(stderr, "    seq: %d\n", pq->queue_seq);\r
-  fprintf(stderr, "  **Queue Entries (base %p):\n"\r
-         "      index:   id  priority    SV\n", pq->entries);\r
+  fprintf(stderr, "  **Queue Entries:\n"\r
+         "      index:   id  priority    SV\n");\r
   for (i = pq->start; i < pq->end; ++i) {\r
     pq_entry *entry = pq->entries + i;\r
     fprintf(stderr, "      %5d: %5d %8f  %p (%u)\n", i, entry->id, entry->priority,\r
           entry->payload, (unsigned)SvREFCNT(entry->payload));\r
-      DEBUG( sv_dump(entry->payload) );\r
   }\r
-  fprintf(stderr, "  **Hash entries (%p):\n", pq->ids);\r
+  fprintf(stderr, "  **Hash entries:\n");\r
   hv_iterinit(pq->ids);\r
   while ((he = hv_iternext(pq->ids)) != NULL) {\r
     STRLEN len;\r
@@ -845,42 +841,3 @@ pq__set_errno_queue(int value) {
   errno = value;\r
 }\r
 \r
-#ifdef USE_ITHREADS\r
-\r
-/*\r
-pq_clone - clone a queue (typically for a thread)\r
-\r
-*/\r
-poe_queue *\r
-pq_clone(poe_queue *pq) {\r
-  poe_queue *result = mymalloc(sizeof(poe_queue));\r
-  pq_entry const *from;\r
-  pq_entry *to;\r
-  int i;\r
-  CLONE_PARAMS params;\r
-\r
-  if (pq == NULL)\r
-    croak("Out of memory");\r
-\r
-  /* this isn't documented <shrug> */\r
-  params.flags = CLONEf_KEEP_PTR_TABLE;\r
-  params.stashes = NULL;\r
-  params.proto_perl = NULL;\r
-\r
-  *result = *pq;\r
-  result->ids = (HV *)SvREFCNT_inc(sv_dup((SV *)pq->ids, &params));\r
-  result->entries = mymalloc(sizeof(pq_entry) * pq->alloc);\r
-  from = pq->entries + pq->start;\r
-  to = result->entries + pq->start;\r
-  for (i = pq->start; i < pq->end; ++i) {\r
-    *to = *from;\r
-    to->payload = SvREFCNT_inc(sv_dup(from->payload, &params));\r
-  }\r
-\r
-  DEBUG( fprintf(stderr, "pq_clone(%p) => %p\n", pq, result) );\r
-\r
-  return result;\r
-}\r
-\r
-#endif\r
-\r
diff --git a/queue.h b/queue.h
index 004c753..62a4328 100644 (file)
--- a/queue.h
+++ b/queue.h
@@ -37,10 +37,6 @@ pq_peek_items(poe_queue *pq, SV *filter, int max_count, pq_entry **items);
 extern void pq_dump(poe_queue *pq);\r
 extern void pq_verify(poe_queue *pq);\r
 \r
-#ifdef USE_ITHREADS\r
-extern poe_queue *pq_clone(poe_queue *pq);\r
-#endif\r
-\r
 extern void\r
 pq__set_errno_queue(int value);\r
 \r
diff --git a/t/04_thread.t b/t/04_thread.t
deleted file mode 100644 (file)
index 4f83ebb..0000000
+++ /dev/null
@@ -1,107 +0,0 @@
-#!perl -w
-use strict;
-use threads;
-use Test::More;
-use Config;
-use POE::XS::Queue::Array;
-
-$| =1;
-
-$Config{useithreads} && $Config{useithreads} eq 'define'
-  or plan skip_all => "No threads to test against";
-
-plan tests => 28;
-
-# check the weak ref logic
-{
-  is(POE::XS::Queue::Array::_active_refs(), 0, "start with none active");
-  my $q1 = POE::XS::Queue::Array->new;
-  is(POE::XS::Queue::Array::_active_refs(), 1, "one active");
-  my $q2 = POE::XS::Queue::Array->new;
-  is(POE::XS::Queue::Array::_active_refs(), 2, "two active");
-  undef $q1;
-  is(POE::XS::Queue::Array::_active_refs(), 1, "destroy one - one active");
-  undef $q2;
-  is(POE::XS::Queue::Array::_active_refs(), 0, "destroyed both - none active");
-}
-
-{
-  # simple clone check
-  my $q1 = POE::XS::Queue::Array->new;
-  my $first_id = $q1->enqueue(100, 101);
-  #print STDERR $q1;
-  #$q1->dump;
-  my $thread = threads->create
-    (
-     sub {
-       #print STDERR $q1;
-       #$q1->dump;
-       my $second_id =$q1->enqueue(200, 201);
-       is($second_id, 2, "check id of new item");
-       my ($pri, $id, $pay) = $q1->dequeue_next;
-       is($pri, 100, "check item queued first");
-       is($id, $first_id, "check id");
-       is($pay, 101, "check payload");
-
-       ($pri, $id, $pay) = $q1->dequeue_next;
-       is($pri, 200, "check item queued second");
-       is($id, $second_id, "check id");
-       is($pay, 201, "check payload");
-       is($q1->get_item_count, 0, "should be empty");
-     }
-    );
-  $thread->join;
-  is($q1->get_item_count, 1, "only one item");
-}
-
-{
-  # more complex clone check
-  package Obj;
-  our $created = 0;
-  our $destroyed = 0;
-  sub new {
-    ++$created;
-    my ($class, $id) = @_;
-    print "# create $id in thread ", threads->tid, "\n";
-    return bless \$id, $class;
-  }
-  sub id {
-    ${$_[0]};
-  }
-  sub DESTROY { 
-    my $self = shift;
-    print "# destroy $$self in thread ", threads->tid, "\n";
-    ++$destroyed;
-  }
-
-  package main;
-
-  my $q1 = POE::XS::Queue::Array->new;
-  my $first_id = $q1->enqueue(100, Obj->new(101));
-  my $thread = threads->create
-    (
-     sub {
-       my $second_id = $q1->enqueue(200, Obj->new(201));
-       is($second_id, 2, "check id of new item");
-       my ($pri, $id, $pay) = $q1->dequeue_next;
-       is($pri, 100, "check item queued first");
-       is($id, $first_id, "check id");
-       is($pay->id, 101, "check payload");
-       
-       ($pri, $id, $pay) = $q1->dequeue_next;
-       is($pri, 200, "check item queued second");
-       is($id, $second_id, "check id");
-       is($pay->id, 201, "check payload");
-       is($q1->get_item_count, 0, "should be empty");
-       undef $pay;
-       is($Obj::created, 2, "2 objects created in thread");
-       is($Obj::destroyed, 2, "2 objects destroyed in thread");
-     }
-    );
-  $thread->join;
-  is($q1->get_item_count, 1, "only 1 item left");
-  is($Obj::created, 1, "1 objects created in main");
-  is($Obj::destroyed, 0, "no objects destroyed in main");
-  undef $q1;
-  is($Obj::destroyed, 1, "1 objects destroyed in main after destroying queue");
-}