1. Requirements
sudo pip3 install databases[sqlite]
2. Create a table
import sqlalchemy
engine = sqlalchemy.create_engine('sqlite:///resources.db')
metadata = sqlalchemy.MetaData()
metadata.bind = engine
resources = sqlalchemy.Table(
"resources",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("date", sqlalchemy.DateTime),
sqlalchemy.Column("text", sqlalchemy.String(length=100)),
)
metadata.create_all(engine)
3. Use databases to connect it
from databases import Database
from datetime import datetime
database = Database('sqlite:///resources.db')
await database.connect()
4. Insert one row
query = resources.insert()
values = {"date": datetime.now(), "text": "hi"}
await database.execute(query=query, values=values)
5. Insert many rows at once
query = resources.insert()
values = [
{"date": datetime.now(), "text": "hi1"},
{"date": datetime.now(), "text": "hi2"}
]
await database.execute_many(query=query, values=values)
6. Get all rows
query = resources.select()
await database.fetch_all(query=query)
7. Update one row
query = resources.update().values(text="no").where(resources.columns.text=="hi")
await database.execute(query=query)
8. Insert if the row does not exist
query = resources.select().where(resources.columns.text == "ssssss")
result = await database.fetch_all(query=query)
if not len(result):
query = resources.insert()
values = {"date": datetime.now(), "text": "ssssss"}
await database.execute(query=query, values=values)
9. Get all rows but in an iterator
query = resources.select()
async for row in database.iterate(query=query):
print(row)
10. Select rows based on date condition
# author: yingshaoxo
splitor = datetime(2020, 6, 2, 12, 48, 1, 218622)
query = resources.select().where(resources.columns.date < splitor)
await database.fetch_all(query=query)