split the working code out from the XS file
[poe-xs-queue-array.git] / queue.c
CommitLineData
a0e4f61f
TC
1#include "EXTERN.h"\r
2#include "perl.h"\r
3#include "XSUB.h"\r
4\r
5#include "queue.h"\r
6\r
7#define DEBUG(x) x\r
8/*#define DEBUG(x)*/\r
9\r
10#define PQ_START_SIZE 10\r
11#define AT_START 0\r
12#define AT_END 1\r
13\r
14pq_id_t queue_seq;\r
15\r
16/*\r
17We store the queue in a similar way to the way perl deals with arrays,\r
18we keep a block of memory, but the first element may or may not be in use,\r
19depending on the pattern of usage.\r
20\r
21There's 3 value controlling usage of the array:\r
22\r
23 - alloc - the number of elements allocated in total\r
24 - start - the first element in use in the array\r
25 - end - one past the end of the last element in the array\r
26\r
27This has the properties that:\r
28\r
29 start == 0 - no space at the front\r
30 end == alloc - no space at the end\r
31 end - start - number of elements in the queue\r
32\r
33We use a perl hash (HV *) to store the mapping from ids to priorities.\r
34\r
35*/\r
36struct poe_queue_tag {\r
37 /* the first entry in use */\r
38 int start;\r
39\r
40 /* 1 past the last entry in use, hence end - start is the number of \r
41 entries in the queue */\r
42 int end;\r
43\r
44 /* the total number of entries allocated */\r
45 int alloc;\r
46\r
47 /* used to generate item ids */\r
48 pq_id_t queue_seq;\r
49\r
50 /* used to track in use item ids */\r
51 HV *ids;\r
52\r
53 /* the actual entries */\r
54 pq_entry *entries;\r
55};\r
56\r
57/*\r
58poe_create - create a new queue object.\r
59\r
60No parameters. returns the new queue object.\r
61\r
62*/\r
63poe_queue *\r
64pq_create(void) {\r
65 poe_queue *pq = malloc(sizeof(poe_queue));\r
66 \r
67 if (pq == NULL)\r
68 croak("Out of memory");\r
69 pq->start = 0;\r
70 pq->end = 0;\r
71 pq->alloc = PQ_START_SIZE;\r
72 pq->queue_seq = 0;\r
73 pq->ids = newHV();\r
74 pq->entries = calloc(sizeof(pq_entry), PQ_START_SIZE);\r
75 if (pq->entries == NULL)\r
76 croak("Out of memory");\r
77\r
78 DEBUG( fprintf(stderr, "pq_create() => %p\n", pq) );\r
79\r
80 return pq;\r
81}\r
82\r
83/*\r
84pq_delete - release the queue object.\r
85\r
86This also releases one reference from each SV in the queue.\r
87\r
88*/\r
89void\r
90pq_delete(poe_queue *pq) {\r
91 int i;\r
92\r
93 DEBUG( fprintf(stderr, "pq_delete(%p)\n", pq) );\r
94 if (pq->end > pq->start) {\r
95 for (i = pq->start; i < pq->end; ++i) {\r
96 SvREFCNT_dec(pq->entries[i].payload);\r
97 }\r
98 }\r
99 SvREFCNT_dec((SV *)pq->ids);\r
100 pq->ids = NULL;\r
101 if (pq->entries)\r
102 free(pq->entries);\r
103 pq->entries = NULL;\r
104 free(pq);\r
105}\r
106\r
107/*\r
108pq_new_id - generate a new item id.\r
109\r
110Internal use only.\r
111\r
112This, the following 3 functions and pq_create, pq_delete, should be\r
113all that needs to be modified if we change hash implementations.\r
114\r
115*/\r
116static\r
117pq_id_t\r
118pq_new_id(poe_queue *pq, pq_priority_t priority) {\r
119 int seq = ++pq->queue_seq;\r
120 SV *index = newSViv(seq);\r
121\r
122 while (hv_exists_ent(pq->ids, index, 0)) {\r
123 seq = ++pq->queue_seq;\r
124 sv_setiv(index, seq);\r
125 }\r
126 hv_store_ent(pq->ids, index, newSVnv(priority), 0);\r
127\r
128 return seq;\r
129}\r
130\r
131/*\r
132pq_release_id - releases an id for future use.\r
133*/\r
134static\r
135void\r
136pq_release_id(poe_queue *pq, pq_id_t id) {\r
137 SV *id_sv = sv_2mortal(newSViv(id));\r
138\r
139 hv_delete_ent(pq->ids, id_sv, 0, 0);\r
140}\r
141\r
142/*\r
143pq_item_priority - get the priority of an item given it's id\r
144*/\r
145static\r
146int\r
147pq_item_priority(poe_queue *pq, pq_id_t id, pq_priority_t *priority) {\r
148 HE *entry = hv_fetch_ent(pq->ids, sv_2mortal(newSViv(id)), 0, 0);\r
149\r
150 if (!entry)\r
151 return 0;\r
152\r
153 *priority = SvNV(HeVAL(entry));\r
154\r
155 return 1;\r
156}\r
157\r
158/*\r
159pq_set_id_priority - set the priority of an item in the id hash\r
160*/\r
161static\r
162void\r
163pq_set_id_priority(poe_queue *pq, pq_id_t id, pq_priority_t new_priority) {\r
164 HE *entry = hv_fetch_ent(pq->ids, sv_2mortal(newSViv(id)), 0, 0);\r
165\r
166 if (!entry)\r
167 croak("pq_set_priority: id not found");\r
168\r
169 sv_setnv(HeVAL(entry), new_priority);\r
170}\r
171\r
172/*\r
173pq_realloc - make space at the front of back of the queue.\r
174\r
175This adjusts the queue to allow insertion of a single item at the\r
176front or the back of the queue.\r
177\r
178If the queue has 33% or more space available we simple adjust the\r
179position of the in-use items within the array. We try not to push the\r
180items right up against the opposite end of the array, since we might\r
181need to insert items there too.\r
182\r
183If the queue has less than 33% space available we allocate another 50%\r
184space. We then only move the queue elements if we need space at the\r
185front, since the reallocation has just opened up a huge space at the\r
186back. Since we're reallocating exponentially larger sizes we should\r
187have a constant time cost on reallocation per queue item stored (but\r
188other costs are going to be higher.)\r
189\r
190*/\r
191static\r
192void\r
193pq_realloc(poe_queue *pq, int at_end) {\r
194 int count = pq->end - pq->start;\r
195\r
196 DEBUG( fprintf(stderr, "pq_realloc((%d, %d, %d), %d)\n", pq->start, pq->end, pq->alloc, at_end) );\r
197 pq_dump(pq);\r
198 if (count * 3 / 2 < pq->alloc) {\r
199 /* 33 % or more space available, use some of it */\r
200 int new_start;\r
201\r
202 if (at_end) {\r
203 new_start = (pq->alloc - count) / 3;\r
204 }\r
205 else {\r
206 new_start = (pq->alloc - count) * 2 / 3;\r
207 }\r
208 DEBUG( fprintf(stderr, " moving start to %d\n", new_start) );\r
209 memmove(pq->entries + new_start, pq->entries + pq->start,\r
210 count * sizeof(pq_entry));\r
211 pq->start = new_start;\r
212 pq->end = new_start + count;\r
213 }\r
214 else {\r
215 int new_alloc = pq->alloc * 3 / 2;\r
216 pq->entries = realloc(pq->entries, sizeof(pq_entry) * new_alloc);\r
217 pq->alloc = new_alloc;\r
218\r
219 if (!pq->entries)\r
220 croak("Out of memory");\r
221\r
222 DEBUG( fprintf(stderr, " - expanding to %d entries\n", new_alloc) );\r
223\r
224 if (!at_end) {\r
225 int new_start = (new_alloc - count) * 2 / 3;\r
226 DEBUG( fprintf(stderr, " moving start to %d\n", new_start) );\r
227 memmove(pq->entries + new_start, pq->entries + pq->start,\r
228 count * sizeof(pq_entry));\r
229 pq->start = new_start;\r
230 pq->end = new_start + count;\r
231 }\r
232 }\r
233 pq_dump(pq);\r
234 DEBUG( fprintf(stderr, " final: %d %d %d\n", pq->start, pq->end, pq->alloc) );\r
235}\r
236\r
237/*\r
238pq_insertion_point - figure out where to insert an item with the given\r
239priority\r
240\r
241Internal.\r
242*/\r
243static\r
244int\r
245pq_insertion_point(poe_queue *pq, pq_priority_t priority) {\r
246 /* for now this is just a linear search, later we should make it \r
247 binary */\r
248 int i = pq->end;\r
249 while (i > pq->start &&\r
250 priority < pq->entries[i-1].priority) {\r
251 --i;\r
252 }\r
253\r
254 return i;\r
255}\r
256\r
257int\r
258pq_enqueue(poe_queue *pq, pq_priority_t priority, SV *payload) {\r
259 int fill_at;\r
260 pq_id_t id = pq_new_id(pq, priority);\r
261\r
262 DEBUG( fprintf(stderr, "pq_enqueue(%f, %p)\n", priority, payload) );\r
263 if (pq->start == pq->end) {\r
264 DEBUG( fprintf(stderr, " - on empty queue\n") );\r
265 /* allow room at front and back for new entries */\r
266 pq->start = pq->alloc / 3;\r
267 pq->end = pq->start + 1;\r
268 fill_at = pq->start;\r
269 }\r
270 else if (priority >= pq->entries[pq->end-1].priority) {\r
271 DEBUG( fprintf(stderr, " - at the end\n") );\r
272 if (pq->end == pq->alloc)\r
273 /* past the end - need to realloc or make some space */\r
274 pq_realloc(pq, AT_END);\r
275 \r
276 fill_at = pq->end;\r
277 ++pq->end;\r
278 }\r
279 else if (priority < pq->entries[pq->start].priority) {\r
280 DEBUG( fprintf(stderr, " - at the front\n") );\r
281 if (pq->start == 0)\r
282 /* no space at the front, make some */\r
283 pq_realloc(pq, AT_START);\r
284\r
285 --pq->start;\r
286 fill_at = pq->start;\r
287 }\r
288 else {\r
289 int i;\r
290 DEBUG( fprintf(stderr, " - in the middle\n") );\r
291 i = pq_insertion_point(pq, priority);\r
292 \r
293 /* if we're near the end we want to push entries up, otherwise down */\r
294 if (i - pq->start > (pq->end - pq->start) / 2) {\r
295 DEBUG( fprintf(stderr, " - closer to the back (%d -> [ %d %d ])\n",\r
296 i, pq->start, pq->end) );\r
297 /* make sure we have space, this might end up copying twice, \r
298 but too bad for now */\r
299 if (pq->end == pq->alloc) {\r
300 int old_start = pq->start;\r
301 pq_realloc(pq, AT_END);\r
302 i += pq->start - old_start;\r
303 }\r
304 \r
305 memmove(pq->entries + i + 1, pq->entries + i, (pq->end - i) * sizeof(pq_entry));\r
306 ++pq->end;\r
307 fill_at = i;\r
308 }\r
309 else {\r
310 DEBUG( fprintf(stderr, " - closer to the front (%d -> [ %d %d ])\n",\r
311 i, pq->start, pq->end) );\r
312 if (pq->start == 0) {\r
313 pq_realloc(pq, AT_START);\r
314 i += pq->start;\r
315 }\r
316 memmove(pq->entries + pq->start - 1, pq->entries + pq->start,\r
317 (i - pq->start) * sizeof(pq_entry));\r
318 --pq->start;\r
319 fill_at = i-1;\r
320 }\r
321 }\r
322 pq->entries[fill_at].priority = priority;\r
323 pq->entries[fill_at].id = id;\r
324 pq->entries[fill_at].payload = newSVsv(payload);\r
325\r
326 return id;\r
327}\r
328\r
329/*\r
330 Note: it's up to the caller to release the SV. The XS code does this \r
331 by making it mortal.\r
332*/\r
333int\r
334pq_dequeue_next(poe_queue *pq, pq_priority_t *priority, pq_id_t *id, SV **payload) {\r
335 pq_entry *entry;\r
336 /* the caller needs to release the payload (somehow) */\r
337 if (pq->start == pq->end)\r
338 return 0;\r
339\r
340 entry = pq->entries + pq->start++;\r
341 *priority = entry->priority;\r
342 *id = entry->id;\r
343 *payload = entry->payload;\r
344 pq_release_id(pq, entry->id);\r
345\r
346 return 1;\r
347}\r
348\r
349int\r
350pq_get_next_priority(poe_queue *pq, pq_priority_t *priority) {\r
351 if (pq->start == pq->end)\r
352 return 0;\r
353\r
354 *priority = pq->entries[pq->start].priority;\r
355 return 1;\r
356}\r
357\r
358int\r
359pq_get_item_count(poe_queue *pq) {\r
360 return pq->end - pq->start;\r
361}\r
362\r
363/*\r
364pq_test_filter - the XS magic involved in passing the payload to a\r
365filter function.\r
366*/\r
367static\r
368int\r
369pq_test_filter(pq_entry *entry, SV *filter) {\r
370 /* man perlcall for the magic here */\r
371 dSP;\r
372 int count;\r
373 SV *result_sv;\r
374 int result;\r
375\r
376 ENTER;\r
377 SAVETMPS;\r
378 PUSHMARK(SP);\r
379 XPUSHs(sv_2mortal(newSVsv(entry->payload)));\r
380 PUTBACK;\r
381\r
382 count = call_sv(filter, G_SCALAR);\r
383\r
384 SPAGAIN;\r
385\r
386 if (count != 1) \r
387 croak("got other than 1 value in scalar context");\r
388\r
389 result_sv = POPs;\r
390 result = SvTRUE(result_sv);\r
391\r
392 PUTBACK;\r
393 FREETMPS;\r
394 LEAVE;\r
395\r
396 return result;\r
397}\r
398\r
399/*\r
400pq_find_item - search for an item we know is there.\r
401\r
402Internal.\r
403*/\r
404static\r
405int\r
406pq_find_item(poe_queue *pq, pq_id_t id, pq_priority_t priority) {\r
407 int i;\r
408\r
409 for (i = pq->start; i < pq->end; ++i) {\r
410 if (pq->entries[i].id == id)\r
411 return i;\r
412 }\r
413 DEBUG(fprintf(stderr, "pq_find_item %d => %f\n", id, priority) );\r
414 croak("Internal inconsistency: event should have been found");\r
415}\r
416\r
417int\r
418pq_remove_item(poe_queue *pq, pq_id_t id, SV *filter, pq_entry *removed) {\r
419 pq_priority_t priority;\r
420 int index;\r
421\r
422 if (!pq_item_priority(pq, id, &priority)) {\r
423 errno = ESRCH;\r
424 return 0;\r
425 }\r
426\r
427 index = pq_find_item(pq, id, priority);\r
428\r
429 if (!pq_test_filter(pq->entries + index, filter)) {\r
430 errno = EPERM;\r
431 return 0;\r
432 }\r
433\r
434 *removed = pq->entries[index];\r
435 pq_release_id(pq, id);\r
436 if (index == pq->start) {\r
437 ++pq->start;\r
438 }\r
439 else if (index == pq->end - 1) {\r
440 --pq->end;\r
441 }\r
442 else {\r
443 memmove(pq->entries + index, pq->entries + index + 1,\r
444 sizeof(pq_entry) * (pq->end - index - 1));\r
445 --pq->end;\r
446 }\r
447\r
448 return 1;\r
449}\r
450\r
451int\r
452pq_remove_items(poe_queue *pq, SV *filter, int max_count, pq_entry **entries) {\r
453 int in_index, out_index;\r
454 int remove_count = 0;\r
455 \r
456 *entries = NULL;\r
457 if (pq->start == pq->end)\r
458 return 0;\r
459\r
460 *entries = malloc(sizeof(pq_entry) * (pq->end - pq->start));\r
461 if (!*entries)\r
462 croak("Out of memory");\r
463 \r
464 in_index = out_index = pq->start;\r
465 while (in_index < pq->end && remove_count < max_count) {\r
466 if (pq_test_filter(pq->entries + in_index, filter)) {\r
467 pq_release_id(pq, pq->entries[in_index].id);\r
468 (*entries)[remove_count++] = pq->entries[in_index++];\r
469 }\r
470 else {\r
471 pq->entries[out_index++] = pq->entries[in_index++];\r
472 }\r
473 }\r
474 while (in_index < pq->end) {\r
475 pq->entries[out_index++] = pq->entries[in_index++];\r
476 }\r
477 pq->end = out_index;\r
478 \r
479 return remove_count;\r
480}\r
481\r
482/*\r
483We need to keep the following 2 functions in sync (or combine the\r
484common code.)\r
485*/\r
486int\r
487pq_set_priority(poe_queue *pq, pq_id_t id, SV *filter, pq_priority_t new_priority) {\r
488 pq_priority_t old_priority;\r
489 int index, insert_at;\r
490\r
491 if (!pq_item_priority(pq, id, &old_priority)) {\r
492 errno = ESRCH;\r
493 return 0;\r
494 }\r
495\r
496 index = pq_find_item(pq, id, old_priority);\r
497\r
498 if (!pq_test_filter(pq->entries + index, filter)) {\r
499 errno = EPERM;\r
500 return 0;\r
501 }\r
502\r
503 DEBUG( fprintf(stderr, " - index %d oldp %f newp %f\n", index, old_priority, new_priority) );\r
504\r
505 if (pq->end - pq->start == 1) {\r
506 DEBUG( fprintf(stderr, " -- one item\n") );\r
507 /* only the one item anyway */\r
508 pq->entries[pq->start].priority = new_priority;\r
509 }\r
510 else {\r
511 insert_at = pq_insertion_point(pq, new_priority);\r
512 DEBUG( fprintf(stderr, " - new index %d\n", insert_at) );\r
513 /* the item is still in the queue, so either side of it means it\r
514 won't move */\r
515 if (insert_at == index || insert_at == index+1) {\r
516 DEBUG( fprintf(stderr, " -- change in place\n") );\r
517 pq->entries[index].priority = new_priority;\r
518 }\r
519 else {\r
520 pq_entry saved = pq->entries[index];\r
521 saved.priority = new_priority;\r
522\r
523 if (insert_at < index) {\r
524 DEBUG( fprintf(stderr, " - insert_at < index\n") );\r
525 memmove(pq->entries + insert_at + 1, pq->entries + insert_at,\r
526 sizeof(pq_entry) * (index - insert_at));\r
527 pq->entries[insert_at] = saved;\r
528 }\r
529 else {\r
530 DEBUG( fprintf(stderr, " - insert_at > index\n") );\r
531 --insert_at;\r
532 memmove(pq->entries + index, pq->entries + index + 1,\r
533 sizeof(pq_entry) * (insert_at - index));\r
534 pq->entries[insert_at] = saved;\r
535 }\r
536 }\r
537 }\r
538\r
539 pq_set_id_priority(pq, id, new_priority);\r
540\r
541 return 1; \r
542}\r
543\r
544int\r
545pq_adjust_priority(poe_queue *pq, pq_id_t id, SV *filter, double delta, pq_priority_t *priority) {\r
546 pq_priority_t old_priority, new_priority;\r
547 int index, insert_at;\r
548\r
549 DEBUG( fprintf(stderr, "pq_adjust_priority(..., %d, %p, %f, ...)\n", id, filter, delta) );\r
550\r
551 if (!pq_item_priority(pq, id, &old_priority)) {\r
552 errno = ESRCH;\r
553 return 0;\r
554 }\r
555\r
556 index = pq_find_item(pq, id, old_priority);\r
557\r
558 if (!pq_test_filter(pq->entries + index, filter)) {\r
559 errno = EPERM;\r
560 return 0;\r
561 }\r
562\r
563 new_priority = old_priority + delta;\r
564\r
565 DEBUG( fprintf(stderr, " - index %d oldp %f newp %f\n", index, old_priority, new_priority) );\r
566\r
567 if (pq->end - pq->start == 1) {\r
568 DEBUG( fprintf(stderr, " -- one item\n") );\r
569 /* only the one item anyway */\r
570 pq->entries[pq->start].priority = new_priority;\r
571 }\r
572 else {\r
573 insert_at = pq_insertion_point(pq, new_priority);\r
574 DEBUG( fprintf(stderr, " - new index %d\n", insert_at) );\r
575 /* the item is still in the queue, so either side of it means it\r
576 won't move */\r
577 if (insert_at == index || insert_at == index+1) {\r
578 DEBUG( fprintf(stderr, " -- change in place\n") );\r
579 pq->entries[index].priority = new_priority;\r
580 }\r
581 else {\r
582 pq_entry saved = pq->entries[index];\r
583 saved.priority = new_priority;\r
584\r
585 if (insert_at < index) {\r
586 DEBUG( fprintf(stderr, " - insert_at < index\n") );\r
587 memmove(pq->entries + insert_at + 1, pq->entries + insert_at,\r
588 sizeof(pq_entry) * (index - insert_at));\r
589 pq->entries[insert_at] = saved;\r
590 }\r
591 else {\r
592 DEBUG( fprintf(stderr, " - insert_at > index\n") );\r
593 --insert_at;\r
594 memmove(pq->entries + index, pq->entries + index + 1,\r
595 sizeof(pq_entry) * (insert_at - index));\r
596 pq->entries[insert_at] = saved;\r
597 }\r
598 }\r
599 }\r
600\r
601 pq_set_id_priority(pq, id, new_priority);\r
602 *priority = new_priority;\r
603\r
604 return 1; \r
605}\r
606\r
607int\r
608pq_peek_items(poe_queue *pq, SV *filter, int max_count, pq_entry **items) {\r
609 int count = 0;\r
610 int i;\r
611\r
612 *items = NULL;\r
613 if (pq->end == pq->start)\r
614 return 0;\r
615\r
616 *items = malloc(sizeof(pq_entry) * (pq->end - pq->start));\r
617 for (i = pq->start; i < pq->end; ++i) {\r
618 if (pq_test_filter(pq->entries + i, filter)) {\r
619 (*items)[count++] = pq->entries[i];\r
620 }\r
621 }\r
622 if (!count) {\r
623 free(*items);\r
624 *items = NULL;\r
625 }\r
626\r
627 return count;\r
628}\r
629\r
630/*\r
631pq_dump - dump the internals of the queue structure.\r
632*/\r
633void\r
634pq_dump(poe_queue *pq) {\r
635 int i;\r
636 HE *he;\r
637\r
638 fprintf(stderr, "poe_queue\n");\r
639 fprintf(stderr, " start: %d\n", pq->start);\r
640 fprintf(stderr, " end: %d\n", pq->end);\r
641 fprintf(stderr, " alloc: %d\n", pq->alloc);\r
642 fprintf(stderr, " seq: %d\n", pq->queue_seq);\r
643 fprintf(stderr, " **Queue Entries:\n"\r
644 " index: id priority SV\n");\r
645 for (i = pq->start; i < pq->end; ++i) {\r
646 pq_entry *entry = pq->entries + i;\r
647 fprintf(stderr, " %5d: %5d %8f %p (%u)\n", i, entry->id, entry->priority,\r
648 entry->payload, (unsigned)SvREFCNT(entry->payload));\r
649 }\r
650 fprintf(stderr, " **Hash entries:\n");\r
651 hv_iterinit(pq->ids);\r
652 while ((he = hv_iternext(pq->ids)) != NULL) {\r
653 STRLEN len;\r
654 fprintf(stderr, " %s => %f\n", HePV(he, len), SvNV(hv_iterval(pq->ids, he)));\r
655 }\r
656}\r