需求:使用多台服务器发布一批已经建好的mxd工程切片服务。
解决思路:鉴于开发时间和成本考虑,将mxd文件放在共享文件夹中,每台服务器上部署一个发布切片服务的脚本,多台服务器同时访问该共享文件夹。某台服务器开始处理某个mxd文件时,会在该文件旁创建一个标识文件:".lock"表示正在发布,".done"表示已发布完成。如果发布超时,则删除".lock"标识文件,其他服务器即可接手该任务。
参数比较多,我已经将参数和代码分离,分成了几个类。下面给出初步的脚本,有兴趣的同学可以用大模型将其优化或者分离。其实也可以写一个前后端的web程序,部署在多台服务器上,解决好任务分配和超时的问题,效果会更好。
# -*- coding: utf-8 -*-
import arcpy
import os
import sys
import time
import datetime
import traceback
import codecs
import socket
import shutil
# Let geoprocessing tools overwrite outputs that already exist
# (presumably so leftover .sddraft/.sd files from an earlier run do not block a retry).
arcpy.env.overwriteOutput = True
# ===================== 日志函数 =====================
def log(msg, run_log_file):
    """Echo a timestamped message to the console and append it to the run log.

    Args:
        msg: Text to record (unicode text is expected).
        run_log_file: Path of the log file; it is opened in append mode and
            written as UTF-8 with CRLF line endings.

    The console echo is best-effort: any failure there is ignored so that it
    never prevents the line from reaching the log file. Errors while writing
    the file itself are not caught and propagate to the caller.
    """
    ts = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    line = u"[{0}] {1}".format(ts, msg)
    try:
        # 'replace' keeps the rest of the line visible when the console code
        # page cannot represent a character, instead of dropping the echo.
        # The parenthesised single-argument form prints the same thing under
        # the Python 2 print statement.
        print(line.encode(sys.stdout.encoding or 'gbk', 'replace'))
    except Exception:
        # e.g. no usable stdout; deliberately ignored, but unlike a bare
        # `except:` this no longer swallows KeyboardInterrupt/SystemExit.
        pass
    with codecs.open(run_log_file, "a", "utf-8") as f:
        f.write(line + u"\r\n")
def log_stats(msg, stats_log_file):
    """Append one timestamped entry to the statistics log.

    Args:
        msg: Text of the entry.
        stats_log_file: Path of the statistics file; opened in append mode
            and written as UTF-8 with a CRLF line ending.
    """
    now = datetime.datetime.now()
    stamp = now.strftime('%Y-%m-%d %H:%M:%S')
    with codecs.open(stats_log_file, "a", "utf-8") as out:
        out.write(u"[{0}] {1}\r\n".format(stamp, msg))
# ===================== ArcGIS Server =====================
def get_server_connection_path(ags_dir, ags_conn_name):
    """Return the full path of an ArcGIS Server connection file.

    Args:
        ags_dir: Directory that holds the connection file.
        ags_conn_name: File name of the connection file.

    Returns:
        ``ags_dir`` joined with ``ags_conn_name``.

    Raises:
        IOError: If nothing exists at the joined path.
    """
    candidate = os.path.join(ags_dir, ags_conn_name)
    if os.path.exists(candidate):
        return candidate
    raise IOError(u"找不到 ArcGIS Server 连接文件: {0}".format(candidate))
# ===================== 标志文件操作 =====================
def mxd_done_path(mxd_file):
    """Return the path of the "done" marker for an MXD: the MXD path plus ``.done``."""
    suffix = ".done"
    return mxd_file + suffix
def mxd_lock_path(mxd_file):
    """Return the path of the "in progress" marker for an MXD: the MXD path plus ``.lock``."""
    suffix = ".lock"
    return mxd_file + suffix
def acquire_mxd_task(mxd_file, server_name, lock_timeout, run_log_file):
    """Try to claim an MXD for this server by creating its lock file.

    Args:
        mxd_file: Path of the MXD to claim.
        server_name: Name written into the lock file as its owner.
        lock_timeout: Age in seconds (by file modification time) after which
            an existing lock is treated as abandoned and removed.
        run_log_file: Run log path passed to ``log``.

    Returns:
        True if this call created the lock file; False if the task is
        already done, is held by a lock that has not timed out, or the lock
        could not be created (for example another server created it first).
    """
    import errno  # only needed here; kept local so the block is self-contained

    lock_file = mxd_lock_path(mxd_file)
    done_file = mxd_done_path(mxd_file)

    if os.path.exists(done_file):
        return False

    # Ask for the mtime directly instead of exists()+getmtime(): the lock can
    # disappear between the two calls when another server releases it.
    try:
        last_mod = os.path.getmtime(lock_file)
    except OSError:
        last_mod = None

    if last_mod is not None:
        # NOTE(review): this compares the local clock with the file's mtime;
        # presumably all machines involved have synchronised clocks.
        if (time.time() - last_mod) <= lock_timeout:
            return False
        log(u"检测到挂起锁文件已超时,删除锁:{0}".format(lock_file), run_log_file)
        try:
            os.remove(lock_file)
        except OSError:
            # Another server may have removed the stale lock first. The
            # exclusive create below decides who gets the task.
            pass
        # NOTE(review): a small window remains where two servers both judge
        # the same lock stale and one removes the other's fresh lock.

    # O_CREAT | O_EXCL fails if the file already exists, so only one caller
    # can create the lock; a plain open(..., "w") would let several succeed.
    try:
        fd = os.open(lock_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except OSError as e:
        if e.errno != errno.EEXIST:
            log(u"创建锁文件失败: {0}".format(lock_file), run_log_file)
        return False
    with os.fdopen(fd, "w") as f:
        f.write("SERVER={0}\nTIME={1}\n".format(server_name, datetime.datetime.now()))
    return True
def _read_lock_owner(lock_file):
    """Return the value of the ``SERVER=`` line in a lock file, or None if it cannot be read."""
    try:
        with open(lock_file, "r") as f:
            for line in f:
                if line.startswith("SERVER="):
                    return line[len("SERVER="):].strip()
    except (IOError, OSError):
        pass
    return None


def release_mxd_task(mxd_file, server_name, success, run_log_file):
    """Release an MXD task, marking it done on success.

    Args:
        mxd_file: Path of the MXD whose task is being released.
        server_name: Name of this server; written into the done file and
            compared with the owner recorded in the lock file.
        success: True to create the ``.done`` marker and drop the lock;
            False to only drop the lock so the task can be picked up again.
        run_log_file: Run log path passed to ``log``.

    Any exception raised while releasing is logged and not propagated.
    """
    lock_file = mxd_lock_path(mxd_file)
    done_file = mxd_done_path(mxd_file)
    try:
        if success:
            # Write the done marker BEFORE removing the lock. In the opposite
            # order there is a moment with neither file present, during which
            # another server could claim and republish a finished task.
            try:
                with open(done_file, "w") as f:
                    f.write("DONE={0}\nSERVER={1}\n".format(datetime.datetime.now(), server_name))
                log(u"任务完成,done 文件创建成功: {0}".format(done_file), run_log_file)
            finally:
                # The lock is dropped even if the done file could not be written.
                if os.path.exists(lock_file):
                    os.remove(lock_file)
        else:
            # Only skip the removal when the lock positively names a different
            # server; an unreadable or owner-less lock is removed as before.
            owner = _read_lock_owner(lock_file)
            if owner is not None and owner != server_name:
                log(u"锁文件属于其他服务器({0}),不删除: {1}".format(owner, lock_file), run_log_file)
                return
            if os.path.exists(lock_file):
                os.remove(lock_file)
            log(u"任务失败,已删除锁文件,允许其他服务器接手: {0}".format(lock_file), run_log_file)
    except Exception as e:
        log(u"释放锁/创建 done 文件异常: {0}".format(e), run_log_file)
# ===================== 临时 MXD 目录 =====================
def create_temp_mxd(mxd_file, temp_dir_root, server_name):
    """Copy an MXD into a per-server working directory.

    Args:
        mxd_file: Path of the source MXD.
        temp_dir_root: Root directory under which working directories live.
        server_name: Name of the sub-directory to use (created if missing).

    Returns:
        Path of the copy, ``<temp_dir_root>/<server_name>/<mxd file name>``.
    """
    file_name = os.path.basename(mxd_file)
    work_dir = os.path.join(temp_dir_root, server_name)
    if not os.path.exists(work_dir):
        os.makedirs(work_dir)
    target = os.path.join(work_dir, file_name)
    # copy2 also copies file metadata along with the contents.
    shutil.copy2(mxd_file, target)
    return target
# ===================== 强校验切片完成 =====================
def check_jobstatus_gdb(cache_path, run_log_file):
    """Report whether the cache's job-status records show every task as done.

    Looks for ``Layers/Status.gdb`` under ``cache_path``, takes the first
    dataset whose name matches ``*JobStatus*`` and compares ``TotalTasks``
    with ``DoneTasks`` on every row.

    Args:
        cache_path: Cache directory of the service.
        run_log_file: Run log path passed to ``log``.

    Returns:
        True when no row has differing ``TotalTasks`` and ``DoneTasks``
        (including when there are no rows); False when the geodatabase or
        dataset is missing, any row differs, or any exception occurs.
    """
    try:
        status_gdb = os.path.join(cache_path, "Layers", "Status.gdb")
        if not os.path.exists(status_gdb):
            log(u"未找到 JobStatus.gdb: {0}".format(status_gdb), run_log_file)
            return False

        # This changes the process-wide arcpy workspace and does not restore it.
        arcpy.env.workspace = status_gdb
        # NOTE(review): this lists feature classes only; confirm JobStatus is
        # stored as a feature class rather than a plain table.
        matches = arcpy.ListFeatureClasses("*JobStatus*")
        if not matches:
            log(u"未找到 JobStatus 表", run_log_file)
            return False

        with arcpy.da.SearchCursor(matches[0], ["TotalTasks", "DoneTasks"]) as rows:
            return all(total == done for total, done in rows)
    except Exception as err:
        log(u"check_jobstatus_gdb 异常: {0}".format(err), run_log_file)
        return False
def wait_for_cache_completion(result, service_name, cache_root, service_dir, run_log_file,
                              poll_interval=10, timeout=None):
    """Block until the tiling job is confirmed complete.

    Completion requires both ``result.status == 4`` and a positive answer
    from ``check_jobstatus_gdb`` for the service's cache directory
    (``<cache_root>/<service_dir>_<service_name>``).

    Args:
        result: Geoprocessing result object exposing a ``status`` attribute.
        service_name: Name of the map service.
        cache_root: Directory that contains the service's cache directory.
        service_dir: Server folder name used as the cache directory prefix.
        run_log_file: Run log path passed to ``log``.
        poll_interval: Seconds to sleep between checks (default 10).
        timeout: Optional maximum number of seconds to wait; None (the
            default) waits indefinitely.

    Raises:
        RuntimeError: If the job reports a failed/timed-out/cancelled status
            or ``timeout`` elapses.
        Exception: Any other error raised while monitoring is logged and
            re-raised, so the caller does not treat an unverified cache as
            finished.
    """
    cache_path = os.path.join(cache_root, service_dir + "_" + service_name)
    deadline = None if timeout is None else time.time() + timeout
    try:
        log(u"等待切片任务完成...", run_log_file)
        while True:
            status = result.status
            if status == 4 and check_jobstatus_gdb(cache_path, run_log_file):
                log(u"切片任务已完全完成 ✅", run_log_file)
                break
            # Per the arcpy Result documentation: 5 = failed, 6 = timed out,
            # 8 = cancelled. None of these can ever become 4, so waiting on
            # them would loop forever.
            if status in (5, 6, 8):
                raise RuntimeError("cache job ended with status {0}".format(status))
            if deadline is not None and time.time() >= deadline:
                raise RuntimeError("timed out waiting for cache completion")
            log(u"切片未完成,等待中...", run_log_file)
            time.sleep(poll_interval)
    except Exception as e:
        log(u"切片任务监控异常: {0}".format(e), run_log_file)
        raise
# ===================== 发布与切片 =====================
def publish_and_cache(mxd_file, server_conn_path, cache_root, service_dir,
                      dpi, tile_size, scales, scales2, cache_tile_format,
                      tile_compression, thread_count, run_log_file,
                      server_name=None, temp_dir_root=None):
    """Publish one MXD as a map service, create its cache and build tiles.

    Steps: copy the MXD to a per-server working directory, create and analyse
    the service definition draft, stage and upload it, create the cache
    schema, generate tiles, then wait until completion is confirmed.

    Args:
        mxd_file: Path of the MXD to publish; its base name becomes the
            service name.
        server_conn_path: Path of the ``.ags`` server connection file.
        cache_root: Cache directory passed to the cache-creation tool.
        service_dir: Server folder the service is published into.
        dpi: Dots per inch for the cache schema.
        tile_size: Tile size for the cache schema.
        scales: Scale list for the cache schema.
        scales2: Scale list for which tiles are generated.
        cache_tile_format: Tile image format.
        tile_compression: Tile compression quality.
        thread_count: Number of instances used for tile generation.
        run_log_file: Run log path passed to ``log``.
        server_name: Name of the per-server working sub-directory. Defaults
            to the ``COMPUTERNAME`` environment variable or the host name.
        temp_dir_root: Root of the working directory. Defaults to the
            grandparent directory of ``mxd_file``.

    Raises:
        RuntimeError: If the service draft analysis reports errors.
        Exception: Any other failure is logged with its traceback and
            re-raised.
    """
    service_name = os.path.splitext(os.path.basename(mxd_file))[0]
    # [:-4] drops the ".ags" extension of the connection file.
    service_path = os.path.join(server_conn_path[:-4], service_dir, service_name + ".MapServer")

    # The original body read an undefined global SERVER_NAME (NameError at
    # run time); resolve the name here instead.
    if server_name is None:
        server_name = os.environ.get("COMPUTERNAME") or socket.gethostname()
    if temp_dir_root is None:
        temp_dir_root = os.path.dirname(os.path.dirname(mxd_file))

    temp_mxd = None
    try:
        log(u"开始发布 MXD: {0}".format(mxd_file), run_log_file)
        temp_mxd = create_temp_mxd(mxd_file, temp_dir_root=temp_dir_root, server_name=server_name)

        # splitext handles any extension case. A case-sensitive
        # str.replace(".mxd", ...) leaves "X.MXD" unchanged, which would make
        # the draft path identical to the MXD path.
        temp_base = os.path.splitext(temp_mxd)[0]
        sddraft = temp_base + ".sddraft"
        sd = temp_base + ".sd"

        mxd_doc = arcpy.mapping.MapDocument(temp_mxd)
        try:
            arcpy.mapping.CreateMapSDDraft(
                mxd_doc, sddraft, service_name, "ARCGIS_SERVER",
                server_conn_path, True, None, None, service_dir
            )
        finally:
            # Drop the document reference even on failure so the temporary
            # MXD can be deleted afterwards.
            del mxd_doc

        analysis = arcpy.mapping.AnalyzeForSD(sddraft)
        if analysis['errors']:
            log(u"服务草稿存在错误: {0}".format(analysis['errors']), run_log_file)
            raise RuntimeError("AnalyzeForSD errors")

        arcpy.StageService_server(sddraft, sd)
        arcpy.UploadServiceDefinition_server(
            sd, server_conn_path, "", "", "EXISTING", service_dir
        )
        log(u"服务发布完成: {0}".format(service_name), run_log_file)

        log(u"创建缓存 schema: {0}".format(service_name), run_log_file)
        # NOTE(review): the scale count is hard-coded to 18; confirm it
        # matches the number of entries in `scales` supplied by callers.
        arcpy.CreateMapServerCache_server(
            service_path, cache_root, "NEW", "CUSTOM",
            18, dpi, tile_size,
            "", "-180 90", scales, cache_tile_format, tile_compression, "COMPACT"
        )

        log(u"开始生成切片: {0}".format(service_name), run_log_file)
        result = arcpy.ManageMapServerCacheTiles_server(
            service_path, scales2, "RECREATE_ALL_TILES", thread_count, "", "", "WAIT"
        )
        wait_for_cache_completion(result, service_name, cache_root, service_dir, run_log_file)
    except Exception as e:
        log(u"发布/切片出错: {0}\r\n{1}".format(e, traceback.format_exc()), run_log_file)
        raise
    finally:
        # Remove the working copy on success and on failure; a cleanup
        # problem is logged and must not change the outcome of the task.
        if temp_mxd is not None and os.path.exists(temp_mxd):
            try:
                os.remove(temp_mxd)
                log(u"临时 MXD 删除: {0}".format(temp_mxd), run_log_file)
            except OSError:
                log(u"临时 MXD 删除失败: {0}".format(temp_mxd), run_log_file)
# ===================== 主流程 =====================
def main():
    """Publish and cache every MXD in the configured directory that this server can claim.

    For each ``.mxd`` file the task is claimed with ``acquire_mxd_task``,
    processed with ``publish_and_cache`` and released with
    ``release_mxd_task``; the outcome is recorded in the statistics log.
    Unexpected errors outside the per-file handling are written to the run log.
    """
    # Configuration (all local variables)
    mxd_dir = r"D:\test\mxd"
    # NOTE(review): not passed on to publish_and_cache, which derives its
    # working directory from the MXD path by default.
    temp_dir_root = r"D:\test\temp_publish"
    ags_dir = os.path.join(os.environ['APPDATA'], r"ESRI\Desktop10.2\ArcCatalog")
    ags_conn_name = "arcgis on localhost_6080 (admin).ags"
    service_dir = "zw"
    cache_dir = r"D:\arcgisserver\directories\arcgiscache"
    cache_root = os.path.join(cache_dir, service_dir)
    server_name = os.environ.get("COMPUTERNAME") or socket.gethostname()
    log_dir = r"D:\test\log"
    if not os.path.exists(log_dir):
        os.makedirs(log_dir)
    run_log_file = os.path.join(
        log_dir,
        "publish_log_{0}.log".format(datetime.datetime.now().strftime('%Y%m%d_%H%M%S')))
    stats_log_file = os.path.join(
        log_dir,
        "stats_log_{0}.log".format(datetime.datetime.now().strftime('%Y%m%d')))
    scales = "591658710.9;295829355.45;147914677.725;73957338.8625;36978669.43125;18489334.715625;9244667.3578125;4622333.67890625;2311166.839453125;1155583.4197265625;577791.7098632812;288895.8549316406;144447.9274658203;72223.96373291015;36111.98186645508;18055.99093322754;9027.99546661377"
    scales2 = "72223.96373291015;36111.98186645508;18055.99093322754;9027.99546661377"
    cache_tile_format = "MIXED"
    tile_compression = 75
    tile_size = "256 x 256"
    dpi = 96
    thread_count = 24
    lock_timeout = 24 * 60 * 60  # 24 hours

    try:
        conn_path = get_server_connection_path(ags_dir, ags_conn_name)
        log(u"本机: {0}".format(server_name), run_log_file)
        log(u"服务文件夹: {0}".format(service_dir), run_log_file)

        mxd_list = [os.path.join(mxd_dir, f) for f in os.listdir(mxd_dir) if f.lower().endswith(".mxd")]
        if not mxd_list:
            log(u"未找到 MXD 文件", run_log_file)
            return

        for mxd_file in mxd_list:
            svc_name = os.path.splitext(os.path.basename(mxd_file))[0]

            # Claiming is handled separately from publishing: if it fails we
            # never held the lock, so release_mxd_task must not be called
            # (it could remove a lock that belongs to another server).
            try:
                acquired = acquire_mxd_task(mxd_file, server_name, lock_timeout, run_log_file)
            except Exception as e:
                log(u"获取任务锁异常: {0} 错误: {1}".format(svc_name, e), run_log_file)
                continue
            if not acquired:
                continue

            try:
                publish_and_cache(mxd_file, conn_path, cache_root, service_dir,
                                  dpi, tile_size, scales, scales2, cache_tile_format,
                                  tile_compression, thread_count, run_log_file)
            except Exception as e:
                release_mxd_task(mxd_file, server_name, success=False, run_log_file=run_log_file)
                log_stats(u"失败: {0} 错误: {1}".format(svc_name, e), stats_log_file)
            else:
                # Kept out of the try body so an error while recording the
                # success is not reported as a publishing failure.
                release_mxd_task(mxd_file, server_name, success=True, run_log_file=run_log_file)
                log_stats(u"完成: {0}".format(svc_name), stats_log_file)

        log(u"全部 MXD 发布与切片任务完成!", run_log_file)
    except Exception as e:
        log(u"主程序异常: {0}\r\n{1}".format(e, traceback.format_exc()), run_log_file)


if __name__ == "__main__":
    main()