00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 #ifndef STXXL_PRIORITY_QUEUE_HEADER
00017 #define STXXL_PRIORITY_QUEUE_HEADER
00018
00019 #include <vector>
00020
00021 #include <stxxl/bits/deprecated.h>
00022 #include <stxxl/bits/mng/typed_block.h>
00023 #include <stxxl/bits/mng/block_alloc.h>
00024 #include <stxxl/bits/mng/read_write_pool.h>
00025 #include <stxxl/bits/mng/prefetch_pool.h>
00026 #include <stxxl/bits/mng/write_pool.h>
00027 #include <stxxl/bits/common/tmeta.h>
00028 #include <stxxl/bits/algo/sort_base.h>
00029 #include <stxxl/bits/parallel.h>
00030 #include <stxxl/bits/common/is_sorted.h>
00031
00032 #if STXXL_PARALLEL
00033
00034 #if defined(STXXL_PARALLEL_MODE) && ((__GNUC__ * 10000 + __GNUC_MINOR__ * 100) < 40400)
00035 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL
00036 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
00037 #undef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00038 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 0
00039 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 0
00040 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 0
00041 #endif
00042
00043
00044 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL
00045 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL 1
00046 #endif
00047 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
00048 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL 1
00049 #endif
00050 #ifndef STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00051 #define STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER 1
00052 #endif
00053
00054 #endif //STXXL_PARALLEL
00055
00056 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_EXTERNAL
00057 #define STXXL_PQ_EXTERNAL_LOSER_TREE 0 // no loser tree for the external sequences
00058 #else
00059 #define STXXL_PQ_EXTERNAL_LOSER_TREE 1
00060 #endif
00061
00062 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_INTERNAL
00063 #define STXXL_PQ_INTERNAL_LOSER_TREE 0 // no loser tree for the internal sequences
00064 #else
00065 #define STXXL_PQ_INTERNAL_LOSER_TREE 1
00066 #endif
00067
00068 #define STXXL_VERBOSE_PQ(msg) STXXL_VERBOSE2("[" << static_cast<void *>(this) << "] priority_queue::" << msg)
00069
00070 #include <stxxl/bits/containers/pq_helpers.h>
00071 #include <stxxl/bits/containers/pq_mergers.h>
00072 #include <stxxl/bits/containers/pq_ext_merger.h>
00073 #include <stxxl/bits/containers/pq_losertree.h>
00074
00075 __STXXL_BEGIN_NAMESPACE
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087 template <
00088 class Tp_,
00089 class Cmp_,
00090 unsigned BufferSize1_ = 32,
00091 unsigned N_ = 512,
00092 unsigned IntKMAX_ = 64,
00093 unsigned IntLevels_ = 4,
00094 unsigned BlockSize_ = (2 * 1024 * 1024),
00095 unsigned ExtKMAX_ = 64,
00096 unsigned ExtLevels_ = 2,
00097 class AllocStr_ = STXXL_DEFAULT_ALLOC_STRATEGY
00098 >
00099 struct priority_queue_config
00100 {
00101 typedef Tp_ value_type;
00102 typedef Cmp_ comparator_type;
00103 typedef AllocStr_ alloc_strategy_type;
00104 enum
00105 {
00106 delete_buffer_size = BufferSize1_,
00107 N = N_,
00108 IntKMAX = IntKMAX_,
00109 num_int_groups = IntLevels_,
00110 num_ext_groups = ExtLevels_,
00111 BlockSize = BlockSize_,
00112 ExtKMAX = ExtKMAX_,
00113 element_size = sizeof(Tp_)
00114 };
00115 };
00116
00117 __STXXL_END_NAMESPACE
00118
00119 namespace std
00120 {
00121 template <class BlockType_,
00122 class Cmp_,
00123 unsigned Arity_,
00124 class AllocStr_>
00125 void swap(stxxl::priority_queue_local::ext_merger<BlockType_, Cmp_, Arity_, AllocStr_> & a,
00126 stxxl::priority_queue_local::ext_merger<BlockType_, Cmp_, Arity_, AllocStr_> & b)
00127 {
00128 a.swap(b);
00129 }
00130 template <class ValTp_, class Cmp_, unsigned KNKMAX>
00131 void swap(stxxl::priority_queue_local::loser_tree<ValTp_, Cmp_, KNKMAX> & a,
00132 stxxl::priority_queue_local::loser_tree<ValTp_, Cmp_, KNKMAX> & b)
00133 {
00134 a.swap(b);
00135 }
00136 }
00137
00138 __STXXL_BEGIN_NAMESPACE
00139
00141 template <class Config_>
00142 class priority_queue : private noncopyable
00143 {
00144 public:
00145 typedef Config_ Config;
00146 enum
00147 {
00148 delete_buffer_size = Config::delete_buffer_size,
00149 N = Config::N,
00150 IntKMAX = Config::IntKMAX,
00151 num_int_groups = Config::num_int_groups,
00152 num_ext_groups = Config::num_ext_groups,
00153 total_num_groups = Config::num_int_groups + Config::num_ext_groups,
00154 BlockSize = Config::BlockSize,
00155 ExtKMAX = Config::ExtKMAX
00156 };
00157
00159 typedef typename Config::value_type value_type;
00161 typedef typename Config::comparator_type comparator_type;
00162 typedef typename Config::alloc_strategy_type alloc_strategy_type;
00164 typedef stxxl::uint64 size_type;
00165 typedef typed_block<BlockSize, value_type> block_type;
00166 typedef read_write_pool<block_type> pool_type;
00167
00168 protected:
00169 typedef priority_queue_local::internal_priority_queue<value_type, std::vector<value_type>, comparator_type>
00170 insert_heap_type;
00171
00172 typedef priority_queue_local::loser_tree<
00173 value_type,
00174 comparator_type,
00175 IntKMAX> int_merger_type;
00176
00177 typedef priority_queue_local::ext_merger<
00178 block_type,
00179 comparator_type,
00180 ExtKMAX,
00181 alloc_strategy_type> ext_merger_type;
00182
00183
00184 int_merger_type int_mergers[num_int_groups];
00185 pool_type * pool;
00186 bool pool_owned;
00187 ext_merger_type ** ext_mergers;
00188
00189
00190 value_type group_buffers[total_num_groups][N + 1];
00191 value_type * group_buffer_current_mins[total_num_groups];
00192
00193
00194 value_type delete_buffer[delete_buffer_size + 1];
00195 value_type * delete_buffer_current_min;
00196 value_type * delete_buffer_end;
00197
00198 comparator_type cmp;
00199
00200
00201 insert_heap_type insert_heap;
00202
00203
00204 unsigned_type num_active_groups;
00205
00206
00207 size_type size_;
00208
00209 private:
00210 void init();
00211
00212 void refill_delete_buffer();
00213 unsigned_type refill_group_buffer(unsigned_type k);
00214
00215 unsigned_type make_space_available(unsigned_type level);
00216 void empty_insert_heap();
00217
00218 value_type get_supremum() const { return cmp.min_value(); }
00219 unsigned_type current_delete_buffer_size() const { return delete_buffer_end - delete_buffer_current_min; }
00220 unsigned_type current_group_buffer_size(unsigned_type i) const { return &(group_buffers[i][N]) - group_buffer_current_mins[i]; }
00221
00222 public:
00228 priority_queue(pool_type & pool_);
00229
00239 _STXXL_DEPRECATED(priority_queue(prefetch_pool<block_type> & p_pool_, write_pool<block_type> & w_pool_));
00240
00250 priority_queue(unsigned_type p_pool_mem, unsigned_type w_pool_mem);
00251
00252 #if 0
00253
00254
00255 void swap(priority_queue & obj)
00256 {
00257
00258 for (unsigned_type i = 0; i < num_int_groups; ++i)
00259 std::swap(int_mergers[i], obj.int_mergers[i]);
00260
00261
00262
00263 std::swap(ext_mergers, obj.ext_mergers);
00264 for (unsigned_type i1 = 0; i1 < total_num_groups; ++i1)
00265 for (unsigned_type i2 = 0; i2 < (N + 1); ++i2)
00266 std::swap(group_buffers[i1][i2], obj.group_buffers[i1][i2]);
00267
00268 STXXL_STATIC_ASSERT(false);
00269
00270
00271 swap_1D_arrays(group_buffer_current_mins, obj.group_buffer_current_mins, total_num_groups);
00272 swap_1D_arrays(delete_buffer, obj.delete_buffer, delete_buffer_size + 1);
00273 std::swap(delete_buffer_current_min, obj.delete_buffer_current_min);
00274 std::swap(delete_buffer_end, obj.delete_buffer_end);
00275 std::swap(cmp, obj.cmp);
00276 std::swap(insert_heap, obj.insert_heap);
00277 std::swap(num_active_groups, obj.num_active_groups);
00278 std::swap(size_, obj.size_);
00279 }
00280 #endif
00281
00282 virtual ~priority_queue();
00283
00286 size_type size() const;
00287
00290 bool empty() const { return (size() == 0); }
00291
00303 const value_type & top() const;
00304
00311 void pop();
00312
00317 void push(const value_type & obj);
00318
00323 unsigned_type mem_cons() const
00324 {
00325 unsigned_type dynam_alloc_mem = 0;
00326
00327
00328 for (int i = 0; i < num_int_groups; ++i)
00329 dynam_alloc_mem += int_mergers[i].mem_cons();
00330
00331 for (int i = 0; i < num_ext_groups; ++i)
00332 dynam_alloc_mem += ext_mergers[i]->mem_cons();
00333
00334
00335 return (sizeof(*this) +
00336 sizeof(ext_merger_type) * num_ext_groups +
00337 dynam_alloc_mem);
00338 }
00339
00340 void dump_sizes() const;
00341 void dump_params() const;
00342 };
00343
00344
00345 template <class Config_>
00346 inline typename priority_queue<Config_>::size_type priority_queue<Config_>::size() const
00347 {
00348 return size_ +
00349 insert_heap.size() - 1 +
00350 (delete_buffer_end - delete_buffer_current_min);
00351 }
00352
00353
00354 template <class Config_>
00355 inline const typename priority_queue<Config_>::value_type & priority_queue<Config_>::top() const
00356 {
00357 assert(!insert_heap.empty());
00358
00359 const typename priority_queue<Config_>::value_type & t = insert_heap.top();
00360 if ( cmp(*delete_buffer_current_min, t))
00361 return t;
00362 else
00363 return *delete_buffer_current_min;
00364 }
00365
00366 template <class Config_>
00367 inline void priority_queue<Config_>::pop()
00368 {
00369
00370 assert(!insert_heap.empty());
00371
00372 if ( cmp(*delete_buffer_current_min, insert_heap.top()))
00373 insert_heap.pop();
00374 else
00375 {
00376 assert(delete_buffer_current_min < delete_buffer_end);
00377 ++delete_buffer_current_min;
00378 if (delete_buffer_current_min == delete_buffer_end)
00379 refill_delete_buffer();
00380 }
00381 }
00382
00383 template <class Config_>
00384 inline void priority_queue<Config_>::push(const value_type & obj)
00385 {
00386
00387 assert(int_mergers->not_sentinel(obj));
00388 if (insert_heap.size() == N + 1)
00389 empty_insert_heap();
00390
00391
00392 assert(!insert_heap.empty());
00393
00394 insert_heap.push(obj);
00395 }
00396
00397
00399
00400 template <class Config_>
00401 priority_queue<Config_>::priority_queue(pool_type & pool_) :
00402 pool(&pool_),
00403 pool_owned(false),
00404 delete_buffer_end(delete_buffer + delete_buffer_size),
00405 insert_heap(N + 2),
00406 num_active_groups(0), size_(0)
00407 {
00408 STXXL_VERBOSE_PQ("priority_queue(pool)");
00409 init();
00410 }
00411
00412
00413 template <class Config_>
00414 priority_queue<Config_>::priority_queue(prefetch_pool<block_type> & p_pool_, write_pool<block_type> & w_pool_) :
00415 pool(new pool_type(p_pool_, w_pool_)),
00416 pool_owned(true),
00417 delete_buffer_end(delete_buffer + delete_buffer_size),
00418 insert_heap(N + 2),
00419 num_active_groups(0), size_(0)
00420 {
00421 STXXL_VERBOSE_PQ("priority_queue(p_pool, w_pool)");
00422 init();
00423 }
00424
00425 template <class Config_>
00426 priority_queue<Config_>::priority_queue(unsigned_type p_pool_mem, unsigned_type w_pool_mem) :
00427 pool(new pool_type(p_pool_mem / BlockSize, w_pool_mem / BlockSize)),
00428 pool_owned(true),
00429 delete_buffer_end(delete_buffer + delete_buffer_size),
00430 insert_heap(N + 2),
00431 num_active_groups(0), size_(0)
00432 {
00433 STXXL_VERBOSE_PQ("priority_queue(pool sizes)");
00434 init();
00435 }
00436
00437 template <class Config_>
00438 void priority_queue<Config_>::init()
00439 {
00440 assert(!cmp(cmp.min_value(), cmp.min_value()));
00441
00442 ext_mergers = new ext_merger_type*[num_ext_groups];
00443 for (unsigned_type j = 0; j < num_ext_groups; ++j) {
00444 ext_mergers[j] = new ext_merger_type;
00445 ext_mergers[j]->set_pool(pool);
00446 }
00447
00448 value_type sentinel = cmp.min_value();
00449 insert_heap.push(sentinel);
00450 delete_buffer[delete_buffer_size] = sentinel;
00451 delete_buffer_current_min = delete_buffer_end;
00452 for (unsigned_type i = 0; i < total_num_groups; i++)
00453 {
00454 group_buffers[i][N] = sentinel;
00455 group_buffer_current_mins[i] = &(group_buffers[i][N]);
00456 }
00457 }
00458
00459 template <class Config_>
00460 priority_queue<Config_>::~priority_queue()
00461 {
00462 STXXL_VERBOSE_PQ("~priority_queue()");
00463 if (pool_owned)
00464 delete pool;
00465
00466 for (unsigned_type j = 0; j < num_ext_groups; ++j)
00467 delete ext_mergers[j];
00468 delete[] ext_mergers;
00469 }
00470
00471
00472
00473
00474 template <class Config_>
00475 unsigned_type priority_queue<Config_>::refill_group_buffer(unsigned_type group)
00476 {
00477 STXXL_VERBOSE_PQ("refill_group_buffer(" << group << ")");
00478
00479 value_type * target;
00480 unsigned_type length;
00481 size_type group_size = (group < num_int_groups) ?
00482 int_mergers[group].size() :
00483 ext_mergers[group - num_int_groups]->size();
00484 unsigned_type left_elements = group_buffers[group] + N - group_buffer_current_mins[group];
00485 if (group_size + left_elements >= size_type(N))
00486 {
00487 target = group_buffers[group];
00488 length = N - left_elements;
00489 }
00490 else
00491 {
00492 target = group_buffers[group] + N - group_size - left_elements;
00493 length = group_size;
00494 }
00495
00496 if (length > 0)
00497 {
00498
00499 memmove(target, group_buffer_current_mins[group], left_elements * sizeof(value_type));
00500 group_buffer_current_mins[group] = target;
00501
00502
00503 if (group < num_int_groups)
00504 int_mergers[group].multi_merge(target + left_elements, length);
00505 else
00506 ext_mergers[group - num_int_groups]->multi_merge(
00507 target + left_elements,
00508 target + left_elements + length);
00509 }
00510
00511
00512
00513 #if STXXL_CHECK_ORDER_IN_SORTS
00514 priority_queue_local::invert_order<typename Config::comparator_type, value_type, value_type> inv_cmp(cmp);
00515 if (!stxxl::is_sorted(group_buffer_current_mins[group], group_buffers[group] + N, inv_cmp))
00516 {
00517 STXXL_VERBOSE_PQ("refill_grp... length: " << length << " left_elements: " << left_elements);
00518 for (value_type * v = group_buffer_current_mins[group] + 1; v < group_buffer_current_mins[group] + left_elements; ++v)
00519 {
00520 if (inv_cmp(*v, *(v - 1)))
00521 {
00522 STXXL_MSG("Error in buffer " << group << " at position " << (v - group_buffer_current_mins[group] - 1) << "/" << (v - group_buffer_current_mins[group]) << " " << *(v - 2) << " " << *(v - 1) << " " << *v << " " << *(v + 1));
00523 }
00524 }
00525 assert(false);
00526 }
00527 #endif
00528
00529 return length + left_elements;
00530 }
00531
00532
00533 template <class Config_>
00534 void priority_queue<Config_>::refill_delete_buffer()
00535 {
00536 STXXL_VERBOSE_PQ("refill_delete_buffer()");
00537
00538 size_type total_group_size = 0;
00539
00540 for (int i = num_active_groups - 1; i >= 0; i--)
00541 {
00542 if ((group_buffers[i] + N) - group_buffer_current_mins[i] < delete_buffer_size)
00543 {
00544 unsigned_type length = refill_group_buffer(i);
00545
00546 if (length == 0 && unsigned(i) == num_active_groups - 1)
00547 --num_active_groups;
00548
00549 total_group_size += length;
00550 }
00551 else
00552 total_group_size += delete_buffer_size;
00553 }
00554
00555 unsigned_type length;
00556 if (total_group_size >= delete_buffer_size)
00557 {
00558 length = delete_buffer_size;
00559 size_ -= size_type(delete_buffer_size);
00560 }
00561 else
00562 {
00563 length = total_group_size;
00564 assert(size_ == size_type(length));
00565 size_ = 0;
00566 }
00567
00568 priority_queue_local::invert_order<typename Config::comparator_type, value_type, value_type> inv_cmp(cmp);
00569
00570
00571
00572
00573 delete_buffer_current_min = delete_buffer_end - length;
00574 STXXL_VERBOSE_PQ("refill_del... Active groups = " << num_active_groups);
00575 switch (num_active_groups)
00576 {
00577 case 0:
00578 break;
00579 case 1:
00580 std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + length, delete_buffer_current_min);
00581 group_buffer_current_mins[0] += length;
00582 break;
00583 case 2:
00584 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00585 {
00586 std::pair<value_type *, value_type *> seqs[2] =
00587 {
00588 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
00589 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N)
00590 };
00591 parallel::multiway_merge_sentinel(seqs, seqs + 2, delete_buffer_current_min, inv_cmp, length);
00592
00593 group_buffer_current_mins[0] = seqs[0].first;
00594 group_buffer_current_mins[1] = seqs[1].first;
00595 }
00596 #else
00597 priority_queue_local::merge_iterator(
00598 group_buffer_current_mins[0],
00599 group_buffer_current_mins[1], delete_buffer_current_min, length, cmp);
00600 #endif
00601 break;
00602 case 3:
00603 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00604 {
00605 std::pair<value_type *, value_type *> seqs[3] =
00606 {
00607 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
00608 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
00609 std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N)
00610 };
00611 parallel::multiway_merge_sentinel(seqs, seqs + 3, delete_buffer_current_min, inv_cmp, length);
00612
00613 group_buffer_current_mins[0] = seqs[0].first;
00614 group_buffer_current_mins[1] = seqs[1].first;
00615 group_buffer_current_mins[2] = seqs[2].first;
00616 }
00617 #else
00618 priority_queue_local::merge3_iterator(
00619 group_buffer_current_mins[0],
00620 group_buffer_current_mins[1],
00621 group_buffer_current_mins[2], delete_buffer_current_min, length, cmp);
00622 #endif
00623 break;
00624 case 4:
00625 #if STXXL_PARALLEL && STXXL_PARALLEL_PQ_MULTIWAY_MERGE_DELETE_BUFFER
00626 {
00627 std::pair<value_type *, value_type *> seqs[4] =
00628 {
00629 std::make_pair(group_buffer_current_mins[0], group_buffers[0] + N),
00630 std::make_pair(group_buffer_current_mins[1], group_buffers[1] + N),
00631 std::make_pair(group_buffer_current_mins[2], group_buffers[2] + N),
00632 std::make_pair(group_buffer_current_mins[3], group_buffers[3] + N)
00633 };
00634 parallel::multiway_merge_sentinel(seqs, seqs + 4, delete_buffer_current_min, inv_cmp, length);
00635
00636 group_buffer_current_mins[0] = seqs[0].first;
00637 group_buffer_current_mins[1] = seqs[1].first;
00638 group_buffer_current_mins[2] = seqs[2].first;
00639 group_buffer_current_mins[3] = seqs[3].first;
00640 }
00641 #else
00642 priority_queue_local::merge4_iterator(
00643 group_buffer_current_mins[0],
00644 group_buffer_current_mins[1],
00645 group_buffer_current_mins[2],
00646 group_buffer_current_mins[3], delete_buffer_current_min, length, cmp);
00647 #endif
00648 break;
00649 default:
00650 STXXL_THROW(std::runtime_error, "priority_queue<...>::refill_delete_buffer()",
00651 "Overflow! The number of buffers on 2nd level in stxxl::priority_queue is currently limited to 4");
00652 }
00653
00654 #if STXXL_CHECK_ORDER_IN_SORTS
00655 if (!stxxl::is_sorted(delete_buffer_current_min, delete_buffer_end, inv_cmp))
00656 {
00657 for (value_type * v = delete_buffer_current_min + 1; v < delete_buffer_end; ++v)
00658 {
00659 if (inv_cmp(*v, *(v - 1)))
00660 {
00661 STXXL_MSG("Error at position " << (v - delete_buffer_current_min - 1) << "/" << (v - delete_buffer_current_min) << " " << *(v - 1) << " " << *v);
00662 }
00663 }
00664 assert(false);
00665 }
00666 #endif
00667
00668 }
00669
00670
00671
00672
00673
00674
00675 template <class Config_>
00676 unsigned_type priority_queue<Config_>::make_space_available(unsigned_type level)
00677 {
00678 STXXL_VERBOSE_PQ("make_space_available(" << level << ")");
00679 unsigned_type finalLevel;
00680 assert(level < total_num_groups);
00681 assert(level <= num_active_groups);
00682
00683 if (level == num_active_groups)
00684 ++num_active_groups;
00685
00686 const bool spaceIsAvailable_ =
00687 (level < num_int_groups) ? int_mergers[level].is_space_available()
00688 : (ext_mergers[level - num_int_groups]->is_space_available());
00689
00690 if (spaceIsAvailable_)
00691 {
00692 finalLevel = level;
00693 }
00694 else if (level == total_num_groups - 1)
00695 {
00696 size_type capacity = N;
00697 for (int i = 0; i < num_int_groups; ++i)
00698 capacity *= IntKMAX;
00699 for (int i = 0; i < num_ext_groups; ++i)
00700 capacity *= ExtKMAX;
00701 STXXL_ERRMSG("priority_queue OVERFLOW - all groups full, size=" << size() <<
00702 ", capacity(last externel group (" << num_int_groups + num_ext_groups - 1 << "))=" << capacity);
00703 dump_sizes();
00704
00705 int extLevel = level - num_int_groups;
00706 const size_type segmentSize = ext_mergers[extLevel]->size();
00707 STXXL_VERBOSE1("Inserting segment into last level external: " << level << " " << segmentSize);
00708 ext_merger_type * overflow_merger = new ext_merger_type;
00709 overflow_merger->set_pool(pool);
00710 overflow_merger->insert_segment(*ext_mergers[extLevel], segmentSize);
00711 std::swap(ext_mergers[extLevel], overflow_merger);
00712 delete overflow_merger;
00713 finalLevel = level;
00714 }
00715 else
00716 {
00717 finalLevel = make_space_available(level + 1);
00718
00719 if (level < num_int_groups - 1)
00720 {
00721 unsigned_type segmentSize = int_mergers[level].size();
00722 value_type * newSegment = new value_type[segmentSize + 1];
00723 int_mergers[level].multi_merge(newSegment, segmentSize);
00724
00725 newSegment[segmentSize] = delete_buffer[delete_buffer_size];
00726
00727
00728
00729 int_mergers[level + 1].insert_segment(newSegment, segmentSize);
00730 }
00731 else
00732 {
00733 if (level == num_int_groups - 1)
00734 {
00735 const unsigned_type segmentSize = int_mergers[num_int_groups - 1].size();
00736 STXXL_VERBOSE_PQ("make_space... Inserting segment into first level external: " << level << " " << segmentSize);
00737 ext_mergers[0]->insert_segment(int_mergers[num_int_groups - 1], segmentSize);
00738 }
00739 else
00740 {
00741 const size_type segmentSize = ext_mergers[level - num_int_groups]->size();
00742 STXXL_VERBOSE_PQ("make_space... Inserting segment into second level external: " << level << " " << segmentSize);
00743 ext_mergers[level - num_int_groups + 1]->insert_segment(*ext_mergers[level - num_int_groups], segmentSize);
00744 }
00745 }
00746 }
00747 return finalLevel;
00748 }
00749
00750
00751
00752 template <class Config_>
00753 void priority_queue<Config_>::empty_insert_heap()
00754 {
00755 STXXL_VERBOSE_PQ("empty_insert_heap()");
00756 assert(insert_heap.size() == (N + 1));
00757
00758 const value_type sup = get_supremum();
00759
00760
00761 value_type * newSegment = new value_type[N + 1];
00762 value_type * newPos = newSegment;
00763
00764
00765
00766 value_type * SortTo = newSegment;
00767
00768 insert_heap.sort_to(SortTo);
00769
00770 SortTo = newSegment + N;
00771 insert_heap.clear();
00772 insert_heap.push(*SortTo);
00773
00774 assert(insert_heap.size() == 1);
00775
00776 newSegment[N] = sup;
00777
00778
00779
00780 const unsigned_type tempSize = N + delete_buffer_size;
00781 value_type temp[tempSize + 1];
00782 unsigned_type sz1 = current_delete_buffer_size();
00783 unsigned_type sz2 = current_group_buffer_size(0);
00784 value_type * pos = temp + tempSize - sz1 - sz2;
00785 std::copy(delete_buffer_current_min, delete_buffer_current_min + sz1, pos);
00786 std::copy(group_buffer_current_mins[0], group_buffer_current_mins[0] + sz2, pos + sz1);
00787 temp[tempSize] = sup;
00788
00789
00790
00791
00792 priority_queue_local::merge_iterator(pos, newPos, delete_buffer_current_min, sz1, cmp);
00793
00794
00795
00796
00797 priority_queue_local::merge_iterator(pos, newPos, group_buffer_current_mins[0], sz2, cmp);
00798
00799
00800
00801
00802 priority_queue_local::merge_iterator(pos, newPos, newSegment, N, cmp);
00803
00804
00805 unsigned_type freeLevel = make_space_available(0);
00806 assert(freeLevel == 0 || int_mergers[0].size() == 0);
00807 int_mergers[0].insert_segment(newSegment, N);
00808
00809
00810
00811 if (freeLevel > 0)
00812 {
00813 for (int_type i = freeLevel; i >= 0; i--)
00814 {
00815
00816
00817
00818 newSegment = new value_type[current_group_buffer_size(i) + 1];
00819 std::copy(group_buffer_current_mins[i], group_buffer_current_mins[i] + current_group_buffer_size(i) + 1, newSegment);
00820 int_mergers[0].insert_segment(newSegment, current_group_buffer_size(i));
00821 group_buffer_current_mins[i] = group_buffers[i] + N;
00822 }
00823 }
00824
00825
00826 size_ += size_type(N);
00827
00828
00829 if (delete_buffer_current_min == delete_buffer_end)
00830 refill_delete_buffer();
00831 }
00832
00833 template <class Config_>
00834 void priority_queue<Config_>::dump_sizes() const
00835 {
00836 unsigned_type capacity = N;
00837 STXXL_MSG("pq::size()\t= " << size());
00838 STXXL_MSG(" insert_heap\t= " << insert_heap.size() - 1 << "/" << capacity);
00839 STXXL_MSG(" delete_buffer\t= " << (delete_buffer_end - delete_buffer_current_min) << "/" << delete_buffer_size);
00840 for (int i = 0; i < num_int_groups; ++i) {
00841 capacity *= IntKMAX;
00842 STXXL_MSG(" grp " << i << " int" <<
00843 " grpbuf=" << current_group_buffer_size(i) <<
00844 " size=" << int_mergers[i].size() << "/" << capacity <<
00845 " space=" << int_mergers[i].is_space_available());
00846 }
00847 for (int i = 0; i < num_ext_groups; ++i) {
00848 capacity *= ExtKMAX;
00849 STXXL_MSG(" grp " << i + num_int_groups << " ext" <<
00850 " grpbuf=" << current_group_buffer_size(i + num_int_groups) <<
00851 " size=" << ext_mergers[i]->size() << "/" << capacity <<
00852 " space=" << ext_mergers[i]->is_space_available());
00853 }
00854 dump_params();
00855 }
00856
00857 template <class Config_>
00858 void priority_queue<Config_>::dump_params() const
00859 {
00860 STXXL_MSG("params: delete_buffer_size=" << delete_buffer_size << " N=" << N << " IntKMAX=" << IntKMAX << " num_int_groups=" << num_int_groups << " ExtKMAX=" << ExtKMAX << " num_ext_groups=" << num_ext_groups << " BlockSize=" << BlockSize);
00861 }
00862
00863 namespace priority_queue_local
00864 {
00865 struct Parameters_for_priority_queue_not_found_Increase_IntM
00866 {
00867 enum { fits = false };
00868 typedef Parameters_for_priority_queue_not_found_Increase_IntM result;
00869 };
00870
00871 struct dummy
00872 {
00873 enum { fits = false };
00874 typedef dummy result;
00875 };
00876
00877 template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, unsigned_type B_, unsigned_type m_, bool stop = false>
00878 struct find_B_m
00879 {
00880 typedef find_B_m<E_, IntM_, MaxS_, B_, m_, stop> Self;
00881 enum {
00882 k = IntM_ / B_,
00883 element_size = E_,
00884 IntM = IntM_,
00885 B = B_,
00886 m = m_,
00887 c = k - m_,
00888
00889
00890 fits = c > 10 && ((k - m) * (m) * (m * B / (element_size * 4 * 1024))) >= MaxS_
00891 && ((MaxS_ < ((k - m) * m / (2 * element_size)) * 1024) || m >= 128),
00892 step = 1
00893 };
00894
00895 typedef typename find_B_m<element_size, IntM, MaxS_, B, m + step, fits || (m >= k - step)>::result candidate1;
00896 typedef typename find_B_m<element_size, IntM, MaxS_, B / 2, 1, fits || candidate1::fits>::result candidate2;
00897 typedef typename IF<fits, Self, typename IF<candidate1::fits, candidate1, candidate2>::result>::result result;
00898 };
00899
00900
00901 template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, bool stop>
00902 struct find_B_m<E_, IntM_, MaxS_, 2048, 1, stop>
00903 {
00904 enum { fits = false };
00905 typedef Parameters_for_priority_queue_not_found_Increase_IntM result;
00906 };
00907
00908
00909 template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_, unsigned_type B_, unsigned_type m_>
00910 struct find_B_m<E_, IntM_, MaxS_, B_, m_, true>
00911 {
00912 enum { fits = false };
00913 typedef dummy result;
00914 };
00915
00916
00917 template <unsigned_type E_, unsigned_type IntM_, unsigned_type MaxS_>
00918 struct find_settings
00919 {
00920
00921 typedef typename find_B_m<E_, IntM_, MaxS_, (8 * 1024 * 1024), 1>::result result;
00922 };
00923
00924 struct Parameters_not_found_Try_to_change_the_Tune_parameter
00925 {
00926 typedef Parameters_not_found_Try_to_change_the_Tune_parameter result;
00927 };
00928
00929
00930 template <unsigned_type AI_, unsigned_type X_, unsigned_type CriticalSize_>
00931 struct compute_N
00932 {
00933 typedef compute_N<AI_, X_, CriticalSize_> Self;
00934 enum
00935 {
00936 X = X_,
00937 AI = AI_,
00938 N = X / (AI * AI)
00939 };
00940 typedef typename IF<(N >= CriticalSize_), Self, typename compute_N<AI / 2, X, CriticalSize_>::result>::result result;
00941 };
00942
00943 template <unsigned_type X_, unsigned_type CriticalSize_>
00944 struct compute_N<1, X_, CriticalSize_>
00945 {
00946 typedef Parameters_not_found_Try_to_change_the_Tune_parameter result;
00947 };
00948 }
00949
00951
00954
00956
01019 template <class Tp_, class Cmp_, unsigned_type IntM_, unsigned MaxS_, unsigned Tune_ = 6>
01020 class PRIORITY_QUEUE_GENERATOR
01021 {
01022 public:
01023
01024 typedef typename priority_queue_local::find_settings<sizeof(Tp_), IntM_, MaxS_>::result settings;
01025 enum {
01026 B = settings::B,
01027 m = settings::m,
01028 X = B * (settings::k - m) / settings::element_size,
01029 Buffer1Size = 32
01030 };
01031
01032 typedef typename priority_queue_local::compute_N<(1 << Tune_), X, 4 * Buffer1Size>::result ComputeN;
01033 enum
01034 {
01035 N = ComputeN::N,
01036 AI = ComputeN::AI,
01037 AE = (m / 2 < 2) ? 2 : (m / 2)
01038 };
01039 enum {
01040
01041 EConsumption = X * settings::element_size + settings::B * AE + ((MaxS_ / X) / AE) * settings::B * 1024
01042 };
01043
01044
01045
01046
01047
01048
01049
01050
01051
01052 typedef priority_queue<priority_queue_config<Tp_, Cmp_, Buffer1Size, N, AI, 2, B, AE, 2> > result;
01053 };
01054
01056
01057 __STXXL_END_NAMESPACE
01058
01059
01060 namespace std
01061 {
01062 template <class Config_>
01063 void swap(stxxl::priority_queue<Config_> & a,
01064 stxxl::priority_queue<Config_> & b)
01065 {
01066 a.swap(b);
01067 }
01068 }
01069
01070 #endif // !STXXL_PRIORITY_QUEUE_HEADER
01071