Python Forum
Error while transferring data from sqlite to elasticsearch - please help!
#1
Hello to all members of the forum

I have a script that reads data from a db.sqlite database and transfers it to Elasticsearch. It fails with an error related to the context manager, and I can't work out how to solve it.

Let's go through it in order:

There is a db.sqlite database.

There is an Elasticsearch server running locally; I checked that it works.

The script takes data from the database and pushes it to Elasticsearch.

Elasticsearch is prepared for the push: I created an index with the correct schema.

import os
import json
import logging
import sqlite3
from contextlib import contextmanager
from typing import List
from urllib.parse import urljoin
import requests
db_path = os.path.abspath('db.sqlite')
logger = logging.getLogger()
def dict_factory(cursor: sqlite3.Cursor, row: tuple) -> dict:
    print("def dict_factory")
    d = {}
    for idx, col in enumerate(cursor.description):
        d[col[0]] = row[idx]
    return d
@contextmanager
def conn_context(db_path: str):
    print("def conn_context")
    conn = sqlite3.connect(db_path)
    conn.row_factory = dict_factory
    # yield conn
    # conn.close()
    try:
        yield conn
    except:
        print("exception")
    conn.close()
class ESLoader:
    def __init__(self, url: str):
        print("class ESLoader -> __init__")
        self.url = url
    def _get_es_bulk_query(self, rows: List[dict], index_name: str) -> List[str]:
        print("class ESLoader -> def _get_es_bulk_query")
        prepared_query = []
        for row in rows:
            prepared_query.extend([
            json.dumps({'index': {'_index': index_name, '_id': row['id']}}),
            json.dumps(row)
            ])
        return prepared_query
    def load_to_es(self, records: List[dict], index_name: str):
        print("class ESLoader -> def load_to_es")
        prepared_query = self._get_es_bulk_query(records, index_name)
        str_query = '\n'.join(prepared_query) + '\n'
        response = requests.post(
        urljoin(self.url, '_bulk'),
        data=str_query,
        headers={'Content-Type': 'application/x-ndjson'}
        )
        json_response = json.loads(response.content.decode())
        for item in json_response['items']:
            error_message = item['index'].get('error')
            if error_message:
                logger.error(error_message)
class ETL:
    SQL = '''
    WITH x as (
    SELECT m.id, group_concat(a.id) as actors_ids, group_concat(a.name) as
    actors_names
    FROM movies m
    LEFT JOIN movie_actors ma on m.id = ma.movie_id
    LEFT JOIN actors a on ma.actor_id = a.id
    GROUP BY m.id
    )
    SELECT m.id, genre, director, title, plot, imdb_rating, x.actors_ids, x.actors_names,
    CASE
    WHEN m.writers = '' THEN '[{"id": "' || m.writer || '"}]'
    ELSE m.writers
    END AS writers
    FROM movies m
    LEFT JOIN x ON m.id = x.id
    '''
    def __init__(self, conn: sqlite3.Connection, es_loader: ESLoader):
        print("class ETL -> __init__")
        self.es_loader = es_loader
        self.conn = conn
    def load_writers_names(self) -> dict:
             
        print("class ETL -> def load_writers_names")
        writers = {}
        # sql = '''SELECT DISTINCT id, name FROM writers'''
             
        for writer in self.conn.execute('''SELECT DISTINCT id, name FROM writers'''):
            writers[writer['id']] = writer
        return writers
    def _transform_row(self, row: dict, writers: dict) -> dict:
        print("class ETL -> def _transform_row")
        movie_writers = []
        writers_set = set()
        for writer in json.loads(row['writers']):
            writer_id = writer['id']
            if writers[writer_id]['name'] != 'N/A' and writer_id not in writers_set:
                movie_writers.append(writers[writer_id])
                writers_set.add(writer_id)
        actors = []
        actors_names = []
        if row['actors_ids'] is not None and row['actors_names'] is not None:
            actors = [
            {'id': _id, 'name': name}
            for _id, name in zip(row['actors_ids'].split(','), row['actors_names'].split(','))
            if name != 'N/A'
            ]
            actors_names = [x for x in row['actors_names'].split(',') if x != 'N/A']
        return {
            'id': row['id'],
            'genre': row['genre'].replace(' ', '').split(','),
            'writers': movie_writers,
            'actors': actors,
            'actors_names': actors_names,
            'writers_names': [x['name'] for x in movie_writers],
            'imdb_rating': float(row['imdb_rating']) if row['imdb_rating'] != 'N/A' else None,
            'title': row['title'],
            'director': [x.strip() for x in row['director'].split(',')] if row['director'] != 'N/A' else None,
            'description': row['plot'] if row['plot'] != 'N/A' else None
        }
    def load(self, index_name: str):
        print("class ETL -> def load")
        records = []
        writers = self.load_writers_names()
        for row in self.conn.execute(self.SQL):
            transformed_row = self._transform_row(row, writers)
            records.append(transformed_row)
        self.es_loader.load_to_es(records, index_name)
es_loader = ESLoader(url)
connn =  conn_context(db_path)
start = ETL(connn, es_loader)
index = 'movies'
start.load(index)
Here is the error:

Error:
Traceback (most recent call last):
  File "D:\Yandex_Midl\p1\src\script.py", line 240, in <module>
    start.load(index)
  File "D:\Yandex_Midl\p1\src\script.py", line 221, in load
    writers = self.load_writers_names()
  File "D:\Yandex_Midl\p1\src\script.py", line 153, in load_writers_names
    for writer in self.conn.execute('''SELECT DISTINCT id, name FROM writers'''):
AttributeError: '_GeneratorContextManager' object has no attribute 'execute'
As far as I can tell, the problem is in the conn_context function, but I can't figure out how to solve it.

Can anyone help me?

Thank you in advance.
Larz60+ wrote Jun-11-2021, 08:04 PM:
Please post all code, output and errors (in their entirety) between their respective tags. Refer to the BBCode help topic on how to post. Use the "Preview Post" button to make sure the code is presented as you expect before hitting the "Post Reply/Thread" button.
Fixed for you this time. Please use bbcode tags in future posts.
I also pasted the code into your post, which is preferred.
#2
Welcome to the forum ps96068,
That is quite a large chunk of code to debug. As you will understand, it is difficult to say much about it when we cannot test it.
It is better to cut your code down until you have some 10 or 20 lines in which the error is isolated.
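
As an illustration of what such a reduced script might look like, here is a rough sketch. It is not taken from your project: it assumes an in-memory database (":memory:") in place of db.sqlite and drops everything related to Elasticsearch. It keeps only the pattern from the end of your script, and it should end in the same AttributeError:

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def conn_context(db_path: str):
    conn = sqlite3.connect(db_path)
    try:
        yield conn
    finally:
        conn.close()


# Same pattern as at the end of the posted script: the function is called,
# but its result is used directly instead of inside a "with" statement.
connn = conn_context(":memory:")  # ":memory:" is a stand-in for db.sqlite

try:
    connn.execute("SELECT 1")
    error_text = None
except AttributeError as exc:
    error_text = str(exc)

print(type(connn).__name__)  # the type of the object that was passed to ETL
print(error_text)
```

With a script this size it is much easier to see which object `execute` is being called on.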

But I will try to help you get started. I see you are using "contextmanager". I have never used it, but I found the manual here: contextmanager. There I read that a contextmanager is used to manage the context of "with" statements.
Going by the example given there, I expect the lines from 175 on (starting at `connn = conn_context(db_path)`) should be rewritten as:
with conn_context(db_path) as connn:
    start = ETL(connn, es_loader)
    index = 'movies'
    start.load(index)
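
To check the idea without your database, here is a small self-contained sketch. The assumptions are mine: an in-memory database (":memory:") instead of db.sqlite, and a single made-up row in a `writers` table. I have also swapped the bare `except:` in conn_context for `try`/`finally`, because a bare `except:` inside a contextmanager generator silently hides any error raised inside the "with" block.

```python
import sqlite3
from contextlib import contextmanager


def dict_factory(cursor: sqlite3.Cursor, row: tuple) -> dict:
    # Map column names to values so rows can be read like dictionaries.
    return {col[0]: row[idx] for idx, col in enumerate(cursor.description)}


@contextmanager
def conn_context(db_path: str):
    conn = sqlite3.connect(db_path)
    conn.row_factory = dict_factory
    try:
        yield conn  # this is the object bound by "with ... as conn"
    finally:
        conn.close()  # runs even if the body of the "with" block raises


# ":memory:" and the sample row are placeholders, not data from the thread.
with conn_context(":memory:") as conn:
    conn.execute("CREATE TABLE writers (id TEXT, name TEXT)")
    conn.execute("INSERT INTO writers VALUES ('w1', 'Example Writer')")
    writers = {
        w["id"]: w
        for w in conn.execute("SELECT DISTINCT id, name FROM writers")
    }

print(writers)
```

Inside the "with" block, `conn` is a real sqlite3.Connection, so `execute` is available. Once the block ends the connection is closed, so everything that needs the database (here, the ETL object and its `load` call) has to stay inside the block.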