Jul-10-2022, 12:30 AM
I just thought I would share some code I've written for interacting with Palo Alto firewalls that are managed by Panorama.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
"""Add new public IPv4 addresses/CIDRs to a Panorama-managed SFTP allow list.

The script reads candidate addresses from ``myips.txt`` (one per line),
validates them, drops any that already exist as address objects in the
configured device group, creates the remaining ones as address objects tagged
``SFTP``, and then commits to Panorama and pushes to the device group.
"""
import ipaddress
import json
import sys
import time

import pandevice
import requests
from pandevice import objects
from pandevice import panorama
from requests.exceptions import HTTPError

# TLS certificate verification is switched off until certificate handling is
# in place. Set this to True (or a CA bundle path) to turn verification on;
# every requests call below honours this single flag.
verify = False
if not verify:
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

# Seconds to wait for an HTTP response before giving up, so a dead
# connection cannot hang the script forever.
REQUEST_TIMEOUT = 60

# The Panorama device to connect to and the device group to work in.
device = "your_panorama_ip_address_goes_here"
devicegroup = "Production"

# The ticket number becomes the description of each created object, for
# audit purposes.
description = input('Please enter the NETENG ticket number(just the numbers): ').strip()

# A valid API key is required for every call the script makes.
auth_key = input('Please past in your palo alto api key with no extra spaces: ').strip()

# NOTE(review): the API key is sent in the URL query string, where it can end
# up in proxy/server logs. Consider sending it in a request header instead --
# confirm what the target PAN-OS version supports.

# XML API call that performs the basic Panorama commit.
commit_panorama_api = (
    "https://<your panorama name here>/api/?type=commit&cmd=<commit></commit>&key="
    + auth_key
)

# XML API call that pushes the committed configuration to the device group.
commit_devicegroup_api = (
    'https://<your panorama name here>/api/?type=commit&action=all'
    '&cmd=<commit-all><shared-policy><device-group><entry name="' + devicegroup + '"/>'
    '</device-group></shared-policy></commit-all>&key=' + auth_key
)

# pandevice connection to Panorama and the device group objects are added to.
pano = panorama.Panorama(device, api_key=auth_key)
palo_device_group = panorama.DeviceGroup(devicegroup)
pano.add(palo_device_group)

# REST API call that lists the address objects of the device group.
getProdAddressObjectsApi = (
    'https://<your panorama name here>/restapi/9.0/Objects/Addresses'
    '?location=device-group&device-group=' + devicegroup + '&key=' + auth_key
)


def valid_ip_or_cidr(ip):
    """Return True if *ip* is a valid IPv4 address or IPv4 CIDR network.

    CIDR values are parsed strictly, so a network with host bits set
    (for example ``10.0.0.1/24``) is rejected. On failure a message naming
    the offending value is printed and False is returned.
    """
    try:
        ipaddress.IPv4Address(ip)
        return True
    except ValueError:
        pass
    try:
        ipaddress.IPv4Network(ip)
        return True
    except ValueError:
        print(ip + ' This is neither a valid IPV4 address or a valid IPV4 CIDR notation, please check your data and try again')
        return False


def palo_object_name(ip):
    """Return the address-object name for *ip*.

    Single addresses become ``node-<ip>``; networks become ``net-<cidr>``
    with the ``/`` replaced by ``-``. Exits with status 1 if *ip* is neither
    a valid IPv4 address nor a valid IPv4 network.
    """
    try:
        ipaddress.IPv4Address(ip)
        return "node-" + str(ip)
    except ValueError:
        pass
    try:
        ipaddress.IPv4Network(ip)
    except ValueError:
        print("Somthing went wrong creating the object name, check the contents of the cleanips.txt file")
        sys.exit(1)
    return "net-" + str(ip).replace("/", "-")


def remove_lines_list(file):
    """Return the non-blank lines of *file* with surrounding whitespace removed."""
    with open(file) as handle:
        stripped = (line.strip() for line in handle)
        return [line for line in stripped if line]


def palo_panorama_commit():
    """Request a Panorama commit; return True if the HTTP request succeeded."""
    try:
        panorama_commit_response = requests.get(
            commit_panorama_api, verify=verify, timeout=REQUEST_TIMEOUT
        )
        # Raises HTTPError for 4xx/5xx responses.
        panorama_commit_response.raise_for_status()
    except HTTPError as http_err:
        print(f'HTTP error occurred: {http_err}')
        return False
    except Exception as err:
        print(f'Other error occurred: {err}')
        return False
    # NOTE(review): only the HTTP status is checked here; the response body
    # is not inspected, so "successful" means the request was accepted --
    # confirm whether the commit job result should be polled instead.
    print("Please wait while your panorama commit processes!")
    time.sleep(30)
    print('Your panorama commit was successful')
    return True


def palo_devicegroup_commit():
    """Request a device-group push; return True if the HTTP request succeeded."""
    try:
        device_commit_response = requests.get(
            commit_devicegroup_api, verify=verify, timeout=REQUEST_TIMEOUT
        )
        # Raises HTTPError for 4xx/5xx responses.
        device_commit_response.raise_for_status()
    except HTTPError as http_err:
        print(f'HTTP error occurred: {http_err}')
        return False
    except Exception as err:
        print(f'Other error occurred: {err}')
        return False
    # NOTE(review): as with the Panorama commit, only the HTTP status is
    # checked, not the outcome of the push itself.
    print("Please wait while the Production Device group commits!")
    time.sleep(60)
    print('Your SFTP allow list update was successfully commmit')
    return True


def ipv4address_ipv4cidr_validation():
    """Exit with status 1 unless every entry of addressList is valid IPv4.

    An empty list is also treated as an error, since there is nothing to add.
    """
    if not addressList:
        print("No addresses were found in myips.txt, check your data and try again.")
        sys.exit(1)
    for newIpAddress in addressList:
        # valid_ip_or_cidr prints the reason itself before returning False.
        if not valid_ip_or_cidr(newIpAddress):
            sys.exit(1)


def isPrivate_validaton():
    """Exit with status 1 if any entry of addressList is private address space."""
    for newIpAddress in addressList:
        # ip_network accepts both a bare address (treated as a /32) and CIDR
        # notation, so network entries are checked too.
        if ipaddress.ip_network(newIpAddress).is_private:
            print(newIpAddress + " this is a private address and cannot be used to access our Public SFTP service.")
            sys.exit(1)


# Load the candidate addresses submitted for addition.
addressList = remove_lines_list('myips.txt')

# Fetch the address objects that already exist in the device group so the
# submitted addresses can be compared against them. Nothing below can work
# without this data, so any failure ends the run.
try:
    current_address_objects_response = requests.get(
        getProdAddressObjectsApi, verify=verify, timeout=REQUEST_TIMEOUT
    )
    # Raises HTTPError for 4xx/5xx responses.
    current_address_objects_response.raise_for_status()
except HTTPError as http_err:
    print(f'HTTP error occurred: {http_err}')
    sys.exit(1)
except Exception as err:
    print(f'Other error occurred: {err}')
    sys.exit(1)
else:
    print("Please wait while we prepare your query")
    productionAddressDict = json.loads(current_address_objects_response.text)


def create_sftp_object():
    """Create SFTP-tagged address objects for addresses not already present.

    Each entry of addressList is compared (exact string match) with the
    ``ip-netmask`` values returned by Panorama. Entries that already exist
    are skipped; the rest are written to ``cleanips.txt`` and created in the
    device group with the tag ``SFTP`` and a ``NETENG-<ticket>`` description.
    Exits with status 0 when there is nothing new to add.
    """
    existing_entries = productionAddressDict.get('result', {}).get('entry', [])
    existing_values = {
        entry['ip-netmask'] for entry in existing_entries if 'ip-netmask' in entry
    }

    cleanAddressList = []
    for newIpAddress in addressList:
        if newIpAddress in existing_values:
            print("Skipping " + newIpAddress + " because it already exists")
        elif newIpAddress not in cleanAddressList:
            cleanAddressList.append(newIpAddress)

    # Overwrite rather than append so the file reflects this run only.
    with open("cleanips.txt", 'w') as file_object:
        file_object.writelines(address + "\n" for address in cleanAddressList)

    if not cleanAddressList:
        print("All the addresses you submit already exist in the SFTP allow list.")
        sys.exit()

    full_description = "NETENG-" + description
    for newIpAddress in cleanAddressList:
        sftp_server = objects.AddressObject(
            name=palo_object_name(newIpAddress),
            value=newIpAddress,
            description=full_description,
            tag="SFTP",
        )
        palo_device_group.add(sftp_server)
        sftp_server.create()


ipv4address_ipv4cidr_validation()
isPrivate_validaton()
create_sftp_object()
# Only push to the device group when the Panorama commit request went through.
if not palo_panorama_commit():
    sys.exit(1)
if not palo_devicegroup_commit():
    sys.exit(1)