Python Forum
covertion of argparser to cliff
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
covertion of argparser to cliff
#1
hello guys i wanted your help , me and some of my folk write code in using argparser and we wanted our code transfer into cliff
here it is code

==================
python3
=================
    import argparse
    import os
    import re
    import sys
    import traceback

    from cliff import command
    from oslo_log import log as logging
    from oslo_serialization import jsonutils as json
    from six import moves
    from six.moves.urllib import parse as urlparse

    from tempest import clients
    from tempest.common import credentials_factory as credentials
    from tempest import config
    import tempest.lib.common.http
    from tempest.lib import exceptions as lib_exc


    CONF = config.CONF
    CONF_PARSER = None

    LOG = logging.getLogger(__name__)


    def _get_config_file():
        config_dir = os.getcwd()
        default_config_dir = os.path.join(config_dir, "etc")
        default_config_file = "tempest.conf"

        conf_dir = os.environ.get('TEMPEST_CONFIG_DIR', default_config_dir)
        conf_file = os.environ.get('TEMPEST_CONFIG', default_config_file)
        path = os.path.join(conf_dir, conf_file)
        fd = open(path, 'r+')
        return fd


    def change_option(option, group, value):
        if not CONF_PARSER.has_section(group):
            CONF_PARSER.add_section(group)
        CONF_PARSER.set(group, option, str(value))


    def print_and_or_update(option, group, value, update):
        print('Config option %s in group %s should be changed to: %s'
              % (option, group, value))
        if update:
            change_option(option, group, value)


    def contains_version(prefix, versions):
        return any([x for x in versions if x.startswith(prefix)])


    def verify_glance_api_versions(os, update):
        # Check glance api versions
        # Since we want to verify that the configuration is correct, we cannot
        # rely on a specific version of the API being available.
        try:
            _, versions = os.image_v1.ImagesClient().get_versions()
        except lib_exc.NotFound:
            # If not found, we use v2. The assumption is that either v1 or v2
            # are available, since glance is marked as available in the catalog.
            # If not, glance should be disabled in Tempest conf.
            try:
                versions = os.image_v2.VersionsClient().list_versions()['versions']
                versions = [x['id'] for x in versions]
            except lib_exc.NotFound:
                msg = ('Glance is available in the catalog, but no known version, '
                       '(v1.x or v2.x) of Glance could be found, so Glance should '
                       'be configured as not available')
                LOG.warn(msg)
                print_and_or_update('glance', 'service-available', False, update)
                return

        if CONF.image_feature_enabled.api_v1 != contains_version('v1.', versions):
            print_and_or_update('api_v1', 'image-feature-enabled',
                                not CONF.image_feature_enabled.api_v1, update)
        if CONF.image_feature_enabled.api_v2 != contains_version('v2.', versions):
            print_and_or_update('api_v2', 'image-feature-enabled',
                                not CONF.image_feature_enabled.api_v2, update)


    def _remove_version_project(url_path):
        # The regex matches strings like /v2.0, /v3/, /v2.1/project-id/
        return re.sub(r'/v\d+(\.\d+)?(/[^/]+)?', '', url_path)


    def _get_unversioned_endpoint(base_url):
        endpoint_parts = urlparse.urlparse(base_url)
        new_path = _remove_version_project(endpoint_parts.path)
        endpoint_parts = endpoint_parts._replace(path=new_path)
        endpoint = urlparse.urlunparse(endpoint_parts)
        return endpoint


    def _get_api_versions(os, service):
        # Clients are used to obtain the base_url. Each client applies the
        # appropriate filters to the catalog to extract a base_url which
        # matches the configured region and endpoint_type.
        # The base URL is used to obtain the list of versions available.
        client_dict = {
            'nova': os.compute.ServersClient(),
            'keystone': os.identity_v3.IdentityClient(
                endpoint_type=CONF.identity.v3_endpoint_type),
            'cinder': os.volume_v3.VolumesClient(),
        }
        if service != 'keystone' and service != 'cinder':
            # Since keystone and cinder may be listening on a path,
            # do not remove the path.
            client_dict[service].skip_path()
        endpoint = _get_unversioned_endpoint(client_dict[service].base_url)

        http = tempest.lib.common.http.ClosingHttp(
            CONF.identity.disable_ssl_certificate_validation,
            CONF.identity.ca_certificates_file)

        __, body = http.request(endpoint, 'GET')
        client_dict[service].reset_path()
        try:
            body = json.loads(body)
        except ValueError:
            LOG.error(
                'Failed to get a JSON response from unversioned endpoint %s '
                '(versioned endpoint was %s). Response is:\n%s',
                endpoint, client_dict[service].base_url, body[:100])
            raise
        if service == 'keystone':
            versions = map(lambda x: x['id'], body['versions']['values'])
        else:
            versions = map(lambda x: x['id'], body['versions'])
        return list(versions)


    def verify_keystone_api_versions(os, update):
        # Check keystone api versions
        versions = _get_api_versions(os, 'keystone')
        if (CONF.identity_feature_enabled.api_v3 !=
                contains_version('v3.', versions)):
            print_and_or_update('api_v3', 'identity-feature-enabled',
                                not CONF.identity_feature_enabled.api_v3, update)


    def verify_cinder_api_versions(os, update):
        # Check cinder api versions
        versions = _get_api_versions(os, 'cinder')
        if (CONF.volume_feature_enabled.api_v2 !=
                contains_version('v2.', versions)):
            print_and_or_update('api_v2', 'volume-feature-enabled',
                                not CONF.volume_feature_enabled.api_v2, update)
        if (CONF.volume_feature_enabled.api_v3 !=
                contains_version('v3.', versions)):
            print_and_or_update('api_v3', 'volume-feature-enabled',
                                not CONF.volume_feature_enabled.api_v3, update)


    def verify_api_versions(os, service, update):
        verify = {
            'cinder': verify_cinder_api_versions,
            'glance': verify_glance_api_versions,
            'keystone': verify_keystone_api_versions,
        }
        if service not in verify:
            return
        verify[service](os, update)


    def get_extension_client(os, service):
        extensions_client = {
            'nova': os.compute.ExtensionsClient(),
            'neutron': os.network.ExtensionsClient(),
            'swift': os.object_storage.CapabilitiesClient(),
            # NOTE: Cinder v3 API is current and v2 and v1 are deprecated.
            # V3 extension API is the same as v2, so we reuse the v2 client
            # for v3 API also.
            'cinder': os.volume_v2.ExtensionsClient(),
        }

        if service not in extensions_client:
            print('No tempest extensions client for %s' % service)
            sys.exit(1)
        return extensions_client[service]


    def get_enabled_extensions(service):
        extensions_options = {
            'nova': CONF.compute_feature_enabled.api_extensions,
            'cinder': CONF.volume_feature_enabled.api_extensions,
            'neutron': CONF.network_feature_enabled.api_extensions,
            'swift': CONF.object_storage_feature_enabled.discoverable_apis,
        }
        if service not in extensions_options:
            print('No supported extensions list option for %s' % service)
            sys.exit(1)
        return extensions_options[service]


    def verify_extensions(os, service, results):
        extensions_client = get_extension_client(os, service)
        if service != 'swift':
            resp = extensions_client.list_extensions()
        else:
            resp = extensions_client.list_capabilities()
        # For Nova, Cinder and Neutron we use the alias name rather than the
        # 'name' field because the alias is considered to be the canonical
        # name.
        if isinstance(resp, dict):
            if service == 'swift':
                # Remove Swift general information from extensions list
                resp.pop('swift')
                extensions = resp.keys()
            else:
                extensions = map(lambda x: x['alias'], resp['extensions'])

        else:
            extensions = map(lambda x: x['alias'], resp)
        extensions = list(extensions)
        if not results.get(service):
            results[service] = {}
        extensions_opt = get_enabled_extensions(service)
        if not extensions_opt:
            LOG.info("'%s' has no api_extensions set.", service)
            return results
        if extensions_opt[0] == 'all':
            results[service]['extensions'] = extensions
            return results
        # Verify that all configured extensions are actually enabled
        for extension in extensions_opt:
            results[service][extension] = extension in extensions
        # Verify that there aren't additional extensions enabled that aren't
        # specified in the config list
        for extension in extensions:
            if extension not in extensions_opt:
                results[service][extension] = False
        return results


    def display_results(results, update, replace):
        update_dict = {
            'swift': 'object-storage-feature-enabled',
            'nova': 'compute-feature-enabled',
            'cinder': 'volume-feature-enabled',
            'neutron': 'network-feature-enabled',
        }
        for service in results:
            # If all extensions are specified as being enabled there is no way to
            # verify this so we just assume this to be true
            if results[service].get('extensions'):
                if replace:
                    output_list = results[service].get('extensions')
                else:
                    output_list = ['all']
            else:
                extension_list = get_enabled_extensions(service)
                output_list = []
                for extension in results[service]:
                    if not results[service][extension]:
                        if extension in extension_list:
                            print("%s extension: %s should not be included in the "
                                  "list of enabled extensions" % (service,
                                                                  extension))
                        else:
                            print("%s extension: %s should be included in the list"
                                  " of enabled extensions" % (service, extension))
                            output_list.append(extension)
                    else:
                        output_list.append(extension)
            if update:
                # Sort List
                output_list.sort()
                # Convert list to a string
                output_string = ', '.join(output_list)
                if service == 'swift':
                    change_option('discoverable_apis', update_dict[service],
                                  output_string)
                else:
                    change_option('api_extensions', update_dict[service],
                                  output_string)


    def check_service_availability(os, update):
        services = []
        avail_services = []
        codename_match = {
            'volume': 'cinder',
            'network': 'neutron',
            'image': 'glance',
            'object_storage': 'swift',
            'compute': 'nova',
            'baremetal': 'ironic',
            'identity': 'keystone',
        }
        # Get catalog list for endpoints to use for validation
        _token, auth_data = os.auth_provider.get_auth()
        if os.auth_version == 'v2':
            catalog_key = 'serviceCatalog'
        else:
            catalog_key = 'catalog'
        for entry in auth_data[catalog_key]:
            services.append(entry['type'])
        # Pull all catalog types from config file and compare against endpoint list
        for cfgname in dir(CONF._config):
            cfg = getattr(CONF, cfgname)
            catalog_type = getattr(cfg, 'catalog_type', None)
            if not catalog_type:
                continue
            if cfgname == 'identity':
                # Keystone is a required service for tempest
                continue
            if catalog_type not in services:
                try:
                    if getattr(CONF.service_available, codename_match[cfgname]):
                        print('Endpoint type %s not found either disable service '
                              '%s or fix the catalog_type in the config file' % (
                                  catalog_type, codename_match[cfgname]))
                        if update:
                            change_option(codename_match[cfgname],
                                          'service_available', False)
                except KeyError:
                    print('%s is a third party plugin, cannot be verified '
                          'automatically, but it is suggested that it is set to '
                          'False because %s service is not available ' % (
                              cfgname, catalog_type))
            else:
                try:
                    if not getattr(CONF.service_available,
                                   codename_match[cfgname]):
                        print('Endpoint type %s is available, service %s should be'
                              ' set as available in the config file.' % (
                                  catalog_type, codename_match[cfgname]))
                        if update:
                            change_option(codename_match[cfgname],
                                          'service_available', True)
                            # If we are going to enable this we should allow
                            # extension checks.
                            avail_services.append(codename_match[cfgname])
                    else:
                        avail_services.append(codename_match[cfgname])
                except KeyError:
                    print('%s is a third party plugin, cannot be verified '
                          'automatically, but it is suggested that it is set to '
                          'True because %s service is available ' % (
                              cfgname, catalog_type))
        return avail_services


    def _parser_add_args(parser):
        parser.add_argument('-u', '--update', action='store_true',
                            help='Update the config file with results from api '
                                 'queries. This assumes whatever is set in the '
                                 'config file is incorrect. In the case of '
                                 'endpoint checks where it could either be the '
                                 'incorrect catalog type or the service available '
                                 'option the service available option is assumed '
                                 'to be incorrect and is thus changed')
        parser.add_argument('-o', '--output',
                            help="Output file to write an updated config file to. "
                                 "This has to be a separate file from the "
                                 "original config file. If one isn't specified "
                                 "with -u the values which should be changed "
                                 "will be printed to STDOUT")
        parser.add_argument('-r', '--replace-ext', action='store_true',
                            help="If specified the all option will be replaced "
                                 "with a full list of extensions")


    def parse_args():
        parser = argparse.ArgumentParser()
        _parser_add_args(parser)
        opts = parser.parse_args()
        return opts


    def main(opts=None):
        print('Running config verification...')
        if opts is None:
            print("Use of: 'verify-tempest-config' is deprecated, "
                  "please use: 'tempest verify-config'")
            opts = parse_args()
        update = opts.update
        replace = opts.replace_ext
        global CONF_PARSER

        if update:
            conf_file = _get_config_file()
            CONF_PARSER = moves.configparser.ConfigParser()
            CONF_PARSER.optionxform = str
            CONF_PARSER.readfp(conf_file)

        # Indicate not to create network resources as part of getting credentials
        net_resources = {
            'network': False,
            'router': False,
            'subnet': False,
            'dhcp': False
        }
        icreds = credentials.get_credentials_provider(
            'verify_tempest_config', network_resources=net_resources)
        try:
            os = clients.Manager(icreds.get_primary_creds().credentials)
            services = check_service_availability(os, update)
            results = {}
            for service in ['nova', 'cinder', 'neutron', 'swift']:
                if service not in services:
                    continue
                results = verify_extensions(os, service, results)

            # Verify API versions of all services in the keystone catalog and
            # keystone itself.
            services.append('keystone')
            for service in services:
                verify_api_versions(os, service, update)

            display_results(results, update, replace)
            if update:
                conf_file.close()
                if opts.output:
                    with open(opts.output, 'w+') as outfile:
                        CONF_PARSER.write(outfile)
        finally:
            icreds.clear_creds()


    class TempestVerifyConfig(command.Command):
        """Verify your current tempest configuration"""

        def get_parser(self, prog_name):
            parser = super(TempestVerifyConfig, self).get_parser(prog_name)
            _parser_add_args(parser)
            return parser

        def take_action(self, parsed_args):
            try:
                main(parsed_args)
            except Exception:
                LOG.exception("Failure verifying configuration.")
                traceback.print_exc()
                raise


    if __name__ == "__main__":
        main()
=================

buran thank you im newbie to this forum . please help me further
Reply


Possibly Related Threads…
Thread Author Replies Views Last Post
  covertion of argparser to cliff suraj522 0 1,257 Sep-11-2019, 07:36 AM
Last Post: suraj522

Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020