conversion between perl class and C type and back again */
typedef poe_queue *POE__XS__Queue__Array;
-/* This gives us correct destruction */
+/* Map the XS-generated pq_new() and pq_DESTROY() calls onto pq_create() and pq_delete() */
+#define pq_new(class) pq_create()
#define pq_DESTROY(pq) pq_delete(pq)
-#ifdef USE_ITHREADS
-
-#define INDEX_AV "POE::XS::Queue::Array::__obj_index"
-
-
-void
-index_object(SV *pq_sv) {
- SV *copy;
- IV i;
- AV *av = get_av(INDEX_AV, 1);
- IV len = av_len(av);
-
- copy = newSVsv(pq_sv);
-
- /* put it in our store */
- for (i = 0; i <= len; ++i) {
- SV **entry = av_fetch(av, i, 0);
- if (!entry || !SvOK(*entry)) {
- SV **stored = av_store(av, i, copy);
- if (stored) {
- sv_rvweaken(*stored);
- return;
- }
- }
- }
-
- /* add it to the end */
- {
- SV **stored = av_store(av, len+1, copy);
- if (stored) {
- sv_rvweaken(*stored);
- return;
- }
- else {
- SvREFCNT_dec(copy);
- croak("Cannot store weak copy at index %d in @" INDEX_AV, len+1);
- }
- }
-}
-
-#else
-#define index_object(obj)
-#endif
-
MODULE = POE::XS::Queue::Array PACKAGE = POE::XS::Queue::Array PREFIX = pq_
PROTOTYPES: DISABLE
-# I hate ithreads, this was:
-#
-# POE::XS::Queue::Array
-# pq_new(class)
-
-SV *
+POE::XS::Queue::Array
pq_new(class)
- PREINIT:
- poe_queue *pq;
- CODE:
- pq = pq_create();
- RETVAL = NEWSV(1, 0);
- sv_setref_pv(RETVAL, "POE::XS::Queue::Array", (void *)pq);
- index_object(RETVAL);
- OUTPUT:
- RETVAL
void
pq_DESTROY(pq)
void
pq__set_errno_queue(value)
int value
-
-#ifdef USE_ITHREADS
-
-# here be dragons and way too deep a magic
-
-void
-pq_CLONE(...)
- PREINIT:
- IV i;
- AV *av = get_av(INDEX_AV, 1);
- IV len = av_len(av);
- CLONE_PARAMS params;
- CODE:
- /*fprintf(stderr, "cloning queues\n");*/
- params.flags = CLONEf_KEEP_PTR_TABLE;
- params.stashes = NULL;
- params.proto_perl = NULL;
- for (i = 0; i <= len; ++i) {
- IV tmp;
- poe_queue *pq, *dup_pq;
- SV *rv;
- SV **psv = av_fetch(av, i, 0);
- if (psv && SvOK(*psv)) {
- /* find the actual object and dupe the queue object */
- rv = SvRV(*psv);
- tmp = SvIV((SV *)rv);
- pq = INT2PTR(poe_queue *, tmp);
- dup_pq = pq_clone(pq);
- sv_setiv(rv, PTR2IV(dup_pq));
- }
- }
-
-
-int
-pq__active_refs()
- PREINIT:
- AV *av = get_av(INDEX_AV, 1);
- IV len = av_len(av);
- IV i;
- CODE:
- RETVAL = 0;
- for (i = 0; i <= len; ++i) {
- SV **psv = av_fetch(av, i, 0);
- if (psv && SvOK(*psv))
- ++RETVAL;
- }
- OUTPUT:
- RETVAL
-
-int
-pq__ref_store_size()
- PREINIT:
- AV *av = get_av(INDEX_AV, 1);
- CODE:
- RETVAL = av_len(av)+1;
- OUTPUT:
- RETVAL
-
-
-#endif
#include "EXTERN.h"\r
#include "perl.h"\r
#include "XSUB.h"\r
-#include "embed.h"\r
\r
#include "queue.h"\r
#include "alloc.h"\r
DEBUG( fprintf(stderr, "pq_delete(%p)\n", pq) );\r
if (pq->end > pq->start) {\r
for (i = pq->start; i < pq->end; ++i) {\r
- DEBUG( fprintf(stderr, " %d: %p (%u)\n", i, pq->entries[i].payload, SvREFCNT(pq->entries[i].payload)) );\r
- DEBUG( sv_dump(pq->entries[i].payload) );\r
SvREFCNT_dec(pq->entries[i].payload);\r
}\r
}\r
int i;\r
HE *he;\r
\r
- fprintf(stderr, "poe_queue %p\n", pq);\r
+ fprintf(stderr, "poe_queue\n");\r
fprintf(stderr, " start: %d\n", pq->start);\r
fprintf(stderr, " end: %d\n", pq->end);\r
fprintf(stderr, " alloc: %d\n", pq->alloc);\r
fprintf(stderr, " seq: %d\n", pq->queue_seq);\r
- fprintf(stderr, " **Queue Entries (base %p):\n"\r
- " index: id priority SV\n", pq->entries);\r
+ fprintf(stderr, " **Queue Entries:\n"\r
+ " index: id priority SV\n");\r
for (i = pq->start; i < pq->end; ++i) {\r
pq_entry *entry = pq->entries + i;\r
fprintf(stderr, " %5d: %5d %8f %p (%u)\n", i, entry->id, entry->priority,\r
entry->payload, (unsigned)SvREFCNT(entry->payload));\r
- DEBUG( sv_dump(entry->payload) );\r
}\r
- fprintf(stderr, " **Hash entries (%p):\n", pq->ids);\r
+ fprintf(stderr, " **Hash entries:\n");\r
hv_iterinit(pq->ids);\r
while ((he = hv_iternext(pq->ids)) != NULL) {\r
STRLEN len;\r
errno = value;\r
}\r
\r
-#ifdef USE_ITHREADS\r
-\r
-/*\r
-pq_clone - clone a queue (typically for a thread)\r
-\r
-*/\r
-poe_queue *\r
-pq_clone(poe_queue *pq) {\r
- poe_queue *result = mymalloc(sizeof(poe_queue));\r
- pq_entry const *from;\r
- pq_entry *to;\r
- int i;\r
- CLONE_PARAMS params;\r
-\r
- if (pq == NULL)\r
- croak("Out of memory");\r
-\r
- /* this isn't documented <shrug> */\r
- params.flags = CLONEf_KEEP_PTR_TABLE;\r
- params.stashes = NULL;\r
- params.proto_perl = NULL;\r
-\r
- *result = *pq;\r
- result->ids = (HV *)SvREFCNT_inc(sv_dup((SV *)pq->ids, ¶ms));\r
- result->entries = mymalloc(sizeof(pq_entry) * pq->alloc);\r
- from = pq->entries + pq->start;\r
- to = result->entries + pq->start;\r
- for (i = pq->start; i < pq->end; ++i) {\r
- *to = *from;\r
- to->payload = SvREFCNT_inc(sv_dup(from->payload, ¶ms));\r
- }\r
-\r
- DEBUG( fprintf(stderr, "pq_clone(%p) => %p\n", pq, result) );\r
-\r
- return result;\r
-}\r
-\r
-#endif\r
-\r
extern void pq_dump(poe_queue *pq);\r
extern void pq_verify(poe_queue *pq);\r
\r
-#ifdef USE_ITHREADS\r
-extern poe_queue *pq_clone(poe_queue *pq);\r
-#endif\r
-\r
extern void\r
pq__set_errno_queue(int value);\r
\r
+++ /dev/null
-#!perl -w
-use strict;
-use threads;
-use Test::More;
-use Config;
-use POE::XS::Queue::Array;
-
-$| =1;
-
-$Config{useithreads} && $Config{useithreads} eq 'define'
- or plan skip_all => "No threads to test against";
-
-plan tests => 28;
-
-# check the weak ref logic
-{
- is(POE::XS::Queue::Array::_active_refs(), 0, "start with none active");
- my $q1 = POE::XS::Queue::Array->new;
- is(POE::XS::Queue::Array::_active_refs(), 1, "one active");
- my $q2 = POE::XS::Queue::Array->new;
- is(POE::XS::Queue::Array::_active_refs(), 2, "two active");
- undef $q1;
- is(POE::XS::Queue::Array::_active_refs(), 1, "destroy one - one active");
- undef $q2;
- is(POE::XS::Queue::Array::_active_refs(), 0, "destroyed both - none active");
-}
-
-{
- # simple clone check
- my $q1 = POE::XS::Queue::Array->new;
- my $first_id = $q1->enqueue(100, 101);
- #print STDERR $q1;
- #$q1->dump;
- my $thread = threads->create
- (
- sub {
- #print STDERR $q1;
- #$q1->dump;
- my $second_id =$q1->enqueue(200, 201);
- is($second_id, 2, "check id of new item");
- my ($pri, $id, $pay) = $q1->dequeue_next;
- is($pri, 100, "check item queued first");
- is($id, $first_id, "check id");
- is($pay, 101, "check payload");
-
- ($pri, $id, $pay) = $q1->dequeue_next;
- is($pri, 200, "check item queued second");
- is($id, $second_id, "check id");
- is($pay, 201, "check payload");
- is($q1->get_item_count, 0, "should be empty");
- }
- );
- $thread->join;
- is($q1->get_item_count, 1, "only one item");
-}
-
-{
- # more complex clone check
- package Obj;
- our $created = 0;
- our $destroyed = 0;
- sub new {
- ++$created;
- my ($class, $id) = @_;
- print "# create $id in thread ", threads->tid, "\n";
- return bless \$id, $class;
- }
- sub id {
- ${$_[0]};
- }
- sub DESTROY {
- my $self = shift;
- print "# destroy $$self in thread ", threads->tid, "\n";
- ++$destroyed;
- }
-
- package main;
-
- my $q1 = POE::XS::Queue::Array->new;
- my $first_id = $q1->enqueue(100, Obj->new(101));
- my $thread = threads->create
- (
- sub {
- my $second_id = $q1->enqueue(200, Obj->new(201));
- is($second_id, 2, "check id of new item");
- my ($pri, $id, $pay) = $q1->dequeue_next;
- is($pri, 100, "check item queued first");
- is($id, $first_id, "check id");
- is($pay->id, 101, "check payload");
-
- ($pri, $id, $pay) = $q1->dequeue_next;
- is($pri, 200, "check item queued second");
- is($id, $second_id, "check id");
- is($pay->id, 201, "check payload");
- is($q1->get_item_count, 0, "should be empty");
- undef $pay;
- is($Obj::created, 2, "2 objects created in thread");
- is($Obj::destroyed, 2, "2 objects destroyed in thread");
- }
- );
- $thread->join;
- is($q1->get_item_count, 1, "only 1 item left");
- is($Obj::created, 1, "1 objects created in main");
- is($Obj::destroyed, 0, "no objects destroyed in main");
- undef $q1;
- is($Obj::destroyed, 1, "1 objects destroyed in main after destroying queue");
-}