split the working code out from the XS file
[poe-xs-queue-array.git] / queue.c
1 #include "EXTERN.h"\r
2 #include "perl.h"\r
3 #include "XSUB.h"\r
4 \r
5 #include "queue.h"\r
6 \r
7 #define DEBUG(x) x\r
8 /*#define DEBUG(x)*/\r
9 \r
10 #define PQ_START_SIZE 10\r
11 #define AT_START 0\r
12 #define AT_END 1\r
13 \r
14 pq_id_t queue_seq;\r
15 \r
16 /*\r
17 We store the queue in a similar way to the way perl deals with arrays,\r
18 we keep a block of memory, but the first element may or may not be in use,\r
19 depending on the pattern of usage.\r
20 \r
21 There's 3 value controlling usage of the array:\r
22 \r
23   - alloc - the number of elements allocated in total\r
24   - start - the first element in use in the array\r
25   - end - one past the end of the last element in the array\r
26 \r
27 This has the properties that:\r
28 \r
29   start == 0 - no space at the front\r
30   end == alloc - no space at the end\r
31   end - start - number of elements in the queue\r
32 \r
33 We use a perl hash (HV *) to store the mapping from ids to priorities.\r
34 \r
35 */\r
36 struct poe_queue_tag {\r
37   /* the first entry in use */\r
38   int start;\r
39 \r
40   /* 1 past the last entry in use, hence end - start is the number of \r
41      entries in the queue */\r
42   int end;\r
43 \r
44   /* the total number of entries allocated */\r
45   int alloc;\r
46 \r
47   /* used to generate item ids */\r
48   pq_id_t queue_seq;\r
49 \r
50   /* used to track in use item ids */\r
51   HV *ids;\r
52 \r
53   /* the actual entries */\r
54   pq_entry *entries;\r
55 };\r
56 \r
57 /*\r
58 poe_create - create a new queue object.\r
59 \r
60 No parameters.  returns the new queue object.\r
61 \r
62 */\r
63 poe_queue *\r
64 pq_create(void) {\r
65   poe_queue *pq = malloc(sizeof(poe_queue));\r
66   \r
67   if (pq == NULL)\r
68     croak("Out of memory");\r
69   pq->start = 0;\r
70   pq->end = 0;\r
71   pq->alloc = PQ_START_SIZE;\r
72   pq->queue_seq = 0;\r
73   pq->ids = newHV();\r
74   pq->entries = calloc(sizeof(pq_entry), PQ_START_SIZE);\r
75   if (pq->entries == NULL)\r
76     croak("Out of memory");\r
77 \r
78   DEBUG( fprintf(stderr, "pq_create() => %p\n", pq) );\r
79 \r
80   return pq;\r
81 }\r
82 \r
83 /*\r
84 pq_delete - release the queue object.\r
85 \r
86 This also releases one reference from each SV in the queue.\r
87 \r
88 */\r
89 void\r
90 pq_delete(poe_queue *pq) {\r
91   int i;\r
92 \r
93   DEBUG( fprintf(stderr, "pq_delete(%p)\n", pq) );\r
94   if (pq->end > pq->start) {\r
95     for (i = pq->start; i < pq->end; ++i) {\r
96       SvREFCNT_dec(pq->entries[i].payload);\r
97     }\r
98   }\r
99   SvREFCNT_dec((SV *)pq->ids);\r
100   pq->ids = NULL;\r
101   if (pq->entries)\r
102     free(pq->entries);\r
103   pq->entries = NULL;\r
104   free(pq);\r
105 }\r
106 \r
107 /*\r
108 pq_new_id - generate a new item id.\r
109 \r
110 Internal use only.\r
111 \r
112 This, the following 3 functions and pq_create, pq_delete, should be\r
113 all that needs to be modified if we change hash implementations.\r
114 \r
115 */\r
116 static\r
117 pq_id_t\r
118 pq_new_id(poe_queue *pq, pq_priority_t priority) {\r
119   int seq = ++pq->queue_seq;\r
120   SV *index = newSViv(seq);\r
121 \r
122   while (hv_exists_ent(pq->ids, index, 0)) {\r
123     seq = ++pq->queue_seq;\r
124     sv_setiv(index, seq);\r
125   }\r
126   hv_store_ent(pq->ids, index, newSVnv(priority), 0);\r
127 \r
128   return seq;\r
129 }\r
130 \r
131 /*\r
132 pq_release_id - releases an id for future use.\r
133 */\r
134 static\r
135 void\r
136 pq_release_id(poe_queue *pq, pq_id_t id) {\r
137   SV *id_sv = sv_2mortal(newSViv(id));\r
138 \r
139   hv_delete_ent(pq->ids, id_sv, 0, 0);\r
140 }\r
141 \r
142 /*\r
143 pq_item_priority - get the priority of an item given it's id\r
144 */\r
145 static\r
146 int\r
147 pq_item_priority(poe_queue *pq, pq_id_t id, pq_priority_t *priority) {\r
148   HE *entry = hv_fetch_ent(pq->ids, sv_2mortal(newSViv(id)), 0, 0);\r
149 \r
150   if (!entry)\r
151     return 0;\r
152 \r
153   *priority = SvNV(HeVAL(entry));\r
154 \r
155   return 1;\r
156 }\r
157 \r
158 /*\r
159 pq_set_id_priority - set the priority of an item in the id hash\r
160 */\r
161 static\r
162 void\r
163 pq_set_id_priority(poe_queue *pq, pq_id_t id, pq_priority_t new_priority) {\r
164   HE *entry = hv_fetch_ent(pq->ids, sv_2mortal(newSViv(id)), 0, 0);\r
165 \r
166   if (!entry)\r
167     croak("pq_set_priority: id not found");\r
168 \r
169   sv_setnv(HeVAL(entry), new_priority);\r
170 }\r
171 \r
172 /*\r
173 pq_realloc - make space at the front of back of the queue.\r
174 \r
175 This adjusts the queue to allow insertion of a single item at the\r
176 front or the back of the queue.\r
177 \r
178 If the queue has 33% or more space available we simple adjust the\r
179 position of the in-use items within the array.  We try not to push the\r
180 items right up against the opposite end of the array, since we might\r
181 need to insert items there too.\r
182 \r
183 If the queue has less than 33% space available we allocate another 50%\r
184 space.  We then only move the queue elements if we need space at the\r
185 front, since the reallocation has just opened up a huge space at the\r
186 back.  Since we're reallocating exponentially larger sizes we should\r
187 have a constant time cost on reallocation per queue item stored (but\r
188 other costs are going to be higher.)\r
189 \r
190 */\r
191 static\r
192 void\r
193 pq_realloc(poe_queue *pq, int at_end) {\r
194   int count = pq->end - pq->start;\r
195 \r
196   DEBUG( fprintf(stderr, "pq_realloc((%d, %d, %d), %d)\n", pq->start, pq->end, pq->alloc, at_end) );\r
197   pq_dump(pq);\r
198   if (count * 3 / 2 < pq->alloc) {\r
199     /* 33 % or more space available, use some of it */\r
200     int new_start;\r
201 \r
202     if (at_end) {\r
203       new_start = (pq->alloc - count) / 3;\r
204     }\r
205     else {\r
206       new_start = (pq->alloc - count) * 2 / 3;\r
207     }\r
208     DEBUG( fprintf(stderr, "  moving start to %d\n", new_start) );\r
209     memmove(pq->entries + new_start, pq->entries + pq->start,\r
210             count * sizeof(pq_entry));\r
211     pq->start = new_start;\r
212     pq->end = new_start + count;\r
213   }\r
214   else {\r
215     int new_alloc = pq->alloc * 3 / 2;\r
216     pq->entries = realloc(pq->entries, sizeof(pq_entry) * new_alloc);\r
217     pq->alloc = new_alloc;\r
218 \r
219     if (!pq->entries)\r
220       croak("Out of memory");\r
221 \r
222     DEBUG( fprintf(stderr, "  - expanding to %d entries\n", new_alloc) );\r
223 \r
224     if (!at_end) {\r
225       int new_start = (new_alloc - count) * 2 / 3;\r
226       DEBUG( fprintf(stderr, "  moving start to %d\n", new_start) );\r
227       memmove(pq->entries + new_start, pq->entries + pq->start,\r
228               count * sizeof(pq_entry));\r
229       pq->start = new_start;\r
230       pq->end = new_start + count;\r
231     }\r
232   }\r
233   pq_dump(pq);\r
234   DEBUG( fprintf(stderr, "  final: %d %d %d\n", pq->start, pq->end, pq->alloc) );\r
235 }\r
236 \r
237 /*\r
238 pq_insertion_point - figure out where to insert an item with the given\r
239 priority\r
240 \r
241 Internal.\r
242 */\r
243 static\r
244 int\r
245 pq_insertion_point(poe_queue *pq, pq_priority_t priority) {\r
246   /* for now this is just a linear search, later we should make it \r
247      binary */\r
248   int i = pq->end;\r
249   while (i > pq->start &&\r
250          priority < pq->entries[i-1].priority) {\r
251     --i;\r
252   }\r
253 \r
254   return i;\r
255 }\r
256 \r
257 int\r
258 pq_enqueue(poe_queue *pq, pq_priority_t priority, SV *payload) {\r
259   int fill_at;\r
260   pq_id_t id = pq_new_id(pq, priority);\r
261 \r
262   DEBUG( fprintf(stderr, "pq_enqueue(%f, %p)\n", priority, payload) );\r
263   if (pq->start == pq->end) {\r
264     DEBUG( fprintf(stderr, "  - on empty queue\n") );\r
265     /* allow room at front and back for new entries */\r
266     pq->start = pq->alloc / 3;\r
267     pq->end = pq->start + 1;\r
268     fill_at = pq->start;\r
269   }\r
270   else if (priority >= pq->entries[pq->end-1].priority) {\r
271     DEBUG( fprintf(stderr, "  - at the end\n") );\r
272     if (pq->end == pq->alloc)\r
273       /* past the end - need to realloc or make some space */\r
274       pq_realloc(pq, AT_END);\r
275     \r
276     fill_at = pq->end;\r
277     ++pq->end;\r
278   }\r
279   else if (priority < pq->entries[pq->start].priority) {\r
280     DEBUG( fprintf(stderr, "  - at the front\n") );\r
281     if (pq->start == 0)\r
282       /* no space at the front, make some */\r
283       pq_realloc(pq, AT_START);\r
284 \r
285     --pq->start;\r
286     fill_at = pq->start;\r
287   }\r
288   else {\r
289     int i;\r
290     DEBUG( fprintf(stderr, "  - in the middle\n") );\r
291     i = pq_insertion_point(pq, priority);\r
292     \r
293     /* if we're near the end we want to push entries up, otherwise down */\r
294     if (i - pq->start > (pq->end - pq->start) / 2) {\r
295       DEBUG( fprintf(stderr, "    - closer to the back (%d -> [ %d %d ])\n",\r
296                      i, pq->start, pq->end) );\r
297       /* make sure we have space, this might end up copying twice, \r
298          but too bad for now */\r
299       if (pq->end == pq->alloc) {\r
300         int old_start = pq->start;\r
301         pq_realloc(pq, AT_END);\r
302         i += pq->start - old_start;\r
303       }\r
304       \r
305       memmove(pq->entries + i + 1, pq->entries + i, (pq->end - i) * sizeof(pq_entry));\r
306       ++pq->end;\r
307       fill_at = i;\r
308     }\r
309     else {\r
310       DEBUG( fprintf(stderr, "    - closer to the front (%d -> [ %d %d ])\n",\r
311                      i, pq->start, pq->end) );\r
312       if (pq->start == 0) {\r
313         pq_realloc(pq, AT_START);\r
314         i += pq->start;\r
315       }\r
316       memmove(pq->entries + pq->start - 1, pq->entries + pq->start,\r
317              (i - pq->start) * sizeof(pq_entry));\r
318       --pq->start;\r
319       fill_at = i-1;\r
320     }\r
321   }\r
322   pq->entries[fill_at].priority = priority;\r
323   pq->entries[fill_at].id = id;\r
324   pq->entries[fill_at].payload = newSVsv(payload);\r
325 \r
326   return id;\r
327 }\r
328 \r
329 /*\r
330   Note: it's up to the caller to release the SV.  The XS code does this \r
331   by making it mortal.\r
332 */\r
333 int\r
334 pq_dequeue_next(poe_queue *pq, pq_priority_t *priority, pq_id_t *id, SV **payload) {\r
335   pq_entry *entry;\r
336   /* the caller needs to release the payload (somehow) */\r
337   if (pq->start == pq->end)\r
338     return 0;\r
339 \r
340   entry = pq->entries + pq->start++;\r
341   *priority = entry->priority;\r
342   *id = entry->id;\r
343   *payload = entry->payload;\r
344   pq_release_id(pq, entry->id);\r
345 \r
346   return 1;\r
347 }\r
348 \r
349 int\r
350 pq_get_next_priority(poe_queue *pq, pq_priority_t *priority) {\r
351   if (pq->start == pq->end)\r
352     return 0;\r
353 \r
354   *priority = pq->entries[pq->start].priority;\r
355   return 1;\r
356 }\r
357 \r
358 int\r
359 pq_get_item_count(poe_queue *pq) {\r
360   return pq->end - pq->start;\r
361 }\r
362 \r
363 /*\r
364 pq_test_filter - the XS magic involved in passing the payload to a\r
365 filter function.\r
366 */\r
367 static\r
368 int\r
369 pq_test_filter(pq_entry *entry, SV *filter) {\r
370   /* man perlcall for the magic here */\r
371   dSP;\r
372   int count;\r
373   SV *result_sv;\r
374   int result;\r
375 \r
376   ENTER;\r
377   SAVETMPS;\r
378   PUSHMARK(SP);\r
379   XPUSHs(sv_2mortal(newSVsv(entry->payload)));\r
380   PUTBACK;\r
381 \r
382   count = call_sv(filter, G_SCALAR);\r
383 \r
384   SPAGAIN;\r
385 \r
386   if (count != 1) \r
387     croak("got other than 1 value in scalar context");\r
388 \r
389   result_sv = POPs;\r
390   result = SvTRUE(result_sv);\r
391 \r
392   PUTBACK;\r
393   FREETMPS;\r
394   LEAVE;\r
395 \r
396   return result;\r
397 }\r
398 \r
399 /*\r
400 pq_find_item - search for an item we know is there.\r
401 \r
402 Internal.\r
403 */\r
404 static\r
405 int\r
406 pq_find_item(poe_queue *pq, pq_id_t id, pq_priority_t priority) {\r
407   int i;\r
408 \r
409   for (i = pq->start; i < pq->end; ++i) {\r
410     if (pq->entries[i].id == id)\r
411       return i;\r
412   }\r
413   DEBUG(fprintf(stderr, "pq_find_item %d => %f\n", id, priority) );\r
414   croak("Internal inconsistency: event should have been found");\r
415 }\r
416 \r
417 int\r
418 pq_remove_item(poe_queue *pq, pq_id_t id, SV *filter, pq_entry *removed) {\r
419   pq_priority_t priority;\r
420   int index;\r
421 \r
422   if (!pq_item_priority(pq, id, &priority)) {\r
423     errno = ESRCH;\r
424     return 0;\r
425   }\r
426 \r
427   index = pq_find_item(pq, id, priority);\r
428 \r
429   if (!pq_test_filter(pq->entries + index, filter)) {\r
430     errno = EPERM;\r
431     return 0;\r
432   }\r
433 \r
434   *removed = pq->entries[index];\r
435   pq_release_id(pq, id);\r
436   if (index == pq->start) {\r
437     ++pq->start;\r
438   }\r
439   else if (index == pq->end - 1) {\r
440     --pq->end;\r
441   }\r
442   else {\r
443     memmove(pq->entries + index, pq->entries + index + 1,\r
444             sizeof(pq_entry) * (pq->end - index - 1));\r
445     --pq->end;\r
446   }\r
447 \r
448   return 1;\r
449 }\r
450 \r
451 int\r
452 pq_remove_items(poe_queue *pq, SV *filter, int max_count, pq_entry **entries) {\r
453   int in_index, out_index;\r
454   int remove_count = 0;\r
455   \r
456   *entries = NULL;\r
457   if (pq->start == pq->end)\r
458     return 0;\r
459 \r
460   *entries = malloc(sizeof(pq_entry) * (pq->end - pq->start));\r
461   if (!*entries)\r
462     croak("Out of memory");\r
463   \r
464   in_index = out_index = pq->start;\r
465   while (in_index < pq->end && remove_count < max_count) {\r
466     if (pq_test_filter(pq->entries + in_index, filter)) {\r
467       pq_release_id(pq, pq->entries[in_index].id);\r
468       (*entries)[remove_count++] = pq->entries[in_index++];\r
469     }\r
470     else {\r
471       pq->entries[out_index++] = pq->entries[in_index++];\r
472     }\r
473   }\r
474   while (in_index < pq->end) {\r
475     pq->entries[out_index++] = pq->entries[in_index++];\r
476   }\r
477   pq->end = out_index;\r
478   \r
479   return remove_count;\r
480 }\r
481 \r
482 /*\r
483 We need to keep the following 2 functions in sync (or combine the\r
484 common code.)\r
485 */\r
486 int\r
487 pq_set_priority(poe_queue *pq, pq_id_t id, SV *filter, pq_priority_t new_priority) {\r
488   pq_priority_t old_priority;\r
489   int index, insert_at;\r
490 \r
491   if (!pq_item_priority(pq, id, &old_priority)) {\r
492     errno = ESRCH;\r
493     return 0;\r
494   }\r
495 \r
496   index = pq_find_item(pq, id, old_priority);\r
497 \r
498   if (!pq_test_filter(pq->entries + index, filter)) {\r
499     errno = EPERM;\r
500     return 0;\r
501   }\r
502 \r
503   DEBUG( fprintf(stderr, " - index %d  oldp %f newp %f\n", index, old_priority, new_priority) );\r
504 \r
505   if (pq->end - pq->start == 1) {\r
506     DEBUG( fprintf(stderr, "   -- one item\n") );\r
507     /* only the one item anyway */\r
508     pq->entries[pq->start].priority = new_priority;\r
509   }\r
510   else {\r
511     insert_at = pq_insertion_point(pq, new_priority);\r
512     DEBUG( fprintf(stderr, "   - new index %d\n", insert_at) );\r
513     /* the item is still in the queue, so either side of it means it\r
514        won't move */\r
515     if (insert_at == index || insert_at == index+1) {\r
516       DEBUG( fprintf(stderr, "   -- change in place\n") );\r
517       pq->entries[index].priority = new_priority;\r
518     }\r
519     else {\r
520       pq_entry saved = pq->entries[index];\r
521       saved.priority = new_priority;\r
522 \r
523       if (insert_at < index) {\r
524         DEBUG( fprintf(stderr, "  - insert_at < index\n") );\r
525         memmove(pq->entries + insert_at + 1, pq->entries + insert_at,\r
526                 sizeof(pq_entry) * (index - insert_at));\r
527         pq->entries[insert_at] = saved;\r
528       }\r
529       else {\r
530         DEBUG( fprintf(stderr, "  - insert_at > index\n") );\r
531         --insert_at;\r
532         memmove(pq->entries + index, pq->entries + index + 1,\r
533                 sizeof(pq_entry) * (insert_at - index));\r
534         pq->entries[insert_at] = saved;\r
535       }\r
536     }\r
537   }\r
538 \r
539   pq_set_id_priority(pq, id, new_priority);\r
540 \r
541   return 1;  \r
542 }\r
543 \r
544 int\r
545 pq_adjust_priority(poe_queue *pq, pq_id_t id, SV *filter, double delta, pq_priority_t *priority) {\r
546   pq_priority_t old_priority, new_priority;\r
547   int index, insert_at;\r
548 \r
549   DEBUG( fprintf(stderr, "pq_adjust_priority(..., %d, %p, %f, ...)\n", id, filter, delta) );\r
550 \r
551   if (!pq_item_priority(pq, id, &old_priority)) {\r
552     errno = ESRCH;\r
553     return 0;\r
554   }\r
555 \r
556   index = pq_find_item(pq, id, old_priority);\r
557 \r
558   if (!pq_test_filter(pq->entries + index, filter)) {\r
559     errno = EPERM;\r
560     return 0;\r
561   }\r
562 \r
563   new_priority = old_priority + delta;\r
564 \r
565   DEBUG( fprintf(stderr, " - index %d  oldp %f newp %f\n", index, old_priority, new_priority) );\r
566 \r
567   if (pq->end - pq->start == 1) {\r
568     DEBUG( fprintf(stderr, "   -- one item\n") );\r
569     /* only the one item anyway */\r
570     pq->entries[pq->start].priority = new_priority;\r
571   }\r
572   else {\r
573     insert_at = pq_insertion_point(pq, new_priority);\r
574     DEBUG( fprintf(stderr, "   - new index %d\n", insert_at) );\r
575     /* the item is still in the queue, so either side of it means it\r
576        won't move */\r
577     if (insert_at == index || insert_at == index+1) {\r
578       DEBUG( fprintf(stderr, "   -- change in place\n") );\r
579       pq->entries[index].priority = new_priority;\r
580     }\r
581     else {\r
582       pq_entry saved = pq->entries[index];\r
583       saved.priority = new_priority;\r
584 \r
585       if (insert_at < index) {\r
586         DEBUG( fprintf(stderr, "  - insert_at < index\n") );\r
587         memmove(pq->entries + insert_at + 1, pq->entries + insert_at,\r
588                 sizeof(pq_entry) * (index - insert_at));\r
589         pq->entries[insert_at] = saved;\r
590       }\r
591       else {\r
592         DEBUG( fprintf(stderr, "  - insert_at > index\n") );\r
593         --insert_at;\r
594         memmove(pq->entries + index, pq->entries + index + 1,\r
595                 sizeof(pq_entry) * (insert_at - index));\r
596         pq->entries[insert_at] = saved;\r
597       }\r
598     }\r
599   }\r
600 \r
601   pq_set_id_priority(pq, id, new_priority);\r
602   *priority = new_priority;\r
603 \r
604   return 1;  \r
605 }\r
606 \r
607 int\r
608 pq_peek_items(poe_queue *pq, SV *filter, int max_count, pq_entry **items) {\r
609   int count = 0;\r
610   int i;\r
611 \r
612   *items = NULL;\r
613   if (pq->end == pq->start)\r
614     return 0;\r
615 \r
616   *items = malloc(sizeof(pq_entry) * (pq->end - pq->start));\r
617   for (i = pq->start; i < pq->end; ++i) {\r
618     if (pq_test_filter(pq->entries + i, filter)) {\r
619       (*items)[count++] = pq->entries[i];\r
620     }\r
621   }\r
622   if (!count) {\r
623     free(*items);\r
624     *items = NULL;\r
625   }\r
626 \r
627   return count;\r
628 }\r
629 \r
630 /*\r
631 pq_dump - dump the internals of the queue structure.\r
632 */\r
633 void\r
634 pq_dump(poe_queue *pq) {\r
635   int i;\r
636   HE *he;\r
637 \r
638   fprintf(stderr, "poe_queue\n");\r
639   fprintf(stderr, "  start: %d\n", pq->start);\r
640   fprintf(stderr, "    end: %d\n", pq->end);\r
641   fprintf(stderr, "  alloc: %d\n", pq->alloc);\r
642   fprintf(stderr, "    seq: %d\n", pq->queue_seq);\r
643   fprintf(stderr, "  **Queue Entries:\n"\r
644          "      index:   id  priority    SV\n");\r
645   for (i = pq->start; i < pq->end; ++i) {\r
646     pq_entry *entry = pq->entries + i;\r
647     fprintf(stderr, "      %5d: %5d %8f  %p (%u)\n", i, entry->id, entry->priority,\r
648            entry->payload, (unsigned)SvREFCNT(entry->payload));\r
649   }\r
650   fprintf(stderr, "  **Hash entries:\n");\r
651   hv_iterinit(pq->ids);\r
652   while ((he = hv_iternext(pq->ids)) != NULL) {\r
653     STRLEN len;\r
654     fprintf(stderr, "   %s => %f\n", HePV(he, len), SvNV(hv_iterval(pq->ids, he)));\r
655   }\r
656 }\r