[ui] Nodes status monitoring simplification

* store status file last modification time on NodeChunk
* ChunksMonitor: don't stop the underlying thread when changing chunks, only modify the list of monitored files + use a mutex for thread-safety
This commit is contained in:
Yann Lanthony 2019-07-24 17:53:34 +02:00
parent 6b7065dd5d
commit f6a42cb86e
No known key found for this signature in database
GPG key ID: 519FAE6DF7A70642
2 changed files with 35 additions and 51 deletions

View file

@ -144,6 +144,7 @@ class NodeChunk(BaseObject):
self.range = range self.range = range
self.status = StatusData(node.name, node.nodeType, node.packageName, node.packageVersion) self.status = StatusData(node.name, node.nodeType, node.packageName, node.packageVersion)
self.statistics = stats.Statistics() self.statistics = stats.Statistics()
self.statusFileLastModTime = -1
self._subprocess = None self._subprocess = None
# notify update in filepaths when node's internal folder changes # notify update in filepaths when node's internal folder changes
self.node.internalFolderChanged.connect(self.nodeFolderChanged) self.node.internalFolderChanged.connect(self.nodeFolderChanged)
@ -175,11 +176,13 @@ class NodeChunk(BaseObject):
oldStatus = self.status.status oldStatus = self.status.status
# No status file => reset status to Status.None # No status file => reset status to Status.None
if not os.path.exists(statusFile): if not os.path.exists(statusFile):
self.statusFileLastModTime = -1
self.status.reset() self.status.reset()
else: else:
with open(statusFile, 'r') as jsonFile: with open(statusFile, 'r') as jsonFile:
statusData = json.load(jsonFile) statusData = json.load(jsonFile)
self.status.fromDict(statusData) self.status.fromDict(statusData)
self.statusFileLastModTime = os.path.getmtime(statusFile)
if oldStatus != self.status.status: if oldStatus != self.status.status:
self.statusChanged.emit() self.statusChanged.emit()

View file

@ -4,7 +4,7 @@ import logging
import os import os
import time import time
from enum import Enum from enum import Enum
from threading import Thread, Event from threading import Thread, Event, Lock
from multiprocessing.pool import ThreadPool from multiprocessing.pool import ThreadPool
from PySide2.QtCore import Slot, QJsonValue, QObject, QUrl, Property, Signal, QPoint from PySide2.QtCore import Slot, QJsonValue, QObject, QUrl, Property, Signal, QPoint
@ -28,12 +28,13 @@ class FilesModTimePollerThread(QObject):
def __init__(self, parent=None): def __init__(self, parent=None):
super(FilesModTimePollerThread, self).__init__(parent) super(FilesModTimePollerThread, self).__init__(parent)
self._thread = None self._thread = None
self._mutex = Lock()
self._threadPool = ThreadPool(4) self._threadPool = ThreadPool(4)
self._stopFlag = Event() self._stopFlag = Event()
self._refreshInterval = 5 # refresh interval in seconds self._refreshInterval = 5 # refresh interval in seconds
self._files = [] self._files = []
def start(self, files): def start(self, files=None):
""" Start polling thread. """ Start polling thread.
Args: Args:
@ -42,14 +43,20 @@ class FilesModTimePollerThread(QObject):
if self._thread: if self._thread:
# thread already running, return # thread already running, return
return return
if not files:
# file list is empty
return
self._stopFlag.clear() self._stopFlag.clear()
self._files = files or [] self._files = files or []
self._thread = Thread(target=self.run) self._thread = Thread(target=self.run)
self._thread.start() self._thread.start()
def setFiles(self, files):
""" Set the list of files to monitor
Args:
files: the list of files to monitor
"""
with self._mutex:
self._files = files
def stop(self): def stop(self):
""" Request polling thread to stop. """ """ Request polling thread to stop. """
if not self._thread: if not self._thread:
@ -69,8 +76,12 @@ class FilesModTimePollerThread(QObject):
def run(self): def run(self):
""" Poll watched files for last modification time. """ """ Poll watched files for last modification time. """
while not self._stopFlag.wait(self._refreshInterval): while not self._stopFlag.wait(self._refreshInterval):
times = self._threadPool.map(FilesModTimePollerThread.getFileLastModTime, self._files) with self._mutex:
self.timesAvailable.emit(times) files = list(self._files)
times = self._threadPool.map(FilesModTimePollerThread.getFileLastModTime, files)
with self._mutex:
if files == self._files:
self.timesAvailable.emit(times)
class ChunksMonitor(QObject): class ChunksMonitor(QObject):
@ -85,49 +96,25 @@ class ChunksMonitor(QObject):
""" """
def __init__(self, chunks=(), parent=None): def __init__(self, chunks=(), parent=None):
super(ChunksMonitor, self).__init__(parent) super(ChunksMonitor, self).__init__(parent)
self.lastModificationRecords = dict() self.chunks = []
self._filesTimePoller = FilesModTimePollerThread(parent=self) self._filesTimePoller = FilesModTimePollerThread(parent=self)
self._filesTimePoller.timesAvailable.connect(self.compareFilesTimes) self._filesTimePoller.timesAvailable.connect(self.compareFilesTimes)
self._pollerOutdated = False self._filesTimePoller.start()
self.setChunks(chunks) self.setChunks(chunks)
def setChunks(self, chunks): def setChunks(self, chunks):
""" Set the list of chunks to monitor. """ """ Set the list of chunks to monitor. """
self._filesTimePoller.stop() self.chunks = chunks
self.clear() self._filesTimePoller.setFiles(self.statusFiles)
for chunk in chunks:
# initialize last modification times to current time for all chunks
self.lastModificationRecords[chunk] = time.time()
# For local use, handle statusChanged emitted directly from the node chunk
chunk.statusChanged.connect(self.onChunkStatusChanged)
self._pollerOutdated = True
self.chunkStatusChanged.emit(None, -1)
self._filesTimePoller.start(self.statusFiles)
self._pollerOutdated = False
def stop(self): def stop(self):
""" Stop the status files monitoring. """ """ Stop the status files monitoring. """
self._filesTimePoller.stop() self._filesTimePoller.stop()
def clear(self):
""" Clear the list of monitored chunks. """
for chunk in self.lastModificationRecords:
chunk.statusChanged.disconnect(self.onChunkStatusChanged)
self.lastModificationRecords.clear()
def onChunkStatusChanged(self):
""" React to change of status coming from the NodeChunk itself. """
chunk = self.sender()
assert chunk in self.lastModificationRecords
# update record entry for this file so that it's up-to-date on next timerEvent
# use current time instead of actual file's mtime to limit filesystem requests
self.lastModificationRecords[chunk] = time.time()
self.chunkStatusChanged.emit(chunk, chunk.status.status)
@property @property
def statusFiles(self): def statusFiles(self):
""" Get status file paths from current chunks. """ """ Get status file paths from current chunks. """
return [c.statusFile for c in self.lastModificationRecords.keys()] return [c.statusFile for c in self.chunks]
def compareFilesTimes(self, times): def compareFilesTimes(self, times):
""" """
@ -137,21 +124,12 @@ class ChunksMonitor(QObject):
Args: Args:
times: the last modification times for currently monitored files. times: the last modification times for currently monitored files.
""" """
if self._pollerOutdated: newRecords = dict(zip(self.chunks, times))
return for chunk, fileModTime in newRecords.items():
# update chunk status if last modification time has changed since previous record
newRecords = dict(zip(self.lastModificationRecords.keys(), times)) if fileModTime != chunk.statusFileLastModTime:
for chunk, previousTime in self.lastModificationRecords.items():
lastModTime = newRecords.get(chunk, -1)
# update chunk status if:
# - last modification time is more recent than previous record
# - file is no more available (-1)
if lastModTime > previousTime or (lastModTime == -1 != previousTime):
self.lastModificationRecords[chunk] = lastModTime
chunk.updateStatusFromCache() chunk.updateStatusFromCache()
chunkStatusChanged = Signal(NodeChunk, int)
class GraphLayout(QObject): class GraphLayout(QObject):
""" """
@ -270,7 +248,6 @@ class UIGraph(QObject):
self._graph = Graph('', self) self._graph = Graph('', self)
self._modificationCount = 0 self._modificationCount = 0
self._chunksMonitor = ChunksMonitor(parent=self) self._chunksMonitor = ChunksMonitor(parent=self)
self._chunksMonitor.chunkStatusChanged.connect(self.onChunkStatusChanged)
self._computeThread = Thread() self._computeThread = Thread()
self._running = self._submitted = False self._running = self._submitted = False
self._sortedDFSChunks = QObjectListModel(parent=self) self._sortedDFSChunks = QObjectListModel(parent=self)
@ -305,7 +282,11 @@ class UIGraph(QObject):
# Nothing has changed, return # Nothing has changed, return
if self._sortedDFSChunks.objectList() == chunks: if self._sortedDFSChunks.objectList() == chunks:
return return
for chunk in self._sortedDFSChunks:
chunk.statusChanged.disconnect(self.onChunkStatusChanged)
self._sortedDFSChunks.setObjectList(chunks) self._sortedDFSChunks.setObjectList(chunks)
for chunk in self._sortedDFSChunks:
chunk.statusChanged.connect(self.onChunkStatusChanged)
# provide ChunkMonitor with the update list of chunks # provide ChunkMonitor with the update list of chunks
self.updateChunkMonitor(self._sortedDFSChunks) self.updateChunkMonitor(self._sortedDFSChunks)
@ -393,7 +374,7 @@ class UIGraph(QObject):
node = [node] if node else None node = [node] if node else None
submitGraph(self._graph, os.environ.get('MESHROOM_DEFAULT_SUBMITTER', ''), node) submitGraph(self._graph, os.environ.get('MESHROOM_DEFAULT_SUBMITTER', ''), node)
def onChunkStatusChanged(self, chunk, status): def onChunkStatusChanged(self):
# update graph computing status # update graph computing status
running = any([ch.status.status == Status.RUNNING for ch in self._sortedDFSChunks]) running = any([ch.status.status == Status.RUNNING for ch in self._sortedDFSChunks])
submitted = any([ch.status.status == Status.SUBMITTED for ch in self._sortedDFSChunks]) submitted = any([ch.status.status == Status.SUBMITTED for ch in self._sortedDFSChunks])