Python Forum
python3.9 auto_ossec issue
#1
Hi all,

This is a last resort for me in trying to resolve an issue. I've been using ossec and auto_ossec on CentOS 8 for ages, with Python 3.6.8. CentOS 8 is EOL, so I built a new CentOS 9 server.
On the server, I'm running the following (manually, so I can see the console output):

/bin/python3 /bin/auto_server.py

I will paste the auto_server.py contents at the bottom of this post and highlight the lines from the error.


When I run the client, I get the following on the server output:

Error:
[*] The auto enrollment OSSEC Server is now listening on 9654
Client connected with ('*.*.*.*', 64241)
new() missing 1 required positional argument: 'mode'
Traceback (most recent call last):
  File "/bin/auto_server.py", line 173, in handle
    data = aescall(secret, data, "decrypt")
  File "/bin/auto_server.py", line 153, in aescall
    cipher = AES.new(secret)
TypeError: new() missing 1 required positional argument: 'mode'
Pairing complete. Terminating connection to client.
As far as I have read (and I have read a LOT about this), the old server used pycrypto, and I now need to use pycryptodome. I'm sure this is where the issue lies.

#!/usr/bin/python
#
#                                    Auto-OSSEC Server
#
#  This is the server piece to the client/server pair (ossec_client.py). Auto-OSSEC will create a protocol
#  and allow automatic deployment of OSSEC keys through an enterprise. One of the biggest challenges with
#  OSSEC is the key management pieces which auto-ossec tries to solve. When run, this will listen for comms
#  with auto_ossec.py which is the OSSEC client and pass a key request to the client through an AES
#  encrypted tunnel. Once the exchange completes, auto_ossec will integrate the key and rewrite the conf
#  file for you to incorporate the server IP address. View the README.md for usage and how to effectively
#  use auto-ossec. This also works with AlienVault pairing.
#
#  Written by: Dave Kennedy and the Binary Defense Systems (BDS) Team
#  Twitter: @HackingDave, @Binary_Defense
#  Website: https://www.binarydefense.com
#
#  Recommended: Place this python file under supervisor to ensure health, stability, and service starts.
#
#  Usage: python auto_server.py  - This will spawn a port listening and wait for connections from auto_ossec.py
#
#  Will listen on port 9654 for an incoming challenge
#
#  Python Crypto and Python Pexpect is required - apt-get install python-crypto python-pexpect
#

# needed for python2/3 compatibility
try: import SocketServer as socketserver
except ImportError: import socketserver
from threading import Thread
import subprocess
import sys
import traceback
import base64
import time
import socket
import os

# python2/3 compatibility
try: import _thread as thread
except ImportError: import thread

# check python crypto library
try:
    from Crypto.Cipher import AES

except ImportError:
    print("[!] ERROR: pycryptodome not installed. Run 'python3 -m install pycrypto' to fix.")
    sys.exit()

# check pexpect library
try:
    import pexpect

except ImportError:
    print("[!] ERROR: pexpect not installed. Run 'python3 -m install pexpect' to fix.")
    sys.exit()

# global lock to restart ossec service
global counter
counter = 0

# global lock for queue
global queue_lock
queue_lock = 0

# main service handler for auto_server
class service(socketserver.BaseRequestHandler):
    def handle(self):
        # parse OSSEC hids client certificate
        def parse_client(hostname, ipaddr):
            child = pexpect.spawn("/var/ossec/bin/manage_agents")
            child.timeout=300
            child.expect("Choose your action")
            child.sendline("a")
            child.expect("for the new agent")
            child.sendline(hostname)
            i = child.expect(['IP Address of the new agent', 'already present'])
            # if we haven't already added the hostname
            if i == 0:
                child.sendline(ipaddr)
                child.expect("for the new agent")
                child.sendline("")
                for line in child:
                    try: line = str(line, 'UTF-8')
                    except TypeError: line = str(line) # python2 compatibility
                    # pull id
                    if "[" in line:
                        id = line.replace("[", "").replace("]", "").replace(":", "").rstrip()
                        break

                child.expect("Confirm adding it?")
                child.sendline("y")
                child.sendline("q")
                child.close()
                child = pexpect.spawn("/var/ossec/bin/manage_agents -e %s" % (id))
                child.timeout=300
                for line in child: key = line.rstrip() # actual key export
                # when no agents are there and one is removed - the agent wont be added properly right away - need to go through the addition again - appears to be an ossec manage bug - going through everything again appears to solve this
                time.sleep(0.5)
                if "Invalid ID" in str(key): return 0
                return key

            # if we have a duplicate hostname
            else:
                child.close()
                child = pexpect.spawn("/var/ossec/bin/manage_agents -l")
                child.timeout=300
                for line in child:
                    try: line = str(line, 'UTF-8').rstrip()
                    except TypeError: line = str(line).rstrip() # python 2 and 3 compatibility
                    if hostname in line:
                        remove_id = line.split(",")[0].replace("ID: ", "").replace("   ", "").rstrip()
                        break
                child.close()
                time.sleep(0.5)
                child = pexpect.spawn("/var/ossec/bin/manage_agents -r %s" % (remove_id))
                child.timeout=300
                child.expect("manage_agents: Exiting.")
                time.sleep(2)
                child.close()
                time.sleep(1)
                return 0

        def decryptaes(cipher, data, padding):
            result = str(cipher.decrypt(base64.b64decode(data)), 'UTF-8').rstrip(padding)
            return result

        def decryptaes_py2(cipher, data, padding):
            result = cipher.decrypt(base64.b64decode(data)).rstrip(padding)
            return result

        def encryptaes(cipher, data, padding, blocksize):
            # one-liner to sufficiently pad the text to be encrypted
            pad = lambda s: s + (blocksize - len(s) % blocksize) * padding
            try: data1 = str(data, 'UTF-8') #print('d1', data1)
            except TypeError: data1 = str(data)
            data2 = pad(data1) #; print('d2', data2)
            data3 = cipher.encrypt(data2) #; print('d3', data3, type(data3))
            result = base64.b64encode(data3)
            return result

        # main AES encrypt and decrypt function with 32 block size padding
        def aescall(secret, data, format):

            # padding and block size
            PADDING = '{'
            BLOCK_SIZE = 32

            # random value here to randomize builds
            a = 50 * 5

            # generate the cipher
            cipher = AES.new(secret)

            if format == "encrypt":
                aes = encryptaes(cipher, data, PADDING, BLOCK_SIZE)
                return aes

            if format == "decrypt":
                try: aes = decryptaes(cipher, data, PADDING)
                except TypeError: aes = decryptaes_py2(cipher, data, PADDING)
                return str(aes)

        # recommend changing this - if you do, change auto_ossec.py as well - -
        # would recommend this is the default published to git
        secret = "(3j+-sa!333hNA2u3h@*!~h~2&^lk<!B"

        print("Client connected with ", self.client_address)
        try:
            data = self.request.recv(1024)
            if data != "":
                try:
                    data = aescall(secret, data, "decrypt")
                    # if this section clears -we know that it is a legit
                    # request, has been decrypted and we're ready to rock
                    if "BDSOSSEC" in data:
                        # if we are using star IP addresses
                        if "BDSOSSEC*" in data: star = 1
                        else: star = 0

                        # process to restart OSSEC if needed every 10 minutes -
                        # if lock variable is 1 is present then it will trigger a
                        # restart of OSSEC server
                        global counter
                        counter = 1

                        # strip identifier
                        data = data.replace("BDSOSSEC*", "").replace("BDSOSSEC", "")
                        hostname = data

                        # pull the true IP, not the NATed one if they are using VMWare
                        if star == 0: ipaddr = self.client_address[0]
                        else: ipaddr = "0.0.0.0/0"

                        # this will provision the key
                        def provision_key(hostname, ipaddr):
                            ossec_key = parse_client(hostname, ipaddr)
                            if ossec_key == 0:
                                ossec_key = parse_client(hostname, ipaddr)
                                # run through again for trouble ones - ossec bug looks like - but this is a decent workaround
                                if ossec_key == 0:
                                    ossec_key = parse_client(hostname, ipaddr)

                            print("[*] Provisioned new key for hostname: %s with IP of: %s" % (hostname, ipaddr))
                            try: ossec_key = ossec_key.decode('UTF-8')
                            except: ossec_key = str(ossec_key) # python2 compatibility
                            ossec_key_crypt = aescall(secret, ossec_key, "encrypt")
                            try: ossec_key_crypt = str(ossec_key_crypt, 'UTF-8')
                            except TypeError: ossec_key_crypt = str(ossec_key_crypt)
                            print("[*] Sending new key to %s: " % (hostname) + str(ossec_key))
                            # if client disconnected dont crash everything
                            try: self.request.send(ossec_key_crypt.encode('UTF-8'))
                            except: pass
                            time.sleep(1)

                        # here if the hostname was already used, we need to
                        # remove it and call it again
                        global queue_lock

                        # if our queue lock is 0
                        if queue_lock == 0:
                            # locking up the queue to process
                            queue_lock = 1
                            provision_key(hostname, ipaddr)
                            time.sleep(0.2)
                            queue_lock = 0

                        # if we aren't ready to provision wait
                        else:
                            # we need to wait until its finished
                            while 1:
                                # sleep 5 seconds and wait for lock
                                time.sleep(5)
                                if queue_lock == 0:
                                    print("Queue is now free, proceeding with current agent additions..")
                                    queue_lock = 1
                                    provision_key(hostname, ipaddr)
                                    time.sleep(0.2)
                                    queue_lock = 0
                                    break

                except Exception as e:
                    print(e)
                    traceback.print_exc(file=sys.stdout)
                    pass

        except Exception as e:
            print(e)
            pass

        print("Pairing complete. Terminating connection to client.")
        self.request.close()

# this waits 5 minutes to check if new ossec agents have been deployed, if
# so it restarts the server
def ossec_monitor():
    while 1:
        time.sleep(300)
        global counter
        if counter == 1:
            # if we dont have any new agents being added at the time
            if queue_lock == 0:
                print("[*] New OSSEC agent added - triggering restart of service to add..")
                subprocess.Popen("service ossec restart", stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True).wait()
                counter = 0

class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): pass

print("[*] The auto enrollment OSSEC Server is now listening on 9654")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# set it so that when we cancel out we can reuse port
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# bind to all interfaces on port 9654
ThreadedTCPServer.allow_reuse_address = True
t = ThreadedTCPServer(('', 9654), service)

# start the server and listen forever
try:
    # start a threaded counter
    thread.start_new_thread(ossec_monitor, ())
    t.serve_forever()

except KeyboardInterrupt: print("[*] Exiting the automatic enrollment OSSEC daemon")
snippsat write Nov-15-2024, 05:48 PM:
Added code tag
#2
The error you're encountering probably stems from a change in how the pycryptodome library handles the AES.new() method compared to the old pycrypto library.
In pycrypto, the mode parameter defaults to ECB if not specified, but pycryptodome requires you to specify the mode explicitly.
The code has other things that could be better (it is hard to read and test as it stands), but I'm not going to get into that now.

Locate line 153 in your auto_server.py script (and check its indentation), which reads:
cipher = AES.new(secret)
Change it to this:
cipher = AES.new(secret, AES.MODE_ECB)
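Imports: leave the import as it is. MODE_ECB is an attribute of the AES module (AES.MODE_ECB), not a name exported by Crypto.Cipher, so adding it to the import line would raise an ImportError:
from Crypto.Cipher import AES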
check indentation:
def aescall(secret, data, format):
    # Your code here
Updated aescall function example:
def aescall(secret, data, format):
    # padding and block size
    PADDING = '{'
    BLOCK_SIZE = 32
    # generate the cipher with ECB mode
    cipher = AES.new(secret, AES.MODE_ECB)
    if format == "encrypt":
        aes = encryptaes(cipher, data, PADDING, BLOCK_SIZE)
        return aes
    if format == "decrypt":
        try:
            aes = decryptaes(cipher, data, PADDING)
        except TypeError:
            aes = decryptaes_py2(cipher, data, PADDING)
        return str(aes)
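One more thing to watch after the switch: as far as I know, pycryptodome only accepts bytes for the key and for the data passed to encrypt(), so the str secret and str padding used in the script will probably raise a TypeError next. Below is a minimal sketch of a bytes-based variant. It keeps the '{' padding and the 32-byte block size so it should stay compatible with the existing client, but the names pad, unpad and aescall_bytes are my own and not part of auto_ossec.

```python
import base64

try:
    from Crypto.Cipher import AES  # shipped by the pycryptodome package
except ImportError:
    AES = None  # the padding helpers below still work without it

PADDING = b"{"
BLOCK_SIZE = 32


def pad(data: bytes) -> bytes:
    """Right-pad with '{' up to the next multiple of BLOCK_SIZE."""
    return data + (BLOCK_SIZE - len(data) % BLOCK_SIZE) * PADDING


def unpad(data: bytes) -> bytes:
    """Strip the trailing '{' padding added by pad()."""
    return data.rstrip(PADDING)


def aescall_bytes(secret: str, data, action: str) -> str:
    """Encrypt or decrypt `data` with AES in ECB mode and return text."""
    if AES is None:
        raise RuntimeError("pycryptodome is required")
    # pycryptodome wants the key as bytes, not str
    cipher = AES.new(secret.encode("UTF-8"), AES.MODE_ECB)
    if isinstance(data, str):
        data = data.encode("UTF-8")
    if action == "encrypt":
        return base64.b64encode(cipher.encrypt(pad(data))).decode("ascii")
    if action == "decrypt":
        return unpad(cipher.decrypt(base64.b64decode(data))).decode("UTF-8")
    raise ValueError("action must be 'encrypt' or 'decrypt'")
```

Like the original, unpad() would also strip a payload that legitimately ends in '{', so treat this as a compatibility sketch rather than a recommended padding scheme.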
#3
Hi there,

Thanks so much for your quick response. I added your changes and I'm getting the following...

# /bin/python3 /bin/auto_server.py
[!] ERROR: pycryptodome not installed. Run 'python3 -m install pycrypto' to fix.

# pip3 install pycryptodome
Requirement already satisfied: pycryptodome in /usr/local/lib64/python3.9/site-packages (3.21.0)
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv

cheers.
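
That message comes from the script's own `except ImportError` branch, which fires for any failed import on that line, not only for a missing package. Two causes seem plausible: an import of a name that Crypto.Cipher does not export (MODE_ECB lives on the AES module, as AES.MODE_ECB), or pip having installed the package for a different interpreter than /bin/python3. Here is a small sketch for telling them apart; the helper name try_import is hypothetical.

```python
import importlib
import sys


def try_import(module_name, attr=None):
    """Return (ok, detail) for importing module_name, optionally one attribute."""
    try:
        module = importlib.import_module(module_name)
    except ImportError as exc:
        return False, "module not importable: %s" % exc
    if attr is not None and not hasattr(module, attr):
        return False, "%s has no attribute %r" % (module_name, attr)
    return True, getattr(module, "__file__", "built-in")


if __name__ == "__main__":
    # Shows which interpreter is running, to compare against where pip installed
    print("interpreter:", sys.executable)
    print(try_import("Crypto.Cipher.AES", "MODE_ECB"))
    print(try_import("Crypto.Cipher", "MODE_ECB"))
```

Run it with the same interpreter as the server (/bin/python3). If the first check succeeds and the second fails, the package is installed and only the import line needs correcting.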