diff --git a/src/main/resources/template/AGENTS.md b/src/main/resources/template/AGENTS.md
index d7a7aaa1c95b13f59d73fb592fdb55fb98f086a0..799087903d65b6445d22d12db4c7c0b0ab93c4e8 100644
--- a/src/main/resources/template/AGENTS.md
+++ b/src/main/resources/template/AGENTS.md
@@ -97,12 +97,13 @@
- 如果当前运行没有可靠的“当前时间”来源,就明确说明“无法确认当前时间”,不要自行编造现在几点,也不要基于不确定时间创建任务
- 对相对时间表达,应先基于真实当前时间换算出绝对触发时间,再创建任务
- 对绝对时间表达,必须结合真实当前时间判断这是今天稍后、明天,还是一个已经过去的时间;不要默认把已过期时间当作未来时间
-- 除非用户明确要求你使用其他机制,否则所有时间触发型需求都优先使用 `add_job`
-- 对时间触发型需求,是否成功必须以 `add_job` 等定时任务工具实际调用成功为准;只有工具成功返回后,才能对用户声称已创建、已安排、已设置或将会按时执行
-- 如果没有真正调用 `add_job`,或工具调用失败、参数不完整、结果不确定,就必须明确说明“尚未创建成功”以及原因,不能仅凭语言生成一个看似已安排的回复
-- 创建定时任务时,要先把用户当前意图抽象成“未来触发时可独立执行的任务指令”,再作为 `add_job` 的 `prompt` 保存
-- `add_job` 的 `prompt` 必须自包含、可脱离当前对话单独理解;不要把依赖当下语境、代词指代、临时省略、上下文暗示的表达原样保存进去
-- 生成 `prompt` 时,应保留用户真实目标、对象、约束和期望结果,把口语化请求整理成未来可直接执行的任务描述,而不是简单复述用户原话
+- 除非用户明确要求你使用其他机制,否则所有时间触发型需求都优先使用现有定时任务工具:提醒、通知、向主会话注入内部事件时优先使用 `add_system_job`;需要未来自动执行一段独立任务时优先使用 `add_agent_job`
+- 对时间触发型需求,是否成功必须以 `add_system_job` / `add_agent_job` 等定时任务工具实际调用成功为准;只有工具成功返回后,才能对用户声称已创建、已安排、已设置或将会按时执行
+- 如果没有真正调用 `add_system_job` 或 `add_agent_job`,或工具调用失败、参数不完整、结果不确定,就必须明确说明“尚未创建成功”以及原因,不能仅凭语言生成一个看似已安排的回复
+- 创建 `add_system_job` 时,要先把用户当前意图抽象成“未来触发时可独立理解的 system event 文本”,再作为 `systemEventText` 保存
+- 创建 `add_agent_job` 时,要先把用户当前意图抽象成“未来触发时可独立执行的任务指令”,再作为 `message` 保存
+- `add_system_job.systemEventText` 和 `add_agent_job.message` 都必须自包含、可脱离当前对话单独理解;不要把依赖当下语境、代词指代、临时省略、上下文暗示的表达原样保存进去
+- 生成未来任务文本时,应保留用户真实目标、对象、约束和期望结果,把口语化请求整理成未来可直接执行的任务描述,而不是简单复述用户原话
- 如果用户目标已经足够明确,直接生成可执行 `prompt`,不要为了形式完整而追问显而易见的信息
- 只有在关键目标、对象或执行前提缺失,并且这种缺失会导致未来任务无法正确执行时,才允许提问补充
- 创建定时任务后,不要在当前运行里再次自行等待并额外通知一次;后续提醒只应由定时任务触发
diff --git a/src/test/java/com/jimuqu/claw/agent/job/JobStoreServiceTest.java b/src/test/java/com/jimuqu/claw/agent/job/JobStoreServiceTest.java
index e1122e535c1c98dd046c22a686355fb702606ae8..d016daf0e094c737ac89e738003322ba95b8844f 100644
--- a/src/test/java/com/jimuqu/claw/agent/job/JobStoreServiceTest.java
+++ b/src/test/java/com/jimuqu/claw/agent/job/JobStoreServiceTest.java
@@ -23,9 +23,13 @@ class JobStoreServiceTest {
definition.setName("demo");
definition.setMode("once_delay");
definition.setScheduleValue("1000");
- definition.setPrompt("hello");
- definition.setSessionKey("dingtalk:private:demo");
- definition.setReplyTarget(new ReplyTarget(ChannelType.DINGTALK, ConversationType.PRIVATE, "cid", "uid"));
+ definition.setPayloadKind(JobPayloadKind.SYSTEM_EVENT);
+ definition.setSessionTarget(JobSessionTarget.MAIN);
+ definition.setWakeMode(JobWakeMode.NOW);
+ definition.setDeliveryMode(JobDeliveryMode.NONE);
+ definition.setSystemEventText("hello");
+ definition.setBoundSessionKey("dingtalk:private:demo");
+ definition.setBoundReplyTarget(new ReplyTarget(ChannelType.DINGTALK, ConversationType.PRIVATE, "cid", "uid"));
definition.setEnabled(true);
definition.setCreatedAt(1L);
definition.setUpdatedAt(2L);
@@ -36,7 +40,8 @@ class JobStoreServiceTest {
assertNotNull(saved);
assertEquals("once_delay", saved.getMode());
assertEquals("1000", saved.getScheduleValue());
- assertEquals("hello", saved.getPrompt());
+ assertEquals(JobPayloadKind.SYSTEM_EVENT, saved.getPayloadKind());
+ assertEquals("hello", saved.getSystemEventText());
assertTrue(storeService.getJobsFile().exists());
}
}
diff --git a/src/test/java/com/jimuqu/claw/agent/job/WorkspaceJobServiceTest.java b/src/test/java/com/jimuqu/claw/agent/job/WorkspaceJobServiceTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..1c2c308141d96cc30dddc9878c9ed84796b0f456
--- /dev/null
+++ b/src/test/java/com/jimuqu/claw/agent/job/WorkspaceJobServiceTest.java
@@ -0,0 +1,356 @@
+package com.jimuqu.claw.agent.job;
+
+import com.jimuqu.claw.agent.channel.ChannelRegistry;
+import com.jimuqu.claw.agent.model.enums.ChannelType;
+import com.jimuqu.claw.agent.model.enums.ConversationType;
+import com.jimuqu.claw.agent.model.enums.RuntimeSourceKind;
+import com.jimuqu.claw.agent.model.enums.SystemEventPolicy;
+import com.jimuqu.claw.agent.model.route.ReplyTarget;
+import com.jimuqu.claw.agent.runtime.api.ConversationAgent;
+import com.jimuqu.claw.agent.runtime.impl.ConversationScheduler;
+import com.jimuqu.claw.agent.runtime.impl.IsolatedAgentRunService;
+import com.jimuqu.claw.agent.runtime.impl.SystemEventRunner;
+import com.jimuqu.claw.agent.runtime.support.AgentTurnRequest;
+import com.jimuqu.claw.agent.runtime.support.SystemEventRequest;
+import com.jimuqu.claw.agent.store.RuntimeStoreService;
+import com.jimuqu.claw.agent.workspace.AgentWorkspaceService;
+import com.jimuqu.claw.config.SolonClawProperties;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.noear.solon.core.Lifecycle;
+import org.noear.solon.core.util.RankEntity;
+import org.noear.solon.scheduling.ScheduledException;
+import org.noear.solon.scheduling.annotation.Scheduled;
+import org.noear.solon.scheduling.scheduled.JobHandler;
+import org.noear.solon.scheduling.scheduled.JobHolder;
+import org.noear.solon.scheduling.scheduled.JobInterceptor;
+import org.noear.solon.scheduling.scheduled.manager.IJobManager;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class WorkspaceJobServiceTest {
+ @TempDir
+ Path tempDir;
+
+ @Test
+ void addSystemJobPersistsNewPayloadFields() {
+ TestContext ctx = new TestContext(tempDir);
+
+ JobDefinition definition = ctx.workspaceJobService.addSystemJob(
+ "drink-water",
+ "fixed_rate",
+ "60000",
+ "提醒我喝水",
+ 0L,
+ "Asia/Shanghai",
+ JobWakeMode.NOW
+ );
+
+ assertEquals(JobPayloadKind.SYSTEM_EVENT, definition.getPayloadKind());
+ assertEquals(JobSessionTarget.MAIN, definition.getSessionTarget());
+ assertEquals(JobWakeMode.NOW, definition.getWakeMode());
+ assertEquals(JobDeliveryMode.NONE, definition.getDeliveryMode());
+ assertEquals("提醒我喝水", definition.getSystemEventText());
+ assertEquals(ctx.boundSessionKey, definition.getBoundSessionKey());
+ assertEquals(ctx.boundReplyTarget.getConversationId(), definition.getBoundReplyTarget().getConversationId());
+ assertNotNull(ctx.jobStoreService.get("drink-water"));
+ }
+
+ @Test
+ void addAgentJobPersistsAgentTurnFields() {
+ TestContext ctx = new TestContext(tempDir);
+ AgentTurnSpec spec = new AgentTurnSpec();
+ spec.setMessage("检查服务器状态");
+ spec.setModel("qwen");
+ spec.setThinking("medium");
+ spec.setTimeoutSeconds(120);
+ spec.setLightContext(true);
+
+ JobDefinition definition = ctx.workspaceJobService.addAgentJob(
+ "check-server",
+ "fixed_rate",
+ "60000",
+ spec,
+ 0L,
+ "Asia/Shanghai",
+ JobDeliveryMode.LAST_ROUTE
+ );
+
+ assertEquals(JobPayloadKind.AGENT_TURN, definition.getPayloadKind());
+ assertEquals(JobSessionTarget.ISOLATED, definition.getSessionTarget());
+ assertEquals(JobDeliveryMode.LAST_ROUTE, definition.getDeliveryMode());
+ assertEquals("检查服务器状态", definition.getAgentTurn().getMessage());
+ assertEquals("qwen", definition.getAgentTurn().getModel());
+ assertTrue(definition.getAgentTurn().isLightContext());
+ }
+
+ @Test
+ void invalidSystemEventAndAgentTurnCombinationsAreRejected() {
+ TestContext ctx = new TestContext(tempDir);
+
+ JobDefinition invalidSystem = new JobDefinition();
+ invalidSystem.setName("bad-system");
+ invalidSystem.setMode("fixed_rate");
+ invalidSystem.setScheduleValue("60000");
+ invalidSystem.setPayloadKind(JobPayloadKind.SYSTEM_EVENT);
+ invalidSystem.setSessionTarget(JobSessionTarget.ISOLATED);
+ invalidSystem.setWakeMode(JobWakeMode.NOW);
+ invalidSystem.setDeliveryMode(JobDeliveryMode.NONE);
+ invalidSystem.setBoundSessionKey(ctx.boundSessionKey);
+ invalidSystem.setBoundReplyTarget(ctx.boundReplyTarget);
+ invalidSystem.setSystemEventText("bad");
+ ctx.jobStoreService.save(invalidSystem);
+
+ JobDefinition invalidAgent = new JobDefinition();
+ invalidAgent.setName("bad-agent");
+ invalidAgent.setMode("fixed_rate");
+ invalidAgent.setScheduleValue("60000");
+ invalidAgent.setPayloadKind(JobPayloadKind.AGENT_TURN);
+ invalidAgent.setSessionTarget(JobSessionTarget.MAIN);
+ invalidAgent.setDeliveryMode(JobDeliveryMode.BOUND_REPLY_TARGET);
+ invalidAgent.setBoundSessionKey(ctx.boundSessionKey);
+ invalidAgent.setBoundReplyTarget(ctx.boundReplyTarget);
+ AgentTurnSpec spec = new AgentTurnSpec();
+ spec.setMessage("bad");
+ invalidAgent.setAgentTurn(spec);
+ ctx.jobStoreService.save(invalidAgent);
+
+ assertThrows(IllegalArgumentException.class, () -> ctx.workspaceJobService.startJob("bad-system"));
+ assertThrows(IllegalArgumentException.class, () -> ctx.workspaceJobService.startJob("bad-agent"));
+ }
+
+ @Test
+ void schedulerDispatchesSystemEventAndAgentTurnToDifferentRunners() throws Exception {
+ TestContext ctx = new TestContext(tempDir);
+ AgentTurnSpec spec = new AgentTurnSpec();
+ spec.setMessage("收集日志");
+
+ ctx.workspaceJobService.addSystemJob(
+ "system-job",
+ "fixed_rate",
+ "60000",
+ "提醒我站起来活动一下",
+ 0L,
+ "Asia/Shanghai",
+ JobWakeMode.NOW
+ );
+ ctx.workspaceJobService.addAgentJob(
+ "agent-job",
+ "fixed_rate",
+ "60000",
+ spec,
+ 0L,
+ "Asia/Shanghai",
+ JobDeliveryMode.BOUND_REPLY_TARGET
+ );
+
+ ctx.jobManager.trigger("system-job");
+ ctx.jobManager.trigger("agent-job");
+
+ assertNotNull(ctx.systemEventRunner.lastRequest.get());
+ assertEquals(RuntimeSourceKind.JOB_SYSTEM_EVENT, ctx.systemEventRunner.lastRequest.get().getSourceKind());
+ assertEquals(SystemEventPolicy.USER_VISIBLE_OPTIONAL, ctx.systemEventRunner.lastRequest.get().getPolicy());
+ assertEquals("提醒我站起来活动一下", ctx.systemEventRunner.lastRequest.get().getContent());
+
+ assertNotNull(ctx.isolatedAgentRunService.lastRequest.get());
+ assertEquals(RuntimeSourceKind.JOB_AGENT_TURN, ctx.isolatedAgentRunService.lastRequest.get().getSourceKind());
+ assertEquals("收集日志", ctx.isolatedAgentRunService.lastRequest.get().getAgentTurn().getMessage());
+ assertEquals(JobDeliveryMode.BOUND_REPLY_TARGET, ctx.isolatedAgentRunService.lastRequest.get().getDeliveryMode());
+ }
+
+ private static final class TestContext {
+ private final ReplyTarget boundReplyTarget;
+ private final String boundSessionKey;
+ private final TestJobManager jobManager;
+ private final JobStoreService jobStoreService;
+ private final RuntimeStoreService runtimeStoreService;
+ private final CapturingSystemEventRunner systemEventRunner;
+ private final CapturingIsolatedAgentRunService isolatedAgentRunService;
+ private final WorkspaceJobService workspaceJobService;
+ private final ConversationScheduler scheduler;
+
+ private TestContext(Path tempDir) {
+ SolonClawProperties properties = new SolonClawProperties();
+ AgentWorkspaceService workspaceService = new AgentWorkspaceService(tempDir.toString());
+ this.jobManager = new TestJobManager();
+ this.jobStoreService = new JobStoreService(workspaceService);
+ this.runtimeStoreService = new RuntimeStoreService(tempDir.resolve("runtime").toFile());
+ this.scheduler = new ConversationScheduler(1);
+ this.boundReplyTarget = new ReplyTarget(ChannelType.DINGTALK, ConversationType.PRIVATE, "cid", "uid");
+ this.boundSessionKey = "dingtalk:private:cid";
+ this.runtimeStoreService.rememberReplyTarget(boundSessionKey, boundReplyTarget);
+
+ ConversationAgent noopAgent = (request, progressConsumer) -> "noop";
+ ChannelRegistry registry = new ChannelRegistry();
+ this.systemEventRunner = new CapturingSystemEventRunner(
+ noopAgent,
+ runtimeStoreService,
+ scheduler,
+ registry,
+ properties
+ );
+ this.isolatedAgentRunService = new CapturingIsolatedAgentRunService(
+ noopAgent,
+ runtimeStoreService,
+ scheduler,
+ registry,
+ properties
+ );
+ this.workspaceJobService = new WorkspaceJobService(
+ jobManager,
+ jobStoreService,
+ runtimeStoreService,
+ systemEventRunner,
+ isolatedAgentRunService,
+ properties
+ );
+ }
+ }
+
+ private static class CapturingSystemEventRunner extends SystemEventRunner {
+ private final AtomicReference
lastRequest = new AtomicReference();
+
+ private CapturingSystemEventRunner(
+ ConversationAgent conversationAgent,
+ RuntimeStoreService runtimeStoreService,
+ ConversationScheduler conversationScheduler,
+ ChannelRegistry channelRegistry,
+ SolonClawProperties properties
+ ) {
+ super(
+ conversationAgent,
+ runtimeStoreService,
+ conversationScheduler,
+ channelRegistry,
+ properties
+ );
+ }
+
+ @Override
+ public String submit(SystemEventRequest request) {
+ lastRequest.set(request);
+ return "captured-system";
+ }
+ }
+
+ private static class CapturingIsolatedAgentRunService extends IsolatedAgentRunService {
+ private final AtomicReference lastRequest = new AtomicReference();
+
+ private CapturingIsolatedAgentRunService(
+ ConversationAgent conversationAgent,
+ RuntimeStoreService runtimeStoreService,
+ ConversationScheduler conversationScheduler,
+ ChannelRegistry channelRegistry,
+ SolonClawProperties properties
+ ) {
+ super(
+ conversationAgent,
+ runtimeStoreService,
+ conversationScheduler,
+ channelRegistry,
+ properties
+ );
+ }
+
+ @Override
+ public String submit(AgentTurnRequest request) {
+ lastRequest.set(request);
+ return "captured-agent";
+ }
+ }
+
+ private static class TestJobManager implements IJobManager {
+ private final Map jobs = new LinkedHashMap();
+
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public void stop() {
+ }
+
+ @Override
+ public void addJobInterceptor(int index, JobInterceptor interceptor) {
+ }
+
+ @Override
+ public boolean hasJobInterceptor() {
+ return false;
+ }
+
+ @Override
+ public List> getJobInterceptors() {
+ return new ArrayList>();
+ }
+
+ @Override
+ public JobHolder jobAdd(String name, Scheduled scheduled, JobHandler handler) {
+ JobHolder holder = new JobHolder(this, name, scheduled, handler);
+ jobs.put(name, holder);
+ return holder;
+ }
+
+ @Override
+ public JobHolder jobAdd(String name, Scheduled scheduled, JobHandler handler, Map data) {
+ JobHolder holder = jobAdd(name, scheduled, handler);
+ holder.setData(data);
+ return holder;
+ }
+
+ @Override
+ public boolean jobExists(String name) {
+ return jobs.containsKey(name);
+ }
+
+ @Override
+ public JobHolder jobGet(String name) {
+ return jobs.get(name);
+ }
+
+ @Override
+ public Map jobGetAll() {
+ return jobs;
+ }
+
+ @Override
+ public void jobRemove(String name) {
+ jobs.remove(name);
+ }
+
+ @Override
+ public void jobStart(String name, Map data) throws ScheduledException {
+ }
+
+ @Override
+ public void jobStop(String name) throws ScheduledException {
+ }
+
+ @Override
+ public boolean isStarted() {
+ return true;
+ }
+
+ private void trigger(String name) throws Exception {
+ JobHolder holder = jobs.get(name);
+ if (holder == null) {
+ throw new IllegalArgumentException("job 不存在: " + name);
+ }
+ try {
+ holder.getHandler().handle(null);
+ } catch (Throwable throwable) {
+ throw new RuntimeException(throwable);
+ }
+ }
+ }
+}
diff --git a/src/test/java/com/jimuqu/claw/agent/runtime/AgentRuntimeServiceTest.java b/src/test/java/com/jimuqu/claw/agent/runtime/AgentRuntimeServiceTest.java
index f319013e61cd8b914c45e9b9ba7a28a3654247d1..39ee74afbc90638752b04041b84c4becae07794c 100644
--- a/src/test/java/com/jimuqu/claw/agent/runtime/AgentRuntimeServiceTest.java
+++ b/src/test/java/com/jimuqu/claw/agent/runtime/AgentRuntimeServiceTest.java
@@ -2,22 +2,24 @@ package com.jimuqu.claw.agent.runtime;
import com.jimuqu.claw.agent.channel.ChannelAdapter;
import com.jimuqu.claw.agent.channel.ChannelRegistry;
-import com.jimuqu.claw.agent.model.run.AgentRun;
-import com.jimuqu.claw.agent.model.enums.ChannelType;
-import com.jimuqu.claw.agent.model.event.ConversationEvent;
-import com.jimuqu.claw.agent.model.enums.ConversationType;
import com.jimuqu.claw.agent.model.envelope.InboundEnvelope;
-import com.jimuqu.claw.agent.model.enums.InboundTriggerType;
import com.jimuqu.claw.agent.model.envelope.OutboundEnvelope;
-import com.jimuqu.claw.agent.model.route.ReplyTarget;
+import com.jimuqu.claw.agent.model.enums.ChannelType;
+import com.jimuqu.claw.agent.model.enums.ConversationType;
+import com.jimuqu.claw.agent.model.enums.RuntimeSourceKind;
import com.jimuqu.claw.agent.model.enums.RunStatus;
+import com.jimuqu.claw.agent.model.event.RunEvent;
+import com.jimuqu.claw.agent.model.route.ReplyTarget;
+import com.jimuqu.claw.agent.model.run.AgentRun;
import com.jimuqu.claw.agent.runtime.api.ConversationAgent;
import com.jimuqu.claw.agent.runtime.impl.AgentRuntimeService;
import com.jimuqu.claw.agent.runtime.impl.ConversationScheduler;
+import com.jimuqu.claw.agent.runtime.impl.SystemEventRunner;
import com.jimuqu.claw.agent.runtime.support.ConversationExecutionRequest;
+import com.jimuqu.claw.agent.runtime.support.DeliveryResult;
import com.jimuqu.claw.agent.runtime.support.NotificationResult;
-import com.jimuqu.claw.agent.store.RuntimeStoreService;
import com.jimuqu.claw.agent.runtime.support.ParentRunChildrenSummary;
+import com.jimuqu.claw.agent.store.RuntimeStoreService;
import com.jimuqu.claw.config.SolonClawProperties;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -27,27 +29,19 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-/**
- * 验证 Agent 运行时的并发调度和忙时回执行为。
- */
class AgentRuntimeServiceTest {
- /** 临时测试目录。 */
@TempDir
Path tempDir;
- /**
- * 验证同会话繁忙时第二条消息会收到即时回执。
- *
- * @throws Exception 执行异常
- */
@Test
void secondMessageGetsImmediateAckWhenConversationBusy() throws Exception {
CountDownLatch firstStarted = new CountDownLatch(1);
@@ -69,25 +63,21 @@ class AgentRuntimeServiceTest {
ChannelRegistry registry = new ChannelRegistry();
RecordingChannelAdapter adapter = new RecordingChannelAdapter();
registry.register(adapter);
-
SolonClawProperties properties = new SolonClawProperties();
properties.getAgent().getScheduler().setMaxConcurrentPerConversation(1);
properties.getAgent().getScheduler().setAckWhenBusy(true);
+ AgentRuntimeService runtimeService = runtimeService(conversationAgent, store, scheduler, registry, properties);
try {
- AgentRuntimeService runtimeService = new AgentRuntimeService(conversationAgent, store, scheduler, registry, properties);
-
String firstRunId = runtimeService.submitInbound(inbound("msg-1", "question-1"));
assertTrue(firstStarted.await(2, TimeUnit.SECONDS));
String secondRunId = runtimeService.submitInbound(inbound("msg-2", "question-2"));
assertNotNull(firstRunId);
assertNotNull(secondRunId);
-
assertTrue(waitUntil(() -> adapter.messages.stream().anyMatch(message -> message.contains("已收到")), 2000));
releaseFirst.countDown();
-
assertTrue(waitUntil(() -> {
AgentRun run1 = runtimeService.getRun(firstRunId);
AgentRun run2 = runtimeService.getRun(secondRunId);
@@ -95,240 +85,12 @@ class AgentRuntimeServiceTest {
&& run1.getStatus() == RunStatus.SUCCEEDED
&& run2.getStatus() == RunStatus.SUCCEEDED;
}, 5000));
-
- assertEquals(3, adapter.outbounds.size());
- assertEquals("reply-question-1", runtimeService.getRun(firstRunId).getFinalResponse());
- assertEquals("reply-question-2", runtimeService.getRun(secondRunId).getFinalResponse());
} finally {
releaseFirst.countDown();
scheduler.shutdown();
}
}
- /**
- * 验证父运行可派生子任务,子任务完成后会触发父会话 continuation run。
- *
- * @throws Exception 执行异常
- */
- @Test
- void childRunCompletionContinuesParentConversation() throws Exception {
- RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
- ConversationScheduler scheduler = new ConversationScheduler(1);
- ChannelRegistry registry = new ChannelRegistry();
- RecordingChannelAdapter adapter = new RecordingChannelAdapter();
- registry.register(adapter);
-
- ConversationAgent conversationAgent = (request, progressConsumer) -> {
- String message = request.getCurrentMessage();
- if ("question-parent".equals(message)) {
- request.getSpawnTaskSupport().spawnTask("research-child");
- progressConsumer.accept("spawned");
- return "parent-waiting";
- }
- if ("research-child".equals(message)) {
- progressConsumer.accept("child-running");
- return "child-result";
- }
- if (message != null && message.contains("[内部事件] 子任务已完成")) {
- return "final-parent-answer";
- }
- return "reply-" + message;
- };
-
- SolonClawProperties properties = new SolonClawProperties();
- properties.getAgent().getScheduler().setMaxConcurrentPerConversation(1);
- properties.getAgent().getScheduler().setAckWhenBusy(false);
-
- try {
- AgentRuntimeService runtimeService = new AgentRuntimeService(conversationAgent, store, scheduler, registry, properties);
- String parentRunId = runtimeService.submitInbound(inbound("msg-parent", "question-parent"));
- assertNotNull(parentRunId);
-
- assertTrue(waitUntil(() -> {
- AgentRun parentRun = runtimeService.getRun(parentRunId);
- return parentRun != null && parentRun.getStatus() == RunStatus.WAITING_CHILDREN;
- }, 3000));
-
- assertTrue(waitUntil(() -> adapter.messages.contains("final-parent-answer"), 5000));
-
- assertEquals(1, adapter.outbounds.size());
- assertEquals("final-parent-answer", adapter.outbounds.get(0).getContent());
- assertEquals(RunStatus.WAITING_CHILDREN, runtimeService.getRun(parentRunId).getStatus());
- } finally {
- scheduler.shutdown();
- }
- }
-
- /**
- * 验证子任务默认不能继续派生新的子任务,避免无边界扇出。
- *
- * @throws Exception 执行异常
- */
- @Test
- void childRunCannotSpawnNestedChildByDefault() throws Exception {
- RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
- ConversationScheduler scheduler = new ConversationScheduler(1);
- ChannelRegistry registry = new ChannelRegistry();
- RecordingChannelAdapter adapter = new RecordingChannelAdapter();
- registry.register(adapter);
-
- ConversationAgent conversationAgent = (request, progressConsumer) -> {
- String message = request.getCurrentMessage();
- if ("question-parent-nested".equals(message)) {
- request.getSpawnTaskSupport().spawnTask("child-needs-more");
- return "parent-waiting";
- }
- if ("child-needs-more".equals(message)) {
- try {
- request.getSpawnTaskSupport().spawnTask("nested-child");
- return "nested-allowed";
- } catch (IllegalStateException e) {
- return "child-blocked:" + e.getMessage();
- }
- }
- if (message != null && message.contains("[内部事件] 子任务已完成")) {
- return message.contains("当前子任务默认禁止继续派生子任务")
- ? "parent-saw-nested-block"
- : "parent-missed-nested-block";
- }
- return "reply-" + message;
- };
-
- SolonClawProperties properties = new SolonClawProperties();
- properties.getAgent().getScheduler().setMaxConcurrentPerConversation(1);
- properties.getAgent().getScheduler().setAckWhenBusy(false);
- properties.getAgent().getSubtasks().setAllowNestedSpawn(false);
-
- try {
- AgentRuntimeService runtimeService = new AgentRuntimeService(conversationAgent, store, scheduler, registry, properties);
- String parentRunId = runtimeService.submitInbound(inbound("msg-parent-nested", "question-parent-nested"));
- assertNotNull(parentRunId);
-
- assertTrue(waitUntil(() -> adapter.messages.contains("parent-saw-nested-block"), 5000));
- assertEquals(1, runtimeService.listChildRuns(parentRunId, null).size());
- assertTrue(store.getRunEvents(runtimeService.listChildRuns(parentRunId, null).get(0).getRunId(), 0).stream()
- .anyMatch(event -> "spawn_task_blocked".equals(event.getEventType())));
- } finally {
- scheduler.shutdown();
- }
- }
-
- /**
- * 验证后续一句“看看上个任务的情况”可以通过查询能力读取最近子任务状态。
- *
- * @throws Exception 执行异常
- */
- @Test
- void followupMessageCanInspectLatestChildRunStatus() throws Exception {
- RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
- ConversationScheduler scheduler = new ConversationScheduler(1);
- ChannelRegistry registry = new ChannelRegistry();
- RecordingChannelAdapter adapter = new RecordingChannelAdapter();
- registry.register(adapter);
-
- ConversationAgent conversationAgent = (request, progressConsumer) -> {
- String message = request.getCurrentMessage();
- if ("question-parent".equals(message)) {
- request.getSpawnTaskSupport().spawnTask("research-child");
- return "parent-waiting";
- }
- if ("research-child".equals(message)) {
- return "child-result";
- }
- if (message != null && message.contains("[内部事件] 子任务已完成")) {
- return "child-finished";
- }
- if ("看看上个任务的情况".equals(message)) {
- AgentRun latestChild = request.getRunQuerySupport().getLatestChildRun();
- return "latest-child-status=" + (latestChild == null ? "NONE" : latestChild.getStatus());
- }
- return "reply-" + message;
- };
-
- SolonClawProperties properties = new SolonClawProperties();
- properties.getAgent().getScheduler().setMaxConcurrentPerConversation(1);
- properties.getAgent().getScheduler().setAckWhenBusy(false);
-
- try {
- AgentRuntimeService runtimeService = new AgentRuntimeService(conversationAgent, store, scheduler, registry, properties);
- runtimeService.submitInbound(inbound("msg-parent", "question-parent"));
- assertTrue(waitUntil(() -> adapter.messages.contains("child-finished"), 5000));
-
- String inspectRunId = runtimeService.submitInbound(inbound("msg-inspect", "看看上个任务的情况"));
- assertNotNull(inspectRunId);
- assertTrue(waitUntil(() -> adapter.messages.contains("latest-child-status=SUCCEEDED"), 5000));
- } finally {
- scheduler.shutdown();
- }
- }
-
- /**
- * 验证子任务完成后,父运行会记录更清晰的调试事件,并向 continuation 注入结构化汇总。
- *
- * @throws Exception 执行异常
- */
- @Test
- void childCompletionAddsParentDebugEventsAndStructuredSummary() throws Exception {
- RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
- ConversationScheduler scheduler = new ConversationScheduler(1);
- ChannelRegistry registry = new ChannelRegistry();
- RecordingChannelAdapter adapter = new RecordingChannelAdapter();
- registry.register(adapter);
-
- ConversationAgent conversationAgent = (request, progressConsumer) -> {
- String message = request.getCurrentMessage();
- if ("question-parent-summary".equals(message)) {
- request.getSpawnTaskSupport().spawnTask("summary-child");
- return "parent-waiting";
- }
- if ("summary-child".equals(message)) {
- return "summary-child-result";
- }
- if (message != null && message.contains("[内部事件] 子任务已完成")) {
- return message.contains("全部子任务汇总: total=1")
- && message.contains("FINAL_REPLY_ONCE:")
- && message.contains("NO_REPLY")
- ? "parent-saw-summary-guidance"
- : "parent-missed-summary-guidance";
- }
- return "reply-" + message;
- };
-
- SolonClawProperties properties = new SolonClawProperties();
- properties.getAgent().getScheduler().setMaxConcurrentPerConversation(1);
- properties.getAgent().getScheduler().setAckWhenBusy(false);
-
- try {
- AgentRuntimeService runtimeService = new AgentRuntimeService(conversationAgent, store, scheduler, registry, properties);
- String parentRunId = runtimeService.submitInbound(inbound("msg-parent-summary", "question-parent-summary"));
- assertNotNull(parentRunId);
-
- assertTrue(waitUntil(() -> adapter.messages.contains("parent-saw-summary-guidance"), 5000));
-
- List eventTypes = store.getRunEvents(parentRunId, 0).stream()
- .map(event -> event.getEventType())
- .collect(java.util.stream.Collectors.toList());
- assertTrue(eventTypes.contains("child_completion_received"));
- assertTrue(eventTypes.contains("children_all_completed"));
- assertTrue(eventTypes.contains("child_continuation_submitted"));
-
- List events = store.readConversationEvents("dingtalk:group:group-1");
- assertTrue(events.stream().anyMatch(event ->
- "system_event".equals(event.getEventType())
- && event.getContent() != null
- && event.getContent().contains("全部子任务汇总: total=1")
- && event.getContent().contains("FINAL_REPLY_ONCE:")
- && event.getContent().contains("NO_REPLY")));
- } finally {
- scheduler.shutdown();
- }
- }
-
- /**
- * 验证当前运行可通过主动通知能力直接向当前会话用户发消息。
- *
- * @throws Exception 执行异常
- */
@Test
void runCanNotifyUserProactively() throws Exception {
RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
@@ -336,7 +98,6 @@ class AgentRuntimeServiceTest {
ChannelRegistry registry = new ChannelRegistry();
RecordingChannelAdapter adapter = new RecordingChannelAdapter();
registry.register(adapter);
-
ConversationAgent conversationAgent = (request, progressConsumer) -> {
if ("请主动通知我".equals(request.getCurrentMessage())) {
NotificationResult result = request.getNotificationSupport().notifyUser("这是一条主动通知", false);
@@ -345,316 +106,104 @@ class AgentRuntimeServiceTest {
}
return "reply-" + request.getCurrentMessage();
};
-
SolonClawProperties properties = new SolonClawProperties();
- properties.getAgent().getScheduler().setMaxConcurrentPerConversation(1);
- properties.getAgent().getScheduler().setAckWhenBusy(false);
+ AgentRuntimeService runtimeService = runtimeService(conversationAgent, store, scheduler, registry, properties);
try {
- AgentRuntimeService runtimeService = new AgentRuntimeService(conversationAgent, store, scheduler, registry, properties);
String runId = runtimeService.submitInbound(inbound("msg-notify", "请主动通知我"));
assertNotNull(runId);
assertTrue(waitUntil(() -> adapter.messages.contains("这是一条主动通知"), 5000));
assertEquals(1, adapter.outbounds.size());
- assertEquals("这是一条主动通知", adapter.outbounds.get(0).getContent());
} finally {
scheduler.shutdown();
}
}
- /**
- * 验证可按父运行聚合多个子任务,并判断是否全部完成。
- *
- * @throws Exception 执行异常
- */
@Test
- void followupMessageCanInspectParentRunChildSummary() throws Exception {
+ void childRunCompletionUsesSystemEventRunnerForAggregateReply() throws Exception {
RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
ConversationScheduler scheduler = new ConversationScheduler(1);
ChannelRegistry registry = new ChannelRegistry();
RecordingChannelAdapter adapter = new RecordingChannelAdapter();
registry.register(adapter);
- CountDownLatch slowChildStarted = new CountDownLatch(1);
- CountDownLatch releaseSlowChild = new CountDownLatch(1);
-
ConversationAgent conversationAgent = (request, progressConsumer) -> {
String message = request.getCurrentMessage();
- if ("question-parent-multi".equals(message)) {
- request.getSpawnTaskSupport().spawnTask("child-fast-1");
- request.getSpawnTaskSupport().spawnTask("child-fast-2");
- request.getSpawnTaskSupport().spawnTask("child-slow-3");
- return "parent-waiting-multi";
- }
- if ("child-fast-1".equals(message) || "child-fast-2".equals(message)) {
- return "done-" + message;
- }
- if ("child-slow-3".equals(message)) {
- slowChildStarted.countDown();
- assertTrue(releaseSlowChild.await(5, TimeUnit.SECONDS));
- return "done-" + message;
- }
- if (message != null && message.contains("[内部事件] 子任务已完成")) {
- return "child-finished";
- }
- if ("看看这批子任务是否都完成了".equals(message)) {
- ParentRunChildrenSummary summary = request.getRunQuerySupport().getChildSummary(null, null);
- if (summary == null) {
- return "summary-missing";
- }
- return "summary total=" + summary.getTotalChildren()
- + " pending=" + summary.getPendingChildren()
- + " allCompleted=" + summary.isAllCompleted();
- }
- return "reply-" + message;
- };
-
- SolonClawProperties properties = new SolonClawProperties();
- properties.getAgent().getScheduler().setMaxConcurrentPerConversation(1);
- properties.getAgent().getScheduler().setAckWhenBusy(false);
-
- try {
- AgentRuntimeService runtimeService = new AgentRuntimeService(conversationAgent, store, scheduler, registry, properties);
- String parentRunId = runtimeService.submitInbound(inbound("msg-parent-multi", "question-parent-multi"));
- assertNotNull(parentRunId);
- assertTrue(slowChildStarted.await(3, TimeUnit.SECONDS));
-
- String inspectPendingRunId = runtimeService.submitInbound(inbound("msg-check-pending", "看看这批子任务是否都完成了"));
- assertNotNull(inspectPendingRunId);
- assertTrue(waitUntil(() -> adapter.messages.stream().anyMatch(message ->
- message.contains("summary total=3 pending=1 allCompleted=false")), 5000));
-
- releaseSlowChild.countDown();
- assertTrue(waitUntil(() -> adapter.messages.stream().filter("child-finished"::equals).count() >= 3, 5000));
-
- String inspectDoneRunId = runtimeService.submitInbound(inbound("msg-check-done", "看看这批子任务是否都完成了"));
- assertNotNull(inspectDoneRunId);
- assertTrue(waitUntil(() -> adapter.messages.stream().anyMatch(message ->
- message.contains("summary total=3 pending=0 allCompleted=true")), 5000));
- } finally {
- releaseSlowChild.countDown();
- scheduler.shutdown();
- }
- }
-
- /**
- * 验证父会话可在子任务未全部完成时返回 NO_REPLY,待全部完成后再统一汇总回复。
- *
- * @throws Exception 执行异常
- */
- @Test
- void parentCanSuppressIntermediateRepliesUntilAllChildrenComplete() throws Exception {
- RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
- ConversationScheduler scheduler = new ConversationScheduler(1);
- ChannelRegistry registry = new ChannelRegistry();
- RecordingChannelAdapter adapter = new RecordingChannelAdapter();
- registry.register(adapter);
-
- CountDownLatch slowChildStarted = new CountDownLatch(1);
- CountDownLatch releaseSlowChild = new CountDownLatch(1);
-
- ConversationAgent conversationAgent = (request, progressConsumer) -> {
- String message = request.getCurrentMessage();
- if ("question-parent-aggregate".equals(message)) {
- request.getSpawnTaskSupport().spawnTask("aggregate-fast-1");
- request.getSpawnTaskSupport().spawnTask("aggregate-fast-2");
- request.getSpawnTaskSupport().spawnTask("aggregate-slow-3");
- return "parent-aggregate-waiting";
- }
- if ("aggregate-fast-1".equals(message) || "aggregate-fast-2".equals(message)) {
- return "done-" + message;
+ if (request.getCurrentSourceKind() == RuntimeSourceKind.USER_MESSAGE && "question-parent".equals(message)) {
+ request.getSpawnTaskSupport().spawnTask("research-child");
+ return "parent-waiting";
}
- if ("aggregate-slow-3".equals(message)) {
- slowChildStarted.countDown();
- assertTrue(releaseSlowChild.await(5, TimeUnit.SECONDS));
- return "done-" + message;
+ if (request.isChildRun() && "research-child".equals(message)) {
+ return "child-result";
}
- if (message != null && message.contains("[内部事件] 子任务已完成")) {
- ParentRunChildrenSummary summary = request.getRunQuerySupport().getChildSummary(null, null);
- if (summary == null || !summary.isAllCompleted()) {
- return AgentRuntimeService.NO_REPLY;
- }
- return AgentRuntimeService.FINAL_REPLY_ONCE_PREFIX
- + "final-aggregate total=" + summary.getTotalChildren()
- + " succeeded=" + summary.getSucceededChildren()
- + " failed=" + summary.getFailedChildren();
+ if (request.getCurrentSourceKind() == RuntimeSourceKind.CHILD_CONTINUATION) {
+ return AgentRuntimeService.FINAL_REPLY_ONCE_PREFIX + "final-parent-answer";
}
return "reply-" + message;
};
-
SolonClawProperties properties = new SolonClawProperties();
- properties.getAgent().getScheduler().setMaxConcurrentPerConversation(1);
- properties.getAgent().getScheduler().setAckWhenBusy(false);
+ AgentRuntimeService runtimeService = runtimeService(conversationAgent, store, scheduler, registry, properties);
try {
- AgentRuntimeService runtimeService = new AgentRuntimeService(conversationAgent, store, scheduler, registry, properties);
- String parentRunId = runtimeService.submitInbound(inbound("msg-parent-aggregate", "question-parent-aggregate"));
+ String parentRunId = runtimeService.submitInbound(inbound("msg-parent", "question-parent"));
assertNotNull(parentRunId);
- assertTrue(slowChildStarted.await(3, TimeUnit.SECONDS));
-
- assertTrue(waitUntil(() -> runtimeService.getRun(parentRunId).getStatus() == RunStatus.WAITING_CHILDREN, 3000));
- assertTrue(waitUntil(() -> adapter.outbounds.isEmpty(), 1000));
-
- releaseSlowChild.countDown();
- assertTrue(waitUntil(() -> adapter.messages.stream().anyMatch(message ->
- message.contains("final-aggregate total=3 succeeded=3 failed=0")), 5000));
+ assertTrue(waitUntil(() -> adapter.messages.contains("final-parent-answer"), 5000));
assertEquals(1, adapter.outbounds.stream()
- .filter(outbound -> outbound.getContent().contains("final-aggregate total=3 succeeded=3 failed=0"))
+ .filter(outbound -> "final-parent-answer".equals(outbound.getContent()))
.count());
+ assertTrue(store.hasRunEventType(parentRunId, "children_aggregated"));
+ assertTrue(store.getRunEvents(parentRunId, 0).stream()
+ .map(RunEvent::getEventType)
+ .anyMatch("child_continuation_triggered"::equals));
} finally {
- releaseSlowChild.countDown();
- scheduler.shutdown();
- }
- }
-
- /**
- * 验证同一父运行下可按 batchKey 查询指定批次的子任务聚合结果。
- *
- * @throws Exception 执行异常
- */
- @Test
- void followupMessageCanInspectChildSummaryByBatchKey() throws Exception {
- RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
- ConversationScheduler scheduler = new ConversationScheduler(1);
- ChannelRegistry registry = new ChannelRegistry();
- RecordingChannelAdapter adapter = new RecordingChannelAdapter();
- registry.register(adapter);
-
- CountDownLatch slowBatchStarted = new CountDownLatch(1);
- CountDownLatch releaseSlowBatch = new CountDownLatch(1);
-
- ConversationAgent conversationAgent = (request, progressConsumer) -> {
- String message = request.getCurrentMessage();
- if ("question-parent-batch".equals(message)) {
- request.getSpawnTaskSupport().spawnTask("batch-A-fast", "plan-A");
- request.getSpawnTaskSupport().spawnTask("batch-A-slow", "plan-A");
- request.getSpawnTaskSupport().spawnTask("batch-B-fast", "plan-B");
- return "batch-waiting";
- }
- if ("batch-A-fast".equals(message) || "batch-B-fast".equals(message)) {
- return "done-" + message;
- }
- if ("batch-A-slow".equals(message)) {
- slowBatchStarted.countDown();
- assertTrue(releaseSlowBatch.await(5, TimeUnit.SECONDS));
- return "done-" + message;
- }
- if (message != null && message.contains("[内部事件] 子任务已完成")) {
- return AgentRuntimeService.NO_REPLY;
- }
- if ("看看 plan-A 这批任务的情况".equals(message)) {
- ParentRunChildrenSummary summary = request.getRunQuerySupport().getChildSummary(null, "plan-A");
- if (summary == null) {
- return "plan-A-missing";
- }
- return "plan-A total=" + summary.getTotalChildren()
- + " pending=" + summary.getPendingChildren()
- + " allCompleted=" + summary.isAllCompleted();
- }
- return "reply-" + message;
- };
-
- SolonClawProperties properties = new SolonClawProperties();
- properties.getAgent().getScheduler().setMaxConcurrentPerConversation(1);
- properties.getAgent().getScheduler().setAckWhenBusy(false);
-
- try {
- AgentRuntimeService runtimeService = new AgentRuntimeService(conversationAgent, store, scheduler, registry, properties);
- String parentRunId = runtimeService.submitInbound(inbound("msg-parent-batch", "question-parent-batch"));
- assertNotNull(parentRunId);
- assertTrue(slowBatchStarted.await(3, TimeUnit.SECONDS));
-
- String inspectPendingRunId = runtimeService.submitInbound(inbound("msg-planA-pending", "看看 plan-A 这批任务的情况"));
- assertNotNull(inspectPendingRunId);
- assertTrue(waitUntil(() -> adapter.messages.stream().anyMatch(message ->
- message.contains("plan-A total=2 pending=1 allCompleted=false")), 5000));
-
- releaseSlowBatch.countDown();
- String inspectDoneRunId = runtimeService.submitInbound(inbound("msg-planA-done", "看看 plan-A 这批任务的情况"));
- assertNotNull(inspectDoneRunId);
- assertTrue(waitUntil(() -> adapter.messages.stream().anyMatch(message ->
- message.contains("plan-A total=2 pending=0 allCompleted=true")), 5000));
- } finally {
- releaseSlowBatch.countDown();
scheduler.shutdown();
}
}
- /**
- * 验证系统消息不会覆盖最近一次真实外部会话路由。
- */
@Test
- void systemMessageDoesNotOverrideLatestExternalRoute() throws Exception {
+ void debugWebInboundIsHandledLikeNormalUserMessage() throws Exception {
RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
ConversationScheduler scheduler = new ConversationScheduler(1);
ChannelRegistry registry = new ChannelRegistry();
- RecordingChannelAdapter adapter = new RecordingChannelAdapter();
- registry.register(adapter);
-
- ConversationAgent conversationAgent = (request, progressConsumer) -> "reply-" + request.getCurrentMessage();
- SolonClawProperties properties = new SolonClawProperties();
- properties.getAgent().getScheduler().setAckWhenBusy(false);
-
- try {
- AgentRuntimeService runtimeService = new AgentRuntimeService(conversationAgent, store, scheduler, registry, properties);
- runtimeService.submitInbound(inbound("msg-latest", "question-latest"));
-
- ReplyTarget otherReplyTarget = new ReplyTarget(ChannelType.DINGTALK, ConversationType.GROUP, "group-2", "user-2");
- runtimeService.submitSystemMessage("dingtalk:group:group-2", otherReplyTarget, "scheduled-message");
-
- assertEquals("dingtalk:group:group-1", store.getLatestExternalRoute().getSessionKey());
- assertEquals("group-1", store.getLatestExternalRoute().getReplyTarget().getConversationId());
- assertTrue(waitUntil(() -> adapter.outbounds.size() >= 2, 2000));
- } finally {
- scheduler.shutdown();
- }
- }
-
- /**
- * 验证可见系统触发不会被写成用户消息,并会带着系统触发类型进入执行层。
- */
- @Test
- void visibleSystemMessageIsNotPersistedAsUserMessage() throws Exception {
- RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
- ConversationScheduler scheduler = new ConversationScheduler(1);
- ChannelRegistry registry = new ChannelRegistry();
- RecordingChannelAdapter adapter = new RecordingChannelAdapter();
- registry.register(adapter);
-
+ RecordingChannelAdapter debugAdapter = new RecordingChannelAdapter(ChannelType.DEBUG_WEB);
+ registry.register(debugAdapter);
AtomicReference lastRequest = new AtomicReference();
ConversationAgent conversationAgent = (request, progressConsumer) -> {
lastRequest.set(request);
- return "reply-" + request.getCurrentMessage();
+ return "debug-reply";
};
-
SolonClawProperties properties = new SolonClawProperties();
- properties.getAgent().getScheduler().setAckWhenBusy(false);
+ AgentRuntimeService runtimeService = runtimeService(conversationAgent, store, scheduler, registry, properties);
try {
- AgentRuntimeService runtimeService = new AgentRuntimeService(conversationAgent, store, scheduler, registry, properties);
- runtimeService.submitInbound(inbound("msg-user", "normal-question"));
- assertTrue(waitUntil(() -> adapter.messages.contains("reply-normal-question"), 5000));
-
- ReplyTarget replyTarget = new ReplyTarget(ChannelType.DINGTALK, ConversationType.GROUP, "group-1", "user-1");
- runtimeService.submitVisibleSystemMessage("dingtalk:group:group-1", replyTarget, "scheduled-task");
-
- assertTrue(waitUntil(() -> adapter.messages.contains("reply-scheduled-task"), 5000));
- assertEquals(InboundTriggerType.SYSTEM_VISIBLE, lastRequest.get().getCurrentMessageTriggerType());
-
- List events = store.readConversationEvents("dingtalk:group:group-1");
- assertEquals("system_event", events.get(2).getEventType());
- assertEquals("system", events.get(2).getRole());
- assertEquals("assistant_reply", events.get(3).getEventType());
- assertEquals(1L, events.get(2).getSourceUserVersion());
- assertEquals(1L, events.get(3).getSourceUserVersion());
+ InboundEnvelope inboundEnvelope = new InboundEnvelope();
+ inboundEnvelope.setMessageId("debug-1");
+ inboundEnvelope.setChannelType(ChannelType.DEBUG_WEB);
+ inboundEnvelope.setChannelInstanceId("debug-web");
+ inboundEnvelope.setSenderId("debug-user");
+ inboundEnvelope.setConversationId("debug-1");
+ inboundEnvelope.setConversationType(ConversationType.PRIVATE);
+ inboundEnvelope.setContent("hello");
+ inboundEnvelope.setReplyTarget(new ReplyTarget(ChannelType.DEBUG_WEB, ConversationType.PRIVATE, "debug-1", "debug-user"));
+ inboundEnvelope.setReceivedAt(System.currentTimeMillis());
+ inboundEnvelope.setSessionKey("debug-web:debug-1");
+ inboundEnvelope.setSourceKind(RuntimeSourceKind.USER_MESSAGE);
+
+ String runId = runtimeService.submitInbound(inboundEnvelope);
+ assertNotNull(runId);
+ assertTrue(waitUntil(() -> {
+ AgentRun run = runtimeService.getRun(runId);
+ return run != null && run.getStatus() == RunStatus.SUCCEEDED;
+ }, 5000));
+ assertEquals(RuntimeSourceKind.USER_MESSAGE, lastRequest.get().getCurrentSourceKind());
+ assertNotNull(store.getLatestExternalRoute());
+ assertEquals(ChannelType.DEBUG_WEB, store.getLatestExternalRoute().getReplyTarget().getChannelType());
+ assertEquals("debug-reply", debugAdapter.messages.get(0));
} finally {
scheduler.shutdown();
}
}
- /**
- * 验证只有声明支持进度更新的渠道才会收到运行中的增量内容。
- */
@Test
void progressIsDispatchedOnlyToProgressCapableChannel() throws Exception {
RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
@@ -670,10 +219,9 @@ class AgentRuntimeServiceTest {
};
SolonClawProperties properties = new SolonClawProperties();
- properties.getAgent().getScheduler().setAckWhenBusy(false);
+ AgentRuntimeService runtimeService = runtimeService(conversationAgent, store, scheduler, registry, properties);
try {
- AgentRuntimeService runtimeService = new AgentRuntimeService(conversationAgent, store, scheduler, registry, properties);
InboundEnvelope inboundEnvelope = new InboundEnvelope();
inboundEnvelope.setMessageId("msg-feishu");
inboundEnvelope.setChannelType(ChannelType.FEISHU);
@@ -685,6 +233,7 @@ class AgentRuntimeServiceTest {
inboundEnvelope.setReplyTarget(new ReplyTarget(ChannelType.FEISHU, ConversationType.GROUP, "oc-1", "ou-1"));
inboundEnvelope.setReceivedAt(System.currentTimeMillis());
inboundEnvelope.setSessionKey("feishu:group:oc-1");
+ inboundEnvelope.setSourceKind(RuntimeSourceKind.USER_MESSAGE);
runtimeService.submitInbound(inboundEnvelope);
@@ -697,13 +246,30 @@ class AgentRuntimeServiceTest {
}
}
- /**
- * 构造一条测试入站消息。
- *
- * @param messageId 消息标识
- * @param content 文本内容
- * @return 入站消息
- */
+ private AgentRuntimeService runtimeService(
+ ConversationAgent conversationAgent,
+ RuntimeStoreService store,
+ ConversationScheduler scheduler,
+ ChannelRegistry registry,
+ SolonClawProperties properties
+ ) {
+ SystemEventRunner systemEventRunner = new SystemEventRunner(
+ conversationAgent,
+ store,
+ scheduler,
+ registry,
+ properties
+ );
+ return new AgentRuntimeService(
+ conversationAgent,
+ store,
+ scheduler,
+ registry,
+ systemEventRunner,
+ properties
+ );
+ }
+
private InboundEnvelope inbound(String messageId, String content) {
ReplyTarget replyTarget = new ReplyTarget(ChannelType.DINGTALK, ConversationType.GROUP, "group-1", "user-1");
@@ -718,17 +284,10 @@ class AgentRuntimeServiceTest {
envelope.setReplyTarget(replyTarget);
envelope.setReceivedAt(System.currentTimeMillis());
envelope.setSessionKey("dingtalk:group:group-1");
+ envelope.setSourceKind(RuntimeSourceKind.USER_MESSAGE);
return envelope;
}
- /**
- * 轮询等待条件成立。
- *
- * @param condition 条件判断
- * @param timeoutMs 超时时间
- * @return 若条件成立则返回 true
- * @throws InterruptedException 线程中断异常
- */
private boolean waitUntil(BooleanSupplier condition, long timeoutMs) throws InterruptedException {
long deadline = System.currentTimeMillis() + timeoutMs;
while (System.currentTimeMillis() < deadline) {
@@ -740,40 +299,38 @@ class AgentRuntimeServiceTest {
return condition.getAsBoolean();
}
- /**
- * 记录测试发送内容的伪渠道适配器。
- */
private static class RecordingChannelAdapter implements ChannelAdapter {
- /** 记录发送文本。 */
- protected final List messages = new CopyOnWriteArrayList<>();
- /** 记录完整出站消息。 */
- protected final List outbounds = new CopyOnWriteArrayList<>();
-
- /**
- * 返回适配器渠道类型。
- *
- * @return 钉钉渠道
- */
+ private final ChannelType channelType;
+ protected final List messages = new CopyOnWriteArrayList();
+ protected final List outbounds = new CopyOnWriteArrayList();
+
+ private RecordingChannelAdapter() {
+ this(ChannelType.DINGTALK);
+ }
+
+ private RecordingChannelAdapter(ChannelType channelType) {
+ this.channelType = channelType;
+ }
+
@Override
public ChannelType channelType() {
- return ChannelType.DINGTALK;
+ return channelType;
}
- /**
- * 记录一次发送请求。
- *
- * @param outboundEnvelope 出站消息
- */
@Override
- public void send(OutboundEnvelope outboundEnvelope) {
+ public DeliveryResult send(OutboundEnvelope outboundEnvelope) {
outbounds.add(outboundEnvelope);
messages.add(outboundEnvelope.getContent());
+ DeliveryResult result = new DeliveryResult();
+ result.setDelivered(true);
+ result.setChannelType(channelType());
+ result.setOriginalLength(outboundEnvelope.getContent() == null ? 0 : outboundEnvelope.getContent().length());
+ result.setFinalLength(result.getOriginalLength());
+ result.setMessage("sent");
+ return result;
}
}
- /**
- * 支持进度更新的伪渠道适配器。
- */
private static class ProgressChannelAdapter extends RecordingChannelAdapter {
@Override
public ChannelType channelType() {
@@ -785,6 +342,28 @@ class AgentRuntimeServiceTest {
return true;
}
}
-}
+ @Test
+ void emptyModelResponseFallsBackToUserFriendlyFailureReply() throws Exception {
+ RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
+ ConversationScheduler scheduler = new ConversationScheduler(1);
+ ChannelRegistry registry = new ChannelRegistry();
+ RecordingChannelAdapter adapter = new RecordingChannelAdapter();
+ registry.register(adapter);
+
+ ConversationAgent conversationAgent = (request, progressConsumer) -> "";
+ SolonClawProperties properties = new SolonClawProperties();
+ AgentRuntimeService runtimeService = runtimeService(conversationAgent, store, scheduler, registry, properties);
+ try {
+ String runId = runtimeService.submitInbound(inbound("msg-empty", "空回复测试"));
+ assertNotNull(runId);
+ assertTrue(waitUntil(() -> !adapter.messages.isEmpty(), 5000));
+ assertEquals("这次处理没有拿到有效结果,可能是模型响应超时或解析异常。请再试一次。", adapter.messages.get(0));
+ assertEquals(RunStatus.FAILED, runtimeService.getRun(runId).getStatus());
+ assertTrue(store.hasRunEventType(runId, "llm_empty_response"));
+ } finally {
+ scheduler.shutdown();
+ }
+ }
+}
diff --git a/src/test/java/com/jimuqu/claw/agent/runtime/HeartbeatServiceTest.java b/src/test/java/com/jimuqu/claw/agent/runtime/HeartbeatServiceTest.java
index b02c06a791b0a453af292644ebf3888e385427ba..d22eea63698e43fc855c39150431f120a914e2da 100644
--- a/src/test/java/com/jimuqu/claw/agent/runtime/HeartbeatServiceTest.java
+++ b/src/test/java/com/jimuqu/claw/agent/runtime/HeartbeatServiceTest.java
@@ -1,45 +1,35 @@
package com.jimuqu.claw.agent.runtime;
import cn.hutool.core.io.FileUtil;
-import com.jimuqu.claw.agent.channel.ChannelAdapter;
import com.jimuqu.claw.agent.channel.ChannelRegistry;
import com.jimuqu.claw.agent.model.enums.ChannelType;
import com.jimuqu.claw.agent.model.enums.ConversationType;
-import com.jimuqu.claw.agent.model.envelope.OutboundEnvelope;
+import com.jimuqu.claw.agent.model.enums.RuntimeSourceKind;
+import com.jimuqu.claw.agent.model.enums.SystemEventPolicy;
import com.jimuqu.claw.agent.model.route.ReplyTarget;
import com.jimuqu.claw.agent.runtime.api.ConversationAgent;
-import com.jimuqu.claw.agent.runtime.impl.AgentRuntimeService;
import com.jimuqu.claw.agent.runtime.impl.ConversationScheduler;
import com.jimuqu.claw.agent.runtime.impl.HeartbeatService;
+import com.jimuqu.claw.agent.runtime.impl.SystemEventRunner;
+import com.jimuqu.claw.agent.runtime.support.SystemEventRequest;
import com.jimuqu.claw.agent.store.RuntimeStoreService;
import com.jimuqu.claw.config.SolonClawProperties;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.nio.file.Path;
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
-/**
- * 验证心跳服务只会触发静默内部运行,不会直接向外部渠道发送消息。
- */
class HeartbeatServiceTest {
- /** 临时测试目录。 */
@TempDir
Path tempDir;
- /**
- * 验证一次心跳轮询会触发静默内部运行。
- *
- * @throws Exception 执行异常
- */
@Test
- void tickRunsHeartbeatSilentlyWithoutOutboundReply() throws Exception {
+ void tickSubmitsHeartbeatEventToSystemEventRunner() {
Path workspace = tempDir.resolve("workspace");
FileUtil.mkdir(workspace.toFile());
FileUtil.writeUtf8String("请汇报当前状态", workspace.resolve("HEARTBEAT.md").toFile());
@@ -48,104 +38,77 @@ class HeartbeatServiceTest {
ReplyTarget replyTarget = new ReplyTarget(ChannelType.DINGTALK, ConversationType.GROUP, "group-9", "user-9");
store.rememberReplyTarget("dingtalk:group:group-9", replyTarget);
- CountDownLatch executed = new CountDownLatch(1);
- ConversationAgent conversationAgent = (request, progressConsumer) -> {
- executed.countDown();
- return "heartbeat:" + request.getCurrentMessage();
- };
- ConversationScheduler scheduler = new ConversationScheduler(1);
- ChannelRegistry registry = new ChannelRegistry();
- RecordingChannelAdapter adapter = new RecordingChannelAdapter();
- registry.register(adapter);
-
SolonClawProperties properties = new SolonClawProperties();
properties.setWorkspace(workspace.toString());
+ ConversationScheduler scheduler = new ConversationScheduler(1);
try {
- AgentRuntimeService runtimeService = new AgentRuntimeService(conversationAgent, store, scheduler, registry, properties);
- HeartbeatService heartbeatService = new HeartbeatService(runtimeService, store, properties);
+ CapturingSystemEventRunner runner = new CapturingSystemEventRunner(store, scheduler, properties);
+ HeartbeatService heartbeatService = new HeartbeatService(runner, store, properties);
heartbeatService.tick();
- assertTrue(executed.await(3, TimeUnit.SECONDS));
- Thread.sleep(200);
- assertTrue(adapter.messages.isEmpty());
- assertEquals(0, store.readConversationEvents("dingtalk:group:group-9").size());
+ SystemEventRequest request = runner.lastRequest.get();
+ assertNotNull(request);
+ assertEquals(RuntimeSourceKind.HEARTBEAT_EVENT, request.getSourceKind());
+ assertEquals(SystemEventPolicy.INTERNAL_ONLY, request.getPolicy());
+ assertEquals("请汇报当前状态", request.getContent());
+ assertEquals("dingtalk:group:group-9", request.getSessionKey());
} finally {
scheduler.shutdown();
}
}
- /**
- * 验证只有注释的 HEARTBEAT.md 不会触发心跳运行。
- */
@Test
- void tickIgnoresCommentOnlyHeartbeatFile() throws Exception {
- Path workspace = tempDir.resolve("workspace");
+ void tickIgnoresCommentOnlyHeartbeatFile() {
+ Path workspace = tempDir.resolve("workspace-comment-only");
FileUtil.mkdir(workspace.toFile());
FileUtil.writeUtf8String(
"# HEARTBEAT.md\n\n# 保持此文件为空(或仅包含注释)以跳过心跳 API 调用。\n\n# 默认注释不应触发心跳。\n",
workspace.resolve("HEARTBEAT.md").toFile()
);
- RuntimeStoreService store = new RuntimeStoreService(tempDir.resolve("runtime").toFile());
- ReplyTarget replyTarget = new ReplyTarget(ChannelType.DINGTALK, ConversationType.GROUP, "group-9", "user-9");
- store.rememberReplyTarget("dingtalk:group:group-9", replyTarget);
-
- CountDownLatch executed = new CountDownLatch(1);
- ConversationAgent conversationAgent = (request, progressConsumer) -> {
- executed.countDown();
- return "heartbeat:" + request.getCurrentMessage();
- };
- ConversationScheduler scheduler = new ConversationScheduler(1);
- ChannelRegistry registry = new ChannelRegistry();
- RecordingChannelAdapter adapter = new RecordingChannelAdapter();
- registry.register(adapter);
+ RuntimeStoreService store = new RuntimeStoreService(tempDir.resolve("runtime-comment-only").toFile());
+ ReplyTarget replyTarget = new ReplyTarget(ChannelType.DINGTALK, ConversationType.GROUP, "group-10", "user-10");
+ store.rememberReplyTarget("dingtalk:group:group-10", replyTarget);
SolonClawProperties properties = new SolonClawProperties();
properties.setWorkspace(workspace.toString());
+ ConversationScheduler scheduler = new ConversationScheduler(1);
try {
- AgentRuntimeService runtimeService = new AgentRuntimeService(conversationAgent, store, scheduler, registry, properties);
- HeartbeatService heartbeatService = new HeartbeatService(runtimeService, store, properties);
+ CapturingSystemEventRunner runner = new CapturingSystemEventRunner(store, scheduler, properties);
+ HeartbeatService heartbeatService = new HeartbeatService(runner, store, properties);
heartbeatService.tick();
- assertTrue(!executed.await(500, TimeUnit.MILLISECONDS));
- Thread.sleep(200);
- assertTrue(adapter.messages.isEmpty());
- assertEquals(0, store.readConversationEvents("dingtalk:group:group-9").size());
+ assertNull(runner.lastRequest.get());
} finally {
scheduler.shutdown();
}
}
- /**
- * 记录测试发送消息的伪渠道适配器。
- */
- private static class RecordingChannelAdapter implements ChannelAdapter {
- /** 收到的消息列表。 */
- private final List messages = new CopyOnWriteArrayList<>();
-
- /**
- * 返回适配器渠道类型。
- *
- * @return 钉钉渠道
- */
- @Override
- public ChannelType channelType() {
- return ChannelType.DINGTALK;
+ private static class CapturingSystemEventRunner extends SystemEventRunner {
+ private final AtomicReference lastRequest = new AtomicReference();
+
+ private CapturingSystemEventRunner(
+ RuntimeStoreService runtimeStoreService,
+ ConversationScheduler conversationScheduler,
+ SolonClawProperties properties
+ ) {
+ super(
+ (ConversationAgent) (request, progressConsumer) -> "noop",
+ runtimeStoreService,
+ conversationScheduler,
+ new ChannelRegistry(),
+ properties
+ );
}
- /**
- * 记录发送内容。
- *
- * @param outboundEnvelope 出站消息
- */
@Override
- public void send(OutboundEnvelope outboundEnvelope) {
- messages.add(outboundEnvelope.getContent());
+ public String submit(SystemEventRequest request) {
+ lastRequest.set(request);
+ return "captured-heartbeat";
}
}
}
-
diff --git a/src/test/java/com/jimuqu/claw/agent/runtime/IsolatedAgentRunServiceTest.java b/src/test/java/com/jimuqu/claw/agent/runtime/IsolatedAgentRunServiceTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..87c15e79ae5c2f51d9abcc748327d2897aab6d6a
--- /dev/null
+++ b/src/test/java/com/jimuqu/claw/agent/runtime/IsolatedAgentRunServiceTest.java
@@ -0,0 +1,153 @@
+package com.jimuqu.claw.agent.runtime;
+
+import com.jimuqu.claw.agent.channel.ChannelAdapter;
+import com.jimuqu.claw.agent.channel.ChannelRegistry;
+import com.jimuqu.claw.agent.job.AgentTurnSpec;
+import com.jimuqu.claw.agent.job.JobDeliveryMode;
+import com.jimuqu.claw.agent.model.envelope.OutboundEnvelope;
+import com.jimuqu.claw.agent.model.enums.ChannelType;
+import com.jimuqu.claw.agent.model.enums.ConversationType;
+import com.jimuqu.claw.agent.model.enums.RunStatus;
+import com.jimuqu.claw.agent.model.route.ReplyTarget;
+import com.jimuqu.claw.agent.model.run.AgentRun;
+import com.jimuqu.claw.agent.runtime.api.ConversationAgent;
+import com.jimuqu.claw.agent.runtime.impl.ConversationScheduler;
+import com.jimuqu.claw.agent.runtime.impl.IsolatedAgentRunService;
+import com.jimuqu.claw.agent.runtime.support.AgentTurnRequest;
+import com.jimuqu.claw.agent.runtime.support.DeliveryResult;
+import com.jimuqu.claw.agent.store.RuntimeStoreService;
+import com.jimuqu.claw.config.SolonClawProperties;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.BooleanSupplier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class IsolatedAgentRunServiceTest {
+ @TempDir
+ Path tempDir;
+
+ @Test
+ void deliveryNoneSuppressesFinalReply() throws Exception {
+ RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
+ ConversationScheduler scheduler = new ConversationScheduler(1);
+ ChannelRegistry registry = new ChannelRegistry();
+ RecordingChannelAdapter adapter = new RecordingChannelAdapter();
+ registry.register(adapter);
+ SolonClawProperties properties = new SolonClawProperties();
+ ConversationAgent conversationAgent = (request, progressConsumer) -> "自动化结果";
+ IsolatedAgentRunService service = new IsolatedAgentRunService(conversationAgent, store, scheduler, registry, properties);
+
+ try {
+ String runId = service.submit(request(JobDeliveryMode.NONE, null));
+ assertNotNull(runId);
+ assertTrue(waitUntil(() -> {
+ AgentRun run = store.getRun(runId);
+ return run != null && run.getStatus() == RunStatus.SUCCEEDED;
+ }, 5000));
+ assertTrue(adapter.outbounds.isEmpty());
+ assertTrue(store.hasRunEventType(runId, "delivery_suppressed"));
+ } finally {
+ scheduler.shutdown();
+ }
+ }
+
+ @Test
+ void boundReplyTargetDeliverySendsFinalReply() throws Exception {
+ RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
+ ConversationScheduler scheduler = new ConversationScheduler(1);
+ ChannelRegistry registry = new ChannelRegistry();
+ RecordingChannelAdapter adapter = new RecordingChannelAdapter();
+ registry.register(adapter);
+ SolonClawProperties properties = new SolonClawProperties();
+ ConversationAgent conversationAgent = (request, progressConsumer) -> "自动化结果";
+ IsolatedAgentRunService service = new IsolatedAgentRunService(conversationAgent, store, scheduler, registry, properties);
+
+ try {
+ ReplyTarget boundReplyTarget = new ReplyTarget(ChannelType.DINGTALK, ConversationType.GROUP, "group-1", "user-1");
+ String runId = service.submit(request(JobDeliveryMode.BOUND_REPLY_TARGET, boundReplyTarget));
+ assertNotNull(runId);
+ assertTrue(waitUntil(() -> adapter.messages.contains("自动化结果"), 5000));
+ assertEquals("group-1", adapter.outbounds.get(0).getReplyTarget().getConversationId());
+ } finally {
+ scheduler.shutdown();
+ }
+ }
+
+ @Test
+ void lastRouteDeliveryUsesLatestExternalRoute() throws Exception {
+ RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
+ ReplyTarget latestReplyTarget = new ReplyTarget(ChannelType.DINGTALK, ConversationType.GROUP, "group-last", "user-last");
+ store.rememberReplyTarget("dingtalk:group:group-last", latestReplyTarget);
+
+ ConversationScheduler scheduler = new ConversationScheduler(1);
+ ChannelRegistry registry = new ChannelRegistry();
+ RecordingChannelAdapter adapter = new RecordingChannelAdapter();
+ registry.register(adapter);
+ SolonClawProperties properties = new SolonClawProperties();
+ ConversationAgent conversationAgent = (request, progressConsumer) -> "自动化结果";
+ IsolatedAgentRunService service = new IsolatedAgentRunService(conversationAgent, store, scheduler, registry, properties);
+
+ try {
+ String runId = service.submit(request(JobDeliveryMode.LAST_ROUTE, null));
+ assertNotNull(runId);
+ assertTrue(waitUntil(() -> adapter.messages.contains("自动化结果"), 5000));
+ assertEquals("group-last", adapter.outbounds.get(0).getReplyTarget().getConversationId());
+ } finally {
+ scheduler.shutdown();
+ }
+ }
+
+ private AgentTurnRequest request(JobDeliveryMode deliveryMode, ReplyTarget boundReplyTarget) {
+ AgentTurnRequest request = new AgentTurnRequest();
+ request.setSourceKind(com.jimuqu.claw.agent.model.enums.RuntimeSourceKind.JOB_AGENT_TURN);
+ request.setJobName("agent-job");
+ request.setBoundSessionKey("dingtalk:group:group-1");
+ request.setBoundReplyTarget(boundReplyTarget);
+ request.setDeliveryMode(deliveryMode);
+ AgentTurnSpec spec = new AgentTurnSpec();
+ spec.setMessage("执行自动化任务");
+ request.setAgentTurn(spec);
+ return request;
+ }
+
+ private boolean waitUntil(BooleanSupplier condition, long timeoutMs) throws InterruptedException {
+ long deadline = System.currentTimeMillis() + timeoutMs;
+ while (System.currentTimeMillis() < deadline) {
+ if (condition.getAsBoolean()) {
+ return true;
+ }
+ Thread.sleep(50);
+ }
+ return condition.getAsBoolean();
+ }
+
+ private static class RecordingChannelAdapter implements ChannelAdapter {
+ private final List messages = new CopyOnWriteArrayList();
+ private final List outbounds = new CopyOnWriteArrayList();
+
+ @Override
+ public ChannelType channelType() {
+ return ChannelType.DINGTALK;
+ }
+
+ @Override
+ public DeliveryResult send(OutboundEnvelope outboundEnvelope) {
+ outbounds.add(outboundEnvelope);
+ messages.add(outboundEnvelope.getContent());
+ DeliveryResult result = new DeliveryResult();
+ result.setDelivered(true);
+ result.setChannelType(channelType());
+ result.setOriginalLength(outboundEnvelope.getContent() == null ? 0 : outboundEnvelope.getContent().length());
+ result.setFinalLength(result.getOriginalLength());
+ result.setMessage("sent");
+ return result;
+ }
+ }
+}
diff --git a/src/test/java/com/jimuqu/claw/agent/runtime/SystemEventRunnerTest.java b/src/test/java/com/jimuqu/claw/agent/runtime/SystemEventRunnerTest.java
new file mode 100644
index 0000000000000000000000000000000000000000..6e87ee5b7ab7741eb21414e1a52f071e4a952e51
--- /dev/null
+++ b/src/test/java/com/jimuqu/claw/agent/runtime/SystemEventRunnerTest.java
@@ -0,0 +1,218 @@
+package com.jimuqu.claw.agent.runtime;
+
+import com.jimuqu.claw.agent.channel.ChannelAdapter;
+import com.jimuqu.claw.agent.channel.ChannelRegistry;
+import com.jimuqu.claw.agent.model.envelope.OutboundEnvelope;
+import com.jimuqu.claw.agent.model.enums.ChannelType;
+import com.jimuqu.claw.agent.model.enums.ConversationType;
+import com.jimuqu.claw.agent.model.enums.RuntimeSourceKind;
+import com.jimuqu.claw.agent.model.enums.RunStatus;
+import com.jimuqu.claw.agent.model.enums.SystemEventPolicy;
+import com.jimuqu.claw.agent.model.route.ReplyTarget;
+import com.jimuqu.claw.agent.model.run.AgentRun;
+import com.jimuqu.claw.agent.runtime.api.ConversationAgent;
+import com.jimuqu.claw.agent.runtime.impl.ConversationScheduler;
+import com.jimuqu.claw.agent.runtime.impl.SystemEventRunner;
+import com.jimuqu.claw.agent.runtime.support.DeliveryResult;
+import com.jimuqu.claw.agent.runtime.support.NotificationResult;
+import com.jimuqu.claw.agent.runtime.support.SystemEventRequest;
+import com.jimuqu.claw.agent.store.RuntimeStoreService;
+import com.jimuqu.claw.config.SolonClawProperties;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.BooleanSupplier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class SystemEventRunnerTest {
+ @TempDir
+ Path tempDir;
+
+ @Test
+ void jobSystemEventCanFallbackDeliverReminderText() throws Exception {
+ RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
+ ConversationScheduler scheduler = new ConversationScheduler(1);
+ ChannelRegistry registry = new ChannelRegistry();
+ RecordingChannelAdapter adapter = new RecordingChannelAdapter();
+ registry.register(adapter);
+ SolonClawProperties properties = new SolonClawProperties();
+ ConversationAgent conversationAgent = (request, progressConsumer) -> "记得喝水。";
+ SystemEventRunner runner = new SystemEventRunner(conversationAgent, store, scheduler, registry, properties);
+
+ try {
+ SystemEventRequest request = new SystemEventRequest();
+ request.setSourceKind(RuntimeSourceKind.JOB_SYSTEM_EVENT);
+ request.setPolicy(SystemEventPolicy.USER_VISIBLE_OPTIONAL);
+ request.setSessionKey("dingtalk:group:group-1");
+ request.setReplyTarget(new ReplyTarget(ChannelType.DINGTALK, ConversationType.GROUP, "group-1", "user-1"));
+ request.setContent("提醒我喝水");
+ request.setAllowNotifyUser(true);
+
+ String runId = runner.submit(request);
+ assertNotNull(runId);
+ assertTrue(waitUntil(() -> adapter.messages.contains("记得喝水。"), 5000));
+ assertTrue(store.hasRunEventType(runId, "delivery_fallback_sent"));
+ } finally {
+ scheduler.shutdown();
+ }
+ }
+
+ @Test
+ void heartbeatEventSuppressesPlainUserVisibleReplyByDefault() throws Exception {
+ RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
+ ConversationScheduler scheduler = new ConversationScheduler(1);
+ ChannelRegistry registry = new ChannelRegistry();
+ RecordingChannelAdapter adapter = new RecordingChannelAdapter();
+ registry.register(adapter);
+ SolonClawProperties properties = new SolonClawProperties();
+ ConversationAgent conversationAgent = (request, progressConsumer) -> "系统状态正常";
+ SystemEventRunner runner = new SystemEventRunner(conversationAgent, store, scheduler, registry, properties);
+
+ try {
+ SystemEventRequest request = new SystemEventRequest();
+ request.setSourceKind(RuntimeSourceKind.HEARTBEAT_EVENT);
+ request.setPolicy(SystemEventPolicy.INTERNAL_ONLY);
+ request.setSessionKey("dingtalk:group:group-1");
+ request.setReplyTarget(new ReplyTarget(ChannelType.DINGTALK, ConversationType.GROUP, "group-1", "user-1"));
+ request.setContent("heartbeat");
+ request.setAllowNotifyUser(true);
+
+ String runId = runner.submit(request);
+ assertNotNull(runId);
+ assertTrue(waitUntil(() -> {
+ AgentRun run = store.getRun(runId);
+ return run != null && run.getStatus() == RunStatus.SUCCEEDED;
+ }, 5000));
+ assertTrue(adapter.outbounds.isEmpty());
+ assertTrue(store.hasRunEventType(runId, "delivery_suppressed"));
+ } finally {
+ scheduler.shutdown();
+ }
+ }
+
+ @Test
+ void heartbeatEventCanNotifyUserExplicitly() throws Exception {
+ RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
+ ConversationScheduler scheduler = new ConversationScheduler(1);
+ ChannelRegistry registry = new ChannelRegistry();
+ RecordingChannelAdapter adapter = new RecordingChannelAdapter();
+ registry.register(adapter);
+ SolonClawProperties properties = new SolonClawProperties();
+ ConversationAgent conversationAgent = (request, progressConsumer) -> {
+ NotificationResult result = request.getNotificationSupport().notifyUser("需要人工关注", false);
+ assertTrue(result.isDelivered());
+ return "NO_REPLY";
+ };
+ SystemEventRunner runner = new SystemEventRunner(conversationAgent, store, scheduler, registry, properties);
+
+ try {
+ SystemEventRequest request = new SystemEventRequest();
+ request.setSourceKind(RuntimeSourceKind.HEARTBEAT_EVENT);
+ request.setPolicy(SystemEventPolicy.INTERNAL_ONLY);
+ request.setSessionKey("dingtalk:group:group-1");
+ request.setReplyTarget(new ReplyTarget(ChannelType.DINGTALK, ConversationType.GROUP, "group-1", "user-1"));
+ request.setContent("heartbeat");
+ request.setAllowNotifyUser(true);
+
+ String runId = runner.submit(request);
+ assertNotNull(runId);
+ assertTrue(waitUntil(() -> adapter.messages.contains("需要人工关注"), 5000));
+ assertEquals(1, adapter.outbounds.size());
+ assertTrue(store.hasRunEventType(runId, "notify"));
+ } finally {
+ scheduler.shutdown();
+ }
+ }
+
+ @Test
+ void childContinuationDeliversOnlyFinalReplyOnce() throws Exception {
+ RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
+ ConversationScheduler scheduler = new ConversationScheduler(1);
+ ChannelRegistry registry = new ChannelRegistry();
+ RecordingChannelAdapter adapter = new RecordingChannelAdapter();
+ registry.register(adapter);
+ SolonClawProperties properties = new SolonClawProperties();
+ ConversationAgent conversationAgent = (request, progressConsumer) ->
+ "FINAL_REPLY_ONCE:聚合完成";
+ SystemEventRunner runner = new SystemEventRunner(conversationAgent, store, scheduler, registry, properties);
+
+ try {
+ AgentRun parentRun = new AgentRun();
+ parentRun.setRunId("parent-run");
+ parentRun.setSessionKey("dingtalk:group:group-1");
+ parentRun.setSourceKind(RuntimeSourceKind.USER_MESSAGE);
+ parentRun.setSourceUserVersion(1L);
+ store.saveRun(parentRun);
+
+ AgentRun child = new AgentRun();
+ child.setRunId("child-1");
+ child.setParentRunId("parent-run");
+ child.setParentSessionKey("dingtalk:group:group-1");
+ child.setStatus(RunStatus.SUCCEEDED);
+ store.saveRun(child);
+
+ SystemEventRequest request = new SystemEventRequest();
+ request.setSourceKind(RuntimeSourceKind.CHILD_CONTINUATION);
+ request.setPolicy(SystemEventPolicy.AGGREGATE_ONLY);
+ request.setSessionKey("dingtalk:group:group-1");
+ request.setReplyTarget(new ReplyTarget(ChannelType.DINGTALK, ConversationType.GROUP, "group-1", "user-1"));
+ request.setContent("child completed");
+ request.setRelatedRunId("parent-run");
+
+ String firstRunId = runner.submit(request);
+ assertNotNull(firstRunId);
+ assertTrue(waitUntil(() -> adapter.messages.contains("聚合完成"), 5000));
+ assertTrue(store.hasRunEventType("parent-run", "children_aggregated"));
+
+ String secondRunId = runner.submit(request);
+ assertNotNull(secondRunId);
+ assertTrue(waitUntil(() -> {
+ AgentRun run = store.getRun(secondRunId);
+ return run != null && run.getStatus() == RunStatus.SUCCEEDED;
+ }, 5000));
+ assertEquals(1, adapter.outbounds.stream().filter(outbound -> "聚合完成".equals(outbound.getContent())).count());
+ } finally {
+ scheduler.shutdown();
+ }
+ }
+
+ private boolean waitUntil(BooleanSupplier condition, long timeoutMs) throws InterruptedException {
+ long deadline = System.currentTimeMillis() + timeoutMs;
+ while (System.currentTimeMillis() < deadline) {
+ if (condition.getAsBoolean()) {
+ return true;
+ }
+ Thread.sleep(50);
+ }
+ return condition.getAsBoolean();
+ }
+
+ private static class RecordingChannelAdapter implements ChannelAdapter {
+ private final List messages = new CopyOnWriteArrayList();
+ private final List outbounds = new CopyOnWriteArrayList();
+
+ @Override
+ public ChannelType channelType() {
+ return ChannelType.DINGTALK;
+ }
+
+ @Override
+ public DeliveryResult send(OutboundEnvelope outboundEnvelope) {
+ outbounds.add(outboundEnvelope);
+ messages.add(outboundEnvelope.getContent());
+ DeliveryResult result = new DeliveryResult();
+ result.setDelivered(true);
+ result.setChannelType(channelType());
+ result.setOriginalLength(outboundEnvelope.getContent() == null ? 0 : outboundEnvelope.getContent().length());
+ result.setFinalLength(result.getOriginalLength());
+ result.setMessage("sent");
+ return result;
+ }
+ }
+}
diff --git a/src/test/java/com/jimuqu/claw/agent/store/RuntimeStoreServiceTest.java b/src/test/java/com/jimuqu/claw/agent/store/RuntimeStoreServiceTest.java
index b5d7e7c04f06cefeacedce77f2a24bd37e285947..24e25ddab8240fecad2648d2ee5f9368ce8d88d6 100644
--- a/src/test/java/com/jimuqu/claw/agent/store/RuntimeStoreServiceTest.java
+++ b/src/test/java/com/jimuqu/claw/agent/store/RuntimeStoreServiceTest.java
@@ -1,14 +1,14 @@
package com.jimuqu.claw.agent.store;
-import com.jimuqu.claw.agent.model.run.AgentRun;
+import com.jimuqu.claw.agent.model.envelope.InboundEnvelope;
import com.jimuqu.claw.agent.model.enums.ChannelType;
-import com.jimuqu.claw.agent.model.event.ConversationEvent;
import com.jimuqu.claw.agent.model.enums.ConversationType;
-import com.jimuqu.claw.agent.model.envelope.InboundEnvelope;
-import com.jimuqu.claw.agent.model.enums.InboundTriggerType;
-import com.jimuqu.claw.agent.model.route.ReplyTarget;
-import com.jimuqu.claw.agent.model.event.RunEvent;
+import com.jimuqu.claw.agent.model.enums.RuntimeSourceKind;
import com.jimuqu.claw.agent.model.enums.RunStatus;
+import com.jimuqu.claw.agent.model.event.ConversationEvent;
+import com.jimuqu.claw.agent.model.event.RunEvent;
+import com.jimuqu.claw.agent.model.route.ReplyTarget;
+import com.jimuqu.claw.agent.model.run.AgentRun;
import com.jimuqu.claw.agent.runtime.support.ParentRunChildrenSummary;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -21,28 +21,21 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-/**
- * 验证运行时存储服务的持久化和恢复行为。
- */
class RuntimeStoreServiceTest {
- /** 临时测试目录。 */
@TempDir
Path tempDir;
- /**
- * 验证历史消息会按入站顺序重建。
- */
@Test
void loadConversationHistoryBeforeKeepsInboundOrder() {
RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
- InboundEnvelope first = inbound("session-a", "msg-1", "first");
- InboundEnvelope second = inbound("session-a", "msg-2", "second");
+ InboundEnvelope first = inbound("session-a", "msg-1", "first", RuntimeSourceKind.USER_MESSAGE);
+ InboundEnvelope second = inbound("session-a", "msg-2", "second", RuntimeSourceKind.USER_MESSAGE);
long firstVersion = store.appendInboundConversationEvent(first);
long secondVersion = store.appendInboundConversationEvent(second);
- store.appendAssistantConversationEvent("session-a", "run-1", "msg-1", firstVersion, "reply-first");
- store.appendAssistantConversationEvent("session-a", "run-2", "msg-2", secondVersion, "reply-second");
+ store.appendAssistantConversationEvent("session-a", "run-1", "msg-1", firstVersion, RuntimeSourceKind.USER_MESSAGE, "reply-first");
+ store.appendAssistantConversationEvent("session-a", "run-2", "msg-2", secondVersion, RuntimeSourceKind.USER_MESSAGE, "reply-second");
List history = store.loadConversationHistoryBefore("session-a", 5L);
@@ -53,15 +46,13 @@ class RuntimeStoreServiceTest {
assertEquals("reply-second", history.get(3).getContent());
}
- /**
- * 验证重启后未完成任务会被标记为中止。
- */
@Test
void marksIncompleteRunsAbortedOnStartup() {
RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
AgentRun run = new AgentRun();
run.setRunId("run-abort");
run.setSessionKey("debug-web:test");
+ run.setSourceKind(RuntimeSourceKind.USER_MESSAGE);
run.setStatus(RunStatus.RUNNING);
run.setCreatedAt(System.currentTimeMillis());
store.saveRun(run);
@@ -74,11 +65,9 @@ class RuntimeStoreServiceTest {
List events = restarted.getRunEvents("run-abort", 0);
assertEquals("aborted", events.get(events.size() - 1).getMessage());
+ assertEquals(RuntimeSourceKind.USER_MESSAGE, events.get(0).getSourceKind());
}
- /**
- * 验证最近外部路由会带上会话键一起保存。
- */
@Test
void remembersLatestExternalRouteWithSessionKey() {
RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
@@ -91,14 +80,11 @@ class RuntimeStoreServiceTest {
assertEquals("cid", store.getReplyTarget("dingtalk:group:cid").getConversationId());
}
- /**
- * 验证结构化子任务事件会以系统消息形式进入会话历史。
- */
@Test
void loadConversationHistoryBeforeIncludesStructuredChildEvents() {
RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
- InboundEnvelope parent = inbound("session-a", "msg-1", "parent-question");
+ InboundEnvelope parent = inbound("session-a", "msg-1", "parent-question", RuntimeSourceKind.USER_MESSAGE);
long parentVersion = store.appendInboundConversationEvent(parent);
AgentRun childRun = new AgentRun();
@@ -121,129 +107,37 @@ class RuntimeStoreServiceTest {
assertTrue(history.get(2).getContent().contains("result="));
}
- /**
- * 验证可见系统消息会以 system 角色写入会话事件。
- */
@Test
- void appendInboundConversationEventUsesSystemRoleForVisibleSystemTrigger() {
+ void appendInboundConversationEventUsesSourceKindForRoleAndEventType() {
RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
- InboundEnvelope user = inbound("session-a", "msg-1", "question");
+ InboundEnvelope user = inbound("session-a", "msg-1", "question", RuntimeSourceKind.USER_MESSAGE);
long userVersion = store.appendInboundConversationEvent(user);
- InboundEnvelope system = inbound("session-a", "system-1", "scheduled-task");
+ InboundEnvelope system = inbound("session-a", "system-1", "heartbeat", RuntimeSourceKind.HEARTBEAT_EVENT);
system.setChannelType(ChannelType.SYSTEM);
- system.setTriggerType(InboundTriggerType.SYSTEM_VISIBLE);
system.setHistoryAnchorVersion(userVersion);
-
long systemVersion = store.appendInboundConversationEvent(system);
- List events = store.readConversationEvents("session-a");
+ List events = store.readConversationEvents("session-a");
assertEquals(2L, systemVersion);
assertEquals("user_message", events.get(0).getEventType());
+ assertEquals("user", events.get(0).getRole());
+ assertEquals(RuntimeSourceKind.USER_MESSAGE, events.get(0).getSourceKind());
assertEquals("system_event", events.get(1).getEventType());
assertEquals("system", events.get(1).getRole());
- assertEquals(userVersion, events.get(1).getSourceUserVersion());
+ assertEquals(RuntimeSourceKind.HEARTBEAT_EVENT, events.get(1).getSourceKind());
}
- /**
- * 验证同一锚点下的系统事件与回复会按事件版本顺序重建。
- */
- @Test
- void loadConversationHistoryBeforeKeepsAnchoredSystemEventOrder() {
- RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
-
- InboundEnvelope user = inbound("session-a", "msg-1", "question");
- long userVersion = store.appendInboundConversationEvent(user);
- store.appendAssistantConversationEvent("session-a", "run-1", "msg-1", userVersion, "reply-user");
-
- InboundEnvelope system = inbound("session-a", "system-1", "scheduled-task");
- system.setChannelType(ChannelType.SYSTEM);
- system.setTriggerType(InboundTriggerType.SYSTEM_VISIBLE);
- system.setHistoryAnchorVersion(userVersion);
- long systemVersion = store.appendInboundConversationEvent(system);
- store.appendAssistantConversationEvent("session-a", "run-2", "system-1", userVersion, "reply-system");
-
- List history = store.loadConversationHistoryBefore("session-a", systemVersion + 10L);
-
- assertEquals(4, history.size());
- assertEquals("question", history.get(0).getContent());
- assertEquals("reply-user", history.get(1).getContent());
- assertEquals("SYSTEM", history.get(2).getRole().toString());
- assertEquals("scheduled-task", history.get(2).getContent());
- assertEquals("reply-system", history.get(3).getContent());
- }
-
- /**
- * 验证可按父运行聚合多个子任务状态。
- */
- @Test
- void summarizeChildRunsByParentRun() {
- RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
-
- AgentRun child1 = new AgentRun();
- child1.setRunId("child-1");
- child1.setParentRunId("parent-1");
- child1.setParentSessionKey("session-a");
- child1.setTaskDescription("task-1");
- child1.setStatus(RunStatus.SUCCEEDED);
- child1.setCreatedAt(1L);
- store.saveRun(child1);
-
- AgentRun child2 = new AgentRun();
- child2.setRunId("child-2");
- child2.setParentRunId("parent-1");
- child2.setParentSessionKey("session-a");
- child2.setTaskDescription("task-2");
- child2.setStatus(RunStatus.RUNNING);
- child2.setCreatedAt(2L);
- store.saveRun(child2);
-
- ParentRunChildrenSummary summary = store.summarizeChildRuns("parent-1");
-
- assertEquals("parent-1", summary.getParentRunId());
- assertEquals(2, summary.getTotalChildren());
- assertEquals(1, summary.getSucceededChildren());
- assertEquals(0, summary.getFailedChildren());
- assertEquals(1, summary.getPendingChildren());
- assertTrue(!summary.isAllCompleted());
- }
-
- /**
- * 验证可按 batchKey 只聚合同一父运行下的一批子任务。
- */
@Test
void summarizeChildRunsByParentRunAndBatchKey() {
RuntimeStoreService store = new RuntimeStoreService(tempDir.toFile());
- AgentRun batchA1 = new AgentRun();
- batchA1.setRunId("child-a1");
- batchA1.setParentRunId("parent-1");
- batchA1.setParentSessionKey("session-a");
- batchA1.setBatchKey("plan-A");
- batchA1.setTaskDescription("task-a1");
- batchA1.setStatus(RunStatus.SUCCEEDED);
- batchA1.setCreatedAt(1L);
+ AgentRun batchA1 = child("child-a1", "parent-1", "session-a", "plan-A", RunStatus.SUCCEEDED, 1L);
+ AgentRun batchA2 = child("child-a2", "parent-1", "session-a", "plan-A", RunStatus.RUNNING, 2L);
+ AgentRun batchB1 = child("child-b1", "parent-1", "session-a", "plan-B", RunStatus.SUCCEEDED, 3L);
store.saveRun(batchA1);
-
- AgentRun batchA2 = new AgentRun();
- batchA2.setRunId("child-a2");
- batchA2.setParentRunId("parent-1");
- batchA2.setParentSessionKey("session-a");
- batchA2.setBatchKey("plan-A");
- batchA2.setTaskDescription("task-a2");
- batchA2.setStatus(RunStatus.RUNNING);
- batchA2.setCreatedAt(2L);
store.saveRun(batchA2);
-
- AgentRun batchB1 = new AgentRun();
- batchB1.setRunId("child-b1");
- batchB1.setParentRunId("parent-1");
- batchB1.setParentSessionKey("session-a");
- batchB1.setBatchKey("plan-B");
- batchB1.setTaskDescription("task-b1");
- batchB1.setStatus(RunStatus.SUCCEEDED);
- batchB1.setCreatedAt(3L);
store.saveRun(batchB1);
ParentRunChildrenSummary summary = store.summarizeChildRuns("parent-1", "plan-A");
@@ -255,15 +149,7 @@ class RuntimeStoreServiceTest {
assertTrue(!summary.isAllCompleted());
}
- /**
- * 构造一条简化版入站消息。
- *
- * @param sessionKey 会话键
- * @param messageId 消息标识
- * @param content 文本内容
- * @return 入站消息
- */
- private InboundEnvelope inbound(String sessionKey, String messageId, String content) {
+ private InboundEnvelope inbound(String sessionKey, String messageId, String content, RuntimeSourceKind sourceKind) {
InboundEnvelope envelope = new InboundEnvelope();
envelope.setSessionKey(sessionKey);
envelope.setMessageId(messageId);
@@ -273,8 +159,19 @@ class RuntimeStoreServiceTest {
envelope.setSenderId("user");
envelope.setContent(content);
envelope.setReceivedAt(System.currentTimeMillis());
+ envelope.setSourceKind(sourceKind);
return envelope;
}
-}
-
+ private AgentRun child(String runId, String parentRunId, String parentSessionKey, String batchKey, RunStatus status, long createdAt) {
+ AgentRun run = new AgentRun();
+ run.setRunId(runId);
+ run.setParentRunId(parentRunId);
+ run.setParentSessionKey(parentSessionKey);
+ run.setBatchKey(batchKey);
+ run.setStatus(status);
+ run.setCreatedAt(createdAt);
+ run.setSourceKind(RuntimeSourceKind.USER_MESSAGE);
+ return run;
+ }
+}
diff --git a/src/test/java/com/jimuqu/claw/channel/dingtalk/DingTalkRobotSenderTest.java b/src/test/java/com/jimuqu/claw/channel/dingtalk/DingTalkRobotSenderTest.java
index af3d819ec8291cdecef718b9fe62114b671bfa00..75f52f126799a2201e0cface9892f980278732ab 100644
--- a/src/test/java/com/jimuqu/claw/channel/dingtalk/DingTalkRobotSenderTest.java
+++ b/src/test/java/com/jimuqu/claw/channel/dingtalk/DingTalkRobotSenderTest.java
@@ -27,6 +27,38 @@ class DingTalkRobotSenderTest {
assertEquals("SolonClaw", sender.resolveMarkdownTitle(" "));
}
+
+ @Test
+ void truncatesMarkdownPayloadWhenCharacterCountExceedsLimit() throws Exception {
+ DingTalkRobotSender sender = new DingTalkRobotSender(null, new DingTalkProperties(), null);
+ String content = repeat("a", 7000);
+
+ String payload = sender.markdownMessageParam(content);
+ JSONObject json = JSONObject.parseObject(payload);
+ String text = json.getString("text");
+
+ assertTrue(text.contains("已截断"));
+ assertTrue(text.length() <= 5000);
+ }
+
+ @Test
+ void truncatesMarkdownPayloadBySimpleLengthLimit() throws Exception {
+ DingTalkRobotSender sender = new DingTalkRobotSender(null, new DingTalkProperties(), null);
+ String content = repeat("中", 6500);
+
+ String normalized = sender.normalizeMarkdownContent(content);
+
+ assertTrue(normalized.contains("已截断"));
+ assertTrue(normalized.length() <= 5000);
+ }
+
+ private String repeat(String value, int count) {
+ StringBuilder builder = new StringBuilder(value.length() * count);
+ for (int i = 0; i < count; i++) {
+ builder.append(value);
+ }
+ return builder.toString();
+ }
}
diff --git a/src/test/java/com/jimuqu/claw/channel/feishu/FeishuBotSenderTest.java b/src/test/java/com/jimuqu/claw/channel/feishu/FeishuBotSenderTest.java
index b7131e4c304f0e55f497490fd9942be789e07a65..7f152cae5345f523f4b8582bf5c940763eae40b2 100644
--- a/src/test/java/com/jimuqu/claw/channel/feishu/FeishuBotSenderTest.java
+++ b/src/test/java/com/jimuqu/claw/channel/feishu/FeishuBotSenderTest.java
@@ -53,7 +53,6 @@ class FeishuBotSenderTest {
assertEquals("msg-1", gateway.patchedMessageIds.get(1));
assertTrue(gateway.patchedContents.get(1).contains("final-answer"));
}
-
private OutboundEnvelope outbound(String runId, String content, boolean progress) {
OutboundEnvelope outboundEnvelope = new OutboundEnvelope();
outboundEnvelope.setRunId(runId);
diff --git a/src/test/java/com/jimuqu/claw/config/SolonClawConfigTest.java b/src/test/java/com/jimuqu/claw/config/SolonClawConfigTest.java
index bc745ce058ec22df9e23eed66e0e57ae8a9a5a99..3d2c39ee912beffa1bde52e5c5cf5ae90f47682b 100644
--- a/src/test/java/com/jimuqu/claw/config/SolonClawConfigTest.java
+++ b/src/test/java/com/jimuqu/claw/config/SolonClawConfigTest.java
@@ -1,8 +1,10 @@
package com.jimuqu.claw.config;
import com.jimuqu.claw.agent.workspace.AgentWorkspaceService;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.noear.solon.ai.agent.react.ReActInterceptor;
import org.noear.solon.ai.skills.cli.CliSkillProvider;
import java.lang.reflect.Field;
@@ -33,4 +35,13 @@ class SolonClawConfigTest {
assertFalse((Boolean) sandboxModeField.get(skillProvider.getTerminalSkill()));
}
+
+ @Test
+ void reactLoggingInterceptorBeanCreated() {
+ SolonClawConfig config = new SolonClawConfig();
+
+ ReActInterceptor interceptor = config.reActLoggingInterceptor();
+
+ Assertions.assertNotNull(interceptor);
+ }
}