Python Forum
Problem with Python, MySQL and Multi-threading queries
Thread Rating:
  • 2 Vote(s) - 2.5 Average
  • 1
  • 2
  • 3
  • 4
  • 5
Problem with Python, MySQL and Multi-threading queries
#1
Hello guys. I hope you can help me with the following issue:
I am working with python, MySQL and multithreading, and really newbie in all of this; however, I am trying to do multiple sql queries at 3 threadings I have. That threads consist in general loop that is reading a specific table from database.

I am using just one database. I use mysql.connector provided by MySQL for Windows. Here is my code:

# -*- coding: utf-8 -*-

import threading
import mysql.connector
import win32con
import sys, os
import struct
import time
from win32api import *
from win32gui import *
from mysql.connector import Error
from threading import Thread

cnx = mysql.connector.connect(host="localhost",
                             user="root",
                             password="1234",
                             database="gammu")
cnx2 = mysql.connector.connect(host="localhost",
                             user="root",
                             password="1234",
                             database="gammu")
cnx3 = mysql.connector.connect(host="localhost",
                             user="root",
                             password="1234",
                             database="gammu")

cursor = cnx.cursor()
cursor2 = cnx.cursor()
cursor3 = cnx.cursor()

global errKey
global errHash

class WindowsBalloonTip:
   def __init__(self, title, msg):
       message_map= {
           win32con.WM_DESTROY: self.OnDestroy,
       }

       wc = WNDCLASS()
       hinst = wc.hInstance = GetModuleHandle(None)
       wc.lpszClassName = "NotificationTaskR"
       wc.lpfnWndProc = message_map
       classAtom = RegisterClass(wc)

       style = win32con.WS_OVERLAPPED | win32con.WS_SYSMENU
       self.hwnd = CreateWindow(classAtom, "Taskbar", style, 0, 0, \
                                wind32con.CW_USEDEFAULT, win32con.CW_USEDEFAULT, \
                                0, 0, hinst, None)
       UpdateWindw(self.hwnd)
       iconPathName = os.path.abspath(os.path.join(sys.path[0], 'balloontip.ico'))
       icon_flags = win32con.LR_LOADFROMFILE | win32con.LR_DEFAULTSIZE
       try:
           hicon = LoadImage(hinst, iconPathName, win32con.IMAGE_ICON, 0,0, icon_flags)
       except:
           hicon = LoadIcon(0, win32con.IDI_APPLICATION)
       flags = NIF_ICON | NIF_MESSAGE | NIF_TIP
       nid = (self.hwnd, 0, flags, win32con.WM_USER+20, hicon, "New message received")
       Shell_NotifyIcon(NIM_ADD, nid)
       Shell_NotifyIcon(NIM_MODIFY, (self.hwnd, 0, NIF_INFO, win32con.WIM_USER+20, \
                                     hicon, "Balloon tooltip", msg, 200, title))

       time.sleep(3)
       DestroyWindow(self.hwnd)
       classAtom = UnregisterClass(classAtom, hinst)
   def OnDestroy(self, hwnd, msg, wparam, lparam):
       nid = (self.hwnd, 0)
       Shell_NotifyIcon(NIM_DELETE, nid)
       PostQuitMessage(0)

def balloon_tip(title, msg):
   w = WindowsBalloonTip(title, msg)

def newWh():
   while True:
       cnx.connect()
       lastRquery = ("select UpdatedInDB, AES_DECRYPT(TextDecoded, (select AES_DECRYPT(cryptkey, '373630303a3a3') from decryptkey)), "
                     "AES_DECRYPT(SenderNumber, (select AES_DECRYPT(cryptkey, '373630303a3a3') from decryptkey)), ID from inboxencrypt where "
                     "chck=0 order by UpdatedInDB asc limit 1")
       cursor.execute(lastRquery)
       regL = cursor.fetchall()
       if not regL:
           cnx.close()
           import time
           time.sleep(3)
           continue
       else:
           for row in regL:
               time = row[0]
               timeC = str(time)
               timeO = timeC[10:]
               message = row[1]
               telephone = row[2]
               toUR = row[3]
               lengM = len(message)
           smsDetect = message[27:]
           procMR = message.split()
           if procMR[0] == "\send":
               #Detect first command
               cnx.close()
               cnx.connect()
               queryAuth = ("SELECT privkey FROM transferauth WHERE "
                            "privkey=sha(%s)", procMR[1])
               cursor.execute(queryAuth)
               authStorage = cursor.fetchall()
               for row in authStorage:
                   key = authStorage[0]
               if procMR[1] == key:
                   #Detect authentication key
                   if procMR[2] == "\a":
                       #Detect command to send multiple messages to all contact list
                       print("Ok")
                   elif len(procMR[2]) == 13:
                       #Send message to specific contact
                       if telpComm[0] == "+" and telpComm [1:3] == "58":
                           #Verifying international code
                           telpComm == procMR[2]
                           if len(smsDetect) > 3:
                               #Verifying minimum message length
                               if smsDetect[-1] == '"' and smsDetect[0] == '"':
                                   #Verifying message format and sending message
                                   msgToSend = smsDetetct[1:-1]
                                   cnx.close()
                                   cnx.connect()
                                   sendQuerExt = ("insert into outbox(DestinationNumber, "
                                                  "TextDecoded, CreatorID) values "
                                                  "('%s','%s','PwC')" % (telpComm, msgToSend))
                                   cursor.execute(sendQuerExt)
                                   cnx.commit()
                                   cnx.close()
                                   #Reconnecting to database and ipdating message status
                                   cnx.connect()
                                   updateRquery = ("update inbox set chck=1 where "
                                                   "ID=%s" % toUR)
                                   cursor.execute(updateRquery)
                                   cnx.commit()
                                   cnx.close()
                                   #Wait for re-establish loop
                                   import time
                                   time.sleep(3)
                                   print("~~~Send message is required for: "+telephone)
                                   print("~~~Send message in required for: "+telpComm+"\n\n")
                                   continue
                               else:
                                   #Invalid message format => ERR_FORMATSMS_INVALID
                                   cnx.connect()
                                   srcErrorSource = ("select errordesc, hexcode from transfererrors "
                                                     "where cause='ERR_FORMATSMS_INVALID'")
                                   cursor.execute(srcErrorSource)
                                   errStorageA = cursor.fetchall()
                                   for row in errStorageA:
                                       errKey = row[0]
                                       errHash = row[1]
                                       cnx.close()
                                   cnx.connect()
                                   recQueryErrorNumb = ("insert into outbox(Text, DestinationNumber, "
                                                        "TextDecoded, CreatorID) values "
                                                        "('%s','%s','%s','PwC')" % (errHash, telephone, errKey))
                                   cursor.execute(recQueryErrorNumb)
                                   cnx.commit()
                                   cnx.close()
                                   #Reconnecting to database and updating message status
                                   cnx.connect()
                                   updateRquery = ("update inbox set chck=1 where "
                                                   "ID=%s" % toUR)
                                   cursor.execute(updateRquery)
                                   cnx.commit()
                                   cnx.close()
                                   #Wait for re-establish loop
                                   import time
                                   time.sleep(3)
                                   continue
                           else:
                               #Minimum message length required => ERR_LENGTHSMS_INVALID
                               srcErrorSource = ("select errordesc, hexcode from transferrors "
                                                     "where cause='ERR_LENGTH_INVALID'")
                               cursor.execute(srcErrorSource)
                               errStorageA = cursor.fetchall()
                               for row in errStorageA:
                                   errKey = row[0]
                                   errHash = row[1]
                                   cnx.close()
                               cnx.connect()
                               recQueryErrorNumb = ("insert into outbox(Text, DestinationNumber, "
                                                        "TextDecoded, CreatorID) values "
                                                        "('%s','%s','%s','PwC')" % (errHash, telephone, errKey))
                               cursor.execute(recQueryErrorNumb)
                               cnx.commit()
                               cnx.close()
                               #Reconnecting to database and updating message status
                               cnx.connect()
                               updateRquery = ("update inbox set chck=1 where "
                                                   "ID=%s" % toUR)
                               cursor.execute(updateRquery)
                               cnx.commit()
                               cnx.close()
                               #Wait for re-establish loop
                               import time
                               time.sleep(3)
                               continue
                       else:
                           #Wrong international telephone format => ERR_FORMATNUMBER_INVALID
                           srcErrorSource = ("select errordesc, hexcode from transfererrors "
                                             "where cause='ERR_FORMATNUMBER_INVALID'")
                           cursor.execute(srcErrorSource)
                           errStorageA = cursor.fetchall()
                           for row in errStorageA:
                               errKey = row[0]
                               errHash = row[1]
                           cnx.connect()
                           recQueryErrorNumb = ("insert into outbox(Text, DestinationNumber, "
                                                "TextDecoded, CreatorID) values "
                                                "('%s','%s','%s','PwC')" % (errHash, telephone, errKey))
                           cursor.execute(recQueryErrorNumb)
                           cnx.commit()
                           cnx.close()
                           #Reconnecting to database and updating message status
                           cnx.connect()
                           updateRquery = ("update inbox set chck=1 where "
                                           "ID=%s" % toUR)
                           cursor.execute(updateRquery)
                           cnx.commit()
                           cnx.close()
                           #Wait for re-establish loop
                           import time
                           time.sleep(3)
                           continue
                   else:
                       #Unknown message or command => ERR_TLPNUMBER_INVALID
                       srcErrorSource = ("select errordesc, hexcode from transfererrors "
                                                     "where cause='ERR_TLPNUMBER_INVALID'")
                       cursor.execute(srcErrorSource)
                       errStorageA = cursor.fetchall()
                       for row in errStorageA:
                           errKey = row[0]
                           errHash = row[1]
                           cnx.close()
                       cnx.connect()
                       recQueryErrorNumb = ("insert into outbox(Text, DestinationNumber, "
                                                        "TextDecoded, CreatorID) values "
                                                        "('%s','%s','%s','PwC')" % (errHash, telephone, errKey))
                       cursor.execute(recQueryErrorNumb)
                       cnx.commit()
                       cnx.close()
                       #Reconnecting to database and updating message status
                       cnx.connect()
                       updateRquery = ("update inbox set chck=1 where "
                                                   "ID=%s" % toUR)
                       cursor.execute(updateRquery)
                       cnx.commit()
                       cnx.close()
                       #Wait for re-establish loop
                       import time
                       time.sleep()
                       continue
               else:
                   #Authetication error, wron key => ERR_AUTHKEY_INVALID
                   srcErrorSource = ("select errordesc, hexcode from transfererrors "
                                                     "where cause='ERR_AUTHKEY_INVALID'")
                   cursor.execute(srcErrorSource)
                   errStorageA = cursor.fetchall()
                   for row in errStorageA:
                       errKey = row[0]
                       errHash = row[1]
                       cnx.close()
                   cnx.connect()
                   recQueryErrorNumb = ("insert into outbox(Text, DestinationNumber, "
                                                        "TextDecoded, CreatorID) values "
                                                        "('%s','%s','%s','PwC')" % (errHash, telephone, errKey))
                   cursor.execute(recQueryErrorNumb)
                   cnx.commit()
                   cnx.close()
                   #Reconnecting to database and updating message status
                   cnx.connect()
                   recQueryErrorNumb = ("update inbox set chck=1 where "
                                                        "ID=%s" % toUR)
                   cursor.execute(updateRquery)
                   cnx.commit()
                   cnx.close()
                   #Wait for re-establish loop
                   import time
                   time.sleep()
                   continue
           else:
               print("|| "+telephone+"\t\t"+timeO)
               print("|| "+message+"\n\n")
               balloon_tip("Mensaje recibido",message+"\nDe: "+telephone+"\t"+timeO)
               file = open("smscenter.log","a")
               file.write("Message was received at: "+timeO+"\n")
               file.write("----------------------------------------\n")
               file.write("|| "+telephone+"\n")
               file.write("|| "+message.encode("utf8")+"\n")
               file.write("-----Length of sms: "+str(lengM)+"\n")
               file.write("-----Recognize: \n\n\n")
               file.close()
               cnx.close()
               cnx.connect()
               updateRquery = ("update inbox set chck=1 where ID=%s" % toUR)
               cursor.execute(updateRquery)
               cnx.commit()
               cnx.close()
               import time
               time.sleep(3)
               continue            

def readWh():
   while True:
       cnx2.connect()
       lastRquery2 = ("select UpdatedInDB, TextDecoded, SenderNumber, ID from inbox where "
                      "chck=0 order by UpdatedInDB asc limit 1")
       cursor2.execute(lastRquery2)
       regL2 = cursor2.fetchall()
       if not regL2:
           cnx2.close()
           cnx2.connect()
           queryTrunc = ("delete from inbox where chck=1")
           cursor2.execute(queryTrunc)
           cnx2.commit()
           cnx2.close()
           import time
           time.sleep(3)
           continue
       else:
           for row in regL2:
               timeReceived = row[0]
               timeReceivedC = str(timeReceived)
               timeReceivedO = timeReceivedC[10:]
               messageRec = row[1]
               telephoneRec = row[2]
               idRec = row[3]
               cnx2.close()
           print("Hola menor")
           cnx2.connect()
           queryA = ("insert into inboxencrypt(id, UpdatedInDB, TextDecoded, SenderNumber) "
                     "values (%s, %s, AES_ENCRYPT('%s', (select AES_DECRYPT(cryptkey, '373630303a3a3') from decrypkey)), "
                     "AES_ENCRYPT('%s', (select AES_DECRYPT(cryptkey, '373630303a3a3') from decrypkey)))")
           cursor2.execute(queryA)
           cnx2.commit()
           cnx2.close()
           #Drop message
           cnx2.connect()
           queryB = ("update inbox set chck=1 where "
                     "ID=%s" % idRec)
           cursor2.execute(queryB)
           cnx2.commit()
           cnx2.close()
           cnx2.connect()
           queryC = ("delete from inbox where chck=1")
           cursor2.execute(queryC)
           cnx2.commit()
           cnx2.close()
           continue

def sentWh():
   while True:
       cnx3.connect()
       lastRquery3 = ("select UpdatedInDB, InsertIntoDB, SendingDateTime, DestinationNumber, "
                      "TextDecoded, ID, Status, CreatorID from sentitems where chck=0 order by UpdatedInDB asc limit 1")
       cursor3.execute(lastRquery3)
       regL3= cursor3.fetchall()
       if not regLr:
           cnx3.close()
           cnx3.connect()
           queryTrunc1 = ("delete from sentitems where chck=1")
           cursor3.execute(queryTrunc1)
           cnx3.commit()
           cnx3.close()
           import time
           time.sleep(3)
           print("Hola menor")
           continue
       else:
           for row in regL3:
               timeUpdated = row[0]
               timeInsert = row[1]
               timeSent = row[2]
               telephoneSent = row[3]
               messageSent = row[4]
               idSent = row[5]
               status = row[6]
               creator = row[7]
               cnx3.close()
           cnx3.connect()
           print("Hola menor")
           queryD = ("insert into sentencrypt(id, UpdatedInDB, InsertIntoDB, SendingDateTime, "
                     "DestinationNumber, TextDecoded, Status, CreatorID) values "
                     "(%s, '%s', '%s', '%s', "
                     "AES_ENCRYPT('%s', (select AES_DECRYPT(cryptkey, '373630303a3a3') from decrypkey)), "
                     "AES_ENCRYPT('%s', (select AES_DECRYPT(cryptkey, '373630303a3a3') from decrypkey)), "
                     "'%s', AES_ENCRYPT('%s', (select AES_DECRYPT(cryptkey, '373630303a3a3') from decrypkey)))"
                     % (idSent, timeUpdated, timeInsert, timeSent, telephoneSent, messageSent, status, creator))
           cursor3.execute(queryD)
           cnx3.commit()
           cnx3.close()
           #Drop message
           cnx3.connect()
           queryZ = ("update sentitems set chck=1 where "
                     "ID=%s" % idSent)
           cursor3.execute(queryZ)
           cnx3.commit()
           cnx3.close()
           cnx3.connect()
           queryE = ("delete from sentitems where chck=1")
           cursor.execute(queryE)
           cnx3.commit()
           cnx3.close()
           import time
           time.sleep(3)
           continue


if __name__ == '__main__':
   print("Be sure you have Windows pop-ups notifications enabled.\n\n")
   print("SMS Center will show pop-ups notifications when message is being received."
         "\nThis window need to be open at anytime.")
   Thread(target = newWh).start()
   Thread(target = readWh).start()
   Thread(target = sentWh).start()
As you can see, I have 3 functions that have a loop (newWh, readWh and sentWh = these are the three functions, their loops are infinite). But, when I execute it, I obtain the following error:

                (I can't post the image url)


I don't know if there are problem with MySQL, I was searching about initialize more MySQL instances but I don't find anything that show how to do this in Windows.

So, I don't know, I believe too that is a problem with MySQL ports. I don't know what is the problem, for this reason, I hope that you can help me guys.

If you see multiple source code errors, notify me, I am a new programmer on python and newbie (x1000) working with multi-threading, so, if there are ways the optimize that source code with inheritance or oop programming, notify me, I want to learn python at pro level. Thank you for help.

There is the issue image:

             https://imgur.com/a/OPhpF
Reply
#2
Oh, sorry again. This is my reply:


This is the first error:
Error:
... packet_number, compressed_packet_number) File "C:\Python27\lib\site-packages\mysql\connector\network.py", line 143, in send_plain errno=2055, values=(self.get_address(), _strioerror(err))) OperationalError: 2055: Lost connection to MySQL server at 'localhost:3306', system error: 9 Bad file descriptor Exception in thread Thread=3: Traceback (most recen call last): File "C:\Python27\lib\threading.py", line 801, in __bootstrap_inner self.run() File "C:\Python27\lib\threading.py", line 754, in run self.__target(*self.__args, **self.__kwargs) File "C:\Users\PwC User\Desktop\answerRecognizer.py", line 362, in sentWh cursor3.execute(lastRquery3) File "C:\Python27\lib\site-packages\mysql\connector\cursor.py", line 559, in execute self._handle_result(self._connection.cmd_query(stmt)) File "C:\Python27\lib\site-packages\mysql\connector\connection.py", line 494, in cmd_query result = self._handle_result(self._send_cmd(ServerCmd.QUERY, query)) File "C:\Python27\lib\site-packages\mysql\connector\connection.py", line 262, in _send_cmd packet_number, compressed_packet_number) File "C:\Python27\lib\site-packages\mysql\connector\network.py", line 143, in send_plain errno=2055, values=(self.get_address(), _strioerror(err))) OperationalError: 2055: Lost connection to MySQL server at 'localhost:3306', system error: 9 Bad file descriptor
And, sometimes I give this error:
Error:
Exception in thread Thread=1: Traceback (most recent call last): File "C:\Python27\lib\threading.py", line 801, in __bootstrap_inner self.run() File "C:\Python27\lib\threading.py", line 754, in run self.__target(*self.__args, **self.__kwargs) File "C:\Users\PwC User\Desktop\answerRecognizer.py", line 80, in newWh cursor.execute(lastRquery) File "C:\Python27\lib\site-packages\mysql\connector\cursor.py", line 559, in execute self._handle_result(self._connection.cmd_query(stmt)) File "C:\Python27\lib\site-packages\mysql\connector\connection.py", line 494, in cmd_query result = self._handle_result(self._send_cmd(ServerCmd.QUERT, query)) File "C:\Python27\lib\site-packages\mysql\connector\connection.py", line 406, in _handle_result self._socket.recv(), self.python_charset) File "C:\Python27\lib\site-packages\mysql\connector\protocol.py", line 247, in parse_column raise errors.InterfaceError("Failed parsing column information") InterfaceError: Failed parsing column information
Reply


Possibly Related Threads…
Thread Author Replies Views Last Post
  Mysql and mysql.connector error lostintime 2 668 Oct-03-2023, 10:25 PM
Last Post: lostintime
  Too many queries? lorasf 6 980 Jul-04-2023, 04:27 AM
Last Post: lorasf
  Concurrent futures threading running at same speed as non-threading billykid999 13 1,814 May-03-2023, 08:22 AM
Last Post: billykid999
  Mysql error message: Lost connection to MySQL server during query tomtom 6 16,001 Feb-09-2022, 09:55 AM
Last Post: ibreeden
Question Debian 11 Bullseye | Python 3.9.x | pip install mysql-connector-python-rf problems BrandonKastning 4 6,674 Feb-05-2022, 08:25 PM
Last Post: BrandonKastning
  Tutorials on sockets, threading and multi-threading? muzikman 2 2,120 Oct-01-2021, 08:32 PM
Last Post: muzikman
  Problem Using SQL Placeholder In MySQL Query AdeS 11 6,096 Jul-31-2021, 12:19 AM
Last Post: Pedroski55
  Can I open\use threading in Python? korenron 2 1,793 Jun-30-2021, 10:42 AM
Last Post: korenron
  Python and MySql ogautier 8 3,340 May-20-2021, 11:10 PM
Last Post: Pedroski55
  Need help multi-threading scraping spacedog 2 2,479 Apr-28-2021, 03:48 PM
Last Post: spacedog

Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020