8 /*#define DEBUG(x) x*/
\r
10 /*#define DEBUG_ERR(x) x*/
\r
11 #define DEBUG_ERR(x)
\r
13 /*#define KEEP_STATS 1*/
\r
21 #define PQ_START_SIZE 10
\r
25 #define STUPID_IDS 0
\r
27 #define LARGE_QUEUE_SIZE 50
\r
30 We store the queue in a similar way to the way perl deals with arrays,
\r
31 we keep a block of memory, but the first element may or may not be in use,
\r
32 depending on the pattern of usage.
\r
34 There are 3 values controlling usage of the array:
\r
36 - alloc - the number of elements allocated in total
\r
37 - start - the first element in use in the array
\r
38 - end - one past the end of the last element in the array
\r
40 This has the properties that:
\r
42 start == 0 - no space at the front
\r
43 end == alloc - no space at the end
\r
44 end - start - number of elements in the queue
\r
46 We use a perl hash (HV *) to store the mapping from ids to priorities.
\r
49 struct poe_queue_tag {
\r
50 /* the first entry in use */
\r
53 /* 1 past the last entry in use, hence end - start is the number of
\r
54 entries in the queue */
\r
57 /* the total number of entries allocated */
\r
60 /* used to generate item ids */
\r
63 /* used to track in use item ids */
\r
66 /* the actual entries */
\r
76 pq_create - create a new queue object.
\r
78 No parameters. Returns the new queue object.
\r
83 poe_queue *pq = mymalloc(sizeof(poe_queue));
\r
86 croak("Out of memory");
\r
89 pq->alloc = PQ_START_SIZE;
\r
92 pq->entries = mymalloc(sizeof(pq_entry) * PQ_START_SIZE);
\r
93 if (pq->entries == NULL) /* check the allocation before touching the buffer */
\r
94 croak("Out of memory");
\r
95 memset(pq->entries, 0, sizeof(pq_entry) * PQ_START_SIZE); /* zero the entries only once the pointer is known to be valid */
\r
98 pq->total_finds = pq->binary_finds = 0;
\r
101 DEBUG( fprintf(stderr, "pq_create() => %p\n", pq) );
\r
107 pq_delete - release the queue object.
\r
109 This also releases one reference from each SV in the queue.
\r
113 pq_delete(poe_queue *pq) {
\r
116 DEBUG( fprintf(stderr, "pq_delete(%p)\n", pq) );
\r
117 if (pq->end > pq->start) {
\r
118 for (i = pq->start; i < pq->end; ++i) {
\r
119 SvREFCNT_dec(pq->entries[i].payload);
\r
122 SvREFCNT_dec((SV *)pq->ids);
\r
125 myfree(pq->entries);
\r
126 pq->entries = NULL;
\r
131 pq_new_id - generate a new item id.
\r
135 This, the following 3 functions and pq_create, pq_delete, should be
\r
136 all that needs to be modified if we change hash implementations.
\r
141 pq_new_id(poe_queue *pq, pq_priority_t priority) {
\r
148 seq = ++pq->queue_seq;
\r
150 for (i = pq->start; i < pq->end; ++i) {
\r
151 if (pq->entries[i].id == seq) {
\r
160 pq_id_t seq = ++pq->queue_seq; /* first candidate id: the next sequence number */
\r
162 while (hv_exists(pq->ids, (char *)&seq, sizeof(seq))) {
\r
163 seq = ++pq->queue_seq;
\r
165 hv_store(pq->ids, (char *)&seq, sizeof(seq), newSVnv(priority), 0);
\r
172 pq_release_id - releases an id for future use.
\r
176 pq_release_id(poe_queue *pq, pq_id_t id) {
\r
179 hv_delete(pq->ids, (char *)&id, sizeof(id), 0);
\r
184 pq_item_priority - get the priority of an item given its id
\r
188 pq_item_priority(poe_queue *pq, pq_id_t id, pq_priority_t *priority) {
\r
192 for (i = pq->start; i < pq->end; ++i) {
\r
193 if (pq->entries[i].id == id) {
\r
194 *priority = pq->entries[i].priority;
\r
201 SV **entry = hv_fetch(pq->ids, (char *)&id, sizeof(id), 0);
\r
203 if (!entry || !*entry)
\r
206 *priority = SvNV(*entry);
\r
213 pq_set_id_priority - set the priority of an item in the id hash
\r
217 pq_set_id_priority(poe_queue *pq, pq_id_t id, pq_priority_t new_priority) {
\r
219 /* nothing to do, caller set it in the array */
\r
221 SV **entry = hv_fetch(pq->ids, (char *)&id, sizeof(id), 0); /* look the id up in the id => priority hash */
\r
223 if (!entry || !*entry) /* '||', not '&&': a NULL entry must never be dereferenced, and a NULL *entry is also "not found" */
\r
224 croak("pq_set_priority: id not found");
\r
226 sv_setnv(*entry, new_priority); /* overwrite the priority stored for this id */
\r
231 pq_move_items - moves items around.
\r
233 This encapsulates the old calls to memmove(), providing a single place
\r
234 to add error checking.
\r
237 pq_move_items(poe_queue *pq, int target, int src, int count) {
\r
242 if (src < pq->start) {
\r
243 fprintf(stderr, "src %d less than start %d\n", src, pq->start);
\r
246 if (src + count > pq->end) {
\r
247 fprintf(stderr, "src %d + count %d beyond end %d\n", src, count, pq->end);
\r
251 fprintf(stderr, "target %d < 0\n", target);
\r
254 if (target + count > pq->alloc) {
\r
255 fprintf(stderr, "target %d + count %d > alloc %d\n", target, count, pq->alloc);
\r
258 if (die) *(volatile char *)0 = '\0'; /* deliberate crash after a failed bounds check; volatile keeps the compiler from deleting the null write */
\r
261 memmove(pq->entries + target, pq->entries + src, count * sizeof(pq_entry));
\r
265 pq_realloc - make space at the front or back of the queue.
\r
267 This adjusts the queue to allow insertion of a single item at the
\r
268 front or the back of the queue.
\r
270 If the queue has 33% or more space available we simply adjust the
\r
271 position of the in-use items within the array. We try not to push the
\r
272 items right up against the opposite end of the array, since we might
\r
273 need to insert items there too.
\r
275 If the queue has less than 33% space available we allocate another 50%
\r
276 space. We then only move the queue elements if we need space at the
\r
277 front, since the reallocation has just opened up a huge space at the
\r
278 back. Since we're reallocating exponentially larger sizes we should
\r
279 have a constant time cost on reallocation per queue item stored (but
\r
280 other costs are going to be higher.)
\r
285 pq_realloc(poe_queue *pq, int at_end) {
\r
286 int count = pq->end - pq->start;
\r
288 DEBUG( fprintf(stderr, "pq_realloc((%d, %d, %d), %d)\n", pq->start, pq->end, pq->alloc, at_end) );
\r
289 if (count * 3 / 2 < pq->alloc) {
\r
290 /* 33 % or more space available, use some of it */
\r
294 new_start = (pq->alloc - count) / 3;
\r
297 new_start = (pq->alloc - count) * 2 / 3;
\r
299 DEBUG( fprintf(stderr, " moving start to %d\n", new_start) );
\r
300 pq_move_items(pq, new_start, pq->start, count);
\r
301 pq->start = new_start;
\r
302 pq->end = new_start + count;
\r
305 int new_alloc = pq->alloc * 3 / 2;
\r
306 pq->entries = myrealloc(pq->entries, sizeof(pq_entry) * new_alloc);
\r
307 pq->alloc = new_alloc;
\r
310 croak("Out of memory");
\r
312 DEBUG( fprintf(stderr, " - expanding to %d entries\n", new_alloc) );
\r
315 int new_start = (new_alloc - count) * 2 / 3;
\r
316 DEBUG( fprintf(stderr, " moving start to %d\n", new_start) );
\r
317 pq_move_items(pq, new_start, pq->start, count);
\r
318 pq->start = new_start;
\r
319 pq->end = new_start + count;
\r
322 DEBUG( fprintf(stderr, " final: %d %d %d\n", pq->start, pq->end, pq->alloc) );
\r
326 pq_insertion_point - figure out where to insert an item with the given
\r
333 pq_insertion_point(poe_queue *pq, pq_priority_t priority) {
\r
334 if (pq->end - pq->start < LARGE_QUEUE_SIZE) {
\r
336 while (i > pq->start &&
\r
337 priority < pq->entries[i-1].priority) {
\r
343 int lower = pq->start;
\r
344 int upper = pq->end - 1;
\r
346 int midpoint = (lower + upper) >> 1;
\r
351 if (priority < pq->entries[midpoint].priority) {
\r
352 upper = midpoint - 1;
\r
355 if (priority > pq->entries[midpoint].priority) {
\r
356 lower = midpoint + 1;
\r
359 while (midpoint < pq->end &&
\r
360 priority == pq->entries[midpoint].priority) {
\r
369 pq_enqueue(poe_queue *pq, pq_priority_t priority, SV *payload) {
\r
371 pq_id_t id = pq_new_id(pq, priority);
\r
373 DEBUG( fprintf(stderr, "pq_enqueue(%f, %p)\n", priority, payload) );
\r
374 if (pq->start == pq->end) {
\r
375 DEBUG( fprintf(stderr, " - on empty queue\n") );
\r
376 /* allow room at front and back for new entries */
\r
377 pq->start = pq->alloc / 3;
\r
378 pq->end = pq->start + 1;
\r
379 fill_at = pq->start;
\r
381 else if (priority >= pq->entries[pq->end-1].priority) {
\r
382 DEBUG( fprintf(stderr, " - at the end\n") );
\r
383 if (pq->end == pq->alloc)
\r
384 /* past the end - need to realloc or make some space */
\r
385 pq_realloc(pq, AT_END);
\r
390 else if (priority < pq->entries[pq->start].priority) {
\r
391 DEBUG( fprintf(stderr, " - at the front\n") );
\r
392 if (pq->start == 0)
\r
393 /* no space at the front, make some */
\r
394 pq_realloc(pq, AT_START);
\r
397 fill_at = pq->start;
\r
401 DEBUG( fprintf(stderr, " - in the middle\n") );
\r
402 i = pq_insertion_point(pq, priority);
\r
404 /* if we're near the end we want to push entries up, otherwise down */
\r
405 if (i - pq->start > (pq->end - pq->start) / 2) {
\r
406 DEBUG( fprintf(stderr, " - closer to the back (%d -> [ %d %d ])\n",
\r
407 i, pq->start, pq->end) );
\r
408 /* make sure we have space, this might end up copying twice,
\r
409 but too bad for now */
\r
410 if (pq->end == pq->alloc) {
\r
411 int old_start = pq->start;
\r
412 pq_realloc(pq, AT_END);
\r
413 i += pq->start - old_start;
\r
416 pq_move_items(pq, i+1, i, pq->end - i);
\r
421 DEBUG( fprintf(stderr, " - closer to the front (%d -> [ %d %d ])\n",
\r
422 i, pq->start, pq->end) );
\r
423 if (pq->start == 0) {
\r
424 pq_realloc(pq, AT_START);
\r
427 pq_move_items(pq, pq->start-1, pq->start, i - pq->start);
\r
432 pq->entries[fill_at].priority = priority;
\r
433 pq->entries[fill_at].id = id;
\r
434 pq->entries[fill_at].payload = newSVsv(payload);
\r
440 Note: it's up to the caller to release the SV. The XS code does this
\r
441 by making it mortal.
\r
444 pq_dequeue_next(poe_queue *pq, pq_priority_t *priority, pq_id_t *id, SV **payload) {
\r
446 /* the caller needs to release the payload (somehow) */
\r
447 if (pq->start == pq->end)
\r
450 entry = pq->entries + pq->start++;
\r
451 *priority = entry->priority;
\r
453 *payload = entry->payload;
\r
454 pq_release_id(pq, entry->id);
\r
460 pq_get_next_priority(poe_queue *pq, pq_priority_t *priority) {
\r
461 if (pq->start == pq->end)
\r
464 *priority = pq->entries[pq->start].priority;
\r
469 pq_get_item_count(poe_queue *pq) {
\r
470 return pq->end - pq->start;
\r
474 pq_test_filter - the XS magic involved in passing the payload to a
\r
479 pq_test_filter(pq_entry *entry, SV *filter) {
\r
480 /* man perlcall for the magic here */
\r
489 XPUSHs(sv_2mortal(newSVsv(entry->payload)));
\r
492 count = call_sv(filter, G_SCALAR);
\r
497 croak("got other than 1 value in scalar context");
\r
500 result = SvTRUE(result_sv);
\r
510 pq_find_item - search for an item we know is there.
\r
516 pq_find_item(poe_queue *pq, pq_id_t id, pq_priority_t priority) {
\r
519 STATS(++pq->total_finds);
\r
520 if (pq->end - pq->start < LARGE_QUEUE_SIZE) {
\r
521 for (i = pq->start; i < pq->end; ++i) {
\r
522 if (pq->entries[i].id == id)
\r
525 DEBUG(fprintf(stderr, "pq_find_item %d => %f\n", id, priority) );
\r
526 croak("Internal inconsistency: event should have been found");
\r
529 /* try a binary search */
\r
530 /* simply translated from the perl */
\r
531 STATS(++pq->binary_finds);
\r
533 int lower = pq->start;
\r
534 int upper = pq->end - 1;
\r
537 int midpoint = (upper + lower) >> 1;
\r
538 if (upper < lower) {
\r
539 croak("Internal inconsistency, priorities out of order");
\r
541 if (priority < pq->entries[midpoint].priority) {
\r
542 upper = midpoint - 1;
\r
545 if (priority > pq->entries[midpoint].priority) {
\r
546 lower = midpoint + 1;
\r
549 linear_point = midpoint;
\r
550 while (linear_point >= pq->start &&
\r
551 priority == pq->entries[linear_point].priority) {
\r
552 if (pq->entries[linear_point].id == id)
\r
553 return linear_point;
\r
556 linear_point = midpoint;
\r
557 while ( (++linear_point < pq->end) &&
\r
558 priority == pq->entries[linear_point].priority) {
\r
559 if (pq->entries[linear_point].id == id)
\r
560 return linear_point;
\r
563 croak("internal inconsistency: event should have been found");
\r
569 pq_remove_item(poe_queue *pq, pq_id_t id, SV *filter, pq_entry *removed) {
\r
570 pq_priority_t priority;
\r
573 if (!pq_item_priority(pq, id, &priority)) {
\r
578 index = pq_find_item(pq, id, priority);
\r
580 if (!pq_test_filter(pq->entries + index, filter)) {
\r
585 *removed = pq->entries[index];
\r
586 pq_release_id(pq, id);
\r
587 if (index == pq->start) {
\r
590 else if (index == pq->end - 1) {
\r
594 pq_move_items(pq, index, index+1, pq->end - index - 1);
\r
597 DEBUG( fprintf(stderr, "removed (%d, %p (%d))\n", id, removed->payload,
\r
598 SvREFCNT(removed->payload)) );
\r
604 pq_remove_items(poe_queue *pq, SV *filter, int max_count, pq_entry **entries) {
\r
605 int in_index, out_index;
\r
606 int remove_count = 0;
\r
609 if (pq->start == pq->end)
\r
612 *entries = mymalloc(sizeof(pq_entry) * (pq->end - pq->start));
\r
614 croak("Out of memory");
\r
616 in_index = out_index = pq->start;
\r
617 while (in_index < pq->end && remove_count < max_count) {
\r
618 if (pq_test_filter(pq->entries + in_index, filter)) {
\r
619 pq_release_id(pq, pq->entries[in_index].id);
\r
620 (*entries)[remove_count++] = pq->entries[in_index++];
\r
623 pq->entries[out_index++] = pq->entries[in_index++];
\r
626 while (in_index < pq->end) {
\r
627 pq->entries[out_index++] = pq->entries[in_index++];
\r
629 pq->end = out_index;
\r
631 return remove_count;
\r
635 We need to keep the following 2 functions in sync (or combine the
\r
639 pq_set_priority(poe_queue *pq, pq_id_t id, SV *filter, pq_priority_t new_priority) {
\r
640 pq_priority_t old_priority;
\r
641 int index, insert_at;
\r
643 if (!pq_item_priority(pq, id, &old_priority)) {
\r
648 index = pq_find_item(pq, id, old_priority);
\r
650 if (!pq_test_filter(pq->entries + index, filter)) {
\r
655 DEBUG( fprintf(stderr, " - index %d oldp %f newp %f\n", index, old_priority, new_priority) );
\r
657 if (pq->end - pq->start == 1) {
\r
658 DEBUG( fprintf(stderr, " -- one item\n") );
\r
659 /* only the one item anyway */
\r
660 pq->entries[pq->start].priority = new_priority;
\r
663 insert_at = pq_insertion_point(pq, new_priority);
\r
664 DEBUG( fprintf(stderr, " - new index %d\n", insert_at) );
\r
665 /* the item is still in the queue, so either side of it means it
\r
667 if (insert_at == index || insert_at == index+1) {
\r
668 DEBUG( fprintf(stderr, " -- change in place\n") );
\r
669 pq->entries[index].priority = new_priority;
\r
672 pq_entry saved = pq->entries[index];
\r
673 saved.priority = new_priority;
\r
675 if (insert_at < index) {
\r
676 DEBUG( fprintf(stderr, " - insert_at < index\n") );
\r
677 pq_move_items(pq, insert_at + 1, insert_at, index - insert_at);
\r
678 pq->entries[insert_at] = saved;
\r
681 DEBUG( fprintf(stderr, " - insert_at > index\n") );
\r
683 pq_move_items(pq, index, index + 1, insert_at - index);
\r
684 pq->entries[insert_at] = saved;
\r
689 pq_set_id_priority(pq, id, new_priority);
\r
695 pq_adjust_priority(poe_queue *pq, pq_id_t id, SV *filter, double delta, pq_priority_t *priority) {
\r
696 pq_priority_t old_priority, new_priority;
\r
697 int index, insert_at;
\r
699 DEBUG( fprintf(stderr, "pq_adjust_priority(..., %d, %p, %f, ...)\n", id, filter, delta) );
\r
701 if (!pq_item_priority(pq, id, &old_priority)) {
\r
706 index = pq_find_item(pq, id, old_priority);
\r
708 if (!pq_test_filter(pq->entries + index, filter)) {
\r
713 new_priority = old_priority + delta;
\r
715 DEBUG( fprintf(stderr, " - index %d oldp %f newp %f\n", index, old_priority, new_priority) );
\r
717 if (pq->end - pq->start == 1) {
\r
718 DEBUG( fprintf(stderr, " -- one item\n") );
\r
719 /* only the one item anyway */
\r
720 pq->entries[pq->start].priority = new_priority;
\r
723 insert_at = pq_insertion_point(pq, new_priority);
\r
724 DEBUG( fprintf(stderr, " - new index %d\n", insert_at) );
\r
725 /* the item is still in the queue, so either side of it means it
\r
727 if (insert_at == index || insert_at == index+1) {
\r
728 DEBUG( fprintf(stderr, " -- change in place\n") );
\r
729 pq->entries[index].priority = new_priority;
\r
732 pq_entry saved = pq->entries[index];
\r
733 saved.priority = new_priority;
\r
735 if (insert_at < index) {
\r
736 DEBUG( fprintf(stderr, " - insert_at < index\n") );
\r
737 pq_move_items(pq, insert_at + 1, insert_at, index - insert_at);
\r
738 pq->entries[insert_at] = saved;
\r
741 DEBUG( fprintf(stderr, " - insert_at > index\n") );
\r
743 pq_move_items(pq, index, index + 1, insert_at - index);
\r
744 pq->entries[insert_at] = saved;
\r
749 pq_set_id_priority(pq, id, new_priority);
\r
750 *priority = new_priority;
\r
756 pq_peek_items(poe_queue *pq, SV *filter, int max_count, pq_entry **items) {
\r
761 if (pq->end == pq->start)
\r
764 *items = mymalloc(sizeof(pq_entry) * (pq->end - pq->start)); /* sized for the worst case: every queued entry matches */
\r
765 for (i = pq->start; i < pq->end && count < max_count; ++i) { /* stop once max_count matches are collected, as pq_remove_items does */
\r
766 if (pq_test_filter(pq->entries + i, filter)) {
\r
767 (*items)[count++] = pq->entries[i]; /* shallow copy: the entry stays in the queue */
\r
779 pq_dump - dump the internals of the queue structure.
\r
782 pq_dump(poe_queue *pq) {
\r
786 fprintf(stderr, "poe_queue\n");
\r
787 fprintf(stderr, " start: %d\n", pq->start);
\r
788 fprintf(stderr, " end: %d\n", pq->end);
\r
789 fprintf(stderr, " alloc: %d\n", pq->alloc);
\r
790 fprintf(stderr, " seq: %d\n", pq->queue_seq);
\r
791 fprintf(stderr, " **Queue Entries:\n"
\r
792 " index: id priority SV\n");
\r
793 for (i = pq->start; i < pq->end; ++i) {
\r
794 pq_entry *entry = pq->entries + i;
\r
795 fprintf(stderr, " %5d: %5d %8f %p (%u)\n", i, entry->id, entry->priority,
\r
796 entry->payload, (unsigned)SvREFCNT(entry->payload));
\r
798 fprintf(stderr, " **Hash entries:\n");
\r
799 hv_iterinit(pq->ids);
\r
800 while ((he = hv_iternext(pq->ids)) != NULL) {
\r
802 fprintf(stderr, " %d => %f\n", *(pq_id_t *)HePV(he, len), SvNV(hv_iterval(pq->ids, he)));
\r
807 pq_verify - basic verification of the structure of the queue
\r
809 For now check for duplicate ids in sequence.
\r
812 pq_verify(poe_queue *pq) {
\r
817 if (pq->start != pq->end) {
\r
818 lastid = pq->entries[pq->start].id;
\r
820 while (i < pq->end) {
\r
821 if (pq->entries[i].id == lastid) {
\r
822 fprintf(stderr, "Duplicate id %d at %d\n", lastid, i);
\r
835 pq__set_errno_queue - set errno
\r
837 This just sets errno for testing purposes.
\r
840 pq__set_errno_queue(int value) {
\r