LangGraph4j框架的人工干预(Human-in-Loop)
在当今快速发展的技术浪潮中,人工智能和机器学习的应用已经渗透到各个领域,从自动化流程到智能决策支持,它们正在重塑我们的工作和生活方式。然而,尽管这些技术取得了巨大的进步,但在某些情况下,人类的智慧和经验仍然是不可或缺的。LangGraph4j的“人在回路”(Human-in-the-Loop)功能正是为了解决这一需求而设计的,它将人类的智慧与机器的效率相结合,为智能工作流带来了前所未有的灵活性和精准性。
一、什么是“Human-in-Loop”
“Human-in-Loop”功能是LangGraph4j的核心亮点之一。该功能允许在工作流的任何点引入人工干预,从而实现对模型输出的验证、更正或附加上下文。这种设计特别适用于大型语言模型(LLM)驱动的应用程序,因为这些模型的输出有时可能需要人工的进一步确认或调整。
二、为什么需要“Human-in-Loop”
尽管大型语言模型在处理自然语言方面表现出色,但它们并非万无一失。模型的输出可能会出现偏差、误解或缺乏必要的上下文信息。例如,在一个自动化的客户服务场景中,模型可能无法完全理解客户的复杂需求,或者在生成建议时忽略了某些关键因素。在这种情况下,人工干预就显得尤为重要。通过“Human-in-Loop”功能,人类可以对模型的输出进行审查、编辑和批准,确保最终结果的准确性和可靠性。
三、“Human-in-Loop”的关键功能
(一)持久执行状态
LangGraph4j的一个显著特点是其全局状态管理功能。该功能允许工作流在任何步骤暂停,等待人工输入,而不会丢失执行上下文。这种设计使得工作流可以在几分钟、几小时甚至几天后恢复执行,从暂停的地方继续进行。这对于需要异步人工审阅或输入的场景非常有用,例如在复杂的决策过程中,人类专家可能需要时间来评估模型的建议并提供反馈。
(二)灵活的集成点
LangGraph4j的“Human-in-Loop”功能提供了灵活的集成点,允许在工作流的任何阶段引入人工干预。这种灵活性使得开发者可以根据具体需求,将人工干预逻辑嵌入到工作流的关键环节。例如,在一个自动化营销流程中,可以在发送关键营销信息之前暂停工作流,让人工审核内容是否符合品牌形象和法律要求。
四、如何实现“Human-in-Loop”?
(1)中断机制
langgraph4j通过自定义异常来实现中断机制,这里我们定义了一个GraphInterruptException。
public class GraphInterruptException extends RuntimeException {
public GraphInterruptException(String errorMessage) {
super(errorMessage);
}
}
当某个节点需要等待人工反馈时,会抛出这个异常,导致整个流程暂停。一旦获得用户反馈后,可以通过特定方法恢复流程的执行。
(2)状态管理
状态管理是实现human-in-loop的关键部分。每个节点都可以访问和修改全局状态State,这使得在不同节点之间传递信息变得简单。同时,状态对象还包含了是否继续执行(resume)的标志位,用于控制流程是否应该被中断。
(3)节点行为定义
通过实现NodeAction接口,可以自定义节点的行为。在这个接口的apply方法中,可以根据当前的状态决定是否需要中断流程,并且可以在中断前后对状态进行更新。
(4)代码实现
LangGraphStreamingServer 类提供了一个基于 Servlet 的流式服务,允许客户端通过 HTTP 请求与服务器端的状态图进行交互。其中,resume 参数的作用是实现人为干预。resume参数用于指示是否从之前的检查点恢复执行流程。当resume为true时,系统会尝试从指定的检查点开始继续执行任务。这种机制允许在任务执行过程中插入人工审核、确认或其他形式的人工干预。
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
response.setHeader("Accept", "application/json");
response.setContentType("text/plain");
response.setCharacterEncoding("UTF-8");
//var session = request.getSession(true);
//Objects.requireNonNull(session, "session cannot be null");
var threadId = ofNullable(request.getParameter("thread"))
.orElseThrow(() -> new IllegalStateException("Missing thread id!"));
var resume = ofNullable(request.getParameter("resume"))
.map(Boolean::parseBoolean).orElse(false);
final PrintWriter writer = response.getWriter();
// 开始异步处理
var asyncContext = request.startAsync();
try {
AsyncGenerator<? extends NodeOutput<? extends AgentState>> generator = null;
var persistentConfig = new PersistentConfig(threadId);
var compiledGraph = graphCache.get(persistentConfig);
final Map<String, Object> candidateDataMap;
if (stateGraph.getStateSerializer() instanceof PlainTextStateSerializer<? extends AgentState> textSerializer) {
candidateDataMap = textSerializer.read(new InputStreamReader(request.getInputStream())).data();
} else {
candidateDataMap = objectMapper.readValue(request.getInputStream(), new TypeReference<>() {});
}
var dataMap = candidateDataMap.entrySet().stream()
.map( entry -> {
var newValue = args.stream()
.filter(arg -> arg.name().equals(entry.getKey()) && arg.converter() != null).findAny()
.map(arg -> arg.converter.apply(entry.getValue()));
return newValue.map( v -> entryOf(entry.getKey(), v ))
.orElse(entry);
})
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue) );
// 保存输入
aigcReportService.saveReport(threadId,String.valueOf(dataMap.get("input")),"",0);
if (resume) {
log.trace("RESUME REQUEST PREPARE");
if (compiledGraph == null) {
throw new IllegalStateException("Missing CompiledGraph in session!");
}
var checkpointId = ofNullable(request.getParameter("checkpoint"))
.orElseThrow(() -> new IllegalStateException("Missing checkpoint id!"));
var node = request.getParameter("node");
var runnableConfig = RunnableConfig.builder()
.threadId(threadId)
.checkPointId(checkpointId)
.nextNode(node)
.build();
var stateSnapshot = compiledGraph.getState(runnableConfig);
runnableConfig = stateSnapshot.config();
log.trace("RESUME UPDATE STATE FORM {} USING CONFIG {}\n{}", node, runnableConfig, dataMap);
// 添加resume
dataMap.put(RESUME, true);
runnableConfig = compiledGraph.updateState(runnableConfig, dataMap, node);
log.trace("RESUME REQUEST STREAM {}", runnableConfig);
generator = compiledGraph.streamSnapshots(null, runnableConfig);
} else {
log.trace("dataMap: {}", dataMap);
if (compiledGraph == null) {
compiledGraph = stateGraph.compile(compileConfig(persistentConfig));
graphCache.put(persistentConfig, compiledGraph);
}
generator = compiledGraph.streamSnapshots(dataMap, runnableConfig(persistentConfig));
}
CompiledGraph<?> finalCompiledGraph = compiledGraph;
generator.forEachAsync(s -> {
try {
serializeOutput(writer, threadId, s);
writer.println();
writer.flush();
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new CompletionException(e);
}
})
//.thenAccept(v -> writer.close())
//.thenAccept(v -> asyncContext.complete())
.thenAccept(v -> {
// 获取最终 state
StateSnapshot<? extends AgentState> finalState = finalCompiledGraph.getState(runnableConfig(persistentConfig));
// 提取最终结果
AgentState state = finalState.state();
String userInput = (String) state.value("input").orElseThrow();
String reportResult = (String) state.value("report_output").orElseThrow();
// 存储到数据库
aigcReportService.saveReport(threadId,userInput,reportResult,1);
writer.close();
asyncContext.complete();
})
.exceptionally(e -> {
log.error("Error streaming", e);
writer.close();
asyncContext.complete();
return null;
});
} catch (Throwable e) {
log.error("Error streaming", e);
throw new ServletException(e);
}
}
通过以上的介绍,我们确认了通过自定义异常实现中断,通过调整resume为true恢复执行。基于以上两个机制,我们可以实现一个简单的带有human-in-loop功能的节点:
public class HumanFeedbackNode implements NodeAction<State> {
private static final Logger log = LoggerFactory.getLogger(HumanFeedbackNode.class);
// always or conditioned
private final String interruptStrategy;
private Function<State, Boolean> interruptCondition;
private Function<State, Map<String, Object>> stateUpdateFunc;
public HumanFeedbackNode() {
this.interruptStrategy = "always";
}
public HumanFeedbackNode(String interruptStrategy, Function<State, Boolean> interruptCondition) {
this.interruptStrategy = interruptStrategy;
this.interruptCondition = interruptCondition;
}
public HumanFeedbackNode(String interruptStrategy, Function<State, Boolean> interruptCondition,
Function<State, Map<String, Object>> stateUpdateFunc) {
this.interruptStrategy = interruptStrategy;
this.interruptCondition = interruptCondition;
this.stateUpdateFunc = stateUpdateFunc;
}
@Override
public Map<String, Object> apply(State state) throws GraphInterruptException{
var shouldInterrupt = "always".equals(interruptStrategy)
|| ("conditioned".equals(interruptStrategy) && interruptCondition.apply(state));
if (shouldInterrupt) {
interrupt(state);
Map<String, Object> data = new HashMap<>();
if (stateUpdateFunc != null) {
data = stateUpdateFunc.apply(state);
}
else {
data = state.data();
}
return AgentState.updateState(data, Map.of(RESUME, false), State.SCHEMA);
}
return Map.of();
}
/**
* 中断流程
* @param state
* @throws GraphInterruptException
*/
private void interrupt(State state) throws GraphInterruptException{
if (!state.isResume()) {
log.warn("[HumanFeedbackNode] 流程中断了...");
throw new GraphInterruptException("interrupt");
}
}
}
代码实现说明
构造函数:提供了多种构造方式,允许设置不同的中断策略(始终中断、条件中断等)。
apply方法:这是节点的主要逻辑所在。根据中断策略判断是否需要中断流程,如果需要,则调用interrupt方法并抛出GraphInterruptException。
interrupt方法:检查State状态中的resume的值,如果不为true,则抛出异常中断流程。
状态更新:在中断之后,可以选择使用自定义的状态更新函数或者直接返回当前状态的数据
五、总结
LangGraph4j的“Human-in-Loop”功能为智能工作流的提供了一个强大的工具。通过将人类的智慧与机器的效率相结合,它不仅提高了工作流的灵活性和精准性,还为企业和开发者提供了更多的控制权和安全保障。同时,“Human-in-Loop”功能也为我们提供了一个重要的启示:技术的进步并非意味着完全取代人类,而是通过与人类智慧的结合,实现更加高效、精准和人性化的解决方案。