diff --git a/src/core/utils/mapper_fast.h b/src/core/utils/mapper_fast.h new file mode 100644 index 0000000000000000000000000000000000000000..1386200477a825aedc9c4f58881eaf6148131dc3 --- /dev/null +++ b/src/core/utils/mapper_fast.h @@ -0,0 +1,571 @@ +// +// Created by z00576261 on 2024/4/15. +// + +#ifndef FAST_MAPPERFAST_H +#define FAST_MAPPERFAST_H + +#include +#include +#include +#include +#include +#include +#include "securec.h" + +namespace RecMapper { + constexpr size_t BUCKCAPACITY = 3; + + enum BuckStatus { + BUCK_POS_0 = 0, + BUCK_POS_1, + BUCK_POS_2, + BUCK_EXIST, + BUCK_NOEXIST, + BUCK_ERROR + }; + + class SpinLock { + public: + SpinLock() = default; + SpinLock(const SpinLock&) = delete; + SpinLock& operator=(const SpinLock) = delete; + + void lock() + { + while (f.test_and_set(std::memory_order_acquire)) {}; + } + + void unlock() + { + f.clear(std::memory_order_release); + } + + private: + std::atomic_flag f; + }; + + template + struct InnerBuck { + std::pair, V> datas_[BUCKCAPACITY]; + InnerBuck* next_ = nullptr; + SpinLock spin; + + BuckStatus Insert(K key, V& value, std::function ValueSet) + { + for (size_t i = 0; i < BUCKCAPACITY; ++i) { + K old_key = 0; + if (datas_[i].first.load(std::memory_order_relaxed) == 0 && + datas_[i].first.compare_exchange_strong(old_key, key)) { + bool ret = ValueSet(); + if (!ret) { + datas_[i].first.store(0); + return BuckStatus::BUCK_ERROR; + } + datas_[i].second = value; + if (i == 0) { + return BuckStatus::BUCK_POS_0; + } else if (i == 1) { + return BuckStatus::BUCK_POS_1; + } else { + return BuckStatus::BUCK_POS_2; + }; + } + } + return BuckStatus::BUCK_ERROR; + } + + BuckStatus Find(K key) + { + for (size_t i = 0; i < BUCKCAPACITY; ++i) { + if (datas_[i].first.load(std::memory_order_relaxed) == key) { + if (i == 0) { + return BuckStatus::BUCK_POS_0; + } else if (i == 1) { + return BuckStatus::BUCK_POS_1; + } else { + return BuckStatus::BUCK_POS_2; + }; + } + } + return BuckStatus::BUCK_NOEXIST; + } + + BuckStatus Remove(K key) + { + for (size_t i = 0; i < BUCKCAPACITY; ++i) { + K oldkey = key; + if (datas_[i].first.load(std::memory_order_relaxed) == key) { + if (datas_[i].first.compare_exchange_strong(oldkey, 0)) { + datas_[i].second = 0; + return BuckStatus::BUCK_EXIST; + } + } + } + return BUCK_ERROR; + } + }; + + template + class FasterMapper; + + template + class Iterator { + template + friend class FasterMapper; + public: + typedef Iterator Self; + + Iterator(InnerBuck* node, size_t pos, FasterMapper* map): node_(node), pos_(pos), map_(map) {} + Iterator(InnerBuck* node, size_t pos, const FasterMapper* map):node_(node), pos_(pos), map_(map) {} + Iterator(const Iterator& other):node_(other.node_), pos_(other.pos_), map_(other.map_) {} + + Ref operator*() + { + return node_->datas_[pos_]; + } + + Ptr operator->() + { + return &node_->datas_[pos_]; + } + + Self operator++() + { + if (map_->use_spec_buck && IsSpecBuck()) { + InnerBuck* temp_buck = nullptr; + size_t temp_pos = 0; + if (FindNextBuck(0, 0, temp_buck, temp_pos)) { + node_ = temp_buck; + pos_ = temp_pos; + return *this; + } + node_ = nullptr; + pos_ = 0; + return *this; + } + + size_t temp_pos = pos_ + 1; + while (temp_pos < BUCKCAPACITY) { + if (node_->datas_[temp_pos].first.load(std::memory_order_relaxed) != 0) { + pos_ = temp_pos; + return *this; + } + temp_pos++; + } + while (node_->next_ != nullptr) { + for (size_t i = 0; i < BUCKCAPACITY; ++i) { + if (node_->next_->datas_[i].first.load(std::memory_order_relaxed) != 0) { + pos_ = i; + node_ = node_->next_; + return *this; + } + } + } + size_t map_index = node_->datas_[pos_].first.load(std::memory_order_relaxed) % map_->sub_map_count_; + size_t buck_index = node_->datas_[pos_].first.load(std::memory_order_relaxed) % map_->buck_count_; + GetNextIndex(map_index, buck_index); + InnerBuck* temp_buck = nullptr; + temp_pos = 0; + for (; map_index < map_->sub_map_count_; map_index++) { + if (map_->buck_maps_[map_index] == nullptr) { + continue; + } + if (FindNextBuck(map_index, buck_index, temp_buck, temp_pos)) { + node_ = temp_buck; + pos_ = temp_pos; + return *this; + } + buck_index = 0; + } + node_ = nullptr; + pos_ = 0; + return *this; + } + + bool operator==(const Self& other) + { + return node_ == other.node_; + } + + bool operator!=(const Self& other) + { + return node_ != other.node_; + } + + private: + InnerBuck* node_; + size_t pos_; + const FasterMapper* map_; + bool enter_spec = false; + + void GetNextIndex(size_t& map_index, size_t& buck_index) + { + map_index = (buck_index == (map_->buck_count_ - 1) ? ++map_index : map_index); + buck_index = buck_index % (map_->buck_count_ - 1) + (buck_index == (map_->buck_count_ - 1) ? 0 : 1); + } + + bool IsSpecBuck() + { + return node_ == map_->spec_buck; + } + + bool FindNextBuck(size_t map_index, size_t buck_index, InnerBuck*& buck, size_t& pos) + { + for (; buck_index < map_->buck_count_; buck_index++) { + InnerBuck* temp_buck = &map_->buck_maps_[map_index][buck_index]; + for (size_t i = 0; i < BUCKCAPACITY; ++i) { + if (temp_buck->datas_[i].first.load(std::memory_order_relaxed) != 0) { + pos = i; + buck = temp_buck; + return true; + } + } + } + return false; + } + }; + + template + class FasterMapper { + public: + typedef std::pair, V> T; + + typedef Iterator iterator; + typedef Iterator const_iterator; + + FasterMapper(size_t cap, size_t res) : capacity_(cap), reserve_(res) {}; + + ~FasterMapper() = default; + + bool InitializeBuck() + { + size_t i = 0; + + while (i <= prime_max_) { + if (pow(pow_base_, i) < reserve_) { + i++; + continue; + } + break; + } + buck_count_ = std::max(min_buck_num_, static_cast(pow(pow_base_, i))); + if (buck_count_ == 0 || buck_count_ > pow(pow_base_, prime_max_)) { + return false; + } + for (auto &buck_map : buck_maps_) { + InnerBuck* buck_map_temp = new (std::nothrow) InnerBuck[buck_count_]; + if (buck_map_temp == nullptr) { + FreeBuckMaps(); + return false; + } + memset_s(buck_map_temp, sizeof(InnerBuck) * buck_count_, 0, + sizeof(InnerBuck) * buck_count_); + buck_map = buck_map_temp; + } + return true; + } + + void UnInitializeBuck() + { + FreeBuckExpend(); + FreeBuckMaps(); + } + + std::pair PutNotFind(InnerBuck* buck, const K& key, V& value) + { + for (int i = 0; i < loop_max_; ++i) { + // insert exist buck + while (buck != nullptr) { + buck->spin.lock(); + auto value_func = [&]() ->bool { + value = offset_.fetch_add(1); + return true; + }; + BuckStatus ret = buck->Insert(key, value, value_func); + buck->spin.unlock(); + + if (ret == BuckStatus::BUCK_ERROR) { + return std::pair(iterator(nullptr, 0, this), false); + } else if (ret != BuckStatus::BUCK_ERROR) { + size_.fetch_add(1); + return std::pair(iterator(buck, static_cast(ret), this), true); + } + + if (buck->next_ != nullptr) { + buck = buck->next_; + } else { + break; + } + } + + // insert not exist buck + auto& old_spin = buck->spin; + old_spin.lock(); + if (buck->next_ != nullptr) { + buck = buck->next_; + old_spin.unlock(); + continue; + } + + InnerBuck* new_buck = new (std::nothrow) InnerBuck; + memset_s(new_buck, sizeof(InnerBuck), 0, sizeof(InnerBuck)); + buck->next_ = new_buck; + buck = new_buck; + old_spin.unlock(); + } + return std::pair(iterator(nullptr, 0, this), false); + } + + std::pair PutFind(InnerBuck* buck, const K& key, V& value) + { + while (buck != nullptr) { + buck->spin.lock(); + auto status = buck->Find(key); + value = buck->datas_[static_cast(status)].second; + if (status != BuckStatus::BUCK_NOEXIST) { + buck->spin.unlock(); + return std::pair(iterator(buck, static_cast(status), this), true); + } + buck->spin.unlock(); + + if (buck->next_ != nullptr) { + buck = buck->next_; + } else { + break; + } + } + return std::pair(iterator(nullptr, 0, this), false); + }; + + std::pair Put(const K& key, V& value) + { + if (size_.load() == capacity_) { + return std::pair(iterator(nullptr, 0, this), false); + } + + if (key == 0) { + if (spec_buck != nullptr) { + return std::pair(iterator(spec_buck, 0, this), true); + } + spec_buck = new (std::nothrow) InnerBuck; + memset_s(spec_buck, sizeof(InnerBuck), 0, sizeof(InnerBuck)); + + spec_buck->spin.lock(); + spec_buck->datas_[0].first.store(key); + value = offset_.fetch_add(1); + spec_buck->datas_[0].second = value; + size_.fetch_add(1); + spec_buck->spin.unlock(); + + use_spec_buck = true; + return std::pair(iterator(spec_buck, 0, this), true); + } + InnerBuck* temp_buck = &(buck_maps_[key % sub_map_count_][key % buck_count_]); + // first,find key if exist in buck + std::pair put_ret = PutFind(temp_buck, key, value); + if (put_ret.second == true) { + return put_ret; + } + + // if not find, + put_ret = PutNotFind(temp_buck, key, value); + if (put_ret.second == true) { + return put_ret; + } + return std::pair(iterator(nullptr, 0, this), false); + } + + iterator Find(const K& key) + { + if (key == 0) { + if (spec_buck != nullptr) { + return iterator(spec_buck, 0, this); + } + return iterator(nullptr, 0, this); + } + InnerBuck* temp_buck = &(buck_maps_[key % sub_map_count_][key % buck_count_]); + if (temp_buck == nullptr) { + return iterator(nullptr, 0, this); + } + auto status = temp_buck->Find(key); + if (status == BuckStatus::BUCK_NOEXIST) { + return iterator(nullptr, 0, this); + } else { + return iterator(temp_buck, static_cast(status), this); + } + } + + bool Remove(const K& key) + { + if (key == 0) { + if (spec_buck != nullptr) { + delete spec_buck; + spec_buck = nullptr; + size_.fetch_sub(1); + use_spec_buck = false; + return true; + } + return false; + } + InnerBuck* temp_buck = &(buck_maps_[key % sub_map_count_][key % buck_count_]); + while (temp_buck != nullptr) { + if (temp_buck->Find(key) == BuckStatus::BUCK_NOEXIST) { + return false; + } + temp_buck->spin.lock(); + if (temp_buck->Remove(key) == BuckStatus::BUCK_EXIST) { + temp_buck->spin.unlock(); + size_.fetch_sub(1); + return true; + } + temp_buck->spin.unlock(); + temp_buck = temp_buck->next_; + } + return false; + } + + bool ToVector(std::vector>& vec) + { + if (spec_buck != nullptr) { + vec.push_back(std::make_pair(spec_buck->datas_[0].first.load(), spec_buck->datas_[0].second)); + } + for (auto& sub_map : buck_maps_) { + if (sub_map == nullptr) { + continue; + } + for (size_t i = 0; i < buck_count_; ++i) { + InnerBuck* temp_buck = &sub_map[i]; + while (temp_buck) { + for (size_t j = 0; j < BUCKCAPACITY && temp_buck->datas_[j].first != 0; ++j) { + vec.push_back(std::make_pair(temp_buck->datas_[j].first.load(), + temp_buck->datas_[j].second)); + } + temp_buck = temp_buck->next_; + } + } + } + return true; + } + + iterator begin() + { + if (spec_buck != nullptr) { + return iterator(spec_buck, 0, this); + } + for (auto &sub_map: buck_maps_) { + if (sub_map == nullptr) { + continue; + } + for (size_t i = 0; i < buck_count_; ++i) { + InnerBuck *buck = &sub_map[i]; + while (buck) { + for (size_t j = 0; j < BUCKCAPACITY && buck->datas_[j].first != 0; ++j) { + return iterator(buck, j, this); + } + buck = buck->next_; + } + } + } + return iterator(nullptr, 0, this); + } + + iterator end() + { + return iterator(nullptr, 0, this); + } + + const_iterator begin() const + { + if (spec_buck != nullptr) { + return iterator(spec_buck, 0, this); + } + for (auto &sub_map: buck_maps_) { + if (sub_map == nullptr) { + continue; + } + for (size_t i = 0; i < buck_count_; ++i) { + InnerBuck *buck = &sub_map[i]; + while (buck) { + for (size_t j = 0; j < BUCKCAPACITY && buck->datas_[j].first != 0; ++j) { + return iterator(buck, j, this); + } + buck = buck->next_; + } + } + } + return iterator(nullptr, 0, this); + } + + const_iterator end() const + { + return iterator(nullptr, 0, this); + } + + size_t Size() + { + return size_.load(); + } + + size_t Capacity() + { + return capacity_; + } + + void FreeBuckMaps() + { + for (auto &buck_map : buck_maps_) { + if (buck_map != nullptr) { + delete[] buck_map; + buck_map = nullptr; + } + } + if (spec_buck != nullptr) { + delete spec_buck; + spec_buck = nullptr; + } + offset_.store(0); + size_.store(0); + reserve_ = 0; + buck_count_ = 0; + capacity_ = 0; + } + + void FreeBuckExpend() + { + for (auto &buck_map : buck_maps_) { + if (buck_map == nullptr) { + continue; + } + for (size_t i = 0; i < buck_count_; ++i) { + InnerBuck* buck_attch = buck_map[i].next_; + while (buck_attch != nullptr) { + InnerBuck* buck_attch_temp = buck_attch->next_; + delete buck_attch; + buck_attch = buck_attch_temp; + } + } + } + } + + std::atomic offset_{ 0 }; + std::atomic size_{ 0 }; + + size_t reserve_; + size_t buck_count_; + size_t capacity_; + + static constexpr size_t sub_map_count_ = 5; + static constexpr size_t prime_max_ = 32; + static constexpr int pow_base_ = 2; + static constexpr int min_buck_num_ = 128; + static constexpr int loop_max_ = 8192; + + InnerBuck* buck_maps_[sub_map_count_] {}; + InnerBuck* spec_buck = nullptr; + + bool use_spec_buck = false; + }; +} + +#endif // FAST_MAPPERFAST_H diff --git a/src/tests/utils/mapper_fast_test.cpp b/src/tests/utils/mapper_fast_test.cpp new file mode 100644 index 0000000000000000000000000000000000000000..105748c5c5d1a3d4f0d4294dc508a60e7d45cf28 --- /dev/null +++ b/src/tests/utils/mapper_fast_test.cpp @@ -0,0 +1,205 @@ +/* Copyright 2024. Huawei Technologies Co.,Ltd. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and + limitations under the License. +==============================================================================*/ + +#include +#include "utils/mapper_fast.h" + +#include +#include +#include +#include +using namespace std; +using namespace testing; +using namespace RecMapper; + +namespace TempVal { + constexpr int THREAD_NUM = 4; + constexpr int MAPPER_CAP = 180; + constexpr int RESERVE_SPLIT = 15; + constexpr int MAPPER_INSERT_CAP = 1800; + constexpr int MAPPER_INSERT_COUNT = 200; + constexpr int MAPPER_INSERT_NEW = 100; +} + +TEST(InnerBuck, BuckCase) +{ + InnerBuck<::uint64_t, ::uint64_t>* buck = new (std::nothrow) InnerBuck<::uint64_t, ::uint64_t>; + memset(buck, 0, sizeof(InnerBuck<::uint64_t, ::uint64_t>)); + buck->spin.lock(); + std::vector keys = {1, 2, 3, 1, 4}; + std::vector values = {1, 2, 3, 4}; + auto func = []() ->bool {return true;}; + BuckStatus ret_status = buck->Insert(keys[0], values[0], func); + ASSERT_EQ(ret_status, BuckStatus::BUCK_POS_0); + ret_status = buck->Insert(keys[1], values[1], func); + ASSERT_EQ(ret_status, BuckStatus::BUCK_POS_1); + ret_status = buck->Insert(keys[2], values[2], func); + ASSERT_EQ(ret_status, BuckStatus::BUCK_POS_2); + ret_status = buck->Insert(keys[3], values[3], func); + ASSERT_EQ(ret_status, BuckStatus::BUCK_ERROR); + buck->spin.unlock(); + + buck->spin.lock(); + ret_status = buck->Find(keys[0]); + ASSERT_EQ(ret_status, BuckStatus::BUCK_POS_0); + ret_status = buck->Find(keys[4]); + ASSERT_EQ(ret_status, BuckStatus::BUCK_NOEXIST); + buck->spin.unlock(); + + buck->spin.lock(); + ret_status = buck->Remove(keys[0]); + ASSERT_EQ(ret_status, BuckStatus::BUCK_EXIST); + ret_status = buck->Remove(keys[4]); + ASSERT_EQ(ret_status, BuckStatus::BUCK_ERROR); + buck->spin.unlock(); +} + +vector getRandom(size_t total, size_t use) +{ + if (total <= 0) { + return {}; + } + std::default_random_engine e; + std::vector input; + for (size_t i = 0; i < total; i++) { + input.push_back(i); + } + vector output; + int end = total; + for (size_t i = 0; i < total && output.size() < use; i++) { + vector::iterator iter = input.begin(); + int64_t num = e() % end; + iter = iter + num; + output.push_back(*iter); + input.erase(iter); + end--; + } + return output; +} + +void GenerateKeys(vector& keys, int64_t& key_start, std::vector& total_ids_num, + std::vector& look_ids_num, int i) +{ + std::random_device rd; + std::mt19937 g(rd()); + int new_ids_num = (i > 0) ? (total_ids_num[i] - total_ids_num[i - 1]) : total_ids_num[i]; + int old_ids_num = look_ids_num[i] - new_ids_num; + // old + keys = getRandom(key_start, old_ids_num); + // new + for (int j = 0; j < new_ids_num; ++j) { + keys.push_back(j + key_start); + } + std::shuffle(keys.begin(), keys.end(), g); + key_start += new_ids_num; +} + +void parallelGetInsert(FasterMapper<::uint64_t, ::uint64_t>* map, const std::vector& ids) +{ + uint64_t failed_num = 0; + for (uint64_t i = 0; i < ids.size(); i++) { + uint64_t value; + auto ret = map->Put(ids[i], value); + if (ret.second == false) { + failed_num++; + } + } + ASSERT_EQ(failed_num, 0); +} + +TEST(FasterMapper, PutCase) +{ + FasterMapper<::uint64_t, ::uint64_t> tMap(TempVal::MAPPER_INSERT_CAP, + TempVal::MAPPER_INSERT_CAP / TempVal::RESERVE_SPLIT); + bool ret = tMap.InitializeBuck(); + ASSERT_EQ(ret, true); + + int64_t key_start = 0; + std::vector look_ids_num(9, TempVal::MAPPER_INSERT_COUNT); + std::vector total_ids_num(9, TempVal::MAPPER_INSERT_COUNT); + for (int i = 0; i < 9; ++i) { + total_ids_num[i] += TempVal::MAPPER_INSERT_NEW * i; + } + for (size_t i = 0; i < look_ids_num.size(); ++i) { + vector keys; + GenerateKeys(keys, key_start, total_ids_num, look_ids_num, i); + std::vector> ids_vec; + size_t signum = keys.size() / TempVal::THREAD_NUM; + for (size_t j = 0; j < (TempVal::THREAD_NUM - 1); j++) { + ids_vec.push_back(vector(keys.begin() + j * signum, keys.begin() + (j+1) * signum)); + } + ids_vec.push_back(vector(keys.begin() + (TempVal::THREAD_NUM - 1) * signum, keys.end())); + std::vector workers(TempVal::THREAD_NUM); + for (size_t j = 0; j < TempVal::THREAD_NUM; j++) { + if (j == 0) { + workers[j] = std::thread(parallelGetInsert, &tMap, ids_vec[j]); + } else if (j == 1) { + workers[j] = std::thread(parallelGetInsert, &tMap, ids_vec[j]); + } else if (j == 2) { + workers[j] = std::thread(parallelGetInsert, &tMap, ids_vec[j]); + } else { + workers[j] = std::thread(parallelGetInsert, &tMap, ids_vec[j]); + } + } + for (size_t j = 0; j < TempVal::THREAD_NUM; j++) { + workers[j].join(); + } + ASSERT_EQ(tMap.Size(), total_ids_num[i]); + } +} + +TEST(FasterMapper, FindAndRemoveCase) +{ + FasterMapper<::uint64_t, ::uint64_t> tMap(TempVal::MAPPER_CAP, TempVal::MAPPER_CAP / TempVal::RESERVE_SPLIT); + bool ret = tMap.InitializeBuck(); + ASSERT_EQ(ret, true); + vector val_insert; + for (int i = 0; i < TempVal::MAPPER_CAP; i++) { + uint64_t value; + auto put_ret = tMap.Put(i, value); + ASSERT_EQ(put_ret.second, true); + val_insert.push_back(value); + } + for (int i = 0; i < TempVal::MAPPER_CAP; i++) { + auto find_ret = tMap.Find(i); + ASSERT_EQ(find_ret->first.load(), i); + ASSERT_EQ(find_ret->second, val_insert[i]); + } + uint64_t error_value; + auto put_ret = tMap.Put(TempVal::MAPPER_CAP + 1, error_value); + ASSERT_EQ(put_ret.second, false); + + for (auto it = tMap.begin(); it != tMap.end(); ++it) { + ::uint64_t val = it->second; + auto iter = std::find(val_insert.begin(), val_insert.end(), val); + ASSERT_NE(iter, val_insert.end()); + } + + std::vector> vec; + tMap.ToVector(vec); + ASSERT_EQ(vec.size(), TempVal::MAPPER_CAP); + + auto remove_ret = tMap.Remove(0); + ASSERT_EQ(remove_ret, true); + ASSERT_EQ(tMap.Size(), TempVal::MAPPER_CAP - 1); + + for (size_t i = 1; i < val_insert.size(); ++i) { + auto find_ret = tMap.Remove(i); + ASSERT_EQ(find_ret, true); + } + ASSERT_EQ(tMap.Size(), 0); + tMap.UnInitializeBuck(); + ASSERT_EQ(tMap.Capacity(), 0); +}