Python Forum
Thread Rating:
  • 0 Vote(s) - 0 Average
  • 1
  • 2
  • 3
  • 4
  • 5
Insert results API in database
#1
Hello,

I am new to programming and to Python.

I am beginning to play with the requests lib, which is quite impressive. My first test was to retrieve some data through some websites:

import requests

r = requests.get("http://somewebsite")
print(r.headers)
Really simple, really smart.

I would like to know if I can insert the results into a database to store the data and query it. If you have some tutorials or examples, I'll be very glad.

Regards,
Reply
#2
I use a method that saves the request to a file, and checks the created date on the file each time it's
run. If the file is newer than the refresh time, the file will be loaded rather than fetching a new copy,
otherwise it will get a new one.

This may be a bit more than you want, but you can look at the code to get an idea;
there are three modules involved:

CheckInternet.py - checks that there is a network connection available
GetUrl.py - Does the actual fetching of a url
GetPage.py - takes care of caching, calls GetUrl, which does the request

I modified the GetPage.py testit routine on the fly for this post without testing,
so if it doesn't work, let me know.

This code was part of a presentation I gave for a makeit meeting, and has a full jupyter notebook
document with the full code on github here: https://github.com/Larz60p/MakerProject
if you have any interest in how the above code was used in a full package.

prog1 CheckInternet.py:
import socket


class CheckInternet:
    """Track whether an internet connection appears to be available."""

    def __init__(self):
        # Assume offline until check_availability() has been called.
        self.internet_available = False

    def check_availability(self):
        """Probe for connectivity; update and return self.internet_available.

        A host whose own name resolves to the loopback address is treated
        as offline; any other address counts as connected.
        """
        own_address = socket.gethostbyname(socket.gethostname())
        self.internet_available = own_address != '127.0.0.1'
        return self.internet_available


def testit():
    """Interactive check that CheckInternet detects both on and off states.

    Prompts the user to toggle the connection and reports whether each
    state was detected correctly.
    """
    checker = CheckInternet()

    print('Please turn internet OFF, then press Enter')
    input()
    checker.check_availability()
    print(f'ci.internet_available: {checker.internet_available}')
    if checker.internet_available:
        print('    Off test failed')
    else:
        print('    Off test successful')

    print('Please turn internet ON, then press Enter')
    input()
    checker.check_availability()
    print(f'ci.internet_available: {checker.internet_available}')
    if checker.internet_available:
        print('    On test successful')
    else:
        print('    On test failed')


if __name__ == '__main__':
    testit()
prog2 GetUrl.py:
import requests
import CheckInternet
import sys


class GetUrl:
    """Fetch a URL via requests, but only when a connection is available."""

    def __init__(self):
        # Connectivity checker, reused across fetches.
        self.ci = CheckInternet.CheckInternet()
        self.ok_status = 200
        self.r = None

    def fetch_url(self, url):
        """Return the requests response for *url*, or None when offline.

        Redirects are not followed; the raw response is stored on self.r
        as well as returned.
        """
        self.r = None
        if not self.ci.check_availability():
            return None
        self.r = requests.get(url, allow_redirects=False)
        return self.r


def testit():
    """Smoke-test GetUrl: fetch google.com and print the first lines."""
    gu = GetUrl()
    page = gu.fetch_url('https://www.google.com/')
    maxcount = 20
    try:
        if page.status_code == 200:
            # BUG FIX: was page.text.split('/n') -- the literal '/n' is not
            # a newline, so the whole page printed as one giant "line".
            ptext = page.text.split('\n')
            for count, line in enumerate(ptext, start=1):
                print(f'{line}\n')
                if count > maxcount:
                    break
        else:
            print(f'Error retrieving file status code: {page.status_code}')
    except AttributeError:
        # fetch_url returns None when offline; page.status_code then raises.
        print('Please enable internet and try again')


if __name__ == '__main__':
    testit()
prog3 GetPage:
import GetUrl
import time
import sys

class GetPage:
    """Fetch a web page with optional file-based caching.

    A page is re-downloaded when there is no cache file, the cache file is
    empty, or it is older than ``refresh_hours_every`` hours; otherwise the
    cached copy is returned.
    """

    def __init__(self):
        """Initialize - instantiate imported modules, initialize class variables."""
        self.elapsed_hours = 0
        self.gu = GetUrl.GetUrl()
        self.savefile = None

    def get_page(self, url, savefile=None, refresh_hours_every=48):
        """Return the page for *url*, from cache when fresh enough.

        :param url: address to fetch
        :param savefile: optional pathlib.Path used as the cache file
        :param refresh_hours_every: cache lifetime in hours (0 = always fetch)
        :return: a requests response when freshly downloaded, the cached
                 file's text when served from cache, or None on failure
        """
        self.url = url
        self.savefile = savefile
        self.refresh_hours_every = refresh_hours_every
        self.page = None
        if self.savefile and self.savefile.exists():
            lstats = self.savefile.lstat()
            self.elapsed_hours = (time.time() - lstats.st_mtime) / 3600
            if lstats.st_size == 0 or self.elapsed_hours > self.refresh_hours_every:
                # Cache is empty or stale -- refresh it.
                self.page = self.download_new_file()
            else:
                with self.savefile.open('r') as f:
                    self.page = f.read()
        else:
            # No cache file given, or it does not exist yet.
            self.page = self.download_new_file()
        return self.page

    def download_new_file(self):
        """Fetch self.url; write it to self.savefile when one is set."""
        page = None
        try:
            page = self.gu.fetch_url(self.url)
            if page.status_code == 200:
                # BUG FIX: only write a cache file when the caller supplied
                # one -- previously savefile=None hit None.open('wb'), and the
                # AttributeError was misreported as a connectivity problem.
                if self.savefile is not None:
                    with self.savefile.open('wb') as f:
                        f.write(page.content)
            else:
                # BUG FIX: was page.st, which is not an attribute of a
                # requests response.
                print(f'Invalid status code: {page.status_code}')
        except AttributeError:
            # fetch_url returns None when offline.
            print('Please enable internet and try again')
        return page

def testit():
    """Smoke-test GetPage: download the RFC index and cache it locally."""
    from pathlib import Path
    homepath = Path('.')
    # BUG FIX: these paths were referenced as self.homepath / self.datapath /
    # self.htmlpath, which raises NameError inside a plain function.
    datapath = homepath / 'data'
    datapath.mkdir(exist_ok=True)
    htmlpath = datapath / 'html'
    htmlpath.mkdir(exist_ok=True)
    rfc_index_html = htmlpath / 'rfc_index.html'

    # Test url = rfc index download page, save to data/html/rfc_index.html, refresh always
    gp = GetPage()
    page = gp.get_page(url='https://www.rfc-editor.org/rfc/', savefile=rfc_index_html,
                       refresh_hours_every=0)
    if page:
        if page.status_code == 200:
            print(f'Page contents: {page.text}')
        else:
            # BUG FIX: message was truncated mid-word ("Page is empty or in").
            print(f'Page is empty or invalid, status code: {page.status_code}')


if __name__ == '__main__':
    testit()
Reply
#3
Try to use sqlalchemy when working with databases, eg:

from sqlalchemy.ext.declarative import declarative_base
# Base class that all mapped (table) classes inherit from.
Base = declarative_base()

from sqlalchemy import create_engine
# SQLite database file at /tmp/db.sqlite (four slashes = absolute path).
engine = create_engine('sqlite:////tmp/db.sqlite') # SQLite file database

from sqlalchemy import Column, Text, String
class Header(Base):
    # One row per URL, storing the stringified response headers.
    __tablename__ = 'headers'
    url = Column(String(500), unique=True, nullable=False,  primary_key=True)
    data = Column(Text)

# Create the 'headers' table if it does not already exist.
Base.metadata.create_all(engine)

from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine)
session = Session()

import requests
url = "https://python-forum.io"
r = requests.get(url)

# str() flattens the headers mapping so it can be stored in the Text column.
h = Header(url=url, data = str(r.headers))
print(h.data)

# Persist the new row.
session.add(h)
session.commit()
then...

t@tbox:~$ sqlite3 /tmp/db.sqlite "select * from headers"
https://python-forum.io|{'Server': 'Apache', 'Cache-Control': 'no-store, ...
Reply


Forum Jump:

User Panel Messages

Announcements
Announcement #1 8/1/2020
Announcement #2 8/2/2020
Announcement #3 8/6/2020