使用DeepSeek+本地知识库,尝试从0到1搭建高度定制化工作流(自动化篇)

发布于:2025-02-21 ⋅ 阅读:(25) ⋅ 点赞:(0)
7.5. 配图生成
  • 目的:由于小红书发布文章要求图文格式,因此在生成文案的基础上,我们还需要生成图文搭配文案进行发布。

  • 原实现思路:

    • 起初我打算使用deepseek的文生图模型Janus进行本地部署生成,参考博客:Deepseek发布的Janus-Pro-1B初体验但后来尝试使用后发现Janus现阶段对于这类特定任务的生成图还不太能够胜任。以下是我尝试使用文案让Janus生成的图片:
    • 文案图
  • 现实现思路:

    • 当下普遍的方案是使用文案生成一段相关的html代码,再使用python中的自动化库来进行相应部分的截图,最后将截图与文案进行组合,形成图文格式。
  • 代码实现:

    • html生成:

      '''
      Author: yeffky
      Date: 2025-02-14 08:43:28
      LastEditTime: 2025-02-15 19:28:28
      '''
      import requests
      import json
      import os
      from datetime import datetime 
      
      def build_prompt(drafts):
          prompt = "根据下面的小红书文案,帮我生成一个html页面,包含小红书的封面(需要一个卡片状的封面,上面只需文案内容即可,需要吸引眼球),以及下方几个要点内容,要点内容和封面我希望制作成卡片形式,并且每一部分的div请为我附上属性id,id为'card1', 'card2', ...。要求符合小红书平台的图文要求规则以及平替风格,还要符合小红书平台的用户审美。回复只要给出代码即可,请不要添加多余表达" 
          return f"""{prompt} \n\n小红书文案:\n\n{drafts}"""
      
      
      def get_deepseek_response(prompt, api_key):
          url = "https://api.deepseek.com/chat/completions"
          headers = {
              "Authorization": f"Bearer {api_key}",
              'Content-Type': 'application/json',
              'Accept': 'application/json',
          }
          payload = json.dumps({
              "messages": [
                  {
                      "content": prompt,
                      "role": "user"
                  }
              ],
              "model": "deepseek-reasoner",
              "frequency_penalty": 0,
              "max_tokens": 2048,
              "presence_penalty": 0,
              "response_format": {
                  "type": "text"
              },
              "stop": None,
              "stream": False,
              "stream_options": None,
              "temperature": 1,
              "top_p": 1,
              "tools": None,
              "tool_choice": "none",
              "logprobs": False,
              "top_logprobs": None
          })
          response = None
          while not response:
              try:
                  print("发送请求")
                  response = requests.post(url, data=payload, headers=headers, timeout=200)
                  response.raise_for_status()
                  if not response.json():
                      response = None
              except requests.exceptions.RequestException as e:
                  print(f"请求失败:{str(e)},开始重试...")
                  response = None
          return response.json()['choices'][0]['message']['content']
      
      def generate_html():
          api_key = os.getenv("DEEPSEEK_API_KEY")
          today = datetime.now().strftime("%Y-%m-%d")
      
          file_path = "./xiaohongshu_drafts/小红书_推广文案_千战系列" + today +".txt"
          drafts = open(file_path, "r", encoding="utf-8").read()
          prompt = build_prompt(drafts=drafts)
      
          response = get_deepseek_response(prompt, api_key)
          print(response)
          with open('./pic_generate/pic.html', 'w', encoding='utf-8') as f:
              f.write(response)
      
    • 截图:

      '''
      Author: yeffky
      Date: 2025-02-14 09:41:09
      LastEditTime: 2025-02-15 10:44:51
      '''
      from playwright.sync_api import sync_playwright
      import time
      import re
      
      def generate_pic(url):
          # 启动浏览器
          player = sync_playwright().start()  # 初始化Playwright并启动
          chrome_driver = player.chromium  # 获取Chromium浏览器实例
          browser = chrome_driver.launch(headless=False)  # 启动浏览器,headless=False表示以非无头模式启动,即显示浏览器窗口
          context = browser.new_context()  # 创建一个新的浏览器上下文(类似于一个新的浏览器窗口)
          page = context.new_page()  # 在该上下文中创建一个新的页面(标签页)
          # 访问页面
          card_cnt = 0
          with(open('./pic_generate/pic.html', 'r', encoding='utf-8')) as f:
              page_content = f.read()
              card_cnt = len(re.findall(r'<div class="card" id="card\d+">', page_content))
          print(card_cnt)
          page.goto(url)  # 导航到指定的URL
          #  截取相关卡片的截图
          for i in range(1, card_cnt + 1):
              card_pic = page.query_selector(f"id=card{i}")  # 使用CSS选择器查找页面中的搜索按钮元素
              card_pic.screenshot(path=f"./pictures/card{i}.png")  # 对搜索按钮元素进行截图并保存为b.png
      
          # 停止访问
          context.close()  # 关闭浏览器上下文
          browser.close()  # 关闭浏览器
          player.stop()  # 停止Playwright
      
      if __name__ == '__main__':
          url = 'D:/Project/UUCrawl/Code/pic_generate/pic.html'
          generate_pic(url)
      
7.6. 自动化发布
  • 目的:将生成的图片和文案自动发布到小红书
  • 实现思路:
    • 1.使用python中的selenium库,模拟页面操作,登陆后需要将cookie保存下来,下次使用时直接读取cookie,避免重复登陆。同时保存一份token,每次调用登录时检查token是否过期,如未过期则无需登录操作。
    • 2.登录后,模拟页面操作前往发布页面,使用send_keys()方法输入标题和正文,使用click()方法点击发布按钮。
    • 参考开源项目:xhs_ai_publisher
  • 代码实现:
'''
Author: yeffky
Date: 2025-02-15 20:28:32
LastEditTime: 2025-02-17 14:08:45
'''
import sys
sys.path.append("./")
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.keys import Keys
from utils import line_process
import time
import json
import os

class XiaohongshuClient:
  
    def __init__(self):
        self.driver = webdriver.Chrome()
        self.wait = WebDriverWait(self.driver, 10)
        # 获取当前执行文件所在目录
        current_dir = os.path.dirname(os.path.abspath(__file__))
        self.token_file = os.path.join(current_dir, "xiaohongshu_token.json")
        self.cookies_file = os.path.join(current_dir, "xiaohongshu_cookies.json")
        self.token = self._load_token()
        self._load_cookies()
      
    def _load_token(self):
        """从文件加载token"""
        if os.path.exists(self.token_file):
            try:
                with open(self.token_file, 'r') as f:
                    token_data = json.load(f)
                    # 检查token是否过期
                    if token_data.get('expire_time', 0) > time.time():
                        return token_data.get('token')
            except:
                pass
        return None
      
    def _save_token(self, token):
        """保存token到文件"""
        token_data = {
            'token': token,
            # token有效期设为30天
            'expire_time': time.time() + 30 * 24 * 3600
        }
        with open(self.token_file, 'w') as f:
            json.dump(token_data, f)
  
    def _load_cookies(self):
        """从文件加载cookies"""
        if os.path.exists(self.cookies_file):
            try:
                with open(self.cookies_file, 'r') as f:
                    cookies = json.load(f)
                    self.driver.get("https://creator.xiaohongshu.com")
                    for cookie in cookies:
                        self.driver.add_cookie(cookie)
            except:
                pass
  
    def _save_cookies(self):
        """保存cookies到文件"""
        cookies = self.driver.get_cookies()
        with open(self.cookies_file, 'w') as f:
            json.dump(cookies, f)
          
    def login(self, phone, country_code="+86"):
        """登录小红书"""
        # 如果token有效则直接返回
        if self.token:
            return
      
        # 尝试加载cookies进行登录
        self.driver.get("https://creator.xiaohongshu.com/login")
        self._load_cookies()
        self.driver.refresh()
        time.sleep(3)
        # 检查是否已经登录
        if self.driver.current_url != "https://creator.xiaohongshu.com/login":
            print("使用cookies登录成功")
            self.token = self._load_token()
            self._save_cookies()
            time.sleep(2)
            return
        else:
            # 清理无效的cookies
            self.driver.delete_all_cookies()
            print("无效的cookies,已清理")
          
        # 如果cookies登录失败,则进行手动登录
        self.driver.get("https://creator.xiaohongshu.com/login")

        # 等待登录页面加载完成
        time.sleep(5)
        # 点击国家区号输入框
        country_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "input[placeholder='请选择选项']")))
        country_input.click()
        time.sleep(5)
      
        # 等待区号列表出现并点击+886
        # 等待区号列表出现并点击+86
        try:
            self.driver.find_element(By.XPATH, "/html/body/div[1]/div/div/div/div[2]/div[1]/div[2]/div/div/div/div/div/div[2]/div[1]/div[1]/div/div/div[1]/input").click()
            time.sleep(3)
            self.driver.find_element(By.XPATH, "/html/body/div[1]/div/div/div/div[2]/div[1]/div[2]/div/div/div/div/div/div[2]/div[1]/div[1]/div/div/div[1]/input").send_keys(country_code)
            time.sleep(3)
            # self.driver.find_element(By.XPATH, "/html/body/div[6]/div/div").click()
            # china_option = self.wait.until(EC.element_to_be_clickable((By.XPATH, "//div[contains(@class, 'css-cqcgee')]//div[contains(text(), '+86')]")))
            time.sleep(5)
        except Exception as e:
            print("无法找到国家区号选项")
            print(e)
      
        # 定位手机号输入框
        phone_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "input[placeholder='手机号']")))
        phone_input.clear()
        phone_input.send_keys(phone)
      
        # 点击发送验证码按钮
        try:
            send_code_btn = self.wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, ".css-uyobdj")))
            send_code_btn.click()
        except:
            # 尝试其他可能的选择器
            try:
                send_code_btn = self.wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, ".css-1vfl29"))) 
                send_code_btn.click()
            except:
                try:
                    send_code_btn = self.wait.until(EC.element_to_be_clickable((By.XPATH, "//button[contains(text(),'发送验证码')]")))
                    send_code_btn.click()
                except:
                    print("无法找到发送验证码按钮")
      
        # 输入验证码
        verification_code = input("请输入验证码: ")
        code_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, "input[placeholder='验证码']")))
        code_input.clear()
        code_input.send_keys(verification_code)
              
        # 点击登录按钮
        login_button = self.wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, ".beer-login-btn")))
        login_button.click()
      
        # 等待登录成功,获取token
        time.sleep(3)
        # 保存cookies
        self._save_cookies()

        # 关闭浏览器
        # self.driver.quit()
          
        # print(f"获取到的token: {token}")
      
        # if token:
        #     self._save_token(token)
        #     self.token = token
        # else:
        #     print("未能获取到token")
      
    def post_article(self, title, content, images=None):
        """发布文章
        Args:
            title: 文章标题
            content: 文章内容
            images: 图片路径列表
        """
        # 如果token失效则重新登录
          
        # 设置token
        # self.driver.execute_script(f'localStorage.setItem("token", "{self.token}")')
        time.sleep(3)
        print("点击发布按钮")
        # 点击发布按钮
        publish_btn = self.wait.until(EC.element_to_be_clickable((By.CSS_SELECTOR, ".btn.el-tooltip__trigger.el-tooltip__trigger")))
        publish_btn.click()

        # 如果是发布视频,则不操作这一步
        # 切换到上传图文
        time.sleep(3)
        tabs = self.driver.find_elements(By.CSS_SELECTOR, ".creator-tab")
        if len(tabs) > 1:
            tabs[1].click()
        time.sleep(3)
        # # 输入标题和内容
        # title_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, ".title-input")))
        # content_input = self.driver.find_element(By.CSS_SELECTOR, ".content-input")
      
        # title_input.send_keys(title)
        # content_input.send_keys(content)
      
        # 上传图片
        if images:
            upload_input = self.driver.find_element(By.CSS_SELECTOR,'input[type="file"]')
            # 将所有图片路径用\n连接成一个字符串一次性上传
            upload_input.send_keys('\n'.join(images))
            time.sleep(1)
        time.sleep(3)
      
        JS_ADD_TEXT_TO_INPUT = """
        var elm = arguments[0], txt = arguments[1];
        elm.value += txt;
        elm.dispatchEvent(new Event('change'));
        """
      
        title_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, ".d-text")))
        self.driver.execute_script(JS_ADD_TEXT_TO_INPUT, title_input, title)
        # title_input.send_keys(title)
      
        # Start of Selection
        # Start of Selection
        print(content)
        JS_ADD_TEXT_TO_P = """
        var elm = arguments[0], txt = arguments[1];
        elm.textContent = txt;
        """
        content_input = self.wait.until(EC.presence_of_element_located((By.CSS_SELECTOR, ".ql-editor")))
        p_element = content_input.find_element(By.CSS_SELECTOR, "p")
        print(p_element)
        self.driver.execute_script(JS_ADD_TEXT_TO_P, p_element, content)
        # content_input.send_keys(123)
        # 发布
        time.sleep(600)
        submit_btn = self.driver.find_element(By.CSS_SELECTOR, ".el-button.publishBtn")
        submit_btn.click()
      
    def close(self):
        """关闭浏览器"""
        self.driver.quit()
      
def post_article():
    poster = XiaohongshuClient()
    phone = open('./docs/phone.txt').read()
    poster.login(phone)
    print("登录成功")
    print("开始发布文章")
    print(os.getcwd())
    title = open('./xiaohongshu_drafts/小红书_推广文案_千战系列2025-02-15.txt', 'r', encoding='utf-8').readline()
    article = line_process.get_article('./xiaohongshu_drafts/小红书_推广文案_千战系列2025-02-15.txt')
    print(article)
    images = os.listdir('./pictures')
    images = map(lambda x: os.path.join(r"D:\Project\UUCrawl\Code\pictures", x), images)
    poster.post_article(title, article, images)
    poster.close()
7.7. 主程序
from crawler import ip_crawler, data_crawler
from analysis import data_analysis
from pic_generate import pic_generate, html_generate
from post import xiaohongshu_post
import file_handler

if __name__ == '__main__':
    url = 'D:/Project/UUCrawl/Code/pic_generate/pic.html'
    # 获取IP
    ip = ip_crawler.crawl_ip()
    # 获取数据
    data = data_crawler.crawl_data()
    
    # 数据分析
    data_analysis.analysis_data()
    
    file_handler.start_observer()
    
    # 生成html
    html_generate.generate_html()
    # 生成图片
    pic_generate.generate_pic(url)

    # 发布小红书
    xiaohongshu_post.post_article()

网站公告

今日签到

点亮在社区的每一天
去签到