Hello to all members of the forum!

I have a script that reads data from a db.sqlite database and transfers it to Elasticsearch, and it fails with an error related to the context manager that I can't figure out how to solve.

Let me lay it out in order:
- there is a db.sqlite database
- there is an Elasticsearch server running locally (checked, it works)
- the script reads data from the database and pushes it into Elasticsearch
- Elasticsearch is prepared for the push: an index with the correct schema has been created
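To double-check the server, I just hit the root endpoint. This is only a minimal sketch, assuming the default host and port (http://127.0.0.1:9200); adjust it if your Elasticsearch listens elsewhere:

import requests

# Assumed default local Elasticsearch address; not taken from the script below.
ES_URL = 'http://127.0.0.1:9200'

response = requests.get(ES_URL)
print(response.status_code)  # 200 when the server is up
print(response.json())       # cluster name and version info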
Here is the script:
import os
import json
import logging
import sqlite3
from contextlib import contextmanager
from typing import List
from urllib.parse import urljoin

import requests

db_path = os.path.abspath('db.sqlite')
logger = logging.getLogger()


def dict_factory(cursor: sqlite3.Cursor, row: tuple) -> dict:
    print("def dict_factory")
    d = {}
    for idx, col in enumerate(cursor.description):
        d[col[0]] = row[idx]
    return d


@contextmanager
def conn_context(db_path: str):
    print("def conn_context")
    conn = sqlite3.connect(db_path)
    conn.row_factory = dict_factory
    # yield conn
    # conn.close()
    try:
        yield conn
    except:
        print("exception")
    conn.close()


class ESLoader:
    def __init__(self, url: str):
        print("class ESLoader -> __init__")
        self.url = url

    def _get_es_bulk_query(self, rows: List[dict], index_name: str) -> List[str]:
        print("class ESLoader -> def _get_es_bulk_query")
        prepared_query = []
        for row in rows:
            prepared_query.extend([
                json.dumps({'index': {'_index': index_name, '_id': row['id']}}),
                json.dumps(row)
            ])
        return prepared_query

    def load_to_es(self, records: List[dict], index_name: str):
        print("class ESLoader -> def load_to_es")
        prepared_query = self._get_es_bulk_query(records, index_name)
        str_query = '\n'.join(prepared_query) + '\n'
        response = requests.post(
            urljoin(self.url, '_bulk'),
            data=str_query,
            headers={'Content-Type': 'application/x-ndjson'}
        )
        json_response = json.loads(response.content.decode())
        for item in json_response['items']:
            error_message = item['index'].get('error')
            if error_message:
                logger.error(error_message)


class ETL:
    SQL = '''
    WITH x as (
        SELECT m.id, group_concat(a.id) as actors_ids, group_concat(a.name) as actors_names
        FROM movies m
        LEFT JOIN movie_actors ma on m.id = ma.movie_id
        LEFT JOIN actors a on ma.actor_id = a.id
        GROUP BY m.id
    )
    SELECT m.id, genre, director, title, plot, imdb_rating, x.actors_ids, x.actors_names,
        CASE
            WHEN m.writers = '' THEN '[{"id": "' || m.writer || '"}]'
            ELSE m.writers
        END AS writers
    FROM movies m
    LEFT JOIN x ON m.id = x.id
    '''

    def __init__(self, conn: sqlite3.Connection, es_loader: ESLoader):
        print("class ETL -> __init__")
        self.es_loader = es_loader
        self.conn = conn

    def load_writers_names(self) -> dict:
        print("class ETL -> def load_writers_names")
        writers = {}
        # sql = '''SELECT DISTINCT id, name FROM writers'''
        for writer in self.conn.execute('''SELECT DISTINCT id, name FROM writers'''):
            writers[writer['id']] = writer
        return writers

    def _transform_row(self, row: dict, writers: dict) -> dict:
        print("class ETL -> def _transform_row")
        movie_writers = []
        writers_set = set()
        for writer in json.loads(row['writers']):
            writer_id = writer['id']
            if writers[writer_id]['name'] != 'N/A' and writer_id not in writers_set:
                movie_writers.append(writers[writer_id])
                writers_set.add(writer_id)

        actors = []
        actors_names = []
        if row['actors_ids'] is not None and row['actors_names'] is not None:
            actors = [
                {'id': _id, 'name': name}
                for _id, name in zip(row['actors_ids'].split(','), row['actors_names'].split(','))
                if name != 'N/A'
            ]
            actors_names = [x for x in row['actors_names'].split(',') if x != 'N/A']

        return {
            'id': row['id'],
            'genre': row['genre'].replace(' ', '').split(','),
            'writers': movie_writers,
            'actors': actors,
            'actors_names': actors_names,
            'writers_names': [x['name'] for x in movie_writers],
            'imdb_rating': float(row['imdb_rating']) if row['imdb_rating'] != 'N/A' else None,
            'title': row['title'],
            'director': [x.strip() for x in row['director'].split(',')] if row['director'] != 'N/A' else None,
            'description': row['plot'] if row['plot'] != 'N/A' else None
        }

    def load(self, index_name: str):
        print("class ETL -> def load")
        records = []
        writers = self.load_writers_names()
        for row in self.conn.execute(self.SQL):
            transformed_row = self._transform_row(row, writers)
            records.append(transformed_row)
        self.es_loader.load_to_es(records, index_name)


es_loader = ESLoader(url)
connn = conn_context(db_path)
start = ETL(connn, es_loader)
index = 'movies'
start.load(index)
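For reference, the body that _get_es_bulk_query builds and load_to_es posts to the _bulk endpoint is newline-delimited JSON: one action line plus one document line per record. A small usage example of the class above; the record values and the URL are made up purely for illustration:

# Made-up record, just to show the shape of the bulk payload.
rows = [{'id': 'tt0000001', 'title': 'Example movie', 'imdb_rating': 7.1}]
loader = ESLoader('http://127.0.0.1:9200/')
print('\n'.join(loader._get_es_bulk_query(rows, 'movies')))
# Apart from the debug prints, this outputs:
# {"index": {"_index": "movies", "_id": "tt0000001"}}
# {"id": "tt0000001", "title": "Example movie", "imdb_rating": 7.1}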
Error:
Traceback (most recent call last):
  File "D:\Yandex_Midl\p1\src\script.py", line 240, in <module>
    start.load(index)
  File "D:\Yandex_Midl\p1\src\script.py", line 221, in load
    writers = self.load_writers_names()
  File "D:\Yandex_Midl\p1\src\script.py", line 153, in load_writers_names
    for writer in self.conn.execute('''SELECT DISTINCT id, name FROM writers'''):
AttributeError: '_GeneratorContextManager' object has no attribute 'execute'
As far as I can tell, the problem is hidden in the conn_context function, but I can't figure out how to solve it.
Can anyone help me?
Thank you in advance.
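From what I have read, calling a @contextmanager-decorated function only returns a _GeneratorContextManager object; the real sqlite3.Connection is produced when the manager is entered with a "with" statement. Here is a minimal sketch of that usage, separate from my script (and with try/finally instead of the bare except I used above):

from contextlib import contextmanager
import sqlite3

@contextmanager
def conn_context(db_path: str):
    conn = sqlite3.connect(db_path)
    try:
        yield conn      # this is what "with conn_context(...) as conn" receives
    finally:
        conn.close()    # always close, even if the block raises

with conn_context('db.sqlite') as conn:
    print(type(conn))   # <class 'sqlite3.Connection'>, which does have .execute()

But I don't see how to fit this into the ETL class above without breaking the rest of the script.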
Larz60+ wrote Jun-11-2021, 08:04 PM:
Please post all code, output and errors (in their entirety) between their respective tags. Refer to the BBCode help topic on how to post. Use the "Preview Post" button to make sure the code is presented as you expect before hitting the "Post Reply/Thread" button.
Fixed for you this time. Please use BBCode tags on future posts.
Also, you pasted the code in your post, which is preferred.