Oct-28-2021, 09:10 AM
Watchdog is being used to send new
So essentially: the code works when one
Here is the code that initiates the watchdog code (this is in a different
.hdf5
files to a different program for use. The problem is that using Queue
only allows one file to be sent and watchdog doesn't send anymore files even when new files are added to the folder. I believe this has to do with Queue
using block=True
and this is problematic for Qt
but I am not sure how to solve the issue. I have tried to use just a normal list instead of using Queue
but since at the beginning nothing is in the list, I get an error since the program wants to already find an item in the list (example to this code is commented out below to help understand what I mean). So essentially: the code works when one
.hdf5
file is added to the folder but when other files are added, they do not get acknowledged by the code. Here is the code that initiates the watchdog code (this is in a different
.py
file:try: print('try to connect to event service ...') self.listener = watchdog_search.get_qt_listener() self.listener.listener.finishedRun.connect(self.on_finished_run) except Exception as e: print(e)And here is the
watchdog_search
code:import time import traceback import os import h5py import queue from typing import Union from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler, DirCreatedEvent, FileCreatedEvent from .tools.qt import QtCore class NewFileHandler(FileSystemEventHandler): """h5 file creation handler for Watchdog""" def __init__(self): super().__init__() self.file_queue = queue.Queue() #self.la = [] #here is the list code I was referring to. I wanted to try to avoid using Queue but the attempt didnt work since an empty list at the beginning gives an error # callback for File/Directory created event, called by Observer. def on_created(self, event: Union[DirCreatedEvent, FileCreatedEvent]): if event.src_path[-4:] == "hdf5": # run callback with path string self.file_queue.put(event.src_path) #self.la.append(event.src_path) class ObserverWrapper(QtCore.QObject): # New LabBusSubscriber """Encapsulated Observer boilerplate""" if hasattr(QtCore, "QString"): finishedRun = QtCore.Signal(QtCore.QString, QtCore.QString) else: finishedRun = QtCore.Signal(str, str) can_listen = True def __init__(self, path: str):#, recursive=True): super().__init__() self.path = path self.observer = Observer() self.handler = NewFileHandler() self.observer.schedule(self.handler, path=path, recursive=True) self.start() def start(self): """ Starts observing for filesystem events. Runs self.routine() every 1 second. :param blocking: If true, blocks main thread until keyboard interrupt. """ self.observer.start() def stop(self): """ Stops the observer. When running self.start(blocking=True) then you don't need to call this. """ self.observer.stop() self.observer.join() def event(self, event): """Here we define what to do at which signal. In general we will transmit a QT signal to which the other components connect. """ print("EVENT", event) if isinstance(event, QtCore.QEvent): # make sure the QObject code reacts on QEvents return QtCore.QObject.event(self, event) self.finishedRun.emit(event[0], event[1]) def wait_for_file(self): """ Wait and Process newly created files """ max_retry_count = 3500 # for test purposes now but want to set an upper bound on verifying a file is finished. # will try h5 file for a max of 35 seconds (upper bound) to see if the file is finished. # Files are usually finished within 20-30 seconds # retry_interval_seconds = .01 # every hundreth it will try the file to see if it finished writing #file_path = self.handler.la[-1] #this expects already at the beginning an entry but there is no file until one is populated into the folder file_path = self.handler.file_queue.get(block=True) file_name = os.path.basename(file_path) # try to open the file retry_count = 0 while True: try: file = h5py.File(file_path, "r") file.close() self.event([file_path, file_name]) break except OSError: if retry_count < max_retry_count: retry_count += 1 print(f"h5 file <{file_path}> is locked, retrying {retry_count}/{max_retry_count}") time.sleep(retry_interval_seconds) else: print(f"h5 file <{file_path}> reached max retry count, skipping") except Exception as err: print(f"Got unexpected Error <{type(err).__name__}> while opening <{file_path}> ") traceback.print_exc() class QtEventSubscriber(QtCore.QThread): """The listener thread""" def __init__(self): QtCore.QThread.__init__(self) self.listener = ObserverWrapper("/home/shawn/Desktop/Arbeit/MPQ/test_image_analyzer_files/Test_Data/") self.listener.moveToThread(self) def run(self): if self.listener.can_listen: print("start listener thread") self.listener.wait_for_file() else: print("can't listen: no listener thread") pass _qtlistener = None def get_qt_listener(): global _qtlistener if _qtlistener is not None: return _qtlistener _qtlistener = QtEventSubscriber() _qtlistener.start() print(_qtlistener.listener.thread(), _qtlistener) return _qtlistener