Python Forum

Full Version: Error while transferring data from sqlite to elasticsearch - please help!
Hello to all members of the forum!

I have a script that takes data from the db.sqlite database and transfers it into Elasticsearch. It fails with an error related to the context manager, and I cannot work out how to solve it.

Let me describe the setup in order:

There is a db.sqlite database.

There is an Elasticsearch server running locally; I checked that it works.

The script reads data from the SQLite database and pushes it into Elasticsearch.

Elasticsearch is prepared for the push: I created an index with the correct schema.

    import os
    import json
    import logging
    import sqlite3
    from contextlib import contextmanager
    from typing import List
    from urllib.parse import urljoin
    import requests

    url = 'http://127.0.0.1:9200/'
    db_path = os.path.abspath('db.sqlite')

    logger = logging.getLogger()


    def dict_factory(cursor: sqlite3.Cursor, row: tuple) -> dict:
        print("def dict_factory")
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        return d


    @contextmanager
    def conn_context(db_path: str):
        print("def conn_context")
        conn = sqlite3.connect(db_path)
        conn.row_factory = dict_factory
        try:
            yield conn
        except:
            print("exception")
        conn.close()


    class ESLoader:
        def __init__(self, url: str):
            print("class ESLoader -> __init__")
            self.url = url

        def _get_es_bulk_query(self, rows: List[dict], index_name: str) -> List[str]:
            print("class ESLoader -> def _get_es_bulk_query")
            prepared_query = []
            for row in rows:
                prepared_query.extend([
                    json.dumps({'index': {'_index': index_name, '_id': row['id']}}),
                    json.dumps(row)
                ])
            return prepared_query

        def load_to_es(self, records: List[dict], index_name: str):
            print("class ESLoader -> def load_to_es")
            prepared_query = self._get_es_bulk_query(records, index_name)
            str_query = '\n'.join(prepared_query) + '\n'
            response = requests.post(
                urljoin(self.url, '_bulk'),
                data=str_query,
                headers={'Content-Type': 'application/x-ndjson'}
            )
            json_response = json.loads(response.content.decode())
            for item in json_response['items']:
                error_message = item['index'].get('error')
                if error_message:
                    logger.error(error_message)


    class ETL:
        SQL = '''
        WITH x as (
            SELECT m.id, group_concat(a.id) as actors_ids,
                   group_concat(a.name) as actors_names
            FROM movies m
            LEFT JOIN movie_actors ma on m.id = ma.movie_id
            LEFT JOIN actors a on ma.actor_id = a.id
            GROUP BY m.id
        )
        SELECT m.id, genre, director, title, plot, imdb_rating,
               x.actors_ids, x.actors_names,
               CASE
                   WHEN m.writers = '' THEN '[{"id": "' || m.writer || '"}]'
                   ELSE m.writers
               END AS writers
        FROM movies m
        LEFT JOIN x ON m.id = x.id
        '''

        def __init__(self, conn: sqlite3.Connection, es_loader: ESLoader):
            print("class ETL -> __init__")
            self.es_loader = es_loader
            self.conn = conn

        def load_writers_names(self) -> dict:
            print("class ETL -> def load_writers_names")
            writers = {}
            for writer in self.conn.execute('''SELECT DISTINCT id, name FROM writers'''):
                writers[writer['id']] = writer
            return writers

        def _transform_row(self, row: dict, writers: dict) -> dict:
            print("class ETL -> def _transform_row")
            movie_writers = []
            writers_set = set()
            for writer in json.loads(row['writers']):
                writer_id = writer['id']
                if writers[writer_id]['name'] != 'N/A' and writer_id not in writers_set:
                    movie_writers.append(writers[writer_id])
                    writers_set.add(writer_id)

            actors = []
            actors_names = []
            if row['actors_ids'] is not None and row['actors_names'] is not None:
                actors = [
                    {'id': _id, 'name': name}
                    for _id, name in zip(row['actors_ids'].split(','), row['actors_names'].split(','))
                    if name != 'N/A'
                ]
                actors_names = [x for x in row['actors_names'].split(',') if x != 'N/A']

            return {
                'id': row['id'],
                'genre': row['genre'].replace(' ', '').split(','),
                'writers': movie_writers,
                'actors': actors,
                'actors_names': actors_names,
                'writers_names': [x['name'] for x in movie_writers],
                'imdb_rating': float(row['imdb_rating']) if row['imdb_rating'] != 'N/A' else None,
                'title': row['title'],
                'director': [x.strip() for x in row['director'].split(',')] if row['director'] != 'N/A' else None,
                'description': row['plot'] if row['plot'] != 'N/A' else None
            }

        def load(self, index_name: str):
            print("class ETL -> def load")
            records = []
            writers = self.load_writers_names()
            for row in self.conn.execute(self.SQL):
                transformed_row = self._transform_row(row, writers)
                records.append(transformed_row)
            self.es_loader.load_to_es(records, index_name)


    es_loader = ESLoader(url)
    connn = conn_context(db_path)
    start = ETL(connn, es_loader)
    index = 'movies'
    start.load(index)
Here is the error:

Error:
    Traceback (most recent call last):
      File "D:\Yandex_Midl\p1\src\script.py", line 240, in <module>
        start.load(index)
      File "D:\Yandex_Midl\p1\src\script.py", line 221, in load
        writers = self.load_writers_names()
      File "D:\Yandex_Midl\p1\src\script.py", line 153, in load_writers_names
        for writer in self.conn.execute('''SELECT DISTINCT id, name FROM writers'''):
    AttributeError: '_GeneratorContextManager' object has no attribute 'execute'
As far as I can tell, the problem is hidden in the conn_context function,

but I can't figure out how to solve it.

Can anyone help me?

Thank you in advance.
Welcome to the forum, ps96068.
Well, that is quite a large chunk of code to debug. You will understand it is difficult to say much about it when we cannot test it.
It is better if you pick out small parts of your code until you have some 10 or 20 lines where the error is isolated.

But I will try to help you start. I see you are using "contextmanager". I have never used it myself, but I found the manual here: contextmanager. There I read that a context manager is used to manage the context of "with" statements.
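The key point is that calling a function decorated with @contextmanager does not return the yielded connection; it returns a wrapper object, and only a "with" statement unwraps it. A minimal sketch (using a plain string as the stand-in resource, not your database code) reproduces exactly the AttributeError from your traceback:

```python
from contextlib import contextmanager

@contextmanager
def make_resource():
    # whatever is yielded here is what "with ... as x" binds to x
    yield "the real resource"

# calling the function only builds a context-manager wrapper
cm = make_resource()
print(type(cm).__name__)   # _GeneratorContextManager

# the wrapper has none of the resource's attributes --
# this is the same situation as your conn.execute(...) call
print(hasattr(cm, 'upper'))  # False

# only the with statement runs the generator and hands out the resource
with make_resource() as res:
    print(res.upper())  # THE REAL RESOURCE
```

In your script, connn = conn_context(db_path) stores that wrapper, so self.conn.execute(...) fails for the same reason.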
According to the example given there, I expect the last lines of your script should be rewritten:
with conn_context(db_path) as connn:
    start = ETL(connn, es_loader)
    index = 'movies'
    start.load(index)