Hi everybody.
I am trying to build a GUI with Tkinter in Python. In the GUI I process serial data and a video stream, but the video stream is slow.
My source:
I am trying to build a GUI with Tkinter in Python. In the GUI I process serial data and a video stream, but the video stream is slow.
My source:
#!/usr/bin/env python
"""Tkinter video recorder with mask detection and a serial temperature alarm.

The window has two tabs: "Configure" (serial port and mail settings) and
"Camera" (live preview, snapshot and recording controls).  Lines read from the
serial port are parsed as temperatures; a reading above TEMPERATURE_LIMIT
triggers a snapshot that is e-mailed as an attachment.
"""

import argparse
import datetime as dt
import math
import os
import os.path
import queue
import smtplib
import sys
import threading
import time
import tkinter as tk
import tkinter.scrolledtext as tkscrolledtext
import tkinter.ttk as ttk
import webbrowser
import _thread
from datetime import datetime
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from os import path
from tkinter import filedialog

import cv2
import dlib
import imutils
import numpy as np
import PIL.Image
import PIL.ImageTk
import serial
import serial.tools.list_ports
from imutils import face_utils
from imutils.video import VideoStream
from keras.models import load_model
from keras.preprocessing import image

CONFIG_FILE = 'txt.txt'
IMAGE_DIR = 'image'
VIDEO_DIR = 'video'
TEMPERATURE_LIMIT = 37.5
# Frame rate written into the recorded file's header.
RECORD_FPS = 10
# Seconds the serial reader sleeps while the port is closed or after an error.
SERIAL_IDLE_SECONDS = 0.1
VIDEO_CODECS = {
    'avi': 'XVID',
    'mp4': 'MP4V',
}
STD_DIMENSIONS = {
    '480p': (640, 480),
    '720p': (1280, 720),
    '1080p': (1920, 1080),
    '4k': (3840, 2160),
}


def parse_temperature(message):
    """Return the temperature encoded in one serial line, or None.

    `message` is the raw bytes line read from the port (e.g. b'37.8\\r\\n').
    Lines that are not a finite decimal number yield None.
    """
    try:
        value = float(message.decode("utf-8", errors="replace").strip())
    except ValueError:
        return None
    return value if math.isfinite(value) else None


def send_mail(from_addr, password, to_addr, attachment_path):
    """Send the warning mail with `attachment_path` attached.

    Logs in to smtp.gmail.com:587 as `from_addr` with `password`.
    Returns True when the message was handed to the server, False otherwise;
    failures are printed, not raised, because this runs on a worker thread.
    """
    msg = MIMEMultipart()
    msg['From'] = from_addr
    msg['To'] = to_addr
    msg['Subject'] = " TEST MAIL WARNING"
    body = " Cảnh báo nhân viên DTH nhiệt độ cao và dang bi ngáo "
    try:
        msg.attach(MIMEText(body, 'plain'))
        with open(attachment_path, "rb") as attachment:
            part = MIMEBase('application', 'octet-stream')
            part.set_payload(attachment.read())
        encoders.encode_base64(part)
        part.add_header(
            'Content-Disposition',
            "attachment; filename= %s" % os.path.basename(attachment_path))
        msg.attach(part)
        # The context manager closes the connection even when login fails.
        with smtplib.SMTP('smtp.gmail.com', 587, timeout=30) as server:
            server.starttls()
            server.login(from_addr, password)
            server.sendmail(from_addr, to_addr, msg.as_string())
    except (OSError, smtplib.SMTPException) as e:
        print(str(e))
        return False
    return True


class App:
    """Main window: configuration tab, camera tab and the periodic refresh."""

    def __init__(self, window, window_title, video_source=0):
        """Build the UI on `window`, open the camera and run the Tk main loop.

        This call blocks until the window is closed.
        """
        self.window = window
        self.load()
        self.window.title(window_title)
        self.window.geometry('1200x600')
        self.video_source = video_source
        self.vid = VideoCapture(self.video_source)
        self.ok = False  # True while frames are being written to the video file
        self.serialPort = SerialPort()
        self.image = None
        self.photo = None
        self._alerts = queue.Queue()  # serial thread -> GUI thread
        self._mail_thread = None
        self._canvas_image = None
        self._after_id = None

        self.tab_control = ttk.Notebook(window)
        self.tab1 = ttk.Frame(self.tab_control)
        self.tab2 = ttk.Frame(self.tab_control)
        self.tab_control.add(self.tab1, text=' Configure ')
        self.tab_control.add(self.tab2, text=' Camera')
        self.tab_control.pack(expand=1, fill='both')
        self._build_config_tab()
        self._build_camera_tab()

        self.window.protocol("WM_DELETE_WINDOW", self._on_close)
        self.serialPort.RegisterReceiveCallback(self._on_receive_serial_data)
        self.delay = 50
        self.update()
        self.window.mainloop()

    def _labelled_entry(self, caption, y, width, initial, **entry_options):
        """Place a label and a pre-filled entry on the configure tab."""
        label = tk.Label(self.tab1, width=10, text=caption)
        label.place(x=10, y=y)
        variable = tk.StringVar()
        entry = ttk.Entry(self.tab1, textvariable=variable, width=width,
                          **entry_options)
        entry.place(x=100, y=y)
        entry.insert(0, initial)
        return label, variable, entry

    def _build_config_tab(self):
        """Create the serial-port and mail widgets."""
        self.label_comport, self.port_string, self.port = \
            self._labelled_entry("COM Port:", 120, 15, self.line1)
        self.baud = "9600"
        self.button_openclose = tk.Button(
            self.tab1, text="CONNECT", width=10, command=self.OpenCommand)
        self.button_openclose.place(x=240, y=115)
        self.textbox = tkscrolledtext.ScrolledText(
            self.tab1, wrap='word', width=30, height=2)
        self.textbox.place(x=100, y=170)
        # "Mail Server" holds the sender address: it is used both as the
        # From header and as the SMTP login name.
        self.label_mailserver, self.mailserver_var, self.mailserver = \
            self._labelled_entry("Mail Server:", 240, 30, self.line2)
        # Password of the sender account, masked on screen.
        self.label_mk, self.mk_var, self.mk = \
            self._labelled_entry("MK mail: ", 270, 30, self.line3, show='*')
        self.label_mailclient, self.mailclient_var, self.mailclient = \
            self._labelled_entry("Mail Client:", 300, 30, self.line4)

    def _build_camera_tab(self):
        """Create the clock, preview canvas and camera buttons."""
        self.timer = ElapsedTimeClock(self.tab2)
        self.canvas = tk.Canvas(
            self.tab2, width=self.vid.width, height=self.vid.height)
        self.canvas.pack()
        # Button that lets the user take a snapshot
        self.btn_snapshot = tk.Button(
            self.tab2, width=20, text="Snapshot", command=self.snapshot)
        self.btn_snapshot.pack(side=tk.LEFT)
        self.btn_start = tk.Button(
            self.tab2, width=20, text='START', command=self.open_camera)
        self.btn_start.pack(side=tk.LEFT)
        self.btn_stop = tk.Button(
            self.tab2, width=20, text='STOP', command=self.close_camera)
        self.btn_stop.pack(side=tk.LEFT)

    def _on_receive_serial_data(self, message):
        """Serial-thread callback: queue an alert for too-high readings.

        Runs on the reader thread, so it must not touch Tk widgets or the
        camera; the GUI thread picks the alert up in `update`.
        """
        print(message)
        temperature = parse_temperature(message)
        if temperature is not None and temperature > TEMPERATURE_LIMIT:
            self._alerts.put(temperature)

    def _process_alerts(self):
        """Drain queued alerts (GUI thread) and raise at most one alarm."""
        pending = False
        while True:
            try:
                self._alerts.get_nowait()
            except queue.Empty:
                break
            pending = True
        if pending:
            self._raise_alert()

    def _raise_alert(self):
        """Take a snapshot and mail it from a worker thread.

        Only one mail is in flight at a time: readings that arrive while the
        previous mail is still being sent do not start another one.
        """
        if self._mail_thread is not None and self._mail_thread.is_alive():
            return
        print(" Send ")
        self.textbox.insert('1.0', "Sent mail\r\n")
        self.image = self.snapshot()
        print(self.image)
        print(self.mailserver.get())
        if self.image is None:
            return
        # Widget values are read here, on the GUI thread, and passed by value.
        self._mail_thread = threading.Thread(
            target=send_mail,
            args=(self.mailserver.get(), self.mk.get(), self.mailclient.get(),
                  os.path.join(IMAGE_DIR, self.image)),
            daemon=True)
        self._mail_thread.start()

    def _on_close(self):
        """Stop the refresh loop and release camera, recorder and port."""
        if self._after_id is not None:
            self.window.after_cancel(self._after_id)
            self._after_id = None
        self.timer.stop()
        self.serialPort.Close()
        self.vid.release()
        self.window.destroy()

    def update(self):
        """Show the next frame, record it if enabled, and reschedule."""
        self._process_alerts()
        # Get a frame from the video source
        ret, frame = self.vid.get_frame()
        if ret:
            if self.ok:
                self.vid.out.write(cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
            # Keep a reference on self: Tk does not hold one for PhotoImage.
            self.photo = PIL.ImageTk.PhotoImage(
                image=PIL.Image.fromarray(frame))
            if self._canvas_image is None:
                self._canvas_image = self.canvas.create_image(
                    0, 0, image=self.photo, anchor=tk.NW)
            else:
                # Reuse the single canvas item instead of stacking a new
                # one on every refresh.
                self.canvas.itemconfig(self._canvas_image, image=self.photo)
        self._after_id = self.window.after(self.delay, self.update)

    def snapshot(self):
        """Save the current frame as a JPEG in IMAGE_DIR.

        Returns the file name (without directory), or None when no frame was
        available or the file could not be written.
        """
        # Get a frame from the video source
        ret, frame = self.vid.get_frame()
        if not ret:
            return None
        # Compute the name once so the returned name matches the file.
        file_name = "frame-" + time.strftime("%d-%m-%Y-%H-%M-%S") + ".jpg"
        os.makedirs(IMAGE_DIR, exist_ok=True)
        written = cv2.imwrite(os.path.join(IMAGE_DIR, file_name),
                              cv2.cvtColor(frame, cv2.COLOR_RGB2BGR))
        if not written:
            print("Unable to write snapshot: ", file_name)
            return None
        return file_name

    def open_camera(self):
        """Start recording frames to the video file."""
        self.ok = True
        self.timer.start()
        print("camera opened => Recording")

    def close_camera(self):
        """Stop recording frames to the video file."""
        self.ok = False
        self.timer.stop()
        print("camera closed => Not Recording")

    def OpenCommand(self):
        """Toggle the serial connection from the CONNECT/DISCONNECT button."""
        state = self.button_openclose.cget("text")
        if state == 'CONNECT':
            self.save()
            self.comport = self.port.get()
            self.baudrate = self.baud
            self.note = self.serialPort.Open(self.comport, self.baudrate)
            # Only switch the button when the port really opened.
            if self.serialPort.IsOpen():
                self.button_openclose.config(text='DISCONNECT')
                self.textbox.insert('1.0', "CONNECT\r\n")
            self.textbox.insert('1.0', self.note)
        elif state == 'DISCONNECT':
            self.serialPort.Close()
            self.button_openclose.config(text='CONNECT')
            self.textbox.insert('1.0', "DISCONNECT\r\n")

    def c_open_file_old(self):
        """Ask for a recorded file and open it with the system viewer."""
        self.rep = filedialog.askopenfilenames(
            parent=self.window,
            initialdir='C:/Users/MyPC/Documents/Python/test_guide',
            initialfile='',
            filetypes=[
                ("VIDEO", "*.avi"),
                ("PNG", "*.png"),
                ("JPEG", "*.jpg"),
                ("All files", "*")])
        print(self.rep)
        if not self.rep:
            print("No file selected")
            return
        # os.startfile only exists on Windows.
        opener = getattr(os, "startfile", webbrowser.open)
        opener(self.rep[0])

    def load(self):
        """Read the four saved settings; missing file or lines give ''."""
        try:
            with open(CONFIG_FILE, 'r') as config:
                lines = [line.strip() for line in config.readlines()[:4]]
        except FileNotFoundError:
            lines = []
        lines += [''] * (4 - len(lines))
        self.line1, self.line2, self.line3, self.line4 = lines

    def save(self):
        """Write the four settings to CONFIG_FILE, one per line.

        NOTE(review): the mail password is stored in plain text.
        """
        info = '\n'.join([
            self.port_string.get(),
            self.mailserver.get(),
            self.mk.get(),
            self.mailclient.get(),
        ])
        with open(CONFIG_FILE, 'w') as config:
            config.write(info)


class VideoCapture:
    """Camera wrapper that annotates frames and owns the video writer."""

    def __init__(self, video_source=0, detect_every=5, detect_scale=0.5):
        """Open `video_source` and prepare detector, classifier and writer.

        detect_every: run face detection on every N-th frame only and reuse
            the last result in between (1 = every frame).
        detect_scale: factor in (0, 1] by which the frame is shrunk before
            face detection; smaller is faster but misses small faces.

        Raises ValueError when the source cannot be opened or `detect_scale`
        is out of range.
        """
        if not 0 < detect_scale <= 1:
            raise ValueError("detect_scale must be in (0, 1]", detect_scale)
        self.detect_every = max(1, int(detect_every))
        self.detect_scale = float(detect_scale)
        self._frame_count = 0
        self._no_mask_boxes = []

        # Open the video source
        self.vid = cv2.VideoCapture(video_source)
        if not self.vid.isOpened():
            raise ValueError("Unable to open video source", video_source)
        args = CommandLineParser().args
        self.model = load_model('model.h5')
        self.model.compile(loss='binary_crossentropy', optimizer='rmsprop',
                           metrics=['accuracy'])
        self.face_cascade = cv2.CascadeClassifier(
            'haarcascade_frontalface_default.xml')
        self.hogFaceDetector = dlib.get_frontal_face_detector()

        extension = args.type[0]
        self.fourcc = cv2.VideoWriter_fourcc(*VIDEO_CODECS[extension])
        requested = STD_DIMENSIONS[args.res[0]]
        self.vid.set(cv2.CAP_PROP_FRAME_WIDTH, requested[0])
        self.vid.set(cv2.CAP_PROP_FRAME_HEIGHT, requested[1])
        # Cameras may ignore the request; size the writer and canvas from
        # what the device actually delivers.
        self.width = int(self.vid.get(cv2.CAP_PROP_FRAME_WIDTH)) \
            or requested[0]
        self.height = int(self.vid.get(cv2.CAP_PROP_FRAME_HEIGHT)) \
            or requested[1]
        print(args.name, self.fourcc, (self.width, self.height))

        os.makedirs(VIDEO_DIR, exist_ok=True)
        file_name = (args.name[0] + time.strftime("%d-%m-%Y-%H-%M-%S")
                     + '.' + extension)
        self.out = cv2.VideoWriter(
            os.path.join(VIDEO_DIR, file_name), self.fourcc, RECORD_FPS,
            (self.width, self.height))

    def _detect_no_mask_boxes(self, frame):
        """Return (x1, y1, x2, y2) boxes of faces classified as class 0."""
        scale = self.detect_scale
        search = frame if scale == 1.0 else cv2.resize(
            frame, None, fx=scale, fy=scale)
        height, width = frame.shape[:2]
        boxes = []
        crops = []
        for rect in self.hogFaceDetector(search, 0):
            # Map back to full-resolution coordinates and clamp: dlib may
            # report rectangles that extend past the image border.
            x1 = max(0, int(rect.left() / scale))
            y1 = max(0, int(rect.top() / scale))
            x2 = min(width, int(rect.right() / scale))
            y2 = min(height, int(rect.bottom() / scale))
            if x2 <= x1 or y2 <= y1:
                continue
            face = image.img_to_array(frame[y1:y2, x1:x2])
            crops.append(cv2.resize(face, dsize=(150, 150)))
            boxes.append((x1, y1, x2, y2))
        if not crops:
            return []
        # One batched prediction; a score <= 0.5 is class 0, which is what
        # Sequential.predict_classes returned for a single sigmoid output.
        scores = self.model.predict(np.array(crops), batch_size=10)
        return [box for box, score in zip(boxes, scores)
                if float(np.ravel(score)[0]) <= 0.5]

    def get_frame(self):
        """Read, annotate and return the next frame.

        Returns (True, frame) with `frame` in RGB channel order, or
        (False, None) when the capture is closed or no frame was read.
        """
        if not self.vid.isOpened():
            return (False, None)
        ret, frame = self.vid.read()
        if not ret or frame is None:
            return (False, None)

        if self._frame_count % self.detect_every == 0:
            self._no_mask_boxes = self._detect_no_mask_boxes(frame)
        self._frame_count += 1

        for (x1, y1, x2, y2) in self._no_mask_boxes:
            cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)
            cv2.putText(frame, "No Mask", (x1, y1 - 5),
                        cv2.FONT_HERSHEY_DUPLEX, 1, (0, 0, 255), 1)
        cv2.putText(frame, str(datetime.now()), (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2, cv2.LINE_AA)
        return (True, cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

    def release(self):
        """Release the camera and finalise the video file (idempotent)."""
        vid = getattr(self, 'vid', None)
        if vid is not None and vid.isOpened():
            vid.release()
        out = getattr(self, 'out', None)
        if out is not None:
            out.release()

    def __del__(self):
        self.release()
        cv2.destroyAllWindows()


class ElapsedTimeClock:
    """HH:MM:SS stopwatch label that can be paused and resumed."""

    def __init__(self, window):
        """Create the label inside `window`, showing 00:00:00."""
        self.T = tk.Label(window, text='00:00:00', font=('times', 12, 'bold'))
        self.T.pack()
        self.running = 0
        self.lastTime = ''
        self.time2 = '00:00:00'
        self.updwin = None
        self._elapsed = 0.0      # seconds accumulated in finished runs
        self._started_at = None  # monotonic start of the current run

    def _elapsed_seconds(self):
        """Return total running time in seconds, including the current run."""
        if self._started_at is None:
            return self._elapsed
        # Monotonic clock: unaffected by wall-clock adjustments.
        return self._elapsed + (time.monotonic() - self._started_at)

    def tick(self):
        """Refresh the label and schedule the next refresh in 100 ms."""
        minutes, seconds = divmod(int(self._elapsed_seconds()), 60)
        hours, minutes = divmod(minutes, 60)
        self.time2 = '%02d:%02d:%02d' % (hours, minutes, seconds)
        if self.time2 != self.lastTime:
            self.lastTime = self.time2
            self.T.config(text=self.time2)
        self.updwin = self.T.after(100, self.tick)

    def start(self):
        """Start or resume counting; no effect while already running."""
        if not self.running:
            self._started_at = time.monotonic()
            self.running = 1
            self.tick()

    def stop(self):
        """Pause counting, keeping the elapsed time for a later resume."""
        if self.running:
            self.T.after_cancel(self.updwin)
            self._elapsed = self._elapsed_seconds()
            self._started_at = None
            self.running = 0


class CommandLineParser:
    """Parse the recorder's command-line options into `self.args`."""

    def __init__(self):
        # Create object of the Argument Parser
        parser = argparse.ArgumentParser(
            description='Script to record videos')
        # Each option takes exactly one value (nargs=1), so every attribute
        # of the namespace is a one-element list.
        parser.add_argument(
            '--type', nargs=1, default=['avi'], type=str,
            choices=sorted(VIDEO_CODECS),
            help='Type of the video output: for now we have only AVI & MP4')
        parser.add_argument(
            '--res', nargs=1, default=['480p'], type=str,
            choices=list(STD_DIMENSIONS),
            help='Resolution of the video output: for now we have 480p, '
                 '720p, 1080p & 4k')
        parser.add_argument(
            '--name', nargs=1, default=['output'], type=str,
            help='Enter Output video title/name')
        # Parse the arguments and get all the values in the form of namespace.
        # Here args is of namespace and values will be accessed through tag
        # names
        self.args = parser.parse_args()


class SerialPort:
    """Serial port with a background thread that delivers received lines."""

    def __init__(self):
        self.comportName = ""
        self.baud = 0
        self.timeout = None
        self.ReceiveCallback = None
        self.isopen = False
        self.receivedMessage = None
        self.serialport = serial.Serial()
        self._reader = None

    def __del__(self):
        try:
            # is_open is a property in pySerial 3, not a method.
            if self.serialport.is_open:
                self.serialport.close()
        except Exception:
            print("Destructor error closing COM port: ", sys.exc_info()[0])

    def RegisterReceiveCallback(self, aReceiveCallback):
        """Set the line callback and make sure the reader thread runs.

        The callback is invoked on the reader thread with the raw bytes of
        each line.
        """
        self.ReceiveCallback = aReceiveCallback
        if self._reader is not None and self._reader.is_alive():
            return
        try:
            self._reader = threading.Thread(
                target=self.SerialReadlineThread, daemon=True)
            self._reader.start()
        except RuntimeError:
            print("Error starting Read thread: ", sys.exc_info()[0])

    def SerialReadlineThread(self):
        """Reader loop: forward every received line to the callback."""
        while True:
            if not self.isopen:
                # Sleep instead of spinning: a busy loop here competes with
                # the GUI thread for the interpreter.
                time.sleep(SERIAL_IDLE_SECONDS)
                continue
            try:
                self.receivedMessage = self.serialport.readline()
                if self.receivedMessage:
                    self.ReceiveCallback(self.receivedMessage)
            except Exception:
                # Thread boundary: report and keep the reader alive.
                print("Error reading COM port: ", sys.exc_info()[0])
                time.sleep(SERIAL_IDLE_SECONDS)

    def IsOpen(self):
        """Return True while the port is open."""
        return self.isopen

    def Open(self, portname, baudrate):
        """Open `portname` at `baudrate` and return a status message."""
        if self.isopen:
            return "CONNECTED \r\n"
        self.serialport.port = portname
        self.serialport.baudrate = baudrate
        try:
            self.serialport.open()
        except (OSError, ValueError):
            print("Error opening COM port: ", sys.exc_info()[0])
            return "Error CONNECT \r\n"
        self.isopen = True
        return "CONNECTED \r\n"

    def Close(self):
        """Close the port if it is open."""
        if self.isopen:
            try:
                self.serialport.close()
                self.isopen = False
            except Exception:
                print("Close error closing COM port: ", sys.exc_info()[0])

    def Send(self, message):
        """Write `message` terminated by CR LF; return True on success."""
        if not self.isopen:
            return False
        try:
            # Ensure that the end of the message has both \r and \n, not
            # just one or the other
            newmessage = message.strip() + '\r\n'
            self.serialport.write(newmessage.encode('utf-8'))
        except Exception:
            print("Error sending message: ", sys.exc_info()[0])
            return False
        return True


def main():
    """Create the output folders and start the application."""
    os.makedirs(IMAGE_DIR, exist_ok=True)
    os.makedirs(VIDEO_DIR, exist_ok=True)
    # Create a window and pass it to the Application object
    App(tk.Tk(), 'Video Recorder')


if __name__ == '__main__':
    main()