Program Structure
├── main.cpp
├── config.json
├── hive_export/
├── parquet_data/
├── sql_scripts/
└── logs/
Core Code Implementation (main.cpp)
#include <iostream>
#include <fstream>
#include <vector>
#include <thread>
#include <mutex>
#include <queue>
#include <cstdlib>
#include <ctime>
#include <filesystem>
#include <functional>   // std::ref
#include <iomanip>      // std::put_time
#include <stdexcept>    // std::runtime_error
#include <nlohmann/json.hpp>
#include <unistd.h>
namespace fs = std::filesystem;
using json = nlohmann::json;
using namespace std;
// Global mutexes for log and task-queue synchronization
mutex log_mutex, queue_mutex;
// Configuration structure
struct Config {
string hive_jdbc;
string hql_output_dir;
string parquet_output_dir;
string sql_script_dir;
string snowflake_cfg;
int export_threads;
int import_threads;
};
// Thread-safe logging function
void log_message(const string& message, const string& log_path) {
lock_guard<mutex> guard(log_mutex);
ofstream log_file(log_path, ios::app);
if (log_file) {
time_t now = time(nullptr);
log_file << "[" << put_time(localtime(&now), "%F %T") << "] "
<< message << endl;
}
}
// Parse the configuration file
Config load_config(const string& config_path) {
ifstream config_file(config_path);
if (!config_file) throw runtime_error("Config file not found");
json j;
config_file >> j;
return {
j["hive_jdbc"],
j["directories"]["hql_output"],
j["directories"]["parquet_output"],
j["directories"]["sql_scripts"],
j["snowflake"]["config_path"],
j["threads"]["export"],
j["threads"]["import"]
};
}
// Export Hive CREATE TABLE statements
void export_hql(const Config& cfg, const string& log_path) {
string cmd = "beeline -u '" + cfg.hive_jdbc + "' --silent=true --outputformat=csv2 "
"-e 'SHOW DATABASES;' | tail -n +2 > databases.txt";
system(cmd.c_str());
ifstream db_file("databases.txt");
string db;
while (getline(db_file, db)) {
cmd = "beeline -u '" + cfg.hive_jdbc + "' --silent=true --outputformat=csv2 "
"-e 'USE " + db + "; SHOW TABLES;' | tail -n +2 > " + db + "_tables.txt";
system(cmd.c_str());
ifstream table_file(db + "_tables.txt");
string table;
while (getline(table_file, table)) {
fs::path dir = fs::path(cfg.hql_output_dir) / db;
fs::create_directories(dir);
string hql_path = (dir / (table + ".hql")).string();
cmd = "beeline -u '" + cfg.hive_jdbc + "' --silent=true --outputformat=csv2 "
"-e 'USE " + db + "; SHOW CREATE TABLE " + table + ";' | "
"awk 'NR>2' | head -n -1 > " + hql_path;
if (system(cmd.c_str()) == 0) {
log_message("SUCCESS: Exported HQL for " + db + "." + table, log_path);
} else {
log_message("ERROR: Failed to export HQL for " + db + "." + table, log_path);
}
}
}
}
// Export Parquet data (worker-thread task).
// The queue is passed by reference so all workers drain one shared task list.
void export_worker(queue<string>& tasks, const Config& cfg, const string& log_path) {
while (true) {
string task;
{
lock_guard<mutex> guard(queue_mutex);
if (tasks.empty()) return;
task = move(tasks.front());
tasks.pop();
}
size_t pos = task.find('.');
string db = task.substr(0, pos);
string table = task.substr(pos + 1);
fs::path out_dir = fs::path(cfg.parquet_output_dir) / db / table;
fs::create_directories(out_dir);
string cmd = "hive -e \"SET hive.exec.compress.output=false; "
"INSERT OVERWRITE DIRECTORY '" + out_dir.string() + "' "
"STORED AS PARQUET SELECT * FROM " + task + ";\"";
if (system(cmd.c_str()) == 0) {
log_message("SUCCESS: Exported Parquet for " + task, log_path);
} else {
log_message("ERROR: Failed to export Parquet for " + task, log_path);
}
}
}
// Export Parquet with multiple threads
void export_parquet(const Config& cfg, const string& log_path) {
ifstream db_file("databases.txt");
queue<string> tasks;
string db;
while (getline(db_file, db)) {
ifstream table_file(db + "_tables.txt");
string table;
while (getline(table_file, table)) {
tasks.push(db + "." + table);
}
}
vector<thread> threads;
for (int i = 0; i < cfg.export_threads; ++i) {
threads.emplace_back(export_worker, ref(tasks), ref(cfg), ref(log_path));
}
for (auto& t : threads) t.join();
}
// Run SnowSQL scripts
void run_snowsql(const Config& cfg, const string& log_path) {
for (const auto& entry : fs::directory_iterator(cfg.sql_script_dir)) {
if (entry.path().extension() == ".sql") {
string cmd = "snowsql -c " + cfg.snowflake_cfg + " -f " + entry.path().string();
if (system(cmd.c_str()) == 0) {
log_message("SUCCESS: Executed SQL " + entry.path().filename().string(), log_path);
} else {
log_message("ERROR: Failed to execute SQL " + entry.path().filename().string(), log_path);
}
}
}
}
// Import Parquet into Snowflake (worker-thread task).
// The queue is passed by reference so all workers drain one shared task list.
void import_worker(queue<fs::path>& tasks, const Config& cfg, const string& log_path) {
while (true) {
fs::path task;
{
lock_guard<mutex> guard(queue_mutex);
if (tasks.empty()) return;
task = move(tasks.front());
tasks.pop();
}
string db = task.parent_path().filename().string();
string table = task.filename().string();
string cmd = "snowsql -c " + cfg.snowflake_cfg + " -q \""
"COPY INTO " + db + "." + table + " "
"FROM @" + cfg.parquet_output_dir + "/" + db + "/" + table + " "
"FILE_FORMAT = (TYPE = PARQUET);\"";
if (system(cmd.c_str()) == 0) {
log_message("SUCCESS: Imported Parquet to " + db + "." + table, log_path);
} else {
log_message("ERROR: Failed to import Parquet to " + db + "." + table, log_path);
}
}
}
// Import Parquet with multiple threads
void import_parquet(const Config& cfg, const string& log_path) {
queue<fs::path> tasks;
for (const auto& db_entry : fs::directory_iterator(cfg.parquet_output_dir)) {
for (const auto& table_entry : fs::directory_iterator(db_entry.path())) {
tasks.push(table_entry.path());
}
}
vector<thread> threads;
for (int i = 0; i < cfg.import_threads; ++i) {
threads.emplace_back(import_worker, ref(tasks), ref(cfg), ref(log_path));
}
for (auto& t : threads) t.join();
}
int main() {
try {
// Initialize configuration and logging
Config cfg = load_config("config.json");
string log_path = "logs/transfer_" + to_string(time(nullptr)) + ".log";
fs::create_directories("logs");
// Run the full pipeline
export_hql(cfg, log_path);
export_parquet(cfg, log_path);
run_snowsql(cfg, log_path);
import_parquet(cfg, log_path);
log_message("ALL OPERATIONS COMPLETED", log_path);
} catch (const exception& e) {
cerr << "CRITICAL ERROR: " << e.what() << endl;
return 1;
}
return 0;
}
Example Configuration (config.json)
{
"hive_jdbc": "jdbc:hive2://hive-server:10000",
"directories": {
"hql_output": "hive_export",
"parquet_output": "parquet_data",
"sql_scripts": "sql_scripts"
},
"snowflake": {
"config_path": "~/.snowsql/config"
},
"threads": {
"export": 8,
"import": 8
}
}
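load_config indexes the JSON directly, so a missing key only surfaces as nlohmann::json's generic error. A minimal validation sketch (hypothetical helper validate_config, assuming the key layout shown above; not part of the original code) that could run before the Config struct is populated:
#include <nlohmann/json.hpp>
#include <stdexcept>
#include <string>

// Hypothetical helper: check that the expected top-level keys exist and report
// which one is missing, instead of relying on nlohmann::json's generic errors.
void validate_config(const nlohmann::json& j) {
    const char* required[] = {"hive_jdbc", "directories", "snowflake", "threads"};
    for (const char* key : required) {
        if (!j.contains(key)) {
            throw std::runtime_error(std::string("config.json is missing key: ") + key);
        }
    }
}

// In load_config(), after reading the file:
//   config_file >> j;
//   validate_config(j);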
Key Features
HQL export:
- Uses beeline to connect to Hive and enumerate all databases and tables
- Stores each CREATE TABLE statement as <database>/<table>.hql
- Strips beeline's header and footer lines from the output with tail and awk (for skipping Hive's system databases as well, see the sketch after this list)
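The tail/awk filtering only removes beeline's decorative output rows; it does not exclude Hive's built-in databases. A minimal sketch of how export_hql could skip them, assuming a hypothetical is_system_database helper whose exclusion list is an assumption and not part of the original code:
#include <string>
#include <vector>

// Hypothetical helper: returns true for databases that should not be exported.
// The exclusion list is an assumption; adjust it to match your cluster.
bool is_system_database(const std::string& db) {
    static const std::vector<std::string> skip = {"information_schema", "sys"};
    for (const auto& name : skip) {
        if (db == name) return true;
    }
    return false;
}

// Inside the outer loop of export_hql:
//   while (getline(db_file, db)) {
//       if (is_system_database(db)) continue;
//       ...
//   }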
Parquet export:
- Uses Hive's INSERT OVERWRITE DIRECTORY ... STORED AS PARQUET to write each table as Parquet
- Processes different tables in parallel (thread count controlled by the configuration)
- Output path: parquet_data/<database>/<table>/
SnowSQL execution:
- Iterates over all .sql files in the configured script directory
- Runs them with snowsql -c using the named connection from the SnowSQL config file
- Authentication is loaded automatically from that file (must be configured in advance)
Parquet import:
- Uses Snowflake's COPY INTO command (see the staging note after this list)
- Imports different tables concurrently across threads
- Maps the local directory structure (database/table) to target table names
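The COPY INTO statement in import_worker references @<parquet_output_dir>/<db>/<table>, i.e. it assumes a Snowflake stage named after the local output directory already exists and already contains the files. If the Parquet files only exist locally, they must be uploaded first. A minimal sketch, assuming the table's internal stage (@%<table>) is used and that the snowsql connection's default database/schema matches the target table; the helper name and options are assumptions, not part of the original code:
#include <cstdlib>
#include <string>

// Hypothetical helper: upload the local Parquet files for one table to that
// table's internal stage before COPY INTO runs. local_dir should be an
// absolute path, e.g. /data/parquet_data/mydb/orders.
bool stage_parquet_dir(const std::string& snowflake_cfg,
                       const std::string& local_dir,
                       const std::string& table) {
    std::string cmd = "snowsql -c " + snowflake_cfg + " -q \""
                      "PUT file://" + local_dir + "/* @%" + table +
                      " AUTO_COMPRESS=FALSE;\"";
    return std::system(cmd.c_str()) == 0;
}

// The COPY INTO in import_worker could then read from the table stage instead:
//   COPY INTO <db>.<table> FROM @%<table>
//   FILE_FORMAT = (TYPE = PARQUET) MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;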
Logging:
- Each run writes to its own log file named with a Unix timestamp (see the per-day naming sketch after this list)
- Records the operation type, status, and time of each step
- Log writes are thread-safe (guarded by a mutex)
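The filename built in main uses a raw epoch timestamp, so a new file is created per run rather than per day. If per-day log files are wanted instead, a minimal sketch (hypothetical helper daily_log_path, not part of the original code):
#include <ctime>
#include <iomanip>
#include <sstream>
#include <string>

// Hypothetical helper: builds a per-day log file name such as logs/transfer_2024-05-01.log.
std::string daily_log_path() {
    std::time_t now = std::time(nullptr);
    std::ostringstream oss;
    oss << "logs/transfer_" << std::put_time(std::localtime(&now), "%F") << ".log";
    return oss.str();
}

// In main():
//   string log_path = daily_log_path();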
Error handling:
- Detects a missing configuration file
- Checks the exit status of every external command (see the helper sketch after this list)
- Directory-creation failures surface as filesystem exceptions caught in main
- Catches JSON parsing exceptions
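The SUCCESS/ERROR-and-log pattern around system() is repeated in every step; a minimal consolidation sketch that the existing functions could call (the helper name run_command is an assumption, not part of the original code):
#include <cstdlib>
#include <string>

// Declaration of the existing logger from main.cpp.
void log_message(const std::string& message, const std::string& log_path);

// Hypothetical helper: run a shell command, log SUCCESS or ERROR with a label,
// and return true when the command's status is zero.
bool run_command(const std::string& cmd, const std::string& label,
                 const std::string& log_path) {
    int status = std::system(cmd.c_str());
    if (status == 0) {
        log_message("SUCCESS: " + label, log_path);
        return true;
    }
    log_message("ERROR: " + label + " (status " + std::to_string(status) + ")", log_path);
    return false;
}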
Compilation and Running
- Install dependencies:
sudo apt-get install nlohmann-json3-dev
- Compile the program (the code uses std::filesystem, so Boost is not required; on GCC 8 append -lstdc++fs):
g++ -std=c++17 -o hive2snowflake main.cpp -lpthread
- Run the program:
./hive2snowflake
Notes
Prerequisites:
- The Hive beeline client
- SnowSQL and its authentication configuration
- Read access to the Hive tables
- Matching table structures already created in Snowflake
Performance tuning:
- Adjust the thread counts via config.json
- Handle very large tables separately
- A retry mechanism can be added to cope with transient network failures (see the sketch after this list)
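A minimal retry sketch for the last point (the helper name run_with_retry, attempt count, and delay are assumptions) that could wrap any of the system() calls in the program:
#include <chrono>
#include <cstdlib>
#include <string>
#include <thread>

// Hypothetical helper: retries a shell command a few times with a fixed delay
// between attempts; returns true as soon as one attempt exits with status 0.
bool run_with_retry(const std::string& cmd, int max_attempts = 3,
                    std::chrono::seconds delay = std::chrono::seconds(10)) {
    for (int attempt = 1; attempt <= max_attempts; ++attempt) {
        if (std::system(cmd.c_str()) == 0) return true;
        if (attempt < max_attempts) std::this_thread::sleep_for(delay);
    }
    return false;
}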
Security hardening suggestions:
- Encrypt sensitive values in the configuration file and decrypt them at startup
- Read sensitive settings from environment variables instead of plain-text files (see the sketch after this list)
- Add an operation audit log
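A minimal sketch of the environment-variable suggestion, assuming a hypothetical variable name SNOWSQL_CONFIG_PATH that, when set, overrides the value read from config.json:
#include <cstdlib>
#include <string>

// Hypothetical: prefer an environment variable for the SnowSQL config location,
// falling back to the value loaded from config.json.
std::string resolve_snowflake_cfg(const std::string& cfg_value) {
    if (const char* env = std::getenv("SNOWSQL_CONFIG_PATH")) {
        return std::string(env);
    }
    return cfg_value;
}

// In main(), after load_config():
//   cfg.snowflake_cfg = resolve_snowflake_cfg(cfg.snowflake_cfg);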