Aug-06-2021, 07:37 PM
Hi everyone.
Iam trying to call connection_string method inside run_oracle_job and Im not sure if its better way to do that.
Im getting this error. TypeError: run_oracle_job() missing 1 required positional argument: 'connection_string'
Someone knows what is missing here?
Im running this code on Apache Airflow and using PythonVirtualenvOperator
Iam trying to call connection_string method inside run_oracle_job and Im not sure if its better way to do that.
Im getting this error. TypeError: run_oracle_job() missing 1 required positional argument: 'connection_string'
Someone knows what is missing here?
Im running this code on Apache Airflow and using PythonVirtualenvOperator
def connection_string(**kwargs): from airflow.hooks.oracle_hook import OracleHook connection = OracleHook.get_connection(kwargs['oracle_conn']) user = connection.login password = connection.password host = connection.host port = connection.port service_name = connection.extra_dejson.get('service_name', None) return f'oracle+cx_oracle://{user}:{password}@{host}:{port}/?service_name={service_name}' def run_oracle_job(connection_string, *args, **kwargs): where_clause_suffix = textwrap.dedent(""" where table_schema = 'public' """) tmp_folder = '/var/tmp/amundsen/table_metadata' node_files_folder = f'{tmp_folder}/nodes/' relationship_files_folder = f'{tmp_folder}/relationships/' job_config = ConfigFactory.from_dict({ f'extractor.oracle_metadata.{OracleMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}': where_clause_suffix, f'extractor.oracle_metadata.{OracleMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME}': True, f'extractor.oracle_metadata.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connection_string(), f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder, f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder, f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.SHOULD_DELETE_CREATED_DIR}': True, f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder, f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder, f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint, f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user, f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password, f'publisher.neo4j.{neo4j_csv_publisher.JOB_PUBLISH_TAG}': 'unique_tag', # should use unique tag here like {ds} }) job = DefaultJob(conf=job_config, task=DefaultTask(extractor=OracleMetadataExtractor(), loader=FsNeo4jCSVLoader()), publisher=Neo4jCsvPublisher()) return jobwhat is missing here please?