[idd] helper/PipeClient: use event and async I/O to interrupt read

An event, `m_signal`, is created and signalled when either `m_running` or
`m_connected` is changed by another thread, so that the pipe thread knows
to interrupt the read.

The pipe is now opened as async to allow interruption, and the I/O
operations now use overlapped I/O.

Other changes include:
* Changing `m_pipe` to `HandleT<HANDLETraits>` since `CreateFile` returns
  `INVALID_HANDLE_VALUE` instead of `NULL` on error.
* Remove the call to `WaitNamedPipeA` because it's useless and returns
  immediately without waiting if the pipe doesn't exist.
This commit is contained in:
Quantum
2025-09-13 22:58:26 -04:00
committed by Geoffrey McRae
parent eff8555f9b
commit 8c3a2d01bc
2 changed files with 76 additions and 21 deletions

View File

@@ -1,4 +1,4 @@
/**
/**
* Looking Glass
* Copyright © 2017-2025 The Looking Glass Authors
* https://looking-glass.io
@@ -29,12 +29,20 @@ CPipeClient g_pipe;
bool CPipeClient::Init()
{
DeInit();
if (!IsLGIddDeviceAttached())
{
DEBUG_ERROR("Looking Glass Indirect Display Device not found");
return false;
}
m_signal.Attach(CreateEvent(NULL, TRUE, FALSE, NULL));
if (!m_signal.IsValid())
{
DEBUG_ERROR_HR(GetLastError(), "Failed to create pipe signal event");
return false;
}
m_running = true;
m_thread.Attach(CreateThread(
NULL,
@@ -56,9 +64,12 @@ bool CPipeClient::Init()
void CPipeClient::DeInit()
{
m_connected = false;
m_running = false;
if (m_signal.IsValid())
SetEvent(m_signal.Get());
if (m_thread.IsValid())
{
m_running = false;
WaitForSingleObject(m_thread.Get(), INFINITE);
m_thread.Close();
}
@@ -68,6 +79,8 @@ void CPipeClient::DeInit()
FlushFileBuffers(m_pipe.Get());
m_pipe.Close();
}
m_signal.Close();
}
bool CPipeClient::IsLGIddDeviceAttached()
@@ -145,6 +158,7 @@ void CPipeClient::WriteMsg(const LGPipeMsg& msg)
{
DEBUG_WARN_HR(err, "Client disconnected, failed to write");
m_connected = false;
SetEvent(m_signal.Get());
return;
}
@@ -158,50 +172,85 @@ void CPipeClient::WriteMsg(const LGPipeMsg& msg)
void CPipeClient::Thread()
{
DEBUG_INFO("Pipe thread started");
HandleT<EventTraits> ioEvent(CreateEvent(NULL, TRUE, FALSE, NULL));
if (!ioEvent.IsValid())
{
DEBUG_ERROR("Can't create event for overlapped I/O!");
WaitForSingleObject(m_signal.Get(), 5000);
return;
}
while (m_running)
{
if (!WaitNamedPipeA(LG_PIPE_NAME, 5000))
if (!IsLGIddDeviceAttached())
{
if (!IsLGIddDeviceAttached())
{
m_running = false;
DEBUG_ERROR("Device is no longer available, shutting down");
break;
}
continue;
m_running = false;
DEBUG_ERROR("Device is no longer available, shutting down");
break;
}
m_pipe.Attach(CreateFileA(
LG_PIPE_NAME,
m_pipe.Attach(CreateFile(
TEXT(LG_PIPE_NAME),
GENERIC_READ | GENERIC_WRITE,
0,
NULL,
OPEN_EXISTING,
0,
FILE_FLAG_OVERLAPPED,
NULL
));
if (!m_pipe.IsValid())
{
DEBUG_ERROR_HR(GetLastError(), "Failed to open the named pipe");
WaitForSingleObject(m_signal.Get(), 5000);
continue;
}
m_connected = true;
DEBUG_INFO("Pipe connected");
while (m_running && m_connected)
{
LGPipeMsg msg;
DWORD bytesRead;
if (!ReadFile(m_pipe.Get(), &msg, sizeof(msg), &bytesRead, NULL))
OVERLAPPED overlapped = { 0 };
overlapped.hEvent = ioEvent.Get();
if (!ReadFile(m_pipe.Get(), &msg, sizeof(msg), NULL, &overlapped))
{
DEBUG_ERROR_HR(GetLastError(), "ReadFile Failed");
DWORD dwError = GetLastError();
if (dwError != ERROR_IO_PENDING)
{
DEBUG_ERROR_HR(dwError, "ReadFile Failed");
break;
}
HANDLE hWait[] = { ioEvent.Get(), m_signal.Get() };
switch (WaitForMultipleObjects(2, hWait, FALSE, INFINITE))
{
case WAIT_OBJECT_0:
break;
case WAIT_OBJECT_0 + 1:
DEBUG_INFO("I/O interrupted by signal");
CancelIo(m_pipe.Get());
WaitForSingleObject(ioEvent.Get(), INFINITE);
continue;
}
}
DWORD bytesRead;
GetOverlappedResult(m_pipe.Get(), &overlapped, &bytesRead, TRUE);
if (bytesRead != sizeof(msg))
{
DEBUG_ERROR("Corrupted data, expected %lld bytes, read %lld bytes", sizeof msg, bytesRead);
break;
}
if (bytesRead != sizeof(msg) || msg.size != sizeof(msg))
if (msg.size != sizeof(msg))
{
DEBUG_ERROR("Corrupted data");
DEBUG_ERROR("Corrupted data, expected %lld bytes, actual message size: %lld bytes", sizeof msg, msg.size);
break;
}
@@ -224,7 +273,11 @@ void CPipeClient::Thread()
m_pipe.Close();
m_connected = false;
DEBUG_INFO("Pipe closed");
if (m_running)
ResetEvent(m_signal.Get());
}
DEBUG_INFO("Pipe thread shutdown");
}

View File

@@ -1,4 +1,4 @@
/**
/**
* Looking Glass
* Copyright © 2017-2025 The Looking Glass Authors
* https://looking-glass.io
@@ -33,8 +33,10 @@ using namespace Microsoft::WRL::Wrappers::HandleTraits;
class CPipeClient
{
private:
HandleT<HANDLENullTraits> m_pipe;
HandleT<HANDLETraits> m_pipe;
HandleT<HANDLENullTraits> m_thread;
HandleT<EventTraits> m_signal;
bool m_running = false;
bool m_connected = false;