conversion between perl class and C type and back again */
typedef poe_queue *POE__XS__Queue__Array;
-/* This gives us our new method and correct destruction */
-#define pq_new(class) pq_create()
+/* This gives us correct destruction */
#define pq_DESTROY(pq) pq_delete(pq)
+#ifdef USE_ITHREADS
+
+#define INDEX_AV "POE::XS::Queue::Array::__obj_index"
+
+
+void
+index_object(SV *pq_sv) {
+ SV *copy;
+ IV i;
+ AV *av = get_av(INDEX_AV, 1);
+ IV len = av_len(av);
+
+ copy = newSVsv(pq_sv);
+
+ /* put it in our store */
+ for (i = 0; i <= len; ++i) {
+ SV **entry = av_fetch(av, i, 0);
+ if (!entry || !SvOK(*entry)) {
+ SV **stored = av_store(av, i, copy);
+ if (stored) {
+ sv_rvweaken(*stored);
+ return;
+ }
+ }
+ }
+
+ /* add it to the end */
+ {
+ SV **stored = av_store(av, len+1, copy);
+ if (stored) {
+ sv_rvweaken(*stored);
+ return;
+ }
+ else {
+ SvREFCNT_dec(copy);
+ croak("Cannot store weak copy at index %d in @" INDEX_AV, len+1);
+ }
+ }
+}
+
+#else
+#define index_object(obj)
+#endif
+
MODULE = POE::XS::Queue::Array PACKAGE = POE::XS::Queue::Array PREFIX = pq_
PROTOTYPES: DISABLE
-POE::XS::Queue::Array
+# I hate ithreads, this was:
+#
+# POE::XS::Queue::Array
+# pq_new(class)
+
+SV *
pq_new(class)
+ PREINIT:
+ poe_queue *pq;
+ CODE:
+ pq = pq_create();
+ RETVAL = NEWSV(1, 0);
+ sv_setref_pv(RETVAL, "POE::XS::Queue::Array", (void *)pq);
+ index_object(RETVAL);
+ OUTPUT:
+ RETVAL
void
pq_DESTROY(pq)
void
pq__set_errno_queue(value)
int value
+
+#ifdef USE_ITHREADS
+
+# here be dragons and way too deep a magic
+
+void
+pq_CLONE(...)
+ PREINIT:
+ IV i;
+ AV *av = get_av(INDEX_AV, 1);
+ IV len = av_len(av);
+ CLONE_PARAMS params;
+ CODE:
+ /*fprintf(stderr, "cloning queues\n");*/
+ params.flags = CLONEf_KEEP_PTR_TABLE;
+ params.stashes = NULL;
+ params.proto_perl = NULL;
+ for (i = 0; i <= len; ++i) {
+ IV tmp;
+ poe_queue *pq, *dup_pq;
+ SV *rv;
+ SV **psv = av_fetch(av, i, 0);
+ if (psv && SvOK(*psv)) {
+ /* find the actual object and dupe the queue object */
+ rv = SvRV(*psv);
+ tmp = SvIV((SV *)rv);
+ pq = INT2PTR(poe_queue *, tmp);
+ dup_pq = pq_clone(pq);
+ sv_setiv(rv, PTR2IV(dup_pq));
+ }
+ }
+
+
+int
+pq__active_refs()
+ PREINIT:
+ AV *av = get_av(INDEX_AV, 1);
+ IV len = av_len(av);
+ IV i;
+ CODE:
+ RETVAL = 0;
+ for (i = 0; i <= len; ++i) {
+ SV **psv = av_fetch(av, i, 0);
+ if (psv && SvOK(*psv))
+ ++RETVAL;
+ }
+ OUTPUT:
+ RETVAL
+
+int
+pq__ref_store_size()
+ PREINIT:
+ AV *av = get_av(INDEX_AV, 1);
+ CODE:
+ RETVAL = av_len(av)+1;
+ OUTPUT:
+ RETVAL
+
+
+#endif
#include "EXTERN.h"\r
#include "perl.h"\r
#include "XSUB.h"\r
+#include "embed.h"\r
\r
#include "queue.h"\r
#include "alloc.h"\r
DEBUG( fprintf(stderr, "pq_delete(%p)\n", pq) );\r
if (pq->end > pq->start) {\r
for (i = pq->start; i < pq->end; ++i) {\r
+ DEBUG( fprintf(stderr, " %d: %p (%u)\n", i, pq->entries[i].payload, SvREFCNT(pq->entries[i].payload)) );\r
+ DEBUG( sv_dump(pq->entries[i].payload) );\r
SvREFCNT_dec(pq->entries[i].payload);\r
}\r
}\r
int i;\r
HE *he;\r
\r
- fprintf(stderr, "poe_queue\n");\r
+ fprintf(stderr, "poe_queue %p\n", pq);\r
fprintf(stderr, " start: %d\n", pq->start);\r
fprintf(stderr, " end: %d\n", pq->end);\r
fprintf(stderr, " alloc: %d\n", pq->alloc);\r
fprintf(stderr, " seq: %d\n", pq->queue_seq);\r
- fprintf(stderr, " **Queue Entries:\n"\r
- " index: id priority SV\n");\r
+ fprintf(stderr, " **Queue Entries (base %p):\n"\r
+ " index: id priority SV\n", pq->entries);\r
for (i = pq->start; i < pq->end; ++i) {\r
pq_entry *entry = pq->entries + i;\r
fprintf(stderr, " %5d: %5d %8f %p (%u)\n", i, entry->id, entry->priority,\r
entry->payload, (unsigned)SvREFCNT(entry->payload));\r
+ DEBUG( sv_dump(entry->payload) );\r
}\r
- fprintf(stderr, " **Hash entries:\n");\r
+ fprintf(stderr, " **Hash entries (%p):\n", pq->ids);\r
hv_iterinit(pq->ids);\r
while ((he = hv_iternext(pq->ids)) != NULL) {\r
STRLEN len;\r
errno = value;\r
}\r
\r
+#ifdef USE_ITHREADS\r
+\r
+/*\r
+pq_clone - clone a queue (typically for a thread)\r
+\r
+*/\r
+poe_queue *\r
+pq_clone(poe_queue *pq) {\r
+ poe_queue *result = mymalloc(sizeof(poe_queue));\r
+ pq_entry const *from;\r
+ pq_entry *to;\r
+ int i;\r
+ CLONE_PARAMS params;\r
+\r
+ if (pq == NULL)\r
+ croak("Out of memory");\r
+\r
+ /* this isn't documented <shrug> */\r
+ params.flags = CLONEf_KEEP_PTR_TABLE;\r
+ params.stashes = NULL;\r
+ params.proto_perl = NULL;\r
+\r
+ *result = *pq;\r
+ result->ids = (HV *)SvREFCNT_inc(sv_dup((SV *)pq->ids, ¶ms));\r
+ result->entries = mymalloc(sizeof(pq_entry) * pq->alloc);\r
+ from = pq->entries + pq->start;\r
+ to = result->entries + pq->start;\r
+ for (i = pq->start; i < pq->end; ++i) {\r
+ *to = *from;\r
+ to->payload = SvREFCNT_inc(sv_dup(from->payload, ¶ms));\r
+ }\r
+\r
+ DEBUG( fprintf(stderr, "pq_clone(%p) => %p\n", pq, result) );\r
+\r
+ return result;\r
+}\r
+\r
+#endif\r
+\r
extern void pq_dump(poe_queue *pq);\r
extern void pq_verify(poe_queue *pq);\r
\r
+#ifdef USE_ITHREADS\r
+extern poe_queue *pq_clone(poe_queue *pq);\r
+#endif\r
+\r
extern void\r
pq__set_errno_queue(int value);\r
\r
--- /dev/null
+#!perl -w
+use strict;
+use threads;
+use Test::More;
+use Config;
+use POE::XS::Queue::Array;
+
+$| =1;
+
+$Config{useithreads} && $Config{useithreads} eq 'define'
+ or plan skip_all => "No threads to test against";
+
+plan tests => 28;
+
+# check the weak ref logic
+{
+ is(POE::XS::Queue::Array::_active_refs(), 0, "start with none active");
+ my $q1 = POE::XS::Queue::Array->new;
+ is(POE::XS::Queue::Array::_active_refs(), 1, "one active");
+ my $q2 = POE::XS::Queue::Array->new;
+ is(POE::XS::Queue::Array::_active_refs(), 2, "two active");
+ undef $q1;
+ is(POE::XS::Queue::Array::_active_refs(), 1, "destroy one - one active");
+ undef $q2;
+ is(POE::XS::Queue::Array::_active_refs(), 0, "destroyed both - none active");
+}
+
+{
+ # simple clone check
+ my $q1 = POE::XS::Queue::Array->new;
+ my $first_id = $q1->enqueue(100, 101);
+ #print STDERR $q1;
+ #$q1->dump;
+ my $thread = threads->create
+ (
+ sub {
+ #print STDERR $q1;
+ #$q1->dump;
+ my $second_id =$q1->enqueue(200, 201);
+ is($second_id, 2, "check id of new item");
+ my ($pri, $id, $pay) = $q1->dequeue_next;
+ is($pri, 100, "check item queued first");
+ is($id, $first_id, "check id");
+ is($pay, 101, "check payload");
+
+ ($pri, $id, $pay) = $q1->dequeue_next;
+ is($pri, 200, "check item queued second");
+ is($id, $second_id, "check id");
+ is($pay, 201, "check payload");
+ is($q1->get_item_count, 0, "should be empty");
+ }
+ );
+ $thread->join;
+ is($q1->get_item_count, 1, "only one item");
+}
+
+{
+ # more complex clone check
+ package Obj;
+ our $created = 0;
+ our $destroyed = 0;
+ sub new {
+ ++$created;
+ my ($class, $id) = @_;
+ print "# create $id in thread ", threads->tid, "\n";
+ return bless \$id, $class;
+ }
+ sub id {
+ ${$_[0]};
+ }
+ sub DESTROY {
+ my $self = shift;
+ print "# destroy $$self in thread ", threads->tid, "\n";
+ ++$destroyed;
+ }
+
+ package main;
+
+ my $q1 = POE::XS::Queue::Array->new;
+ my $first_id = $q1->enqueue(100, Obj->new(101));
+ my $thread = threads->create
+ (
+ sub {
+ my $second_id = $q1->enqueue(200, Obj->new(201));
+ is($second_id, 2, "check id of new item");
+ my ($pri, $id, $pay) = $q1->dequeue_next;
+ is($pri, 100, "check item queued first");
+ is($id, $first_id, "check id");
+ is($pay->id, 101, "check payload");
+
+ ($pri, $id, $pay) = $q1->dequeue_next;
+ is($pri, 200, "check item queued second");
+ is($id, $second_id, "check id");
+ is($pay->id, 201, "check payload");
+ is($q1->get_item_count, 0, "should be empty");
+ undef $pay;
+ is($Obj::created, 2, "2 objects created in thread");
+ is($Obj::destroyed, 2, "2 objects destroyed in thread");
+ }
+ );
+ $thread->join;
+ is($q1->get_item_count, 1, "only 1 item left");
+ is($Obj::created, 1, "1 objects created in main");
+ is($Obj::destroyed, 0, "no objects destroyed in main");
+ undef $q1;
+ is($Obj::destroyed, 1, "1 objects destroyed in main after destroying queue");
+}