DeadLock Detection

Date:     Updated:

카테고리:

태그:

인프런에 있는 루키스님의 게임 서버 강의를 듣고 정리한 내용입니다.


DeadLock Profiler


class DeadLockProfiler
{
public:
	void PushLock(const char* name);
	void PopLock(const char* name);
	void CheckCycle();

private:
	void Dfs(int32 index);

private:
	unordered_map<const char*, int32>	_nameToId;
	unordered_map<int32, const char*>	_idToName;
	stack<int32>						_lockStack;
	map<int32, set<int32>>				_lockHistory;

	Mutex _lock;

private:
	vector<int32>	_discoveredOrder;
	int32			_discoveredCount = 0;
	vector<bool>	_finished; 
	vector<int32>	_parent;
};

데드락 문제가 까다로운 이유는 로직에 문제가 있더라도 쓰레드의 개수가 적은 테스트 환경에서는 발생하지 않고, 실제 라이브 환경에서 문제가 발생하기 때문이다. 이러한 문제를 막기 위해서는 프로그램 로직 단에서 문제가 있는지 없는지 확인해야 한다. 다행히 DFS를 이용한 Cycle Detection in Directed Graph 알고리듬을 이용하면 데드락이 발생할 수 있는 상황을 미리 발견할 수 있다.


void Lock::WriteLock(const char* name) {
#if _DEBUG
	GDeadLockProfiler->PushLock(name);
#endif
}

void Lock::WriteUnlock(const char* name) {
#if _DEBUG
	GDeadLockProfiler->PopLock(name);
#endif
}
### CoreMacro ###
#define	READ_LOCK_IDX(idx)		ReadLockGuard readLockGuard_##idx(_locks[idx], typeid(this).name());
#define READ_LOCK				READ_LOCK_IDX(0)
#define	WRITE_LOCK_IDX(idx)		WriteLockGuard writeLockGuard_##idx(_locks[idx], typeid(this).name());

Profiler는 프로그램의 로직만 검사하면 되기 때문에 라이브 환경에서는 작동할 필요가 없다. 따라서 Debug 모드일 때만 DeadlockProfiler를 작동시킨다. 또한 어떤 lock에서 문제가 생겼는지 쉽게 알기 위해 lock을 잡은 클래스의 이름으로 lock을 관리한다.


PushLock

void DeadLockProfiler::PushLock(const char* name)
{
	LockGuard guard(_lock);

	int32 lockId = 0;

	auto findIt = _nameToId.find(name);
	if (findIt == _nameToId.end())
	{
		lockId = static_cast<int32>(_nameToId.size());
		_nameToId[name] = lockId;
		_idToName[lockId] = name;
	}

	else
	{
		lockId = findIt->second;
	}

	if (_lockStack.empty() == false)
	{
		const int32 prevId = _lockStack.top();
		if (lockId != prevId)
		{
			set<int32>& history = _lockHistory[prevId];
			if (history.find(lockId) == history.end())
			{
				history.insert(lockId);
				CheckCycle();
			}
		}
	}

	_lockStack.push(lockId);
}
  • Class name으로 lock 조회
    • 없으면 목록에 추가
    • lockId는 추가되는 lock의 순서
  • 이미 잡고 있는 lock이 있다면
    • lockStack.top과 같은 lock이면 패스(재귀적인 lock 허용)
    • 다르면 history 조회
    • 기존에 발견되지 않은 lock이라면 history에 추가하고 cycle 검사


PopLock

void DeadLockProfiler::PopLock(const char* name)
{
	LockGuard guard(_lock);

	if (_lockStack.empty())
		CRASH("MULTIPLE_UNLOCK");

	int32 lockId = _nameToId[name];
	if (_lockStack.top() != lockId)
		CRASH("INVALID_UNLOCK");

	_lockStack.pop();
}
  • Pop의 경우 아주 간단한데, 혹시 모르니 예외 상황에 Crash 발동


Cycle Detection

Detect Cycle in Directed Graph 작동 방식

void DeadLockProfiler::CheckCycle()
{
	const int32 lockCount = static_cast<int32>(_nameToId.size());
	_discoveredOrder = vector<int32>(lockCount, -1);
	_discoveredCount = 0;
	_finished = vector<bool>(lockCount, false);
	_parent = vector<int32>(lockCount, -1);

	for (int32 lockId = 0; lockId < lockCount; lockId++)
		Dfs(lockId);

	_discoveredOrder.clear();
	_finished.clear();
	_parent.clear();
}

void DeadLockProfiler::Dfs(int32 here)
{
	if (_discoveredOrder[here] != -1)
		return;

	_discoveredOrder[here] = _discoveredCount++;

	auto findIt = _lockHistory.find(here);
	if (findIt == _lockHistory.end())
	{
		_finished[here] = true;
		return;
	}

	set<int32>& nextSet = findIt->second;
	for (int32 there : nextSet)
	{
		if (_discoveredOrder[there] == -1)
		{
			_parent[there] = here;
			Dfs(there);
			continue;
		}

		if (_discoveredOrder[here] < _discoveredOrder[there])
			continue;
			
		if (_finished[there] == false)
		{
			CRASH("DEADLOCK_DETECTED");
		}
	}

	_finished[here] = true;
}
  • if (_discoveredOrder[here] != -1) : 이전에 탐지된 적 있으니 패스
  • if (findIt == _lockHistory.end()) : 해당 노드의 히스토리 탐색 & 없으면 더 체크할 것도 없음
  • for (int32 there : nextSet) : 히스토리에 들어있는 모든 노드 체크
    • if (_discoveredOrder[there] == -1) : 이전에 방문한 적이 없음 -> 방문
    • if (_discoveredOrder[here] < _discoveredOrder[there]) : here가 there보다 먼저 발견 (순방향)
    • if (_finished[there] == false) : there가 아직 stack에 남아있고, there가 here보다 먼저 발견 (역방향)
      • 데드락!



맨 위로 이동하기

Server 카테고리 내 다른 글 보러가기

댓글 남기기