Python Forum
Error when using Watchdog and Qt to keep open a Queue before and after sending first
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
Error when using Watchdog and Qt to keep open a Queue before and after sending first
#1
Watchdog is being used to send new .hdf5 files to a different program for use. The problem is that using Queue only allows one file to be sent and watchdog doesn't send anymore files even when new files are added to the folder. I believe this has to do with Queue using block=True and this is problematic for Qt but I am not sure how to solve the issue. I have tried to use just a normal list instead of using Queue but since at the beginning nothing is in the list, I get an error since the program wants to already find an item in the list (example to this code is commented out below to help understand what I mean).

So essentially: the code works when one .hdf5 file is added to the folder but when other files are added, they do not get acknowledged by the code.

Here is the code that initiates the watchdog code (this is in a different .py file:

        try:
            print('try to connect to event service ...')
            self.listener = watchdog_search.get_qt_listener() 
            self.listener.listener.finishedRun.connect(self.on_finished_run)
        except Exception as e:
            print(e)
And here is the watchdog_search code:
    import time
    import traceback
    import os
    
    import h5py
    import queue
    from typing import Union
    
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler, DirCreatedEvent, FileCreatedEvent
    
    from .tools.qt import QtCore
    
    
    class NewFileHandler(FileSystemEventHandler):
        """h5 file creation handler for Watchdog"""
    
        def __init__(self):
            super().__init__()
            self.file_queue = queue.Queue()
            #self.la = [] #here is the list code I was referring to. I wanted to try to avoid using Queue but the attempt didnt work since an empty list at the beginning gives an error
    
        # callback for File/Directory created event, called by Observer.
        def on_created(self, event: Union[DirCreatedEvent, FileCreatedEvent]):
            if event.src_path[-4:] == "hdf5":
                # run callback with path string
                self.file_queue.put(event.src_path)
                #self.la.append(event.src_path)
    
    class ObserverWrapper(QtCore.QObject): # New LabBusSubscriber
        """Encapsulated Observer boilerplate"""
        if hasattr(QtCore, "QString"):
            finishedRun = QtCore.Signal(QtCore.QString, QtCore.QString)
        else:
            finishedRun = QtCore.Signal(str, str)
        can_listen = True
        def __init__(self, path: str):#, recursive=True):
            super().__init__()
            self.path = path    
            self.observer = Observer()
            self.handler = NewFileHandler()
            self.observer.schedule(self.handler, path=path, recursive=True)
            self.start()
    
        def start(self):
            """
            Starts observing for filesystem events. Runs self.routine() every 1 second.
    
            :param blocking: If true, blocks main thread until keyboard interrupt.
            """
    
            self.observer.start()
        def stop(self):
            """
            Stops the observer. When running self.start(blocking=True) then you don't need to call this.
            """
    
            self.observer.stop()
            self.observer.join()
    
        def event(self, event):
            """Here we define what to do at which signal.
            In general we will transmit a QT signal to which the other components connect.
            """
            print("EVENT", event)
            if isinstance(event, QtCore.QEvent):
                # make sure the QObject code reacts on QEvents
                return QtCore.QObject.event(self, event)
            self.finishedRun.emit(event[0], event[1])
        
        def wait_for_file(self):
            """
            Wait and Process newly created files
            """
    
            max_retry_count = 3500 # for test purposes now but want to set an upper bound on verifying a file is finished.
            # will try h5 file for a max of 35 seconds (upper bound) to see if the file is finished.
            # Files are usually finished within 20-30 seconds
            #
            retry_interval_seconds = .01 # every hundreth it will try the file to see if it finished writing
    

            #file_path = self.handler.la[-1] #this expects already at the beginning an entry but there is no file until one is populated into the folder
            file_path = self.handler.file_queue.get(block=True)
            file_name = os.path.basename(file_path)
    
            # try to open the file
            retry_count = 0
            while True:
                try:
                    file = h5py.File(file_path, "r")
                    file.close()
                    self.event([file_path, file_name])
                    break
                except OSError:
                    if retry_count < max_retry_count:
                        retry_count += 1
                        print(f"h5 file <{file_path}> is locked, retrying {retry_count}/{max_retry_count}")
                        time.sleep(retry_interval_seconds)
                    else:
                        print(f"h5 file <{file_path}> reached max retry count, skipping")
    
                except Exception as err:
                    print(f"Got unexpected Error <{type(err).__name__}> while opening <{file_path}> ")
                    traceback.print_exc()
    
    class QtEventSubscriber(QtCore.QThread):
        """The listener thread"""
    
        def __init__(self):
            QtCore.QThread.__init__(self)
            self.listener = ObserverWrapper("/home/shawn/Desktop/Arbeit/MPQ/test_image_analyzer_files/Test_Data/")
            self.listener.moveToThread(self)
    
        def run(self):
            if self.listener.can_listen:
                print("start listener thread")
                self.listener.wait_for_file()
            else:
                print("can't listen: no listener thread")
                pass
    
    
    _qtlistener = None
    
    
    def get_qt_listener():
        global _qtlistener
        if _qtlistener is not None:
            return _qtlistener
        _qtlistener = QtEventSubscriber()
        _qtlistener.start()
        print(_qtlistener.listener.thread(), _qtlistener)
        return _qtlistener
Reply


Possibly Related Threads…
Thread Author Replies Views Last Post
  watchdog on_modified CAD79 3 227 Apr-24-2024, 06:23 PM
Last Post: deanhystad
  Coding error. Can't open directory EddieG 6 1,138 Jul-13-2023, 06:47 PM
Last Post: deanhystad
  How would I use Watchdog to get triggered when DVD is inserted? Daring_T 12 4,774 Aug-17-2021, 01:49 PM
Last Post: Daring_T
  Error on open of file created with tempfile.TemporaryDirectory() Brian177 4 6,310 Apr-05-2021, 07:12 PM
Last Post: Brian177
  task queue Valon1981 8 3,614 Jul-07-2020, 07:41 AM
Last Post: freeman
  Queue in Pygame constantin01 1 3,697 Jan-07-2020, 04:02 PM
Last Post: metulburr
  Queue maxsize mr_byte31 2 4,568 Sep-03-2019, 07:02 PM
Last Post: mr_byte31
  ReShapping error while using open cv in python barry76 0 1,877 Apr-03-2019, 12:40 PM
Last Post: barry76
  Occasional error when taking an item off a queue Wiggy 1 2,108 Jan-19-2019, 11:22 PM
Last Post: Gribouillis
  How to search and open an error file whose entity id is stored in hbase table lravikumarvsp 2 2,875 May-08-2018, 07:39 AM
Last Post: nilamo

Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020