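OceanBase and .NET Development Overview
OceanBase is a distributed relational database originally developed in-house at Alibaba Group and Ant Group, designed for high availability, high performance, and horizontal scalability. .NET applications typically reach OceanBase through the ADO.NET interface or an ORM framework.
OceanBase Features
- Distributed architecture: scales horizontally, with data partitioned automatically
- High availability: multi-replica consistency based on the Paxos protocol
- Compatibility: speaks the MySQL protocol and supports most MySQL syntax
- HTAP: serves OLTP and OLAP workloads in the same system
.NET Development Options
- ADO.NET: Connector/NET, the official MySQL driver
- ORM frameworks: Entity Framework Core, Dapper, and others
- Third-party drivers: alternatives such as MySqlConnector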
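Environment Setup and Connection Management
Installing the Driver
# Install the official MySQL driver from NuGet
Install-Package MySql.Data
Basic Connection Configuration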
using MySql.Data.MySqlClient;
string connectionString = "server=obproxy.mydomain.com;port=2883;user=root;password=yourpassword;database=test;";
using (var connection = new MySqlConnection(connectionString))
{
try
{
connection.Open();
Console.WriteLine("Connected to OceanBase");
}
catch (Exception ex)
{
Console.WriteLine($"Connection failed: {ex.Message}");
}
}
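Connection Pool Configuration
string connectionString = "server=obproxy.mydomain.com;port=2883;user=root;password=yourpassword;database=test;" +
"Pooling=true;MinimumPoolSize=5;MaximumPoolSize=100;ConnectionTimeout=30;";
// Advanced pool configuration through the connection string builder
var builder = new MySqlConnectionStringBuilder(connectionString)
{
Pooling = true,
MinimumPoolSize = 10,
MaximumPoolSize = 200,
// ConnectionIdleTimeout is a MySqlConnector option; MySql.Data has no such property, so it is omitted here
ConnectionLifeTime = 1800 // seconds before a pooled connection is retired
};
Multi-Tenant Connection Configuration
OceanBase is multi-tenant, so the tenant is given as part of the user name (user@tenant) when connecting: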
string tenantConnectionString = "server=obproxy.mydomain.com;port=2883;user=user@tenant;password=password;database=test;";
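The tenant-qualified login above can be composed programmatically rather than by hand. The following is a minimal sketch under stated assumptions: the class and method names are hypothetical, the optional `#cluster` suffix follows the `user@tenant#cluster` form commonly used when connecting through OBProxy, and the BCL `DbConnectionStringBuilder` stands in for the driver-specific builder so the sketch has no package dependency.

```csharp
using System;
using System.Data.Common;

// Hypothetical helper, not part of any driver API.
public static class TenantConnectionStrings
{
    // Formats the login as "user@tenant", or "user@tenant#cluster" when a cluster name is given.
    public static string FormatLogin(string user, string tenant, string cluster = "")
    {
        if (string.IsNullOrWhiteSpace(user)) throw new ArgumentException("user is required", nameof(user));
        if (string.IsNullOrWhiteSpace(tenant)) throw new ArgumentException("tenant is required", nameof(tenant));
        return string.IsNullOrEmpty(cluster) ? $"{user}@{tenant}" : $"{user}@{tenant}#{cluster}";
    }

    // Assembles a MySQL-style connection string with the tenant-qualified login.
    public static string Build(string server, int port, string user, string tenant,
                               string password, string database, string cluster = "")
    {
        var builder = new DbConnectionStringBuilder();
        builder["server"] = server;
        builder["port"] = port;
        builder["user"] = FormatLogin(user, tenant, cluster);
        builder["password"] = password;
        builder["database"] = database;
        return builder.ConnectionString;
    }
}
```

The resulting string can be passed to `MySqlConnection` as in the examples above; keeping the login formatting in one place avoids scattering the `@` and `#` conventions across the code base.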
Basic CRUD Operations
Creating a Table
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
var createTableSql = @"
CREATE TABLE IF NOT EXISTS employees (
id BIGINT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(100) UNIQUE,
salary DECIMAL(15,2),
hire_date DATETIME,
department_id INT,
INDEX idx_department (department_id),
INDEX idx_name (name)
) PARTITION BY HASH(id) PARTITIONS 16;";
using (var command = new MySqlCommand(createTableSql, connection))
{
command.ExecuteNonQuery();
Console.WriteLine("Table created");
}
}
Inserting Data
// Single-row insert
var insertSql = "INSERT INTO employees (id, name, email, salary, hire_date, department_id) " +
"VALUES (@id, @name, @email, @salary, @hire_date, @department_id)";
using (var command = new MySqlCommand(insertSql, connection))
{
command.Parameters.AddWithValue("@id", 1);
command.Parameters.AddWithValue("@name", "张三");
command.Parameters.AddWithValue("@email", "zhangsan@example.com");
command.Parameters.AddWithValue("@salary", 15000.50m);
command.Parameters.AddWithValue("@hire_date", DateTime.Now);
command.Parameters.AddWithValue("@department_id", 101);
int rowsAffected = command.ExecuteNonQuery();
Console.WriteLine($"Inserted {rowsAffected} rows");
}
Querying Data
// Basic query
var query = "SELECT id, name, email, salary, hire_date FROM employees WHERE department_id = @deptId";
using (var command = new MySqlCommand(query, connection))
{
command.Parameters.AddWithValue("@deptId", 101);
using (var reader = command.ExecuteReader())
{
while (reader.Read())
{
Console.WriteLine($"ID: {reader["id"]}, Name: {reader["name"]}, " +
$"Email: {reader["email"]}, Salary: {reader.GetDecimal(3)}");
}
}
}
Updating Data
var updateSql = "UPDATE employees SET salary = salary * 1.1 WHERE department_id = @deptId";
using (var command = new MySqlCommand(updateSql, connection))
{
command.Parameters.AddWithValue("@deptId", 101);
int rowsUpdated = command.ExecuteNonQuery();
Console.WriteLine($"Updated {rowsUpdated} rows");
}
Deleting Data
var deleteSql = "DELETE FROM employees WHERE id = @id";
using (var command = new MySqlCommand(deleteSql, connection))
{
command.Parameters.AddWithValue("@id", 1);
int rowsDeleted = command.ExecuteNonQuery();
Console.WriteLine($"Deleted {rowsDeleted} rows");
}
Transaction Handling
Basic Transaction Example
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
using (var transaction = connection.BeginTransaction())
{
try
{
// Insert the employee record
var insertEmp = "INSERT INTO employees (id, name, salary) VALUES (1001, '李四', 20000)";
using (var cmd1 = new MySqlCommand(insertEmp, connection, transaction))
{
cmd1.ExecuteNonQuery();
}
// Deduct the salary from the department budget
var updateBudget = "UPDATE departments SET budget = budget - 20000 WHERE id = 10";
using (var cmd2 = new MySqlCommand(updateBudget, connection, transaction))
{
int affected = cmd2.ExecuteNonQuery();
if (affected == 0)
{
throw new Exception("Department does not exist; rolling back");
}
}
transaction.Commit();
Console.WriteLine("Transaction committed");
}
catch (Exception ex)
{
transaction.Rollback();
Console.WriteLine($"Transaction rolled back: {ex.Message}");
}
}
}
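Distributed Transaction Handling
OceanBase supports distributed transactions, and they can also be driven explicitly through the XA protocol. Check that your OceanBase version and tenant mode support the XA statements before relying on them:
// XA transaction example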
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
// Start the XA transaction
using (var xaStart = new MySqlCommand("XA START 'transaction1'", connection))
{
xaStart.ExecuteNonQuery();
}
try
{
// Run the SQL statements that belong to the transaction
using (var cmd1 = new MySqlCommand("INSERT INTO table1 VALUES (1, 'data1')", connection))
{
cmd1.ExecuteNonQuery();
}
using (var cmd2 = new MySqlCommand("UPDATE table2 SET value = 'new' WHERE id = 1", connection))
{
cmd2.ExecuteNonQuery();
}
// Prepare phase: end the XA branch, then prepare it
using (var xaEnd = new MySqlCommand("XA END 'transaction1'", connection))
using (var xaPrepare = new MySqlCommand("XA PREPARE 'transaction1'", connection))
{
xaEnd.ExecuteNonQuery();
xaPrepare.ExecuteNonQuery();
}
// Commit phase
using (var xaCommit = new MySqlCommand("XA COMMIT 'transaction1'", connection))
{
xaCommit.ExecuteNonQuery();
Console.WriteLine("Distributed transaction committed");
}
}
catch (Exception)
{
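// Roll back (in MySQL, a branch that is still ACTIVE must be ended with XA END before it can be rolled back)
using (var xaRollback = new MySqlCommand("XA ROLLBACK 'transaction1'", connection))
{
xaRollback.ExecuteNonQuery();
Console.WriteLine("Distributed transaction rolled back");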
}
throw;
}
}
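Batch Operations and Performance Optimization
Batch Insert
// Reuse one parameterized command inside a single transaction to avoid per-row commit overhead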
var insertSql = "INSERT INTO employees (id, name, email, salary, hire_date, department_id) " +
"VALUES (@id, @name, @email, @salary, @hire_date, @department_id)";
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
using (var transaction = connection.BeginTransaction())
using (var command = new MySqlCommand(insertSql, connection, transaction))
{
// Declare the parameters once
command.Parameters.Add("@id", MySqlDbType.Int64);
command.Parameters.Add("@name", MySqlDbType.VarChar);
command.Parameters.Add("@email", MySqlDbType.VarChar);
command.Parameters.Add("@salary", MySqlDbType.Decimal);
command.Parameters.Add("@hire_date", MySqlDbType.DateTime);
command.Parameters.Add("@department_id", MySqlDbType.Int32);
// Set the values and execute once per row
for (int i = 0; i < 1000; i++)
{
command.Parameters["@id"].Value = 2000 + i;
command.Parameters["@name"].Value = $"员工_{i}";
command.Parameters["@email"].Value = $"user_{i}@company.com";
command.Parameters["@salary"].Value = 5000 + (i % 10) * 1000;
command.Parameters["@hire_date"].Value = DateTime.Now.AddDays(-i);
command.Parameters["@department_id"].Value = 100 + (i % 5);
command.ExecuteNonQuery();
}
transaction.Commit();
Console.WriteLine("Batch insert complete");
}
}
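The loop above still sends one statement per row. A further option is a multi-row `INSERT ... VALUES (...), (...)`, which packs several rows into one statement. The sketch below only generates the SQL text; the class name and the `@column{rowIndex}` parameter naming are assumptions made for illustration, and the table and column names must come from trusted code, never from user input, because identifiers cannot be bound as parameters.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper that builds the text of a multi-row parameterized INSERT.
public static class MultiRowInsert
{
    public static string Build(string table, IReadOnlyList<string> columns, int rowCount)
    {
        if (columns == null || columns.Count == 0) throw new ArgumentException("at least one column is required", nameof(columns));
        if (rowCount <= 0) throw new ArgumentOutOfRangeException(nameof(rowCount));

        // One "(@col0, @col1, ...)" tuple per row, with the row index appended to each parameter name
        var rows = Enumerable.Range(0, rowCount)
            .Select(r => "(" + string.Join(", ", columns.Select(c => $"@{c}{r}")) + ")");

        return $"INSERT INTO {table} ({string.Join(", ", columns)}) VALUES {string.Join(", ", rows)}";
    }
}
```

Each generated placeholder (for example `@id0`, `@name0`) is then bound with `command.Parameters.AddWithValue` before a single `ExecuteNonQuery` call. Batches of a few hundred rows are a reasonable starting point; the best size depends on row width and server limits.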
Using MySqlBulkLoader
// Create the CSV file
File.WriteAllText("employees.csv",
"1,张三,zhangsan@example.com,15000.50,2023-01-15,101\n" +
"2,李四,lisi@example.com,18000.00,2022-11-20,102");
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
var bulkLoader = new MySqlBulkLoader(connection)
{
TableName = "employees",
FieldTerminator = ",",
LineTerminator = "\n",
FileName = "employees.csv",
Local = true, // the CSV lives on the client; this also requires AllowLoadLocalInfile=true in the connection string
NumberOfLinesToSkip = 0,
Columns = { "id", "name", "email", "salary", "hire_date", "department_id" }
};
int count = bulkLoader.Load();
Console.WriteLine($"BulkLoader inserted {count} rows");
}
Batch Update
// Batch update with CASE WHEN
var updateSql = @"
UPDATE employees
SET salary = CASE id
WHEN @id1 THEN @salary1
WHEN @id2 THEN @salary2
WHEN @id3 THEN @salary3
END
WHERE id IN (@id1, @id2, @id3)";
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
using (var command = new MySqlCommand(updateSql, connection))
{
command.Parameters.AddWithValue("@id1", 101);
command.Parameters.AddWithValue("@salary1", 20000);
command.Parameters.AddWithValue("@id2", 102);
command.Parameters.AddWithValue("@salary2", 22000);
command.Parameters.AddWithValue("@id3", 103);
command.Parameters.AddWithValue("@salary3", 25000);
int rowsUpdated = command.ExecuteNonQuery();
Console.WriteLine($"Batch update changed {rowsUpdated} rows");
}
}
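The CASE WHEN statement above is written out for exactly three rows. For a variable number of rows the same shape can be generated, as in this minimal sketch. The class and method names are hypothetical, and the numbering follows the `@id1`/`@salary1` convention of the example above.

```csharp
using System;
using System.Linq;
using System.Text;

// Hypothetical helper that generates the CASE WHEN batch update for `count` rows.
public static class BatchUpdateSql
{
    public static string BuildSalaryUpdate(int count)
    {
        if (count <= 0) throw new ArgumentOutOfRangeException(nameof(count));

        var sb = new StringBuilder("UPDATE employees SET salary = CASE id");
        for (int i = 1; i <= count; i++)
        {
            // One WHEN branch per row: the id selects the row, the salary is its new value
            sb.Append($" WHEN @id{i} THEN @salary{i}");
        }
        sb.Append(" END WHERE id IN (");
        sb.Append(string.Join(", ", Enumerable.Range(1, count).Select(i => $"@id{i}")));
        sb.Append(")");
        return sb.ToString();
    }
}
```

The `WHERE id IN (...)` clause matters: without it, every row not matched by a `WHEN` branch would have its salary set to `NULL`.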
Stored Procedures and Functions
Creating a Stored Procedure
var createProcSql = @"
CREATE PROCEDURE increase_salary(IN dept_id INT, IN increase_percent DECIMAL(5,2))
BEGIN
UPDATE employees
SET salary = salary * (1 + increase_percent / 100)
WHERE department_id = dept_id;
SELECT COUNT(*) AS affected_rows FROM employees WHERE department_id = dept_id;
END";
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
using (var command = new MySqlCommand(createProcSql, connection))
{
command.ExecuteNonQuery();
Console.WriteLine("Stored procedure created");
}
}
Calling a Stored Procedure
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
using (var command = new MySqlCommand("increase_salary", connection))
{
command.CommandType = CommandType.StoredProcedure;
command.Parameters.AddWithValue("@dept_id", 101);
command.Parameters.AddWithValue("@increase_percent", 5.0m);
using (var reader = command.ExecuteReader())
{
while (reader.Read())
{
Console.WriteLine($"Affected {reader["affected_rows"]} rows");
}
}
}
}
Using Output Parameters
var createProcWithOutput = @"
CREATE PROCEDURE get_employee_stats(IN dept_id INT, OUT avg_salary DECIMAL(15,2), OUT max_salary DECIMAL(15,2))
BEGIN
SELECT AVG(salary), MAX(salary) INTO avg_salary, max_salary
FROM employees WHERE department_id = dept_id;
END";
// Create the stored procedure first
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
using (var command = new MySqlCommand(createProcWithOutput, connection))
{
command.ExecuteNonQuery();
}
}
// Call the stored procedure with output parameters
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
using (var command = new MySqlCommand("get_employee_stats", connection))
{
command.CommandType = CommandType.StoredProcedure;
command.Parameters.AddWithValue("@dept_id", 101);
var avgSalaryParam = new MySqlParameter("@avg_salary", MySqlDbType.Decimal)
{
Direction = ParameterDirection.Output,
Precision = 15,
Scale = 2
};
command.Parameters.Add(avgSalaryParam);
var maxSalaryParam = new MySqlParameter("@max_salary", MySqlDbType.Decimal)
{
Direction = ParameterDirection.Output,
Precision = 15,
Scale = 2
};
command.Parameters.Add(maxSalaryParam);
command.ExecuteNonQuery();
Console.WriteLine($"Department 101 average salary: {avgSalaryParam.Value}, highest salary: {maxSalaryParam.Value}");
}
}
Data Type Mapping and Handling
.NET to OceanBase Type Mapping
.NET type | OceanBase type | MySqlDbType enum |
---|---|---|
int | INT | Int32 |
long | BIGINT | Int64 |
decimal | DECIMAL | Decimal |
float | FLOAT | Float |
double | DOUBLE | Double |
string | VARCHAR, CHAR | VarChar, String |
DateTime | DATETIME | DateTime |
bool | TINYINT(1) | Byte (MySqlConnector also defines Bool) |
byte[] | BLOB | Blob |
Special Type Handling
Handling JSON
// Create a table with a JSON column
var createTableWithJson = @"
CREATE TABLE IF NOT EXISTS products (
id BIGINT PRIMARY KEY,
name VARCHAR(100),
attributes JSON,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)";
// Insert JSON data
var insertJson = "INSERT INTO products (id, name, attributes) VALUES (1, '智能手机', @attributes)";
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
// Create the table
using (var cmdCreate = new MySqlCommand(createTableWithJson, connection))
{
cmdCreate.ExecuteNonQuery();
}
// Insert JSON data
using (var cmdInsert = new MySqlCommand(insertJson, connection))
{
var json = new {
color = "黑色",
memory = "128GB",
camera = "48MP",
features = new[] { "防水", "面部识别" }
};
// JsonConvert comes from the Newtonsoft.Json package
cmdInsert.Parameters.AddWithValue("@attributes", JsonConvert.SerializeObject(json));
cmdInsert.ExecuteNonQuery();
}
// Query JSON data (-> returns a JSON value, so strings come back quoted)
using (var cmdQuery = new MySqlCommand("SELECT id, name, attributes->'$.color' AS color FROM products", connection))
using (var reader = cmdQuery.ExecuteReader())
{
while (reader.Read())
{
Console.WriteLine($"Product: {reader["name"]}, color: {reader["color"]}");
}
}
}
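Handling Enums
// Enum example (assumes the employees table has a status column, which the CREATE TABLE above does not define)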
public enum EmployeeStatus { Active, OnLeave, Terminated }
var insertWithEnum = "INSERT INTO employees (id, name, status) VALUES (@id, @name, @status)";
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
using (var command = new MySqlCommand(insertWithEnum, connection))
{
command.Parameters.AddWithValue("@id", 1001);
command.Parameters.AddWithValue("@name", "王五");
command.Parameters.AddWithValue("@status", EmployeeStatus.Active.ToString());
command.ExecuteNonQuery();
}
}
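Storing the enum as its name, as above, means the value has to be parsed back when rows are read. The following sketch shows one way to do the round trip. The mapper class is hypothetical, and the enum is repeated so the block is self-contained.

```csharp
using System;

public enum EmployeeStatus { Active, OnLeave, Terminated }

// Hypothetical mapper between the enum and the text stored in the status column.
public static class EmployeeStatusMapper
{
    // Value written to the database, e.g. "OnLeave"
    public static string ToColumn(EmployeeStatus status) => status.ToString();

    // Parses a column value (such as reader["status"]) back into the enum.
    public static EmployeeStatus FromColumn(object dbValue)
    {
        if (dbValue is string text
            && Enum.TryParse(text, ignoreCase: true, out EmployeeStatus status)
            && Enum.IsDefined(typeof(EmployeeStatus), status))
        {
            return status;
        }
        // DBNull, non-string values, and unknown names all end up here
        throw new InvalidCastException($"Unrecognized employee status: {dbValue}");
    }
}
```

The `Enum.IsDefined` check is there because `Enum.TryParse` also accepts numeric strings, including numbers that do not correspond to any declared member.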
Advanced Query Techniques
Paged Queries
// Paged query in OceanBase (MySQL-compatible LIMIT/OFFSET syntax)
public List<Employee> GetEmployeesPaged(int pageNumber, int pageSize, string sortColumn, bool sortAsc)
{
var employees = new List<Employee>();
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
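var offset = (pageNumber - 1) * pageSize;
var sortDirection = sortAsc ? "ASC" : "DESC";
// Column names cannot be bound as parameters, so only allow known columns into the SQL text
var allowedSortColumns = new[] { "id", "name", "email", "salary", "hire_date", "department_id" };
if (Array.IndexOf(allowedSortColumns, sortColumn) < 0)
{
throw new ArgumentException($"Unsupported sort column: {sortColumn}", nameof(sortColumn));
}
var query = $@"
SELECT id, name, email, salary, hire_date, department_id
FROM employees
ORDER BY {sortColumn} {sortDirection}
LIMIT {pageSize} OFFSET {offset}";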
using (var command = new MySqlCommand(query, connection))
using (var reader = command.ExecuteReader())
{
while (reader.Read())
{
employees.Add(new Employee
{
Id = reader.GetInt64("id"),
Name = reader.GetString("name"),
Email = reader.IsDBNull("email") ? null : reader.GetString("email"),
Salary = reader.GetDecimal("salary"),
HireDate = reader.GetDateTime("hire_date"),
DepartmentId = reader.GetInt32("department_id")
});
}
}
}
return employees;
}
Window Functions
// Rank rows with window functions
var windowQuery = @"
SELECT
id,
name,
salary,
department_id,
RANK() OVER (PARTITION BY department_id ORDER BY salary DESC) AS dept_salary_rank,
salary - LAG(salary, 1, 0) OVER (PARTITION BY department_id ORDER BY salary) AS salary_diff
FROM employees
ORDER BY department_id, salary DESC";
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
using (var command = new MySqlCommand(windowQuery, connection))
using (var reader = command.ExecuteReader())
{
Console.WriteLine("ID\tName\tSalary\tDept\tDeptRank\tSalaryDiff");
while (reader.Read())
{
Console.WriteLine($"{reader["id"]}\t{reader["name"]}\t{reader["salary"]}\t" +
$"{reader["department_id"]}\t{reader["dept_salary_rank"]}\t" +
$"{reader["salary_diff"]}");
}
}
}
Full-Text Search
// Create the full-text index (check that your OceanBase version supports full-text indexes)
var createFulltextIndex = @"
ALTER TABLE employees
ADD FULLTEXT INDEX ft_name_email (name, email)";
// Full-text search query
var fulltextQuery = @"
SELECT id, name, email,
MATCH(name, email) AGAINST(@searchTerm IN NATURAL LANGUAGE MODE) AS relevance
FROM employees
WHERE MATCH(name, email) AGAINST(@searchTerm IN NATURAL LANGUAGE MODE)
ORDER BY relevance DESC";
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
// Create the full-text index
using (var cmdIndex = new MySqlCommand(createFulltextIndex, connection))
{
cmdIndex.ExecuteNonQuery();
}
// Run the full-text search
using (var cmdSearch = new MySqlCommand(fulltextQuery, connection))
{
// Natural language mode has no OR operator; space-separated terms already match rows containing any of them
cmdSearch.Parameters.AddWithValue("@searchTerm", "张经理 总监");
using (var reader = cmdSearch.ExecuteReader())
{
while (reader.Read())
{
Console.WriteLine($"Match: {reader["name"]} ({reader["email"]}), relevance: {reader["relevance"]}");
}
}
}
}
JSON Data Handling
OceanBase provides MySQL-compatible JSON functions for building, querying, and modifying JSON values directly in SQL.
Querying and Modifying JSON
// JSON query and update example
var jsonOperations = @"
-- Insert JSON data
INSERT INTO products (id, name, attributes)
VALUES (2, '笔记本电脑', JSON_OBJECT('brand', 'Dell', 'cpu', 'i7', 'ram', '16GB'));
-- Read a JSON attribute
SELECT id, name, JSON_EXTRACT(attributes, '$.brand') AS brand FROM products;
-- Modify JSON attributes
UPDATE products
SET attributes = JSON_SET(attributes, '$.ram', '32GB', '$.storage', '1TB SSD')
WHERE id = 2;
-- Read the updated JSON
SELECT id, name, attributes FROM products WHERE id = 2;";
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
// Execute the batch of JSON statements
using (var command = new MySqlCommand(jsonOperations, connection))
{
using (var reader = command.ExecuteReader())
{
do
{
// Both SELECTs return three columns, so tell them apart by the third column's name
bool hasBrand = reader.FieldCount == 3 && reader.GetName(2) == "brand";
while (reader.Read())
{
if (hasBrand)
{
Console.WriteLine($"Product: {reader["name"]}, brand: {reader["brand"]}");
}
else
{
Console.WriteLine($"Product: {reader["name"]}, attributes: {reader["attributes"]}");
}
}
} while (reader.NextResult());
}
}
}
Working with JSON Arrays
// JSON array example
var jsonArrayQuery = @"
-- Create a table with a JSON array column
CREATE TABLE IF NOT EXISTS orders (
id BIGINT PRIMARY KEY,
customer_id BIGINT,
items JSON,
order_date DATETIME
);
-- Insert JSON data containing an array
INSERT INTO orders (id, customer_id, items, order_date)
VALUES (1, 1001,
JSON_ARRAY(
JSON_OBJECT('product_id', 101, 'quantity', 2, 'price', 5999),
JSON_OBJECT('product_id', 205, 'quantity', 1, 'price', 1299)
),
NOW());
-- Query the JSON array
SELECT
id,
customer_id,
JSON_LENGTH(items) AS item_count,
JSON_EXTRACT(items, '$[0].product_id') AS first_product_id,
-- JSON_SEARCH only matches string values, so JSON_CONTAINS is used for the numeric product_id
JSON_CONTAINS(items, JSON_OBJECT('product_id', 205)) AS has_product_205
FROM orders;";
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
using (var command = new MySqlCommand(jsonArrayQuery, connection))
{
using (var reader = command.ExecuteReader())
{
while (reader.Read())
{
Console.WriteLine($"Order ID: {reader["id"]}, item count: {reader["item_count"]}, " +
$"first product ID: {reader["first_product_id"]}, " +
$"contains product 205: {reader["has_product_205"]}");
}
}
}
}
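Distributed Transaction Scenarios
As a distributed database, OceanBase coordinates transactions that span partitions and nodes internally, so an ordinary ADO.NET transaction can cover all of them.
Cross-Partition Transactions
// Cross-partition transaction example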
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
using (var transaction = connection.BeginTransaction())
{
try
{
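// Partitioned table 1: debit the source account (the balance check keeps it from going negative)
var updateAccount = @"
UPDATE accounts
SET balance = balance - 1000
WHERE account_id = 'A1001' AND balance >= 1000";
using (var cmd1 = new MySqlCommand(updateAccount, connection, transaction))
{
int affected = cmd1.ExecuteNonQuery();
if (affected == 0)
{
throw new Exception("Account A1001 does not exist or has insufficient funds");
}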
}
// Partitioned table 2: record the transfer
var insertTransaction = @"
INSERT INTO transactions
(txn_id, from_account, to_account, amount, txn_time)
VALUES
(UUID(), 'A1001', 'B2001', 1000, NOW())";
using (var cmd2 = new MySqlCommand(insertTransaction, connection, transaction))
{
cmd2.ExecuteNonQuery();
}
// Partitioned table 3: credit the target account
var updateTargetAccount = @"
UPDATE accounts
SET balance = balance + 1000
WHERE account_id = 'B2001'";
using (var cmd3 = new MySqlCommand(updateTargetAccount, connection, transaction))
{
int affected = cmd3.ExecuteNonQuery();
if (affected == 0)
{
throw new Exception("Target account B2001 does not exist");
}
}
transaction.Commit();
Console.WriteLine("Distributed transaction committed");
}
catch (Exception ex)
{
transaction.Rollback();
Console.WriteLine($"Distributed transaction rolled back: {ex.Message}");
throw;
}
}
}
Two-Phase Commit (2PC) Pattern
// Two-phase commit example
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
// Phase one: prepare
using (var cmdPrepare1 = new MySqlCommand("XA START 'txn123'", connection))
using (var cmdPrepare2 = new MySqlCommand("UPDATE accounts SET balance = balance - 500 WHERE account_id = 'A1001'", connection))
using (var cmdPrepare3 = new MySqlCommand("XA END 'txn123'", connection))
using (var cmdPrepare4 = new MySqlCommand("XA PREPARE 'txn123'", connection))
{
cmdPrepare1.ExecuteNonQuery();
cmdPrepare2.ExecuteNonQuery();
cmdPrepare3.ExecuteNonQuery();
cmdPrepare4.ExecuteNonQuery();
}
try
{
// Phase two: commit
using (var cmdCommit = new MySqlCommand("XA COMMIT 'txn123'", connection))
{
cmdCommit.ExecuteNonQuery();
Console.WriteLine("Two-phase commit complete");
}
}
catch (Exception)
{
// If the commit fails, try to roll back
using (var cmdRollback = new MySqlCommand("XA ROLLBACK 'txn123'", connection))
{
cmdRollback.ExecuteNonQuery();
Console.WriteLine("Two-phase commit failed; rolled back");
}
throw;
}
}
Performance Monitoring and Tuning
Analyzing Query Execution Plans
// Fetch the execution plan
var explainQuery = "EXPLAIN SELECT * FROM employees WHERE department_id = 101 AND salary > 10000";
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
using (var command = new MySqlCommand(explainQuery, connection))
using (var reader = command.ExecuteReader())
{
Console.WriteLine("Execution plan:");
// OceanBase returns the plan as text rather than MySQL's tabular id/select_type/... columns,
// so print whatever columns come back instead of indexing by MySQL column names
while (reader.Read())
{
for (int i = 0; i < reader.FieldCount; i++)
{
Console.WriteLine($"{reader.GetName(i)}: {reader[i]}");
}
}
}
}
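Performance Monitoring Queries
// Query OceanBase performance views
var performanceQueries = @"
-- Current sessions
SHOW PROCESSLIST;
-- Table statistics
SHOW TABLE STATUS LIKE 'employees';
-- Index statistics
SHOW INDEX FROM employees;
-- OceanBase SQL audit view (the name varies by version; 4.x releases expose it as oceanbase.GV$OB_SQL_AUDIT)
SELECT * FROM oceanbase.v$sql_audit ORDER BY request_time DESC LIMIT 10;";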
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
using (var command = new MySqlCommand(performanceQueries, connection))
{
using (var reader = command.ExecuteReader())
{
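int resultSetIndex = 0;
do
{
resultSetIndex++;
// Label each result set by position; the command text is the whole batch, not the current statement
Console.WriteLine($"--- Result set {resultSetIndex} ---");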
for (int i = 0; i < reader.FieldCount; i++)
{
Console.Write($"{reader.GetName(i)}\t");
}
Console.WriteLine();
while (reader.Read())
{
for (int i = 0; i < reader.FieldCount; i++)
{
Console.Write($"{reader[i]}\t");
}
Console.WriteLine();
}
Console.WriteLine();
} while (reader.NextResult());
}
}
}
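Connection Pool Monitoring
// Read pool statistics via reflection. "PoolStats" is not part of the public MySql.Data API,
// so this lookup may simply return null depending on the driver version; treat it as best effort.
var poolStats = typeof(MySqlConnection).GetProperty("PoolStats",
System.Reflection.BindingFlags.NonPublic | System.Reflection.BindingFlags.Static);
if (poolStats != null)
{
var stats = poolStats.GetValue(null);
Console.WriteLine("Connection pool status:");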
Console.WriteLine($"Number of pools: {stats.GetType().GetProperty("NumberOfPools").GetValue(stats)}");
Console.WriteLine($"Available connections: {stats.GetType().GetProperty("Available").GetValue(stats)}");
Console.WriteLine($"Connections in use: {stats.GetType().GetProperty("InUse").GetValue(stats)}");
}
Exception Handling and Logging
Handling Common Exceptions
try
{
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
var query = "SELECT * FROM non_existing_table";
using (var command = new MySqlCommand(query, connection))
{
command.ExecuteReader();
}
}
}
catch (MySqlException ex)
{
switch (ex.Number)
{
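case 1045: // access denied
Console.WriteLine("Access denied; check the user name and password");
break;
case 1049: // unknown database
Console.WriteLine($"Database does not exist: {ex.Message}");
break;
case 1146: // table does not exist
Console.WriteLine($"Table does not exist: {ex.Message}");
// Create the table or notify the user
break;
case 1213: // deadlock
Console.WriteLine("Deadlock detected; retry the operation");
break;
case 2006: // server has gone away
Console.WriteLine("The database connection was lost; reconnecting");
break;
case 2013: // lost connection during query
Console.WriteLine("Connection lost during the query; check the network or retry");
break;
default:
Console.WriteLine($"Database error ({ex.Number}): {ex.Message}");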
break;
}
// Log the full exception details
LogError(ex);
}
catch (Exception ex)
{
Console.WriteLine($"Unexpected error: {ex.Message}");
LogError(ex);
}
void LogError(Exception ex)
{
// Real projects would typically use a logging framework such as log4net or NLog
string logMessage = $"[{DateTime.Now}] ERROR: {ex.GetType().Name}\n" +
$"Message: {ex.Message}\n" +
$"Stack Trace:\n{ex.StackTrace}\n";
if (ex is MySqlException mySqlEx)
{
logMessage += $"MySQL Error Number: {mySqlEx.Number}\n";
}
File.AppendAllText("db_errors.log", logMessage + new string('-', 50) + "\n");
}
Implementing a Retry Mechanism
public T ExecuteWithRetry<T>(Func<T> operation, int maxRetries = 3, int delayMs = 1000)
{
int retryCount = 0;
while (true)
{
try
{
return operation();
}
catch (MySqlException ex) when (IsTransientError(ex) && retryCount < maxRetries)
{
retryCount++;
Console.WriteLine($"Transient error, retry {retryCount} of {maxRetries}: {ex.Message}");
Thread.Sleep(delayMs * retryCount); // linear backoff: the wait grows by delayMs on each retry
}
catch (Exception)
{
throw;
}
}
}
private bool IsTransientError(MySqlException ex)
{
// Error codes worth retrying
int[] transientErrorNumbers = { 1213, 1205, 2006, 2013, 1040, 1317 };
// Transient conditions such as deadlocks, lock wait timeouts, and dropped connections
return transientErrorNumbers.Contains(ex.Number) ||
ex.Message.Contains("deadlock") ||
ex.Message.Contains("timeout");
}
// Using the retry helper
var result = ExecuteWithRetry(() => {
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
using (var cmd = new MySqlCommand("SELECT COUNT(*) FROM employees", connection))
{
return Convert.ToInt32(cmd.ExecuteScalar());
}
}
});
Console.WriteLine($"Total employees: {result}");
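The retry helper waits `delayMs * retryCount` between attempts, so the delay grows linearly. If exponential backoff is wanted instead, the delay can be computed as in the sketch below. The class name, the default base delay, and the 30-second cap are assumptions chosen for illustration.

```csharp
using System;

// Hypothetical helper that computes capped exponential backoff delays.
public static class Backoff
{
    // attempt is 1-based: attempt 1 waits baseDelayMs, attempt 2 waits twice that, and so on.
    public static TimeSpan ExponentialDelay(int attempt, int baseDelayMs = 1000, int maxDelayMs = 30000)
    {
        if (attempt < 1) throw new ArgumentOutOfRangeException(nameof(attempt));
        if (baseDelayMs < 0) throw new ArgumentOutOfRangeException(nameof(baseDelayMs));

        // Clamp the exponent so the shift below cannot overflow a long
        int exponent = Math.Min(attempt - 1, 20);
        long delayMs = (long)baseDelayMs << exponent;

        return TimeSpan.FromMilliseconds(Math.Min(delayMs, (long)maxDelayMs));
    }
}
```

Inside the retry loop this would replace the sleep argument, for example `Thread.Sleep(Backoff.ExponentialDelay(retryCount))`. Adding a small random jitter to each delay is a common refinement when many clients may retry at the same moment.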
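Security Best Practices
Preventing SQL Injection with Parameterized Queries
// Unsafe: concatenating user input into SQL is open to SQL injection
var unsafeQuery = $"SELECT * FROM users WHERE username = '{userInput}' AND password = '{passwordInput}'";
// Safe: parameterized query (the plain-text password comparison is for illustration only; real systems compare salted hashes)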
var safeQuery = "SELECT * FROM users WHERE username = @username AND password = @password";
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
using (var command = new MySqlCommand(safeQuery, connection))
{
command.Parameters.AddWithValue("@username", userInput);
command.Parameters.AddWithValue("@password", passwordInput);
using (var reader = command.ExecuteReader())
{
if (reader.HasRows)
{
Console.WriteLine("Login succeeded");
}
else
{
Console.WriteLine("Invalid user name or password");
}
}
}
}
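To see why the concatenated form is dangerous, it helps to look at the SQL text it actually produces for a hostile input. The sketch below is purely illustrative: the class name is hypothetical, and it only builds strings without touching a database.

```csharp
using System;

// Hypothetical demo of what string concatenation does to the SQL text.
public static class InjectionDemo
{
    // Mirrors the unsafe pattern: user input is pasted straight into the statement.
    public static string BuildUnsafe(string userInput) =>
        $"SELECT * FROM users WHERE username = '{userInput}'";

    // The parameterized statement is constant; the input travels separately as a parameter value.
    public static string BuildSafe() =>
        "SELECT * FROM users WHERE username = @username";
}
```

Calling `InjectionDemo.BuildUnsafe("' OR '1'='1")` yields `SELECT * FROM users WHERE username = '' OR '1'='1'`, whose condition is true for every row. With the parameterized form the same input is only ever compared as a literal user name.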
Applying the Principle of Least Privilege
// Create dedicated application users and grant only the privileges they need
var createAppUser = @"
-- Read-only user
CREATE USER 'app_readonly'@'%' IDENTIFIED BY 'StrongPassword123!';
GRANT SELECT ON mydb.* TO 'app_readonly'@'%';
-- Read-write user
CREATE USER 'app_readwrite'@'%' IDENTIFIED BY 'EvenStrongerPassword456!';
GRANT SELECT, INSERT, UPDATE, DELETE ON mydb.* TO 'app_readwrite'@'%';
-- User limited to specific tables
CREATE USER 'report_user'@'%' IDENTIFIED BY 'ReportPassword789!';
GRANT SELECT ON mydb.sales_data TO 'report_user'@'%';
GRANT SELECT ON mydb.products TO 'report_user'@'%';";
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
foreach (var statement in createAppUser.Split(';'))
{
if (!string.IsNullOrWhiteSpace(statement))
{
using (var command = new MySqlCommand(statement, connection))
{
command.ExecuteNonQuery();
}
}
}
Console.WriteLine("Application users created");
}
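Encrypting Sensitive Data
// Encrypt sensitive data with AES
public static string Encrypt(string plainText, string key)
{
using (Aes aes = Aes.Create())
using (SHA256 sha = SHA256.Create())
{
// AES keys must be 16, 24, or 32 bytes, so hash the passphrase into a 32-byte key.
// A dedicated KDF such as PBKDF2 (Rfc2898DeriveBytes) is preferable in production.
aes.Key = sha.ComputeHash(Encoding.UTF8.GetBytes(key));
aes.GenerateIV(); // fresh random IV for every message
using (ICryptoTransform encryptor = aes.CreateEncryptor(aes.Key, aes.IV))
using (MemoryStream ms = new MemoryStream())
{
ms.Write(aes.IV, 0, aes.IV.Length); // prepend the IV so decryption can recover it
using (CryptoStream cs = new CryptoStream(ms, encryptor, CryptoStreamMode.Write))
using (StreamWriter sw = new StreamWriter(cs))
{
sw.Write(plainText);
}
return Convert.ToBase64String(ms.ToArray());
}
}
}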
// Store the encrypted value in the database
var insertSensitiveData = "INSERT INTO customers (id, name, encrypted_credit_card) VALUES (@id, @name, @card)";
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
string creditCardNumber = "4111111111111111";
string encryptionKey = "my-secret-key-123"; // in real applications, load this from a secret store
using (var command = new MySqlCommand(insertSensitiveData, connection))
{
command.Parameters.AddWithValue("@id", 1001);
command.Parameters.AddWithValue("@name", "张三");
command.Parameters.AddWithValue("@card", Encrypt(creditCardNumber, encryptionKey));
command.ExecuteNonQuery();
Console.WriteLine("Encrypted data stored");
}
}
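Encrypted values are only useful if they can be read back, so an encryption routine needs a matching decryption routine that agrees on key derivation and IV handling. The following self-contained sketch shows one such pair. The class name is hypothetical, and the scheme (SHA-256 of a passphrase as the key, a random IV stored in front of the ciphertext, AES-CBC with the default PKCS7 padding) is an assumption for illustration, not a vetted design.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper: AES-CBC with a random IV prepended to the ciphertext.
public static class FieldCrypto
{
    // Hashes the passphrase into a 32-byte AES-256 key (a real KDF such as PBKDF2 is preferable).
    private static byte[] DeriveKey(string passphrase)
    {
        using (var sha = SHA256.Create())
        {
            return sha.ComputeHash(Encoding.UTF8.GetBytes(passphrase));
        }
    }

    public static string Encrypt(string plainText, string passphrase)
    {
        using (Aes aes = Aes.Create())
        {
            aes.Key = DeriveKey(passphrase);
            aes.GenerateIV(); // new random IV per message
            byte[] iv = aes.IV;
            using (var encryptor = aes.CreateEncryptor())
            {
                byte[] plain = Encoding.UTF8.GetBytes(plainText);
                byte[] cipher = encryptor.TransformFinalBlock(plain, 0, plain.Length);
                // Output layout: [16-byte IV][ciphertext]
                byte[] output = new byte[iv.Length + cipher.Length];
                Buffer.BlockCopy(iv, 0, output, 0, iv.Length);
                Buffer.BlockCopy(cipher, 0, output, iv.Length, cipher.Length);
                return Convert.ToBase64String(output);
            }
        }
    }

    public static string Decrypt(string encoded, string passphrase)
    {
        byte[] input = Convert.FromBase64String(encoded);
        using (Aes aes = Aes.Create())
        {
            aes.Key = DeriveKey(passphrase);
            byte[] iv = new byte[aes.BlockSize / 8];
            if (input.Length < iv.Length) throw new ArgumentException("input is too short", nameof(encoded));
            Buffer.BlockCopy(input, 0, iv, 0, iv.Length); // recover the IV from the front
            aes.IV = iv;
            using (var decryptor = aes.CreateDecryptor())
            {
                byte[] plain = decryptor.TransformFinalBlock(input, iv.Length, input.Length - iv.Length);
                return Encoding.UTF8.GetString(plain);
            }
        }
    }
}
```

Because the IV is random, encrypting the same card number twice produces different ciphertexts, which prevents equal values from being spotted in the table. CBC on its own does not detect tampering; an authenticated mode such as AES-GCM is worth considering where that matters.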
ORM Integration
Data Access with Dapper
// Install Dapper
// Install-Package Dapper
public class Employee
{
public long Id { get; set; }
public string Name { get; set; }
public string Email { get; set; }
public decimal Salary { get; set; }
public DateTime HireDate { get; set; }
public int DepartmentId { get; set; }
}
using Dapper;
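// Query examples
// Map snake_case columns (hire_date, department_id) onto HireDate and DepartmentId
Dapper.DefaultTypeMap.MatchNamesWithUnderscores = true;
using (var connection = new MySqlConnection(connectionString))
{
connection.Open();
// Simple query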
var employees = connection.Query<Employee>(
"SELECT * FROM employees WHERE department_id = @deptId",
new { deptId = 101 });
foreach (var emp in employees)
{
Console.WriteLine($"{emp.Id}: {emp.Name}, {emp.Salary:C}");
}
// Multi-mapping query
var sql = @"
SELECT e.*, d.name AS department_name
FROM employees e
JOIN departments d ON e.department_id = d.id
WHERE e.salary > @minSalary";
var results = connection.Query<Employee, string, dynamic>(
sql,
(employee, departmentName) =>
{
return new { Employee = employee, Department = departmentName };
},
new { minSalary = 10000 },
splitOn: "department_name");
foreach (var item in results)
{
Console.WriteLine($"{item.Employee.Name} works in the {item.Department} department");
}
// Call the stored procedure; increase_salary takes two IN parameters and returns the row count as a result set
var parameters = new DynamicParameters();
parameters.Add("@dept_id", 101);
parameters.Add("@increase_percent", 5.0m);
var affectedRows = connection.ExecuteScalar<long>("increase_salary", parameters, commandType: CommandType.StoredProcedure);
Console.WriteLine($"Affected {affectedRows} rows");
}
Using Entity Framework Core
// Install EF Core and the MySQL provider
// Install-Package Microsoft.EntityFrameworkCore
// Install-Package Pomelo.EntityFrameworkCore.MySql
public class AppDbContext : DbContext
{
public DbSet<Employee> Employees { get; set; }
public DbSet<Department> Departments { get; set; }
protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder)
{
optionsBuilder.UseMySql(connectionString,
ServerVersion.AutoDetect(connectionString),
options =>
{
options.EnableRetryOnFailure(
maxRetryCount: 5,
maxRetryDelay: TimeSpan.FromSeconds(30),
errorNumbersToAdd: null);
});
}
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
// Configure entities
modelBuilder.Entity<Employee>(entity =>
{
entity.ToTable("employees");
entity.HasKey(e => e.Id);
entity.Property(e => e.Name).IsRequired().HasMaxLength(100);
entity.Property(e => e.Email).HasMaxLength(100);
entity.Property(e => e.Salary).HasColumnType("DECIMAL(15,2)");
entity.HasIndex(e => e.DepartmentId).HasDatabaseName("idx_department");
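// Employee needs a Department navigation property for the Include(e => e.Department) query below
entity.HasOne(e => e.Department)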
.WithMany()
.HasForeignKey(e => e.DepartmentId);
});
modelBuilder.Entity<Department>(entity =>
{
entity.ToTable("departments");
entity.HasKey(d => d.Id);
entity.Property(d => d.Name).IsRequired().HasMaxLength(50);
});
}
}
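// CRUD with EF Core (assumes Employee exposes a Department navigation property and a Department entity with Id and Name exists)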
using (var context = new AppDbContext())
{
// Add data
var newDept = new Department { Name = "研发部" };
context.Departments.Add(newDept);
var newEmp = new Employee
{
Name = "赵六",
Email = "zhaoliu@example.com",
Salary = 18000,
HireDate = DateTime.Now,
Department = newDept
};
context.Employees.Add(newEmp);
context.SaveChanges();
// Query data
var highSalaryEmps = context.Employees
.Where(e => e.Salary > 15000)
.Include(e => e.Department)
.ToList();
foreach (var emp in highSalaryEmps)
{
Console.WriteLine($"{emp.Name} works in the {emp.Department?.Name} department, salary {emp.Salary:C}");
}
// Update data
var empToUpdate = context.Employees.Find(1001L);
if (empToUpdate != null)
{
empToUpdate.Salary *= 1.1m;
context.SaveChanges();
}
// Delete data
var empToDelete = context.Employees.Find(1002L);
if (empToDelete != null)
{
context.Employees.Remove(empToDelete);
context.SaveChanges();
}
}
Case Studies
Data Access Layer for an E-Commerce System
public interface IProductRepository
{
Task<Product> GetByIdAsync(long id);
Task<IEnumerable<Product>> SearchAsync(string keyword, int page, int pageSize);
Task<long> AddAsync(Product product);
Task<bool> UpdateAsync(Product product);
Task<bool> DeleteAsync(long id);
Task<bool> UpdateStockAsync(long productId, int quantityChange);
}
public class ProductRepository : IProductRepository
{
private readonly string _connectionString;
public ProductRepository(string connectionString)
{
_connectionString = connectionString;
}
public async Task<Product> GetByIdAsync(long id)
{
using (var connection = new MySqlConnection(_connectionString))
{
await connection.OpenAsync();
var sql = @"
SELECT p.*, c.name AS category_name
FROM products p
LEFT JOIN categories c ON p.category_id = c.id
WHERE p.id = @id";
var product = await connection.QueryFirstOrDefaultAsync<Product>(sql, new { id });
return product;
}
}
public async Task<IEnumerable<Product>> SearchAsync(string keyword, int page, int pageSize)
{
using (var connection = new MySqlConnection(_connectionString))
{
await connection.OpenAsync();
var offset = (page - 1) * pageSize;
var sql = @"
SELECT p.*, c.name AS category_name
FROM products p
LEFT JOIN categories c ON p.category_id = c.id
WHERE p.name LIKE @keyword OR p.description LIKE @keyword
ORDER BY p.id
LIMIT @pageSize OFFSET @offset";
var products = await connection.QueryAsync<Product>(
sql,
new {
keyword = $"%{keyword}%",
pageSize,
offset
});
return products;
}
}
public async Task<long> AddAsync(Product product)
{
using (var connection = new MySqlConnection(_connectionString))
{
await connection.OpenAsync();
var sql = @"
INSERT INTO products
(name, description, price, stock, category_id, created_at, updated_at)
VALUES
(@Name, @Description, @Price, @Stock, @CategoryId, NOW(), NOW());
SELECT LAST_INSERT_ID();";
var id = await connection.ExecuteScalarAsync<long>(sql, product);
return id;
}
}
public async Task<bool> UpdateAsync(Product product)
{
using (var connection = new MySqlConnection(_connectionString))
{
await connection.OpenAsync();
var sql = @"
UPDATE products SET
name = @Name,
description = @Description,
price = @Price,
stock = @Stock,
category_id = @CategoryId,
updated_at = NOW()
WHERE id = @Id";
var affected = await connection.ExecuteAsync(sql, product);
return affected > 0;
}
}
public async Task<bool> DeleteAsync(long id)
{
using (var connection = new MySqlConnection(_connectionString))
{
await connection.OpenAsync();
var sql = "DELETE FROM products WHERE id = @id";
var affected = await connection.ExecuteAsync(sql, new { id });
return affected > 0;
}
}
public async Task<bool> UpdateStockAsync(long productId, int quantityChange)
{
using (var connection = new MySqlConnection(_connectionString))
{
await connection.OpenAsync();
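// Use a transaction so the stock check and the update happen atomically
using (var transaction = await connection.BeginTransactionAsync())
{
try
{
// Lock the row and read the current stock; null means no matching row
var currentStock = await connection.ExecuteScalarAsync<int?>(
"SELECT stock FROM products WHERE id = @productId FOR UPDATE",
new { productId }, transaction);
if (currentStock == null)
{
// Unknown product: nothing to update
await transaction.RollbackAsync();
return false;
}
if (currentStock.Value + quantityChange < 0)
{
throw new InvalidOperationException("Insufficient stock");
}
// Apply the stock change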
var sql = @"
UPDATE products
SET stock = stock + @quantityChange,
updated_at = NOW()
WHERE id = @productId";
var affected = await connection.ExecuteAsync(
sql,
new { productId, quantityChange },
transaction);
await transaction.CommitAsync();
return affected > 0;
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
}
}
}
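One detail in SearchAsync above: the keyword is wrapped in % and passed as a parameter, which guards against SQL injection, but a keyword that itself contains % or _ still acts as a LIKE wildcard. The following is a minimal sketch of one way to handle that. The LikePattern class and its method names are hypothetical helpers, not part of Dapper or the MySQL driver, and the sketch assumes the server uses the default backslash escape character for LIKE (that is, NO_BACKSLASH_ESCAPES is not enabled).

```csharp
using System;

// Hypothetical helper: builds LIKE patterns in which user input is matched literally.
public static class LikePattern
{
    // Escapes the backslash first, then the two LIKE wildcards (% and _).
    public static string Escape(string input)
    {
        if (input == null) throw new ArgumentNullException(nameof(input));
        return input
            .Replace("\\", "\\\\")
            .Replace("%", "\\%")
            .Replace("_", "\\_");
    }

    // Produces a "contains" pattern: %<escaped keyword>%
    public static string Contains(string keyword)
    {
        return "%" + Escape(keyword) + "%";
    }
}
```

With this helper, the parameter in SearchAsync could be passed as keyword = LikePattern.Contains(keyword) instead of $"%{keyword}%". For example, the keyword 50%_off becomes the pattern %50\%\_off%.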
Report generation service
public class ReportService
{
private readonly string _connectionString;
public ReportService(string connectionString)
{
_connectionString = connectionString;
}
public async Task<SalesReport> GenerateSalesReport(DateTime startDate, DateTime endDate)
{
using (var connection = new MySqlConnection(_connectionString))
{
await connection.OpenAsync();
var report = new SalesReport
{
StartDate = startDate,
EndDate = endDate,
GenerationDate = DateTime.Now
};
// Total sales
var totalSalesSql = @"
SELECT IFNULL(SUM(amount), 0)
FROM orders
WHERE order_date BETWEEN @startDate AND @endDate
AND status = 'completed'";
report.TotalSales = await connection.ExecuteScalarAsync<decimal>(
totalSalesSql, new { startDate, endDate });
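// Sales by product category
// Aliases match the CategorySales property names so Dapper maps them without extra configuration.
// The join yields one row per order item, so orders are counted with DISTINCT.
// Note: o.amount is the amount of the whole order and repeats for every item row;
// if order_items has a per-line amount column, summing that column is more accurate.
var byCategorySql = @"
SELECT
c.name AS Category,
COUNT(DISTINCT o.id) AS OrderCount,
SUM(o.amount) AS TotalAmount
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
JOIN products p ON oi.product_id = p.id
JOIN categories c ON p.category_id = c.id
WHERE o.order_date BETWEEN @startDate AND @endDate
AND o.status = 'completed'
GROUP BY c.id, c.name
ORDER BY TotalAmount DESC";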
report.SalesByCategory = (await connection.QueryAsync<CategorySales>(
byCategorySql, new { startDate, endDate })).ToList();
// Sales by region (aliases match the RegionSales property names)
var byRegionSql = @"
SELECT
r.name AS Region,
COUNT(o.id) AS OrderCount,
SUM(o.amount) AS TotalAmount
FROM orders o
JOIN customers c ON o.customer_id = c.id
JOIN regions r ON c.region_id = r.id
WHERE o.order_date BETWEEN @startDate AND @endDate
AND o.status = 'completed'
GROUP BY r.id, r.name
ORDER BY TotalAmount DESC";
report.SalesByRegion = (await connection.QueryAsync<RegionSales>(
byRegionSql, new { startDate, endDate })).ToList();
// Daily sales trend (aliases match the DailySalesTrend property names)
var trendSql = @"
SELECT
DATE(order_date) AS Day,
COUNT(id) AS OrderCount,
SUM(amount) AS DailySales
FROM orders
WHERE order_date BETWEEN @startDate AND @endDate
AND status = 'completed'
GROUP BY DATE(order_date)
ORDER BY Day";
report.DailyTrends = (await connection.QueryAsync<DailySalesTrend>(
trendSql, new { startDate, endDate })).ToList();
return report;
}
}
public async Task<byte[]> ExportSalesReportToExcel(DateTime startDate, DateTime endDate)
{
var report = await GenerateSalesReport(startDate, endDate);
// EPPlus 5 and later require the license to be configured once at startup before ExcelPackage is used
using (var package = new ExcelPackage())
{
var worksheet = package.Workbook.Worksheets.Add("Sales Report");
// Report title
worksheet.Cells["A1"].Value = "Sales Report";
worksheet.Cells["A1:D1"].Merge = true;
worksheet.Cells["A1"].Style.Font.Bold = true;
worksheet.Cells["A1"].Style.Font.Size = 16;
// Date range
worksheet.Cells["A2"].Value = $"Date range: {startDate:yyyy-MM-dd} to {endDate:yyyy-MM-dd}";
worksheet.Cells["A2:D2"].Merge = true;
// Generation time
worksheet.Cells["A3"].Value = $"Generated at: {report.GenerationDate:yyyy-MM-dd HH:mm:ss}";
worksheet.Cells["A3:D3"].Merge = true;
// Total sales
worksheet.Cells["A5"].Value = "Total sales:";
worksheet.Cells["B5"].Value = report.TotalSales;
worksheet.Cells["B5"].Style.Numberformat.Format = "¥#,##0.00";
// Sales by product category
worksheet.Cells["A7"].Value = "Sales by product category";
worksheet.Cells["A7:C7"].Merge = true;
worksheet.Cells["A7"].Style.Font.Bold = true;
int row = 8;
worksheet.Cells[row, 1].Value = "Category";
worksheet.Cells[row, 2].Value = "Orders";
worksheet.Cells[row, 3].Value = "Sales";
worksheet.Cells[row, 1, row, 3].Style.Font.Bold = true;
foreach (var item in report.SalesByCategory)
{
row++;
worksheet.Cells[row, 1].Value = item.Category;
worksheet.Cells[row, 2].Value = item.OrderCount;
worksheet.Cells[row, 3].Value = item.TotalAmount;
worksheet.Cells[row, 3].Style.Numberformat.Format = "¥#,##0.00";
}
// Sales by region
row += 2;
worksheet.Cells[row, 1].Value = "Sales by region";
// Merge columns 1 to 3 of the section title row, as for the category section
worksheet.Cells[row, 1, row, 3].Merge = true;
worksheet.Cells[row, 1].Style.Font.Bold = true;
row++;
worksheet.Cells[row, 1].Value = "Region";
worksheet.Cells[row, 2].Value = "Orders";
worksheet.Cells[row, 3].Value = "Sales";
worksheet.Cells[row, 1, row, 3].Style.Font.Bold = true;
foreach (var item in report.SalesByRegion)
{
row++;
worksheet.Cells[row, 1].Value = item.Region;
worksheet.Cells[row, 2].Value = item.OrderCount;
worksheet.Cells[row, 3].Value = item.TotalAmount;
worksheet.Cells[row, 3].Style.Numberformat.Format = "¥#,##0.00";
}
// Auto-fit column widths
worksheet.Cells[worksheet.Dimension.Address].AutoFitColumns();
return package.GetAsByteArray();
}
}
}
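The report queries above filter with BETWEEN @startDate AND @endDate. If callers pass plain dates (time 00:00:00), orders placed later on the end date fall outside the range. One common way around this is a half-open range: from the start of the first day up to, but not including, the start of the day after the last. The sketch below shows only the date arithmetic. ReportDateRange and ToHalfOpen are hypothetical names, and the sketch assumes both arguments are meant as whole calendar days.

```csharp
using System;

// Hypothetical helper: turns an inclusive day range into a half-open DateTime range.
public static class ReportDateRange
{
    public static (DateTime From, DateTime ToExclusive) ToHalfOpen(DateTime startDate, DateTime endDate)
    {
        if (endDate.Date < startDate.Date)
        {
            throw new ArgumentException("endDate must not be earlier than startDate");
        }
        // Drop any time component, then move the upper bound to the next midnight.
        return (startDate.Date, endDate.Date.AddDays(1));
    }
}
```

A query would then use order_date >= @from AND order_date < @toExclusive in place of BETWEEN, with the two tuple values passed as parameters. For a range of 2024-01-01 to 2024-01-31, the bounds are 2024-01-01 00:00:00 and 2024-02-01 00:00:00.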
public class SalesReport
{
public DateTime StartDate { get; set; }
public DateTime EndDate { get; set; }
public DateTime GenerationDate { get; set; }
public decimal TotalSales { get; set; }
public List<CategorySales> SalesByCategory { get; set; }
public List<RegionSales> SalesByRegion { get; set; }
public List<DailySalesTrend> DailyTrends { get; set; }
}
public class CategorySales
{
public string Category { get; set; }
public int OrderCount { get; set; }
public decimal TotalAmount { get; set; }
}
public class RegionSales
{
public string Region { get; set; }
public int OrderCount { get; set; }
public decimal TotalAmount { get; set; }
}
public class DailySalesTrend
{
public DateTime Day { get; set; }
public int OrderCount { get; set; }
public decimal DailySales { get; set; }
}
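Summary
This article has covered the main aspects of developing OceanBase applications in .NET:
- Basic operations: connection management, CRUD operations, transaction handling
- Performance optimization: batch operations, connection pooling, query optimization
- Advanced features: JSON handling, distributed transactions, stored procedures
- Security practices: parameterized queries, access control, data encryption
- ORM integration: using Dapper and Entity Framework Core
- Worked examples: an e-commerce data access layer and a reporting service
OceanBase is a high-performance distributed database, and pairing it with the .NET ecosystem makes it possible to build reliable, highly available enterprise applications.
In practice, choose the data access approach that fits the business scenario: native ADO.NET or Dapper where performance matters most, and Entity Framework Core where development speed matters more. Pay particular attention to transaction handling and performance tuning in a distributed environment, so that the application benefits from OceanBase's distributed features.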