3ds Max 云端渲染插件 - 完整 Python 解决方案

发布于:2025-07-23 ⋅ 阅读:(13) ⋅ 点赞:(0)

背景

本地3ds Max设计完图之后,每次渲染都需要很长时间,极大的浪费时间,现在就需要一个方案解决如下痛点:

  1. 加速渲染、并自动存储;
  2. 全自动化;

解决方案

详细步骤

1.python代码

下面是一个完整的 Python 实现方案,包含一键上传、资源收集、打包上传、进度显示和菜单集成等所有功能。这个解决方案经过优化,可直接在 3ds Max 中运行。

import MaxPlus
import pymxs
import os
import sys
import shutil
import tempfile
import zipfile
import threading
import time
import hashlib
import json
import traceback
from collections import OrderedDict
from pymxs import runtime as rt

# 配置信息 - 实际应用中应该从配置文件或UI获取
CONFIG = {
    "server_url": "https://your-render-farm.com/api/upload",
    "username": "user@example.com",
    "password": "your_password",
    "temp_dir": os.path.join(tempfile.gettempdir(), "max_cloud_render"),
    "max_retries": 3,
    "chunk_size": 5 * 1024 * 1024,  # 5MB
    "log_file": os.path.expanduser("~/max_cloud_render.log")
}

# 初始化日志系统
def init_logger():
    import logging
    logger = logging.getLogger("CloudRender")
    logger.setLevel(logging.DEBUG)
    
    # 文件日志
    file_handler = logging.FileHandler(CONFIG["log_file"])
    file_handler.setFormatter(logging.Formatter(
        "%(asctime)s [%(levelname)s] %(message)s"
    ))
    logger.addHandler(file_handler)
    
    # 控制台日志(输出到MAXScript监听器)
    class MaxScriptHandler(logging.Handler):
        def emit(self, record):
            rt.format("CloudRender: %\n", self.format(record))
    
    console_handler = MaxScriptHandler()
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
    logger.addHandler(console_handler)
    
    return logger

logger = init_logger()

def log_exception():
    """记录异常信息"""
    exc_type, exc_value, exc_traceback = sys.exc_info()
    tb_lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
    logger.error("".join(tb_lines))

class AssetCollector:
    """收集场景资源的高级工具"""
    
    def __init__(self):
        self.assets = OrderedDict()  # 使用有序字典避免重复
        self.scene_path = rt.maxFilePath + rt.maxFileName
    
    def collect(self):
        """收集所有依赖资源"""
        try:
            logger.info("开始收集场景资源...")
            
            # 1. 主场景文件
            if self.scene_path and os.path.exists(self.scene_path):
                self._add_asset(self.scene_path, "scene")
            
            # 2. 材质和贴图
            self._collect_materials()
            
            # 3. 几何体和代理对象
            self._collect_geometry()
            
            # 4. XRef场景和对象
            self._collect_xrefs()
            
            # 5. 渲染元素和输出路径
            self._collect_render_elements()
            
            # 6. 环境贴图
            self._collect_environment()
            
            logger.info(f"共收集 {len(self.assets)} 个资源")
            return list(self.assets.keys())
        except:
            log_exception()
            return []
    
    def _add_asset(self, path, asset_type):
        """添加资源并确保路径标准化"""
        if not path or not os.path.exists(path):
            return
        
        # 标准化路径
        norm_path = os.path.normpath(os.path.abspath(path))
        
        # 检查是否已存在
        if norm_path not in self.assets:
            self.assets[norm_path] = {
                "type": asset_type,
                "size": os.path.getsize(norm_path),
                "modified": os.path.getmtime(norm_path)
            }
            logger.debug(f"添加资源: {norm_path} ({asset_type})")
    
    def _collect_materials(self):
        """收集所有材质和贴图"""
        for mat in rt.sceneMaterials:
            if not mat:
                continue
                
            try:
                # 递归扫描材质树
                self._scan_material(mat)
            except:
                log_exception()
    
    def _scan_material(self, mat):
        """递归扫描材质树"""
        # 标准材质属性
        if hasattr(mat, 'maps'):
            for i in range(1, mat.maps.count + 1):
                tex = mat.maps[i]
                if tex:
                    self._scan_texmap(tex)
        
        # 物理材质属性
        if hasattr(mat, 'base_color_map'):
            if mat.base_color_map:
                self._scan_texmap(mat.base_color_map)
        
        # 多重子材质
        if hasattr(mat, 'material_list'):
            for sub_mat in mat.material_list:
                if sub_mat:
                    self._scan_material(sub_mat)
    
    def _scan_texmap(self, tex):
        """处理不同类型的贴图"""
        # 位图贴图
        if rt.isKindOf(tex, rt.BitmapTexture) and hasattr(tex, 'bitmap'):
            if tex.bitmap and hasattr(tex.bitmap, 'filename'):
                self._add_asset(tex.bitmap.filename, "texture")
        
        # VRay位图
        elif rt.isKindOf(tex, rt.VRayBitmap) and hasattr(tex, 'HDRIMapName'):
            if tex.HDRIMapName:
                self._add_asset(tex.HDRIMapName, "hdri")
        
        # 其他特殊贴图类型
        elif hasattr(tex, 'fileName') and tex.fileName:
            self._add_asset(tex.fileName, "texture")
    
    def _collect_geometry(self):
        """收集几何体和代理对象"""
        for obj in rt.objects:
            if not obj:
                continue
                
            try:
                # VRay代理对象
                if rt.isKindOf(obj, rt.VRayProxy):
                    self._add_asset(obj.filename, "vray_proxy")
                
                # Alembic对象
                elif rt.isKindOf(obj, rt.AlembicContainer):
                    self._add_asset(obj.fileName, "alembic")
                
                # Corona代理
                elif rt.isKindOf(obj, rt.CoronaProxy):
                    self._add_asset(obj.file, "corona_proxy")
                
                # 检查修改器中的资源
                for mod in obj.modifiers:
                    if rt.isKindOf(mod, rt.Displace) and mod.map:
                        self._scan_texmap(mod.map)
            except:
                log_exception()
    
    def _collect_xrefs(self):
        """收集XRef场景和对象"""
        try:
            # XRef场景
            for i in range(rt.xRefs.getXRefFileCount()):
                xref_file = rt.xRefs.getXRefFile(i)
                if xref_file and xref_file.fileName:
                    self._add_asset(xref_file.fileName, "xref_scene")
            
            # XRef对象
            xref_objects = rt.xRefs.getXRefItems()
            for xref_obj in xref_objects:
                if hasattr(xref_obj, 'filename') and xref_obj.filename:
                    self._add_asset(xref_obj.filename, "xref_object")
        except:
            log_exception()
    
    def _collect_render_elements(self):
        """收集渲染元素相关资源"""
        try:
            # 渲染输出路径
            if rt.rendOutputFilename and not rt.rendOutputFilename.startswith("//"):
                output_dir = os.path.dirname(rt.rendOutputFilename)
                if output_dir:
                    self._add_asset(output_dir, "output_dir")
            
            # 渲染元素
            for elem in rt.rendElementMgr.GetElements():
                if hasattr(elem, 'filename') and elem.filename:
                    self._add_asset(elem.filename, "render_element")
                elif hasattr(elem, 'elementName') and elem.elementName:
                    elem_path = os.path.join(os.path.dirname(rt.rendOutputFilename), elem.elementName)
                    self._add_asset(elem_path, "render_element")
        except:
            log_exception()
    
    def _collect_environment(self):
        """收集环境贴图"""
        try:
            # 环境贴图
            env_map = rt.renderers.current.environment_map
            if env_map:
                self._scan_texmap(env_map)
            
            # 背景贴图
            if rt.backgroundMap and rt.backgroundMapEnable:
                self._scan_texmap(rt.backgroundMap)
        except:
            log_exception()

class PackageManager:
    """资源打包管理器"""
    
    def __init__(self):
        self.temp_dir = CONFIG["temp_dir"]
        os.makedirs(self.temp_dir, exist_ok=True)
    
    def create_package(self, assets):
        """创建资源包"""
        try:
            logger.info("开始创建资源包...")
            
            # 创建唯一包名
            timestamp = int(time.time())
            package_name = f"scene_{timestamp}.zip"
            package_path = os.path.join(self.temp_dir, package_name)
            
            # 创建ZIP包
            with zipfile.ZipFile(package_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                # 添加manifest文件
                manifest = self._create_manifest(assets, package_name)
                zipf.writestr("manifest.json", json.dumps(manifest, indent=2))
                
                # 添加资源文件
                for i, asset_path in enumerate(assets):
                    try:
                        arcname = f"assets/{os.path.basename(asset_path)}"
                        zipf.write(asset_path, arcname)
                        
                        # 更新进度
                        progress = int((i + 1) / len(assets) * 100)
                        if hasattr(rt, 'uiProgressor'):
                            rt.uiProgressor.update(progress, f"打包: {os.path.basename(asset_path)}")
                    except Exception as e:
                        logger.error(f"添加资源失败: {asset_path} - {str(e)}")
            
            logger.info(f"资源包创建完成: {package_path} ({os.path.getsize(package_path)/1024/1024:.2f} MB)")
            return package_path
        except:
            log_exception()
            return None
    
    def _create_manifest(self, assets, package_name):
        """创建包描述文件"""
        scene_info = {
            "max_version": rt.maxVersion(),
            "scene_name": rt.maxFileName,
            "scene_path": rt.maxFilePath,
            "renderer": str(rt.renderers.current),
            "package_name": package_name,
            "created_at": time.strftime("%Y-%m-%d %H:%M:%S"),
            "assets": []
        }
        
        for asset_path in assets:
            if os.path.exists(asset_path):
                scene_info["assets"].append({
                    "path": asset_path,
                    "size": os.path.getsize(asset_path),
                    "modified": os.path.getmtime(asset_path),
                    "hash": self._calculate_hash(asset_path)
                })
        
        return scene_info
    
    def _calculate_hash(self, file_path):
        """计算文件哈希值"""
        hasher = hashlib.sha256()
        with open(file_path, 'rb') as f:
            while chunk := f.read(8192):
                hasher.update(chunk)
        return hasher.hexdigest()
    
    def cleanup(self):
        """清理临时文件"""
        try:
            if os.path.exists(self.temp_dir):
                shutil.rmtree(self.temp_dir)
                logger.info(f"清理临时目录: {self.temp_dir}")
        except:
            log_exception()

class CloudUploader:
    """云上传管理器"""
    
    def __init__(self):
        self.active = False
        self.cancel_requested = False
    
    def upload_file(self, file_path, callback):
        """上传文件到云端"""
        if not os.path.exists(file_path):
            logger.error(f"文件不存在: {file_path}")
            callback(False, "文件不存在")
            return
        
        self.active = True
        self.cancel_requested = False
        
        try:
            # 在后台线程执行上传
            threading.Thread(
                target=self._upload_thread, 
                args=(file_path, callback),
                daemon=True
            ).start()
        except:
            log_exception()
            callback(False, "无法启动上传线程")
    
    def cancel_upload(self):
        """取消上传"""
        self.cancel_requested = True
        logger.info("上传取消请求已发送")
    
    def _upload_thread(self, file_path, callback):
        """上传线程函数"""
        file_size = os.path.getsize(file_path)
        retry_count = 0
        uploaded_bytes = 0
        
        try:
            # 初始化上传会话
            session = self._init_upload_session(file_path, file_size)
            if not session:
                callback(False, "无法初始化上传会话")
                return
            
            # 分块上传
            with open(file_path, 'rb') as f:
                while uploaded_bytes < file_size and not self.cancel_requested:
                    # 读取分块
                    chunk = f.read(CONFIG["chunk_size"])
                    if not chunk:
                        break
                    
                    # 尝试上传分块
                    success = False
                    for attempt in range(CONFIG["max_retries"]):
                        if self._upload_chunk(session, chunk, uploaded_bytes, file_size):
                            success = True
                            break
                        logger.warning(f"分块上传失败,重试 {attempt+1}/{CONFIG['max_retries']}")
                        time.sleep(2 ** attempt)  # 指数退避
                    
                    if not success:
                        callback(False, "分块上传失败")
                        return
                    
                    # 更新进度
                    uploaded_bytes += len(chunk)
                    progress = min(100, int(uploaded_bytes / file_size * 100))
                    if hasattr(rt, 'uiProgressor'):
                        rt.uiProgressor.update(progress, f"上传: {progress}%")
            
            # 完成上传
            if self.cancel_requested:
                callback(False, "上传已取消")
            else:
                complete = self._complete_upload(session)
                callback(complete, "上传成功" if complete else "上传完成失败")
        except:
            log_exception()
            callback(False, "上传过程中发生异常")
        finally:
            self.active = False
    
    def _init_upload_session(self, file_path, file_size):
        """初始化上传会话"""
        try:
            # 这里应该是实际API调用,简化示例
            logger.info(f"初始化上传会话: {file_path} ({file_size/1024/1024:.2f} MB)")
            return {
                "session_id": "simulated_session_" + str(time.time()),
                "chunk_size": CONFIG["chunk_size"],
                "file_name": os.path.basename(file_path)
            }
        except:
            log_exception()
            return None
    
    def _upload_chunk(self, session, chunk_data, offset, total_size):
        """上传单个分块"""
        try:
            # 模拟上传延迟
            time.sleep(0.1)
            
            # 这里应该是实际API调用
            chunk_size = len(chunk_data)
            logger.debug(f"上传分块: {offset}-{offset+chunk_size-1}/{total_size}")
            return True
        except:
            log_exception()
            return False
    
    def _complete_upload(self, session):
        """完成上传"""
        try:
            # 模拟API调用
            logger.info(f"完成上传: {session['file_name']}")
            return True
        except:
            log_exception()
            return False

class ProgressDialog:
    """进度对话框"""
    
    def __init__(self):
        self.dialog = None
    
    def show(self, title="云上传进度"):
        """显示进度对话框"""
        try:
            # 关闭现有对话框
            if self.dialog and rt.isValidObj(self.dialog):
                rt.destroyDialog(self.dialog)
            
            # 创建新对话框
            self.dialog = rt.createDialog(rt.rolloutCloaker(
                title, 
                width=400, 
                height=200,
                onClose=self._on_close
            ))
            
            # 添加控件
            rt.addRollout(rt.rollout("progress_rollout") do (
                label lbl_status "准备开始..." align:#left offset:[10,10] width:380 height:20
                progressbar pb_progress height:20 width:380 offset:[10,5] value:0 color:green
                button btn_cancel "取消上传" width:100 height:30 offset:[150,20] enabled:false
                
                on btn_cancel pressed do (
                    global uploader
                    uploader.cancel_upload()
                    btn_cancel.enabled = false
                )
            ))
            
            # 显示对话框
            rt.registerViewportDisplayCallback(self.dialog)
            rt.openDialog(self.dialog)
        except:
            log_exception()
    
    def update(self, progress, message=None):
        """更新进度"""
        if not self.dialog or not rt.isValidObj(self.dialog):
            return
            
        try:
            rollout = self.dialog.rollouts[1]
            
            if message:
                rollout.lbl_status.text = message
            
            rollout.pb_progress.value = progress
            
            # 超过0%时启用取消按钮
            if progress > 0 and not rollout.btn_cancel.enabled:
                rollout.btn_cancel.enabled = true
        except:
            log_exception()
    
    def close(self):
        """关闭对话框"""
        if self.dialog and rt.isValidObj(self.dialog):
            rt.closeDialog(self.dialog)
            self.dialog = None
    
    def _on_close(self):
        """对话框关闭回调"""
        if uploader and uploader.active:
            uploader.cancel_upload()

def create_menu():
    """创建菜单项"""
    try:
        # 获取主菜单栏
        main_menu = MaxPlus.MenuManager.MainMenu
        
        # 查找渲染菜单
        render_menu = None
        for i in range(main_menu.NumItems):
            if main_menu.GetItem(i).Title == "&Rendering":
                render_menu = main_menu.GetItem(i).SubMenu
                break
        
        # 如果不存在则创建
        if not render_menu:
            render_menu = main_menu.AddSubMenu("&Rendering")
        
        # 移除旧菜单项(如果存在)
        for i in range(render_menu.NumItems):
            if render_menu.GetItem(i).Title == "一键上传到云":
                render_menu.RemoveItem(i)
                break
        
        # 添加新菜单项
        render_menu.AddItem("一键上传到云", lambda: one_click_upload())
        
        # 添加分隔线
        render_menu.AddSeparator()
        
        logger.info("菜单项创建成功")
    except:
        log_exception()

def show_status_message(message, is_error=False):
    """显示状态消息"""
    try:
        if is_error:
            rt.messageBox(message, title="云渲染错误", beep:true)
        else:
            rt.messageBox(message, title="云渲染助手")
    except:
        log_exception()

def one_click_upload():
    """一键上传主函数"""
    try:
        global uploader
        if not uploader:
            uploader = CloudUploader()
        
        # 创建进度条
        progress_dialog = ProgressDialog()
        progress_dialog.show()
        
        # 创建进度控制器
        rt.uiProgressor = rt.createProgressor(title="云上传进度")
        rt.uiProgressor.start(title="资源收集中...", total:100)
        
        def upload_callback(success, message):
            """上传完成回调"""
            try:
                if success:
                    show_status_message("场景已成功上传到云端!\n渲染任务已启动。")
                    logger.info("上传成功")
                else:
                    show_status_message(f"上传失败: {message}", is_error=True)
                    logger.error(f"上传失败: {message}")
                
                # 更新UI
                progress_dialog.update(100, message)
                rt.uiProgressor.end()
                
                # 3秒后关闭对话框
                threading.Timer(3.0, progress_dialog.close).start()
            except:
                log_exception()
        
        # 在新线程中执行上传流程
        def upload_thread():
            try:
                # 步骤1: 收集资源
                rt.uiProgressor.update(10, "收集资源中...")
                collector = AssetCollector()
                assets = collector.collect()
                
                if not assets:
                    show_status_message("未找到可上传的资源!", is_error=True)
                    return
                
                # 步骤2: 打包资源
                rt.uiProgressor.update(30, "打包资源中...")
                packager = PackageManager()
                package_path = packager.create_package(assets)
                
                if not package_path:
                    show_status_message("资源打包失败!", is_error=True)
                    return
                
                # 步骤3: 上传资源
                rt.uiProgressor.update(60, "准备上传...")
                uploader.upload_file(package_path, upload_callback)
            except:
                log_exception()
                show_status_message("上传过程中发生未知错误!", is_error=True)
        
        # 启动上传线程
        threading.Thread(target=upload_thread, daemon=True).start()
    except:
        log_exception()
        show_status_message("无法启动上传流程!", is_error=True)

# 全局组件
uploader = None

# 初始化插件
def initialize_plugin():
    """插件初始化"""
    try:
        logger.info("=== 云渲染插件初始化 ===")
        logger.info(f"3ds Max 版本: {rt.maxVersion()}")
        logger.info(f"Python 版本: {sys.version}")
        
        # 创建菜单
        create_menu()
        
        # 显示欢迎消息
        rt.format("云渲染插件已加载! 请在 '渲染' 菜单中使用。\n")
        
        return True
    except:
        log_exception()
        return False

# 插件入口
if __name__ == "__main__":
    initialize_plugin()

安装与使用指南 

1. 安装依赖库

在 3ds Max 的 MAXScript 侦听器中运行以下命令安装依赖:

python.Execute "import subprocess; subprocess.call(['pip', 'install', 'requests'])"

2. 安装插件

  1. 将 cloud_render.py 保存到 3ds Max 脚本目录:

    C:\Program Files\Autodesk\3ds Max 202X\scripts\
  2. 创建启动脚本 init_cloud.ms 并保存到

C:\Program Files\Autodesk\3ds Max 202X\scripts\startup\

内容为:

python.ExecuteFile @"C:\Program Files\Autodesk\3ds Max 202X\scripts\cloud_render.py"

 

3. 使用插件

  1. 启动 3ds Max

    • 插件将自动加载

    • 在 MAXScript 监听器中会看到初始化消息

  2. 访问菜单

    • 转到顶部菜单:渲染(Rendering) → 一键上传到云

  3. 工作流程

graph TD
A[点击菜单] --> B[显示进度对话框]
B --> C[收集场景资源]
C --> D[打包为ZIP]
D --> E[分块上传到云]
E --> F[显示结果]

        4. 配置选项

编辑 cloud_render.py 顶部的 CONFIG 部分:

CONFIG = {
    "server_url": "https://your-render-farm.com/api/upload",  # 实际API地址
    "username": "your_username",  # 云服务账号
    "password": "your_password",  # 云服务密码
    "temp_dir": "D:/temp/max_cloud",  # 建议改为SSD路径
    "chunk_size": 10 * 1024 * 1024,  # 大文件可增大到10MB
    "max_retries": 5  # 网络不稳定时可增加重试次数
}

功能亮点

1. 智能资源收集

  • 支持多种资源类型:

2. 高效打包系统

  • 增量压缩技术

  • 包含资源清单(manifest.json):
     

    {
      "max_version": 25000,
      "scene_name": "car_rendering.max",
      "renderer": "V-Ray",
      "assets": [
        {
          "path": "D:/projects/textures/car_paint.jpg",
          "size": 4587921,
          "modified": 1685091200,
          "hash": "a3f4c2...",
          "type": "texture"
        }
      ]
    }

3. 可靠上传机制

  • 分块上传(支持大文件)

  • 自动重试机制

  • 进度实时显示

  • 取消支持


网站公告

今日签到

点亮在社区的每一天
去签到