Hello to all members of the forum
There is a script that takes data from the db.sqlite database and transfers it to the elasticsearch database, which falls into an error related to the ContextManager, it does not reach how to solve.
let's get it in order:
there is a db.sqlite database
there is a running elasticsearch server on the locale - checked it works
the script takes data from the database and pushes it to the elasticsearch database
elasticsearch prepared for push, created index with correct schema
but I can't figure out how to solve it.
can anyone help me
thank you in advance
There is a script that takes data from the db.sqlite database and transfers it to the elasticsearch database, which falls into an error related to the ContextManager, it does not reach how to solve.
let's get it in order:
there is a db.sqlite database
there is a running elasticsearch server on the locale - checked it works
the script takes data from the database and pushes it to the elasticsearch database
elasticsearch prepared for push, created index with correct schema
import os import json import logging import sqlite3 from contextlib import contextmanager from typing import List from urllib.parse import urljoin import requests url = 'http://127.0.0.1:9200/' db_path = os.path.abspath('db.sqlite') logger = logging.getLogger() def dict_factory(cursor: sqlite3.Cursor, row: tuple) -> dict: print("def dict_factory") d = {} for idx, col in enumerate(cursor.description): d[col[0]] = row[idx] return d @contextmanager def conn_context(db_path: str): print("def conn_context") conn = sqlite3.connect(db_path) conn.row_factory = dict_factory # yield conn # conn.close() try: yield conn except: print("exception") conn.close() class ESLoader: def __init__(self, url: str): print("class ESLoader -> __init__") self.url = url def _get_es_bulk_query(self, rows: List[dict], index_name: str) -> List[str]: print("class ESLoader -> def _get_es_bulk_query") prepared_query = [] for row in rows: prepared_query.extend([ json.dumps({'index': {'_index': index_name, '_id': row['id']}}), json.dumps(row) ]) return prepared_query def load_to_es(self, records: List[dict], index_name: str): print("class ESLoader -> def load_to_es") prepared_query = self._get_es_bulk_query(records, index_name) str_query = '\n'.join(prepared_query) + '\n' response = requests.post( urljoin(self.url, '_bulk'), data=str_query, headers={'Content-Type': 'application/x-ndjson'} ) json_response = json.loads(response.content.decode()) for item in json_response['items']: error_message = item['index'].get('error') if error_message: logger.error(error_message) class ETL: SQL = ''' WITH x as ( SELECT m.id, group_concat(a.id) as actors_ids, group_concat(a.name) as actors_names FROM movies m LEFT JOIN movie_actors ma on m.id = ma.movie_id LEFT JOIN actors a on ma.actor_id = a.id GROUP BY m.id ) SELECT m.id, genre, director, title, plot, imdb_rating, x.actors_ids, x.actors_names, CASE WHEN m.writers = '' THEN '[{"id": "' || m.writer || '"}]' ELSE m.writers END AS writers FROM movies m LEFT JOIN x ON m.id = x.id ''' def __init__(self, conn: sqlite3.Connection, es_loader: ESLoader): print("class ETL -> __init__") self.es_loader = es_loader self.conn = conn def load_writers_names(self) -> dict: print("class ETL -> def load_writers_names") writers = {} # sql = '''SELECT DISTINCT id, name FROM writers''' for writer in self.conn.execute('''SELECT DISTINCT id, name FROM writers'''): writers[writer['id']] = writer return writers def _transform_row(self, row: dict, writers: dict) -> dict: print("class ETL -> def _transform_row") movie_writers = [] writers_set = set() for writer in json.loads(row['writers']): writer_id = writer['id'] if writers[writer_id]['name'] != 'N/A' and writer_id not in writers_set: movie_writers.append(writers[writer_id]) writers_set.add(writer_id) actors = [] actors_names = [] if row['actors_ids'] is not None and row['actors_names'] is not None: actors = [ {'id': _id, 'name': name} for _id, name in zip(row['actors_ids'].split(','), row['actors_names'].split(',')) if name != 'N/A' ] actors_names = [x for x in row['actors_names'].split(',') if x != 'N/A'] return { 'id': row['id'], 'genre': row['genre'].replace(' ', '').split(','), 'writers': movie_writers, 'actors': actors, 'actors_names': actors_names, 'writers_names': [x['name'] for x in movie_writers], 'imdb_rating': float(row['imdb_rating']) if row['imdb_rating'] != 'N/A' else None, 'title': row['title'], 'director': [x.strip() for x in row['director'].split(',')] if row['director'] != 'N/A' else None, 'description': row['plot'] if row['plot'] != 'N/A' else None } def load(self, index_name: str): print("class ETL -> def load") records = [] writers = self.load_writers_names() for row in self.conn.execute(self.SQL): transformed_row = self._transform_row(row, writers) records.append(transformed_row) self.es_loader.load_to_es(records, index_name) es_loader = ESLoader(url) connn = conn_context(db_path) start = ETL(connn, es_loader) index = 'movies' start.load(index)here is the error:
Error:Traceback (most recent call last):
File "D: \ Yandex_Midl \ p1 \ src \ script.py", line 240, in <module>
start.load (index)
File "D: \ Yandex_Midl \ p1 \ src \ script.py", line 221, in load
writers = self.load_writers_names ()
File "D: \ Yandex_Midl \ p1 \ src \ script.py", line 153, in load_writers_names
for writer in self.conn.execute ('' 'SELECT DISTINCT id, name FROM writers' ''):
AttributeError: '_GeneratorContextManager' object has no attribute 'execute'
as far as I know, the problem is hidden in the function: conn_contextbut I can't figure out how to solve it.
can anyone help me
thank you in advance
Larz60+ write Jun-11-2021, 08:04 PM:
Please post all code, output and errors (it it's entirety) between their respective tags. Refer to BBCode help topic on how to post. Use the "Preview Post" button to make sure the code is presented as you expect before hitting the "Post Reply/Thread" button.
Fixed for you this time. Please use bbcode tags on future posts.
Also pasted code in your post which is preferred.
Please post all code, output and errors (it it's entirety) between their respective tags. Refer to BBCode help topic on how to post. Use the "Preview Post" button to make sure the code is presented as you expect before hitting the "Post Reply/Thread" button.
Fixed for you this time. Please use bbcode tags on future posts.
Also pasted code in your post which is preferred.