I am trying to watch
`.hdf5`
files once they are done writing (in my case, trying to emit them). But the problem is that I don't have a way to 1) test if they are finished writing and 2) then send them. The code that I have been trying to work with is as follows:

```python
while True:
    event = self._q.get()
    while True:
        try:
            file = h5py.File(event.src_path, "r")
            file.close()
            self.new_file.emit(event.src_path, os.path.basename(event.src_path))
            break
        except OSError:
            if retry_count < max_retry_count:
                retry_count += 1
                print(f"h5 file <{event.src_path}> is locked, retrying {retry_count}/{max_retry_count}")
                time.sleep(retry_interval_seconds)
            else:
                print(f"h5 file <{event.src_path}> reached max retry count, skipping")
        except Exception as err:
            print(f"Got unexpected Error <{type(err).__name__}> while opening <{event.src_path}> ")
            traceback.print_exc()
```

Obviously this is problematic with the `break`. But without the break, the `try` stays in the loop and emits the same file over and over again. This code tests if they are done writing perfectly, but the ability to send them and continue to take in new files does not work. Any insight is greatly appreciated. For completeness, here is the full code:
import os
import queue
import time
import traceback
from typing import Union

import h5py
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, DirCreatedEvent, FileCreatedEvent

from PyQt5.QtCore import (
    QObject,
    QThread,
    pyqtSignal,
    pyqtSlot,
)
from PyQt5.QtWidgets import QApplication

from .tools.qt import QtCore


class NewFileHandler(FileSystemEventHandler):
    """Forwards watchdog file-creation events into a thread-safe queue.

    The observer invokes ``on_created`` from its own background thread;
    the queue hands the events over to the Worker's thread.
    """

    def __init__(self, q, *a, **k):
        super().__init__(*a, **k)
        self._q = q  # queue.Queue shared with the Worker

    def on_created(self, event):
        # Called by the observer thread for every newly created file/dir.
        self._q.put(event)


class Worker(QObject):
    """Watches *path* recursively and, for every newly created file, emits
    ``new_file(full_path, basename)`` once the file can be opened with h5py
    (i.e. the writer has closed it and it is no longer locked)."""

    new_file = pyqtSignal(str, str)

    def __init__(self, path):
        super().__init__()
        self._q = queue.Queue()
        # Keep the observer on self so it is not garbage-collected when
        # __init__ returns.
        self._observer = Observer()
        handler = NewFileHandler(self._q)
        self._observer.schedule(handler, path=path, recursive=True)
        # starts a background thread! Thus we need to wait for the
        # queue to receive the events in work.
        self._observer.start()

    def work(self):
        """Blocking event loop (run it in a dedicated QThread).

        For each queued creation event: probe the file by opening it
        read-only; while the writer still holds it (OSError), sleep and
        retry up to ``max_retry_count`` times; on the first successful
        open, emit ``new_file`` exactly once and go back to waiting for
        the next event.
        """
        max_retry_count = 3500         # upper bound on "finished writing?" probes per file
        retry_interval_seconds = .01   # every hundredth it will try the file to see if it finished writing

        while True:
            event = self._q.get()
            # BUGFIX: the retry budget must be per-file, not per-Worker;
            # previously retry_count was never reset between files.
            retry_count = 0
            while True:
                try:
                    # A successful open means the writer has released the
                    # file; h5py raises OSError while it is still locked.
                    file = h5py.File(event.src_path, "r")
                    file.close()
                except OSError:
                    if retry_count < max_retry_count:
                        retry_count += 1
                        print(f"h5 file <{event.src_path}> is locked, retrying {retry_count}/{max_retry_count}")
                        time.sleep(retry_interval_seconds)
                        continue  # probe again
                    print(f"h5 file <{event.src_path}> reached max retry count, skipping")
                    break  # BUGFIX: give up on this file and wait for the next event
                except Exception as err:
                    print(f"Got unexpected Error <{type(err).__name__}> while opening <{event.src_path}> ")
                    traceback.print_exc()
                    break  # BUGFIX: don't spin forever on an unexpected failure
                else:
                    # BUGFIX: emit once on success, then leave only the inner
                    # retry loop — the outer loop keeps consuming new events,
                    # so the same file is no longer re-emitted forever.
                    self.new_file.emit(event.src_path, os.path.basename(event.src_path))
                    break
This code is run in a `main.py` file by the following code:

```python
thread = QThread(parent=self)
print('try to connect to event service ...')
worker = watchdog_search.Worker("/home/test_image_analyzer_files/Test_Data/")
worker.moveToThread(thread)
thread.started.connect(worker.work)
thread.start()
worker.new_file.connect(self.on_finished_run)
```