Commit 76c0abb5 authored by Andrey Kamakin's avatar Andrey Kamakin

Added KisTileHashTableTraits2 basic version.

Moved folder with lock free map to 3dparty and implemented very simple table
based of ConcurrentMap.

Ref T8874
parent 59230b15
......@@ -187,7 +187,7 @@ public:
inline bool isNull() const {
return (d == 0);
}
private:
inline static void ref(const KisSharedPtr<T>* sp, T* t)
{
#ifndef HAVE_MEMORY_LEAK_TRACKER
......@@ -199,10 +199,7 @@ private:
t->ref();
}
}
inline void ref() const
{
ref(this, d);
}
inline static bool deref(const KisSharedPtr<T>* sp, T* t)
{
#ifndef HAVE_MEMORY_LEAK_TRACKER
......@@ -216,6 +213,13 @@ private:
}
return true;
}
private:
inline void ref() const
{
ref(this, d);
}
inline void deref() const
{
bool v = deref(this, d);
......@@ -227,6 +231,7 @@ private:
Q_UNUSED(v);
#endif
}
private:
mutable T* d;
};
......
......@@ -8,14 +8,13 @@
See the LICENSE file for more information.
------------------------------------------------------------------------*/
#ifndef CONCURRENTMAP_LEAPFROG_H
#define CONCURRENTMAP_LEAPFROG_H
#ifndef CONCURRENTMAP_H
#define CONCURRENTMAP_H
#include "Leapfrog.h"
#include <QDebug>
#include "leapfrog.h"
template <typename K, typename V, class KT = DefaultKeyTraits<K>, class VT = DefaultValueTraits<V> >
class ConcurrentMap_Leapfrog
class ConcurrentMap
{
public:
typedef K Key;
......@@ -23,17 +22,17 @@ public:
typedef KT KeyTraits;
typedef VT ValueTraits;
typedef quint32 Hash;
typedef Leapfrog<ConcurrentMap_Leapfrog> Details;
typedef Leapfrog<ConcurrentMap> Details;
private:
Atomic<typename Details::Table*> m_root;
public:
ConcurrentMap_Leapfrog(quint64 capacity = Details::InitialSize) : m_root(Details::Table::create(capacity))
ConcurrentMap(quint64 capacity = Details::InitialSize) : m_root(Details::Table::create(capacity))
{
}
~ConcurrentMap_Leapfrog()
~ConcurrentMap()
{
typename Details::Table* table = m_root.loadNonatomic();
table->destroy();
......@@ -61,15 +60,15 @@ public:
class Mutator
{
private:
friend class ConcurrentMap_Leapfrog;
friend class ConcurrentMap;
ConcurrentMap_Leapfrog& m_map;
ConcurrentMap& m_map;
typename Details::Table* m_table;
typename Details::Cell* m_cell;
Value m_value;
// Constructor: Find existing cell
Mutator(ConcurrentMap_Leapfrog& map, Key key, bool) : m_map(map), m_value(Value(ValueTraits::NullValue))
Mutator(ConcurrentMap& map, Key key, bool) : m_map(map), m_value(Value(ValueTraits::NullValue))
{
Hash hash = KeyTraits::hash(key);
for (;;) {
......@@ -92,7 +91,7 @@ public:
}
// Constructor: Insert or find cell
Mutator(ConcurrentMap_Leapfrog& map, Key key) : m_map(map), m_value(Value(ValueTraits::NullValue))
Mutator(ConcurrentMap& map, Key key) : m_map(map), m_value(Value(ValueTraits::NullValue))
{
Hash hash = KeyTraits::hash(key);
for (;;) {
......@@ -302,7 +301,7 @@ public:
Value m_value;
public:
Iterator(ConcurrentMap_Leapfrog& map)
Iterator(ConcurrentMap& map)
{
// Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead:
m_table = map.m_root.load(Consume);
......@@ -346,55 +345,4 @@ public:
};
};
struct Foo {
void destroy()
{
delete this;
}
};
template <class T>
class KisLockFreeMap
{
public:
KisLockFreeMap() : m_currentThreads(0)
{
m_context = QSBR::instance().createContext();
}
~KisLockFreeMap()
{
QSBR::instance().destroyContext(m_context);
}
T insert(qint32 key, T value)
{
return m_map.assign(key, value);
}
void erase(qint32 key)
{
qint32 currentThreads = m_currentThreads.fetchAdd(1, ConsumeRelease);
T val = m_map.erase(key);
if (QTypeInfo<T>::isPointer) {
QSBR::instance().enqueue(&Foo::destroy, val);
}
qint32 expected = 1;
if (m_currentThreads.compareExchangeStrong(expected, currentThreads, Consume)) {
QSBR::instance().update(m_context);
}
m_currentThreads.fetchSub(1, ConsumeRelease);
}
T get(qint32 key)
{
return m_map.get(key);
}
ConcurrentMap_Leapfrog<qint32, T> m_map;
private:
QSBR::Context m_context;
Atomic<qint32> m_currentThreads;
};
#endif // CONCURRENTMAP_LEAPFROG_H
......@@ -11,9 +11,9 @@
#ifndef LEAPFROG_H
#define LEAPFROG_H
#include "MapTraits.h"
#include "SimpleJobCoordinator.h"
#include "QSBR.h"
#include "map_traits.h"
#include "simple_job_coordinator.h"
#include "qsbr.h"
template <class Map>
struct Leapfrog {
......
......@@ -12,7 +12,6 @@
#define MAPTRAITS_H
#include <QtCore>
#include <QDebug>
inline quint64 roundUpPowerOf2(quint64 v)
{
......
......@@ -15,8 +15,6 @@
#include <QMutex>
#include <QMutexLocker>
// TODO: Think about changing signature for std::function for more flexible usage
#define CALL_MEMBER(obj, pmf) ((obj).*(pmf))
class QSBR
......
......@@ -14,7 +14,8 @@
#include <QMutex>
#include <QWaitCondition>
#include <QMutexLocker>
#include "Atomic.h"
#include "atomic.h"
class SimpleJobCoordinator
{
......
#ifndef KIS_TILEHASHTABLE_2_H
#define KIS_TILEHASHTABLE_2_H
#include "kis_shared.h"
#include "kis_shared_ptr.h"
#include "3rdparty/lock_free_map/concurrent_map.h"
template <class T>
class KisTileHashTableTraits2
{
static_assert(std::is_convertible<T*, KisShared*>::value, "Template must inherit KisShared");
public:
typedef T TileType;
typedef KisSharedPtr<T> TileTypeSP;
typedef KisWeakSharedPtr<T> TileTypeWSP;
typedef typename ConcurrentMap<qint32, TileType *>::Iterator Iterator;
KisTileHashTableTraits2()
: m_rawPointerUsers(0)
{
m_context = QSBR::instance().createContext();
}
~KisTileHashTableTraits2()
{
QSBR::instance().destroyContext(m_context);
}
TileTypeSP insert(qint32 key, TileTypeSP value)
{
TileTypeSP::ref(&value, value.data());
TileType *result = m_map.assign(key, value.data());
if (result) {
MemoryReclaimer *tmp = new MemoryReclaimer(result);
QSBR::instance().enqueue(&MemoryReclaimer::destroy, tmp);
}
return TileTypeSP(result);
}
TileTypeSP erase(qint32 key)
{
qint32 currentThreads = m_rawPointerUsers.fetchAdd(1, ConsumeRelease);
TileType *result = m_map.erase(key);
if (result) {
MemoryReclaimer *tmp = new MemoryReclaimer(result);
QSBR::instance().enqueue(&MemoryReclaimer::destroy, tmp);
}
qint32 expected = 1;
if (m_rawPointerUsers.compareExchangeStrong(expected, currentThreads, ConsumeRelease)) {
QSBR::instance().update(m_context);
}
m_rawPointerUsers.fetchSub(1, ConsumeRelease);
return TileTypeSP(result);
}
TileTypeSP get(qint32 key)
{
m_rawPointerUsers.fetchAdd(1, ConsumeRelease);
TileTypeSP result(m_map.get(key));
m_rawPointerUsers.fetchSub(1, ConsumeRelease);
return result;
}
Iterator iterator()
{
return Iterator(m_map);
}
private:
struct MemoryReclaimer {
MemoryReclaimer(TileType *data) : d(data) {}
~MemoryReclaimer() = default;
void destroy()
{
TileTypeSP::deref(reinterpret_cast<TileTypeSP *>(this), d);
this->MemoryReclaimer::~MemoryReclaimer();
std::free(this);
}
private:
TileType *d;
};
ConcurrentMap<qint32, TileType *> m_map;
QSBR::Context m_context;
Atomic<qint32> m_rawPointerUsers;
};
#endif // KIS_TILEHASHTABLE_2_H
......@@ -3,28 +3,35 @@
#include <QDebug>
#include "kis_debug.h"
#include "tiles3/LockFreeMap/ConcurrentMap_Leapfrog.h"
#include "kis_shared_ptr.h"
#include "tiles3/kis_memento_item.h"
#include "tiles3/kis_tile_hash_table2.h"
#define NUM_TYPES 2
#define NUM_TYPES 3
// high-concurrency
#define NUM_CYCLES 50000
#define NUM_CYCLES 60000
#define NUM_THREADS 10
typedef ConcurrentMap_Leapfrog<qint32, qint32> ConcurrentMap;
class StressJobLockless : public QRunnable
class Wrapper : public KisShared
{
public:
StressJobLockless(ConcurrentMap &map)
: m_map(map), m_insertSum(0), m_eraseSum(0)
Wrapper(qint32 member) : m_member(member) {}
qint32 member()
{
return m_member;
}
qint64 insertSum()
private:
qint32 m_member;
};
class StressJobWrapper : public QRunnable
{
public:
StressJobWrapper(KisTileHashTableTraits2<Wrapper> &map)
: m_map(map), m_eraseSum(0), m_insertSum(0)
{
return m_insertSum;
}
qint64 eraseSum()
......@@ -32,75 +39,64 @@ public:
return m_eraseSum;
}
qint64 insertSum()
{
return m_insertSum;
}
protected:
void run() override
{
QSBR::Context context = QSBR::instance().createContext();
for (int i = 1; i < NUM_CYCLES + 1; i++) {
for (qint32 i = 1; i < NUM_CYCLES + 1; ++i) {
auto type = i % NUM_TYPES;
switch (type) {
case 0:
m_eraseSum += m_map.erase(i);
case 0: {
auto result = m_map.erase(i - 2);
if (result.data()) {
m_eraseSum += result->member();
}
break;
case 1:
m_eraseSum += m_map.assign(i + 1, i + 1);
m_insertSum += i + 1;
}
case 1: {
auto result = m_map.insert(i, new Wrapper(i));
if (result.data()) {
m_insertSum -= result->member();
}
m_insertSum += i;
break;
}
case 2: {
m_map.get(i - 1);
break;
}
if (i % 10000 == 0) {
QSBR::instance().update(context);
}
}
QSBR::instance().destroyContext(context);
}
private:
ConcurrentMap &m_map;
qint64 m_insertSum;
KisTileHashTableTraits2<Wrapper> &m_map;
qint64 m_eraseSum;
qint64 m_insertSum;
qint64 m_getSum;
};
void LockfreeMapTest::testOperations()
void LockFreeMapTest::testWrapper()
{
ConcurrentMap map;
qint64 totalSum = 0;
QList<StressJobWrapper *> jobs;
KisTileHashTableTraits2<Wrapper> map;
for (auto i = 1; i < NUM_CYCLES + 1; i++) {
totalSum += i + 1;
map.assign(i, i + 1);
}
for (auto i = 1; i < NUM_CYCLES + 1; i++) {
ConcurrentMap::Value result = map.erase(i);
totalSum -= result;
QVERIFY(result);
QCOMPARE(i, result - 1);
}
QVERIFY(totalSum == 0);
}
void LockfreeMapTest::stressTestLockless()
{
QList<StressJobLockless *> jobsList;
ConcurrentMap map;
for (auto i = 0; i < NUM_THREADS; ++i) {
StressJobLockless *task = new StressJobLockless(map);
task->setAutoDelete(false);
jobsList.append(task);
for (qint32 i = 0; i < NUM_THREADS; ++i) {
StressJobWrapper *job = new StressJobWrapper(map);
job->setAutoDelete(false);
jobs.append(job);
}
QThreadPool pool;
pool.setMaxThreadCount(NUM_THREADS);
QBENCHMARK {
for (auto &job : jobsList)
for (auto &job : jobs)
{
pool.start(job);
}
......@@ -108,88 +104,18 @@ void LockfreeMapTest::stressTestLockless()
pool.waitForDone();
}
qint64 totalSum = 0;
qint64 insertSum = 0;
qint64 eraseSum = 0;
for (auto i = 0; i < NUM_THREADS; i++) {
StressJobLockless *job = jobsList.takeLast();
totalSum += job->insertSum();
totalSum -= job->eraseSum();
StressJobWrapper *job = jobs.takeLast();
eraseSum += job->eraseSum();
insertSum += job->insertSum();
delete job;
}
QVERIFY(totalSum == 0);
}
void LockfreeMapTest::iteratorTest()
{
ConcurrentMap map;
qint32 sum = 0;
for (qint32 i = 2; i < 100; ++i) {
map.assign(i, i);
sum += i;
}
ConcurrentMap::Iterator iter(map);
qint32 testSum = 0;
while (iter.isValid()) {
testSum += iter.getValue();
iter.next();
}
QVERIFY(sum == testSum);
}
class StressJobWrapper : public QRunnable
{
public:
StressJobWrapper(KisLockFreeMap<Foo*> &map) : m_map(map) {}
protected:
void run() override
{
for (int i = 1; i < NUM_CYCLES + 1; i++) {
auto type = i % NUM_TYPES;
switch (type) {
case 0:
m_map.erase(i);
break;
case 1:
m_map.insert(i + 1, new Foo);
break;
}
}
}
private:
KisLockFreeMap<Foo*> &m_map;
};
void LockfreeMapTest::testWrapper()
{
QThreadPool pool;
pool.setMaxThreadCount(NUM_THREADS);
KisLockFreeMap<Foo*> map;
for (auto i = 0; i < NUM_THREADS; ++i) {
StressJobWrapper *task = new StressJobWrapper(map);
task->setAutoDelete(true);
pool.start(task);
}
pool.waitForDone();
ConcurrentMap_Leapfrog<qint32, Foo*>::Iterator iter(map.m_map);
qint32 sum = 0;
while (iter.isValid()) {
if (iter.getValue()) {
++sum;
}
iter.next();
}
qDebug() << sum;
QVERIFY(insertSum == eraseSum);
}
QTEST_GUILESS_MAIN(LockfreeMapTest)
QTEST_GUILESS_MAIN(LockFreeMapTest)
......@@ -3,14 +3,11 @@
#include <QTest>
class LockfreeMapTest : public QObject
class LockFreeMapTest : public QObject
{
Q_OBJECT
private Q_SLOTS:
void testOperations();
void stressTestLockless();
void iteratorTest();
void testWrapper();
};
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment