Here are two examples for a mock use case.
The first one uses separate tables:
import random
import sqlite3
def measurments(num, sensors=('Sensor1', 'Sensor2', 'Sensor3'), low=30, high=100):
    """Yield *num* simulated sensor readings.

    Args:
        num: Number of readings to generate.
        sensors: Sequence of sensor ids to pick from at random.
        low: Smallest value a reading can take (inclusive).
        high: Largest value a reading can take (inclusive).

    Yields:
        ``(sensor_id, value)`` tuples, where ``sensor_id`` is a random element
        of *sensors* and ``value`` is a random integer in ``[low, high]``.
    """
    for _ in range(num):
        yield (random.choice(sensors), random.randint(low, high))
class Sensor:
    """A sensor together with the readings collected from it."""

    def __init__(self, sensor_id, sensor_type, sensor_brand, values=None):
        """Create a sensor record.

        Args:
            sensor_id: Identifier of the sensor.
            sensor_type: Kind of sensor.
            sensor_brand: Brand of the sensor.
            values: Optional iterable of initial readings; omitted or ``None``
                means no readings yet.
        """
        self.id = sensor_id
        self.type = sensor_type
        self.brand = sensor_brand
        # Copy into a fresh list so add_value() never mutates the caller's
        # object, and so any iterable (tuple, generator, ...) is accepted.
        self.values = list(values) if values is not None else []

    def add_value(self, value):
        """Append one reading to this sensor's values."""
        self.values.append(value)

    def __str__(self):
        return f"{self.id}, {self.type}, {self.brand}, values:{self.values}"
def create(db):
    """(Re)create the two-table sample schema in the SQLite database *db*.

    Drops the ``measurments`` and ``sensors`` tables if they exist, creates
    them again and inserts three rows into ``sensors``. Any data previously
    stored in those tables is discarded.

    Args:
        db: Database path handed to ``sqlite3.connect``.
    """
    conn = sqlite3.connect(db)
    try:
        # ``with conn`` only commits (or rolls back on error); it does not
        # close the connection, hence the explicit close() below.
        with conn:
            conn.executescript(
                """DROP TABLE IF EXISTS "measurments";
DROP TABLE IF EXISTS "sensors";
CREATE TABLE "measurments" (
`SensorID` TEXT NOT NULL,
`Value` INTEGER NOT NULL
);
CREATE TABLE "sensors" (
`SensorID` TEXT NOT NULL,
`SensorType` TEXT NOT NULL,
`SensorBrand` TEXT NOT NULL
);
INSERT INTO `sensors` VALUES ('Sensor1','Temperature Sensor','Brand1'),
('Sensor2','Humidity Sensor','Brand2'),
('Sensor3','Temperature Sensor','Brand2');""")
    finally:
        conn.close()
def read_data(db):
    """Load the sensors and their measurements from the database *db*.

    The query inner-joins ``sensors`` with ``measurments``, so a sensor that
    has no measurement rows does not appear in the result.

    Args:
        db: Database path handed to ``sqlite3.connect``.

    Returns:
        dict mapping each sensor id to a ``Sensor`` whose ``values`` are in
        the order the measurements were stored (ascending rowid).
    """
    conn = sqlite3.connect(db)
    try:
        # Retrieve all data in one go. The join could also be defined as a
        # view in the database to keep this code simpler.
        rows = conn.execute("""SELECT sen.SensorID, sen.SensorType, sen.SensorBrand, mes.Value
FROM sensors as sen
INNER JOIN measurments as mes
ON sen.SensorID = mes.SensorID
ORDER BY mes.Rowid""")
        sensors = {}
        for sensor_id, sensor_type, sensor_brand, value in rows:
            sensor = sensors.get(sensor_id)
            if sensor is None:
                # Build the Sensor only on the first row seen for this id.
                sensor = sensors[sensor_id] = Sensor(sensor_id, sensor_type, sensor_brand)
            sensor.add_value(value)
        return sensors
    finally:
        # The connection context manager never closes; do it explicitly.
        conn.close()
def write_data(db):
    """Simulate 10 random observations and store them in ``measurments``.

    Args:
        db: Database path handed to ``sqlite3.connect``.
    """
    conn = sqlite3.connect(db)
    try:
        # ``with conn`` commits on success and rolls back on error; it does
        # not close the connection, hence the explicit close() below.
        with conn:
            conn.executemany("""INSERT INTO measurments VALUES (?, ?);""", measurments(10))
    finally:
        conn.close()
if __name__ == '__main__':
    # Demo: rebuild the sample database, store random readings, print them.
    db = 'sample2.db'
    create(db)
    write_data(db)
    for sensor in read_data(db).values():
        print(sensor)
    # The readings of a single sensor can also be queried directly:
    conn = sqlite3.connect(db)
    try:
        data = conn.execute("""SELECT value FROM measurments where SensorID='Sensor1';""")
        values = [item for (item, ) in data]
    finally:
        # sqlite3's connection context manager does not close the connection.
        conn.close()
    print(values)
The second one uses a single table, with the values stored as a comma-separated string:
import random
import sqlite3
def measurments(num, sensors=('Sensor1', 'Sensor2', 'Sensor3'), low=30, high=100):
    """Yield *num* simulated sensor readings.

    Args:
        num: Number of readings to generate.
        sensors: Sequence of sensor ids to pick from at random.
        low: Smallest value a reading can take (inclusive).
        high: Largest value a reading can take (inclusive).

    Yields:
        ``(sensor_id, value)`` tuples, where ``sensor_id`` is a random element
        of *sensors* and ``value`` is a random integer in ``[low, high]``.
    """
    for _ in range(num):
        yield (random.choice(sensors), random.randint(low, high))
class Sensor:
    """A sensor together with its readings, stored in the DB as a CSV string."""

    def __init__(self, sensor_id, sensor_type, sensor_brand, values=None):
        """Create a sensor record.

        Args:
            sensor_id: Identifier of the sensor.
            sensor_type: Kind of sensor.
            sensor_brand: Brand of the sensor.
            values: Initial readings, either as a comma-separated string of
                integers (the format of the ``Measurments`` column), as an
                iterable of integers, or ``None`` for no readings.

        Raises:
            ValueError: If a non-blank item cannot be converted to ``int``.
        """
        self.id = sensor_id
        self.type = sensor_type
        self.brand = sensor_brand
        if values is None:
            self.values = []
        elif isinstance(values, str):
            # ''.split(',') yields [''], which int() rejects; skip blank
            # tokens so an empty string means "no readings".
            self.values = [int(token) for token in values.split(',') if token.strip()]
        else:
            self.values = [int(item) for item in values]

    def add_value(self, value):
        """Append one reading to this sensor's values."""
        self.values.append(value)

    def __str__(self):
        return f"{self.id}, {self.type}, {self.brand}, values:{self.values}"
# your way, sample1.db
def create(db):
    """(Re)create the single-table sample schema in the SQLite database *db*.

    Drops the ``sensors`` table if it exists, creates it again and inserts
    three rows whose ``Measurments`` column is NULL. Any data previously
    stored in that table is discarded.

    Args:
        db: Database path handed to ``sqlite3.connect``.
    """
    conn = sqlite3.connect(db)
    try:
        # ``with conn`` only commits (or rolls back on error); it does not
        # close the connection, hence the explicit close() below.
        with conn:
            conn.executescript(
                """DROP TABLE IF EXISTS "sensors";
CREATE TABLE "sensors" (
`SensorID` TEXT NOT NULL,
`SensorType` TEXT NOT NULL,
`SensorBrand` TEXT NOT NULL,
`Measurments` TEXT
);
INSERT INTO `sensors` (SensorID,SensorType,SensorBrand, Measurments) VALUES ('Sensor1','Temperature Sensor','Brand1',NULL),
('Sensor2','Humidity Sensor','Brand2',NULL),
('Sensor3','Temperature Sensor','Brand2',NULL);""")
    finally:
        conn.close()
def read_data(db):
    """Load every row of the ``sensors`` table from the database *db*.

    Args:
        db: Database path handed to ``sqlite3.connect``.

    Returns:
        dict mapping each sensor id to a ``Sensor`` built from its row.
    """
    conn = sqlite3.connect(db)
    try:
        # Name the columns explicitly: Sensor(*record) relies on their order,
        # which ``SELECT *`` would tie to the table definition.
        rows = conn.execute(
            'SELECT SensorID, SensorType, SensorBrand, Measurments FROM sensors;')
        sensors = {}
        for record in rows:
            sensor = Sensor(*record)
            sensors[sensor.id] = sensor
        return sensors
    finally:
        # The connection context manager never closes; do it explicitly.
        conn.close()
def write_data(db):
    """Simulate 10 random observations and store them in the ``sensors`` table.

    Each sensor's readings are kept as one comma-separated string in its
    ``Measurments`` column, so the current readings are loaded first, the new
    ones appended, and the column rewritten for every sensor that got a value.

    Args:
        db: Database path handed to ``sqlite3.connect``.

    Raises:
        KeyError: If an observation refers to a sensor id missing from the table.
    """
    sensors = read_data(db)
    touched = set()
    for sensor_id, value in measurments(10):  # simulate 10 random observations
        sensors[sensor_id].add_value(value)
        touched.add(sensor_id)

    conn = sqlite3.connect(db)
    try:
        # ``with conn`` commits on success and rolls back on error; it does
        # not close the connection, hence the explicit close() below.
        with conn:
            # One UPDATE per touched sensor instead of one per observation.
            conn.executemany(
                """UPDATE sensors SET Measurments = ? WHERE `SensorID` = ?;""",
                [(','.join(map(str, sensors[sensor_id].values)), sensor_id)
                 for sensor_id in sorted(touched)])
    finally:
        conn.close()
if __name__ == '__main__':
    # Demo: rebuild the sample database, store random readings, print them.
    db = 'sample1.db'
    create(db)
    write_data(db)
    for sensor in read_data(db).values():
        print(sensor)
    # The readings of a single sensor can also be queried directly:
    conn = sqlite3.connect(db)
    try:
        data = conn.execute("""SELECT Measurments FROM sensors WHERE SensorID='Sensor1';""")
        row = data.fetchone()
    finally:
        # sqlite3's connection context manager does not close the connection.
        conn.close()
    # Measurments is still NULL when none of the random observations hit
    # Sensor1, so guard before splitting.
    raw = row[0] if row is not None else None
    values = [int(value) for value in raw.split(',')] if raw else []
    print(values)
Note that this is only a sample implementation, so there may be other ways to achieve the same result.
The setup is as follows: there are 3 sensors, and we collect data from them and write it to an SQLite database.