Commit 2736c466 authored by Andrey Kamakin's avatar Andrey Kamakin

Simplified QSBR and enabled.

Ref T8874
parent 1106ee22
......@@ -12,6 +12,7 @@
#define CONCURRENTMAP_H
#include "leapfrog.h"
#include "tiles3/kis_lockless_stack.h"
template <typename K, typename V, class KT = DefaultKeyTraits<K>, class VT = DefaultValueTraits<V> >
class ConcurrentMap
......@@ -26,8 +27,7 @@ public:
private:
Atomic<typename Details::Table*> m_root;
// QSBR m_gc;
GarbageCollector<Property> m_gc;
QSBR m_gc;
public:
ConcurrentMap(quint64 capacity = Details::InitialSize) : m_root(Details::Table::create(capacity))
......@@ -40,16 +40,16 @@ public:
table->destroy();
}
// QSBR &getGC()
// {
// return m_gc;
// }
GarbageCollector<Property> &getGC()
QSBR &getGC()
{
return m_gc;
}
bool migrationInProcess()
{
return (quint64) m_root.loadNonatomic()->jobCoordinator.loadConsume() != 1;
}
// publishTableMigration() is called by exactly one thread from Details::TableMigration::run()
// after all the threads participating in the migration have completed their work.
void publishTableMigration(typename Details::TableMigration* migration)
......
......@@ -93,7 +93,7 @@ struct Leapfrog {
}
};
class TableMigration : public SimpleJobCoordinator::Job, public Property
class TableMigration : public SimpleJobCoordinator::Job
{
public:
struct Source {
......@@ -127,26 +127,20 @@ struct Leapfrog {
}
virtual ~TableMigration() override
{
}
void destroy()
{
// Destroy all source tables.
for (quint64 i = 0; i < m_numSources; i++) {
if (getSources()[i].table) {
for (quint64 i = 0; i < m_numSources; i++)
if (getSources()[i].table)
getSources()[i].table->destroy();
}
}
// Delete the migration object itself.
this->TableMigration::~TableMigration();
std::free(this);
}
// void destroy()
// {
// // Destroy all source tables.
// for (quint64 i = 0; i < m_numSources; i++)
// if (getSources()[i].table)
// getSources()[i].table->destroy();
// // Delete the migration object itself.
// this->TableMigration::~TableMigration();
// std::free(this);
// }
Source* getSources() const
{
return (Source*)(this + 1);
......@@ -547,7 +541,7 @@ endMigration:
}
// We're done with this TableMigration. Queue it for GC.
// m_map.getGC().enqueue(this);
m_map.getGC().enqueue(&TableMigration::destroy, this);
}
#endif // LEAPFROG_H
......@@ -17,88 +17,6 @@
#define CALL_MEMBER(obj, pmf) ((obj).*(pmf))
struct Property {
Property() = default;
virtual ~Property() = default;
};
template <class T>
class GarbageCollector
{
public:
~GarbageCollector()
{
cleanUpNodes();
}
void enqueue(T *data)
{
m_rawPointerUsers.ref();
Node *newNode = new Node(data);
Node *head;
do {
head = m_freeNodes.loadAcquire();
newNode->next = head;
} while (!m_freeNodes.testAndSetRelease(head, newNode));
m_rawPointerUsers.deref();
}
void update()
{
m_rawPointerUsers.ref();
if (m_rawPointerUsers == 1) {
Node *head = m_freeNodes.loadAcquire();
if (!(reinterpret_cast<std::uintptr_t>(head) & 1)) {
if (m_freeNodes.testAndSetOrdered(head, reinterpret_cast<Node *>(reinterpret_cast<std::uintptr_t>(head) | 1))) {
if (m_rawPointerUsers == 1) {
if (m_freeNodes.testAndSetOrdered(reinterpret_cast<Node *>(reinterpret_cast<std::uintptr_t>(head) | 1), 0)) {
cleanUpNodes();
}
} else {
m_freeNodes.testAndSetOrdered(reinterpret_cast<Node *>(reinterpret_cast<std::uintptr_t>(head) | 1), head);
}
}
}
}
m_rawPointerUsers.deref();
}
private:
struct Node {
Node(T *data) : d(data) {}
Node *next;
T* d;
};
inline void cleanUpNodes()
{
Node *head = m_freeNodes.fetchAndStoreOrdered(0);
if (head) {
freeList(head);
}
}
inline void freeList(Node *first)
{
Node *next;
while (first) {
next = first->next;
delete first;
first = next;
}
}
private:
QAtomicInt m_rawPointerUsers;
QAtomicPointer<Node> m_freeNodes;
};
class QSBR
{
private:
......@@ -120,89 +38,10 @@ private:
}
};
struct Status {
qint16 inUse : 1;
qint16 wasIdle : 1;
qint16 nextFree : 14;
Status() : inUse(1), wasIdle(0), nextFree(0)
{
}
};
QMutex m_mutex;
QVector<Status> m_status;
qint64 m_freeIndex;
qint64 m_numContexts;
qint64 m_remaining;
QVector<Action> m_deferredActions;
QVector<Action> m_pendingActions;
void onAllQuiescentStatesPassed(QVector<Action>& actions)
{
// m_mutex must be held
actions.swap(m_pendingActions);
m_pendingActions.swap(m_deferredActions);
m_remaining = m_numContexts;
for (qint32 i = 0; i < m_status.size(); i++) {
m_status[i].wasIdle = 0;
}
}
QVector<Action> m_actions;
public:
typedef qint16 Context;
QSBR() : m_freeIndex(-1), m_numContexts(0), m_remaining(0)
{
}
Context createContext()
{
QMutexLocker guard(&m_mutex);
m_numContexts++;
m_remaining++;
Q_ASSERT(m_numContexts < (1 << 14));
qint64 context = m_freeIndex;
if (context >= 0) {
Q_ASSERT(context < (qint64) m_status.size());
Q_ASSERT(!m_status[context].inUse);
m_freeIndex = m_status[context].nextFree;
m_status[context] = Status();
} else {
context = m_status.size();
m_status.append(Status());
}
return context;
}
void destroyContext(QSBR::Context context)
{
QVector<Action> actions;
{
QMutexLocker guard(&m_mutex);
Q_ASSERT(context < m_status.size());
if (m_status[context].inUse && !m_status[context].wasIdle) {
Q_ASSERT(m_remaining > 0);
--m_remaining;
}
m_status[context].inUse = 0;
m_status[context].nextFree = m_freeIndex;
m_freeIndex = context;
m_numContexts--;
if (m_remaining == 0) {
onAllQuiescentStatesPassed(actions);
}
}
for (qint32 i = 0; i < actions.size(); i++) {
actions[i]();
}
}
template <class T>
void enqueue(void (T::*pmf)(), T* target)
......@@ -220,54 +59,27 @@ public:
Closure closure = {pmf, target};
QMutexLocker guard(&m_mutex);
m_deferredActions.append(Action(Closure::thunk, &closure, sizeof(closure)));
m_actions.append(Action(Closure::thunk, &closure, sizeof(closure)));
}
void update(QSBR::Context context)
void update()
{
QVector<Action> actions;
{
QMutexLocker guard(&m_mutex);
Q_ASSERT(context < m_status.size());
Status& status = m_status[context];
Q_ASSERT(status.inUse);
if (status.wasIdle) {
return;
}
status.wasIdle = 1;
Q_ASSERT(m_remaining > 0);
if (--m_remaining > 0) {
return;
}
onAllQuiescentStatesPassed(actions);
actions.swap(m_actions);
}
for (qint32 i = 0; i < actions.size(); i++) {
actions[i]();
for (auto &action : actions) {
action();
}
}
void flush()
{
// This is like saying that all contexts are quiescent,
// so we can issue all actions at once.
// No lock is taken.
for (qint32 i = 0; i < m_pendingActions.size(); i++) {
m_pendingActions[i]();
for (auto &action : m_actions) {
action();
}
m_pendingActions.clear();
for (qint32 i = 0; i < m_deferredActions.size(); i++) {
m_deferredActions[i]();
}
m_deferredActions.clear();
m_remaining = m_numContexts;
}
};
......
......@@ -28,36 +28,44 @@ public:
TileTypeSP insert(quint32 key, TileTypeSP value)
{
TileTypeSP::ref(&value, value.data());
TileType *result = m_map.assign(key, value.data());
TileTypeSP result = m_map.assign(key, value.data());
if (result) {
m_map.getGC().enqueue(new MemoryReclaimer(result));
if (result.data()) {
m_map.getGC().enqueue(&MemoryReclaimer::destroy, new MemoryReclaimer(result.data()));
} else {
m_numTiles.fetchAndAddRelaxed(1);
}
// m_map.getGC().update();
return TileTypeSP(result);
if (!m_map.migrationInProcess()) {
m_map.getGC().update();
}
return result;
}
TileTypeSP erase(quint32 key)
{
TileType *result = m_map.erase(key);
TileTypeSP ptr(result);
TileTypeSP result = m_map.erase(key);
if (result) {
m_numTiles.fetchAndSubRelaxed(1);
m_map.getGC().enqueue(new MemoryReclaimer(result));
m_map.getGC().enqueue(&MemoryReclaimer::destroy, new MemoryReclaimer(result.data()));
}
// m_map.getGC().update();
return ptr;
if (!m_map.migrationInProcess()) {
m_map.getGC().update();
}
return result;
}
TileTypeSP get(quint32 key)
{
TileTypeSP result(m_map.get(key));
// m_map.getGC().update();
if (!m_map.migrationInProcess()) {
m_map.getGC().update();
}
return result;
}
......@@ -122,11 +130,14 @@ public:
private:
static inline quint32 calculateHash(qint32 col, qint32 row);
struct MemoryReclaimer : public Property {
struct MemoryReclaimer {
MemoryReclaimer(TileType *data) : d(data) {}
~MemoryReclaimer()
void destroy()
{
TileTypeSP::deref(reinterpret_cast<TileTypeSP *>(this), d);
this->MemoryReclaimer::~MemoryReclaimer();
delete this;
}
private:
......@@ -218,7 +229,8 @@ KisTileHashTableTraits2<T>::KisTileHashTableTraits2(const KisTileHashTableTraits
template <class T>
KisTileHashTableTraits2<T>::~KisTileHashTableTraits2()
{
// clear();
clear();
m_map.getGC().flush();
}
template<class T>
......@@ -248,10 +260,10 @@ typename KisTileHashTableTraits2<T>::TileTypeSP KisTileHashTableTraits2<T>::getT
tile = new TileType(col, row, m_defaultTileData, m_mementoManager);
}
TileTypeSP::ref(&tile, tile.data());
TileType *result = mutator.exchangeValue(tile.data());
TileTypeSP result = mutator.exchangeValue(tile.data());
if (result) {
m_map.getGC().enqueue(new MemoryReclaimer(result));
if (result.data()) {
m_map.getGC().enqueue(&MemoryReclaimer::destroy, new MemoryReclaimer(result.data()));
} else {
m_numTiles.fetchAndAddRelaxed(1);
}
......@@ -262,7 +274,10 @@ typename KisTileHashTableTraits2<T>::TileTypeSP KisTileHashTableTraits2<T>::getT
tile = mutator.getValue();
}
// m_map.getGC().update();
if (!m_map.migrationInProcess()) {
m_map.getGC().update();
}
return tile;
}
......@@ -278,7 +293,10 @@ typename KisTileHashTableTraits2<T>::TileTypeSP KisTileHashTableTraits2<T>::getR
tile = new TileType(col, row, m_defaultTileData, 0);
}
// m_map.getGC().update();
if (!m_map.migrationInProcess()) {
m_map.getGC().update();
}
return tile;
}
......@@ -351,7 +369,6 @@ quint32 KisTileHashTableTraits2<T>::calculateHash(qint32 col, qint32 row)
boost::hash_combine(seed, col);
boost::hash_combine(seed, row);
return seed;
// return (((row << 5) + (col & 0x1F)) & 0x3FF) + 1;
}
typedef KisTileHashTableTraits2<KisTile> KisTileHashTable;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment