Python Forum
I am a total noob, I need this reworked
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
I am a total noob, I need this reworked
#1
OK so I know enough about Python and that is that I don't know it. I normally install IP phones not program them. Anyuway, I have been looking all over for a script to tell my phones where the server is at when they boot. When they boot they multicast on 224.0.1.75 port 5060 with a request for a provisioning server. The server would respond with a message where the server is located. I found this Python script and I tried to set it to my phones but to no avail. The first two octets of my MAC addresses are 80:82 The phones are Atcom. When I run this script it shows that it is listening (well I get no errors) but there are no responses to the phones. Can someone look at this and help me getting this working? It originally was created for snom phones. The original creater I cannot get to respond to email requests. Thanks
[inline]#! /usr/bin/python
#
# snom multicast telephone discovery
#
#
# Author: Filip Polsakiewicz <[email protected]>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import socket
import struct
import sys
import re

from optparse import OptionParser

class snom_phone(object):
"""Basic representation of a snom phone."""

def __init__(self, mac=None, ip=None, mod=None, fw=None, subs=None):
"""Default constructor."""
self.mac_addr = mac
self.ip_addr = ip
self.sip_port = 5060
self.model = mod
self.fw_version = fw
self.subscribe = subs

def __repr__(self):
"""Gets a string representation of the phone"""
return "%s (MAC: %s) running Firmware %s found at IP %s" % (self.model, self.__macrepr(self.mac_addr), self.fw_version, self.ip_addr)

def __macrepr(self, m):
""" Normalize a MAC address to lower case unix style """
m = re.sub("[.:-]", "", m)
m = m.lower()
n = "%s:%s:%s:%s:%s:%s" % (m[0:2], m[2:4], m[4:6], m[6:8], m[8:10], m[10:])
return n

def parse(text):
"""Parses the incoming SUBSCRIBE."""
try:
lines = text.split('\r\n')

# Line 1 conatains the SUBSCRIBE and our MAC
new_phone = atcomtec(subs=text)
new_phone.mac_addr = lines[0][80:82]

# We can read the IP address from line 2
new_phone.ip_addr = lines[1][17:].split(';')[0].split(':')[0]
new_phone.sip_port = lines[1][17:].split(';')[0].split(':')[1]

# The other interesting information can be found in line 7
model_info = lines[6]
l_model_info = model_info.split(';')
new_phone.model = l_model_info[3].split('=')[1][1:-1]
new_phone.fw_version = l_model_info[4].split('=')[1][1:-1]
print new_phone

return new_phone
except:
# Parsing failed. Probably not a SUBSCRIBE
return None


def get_sip_info(text):
"""Get some relevant SIP information which we need in order to generate the responses."""

lines = text.split('\r\n')
# Some SIP info we need
call_id = lines[4][9:]
cseq = lines[5][6]
via_header = lines[1]
from_header = lines[2]
to_header = lines[3]

if options.verbose: print "CallId: " + call_id + "; CSeq: " + cseq + "\r\n";
return (call_id, cseq, via_header, from_header, to_header)

def get_ip_address():
# This is a simple hack to find our IP address
# AFAIK this is the only platform-independent way to obtain the address

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(('snom.com', 0))
return s.getsockname()[0]

prov_uri = None
parser = OptionParser()
parser.add_option('-u', '--url', action="store", dest="prov_uri", help="URI of the provisioning server")
parser.add_option('-l', '--local-ip', action="store", dest="local_ip", help="Local IP address")
parser.add_option("-v", "--verbose",
action="store_true", dest="verbose", default=False,
help="make lots of noise")

(options, args) = parser.parse_args()



print "Nimbus PnP Provisioning Server\n"

print "Provisioning URI is %s\n" % options.prov_uri

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('224.0.1.75', 5060))
mreq = struct.pack('4sl', socket.inet_aton('224.0.1.75'), socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
if not options.local_ip:
ip_adr = get_ip_address()
else:
ip_adr = options.local_ip

print "Local IP Address is :: %s" % ip_adr
print "=" * 80

while True:
subs = sock.recv(10240)

if options.verbose: print subs

phone = parse(subs)
(call_id, cseq, via_header, from_header, to_header) = get_sip_info(subs)

if phone:
# Create a socket to send data
sendsock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sendsock.bind(('%s' % ip_adr, 1036))

# If a phone has been recognized first send 200 OK
ok_response = "SIP/2.0 200 OK\r\n"
ok_response += via_header + "\r\n"
ok_response += "Contact: <sip:" + phone.ip_addr + ":" + phone.sip_port + ";transport=tcp;handler=dum>\r\n"
ok_response += to_header + "\r\n"
ok_response += from_header + "\r\n"
ok_response += "Call-ID: %s\r\n" % call_id
ok_response += "CSeq: %s SUBSCRIBE\r\nExpires: 0\r\nContent-Length: 0\r\n" % cseq

sendsock.sendto(ok_response, ("%s" % phone.ip_addr, int(phone.sip_port)))

# Now send a NOTIFY with the configuration URL

if not options.prov_uri:
prov_uri = "tftp://192.168.0.20"
else:
prov_uri = options.prov_uri
prov_uri = "%s/%s.htm" % (prov_uri,phone.model)

notify = "NOTIFY sip:%s:%s SIP/2.0\r\n" % (phone.ip_addr, phone.sip_port)
notify += via_header + "\r\n"
notify += "Max-Forwards: 20\r\n"
notify += "Contact: <sip:%s:1036;transport=TCP;handler=dum>\r\n" % ip_adr
notify += to_header + "\r\n"
notify += from_header + "\r\n"
notify += "Call-ID: %s\r\n" % call_id
notify += "CSeq: 3 NOTIFY\r\n"
notify += "Content-Type: application/url\r\n"
notify += "Subscription-State: terminated;reason=timeout\r\n"
notify += "Event: ua-profile;profile-type=\"device\";vendor=\"OEM\";model=\"OEM\";version=\"7.1.19\"\r\n"
notify += "Content-Length: %i\r\n" % (len(prov_uri))
notify += "\r\n%s" % prov_uri

print "Sending NOTIFY with URI :: %s\n" % prov_uri
if options.verbose: print notify
sendsock.sendto(notify, ("%s" % phone.ip_addr, int(phone.sip_port)))
[/inline]
Reply
#2
Anyone have any ideas?
Reply
#3
You need to post your code with indentation, as sparkz_alot indicate in the moderator note.
Reply
#4
(Dec-01-2017, 04:20 PM)edlentz Wrote: Anyone have any ideas?

No, until you've edited your original post.
Almost dead, but too lazy to die: https://sourceserver.info
All humans together. We don't need politicians!
Reply
#5
Better?
I found some other code and I am getting the IP address and port of the phones to print, so I know I am on the right track. So the task is to send the notify message back to the phone with the local ip as the provisioning server

#!/usr/bin/env python
#
# Send/receive UDP multicast packets.
# Requires that your OS kernel supports IP multicast.
#
# Usage:
#   mcast -s (sender, IPv4)
#   mcast -s -6 (sender, IPv6)
#   mcast    (receivers, IPv4)
#   mcast  -6  (receivers, IPv6)

MYPORT = 5060
MYGROUP_4 = '224.0.1.75'
MYGROUP_6 = '0000:0000:0000:0000:0000:ffff:e000:014b'
MYTTL = 1 # Increase to reach other networks

import time
import struct
import socket
import sys

def main():
    group = MYGROUP_6 if "-6" in sys.argv[1:] else MYGROUP_4

    if "-s" in sys.argv[1:]:
        sender(group)
    else:
        receiver(group)


def sender(group):
    addrinfo = socket.getaddrinfo(group, None)[0]

    s = socket.socket(addrinfo[0], socket.SOCK_DGRAM)

    # Set Time-to-live (optional)
    ttl_bin = struct.pack('@i', MYTTL)
    if addrinfo[0] == socket.AF_INET: # IPv4
        s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl_bin)
    else:
        s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS, ttl_bin)

    while True:
        data = repr(time.time())
        s.sendto(data + '\0', (addrinfo[4][0], MYPORT))
        time.sleep(1)


def receiver(group):
    # Look up multicast group address in name server and find out IP version
    addrinfo = socket.getaddrinfo(group, None)[0]

    # Create a socket
    s = socket.socket(addrinfo[0], socket.SOCK_DGRAM)

    # Allow multiple copies of this program on one machine
    # (not strictly needed)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # Bind it to the port
    s.bind(('', MYPORT))

    group_bin = socket.inet_pton(addrinfo[0], addrinfo[4][0])
    # Join group
    if addrinfo[0] == socket.AF_INET: # IPv4
        mreq = group_bin + struct.pack('=I', socket.INADDR_ANY)
        s.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
    else:
        mreq = group_bin + struct.pack('@I', 0)
        s.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)

    # Loop, printing any data we receive
    while True:
        data, sender = s.recvfrom(1500)
        while data[-1:] == '\0': data = data[:-1] # Strip trailing \0's
        print (str(sender) )

    # Get the IP of the system we are running on

ip = ((([ip for ip in socket.gethostbyname_ex(socket.gethostname())[2] 
if not ip.startswith("127.")] or [[(s.connect(("8.8.8.8", 53)), s.getsockname()[0], s.close()) 
for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]]) + ["no IP found"])[0])


print "Local Provisioning server address is : %s" % ip




if __name__ == '__main__':
    main()
Thanks

Forgot to mention this is the result of the message

('192.168.0.xxx', 5060)
Reply


Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020