Aug-06-2021, 07:37 PM
Hi everyone.
I am trying to call the connection_string function inside run_oracle_job, and I'm not sure whether there is a better way to do that.
I'm getting this error: TypeError: run_oracle_job() missing 1 required positional argument: 'connection_string'
Does anyone know what is missing here, please?
I'm running this code on Apache Airflow, using PythonVirtualenvOperator.
def connection_string(**kwargs):
    from airflow.hooks.oracle_hook import OracleHook
    connection = OracleHook.get_connection(kwargs['oracle_conn'])
    user = connection.login
    password = connection.password
    host = connection.host
    port = connection.port
    service_name = connection.extra_dejson.get('service_name', None)


def run_oracle_job(connection_string, *args, **kwargs):
    where_clause_suffix = textwrap.dedent("""
        where table_schema = 'public'
    """)

    tmp_folder = '/var/tmp/amundsen/table_metadata'
    node_files_folder = f'{tmp_folder}/nodes/'
    relationship_files_folder = f'{tmp_folder}/relationships/'

    job_config = ConfigFactory.from_dict({
        f'extractor.oracle_metadata.{OracleMetadataExtractor.WHERE_CLAUSE_SUFFIX_KEY}': where_clause_suffix,
        f'extractor.oracle_metadata.{OracleMetadataExtractor.USE_CATALOG_AS_CLUSTER_NAME}': True,
        f'extractor.oracle_metadata.extractor.sqlalchemy.{SQLAlchemyExtractor.CONN_STRING}': connection_string(),
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.NODE_DIR_PATH}': node_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.RELATION_DIR_PATH}': relationship_files_folder,
        f'loader.filesystem_csv_neo4j.{FsNeo4jCSVLoader.SHOULD_DELETE_CREATED_DIR}': True,
        f'publisher.neo4j.{neo4j_csv_publisher.NODE_FILES_DIR}': node_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.RELATION_FILES_DIR}': relationship_files_folder,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_END_POINT_KEY}': neo4j_endpoint,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_USER}': neo4j_user,
        f'publisher.neo4j.{neo4j_csv_publisher.NEO4J_PASSWORD}': neo4j_password,
        f'publisher.neo4j.{neo4j_csv_publisher.JOB_PUBLISH_TAG}': 'unique_tag',  # should use unique tag here like {ds}
    })
    job = DefaultJob(conf=job_config,
                     task=DefaultTask(extractor=OracleMetadataExtractor(), loader=FsNeo4jCSVLoader()),
                     publisher=Neo4jCsvPublisher())
    return job
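For anyone reading along, here is a minimal sketch of what appears to be going on, with Airflow left out entirely so it runs on its own. It assumes the operator calls the function with keyword arguments only (no op_args), which would leave the required positional parameter connection_string unfilled. Two other things in the posted code look relevant: the parameter named connection_string shadows the helper function of the same name, and the helper never returns anything. FakeConnection, build_connection_string, call_like_operator and the URL layout are all hypothetical names and values made up for this illustration, not Airflow or Amundsen APIs.

```python
from dataclasses import dataclass


@dataclass
class FakeConnection:
    """Hypothetical stand-in for the connection object used in the post."""
    login: str
    password: str
    host: str
    port: int
    service_name: str


def build_connection_string(conn):
    # Unlike the posted helper, this one returns the assembled string.
    # The URL layout is an assumption, used only for illustration.
    return (
        f"oracle+cx_oracle://{conn.login}:{conn.password}"
        f"@{conn.host}:{conn.port}/?service_name={conn.service_name}"
    )


def job_with_required_arg(connection_string, *args, **kwargs):
    # Same signature shape as run_oracle_job in the post.
    return connection_string


def job_without_required_arg(**kwargs):
    # One possible fix: no required positional parameter;
    # the string is built inside the function from a keyword argument.
    return build_connection_string(kwargs["conn"])


def call_like_operator(func, op_kwargs):
    # Imitates a caller that supplies keyword arguments only.
    return func(**op_kwargs)


conn = FakeConnection("scott", "tiger", "dbhost", 1521, "ORCL")

try:
    call_like_operator(job_with_required_arg, {"conn": conn})
    error_message = None
except TypeError as exc:
    # The same kind of TypeError as in the post.
    error_message = str(exc)

print(error_message)

result = call_like_operator(job_without_required_arg, {"conn": conn})
print(result)
```

If this matches the real setup, the options would be either to pass the value explicitly through the operator's arguments or to drop the positional parameter and build the string inside the task function. Since PythonVirtualenvOperator runs the callable in a separate environment, building it inside the function may be the safer of the two.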