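This article looks at how to connect to a wide range of databases from Python and perform the corresponding CRUD (create, read, update, delete) operations. We walk through connecting to MySQL, SQL Server, Oracle, PostgreSQL, MongoDB, SQLite, DB2, Redis, Cassandra, Microsoft Access, Elasticsearch, Neo4j, InfluxDB, Snowflake, Amazon DynamoDB, and Microsoft Azure Cosmos DB in turn, and demonstrate the CRUD operations for each.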
MySQL
Connecting to the Database
Python can connect to a MySQL database with the mysql-connector-python library:
import mysql.connector
conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
print("Opened MySQL database successfully")
conn.close()
CRUD Operations
Next, we show how to perform basic CRUD operations in MySQL.
Create
conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
cursor = conn.cursor()
cursor.execute("CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME TEXT NOT NULL, AGE INT, ADDRESS CHAR(50), SALARY REAL)")
print("Table created successfully")
conn.close()
Read (Retrieve)
conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
cursor = conn.cursor()
cursor.execute("SELECT id, name, address, salary from Employees")
rows = cursor.fetchall()
for row in rows:
    print("ID =", row[0])
    print("NAME =", row[1])
    print("ADDRESS =", row[2])
    print("SALARY =", row[3])
conn.close()
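Indexing rows by position (row[0], row[1]) gets brittle as the column list changes. As a minimal sketch, the column names that a DB-API cursor exposes through cursor.description can be zipped with each row to get dictionaries instead. The helper name rows_as_dicts is hypothetical, and sqlite3 from the standard library stands in for MySQL so the example runs without a server; the table here uses lowercase column names for the same reason.

```python
import sqlite3


def rows_as_dicts(cursor):
    """Return the remaining rows of a DB-API cursor as {column_name: value} dicts."""
    # PEP 249: cursor.description holds one entry per column; item 0 is the name.
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


# Assumption: an in-memory SQLite database replaces the MySQL server.
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute(
    "CREATE TABLE Employees (id INTEGER PRIMARY KEY, name TEXT NOT NULL, address TEXT, salary REAL)"
)
cursor.execute("INSERT INTO Employees VALUES (1, 'John', 'New York', 1000.0)")
cursor.execute("SELECT id, name, address, salary FROM Employees")
employees = rows_as_dicts(cursor)
conn.close()

for employee in employees:
    print("NAME =", employee["name"])
    print("SALARY =", employee["salary"])
```

The helper relies only on description and fetchall(), so it should carry over to other DB-API drivers in this article, although that is not tested here.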
Update
conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
cursor = conn.cursor()
cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
conn.commit()
print("Total number of rows updated :", cursor.rowcount)
conn.close()
Delete
conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')
cursor = conn.cursor()
cursor.execute("DELETE from Employees where ID = 1")
conn.commit()
print("Total number of rows deleted :", cursor.rowcount)
conn.close()
SQL Server
Connecting to the Database
Python can connect to a SQL Server database with the pyodbc library:
import pyodbc
conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
print("Opened SQL Server database successfully")
conn.close()
CRUD Operations
Next, we show how to perform basic CRUD operations in SQL Server.
Create
conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
cursor = conn.cursor()
cursor.execute("CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME VARCHAR(20) NOT NULL, AGE INT, ADDRESS CHAR(50), SALARY REAL)")
conn.commit()
print("Table created successfully")
conn.close()
Read (Retrieve)
conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
cursor = conn.cursor()
cursor.execute("SELECT id, name, address, salary from Employees")
rows = cursor.fetchall()
for row in rows:
    print("ID =", row[0])
    print("NAME =", row[1])
    print("ADDRESS =", row[2])
    print("SALARY =", row[3])
conn.close()
Update
conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
cursor = conn.cursor()
cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
conn.commit()
print("Total number of rows updated :", cursor.rowcount)
conn.close()
Delete
conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')
cursor = conn.cursor()
cursor.execute("DELETE from Employees where ID = 1")
conn.commit()
print("Total number of rows deleted :", cursor.rowcount)
conn.close()
Oracle
Connecting to the Database
Python can connect to an Oracle database with the cx_Oracle library:
import cx_Oracle
dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
print("Opened Oracle database successfully")
conn.close()
CRUD Operations
Next, we show how to perform basic CRUD operations in Oracle.
Create
dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
cursor = conn.cursor()
cursor.execute("CREATE TABLE Employees (ID NUMBER(10) NOT NULL PRIMARY KEY, NAME VARCHAR2(20) NOT NULL, AGE NUMBER(3), ADDRESS CHAR(50), SALARY NUMBER(10, 2))")
conn.commit()
print("Table created successfully")
conn.close()
Read (Retrieve)
dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
cursor = conn.cursor()
cursor.execute("SELECT id, name, address, salary from Employees")
rows = cursor.fetchall()
for row in rows:
    print("ID =", row[0])
    print("NAME =", row[1])
    print("ADDRESS =", row[2])
    print("SALARY =", row[3])
conn.close()
Update
dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
cursor = conn.cursor()
cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
conn.commit()
print("Total number of rows updated :", cursor.rowcount)
conn.close()
Delete
dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database')
conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)
cursor = conn.cursor()
cursor.execute("DELETE from Employees where ID = 1")
conn.commit()
print("Total number of rows deleted :", cursor.rowcount)
conn.close()
PostgreSQL
Connecting to the Database
Python can connect to a PostgreSQL database with the psycopg2 library:
import psycopg2
conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
print("Opened PostgreSQL database successfully")
conn.close()
CRUD Operations
Next, we show how to perform basic CRUD operations in PostgreSQL.
Create
conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
cursor = conn.cursor()
cursor.execute('''CREATE TABLE Employees
(ID INT PRIMARY KEY NOT NULL,
NAME TEXT NOT NULL,
AGE INT NOT NULL,
ADDRESS CHAR(50),
SALARY REAL);''')
conn.commit()
print("Table created successfully")
conn.close()
Read (Retrieve)
conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
cursor = conn.cursor()
cursor.execute("SELECT id, name, address, salary from Employees")
rows = cursor.fetchall()
for row in rows:
    print("ID =", row[0])
    print("NAME =", row[1])
    print("ADDRESS =", row[2])
    print("SALARY =", row[3])
conn.close()
Update
conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
cursor = conn.cursor()
cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
conn.commit()
print("Total number of rows updated :", cursor.rowcount)
conn.close()
Delete
conn = psycopg2.connect(database="my_database", user="username", password="password", host="127.0.0.1", port="5432")
cursor = conn.cursor()
cursor.execute("DELETE from Employees where ID = 1")
conn.commit()
print("Total number of rows deleted :", cursor.rowcount)
conn.close()
MongoDB
Connecting to the Database
Python can connect to a MongoDB database with the pymongo library:
from pymongo import MongoClient
client = MongoClient("mongodb://localhost:27017/")
db = client["my_database"]
print("Opened MongoDB database successfully")
client.close()
CRUD Operations
Next, we show how to perform basic CRUD operations in MongoDB.
Create
In MongoDB, creating a document is usually done as part of an insert operation:
client = MongoClient("mongodb://localhost:27017/")
db = client["my_database"]
employees = db["Employees"]
employee = {"id": "1", "name": "John", "age": "30", "address": "New York", "salary": "1000.00"}
employees.insert_one(employee)
print("Document inserted successfully")
client.close()
Read (Retrieve)
client = MongoClient("mongodb://localhost:27017/")
db = client["my_database"]
employees = db["Employees"]
cursor = employees.find()
for document in cursor:
    print(document)
client.close()
Update
client = MongoClient("mongodb://localhost:27017/")
db = client["my_database"]
employees = db["Employees"]
query = {"id": "1"}
new_values = {"$set": { "salary": "25000.00"} }
employees.update_one(query, new_values)
print("Document updated successfully")
client.close()
Delete
client = MongoClient("mongodb://localhost:27017/")
db = client["my_database"]
employees = db["Employees"]
query = {"id": "1"}
employees.delete_one(query)
print("Document deleted successfully")
client.close()
SQLite
Connecting to the Database
Python uses the sqlite3 library to connect to a SQLite database:
import sqlite3
conn = sqlite3.connect('my_database.db')
print("Opened SQLite database successfully")
conn.close()
CRUD Operations
Next, we show how to perform basic CRUD operations in SQLite.
Create
conn = sqlite3.connect('my_database.db')
cursor = conn.cursor()
cursor.execute('''CREATE TABLE Employees
(ID INT PRIMARY KEY NOT NULL,
NAME TEXT NOT NULL,
AGE INT NOT NULL,
ADDRESS CHAR(50),
SALARY REAL);''')
conn.commit()
print("Table created successfully")
conn.close()
Read (Retrieve)
conn = sqlite3.connect('my_database.db')
cursor = conn.cursor()
cursor.execute("SELECT id, name, address, salary from Employees")
rows = cursor.fetchall()
for row in rows:
    print("ID =", row[0])
    print("NAME =", row[1])
    print("ADDRESS =", row[2])
    print("SALARY =", row[3])
conn.close()
Update
conn = sqlite3.connect('my_database.db')
cursor = conn.cursor()
cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
conn.commit()
print("Total number of rows updated :", cursor.rowcount)
conn.close()
Delete
conn = sqlite3.connect('my_database.db')
cursor = conn.cursor()
cursor.execute("DELETE from Employees where ID = 1")
conn.commit()
print("Total number of rows deleted :", cursor.rowcount)
conn.close()
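The SQL snippets so far hard-code their values and never insert a row, so the update and delete statements have nothing to act on. The following is a minimal sketch, not a definitive pattern, of the same CRUD cycle with bound parameters and an explicit INSERT. It assumes an in-memory database (":memory:") so nothing is written to disk, and it uses the ? placeholder style of sqlite3; other drivers use different placeholder styles.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # assumption: throwaway in-memory database
conn.execute(
    "CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME TEXT NOT NULL, "
    "AGE INT NOT NULL, ADDRESS CHAR(50), SALARY REAL)"
)

# "with conn" commits when the block succeeds and rolls back if it raises.
with conn:
    conn.execute(
        "INSERT INTO Employees (ID, NAME, AGE, ADDRESS, SALARY) VALUES (?, ?, ?, ?, ?)",
        (1, "John", 30, "New York", 1000.00),
    )

employee_id, new_salary = 1, 25000.00
with conn:
    # Values are passed separately from the SQL text instead of being formatted into it.
    updated = conn.execute(
        "UPDATE Employees SET SALARY = ? WHERE ID = ?", (new_salary, employee_id)
    ).rowcount

salary_after_update = conn.execute(
    "SELECT SALARY FROM Employees WHERE ID = ?", (employee_id,)
).fetchone()[0]

with conn:
    deleted = conn.execute(
        "DELETE FROM Employees WHERE ID = ?", (employee_id,)
    ).rowcount

remaining = conn.execute("SELECT COUNT(*) FROM Employees").fetchone()[0]
conn.close()

print("Rows updated:", updated)
print("Salary after update:", salary_after_update)
print("Rows deleted:", deleted)
print("Rows remaining:", remaining)
```

Binding parameters keeps user-supplied values out of the SQL string, which is the usual defence against SQL injection.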
DB2
Connecting to the Database
Python can connect to a DB2 database with the ibm_db library:
import ibm_db
dsn = ("DRIVER={IBM DB2 ODBC DRIVER};"
"DATABASE=my_database;"
"HOSTNAME=127.0.0.1;"
"PORT=50000;"
"PROTOCOL=TCPIP;"
"UID=username;"
"PWD=password;"
)
conn = ibm_db.connect(dsn, "","")
print("Opened DB2 database successfully")
ibm_db.close(conn)
CRUD Operations
Next, we show how to perform basic CRUD operations in DB2.
Create
conn = ibm_db.connect(dsn, "","")
sql = '''CREATE TABLE Employees
(ID INT PRIMARY KEY NOT NULL,
NAME VARCHAR(20) NOT NULL,
AGE INT NOT NULL,
ADDRESS CHAR(50),
SALARY DECIMAL(9, 2));'''
stmt = ibm_db.exec_immediate(conn, sql)
print("Table created successfully")
ibm_db.close(conn)
Read (Retrieve)
conn = ibm_db.connect(dsn, "","")
sql = "SELECT id, name, address, salary from Employees"
stmt = ibm_db.exec_immediate(conn, sql)
while ibm_db.fetch_row(stmt):
    print("ID =", ibm_db.result(stmt, "ID"))
    print("NAME =", ibm_db.result(stmt, "NAME"))
    print("ADDRESS =", ibm_db.result(stmt, "ADDRESS"))
    print("SALARY =", ibm_db.result(stmt, "SALARY"))
ibm_db.close(conn)
Update
conn = ibm_db.connect(dsn, "","")
sql = "UPDATE Employees set SALARY = 25000.00 where ID = 1"
stmt = ibm_db.exec_immediate(conn, sql)
ibm_db.commit(conn)
print("Total number of rows updated :", ibm_db.num_rows(stmt))
ibm_db.close(conn)
Delete
conn = ibm_db.connect(dsn, "","")
sql = "DELETE from Employees where ID = 1"
stmt = ibm_db.exec_immediate(conn, sql)
ibm_db.commit(conn)
print("Total number of rows deleted :", ibm_db.num_rows(stmt))
ibm_db.close(conn)
Microsoft Access
Connecting to the Database
Python can connect to a Microsoft Access database with the pyodbc library:
import pyodbc
conn_str = (r'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};'
r'DBQ=path_to_your_access_file.accdb;'
)
conn = pyodbc.connect(conn_str)
print("Opened Access database successfully")
conn.close()
CRUD Operations
Next, we show how to perform basic CRUD operations in Access.
Create
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
cursor.execute('''CREATE TABLE Employees
(ID INT PRIMARY KEY NOT NULL,
NAME TEXT NOT NULL,
AGE INT NOT NULL,
ADDRESS CHAR(50),
SALARY DECIMAL(9, 2));''')
conn.commit()
print("Table created successfully")
conn.close()
Read (Retrieve)
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
cursor.execute("SELECT id, name, address, salary from Employees")
rows = cursor.fetchall()
for row in rows:
    print("ID =", row[0])
    print("NAME =", row[1])
    print("ADDRESS =", row[2])
    print("SALARY =", row[3])
conn.close()
Update
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
cursor.execute("UPDATE Employees set SALARY = 25000.00 where ID = 1")
conn.commit()
print("Total number of rows updated :", cursor.rowcount)
conn.close()
Delete
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
cursor.execute("DELETE from Employees where ID = 1")
conn.commit()
print("Total number of rows deleted :", cursor.rowcount)
conn.close()
Cassandra
Connecting to the Database
Python can connect to a Cassandra database with the cassandra-driver library:
from cassandra.cluster import Cluster
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
print("Opened Cassandra database successfully")
cluster.shutdown()
CRUD Operations
Next, we show how to perform basic CRUD operations in Cassandra.
Create
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
session.execute("""
CREATE TABLE Employees (
id int PRIMARY KEY,
name text,
age int,
address text,
salary decimal
)
""")
print("Table created successfully")
cluster.shutdown()
Read (Retrieve)
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
rows = session.execute('SELECT id, name, address, salary FROM Employees')
for row in rows:
    print("ID =", row.id)
    print("NAME =", row.name)
    print("ADDRESS =", row.address)
    print("SALARY =", row.salary)
cluster.shutdown()
Update
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
session.execute("UPDATE Employees SET salary = 25000.00 WHERE id = 1")
print("Row updated successfully")
cluster.shutdown()
Delete
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('my_keyspace')
session.execute("DELETE FROM Employees WHERE id = 1")
print("Row deleted successfully")
cluster.shutdown()
Redis
Connecting to the Database
Python can connect to a Redis database with the redis-py library:
import redis
r = redis.Redis(host='localhost', port=6379, db=0)
print("Opened Redis database successfully")
CRUD Operations
Next, we show how to perform basic CRUD operations in Redis.
Create
r = redis.Redis(host='localhost', port=6379, db=0)
r.set('employee:1:name', 'John')
r.set('employee:1:age', '30')
r.set('employee:1:address', 'New York')
r.set('employee:1:salary', '1000.00')
print("Keys created successfully")
Read (Retrieve)
r = redis.Redis(host='localhost', port=6379, db=0)
print("NAME =", r.get('employee:1:name').decode('utf-8'))
print("AGE =", r.get('employee:1:age').decode('utf-8'))
print("ADDRESS =", r.get('employee:1:address').decode('utf-8'))
print("SALARY =", r.get('employee:1:salary').decode('utf-8'))
Update
r = redis.Redis(host='localhost', port=6379, db=0)
r.set('employee:1:salary', '25000.00')
print("Key updated successfully")
Delete
r = redis.Redis(host='localhost', port=6379, db=0)
r.delete('employee:1:name', 'employee:1:age', 'employee:1:address', 'employee:1:salary')
print("Keys deleted successfully")
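Elasticsearch
Connecting to the Database
Python can connect to Elasticsearch with the elasticsearch library. The examples below pass doc_type, which the 8.x Python client no longer accepts, so they assume an older client version: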
from elasticsearch import Elasticsearch
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
print("Opened ElasticSearch database successfully")
CRUD Operations
Next, we show how to perform basic CRUD operations in Elasticsearch.
Create
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
employee = {
'name': 'John',
'age': 30,
'address': 'New York',
'salary': 1000.00
}
res = es.index(index='employees', doc_type='employee', id=1, body=employee)
print("Document created successfully")
Read (Retrieve)
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
res = es.get(index='employees', doc_type='employee', id=1)
print("Document details:")
for field, details in res['_source'].items():
    print(f"{field.upper()} =", details)
Update
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
res = es.update(index='employees', doc_type='employee', id=1, body={
'doc': {'salary': 25000.00}
})
print("Document updated successfully")
Delete
es = Elasticsearch([{'host': 'localhost', 'port': 9200}])
res = es.delete(index='employees', doc_type='employee', id=1)
print("Document deleted successfully")
Neo4j
Connecting to the Database
Python can connect to a Neo4j database with the neo4j library:
from neo4j import GraphDatabase
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
print("Opened Neo4j database successfully")
driver.close()
CRUD Operations
Next, we show how to perform basic CRUD operations in Neo4j.
Create
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    session.run("CREATE (:Employee {id: 1, name: 'John', age: 30, address: 'New York', salary: 1000.00})")
print("Node created successfully")
driver.close()
Read (Retrieve)
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    result = session.run("MATCH (n:Employee) WHERE n.id = 1 RETURN n")
    # Consume the result while the session is still open
    for record in result:
        print("ID =", record["n"]["id"])
        print("NAME =", record["n"]["name"])
        print("ADDRESS =", record["n"]["address"])
        print("SALARY =", record["n"]["salary"])
driver.close()
Update
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    session.run("MATCH (n:Employee) WHERE n.id = 1 SET n.salary = 25000.00")
print("Node updated successfully")
driver.close()
Delete
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    session.run("MATCH (n:Employee) WHERE n.id = 1 DETACH DELETE n")
print("Node deleted successfully")
driver.close()
InfluxDB
Connecting to the Database
Python can connect to an InfluxDB database with the InfluxDB-Python library:
from influxdb import InfluxDBClient
client = InfluxDBClient(host='localhost', port=8086)
print("Opened InfluxDB database successfully")
client.close()
CRUD Operations
Next, we show how to perform basic CRUD operations in InfluxDB.
Create
client = InfluxDBClient(host='localhost', port=8086)
json_body = [
{
"measurement": "employees",
"tags": {"id": "1"},
"fields": {
"name": "John",
"age": 30,
"address": "New York",
"salary": 1000.00
}
}
]
client.write_points(json_body)
print("Point created successfully")
client.close()
Read (Retrieve)
client = InfluxDBClient(host='localhost', port=8086)
# The "id" tag is selected as well, because the loop below prints it
result = client.query('SELECT "id", "name", "age", "address", "salary" FROM "employees"')
for point in result.get_points():
    print("ID =", point['id'])
    print("NAME =", point['name'])
    print("AGE =", point['age'])
    print("ADDRESS =", point['address'])
    print("SALARY =", point['salary'])
client.close()
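Update
InfluxDB's data model differs from that of the other databases here: it has no update operation. You can get an update-like effect, however, by writing a data point with the same measurement, timestamp, and tags but different field values.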
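As a rough sketch of that overwrite approach, the point can be rebuilt with an explicit "time" value so that the second write targets the same point as the first. The helper names build_employee_point and point_identity are hypothetical, and the timestamp is an arbitrary example. The two write_points calls are shown only as a comment because they need a running server and a selected database.

```python
from datetime import datetime, timezone


def build_employee_point(employee_id, fields, timestamp):
    """Build one point in the dict shape used for write_points in this article."""
    return {
        "measurement": "employees",
        "tags": {"id": str(employee_id)},
        # An explicit timestamp is what lets a later write hit the same point.
        "time": timestamp.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "fields": fields,
    }


def point_identity(point):
    """Measurement, tag set, and timestamp: the parts that must match to overwrite."""
    return (point["measurement"], tuple(sorted(point["tags"].items())), point["time"])


ts = datetime(2024, 1, 1, tzinfo=timezone.utc)  # arbitrary example timestamp
original = build_employee_point(1, {"name": "John", "salary": 1000.00}, ts)
correction = build_employee_point(1, {"salary": 25000.00}, ts)

# True means the second write would replace the salary field of the first point.
same_point = point_identity(original) == point_identity(correction)
print("Targets the same point:", same_point)

# With a live server (not run here):
#   client.write_points([original])
#   client.write_points([correction])
```

Fields that the second point does not mention, such as name here, are left as they were.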
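Delete
Similarly, InfluxDB provides no operation for deleting a single data point. You can, however, drop every series in a measurement (roughly, a whole table) or delete the data in a given time range.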
client = InfluxDBClient(host='localhost', port=8086)
# Drop every series in the measurement
client.query('DROP SERIES FROM "employees"')
# Delete the data in a given time range
# client.query('DELETE FROM "employees" WHERE time < now() - 1d')
print("Series deleted successfully")
client.close()
Snowflake
Connecting to the Database
Python can connect to a Snowflake database with the snowflake-connector-python library:
from snowflake.connector import connect
con = connect(
user='username',
password='password',
account='account_url',
warehouse='warehouse',
database='database',
schema='schema'
)
print("Opened Snowflake database successfully")
con.close()
CRUD Operations
Next, we show how to perform basic CRUD operations in Snowflake.
Create
con = connect(
user='username',
password='password',
account='account_url',
warehouse='warehouse',
database='database',
schema='schema'
)
cur = con.cursor()
cur.execute("""
CREATE TABLE EMPLOYEES (
ID INT,
NAME STRING,
AGE INT,
ADDRESS STRING,
SALARY FLOAT
)
""")
cur.execute("""
INSERT INTO EMPLOYEES (ID, NAME, AGE, ADDRESS, SALARY) VALUES
(1, 'John', 30, 'New York', 1000.00)
""")
print("Table created and row inserted successfully")
con.close()
Read (Retrieve)
con = connect(
user='username',
password='password',
account='account_url',
warehouse='warehouse',
database='database',
schema='schema'
)
cur = con.cursor()
cur.execute("SELECT * FROM EMPLOYEES WHERE ID = 1")
rows = cur.fetchall()
for row in rows:
    print("ID =", row[0])
    print("NAME =", row[1])
    print("AGE =", row[2])
    print("ADDRESS =", row[3])
    print("SALARY =", row[4])
con.close()
Update
con = connect(
user='username',
password='password',
account='account_url',
warehouse='warehouse',
database='database',
schema='schema'
)
cur = con.cursor()
cur.execute("UPDATE EMPLOYEES SET SALARY = 25000.00 WHERE ID = 1")
print("Row updated successfully")
con.close()
Delete
con = connect(
user='username',
password='password',
account='account_url',
warehouse='warehouse',
database='database',
schema='schema'
)
cur = con.cursor()
cur.execute("DELETE FROM EMPLOYEES WHERE ID = 1")
print("Row deleted successfully")
con.close()
Amazon DynamoDB
Connecting to the Database
Python can connect to Amazon DynamoDB with the boto3 library:
import boto3
dynamodb = boto3.resource('dynamodb', region_name='us-west-2',
aws_access_key_id='Your AWS Access Key',
aws_secret_access_key='Your AWS Secret Key')
print("Opened DynamoDB successfully")
CRUD Operations
Next, we show how to perform basic CRUD operations in DynamoDB.
Create
table = dynamodb.create_table(
TableName='Employees',
KeySchema=[
{
'AttributeName': 'id',
'KeyType': 'HASH'
},
],
AttributeDefinitions=[
{
'AttributeName': 'id',
'AttributeType': 'N'
},
],
ProvisionedThroughput={
'ReadCapacityUnits': 5,
'WriteCapacityUnits': 5
}
)
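from decimal import Decimal  # boto3's resource API rejects float; numbers must be int or Decimal
table.wait_until_exists()  # create_table is asynchronous; wait until the table is usable
table.put_item(
    Item={
        'id': 1,
        'name': 'John',
        'age': 30,
        'address': 'New York',
        'salary': Decimal('1000.00')
    }
)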
print("Table created and item inserted successfully")
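boto3's resource layer does not accept Python float values for DynamoDB numbers and expects decimal.Decimal instead. When an item arrives with floats already in it, a small conversion step before put_item can help. The sketch below is one possible approach; the helper name to_dynamodb_numbers is hypothetical, and it only handles dicts, lists, and floats.

```python
from decimal import Decimal


def to_dynamodb_numbers(value):
    """Recursively replace floats with Decimal; leave every other value untouched."""
    if isinstance(value, float):
        # Going through str() avoids carrying over binary floating-point noise.
        return Decimal(str(value))
    if isinstance(value, dict):
        return {key: to_dynamodb_numbers(item) for key, item in value.items()}
    if isinstance(value, list):
        return [to_dynamodb_numbers(item) for item in value]
    return value


item = to_dynamodb_numbers({
    'id': 1,
    'name': 'John',
    'age': 30,
    'address': 'New York',
    'salary': 1000.00,
})
print(item)

# With real AWS credentials (not run here): table.put_item(Item=item)
```

Integers and strings pass through unchanged, so only the salary value is converted in this example.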
Read (Retrieve)
table = dynamodb.Table('Employees')
response = table.get_item(
Key={'id': 1,}
)
item = response['Item']
print(item)
Update
from decimal import Decimal  # floats are rejected by boto3's resource API
table = dynamodb.Table('Employees')
table.update_item(
    Key={'id': 1},
    UpdateExpression='SET salary = :val1',
    ExpressionAttributeValues={':val1': Decimal('25000.00')}
)
print("Item updated successfully")
Delete
table = dynamodb.Table('Employees')
table.delete_item(
Key={'id': 1,}
)
print("Item deleted successfully")
Microsoft Azure Cosmos DB
Connecting to the Database
Python can connect to Microsoft Azure Cosmos DB with the azure-cosmos library:
from azure.cosmos import CosmosClient, PartitionKey, exceptions
url = 'Cosmos DB Account URL'
key = 'Cosmos DB Account Key'
client = CosmosClient(url, credential=key)
database_name = 'testDB'
database = client.get_database_client(database_name)
container_name = 'Employees'
container = database.get_container_client(container_name)
print("Opened CosMos DB successfully")
CRUD Operations
Next, we show how to perform basic CRUD operations in Cosmos DB.
Create
database = client.create_database_if_not_exists(id=database_name)
container = database.create_container_if_not_exists(
id=container_name,
partition_key=PartitionKey(path="/id"),
offer_throughput=400
)
container.upsert_item({
'id': '1',
'name': 'John',
'age': 30,
'address': 'New York',
'salary': 1000.00
})
print("Container created and item upserted successfully")
Read (Retrieve)
for item in container.read_all_items():
    print(item)
Update
for item in container.read_all_items():
    if item['id'] == '1':
        item['salary'] = 25000.00
        container.upsert_item(item)
print("Item updated successfully")
Delete
for item in container.read_all_items():
    if item['id'] == '1':
        container.delete_item(item, partition_key='1')
print("Item deleted successfully")
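TeahLead_KrisChang: 10+ years of experience in the internet and AI industry, 10+ years managing technical and business teams, bachelor's degree in software engineering from Tongji University, master's degree in engineering management from Fudan University, Alibaba Cloud certified senior cloud services architect, and head of an AI product business with revenue in the hundreds of millions.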