基于Rust、Java和DynamoDB的实用示例
以下是一些基于Rust和DynamoDB的实用示例,涵盖从基础操作到高级功能的实现方式。示例使用官方AWS SDK `aws-sdk-dynamodb`(也可以改用社区库 `rusoto_dynamodb`)。
基础操作:表管理
创建表
use aws_sdk_dynamodb::{Client, Error};
/// Creates a pay-per-request table whose only key is the string hash key `id`,
/// then prints the table description returned by the service.
///
/// # Errors
/// Returns whatever error the `CreateTable` call produces.
///
/// NOTE(review): in aws-sdk-dynamodb 1.x the `build()` calls on
/// `KeySchemaElement` / `AttributeDefinition` appear to return a `Result`;
/// confirm against the SDK version this example targets.
async fn create_table(client: &Client, table_name: &str) -> Result<(), Error> {
    use aws_sdk_dynamodb::types::{
        AttributeDefinition, BillingMode, KeySchemaElement, KeyType, ScalarAttributeType,
    };

    // Hash (partition) key on the `id` attribute.
    let hash_key = KeySchemaElement::builder()
        .attribute_name("id")
        .key_type(KeyType::Hash)
        .build();
    // Type declaration for the same attribute: `S` (string).
    let id_definition = AttributeDefinition::builder()
        .attribute_name("id")
        .attribute_type(ScalarAttributeType::S)
        .build();

    let created = client
        .create_table()
        .table_name(table_name)
        .key_schema(hash_key)
        .attribute_definitions(id_definition)
        .billing_mode(BillingMode::PayPerRequest)
        .send()
        .await?;

    println!("Table created: {:?}", created.table_description);
    Ok(())
}
删除表
/// Deletes the table called `table_name` and prints a confirmation line.
///
/// # Errors
/// Returns whatever error the `DeleteTable` call produces.
async fn delete_table(client: &Client, table_name: &str) -> Result<(), Error> {
    let request = client.delete_table().table_name(table_name);
    request.send().await?;
    println!("Table deleted");
    Ok(())
}
数据操作:增删改查
插入数据
async fn put_item(client: &Client, table_name: &str, id: &str, value: &str) -> Result<(), Error> {
let item = serde_dynamo::to_item(
serde_json::json!({
"id": id,
"value": value
})
)?;
client
.put_item()
.table_name(table_name)
.set_item(Some(item))
.send()
.await?;
Ok(())
}
批量写入
/// Writes `items` to `table_name` using `BatchWriteItem`.
///
/// The writes are split into groups of at most 25 (the per-request limit of
/// `BatchWriteItem`), and anything the service reports back as unprocessed is
/// resubmitted until it is accepted. An empty `items` vector sends no request.
///
/// # Errors
/// Returns whatever error a `BatchWriteItem` call produces.
///
/// # Panics
/// Panics if an element of `items` cannot be converted into a DynamoDB
/// attribute map (for example a JSON value that is not an object). The return
/// type cannot carry a serialization error, so this is treated as a caller bug.
async fn batch_write(client: &Client, table_name: &str, items: Vec<serde_json::Value>) -> Result<(), Error> {
    use aws_sdk_dynamodb::types::{PutRequest, WriteRequest};

    // Maximum number of put/delete requests accepted by one BatchWriteItem call.
    const MAX_BATCH_SIZE: usize = 25;

    let writes = items
        .into_iter()
        .map(|item| {
            WriteRequest::builder()
                .put_request(
                    PutRequest::builder()
                        .set_item(Some(
                            serde_dynamo::to_item(item)
                                .expect("each item must serialize to a DynamoDB attribute map"),
                        ))
                        .build(),
                )
                .build()
        })
        .collect::<Vec<_>>();

    for chunk in writes.chunks(MAX_BATCH_SIZE) {
        let mut pending = chunk.to_vec();
        // A successful response may still leave some writes unprocessed (e.g.
        // under throttling); keep resubmitting the remainder.
        // NOTE(review): production code should add exponential backoff here.
        while !pending.is_empty() {
            let response = client
                .batch_write_item()
                .request_items(table_name, pending)
                .send()
                .await?;
            pending = response
                .unprocessed_items
                .and_then(|mut remaining| remaining.remove(table_name))
                .unwrap_or_default();
        }
    }
    Ok(())
}
查询数据
/// Looks up the item whose `id` key equals `id` and prints the `item` field
/// of the response (an `Option`) with `{:?}`.
///
/// # Errors
/// Returns whatever error the `GetItem` call produces.
async fn get_item(client: &Client, table_name: &str, id: &str) -> Result<(), Error> {
    use aws_sdk_dynamodb::types::AttributeValue;

    let key_value = AttributeValue::S(id.to_string());
    let output = client
        .get_item()
        .table_name(table_name)
        .key("id", key_value)
        .send()
        .await?;

    println!("Item: {:?}", output.item);
    Ok(())
}
高级功能
条件更新
/// Sets the `value` attribute of item `id` to `new_value`, guarded by a
/// condition that the current `value` equals the literal string `"old_value"`.
///
/// The attribute is addressed through the `#v` placeholder rather than by name.
///
/// NOTE(review): the expected old value is hard-coded; callers cannot supply
/// it without a signature change.
///
/// # Errors
/// Returns whatever error the `UpdateItem` call produces.
async fn conditional_update(client: &Client, table_name: &str, id: &str, new_value: &str) -> Result<(), Error> {
    use aws_sdk_dynamodb::types::AttributeValue;

    let key = AttributeValue::S(id.to_string());
    let replacement = AttributeValue::S(new_value.to_string());
    let expected = AttributeValue::S("old_value".to_string());

    client
        .update_item()
        .table_name(table_name)
        .key("id", key)
        .expression_attribute_names("#v", "value")
        .expression_attribute_values(":oldval", expected)
        .expression_attribute_values(":newval", replacement)
        .condition_expression("#v = :oldval")
        .update_expression("SET #v = :newval")
        .send()
        .await?;
    Ok(())
}
事务操作
/// Submits one `TransactWriteItems` request with two actions on `table_name`:
/// a put of `{id: id1, value: "data1"}` and a delete of the item keyed by `id2`.
///
/// # Errors
/// Returns whatever error the `TransactWriteItems` call produces.
async fn transact_write(client: &Client, table_name: &str, id1: &str, id2: &str) -> Result<(), Error> {
    use aws_sdk_dynamodb::types::{AttributeValue, Delete, Put, TransactWriteItem};

    // First action: insert the item for `id1`.
    let put_action = TransactWriteItem::builder()
        .put(
            Put::builder()
                .table_name(table_name)
                .item("id", AttributeValue::S(id1.to_string()))
                .item("value", AttributeValue::S("data1".to_string()))
                .build(),
        )
        .build();

    // Second action: remove the item for `id2`.
    let delete_action = TransactWriteItem::builder()
        .delete(
            Delete::builder()
                .table_name(table_name)
                .key("id", AttributeValue::S(id2.to_string()))
                .build(),
        )
        .build();

    client
        .transact_write_items()
        .transact_items(put_action)
        .transact_items(delete_action)
        .send()
        .await?;
    Ok(())
}
索引与查询优化
全局二级索引查询
/// Queries the index `index_name` of `table_name` for entries whose `value`
/// attribute equals `value`, and prints the `items` field of the response.
///
/// # Errors
/// Returns whatever error the `Query` call produces.
async fn query_gsi(client: &Client, table_name: &str, index_name: &str, value: &str) -> Result<(), Error> {
    use aws_sdk_dynamodb::types::AttributeValue;

    let wanted = AttributeValue::S(value.to_string());
    let output = client
        .query()
        .table_name(table_name)
        .index_name(index_name)
        .expression_attribute_names("#v", "value")
        .expression_attribute_values(":val", wanted)
        .key_condition_expression("#v = :val")
        .send()
        .await?;

    println!("Query results: {:?}", output.items);
    Ok(())
}
分页处理
/// Scans `table_name` with a limit of 10 per request, printing each page's
/// `items`, and stops once a response carries no `last_evaluated_key`.
///
/// # Errors
/// Returns the first error produced by a `Scan` call.
async fn paginated_scan(client: &Client, table_name: &str) -> Result<(), Error> {
    // Start key for the next request; `None` for the first one.
    let mut start_key = None;
    loop {
        let page = client
            .scan()
            .table_name(table_name)
            .set_exclusive_start_key(start_key)
            .limit(10)
            .send()
            .await?;
        println!("Page items: {:?}", page.items);

        match page.last_evaluated_key {
            Some(next) => start_key = Some(next),
            None => break,
        }
    }
    Ok(())
}
实用工具
数据序列化工具
use serde::{Serialize, Deserialize};
/// Example record with two string fields, serializable and deserializable via serde.
#[derive(Serialize, Deserialize)]
struct Item {
// Stored under the field name `id`.
id: String,
// Stored under the field name `value`.
value: String,
}
/// Converts `item` into a DynamoDB attribute map with `serde_dynamo::to_item`
/// and stores it in `table_name` via `PutItem`.
///
/// NOTE(review): `aws_sdk_dynamodb::Error` does not appear to implement
/// `From<serde_dynamo::Error>`, so the `?` on the serialization step likely
/// needs a broader error type in the signature — confirm before reuse.
///
/// # Errors
/// Propagates serialization failures and errors from the `PutItem` call.
async fn serialize_item(client: &Client, table_name: &str, item: Item) -> Result<(), Error> {
    let attributes = serde_dynamo::to_item(item)?;
    let request = client
        .put_item()
        .table_name(table_name)
        .set_item(Some(attributes));
    request.send().await?;
    Ok(())
}
错误处理
async fn handle_errors(client: &Client, table_name: &str) {
match get_item(client, table_name, "nonexistent_id").await {
Ok(_) => println!("Item found"),
Err(aws_sdk_dynamodb::types::SdkError::ServiceError { err, .. }) => {
println!("DynamoDB error: {:?}", err);
}
Err(e) => println!("Unexpected error: {:?}", e),
}
}
以上示例覆盖了DynamoDB的核心功能,可根据实际需求调整参数或组合使用。完整项目建议参考AWS SDK官方文档,并结合 `serde_dynamo` 库的序列化能力。
Java与Amazon DynamoDB交互
以下是Java与Amazon DynamoDB交互的实用示例,涵盖基础操作到高级功能,均使用AWS SDK for Java 2.x版本实现。
基础操作示例
创建DynamoDB客户端
// Credentials come from the "default" profile; the client targets us-east-1.
ProfileCredentialsProvider credentials = ProfileCredentialsProvider.create("default");
DynamoDbClient client = DynamoDbClient.builder()
        .credentialsProvider(credentials)
        .region(Region.US_EAST_1)
        .build();
创建表
// Table "Users": string hash key "id", provisioned at 10 read / 10 write units.
AttributeDefinition idAttribute = AttributeDefinition.builder()
        .attributeName("id")
        .attributeType(ScalarAttributeType.S)
        .build();
KeySchemaElement hashKey = KeySchemaElement.builder()
        .attributeName("id")
        .keyType(KeyType.HASH)
        .build();
ProvisionedThroughput throughput = ProvisionedThroughput.builder()
        .readCapacityUnits(10L)
        .writeCapacityUnits(10L)
        .build();

CreateTableRequest request = CreateTableRequest.builder()
        .tableName("Users")
        .attributeDefinitions(idAttribute)
        .keySchema(hashKey)
        .provisionedThroughput(throughput)
        .build();
client.createTable(request);
插入数据
// Item attributes: two strings ("id", "name") and one number ("age").
Map<String, AttributeValue> item = Map.of(
        "id", AttributeValue.builder().s("user123").build(),
        "name", AttributeValue.builder().s("John Doe").build(),
        "age", AttributeValue.builder().n("30").build());

PutItemRequest request = PutItemRequest.builder()
        .tableName("Users")
        .item(item)
        .build();
client.putItem(request);
获取单条数据
// Primary key of the item to fetch from "Users".
Map<String, AttributeValue> key = Map.of(
        "id", AttributeValue.builder().s("user123").build());

GetItemRequest request = GetItemRequest.builder()
        .tableName("Users")
        .key(key)
        .build();
GetItemResponse response = client.getItem(request);
批量写入
// Collect the put requests destined for the "Users" table.
List<WriteRequest> writeRequests = new ArrayList<>();
writeRequests.add(WriteRequest.builder()
        .putRequest(PutRequest.builder()
                .item(Map.of(
                        "id", AttributeValue.builder().s("user456").build(),
                        "name", AttributeValue.builder().s("Jane Smith").build()
                )).build())
        .build());
BatchWriteItemRequest request = BatchWriteItemRequest.builder()
        .requestItems(Map.of("Users", writeRequests))
        .build();
BatchWriteItemResponse response = client.batchWriteItem(request);

// A successful call can still return unprocessed items (e.g. when throttled);
// resubmit them instead of silently dropping the writes.
// NOTE(review): production code should add exponential backoff between retries.
int retries = 0;
while (!response.unprocessedItems().isEmpty()) {
    if (++retries > 5) {
        throw new IllegalStateException("Unprocessed items remain after 5 retries");
    }
    response = client.batchWriteItem(BatchWriteItemRequest.builder()
            .requestItems(response.unprocessedItems())
            .build());
}
查询与扫描示例
简单查询
// Bind the :cid placeholder used in the key condition below.
Map<String, AttributeValue> values = Map.of(
        ":cid", AttributeValue.builder().s("cust100").build());

QueryRequest request = QueryRequest.builder()
        .tableName("Orders")
        .keyConditionExpression("customerId = :cid")
        .expressionAttributeValues(values)
        .build();
QueryResponse response = client.query(request);
带过滤的扫描
// Numeric lower bound bound to the :minPrice placeholder.
AttributeValue minPrice = AttributeValue.builder().n("50").build();

ScanRequest request = ScanRequest.builder()
        .tableName("Products")
        .expressionAttributeValues(Map.of(":minPrice", minPrice))
        .filterExpression("price > :minPrice")
        .build();
ScanResponse response = client.scan(request);
分页查询
QueryRequest request = QueryRequest.builder()
.tableName("Messages")
.limit(10)
.exclusiveStartKey(lastEvaluatedKey)
.build();
QueryResponse response = client.query(request);
Map<String, AttributeValue> lastKey = response.lastEvaluatedKey();
高级功能示例
事务操作
// One transaction with two actions: a put into "Inventory" and an update of "Orders".
TransactWriteItemsRequest request = TransactWriteItemsRequest.builder()
        .transactItems(
                TransactWriteItem.builder()
                        .put(Put.builder()
                                .tableName("Inventory")
                                .item(/* item data */)
                                .build())
                        .build(),
                TransactWriteItem.builder()
                        .update(Update.builder()
                                .tableName("Orders")
                                .key(/* key */)
                                // "status" is a DynamoDB reserved word, so it must be
                                // referenced through an expression attribute name.
                                .updateExpression("SET #status = :s")
                                .expressionAttributeNames(Map.of("#status", "status"))
                                .expressionAttributeValues(/* values */)
                                .build())
                        .build()
        )
        .build();
client.transactWriteItems(request);
条件更新
// Add :deposit to "balance", but only if the account item already exists.
Map<String, AttributeValue> accountKey = Map.of(
        "accountId", AttributeValue.builder().s("acc123").build());
Map<String, AttributeValue> values = Map.of(
        ":deposit", AttributeValue.builder().n("100.00").build());

UpdateItemRequest request = UpdateItemRequest.builder()
        .tableName("Accounts")
        .key(accountKey)
        .conditionExpression("attribute_exists(accountId)")
        .updateExpression("SET balance = balance + :deposit")
        .expressionAttributeValues(values)
        .build();
client.updateItem(request);
二级索引示例
全局二级索引查询
// Query the "CategoryIndex" index of "Products" for one category value.
AttributeValue category = AttributeValue.builder().s("Electronics").build();

QueryRequest request = QueryRequest.builder()
        .tableName("Products")
        .indexName("CategoryIndex")
        .expressionAttributeValues(Map.of(":cat", category))
        .keyConditionExpression("category = :cat")
        .build();
client.query(request);
错误处理示例
处理ConditionalCheckFailedException
// Run a conditional update and report a failed condition on stderr instead of
// letting ConditionalCheckFailedException propagate.
try {
// Placeholder: build an UpdateItemRequest that carries a condition expression.
UpdateItemRequest request = /* request with condition */;
client.updateItem(request);
} catch (ConditionalCheckFailedException e) {
// Only this exception type is handled here; any other exception still propagates.
System.err.println("Item did not meet the expected conditions");
}
性能优化示例
启用分页自动迭代
// The paginator issues follow-up requests as the result is iterated.
// A Query must include a key condition on the partition key; "pk" and
// "partition-1" are placeholders — substitute the table's real key and value.
QueryIterable queryIterable = client.queryPaginator(QueryRequest.builder()
        .tableName("LargeTable")
        .keyConditionExpression("pk = :pk")
        .expressionAttributeValues(Map.of(
                ":pk", AttributeValue.builder().s("partition-1").build()))
        .build());
queryIterable.items().forEach(item -> {
    // Process each item.
});
其他实用示例
获取表描述信息
DescribeTableRequest request = DescribeTableRequest.builder()
.tableName("Users")
.build();
DescribeTableResponse response = client.describeTable(request);
TableDescription table = response.table();
删除表
// Delete the table named "TempTable".
DeleteTableRequest.Builder deletion = DeleteTableRequest.builder();
deletion.tableName("TempTable");
DeleteTableRequest request = deletion.build();
client.deleteTable(request);
更新表吞吐量
// New provisioned capacity for "HighTrafficTable": 50 read / 50 write units.
ProvisionedThroughput throughput = ProvisionedThroughput.builder()
        .writeCapacityUnits(50L)
        .readCapacityUnits(50L)
        .build();

UpdateTableRequest request = UpdateTableRequest.builder()
        .tableName("HighTrafficTable")
        .provisionedThroughput(throughput)
        .build();
client.updateTable(request);
以上示例覆盖DynamoDB核心操作场景,实际使用时需根据具体业务需求调整参数和异常处理逻辑。建议结合AWS官方文档进行更深入的功能探索。
基于Rust和MySQL的CRM系统实例
以下是一个基于Rust和MySQL的CRM系统实例,包含各项常用功能的代码示例。这些示例涵盖了从数据库连接到复杂查询的典型CRM操作。
数据库连接与初始化
use mysql::*;
use mysql::prelude::*;
// Connection URL in the form mysql://user:password@host:port/database.
// NOTE(review): credentials are inlined for illustration only — presumably
// real deployments load them from configuration; confirm.
let url = "mysql://username:password@localhost:3306/crm_db";
// Create a pool from the URL, then take one connection from it.
let pool = Pool::new(url)?;
let mut conn = pool.get_conn()?;
客户表创建
// Create the `customers` table if it does not exist yet: auto-increment `id`
// primary key, mandatory `name`, unique (but nullable) `email`, optional
// `phone`, and `created_at` defaulting to the insertion time.
conn.query_drop(
r"CREATE TABLE IF NOT EXISTS customers (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
email VARCHAR(255) UNIQUE,
phone VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)"
)?;
插入新客户
// (name, email, phone) bound positionally to the three `?` placeholders.
let new_customer = ("John Doe", "john@example.com", "+123456789");
let insert_sql = "INSERT INTO customers (name, email, phone) VALUES (?, ?, ?)";
conn.exec_drop(insert_sql, new_customer)?;
批量插入客户
// Each tuple is one (name, email, phone) row; the statement runs once per tuple.
let customers = vec![
    ("Alice Smith", "alice@example.com", "+987654321"),
    ("Bob Johnson", "bob@example.com", "+1122334455"),
];
let insert_sql = "INSERT INTO customers (name, email, phone) VALUES (?, ?, ?)";
// The tuples hold only `&str`, so they are `Copy` and can be handed over by value.
conn.exec_batch(insert_sql, customers.iter().copied())?;
查询所有客户
// Load every customer as an (id, name, email, phone) tuple. The row-to-tuple
// conversion is driven by the annotated element type.
// NOTE(review): `email` and `phone` are nullable in the schema; decoding NULL
// into `String` presumably fails — consider `Option<String>`; confirm.
let customers: Vec<(i32, String, String, String)> =
    conn.query("SELECT id, name, email, phone FROM customers")?;
按ID查询客户
// Fetch (name, email) of one customer; `None` when no row matches.
// `query_first` accepts only the SQL text. A statement with `?` placeholders
// must go through `exec_first`, which takes the parameters as a second argument.
let customer_id = 1;
let customer: Option<(String, String)> = conn.exec_first(
    "SELECT name, email FROM customers WHERE id = ?",
    (customer_id,),
)?;
更新客户信息
conn.exec_drop(
"UPDATE customers SET phone = ? WHERE id = ?",
("+15555555555", 1)
)?;
删除客户
conn.exec_drop(
"DELETE FROM customers WHERE id = ?",
(2,)
)?;
交易记录表
// Create the `transactions` table if it does not exist yet: auto-increment
// `id` primary key, `customer_id` referencing customers(id), a two-decimal
// `amount`, and the transaction `date`.
conn.query_drop(
r"CREATE TABLE IF NOT EXISTS transactions (
id INT AUTO_INCREMENT PRIMARY KEY,
customer_id INT NOT NULL,
amount DECIMAL(10,2) NOT NULL,
date DATE NOT NULL,
FOREIGN KEY (customer_id) REFERENCES customers(id)
)"
)?;
添加交易记录
// (customer_id, amount, date) bound positionally to the three `?` placeholders.
let transaction = (1, 199.99, "2023-05-15");
let insert_sql = "INSERT INTO transactions (customer_id, amount, date) VALUES (?, ?, ?)";
conn.exec_drop(insert_sql, transaction)?;
客户交易统计
// Per-customer total: join customers with their transactions, group by
// customer id, and collect (name, summed amount) pairs.
let stats: Vec<(String, f64)> = conn.query_map(
r"SELECT c.name, SUM(t.amount) as total
FROM customers c
JOIN transactions t ON c.id = t.customer_id
GROUP BY c.id",
// Each row is passed through unchanged as a (name, total) tuple.
|(name, total)| (name, total)
)?;
分页查询客户
// Fetch one page of customers (1-based `page`, `per_page` rows per page).
let page = 1;
let per_page = 10;
let offset = (page - 1) * per_page;
// `query_map` takes no parameters; a statement with `?` placeholders needs
// `exec_map`. ORDER BY gives rows a stable order, so successive pages neither
// overlap nor skip rows.
let customers: Vec<(String, String)> = conn.exec_map(
    "SELECT name, email FROM customers ORDER BY id LIMIT ? OFFSET ?",
    (per_page, offset),
    |(name, email)| (name, email),
)?;
搜索客户
// Substring search on name or email; the `%` wildcards are part of the bound value.
let search_term = "%john%";
// `query_map` takes no parameters; a statement with `?` placeholders needs `exec_map`.
let results: Vec<(String, String)> = conn.exec_map(
    "SELECT name, email FROM customers WHERE name LIKE ? OR email LIKE ?",
    (search_term, search_term),
    |(name, email)| (name, email),
)?;
客户活动记录表
conn.query_drop(
r"CREATE TABLE IF NOT EXISTS activities (
id INT AUTO_INCREMENT PRIMARY KEY,
customer_id