Apr-29-2020, 12:10 AM
(This post was last modified: Apr-29-2020, 12:11 AM by searching1.)
Hi, I'm currently working on this ssh tunnel script but having issue related to AttributeError: 'Connection' object has no attribute 'send_line'.
- Tried running python -tt script and the output still the same.
- Validated that password attribute do exist and correct as I have tried printing the pwd.
- Reviewed the indentation and seems correct and properly aligned.
Any idea how to resolve this? Thank you
- Tried running python -tt script and the output still the same.
- Validated that password attribute do exist and correct as I have tried printing the pwd.
- Reviewed the indentation and seems correct and properly aligned.
Any idea how to resolve this? Thank you
##### YOU ARE HERE AT jump_svr ....Sending password.. Traceback (most recent call last): File "2ssh_tunel.py", line 215, in <module> c.login_svr(D1_IP, usernamex, passwordy) File "2ssh_tunel.py", line 124, in login_svr logged_in = self.jump_svr(user, pwd) File "2ssh_tunel.py", line 102, in jump_svr self.send_line(pwd) AttributeError: 'Connection' object has no attribute 'send_line'
#!/usr/bin/env python # DEMO import sys sys.path.append("/home/abcd/pexpect-2.4") import pexpect import time import re import sys import config import logging import ConfigParser FULLLOG = True #FULLLOG = False debug_log_filename = 'jump.log' logger = logging.getLogger('jump_obj') ch = logging.StreamHandler(sys.stdout) if FULLLOG: ch = logging.FileHandler(debug_log_filename) ch.setLevel(logging.DEBUG) formatter = logging.Formatter('%(asctime)s - %(levelname)s %(message)s') ch.setFormatter(formatter) logger.addHandler(ch) logger.setLevel(logging.INFO) #logger.setLevel(logging.DEBUG) CONN_FAIL = 'conn_fail!' CONN_SUCCESS = 'conn_success!' CONN_BAD_PASS = 'bad_password!' CONN_RETRY = 'retry' class Connection(object): def __init__(self): self.ssh_tunnel = None def __del__(self): # close the connection if still active if self.ssh_tunnel: self.ssh_tunnel.close() def jump_svr(self,user,pwd): print "\n##### YOU ARE HERE AT jump_svr" print "CRED FOR HANDLE DOMAIN",user," ",pwd logger.debug('jump_svr') # all strings are wrapped in re.compile(xxx), no need to do it manually index = self.ssh_tunnel.expect( ['[Uu]sername', '[Ll]ogin:', 'assword:', 'Enter to accept:', '[pn]dcc01\-cmds.:~>', 'Your password has expired', pexpect.EOF, pexpect.TIMEOUT, 'Are you sure you want to continue connecting (yes/no)?', 'Last login', 'pdcd\d\d\-tools1a]~%', '\'s password:'], timeout=10 ) logger.debug('index: %s' % index) login_failed = False if index == 0: self.send_line(user) elif index == 1: self.send_line(user) elif index == 2: self.send_line(pwd) elif index == 3: self.send_line('') elif index == 4: logger.info('logged into D1!') return CONN_SUCCESS elif index == 5: logger.info('Your password for the domain X has expired') login_failed = True elif index == 6: logger.info('SSH to domain X failed') login_failed = True elif index == 7: logger.info('SSH to domain X timeout') login_failed = True elif index == 8: self.send_line('yes') elif index == 9: # Last login: Thu Feb 7 08:53:33 2013 from gsd-tools.vzbi. # do nothing pass elif index == 10: logger.info('logged into subsequent domain!') return CONN_SUCCESS elif index == 11: print "\n....Sending password..",pwd self.send_line(pwd) uld = os.getcwd() print uld if login_failed: return CONN_FAIL def login_svr(self, domain_ip, user, pwd): print "\n##### YOU ARE HERE AT login_svr FUNC" logger.debug('login_svr %s' % domain_ip) if self.ssh_tunnel == None: # for domain 1 let's start the connection cmd = 'ssh %s@%s' % (user, domain_ip) logger.debug('cmd: %s' % cmd) self.ssh_tunnel = pexpect.spawn(cmd) else: self.send_line('ssh %s@%s' % (user, domain_ip)) if FULLLOG: self.ssh_tunnel.logfile = file('actualjump.log','w') for _ in range(5): logged_in = self.jump_svr(user, pwd) logger.debug('logged_in: %s' % logged_in) if logged_in == CONN_SUCCESS: return True if logged_in == CONN_FAIL: return False else: logger.warning('A problem logging in domain [%s]! Quitting.' % domain_ip) return False return False def login_to_ce(self, ce_ip, user, pwd, via_ts=False, telnet=False, old_code=None, expected_pattern=''): logger.debug('login_to_ce, ip: %s' % ce_ip) if not via_ts and ce_ip != '' and ce_ip is not None: if telnet: cmd = 'telnet %(ip)s' % {'ip': ce_ip} else: cmd = 'ssh -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no %(username)s@%(ip)s' % {'username': user, 'ip': ce_ip} self.send_line(cmd) logger.debug(cmd) code = old_code if code == None: code = '' for _ in range(5): logged_in, old_code = self.handle_ce_login(user, pwd, old_code=old_code, ce_hostname=expected_pattern) # print repr(logged_in) if logged_in == CONN_SUCCESS: return True if logged_in == CONN_FAIL: return False if logged_in == CONN_BAD_PASS: logger.warning('bad password for ip %s!' % ce_ip) return False if logged_in == CONN_RETRY: continue else: logger.warning('A problem logging into CE [%s]!' % ce_ip) return False return False def connect_to_dev(self, ip, domain='44', user_domain=config.user_domain, pwd_domain=config.pwd_domain): '''Connect to the CE via the given domain (optional), authenticate and switch to enable mode. Return ssh_tunnel (pexpect.spawn() object). Depends on config package, not very clean. ''' # log into D1 self.login_svr(config.domains['1'], user_domain, pwd_domain) # log into the subsequent domain (per customer) logger.info('logging into domain %s' % domain) self.login_svr(config.domains[domain], user_domain, pwd_domain) uld = os.getcwd() print uld config = ConfigParser.ConfigParser() cfgcred = ConfigParser.ConfigParser() domains = ConfigParser.ConfigParser() domains.read('test_domains.cfg') cfgcred.read('test_config.cfg') c = Connection() # CALL CONNECTION CLASS try: D1_IP = domains.get('domains', '44') print "\nYOUR DOMAIN:\n",D1_IP,"\n" ## Dx_IP = domains.get('domains', args.domain) except ConfigParser.NoOptionError: logger.error('Domain 1 or %s not specified in config file!', args.domain) raise IOSupgradeException('domain missing') try: usernamex = cfgcred.get('domain login', 'username') passwordy = cfgcred.get('domain login', 'password') print "\nSHOW CRED:\n",usernamex,passwordy except ConfigParser.NoOptionError: logger.error('login credentials not found in config file!') raise IOSupgradeException('login info missing') print 'connecting to jumpser %s' % D1_IP c.login_svr(D1_IP, usernamex, passwordy)