Python Forum
Error while transferring data from sqlite to elasticsearch - please help!
#1
Hello to all members of the forum,

I have a script that reads data from a db.sqlite database and transfers it into Elasticsearch. It fails with an error related to the context manager, and I cannot work out how to solve it.

Let's take it in order:

there is a db.sqlite database

there is an Elasticsearch server running locally — I checked, it works

the script reads data from the SQLite database and pushes it into Elasticsearch

Elasticsearch is prepared for the push: I created the index with the correct schema

    import os
    import json
    import logging
    import sqlite3
    from contextlib import contextmanager
    from typing import List
    from urllib.parse import urljoin
    import requests

    url = 'http://127.0.0.1:9200/'
    db_path = os.path.abspath('db.sqlite')

    logger = logging.getLogger()


    def dict_factory(cursor: sqlite3.Cursor, row: tuple) -> dict:
        print("def dict_factory")
        d = {}
        for idx, col in enumerate(cursor.description):
            d[col[0]] = row[idx]
        return d


    @contextmanager
    def conn_context(db_path: str):
        print("def conn_context")
        conn = sqlite3.connect(db_path)
        conn.row_factory = dict_factory
        # yield conn
        # conn.close()
        try:
            yield conn
        except:
            print("exception")
        conn.close()


    class ESLoader:
        def __init__(self, url: str):
            print("class ESLoader -> __init__")
            self.url = url

        def _get_es_bulk_query(self, rows: List[dict], index_name: str) -> List[str]:
            print("class ESLoader -> def _get_es_bulk_query")
            prepared_query = []
            for row in rows:
                prepared_query.extend([
                    json.dumps({'index': {'_index': index_name, '_id': row['id']}}),
                    json.dumps(row)
                ])
            return prepared_query

        def load_to_es(self, records: List[dict], index_name: str):
            print("class ESLoader -> def load_to_es")
            prepared_query = self._get_es_bulk_query(records, index_name)
            str_query = '\n'.join(prepared_query) + '\n'
            response = requests.post(
                urljoin(self.url, '_bulk'),
                data=str_query,
                headers={'Content-Type': 'application/x-ndjson'}
            )
            json_response = json.loads(response.content.decode())
            for item in json_response['items']:
                error_message = item['index'].get('error')
                if error_message:
                    logger.error(error_message)


    class ETL:
        SQL = '''
        WITH x as (
            SELECT m.id, group_concat(a.id) as actors_ids,
                   group_concat(a.name) as actors_names
            FROM movies m
            LEFT JOIN movie_actors ma on m.id = ma.movie_id
            LEFT JOIN actors a on ma.actor_id = a.id
            GROUP BY m.id
        )
        SELECT m.id, genre, director, title, plot, imdb_rating,
               x.actors_ids, x.actors_names,
               CASE
                   WHEN m.writers = '' THEN '[{"id": "' || m.writer || '"}]'
                   ELSE m.writers
               END AS writers
        FROM movies m
        LEFT JOIN x ON m.id = x.id
        '''

        def __init__(self, conn: sqlite3.Connection, es_loader: ESLoader):
            print("class ETL -> __init__")
            self.es_loader = es_loader
            self.conn = conn

        def load_writers_names(self) -> dict:
            print("class ETL -> def load_writers_names")
            writers = {}
            # sql = '''SELECT DISTINCT id, name FROM writers'''
            for writer in self.conn.execute('''SELECT DISTINCT id, name FROM writers'''):
                writers[writer['id']] = writer
            return writers

        def _transform_row(self, row: dict, writers: dict) -> dict:
            print("class ETL -> def _transform_row")
            movie_writers = []
            writers_set = set()
            for writer in json.loads(row['writers']):
                writer_id = writer['id']
                if writers[writer_id]['name'] != 'N/A' and writer_id not in writers_set:
                    movie_writers.append(writers[writer_id])
                    writers_set.add(writer_id)

            actors = []
            actors_names = []
            if row['actors_ids'] is not None and row['actors_names'] is not None:
                actors = [
                    {'id': _id, 'name': name}
                    for _id, name in zip(row['actors_ids'].split(','), row['actors_names'].split(','))
                    if name != 'N/A'
                ]
                actors_names = [x for x in row['actors_names'].split(',') if x != 'N/A']

            return {
                'id': row['id'],
                'genre': row['genre'].replace(' ', '').split(','),
                'writers': movie_writers,
                'actors': actors,
                'actors_names': actors_names,
                'writers_names': [x['name'] for x in movie_writers],
                'imdb_rating': float(row['imdb_rating']) if row['imdb_rating'] != 'N/A' else None,
                'title': row['title'],
                'director': [x.strip() for x in row['director'].split(',')] if row['director'] != 'N/A' else None,
                'description': row['plot'] if row['plot'] != 'N/A' else None
            }

        def load(self, index_name: str):
            print("class ETL -> def load")
            records = []
            writers = self.load_writers_names()
            for row in self.conn.execute(self.SQL):
                transformed_row = self._transform_row(row, writers)
                records.append(transformed_row)
            self.es_loader.load_to_es(records, index_name)


    es_loader = ESLoader(url)

    connn = conn_context(db_path)

    start = ETL(connn, es_loader)

    index = 'movies'

    start.load(index)
here is the error:

Error:
Traceback (most recent call last):
  File "D:\Yandex_Midl\p1\src\script.py", line 240, in <module>
    start.load(index)
  File "D:\Yandex_Midl\p1\src\script.py", line 221, in load
    writers = self.load_writers_names()
  File "D:\Yandex_Midl\p1\src\script.py", line 153, in load_writers_names
    for writer in self.conn.execute('''SELECT DISTINCT id, name FROM writers'''):
AttributeError: '_GeneratorContextManager' object has no attribute 'execute'
As far as I can tell, the problem is hidden in the conn_context function,

but I can't figure out how to solve it.
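
The error reproduces outside the script, too. Here is a minimal sketch of what I mean (an in-memory database stands in for db.sqlite):

```python
import sqlite3
from contextlib import contextmanager

@contextmanager
def conn_context(db_path: str):
    conn = sqlite3.connect(db_path)
    try:
        yield conn
    finally:
        conn.close()

# Calling the decorated function does NOT return a Connection;
# it returns the context manager object itself.
cm = conn_context(':memory:')
print(type(cm).__name__)       # _GeneratorContextManager
print(hasattr(cm, 'execute'))  # False -> hence the AttributeError

# Only a `with` statement runs the generator and binds the yielded Connection:
with conn_context(':memory:') as conn:
    print(type(conn).__name__)  # Connection
```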

Can anyone help me?

Thank you in advance.
Larz60+ wrote Jun-11-2021, 08:04 PM:
Please post all code, output and errors (in their entirety) between their respective tags. Refer to the BBCode help topic on how to post. Use the "Preview Post" button to make sure the code is presented as you expect before hitting the "Post Reply/Thread" button.
Fixed for you this time. Please use bbcode tags on future posts.
Also paste code in your post, which is preferred.
#2
Welcome to the forum ps96068,
Well, that is quite a large chunk of code to debug. You will understand it is difficult to say anything about it when we cannot test it.
It is better if you pick out small parts of your code until you have some 10 or 20 lines in which the error is isolated.

But I will try to help you start. I see you are using "contextmanager". I have never used it, but I found the manual: contextmanager. There I read that a context manager is used to manage the context of a "with" statement.
According to the example given there, I expect the last lines of your script (from connn = conn_context(db_path) onward) should be rewritten as:
with conn_context(db_path) as connn:
    start = ETL(connn, es_loader)
    index = 'movies'
    start.load(index)
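
For what it's worth, here is a minimal, self-contained sketch of how a @contextmanager-decorated function behaves (the names here are made up for the demo): the code before the yield runs on entering the with block, the yielded value is what as binds, and the code after the yield runs on leaving the block.

```python
from contextlib import contextmanager

events = []

@contextmanager
def demo(value):
    events.append('setup')        # runs when the with block is entered
    try:
        yield value               # this is the object that `as` binds
    finally:
        events.append('cleanup')  # runs when the block exits, even on error

with demo('hello') as v:
    events.append(v)

print(events)  # ['setup', 'hello', 'cleanup']
```

So in your conn_context, the code after the yield (the conn.close()) only runs when the with block ends, which is exactly what you want.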

