diff --git a/ai-flow-api/src/main/java/com/inspur/edp/ai/flow/spi/NodeDeserializerFactory.java b/ai-flow-api/src/main/java/com/inspur/edp/ai/flow/spi/NodeDeserializerFactory.java index c0398a2f3b76b2e4cf43aad1d3054b9764801f0f..7e62de7a7bafaa1310e573c87d7a137a1bb3de17 100644 --- a/ai-flow-api/src/main/java/com/inspur/edp/ai/flow/spi/NodeDeserializerFactory.java +++ b/ai-flow-api/src/main/java/com/inspur/edp/ai/flow/spi/NodeDeserializerFactory.java @@ -6,7 +6,6 @@ import com.inspur.edp.ai.flow.metadata.AbsNode; import com.inspur.edp.common.type.enhanced.EnhancedServiceLoader; import com.inspur.edp.common.type.utils.JsonPropUtil; import com.inspur.edp.common.type.utils.JsonSerializeUtil; -import io.iec.edp.caf.commons.utils.SpringBeanUtils; import io.iec.edp.caf.commons.utils.StringUtils; import java.util.HashMap; @@ -53,7 +52,8 @@ public class NodeDeserializerFactory { } NodeDeserializer nodeDeserializer = NodeDeserializerFactory.get(expressKind); if (nodeDeserializer == null) { - throw new FlowException(String.format("There is no %s expression deserializer", expressKind)); + nodeDeserializer = NodeDeserializerFactory.get("device"); +// throw new FlowException(String.format("There is no %s expression deserializer", expressKind)); } if (!jsonObject.isObject()) { throw new FlowException("json node must be object node."); diff --git a/ai-flow-api/src/main/java/com/inspur/edp/ai/flow/spi/NodeExecutorFactory.java b/ai-flow-api/src/main/java/com/inspur/edp/ai/flow/spi/NodeExecutorFactory.java index b0e833ad85d1fb12c2287d2970f399b2245fd617..150053da3ddd6423f5ea51087db0443b28c2d080 100644 --- a/ai-flow-api/src/main/java/com/inspur/edp/ai/flow/spi/NodeExecutorFactory.java +++ b/ai-flow-api/src/main/java/com/inspur/edp/ai/flow/spi/NodeExecutorFactory.java @@ -2,7 +2,6 @@ package com.inspur.edp.ai.flow.spi; import com.inspur.edp.ai.flow.exception.FlowException; import com.inspur.edp.common.type.enhanced.EnhancedServiceLoader; -import io.iec.edp.caf.commons.utils.SpringBeanUtils; import io.iec.edp.caf.commons.utils.StringUtils; import java.util.List; @@ -40,7 +39,8 @@ public class NodeExecutorFactory { if (executor != null) { return executor; } else { - throw new FlowException("not find node executor provider " + kind); + return PROVIDER_MAP.get("device"); +// throw new FlowException("not find node executor provider " + kind); } } diff --git a/ai-flow-mcp/pom.xml b/ai-flow-mcp/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..972a44445d464ecd465aa66d181889450fd36a28 --- /dev/null +++ b/ai-flow-mcp/pom.xml @@ -0,0 +1,34 @@ + + + + ai-flow-backend + com.inspur.edp + 1.0.0 + + 4.0.0 + + ai-flow-mcp + + + com.inspur.edp + ai-flow-api + 1.0.0 + compile + + + com.inspur.edp + ai-flow-node + 1.0.0 + compile + + + org.java-websocket + Java-WebSocket + 1.5.2 + + + + + \ No newline at end of file diff --git a/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/controller/McpDebugFestController.java b/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/controller/McpDebugFestController.java new file mode 100644 index 0000000000000000000000000000000000000000..cc2b720892160d07b92556594a8e611069a17c8a --- /dev/null +++ b/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/controller/McpDebugFestController.java @@ -0,0 +1,28 @@ +package com.inspur.edp.ai.flow.mcp.controller; + +import com.inspur.edp.ai.flow.mcp.event.WebSocketEvent; +import com.inspur.edp.ai.flow.mcp.tool.NodeGenerateServer; +import com.inspur.edp.ai.flow.mcp.vo.FlowRequest; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +/** + * @author xww + * @Description 启动websocker服务 + * @createTime 2026年01月21日 08:59:00 + */ +@RestController +@RequestMapping("aiflow/mcp") +public class McpDebugFestController { + @PostMapping("/mcptest") + public String generateFlow(FlowRequest request) { + return NodeGenerateServer.generateFlow(request); + } + + @GetMapping("/startWs") + public void startWs() { + WebSocketEvent.startWs(); + } +} diff --git a/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/event/WebSocketEvent.java b/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/event/WebSocketEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..33fd618eca48e7012a2180595bb5d062a75c6e2c --- /dev/null +++ b/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/event/WebSocketEvent.java @@ -0,0 +1,98 @@ +package com.inspur.edp.ai.flow.mcp.event; + + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.handshake.ServerHandshake; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * @author xww + * @Description websocket测试类 + * @createTime 2026年02月10日 09:38:00 + */ +@Slf4j +public class WebSocketEvent extends WebSocketClient { + public WebSocketEvent(URI serverUri) { + super(serverUri); + } + @Override + public void onOpen(ServerHandshake handshakedata) { + log.info("新连接已打开"); + } + + @Override + public void onMessage(String message) { + log.info("收到消息: " + message); + ObjectMapper objectMapper = new ObjectMapper(); + Map map=new HashMap<>(); + try { + JsonNode jsonNode = objectMapper.readTree(message); + JsonNode dataNode=jsonNode.get("data"); + String deviceId=dataNode.get("deviceId").asText(); + String deviceCategory=dataNode.get("category")==null?"":dataNode.get("category").asText(); + map.put("deviceCategory",deviceCategory); + map.put("deviceId",deviceId); + map.put("kind","deviceEventListen"); + JsonNode payload=dataNode.get("payload"); + if(payload==null||!payload.isObject()){ + return; + } + //获取事件名称 + Iterator fieldNames = payload.fieldNames(); + String deviceEvent=""; + while (fieldNames.hasNext()) { + deviceEvent = fieldNames.next(); + log.info("事件名称: " + deviceEvent); + } + JsonNode eventParams=payload.get(deviceEvent); + if(eventParams!=null&&eventParams.isObject()){ + Iterator paramNames = eventParams.fieldNames(); + Map outputParams=new HashMap<>(); + while (paramNames.hasNext()) { + String paramName = paramNames.next(); + JsonNode paramValue=eventParams.get(paramName); + outputParams.put(paramName,paramValue); + } + map.put("outputParams",outputParams); + } + JsonNode node = objectMapper.valueToTree(map); + log.error("===============================输出参数: " + objectMapper.writeValueAsString(node)); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + + } + + @Override + public void onClose(int code, String reason, boolean remote) { + log.info("连接已关闭: " + reason); + } + + @Override + public void onError(Exception ex) { + log.info("发生错误: " + ex.getMessage()); + } + + + public static void main(String[] args) { + startWs(); + } + + public static void startWs() { + try { + WebSocketEvent client = new WebSocketEvent(new URI("ws://139.196.239.110:5174/ws")); // 替换为你的WebSocket服务器地址和端点 + client.connect(); // 尝试连接服务器 + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } +} diff --git a/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/factory/NodeFactory.java b/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/factory/NodeFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..7453ccc2a4a1fa082ab2c3735b863f5d511d060a --- /dev/null +++ b/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/factory/NodeFactory.java @@ -0,0 +1,37 @@ +package com.inspur.edp.ai.flow.mcp.factory; + +import com.inspur.edp.ai.flow.metadata.AbsNode; +import com.inspur.edp.ai.flow.node.*; + +/** + * @author xww + * @Description 根据类型获取节点 + * @createTime 2026年01月20日 09:38:00 + */ +public class NodeFactory { + public static AbsNode getNodeByType(String type){ + switch (type){ + case VariableDefNode.KIND: + return new VariableDefNode(); + case StartNode.KIND: + return new StartNode(); + case EndNode.KIND: + return new EndNode(); + case SelectorNode.KIND: + return new SelectorNode(); + case BatchAssignValueNode.KIND: + return new BatchAssignValueNode(); + case LoopNode.KIND: + return new LoopNode(); + case DeviceEventListenNode.KIND: + return new DeviceEventListenNode(); + default: + return new DeviceNode() { + @Override + public String getKind() { + return type; + } + }; + } + } +} diff --git a/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/tool/NodeGenerateServer.java b/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/tool/NodeGenerateServer.java new file mode 100644 index 0000000000000000000000000000000000000000..ec18a1b2d94881aa2f8084aa627b1852dec0beff --- /dev/null +++ b/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/tool/NodeGenerateServer.java @@ -0,0 +1,86 @@ +package com.inspur.edp.ai.flow.mcp.tool; + +/** + * @author xww + * @Description 一个管理节点生成的服务器 + * @createTime 2026年01月19日 15:51:00 + */ +import com.fasterxml.jackson.databind.JsonNode; +import com.inspur.edp.ai.flow.mcp.factory.NodeFactory; +import com.inspur.edp.ai.flow.mcp.vo.FlowRequest; +import com.inspur.edp.ai.flow.mcp.vo.SimpleNode; +import com.inspur.edp.ai.flow.metadata.AbsNode; +import com.inspur.edp.ai.flow.metadata.FlowMetadata; +import com.inspur.edp.ai.flow.node.BatchAssignValueNode; +import com.inspur.edp.ai.flow.node.DeviceEventListenNode; +import com.inspur.edp.ai.flow.node.DeviceNode; +import com.inspur.edp.ai.flow.node.SelectorNode; +import com.inspur.edp.common.expr.expresses.setvalue.AssignValueExpr; +import com.inspur.edp.common.type.utils.JsonSerializeUtil; +import io.iec.edp.caf.commons.utils.StringUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +public class NodeGenerateServer { + public static String generateFlow(FlowRequest flowRequest) { + FlowMetadata flowMetadata=new FlowMetadata(); + List absNodes=new ArrayList<>(); + List simpleNodes=flowRequest.getNodes(); + simpleNodes.forEach(simpleNode -> { + AbsNode node= NodeFactory.getNodeByType(simpleNode.getKind()); + JsonNode graphMeta= JsonSerializeUtil.toJsonNode("{position: {x: 1007.1470588235293, y: -208.15257352941177}, deletable: true}"); + node.setGraphMeta(graphMeta); + node.setId(simpleNode.getId()); + node.setName(simpleNode.getName()); + node.setInputPorts(simpleNode.getInputPorts()); + node.setOutputPorts(simpleNode.getOutputPorts()); + //设置入参 + node.setInputParams(simpleNode.getInputParams()); + //设置出参 + node.setOutputParams(simpleNode.getOutputParams()); + node.setCode(node.getKind()+"_"+node.getId()); + //赋值节点处理表达式 + if(BatchAssignValueNode.KIND.equals(node.getKind())){ + List expresses=simpleNode.getExpresses(); + ((BatchAssignValueNode)node).setExpresses(expresses); + } + //条件节点处理表达式 + if(SelectorNode.KIND.equals(node.getKind())){ + List branches=simpleNode.getBranches(); + ((SelectorNode)node).setBranches(branches); + } + //设备节点处理,设备节点的kind是动态随机的 + try{ + DeviceNode deviceNode=(DeviceNode)node; + if(!StringUtils.isEmpty(deviceNode.getDeviceId())){ + ((DeviceNode)node).setDeviceId(simpleNode.getDeviceId()); + ((DeviceNode)node).setDeviceAction(simpleNode.getDeviceAction()); + ((DeviceNode)node).setRequestType(simpleNode.getRequestType()); + ((DeviceNode)node).setApiEndpoint(simpleNode.getApiEndpoint()); + } + }catch (Throwable e){ + //暂不处理 + } + //设备监听节点处理 + if(DeviceEventListenNode.KIND.equals(node.getKind())){ + ((DeviceEventListenNode)node).setDeviceEvent(simpleNode.getDeviceEvent()); + ((DeviceEventListenNode)node).setDeviceCategory(simpleNode.getDeviceCategory()); + } + absNodes.add(node); + }); + flowMetadata.setNodes(absNodes); + flowMetadata.setEdges(flowRequest.getEdges()); + flowMetadata.setId(UUID.randomUUID().toString()); + flowMetadata.setName(flowRequest.getName()); + flowMetadata.setKind("workflow"); + flowMetadata.setCode(UUID.randomUUID().toString()); + flowMetadata.setBizTypeId("BP"); + flowMetadata.setSysInit(false); + flowMetadata.setVersion("v1"); + JsonNode graphMeta= JsonSerializeUtil.toJsonNode("{__initialized__: true}"); + flowMetadata.setExtension(graphMeta); + return JsonSerializeUtil.toJson(flowMetadata); + } +} \ No newline at end of file diff --git a/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/vo/FlowRequest.java b/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/vo/FlowRequest.java new file mode 100644 index 0000000000000000000000000000000000000000..ea97d49ebfed630bcb8ab8ccace5fc6b058d3ff8 --- /dev/null +++ b/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/vo/FlowRequest.java @@ -0,0 +1,18 @@ +package com.inspur.edp.ai.flow.mcp.vo; + +import com.inspur.edp.ai.flow.metadata.Edge; +import lombok.Data; + +import java.util.List; + +/** + * @author xww + * @Description TODO + * @createTime 2026年01月20日 16:40:00 + */ +@Data +public class FlowRequest { + private List nodes; + private String name; + private List edges; +} diff --git a/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/vo/SimpleNode.java b/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/vo/SimpleNode.java new file mode 100644 index 0000000000000000000000000000000000000000..aba98fab6112744fce3981ce255c1034029bd716 --- /dev/null +++ b/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/vo/SimpleNode.java @@ -0,0 +1,42 @@ +package com.inspur.edp.ai.flow.mcp.vo; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.inspur.edp.ai.flow.metadata.Parameter; +import com.inspur.edp.ai.flow.node.SelectorNode; +import com.inspur.edp.common.expr.ExprListJsonDeserializer; +import com.inspur.edp.common.expr.Express; +import com.inspur.edp.common.expr.expresses.setvalue.AssignValueExpr; +import lombok.Data; + +import java.util.List; + +/** + * @author xww + * @Description TODO + * @createTime 2026年01月20日 16:43:00 + */ +@Data +public class SimpleNode { + private String id; + private String name; + private String kind; + private List inputPorts; + private List outputPorts; + private List inputParams; + private List outputParams; + @JsonDeserialize(using = ExprListJsonDeserializer.class) + private List expresses; + private List branches; + /** + * 设备节点相关信息 + */ + private String deviceId; + private String deviceAction; + private String apiEndpoint; + private String requestType; + /** + * 设备监听节点相关信息 + */ + private String deviceCategory; + private String deviceEvent; +} diff --git a/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/vo/SimpleParameter.java b/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/vo/SimpleParameter.java new file mode 100644 index 0000000000000000000000000000000000000000..37dbf250edbf91bc308e8fc3fac23000ca7d60b1 --- /dev/null +++ b/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/vo/SimpleParameter.java @@ -0,0 +1,25 @@ +package com.inspur.edp.ai.flow.mcp.vo; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.inspur.edp.common.expr.ExprJsonDeserializer; +import com.inspur.edp.common.expr.Express; +import lombok.Data; + +import java.util.List; + +/** + * @author xww + * @Description TODO + * @createTime 2026年01月20日 16:45:00 + */ +@Data +public class SimpleParameter { + private String code; + private List typeIds; + @JsonDeserialize(using = ExprJsonDeserializer.class) + private Express valueExpr; + /** + * 支持手写的场景 + */ + private String value; +} diff --git a/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/vo/SimpleSelectorBranch.java b/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/vo/SimpleSelectorBranch.java new file mode 100644 index 0000000000000000000000000000000000000000..43ef2c7dbf3fa5c7433d8636452fc7fa33cfda54 --- /dev/null +++ b/ai-flow-mcp/src/main/java/com/inspur/edp/ai/flow/mcp/vo/SimpleSelectorBranch.java @@ -0,0 +1,24 @@ +package com.inspur.edp.ai.flow.mcp.vo; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.inspur.edp.common.expr.ExprJsonDeserializer; +import com.inspur.edp.common.expr.ExprListJsonDeserializer; +import com.inspur.edp.common.expr.Express; +import com.inspur.edp.common.expr.expresses.logic.LogicExpr; +import com.inspur.edp.common.expr.expresses.logic.LogicOperator; +import lombok.Data; + +import java.util.List; + +/** + * @author xww + * @Description TODO + * @createTime 2026年01月20日 18:26:00 + */ +@Data +public class SimpleSelectorBranch { + @JsonDeserialize(using = LogicExpr.LogicOperatorDeserializer.class) + private LogicOperator operator; + @JsonDeserialize(using = ExprListJsonDeserializer.class) + private List expresses; +} diff --git a/ai-flow-node/pom.xml b/ai-flow-node/pom.xml index 4dcf72f10a050979dd5255bfef93a9ee0db060ee..dfac9640c5e507cf48513611eb9a8f6177672de9 100644 --- a/ai-flow-node/pom.xml +++ b/ai-flow-node/pom.xml @@ -17,8 +17,6 @@ 1.0.0 compile - - diff --git a/ai-flow-node/src/main/java/com/inspur/edp/ai/flow/node/DeviceEventListenNode.java b/ai-flow-node/src/main/java/com/inspur/edp/ai/flow/node/DeviceEventListenNode.java new file mode 100644 index 0000000000000000000000000000000000000000..05a3fc72da5a2d1a104f4a089a5611b1ec185639 --- /dev/null +++ b/ai-flow-node/src/main/java/com/inspur/edp/ai/flow/node/DeviceEventListenNode.java @@ -0,0 +1,20 @@ +package com.inspur.edp.ai.flow.node; + +import com.inspur.edp.ai.flow.metadata.AbsNode; +import lombok.Data; + +/** + * @author xww + * @Description 设备监听节点 + * @createTime 2026年01月23日 09:50:00 + */ +@Data +public class DeviceEventListenNode extends AbsNode { + public static final String KIND = "deviceEventListen"; + + public DeviceEventListenNode() { + this.setKind(KIND); + } + private String deviceCategory; + private String deviceEvent; +} diff --git a/ai-flow-node/src/main/java/com/inspur/edp/ai/flow/node/DeviceEventListenNodeDeserializer.java b/ai-flow-node/src/main/java/com/inspur/edp/ai/flow/node/DeviceEventListenNodeDeserializer.java new file mode 100644 index 0000000000000000000000000000000000000000..71fc7eee0543c624f0b24b71f4591eb55777ba02 --- /dev/null +++ b/ai-flow-node/src/main/java/com/inspur/edp/ai/flow/node/DeviceEventListenNodeDeserializer.java @@ -0,0 +1,21 @@ +package com.inspur.edp.ai.flow.node; + +import com.inspur.edp.ai.flow.spi.NodeDeserializer; + +/** + * @author lizhaorui + * @date 2025/9/24 + * @description + */ +public class DeviceEventListenNodeDeserializer implements NodeDeserializer { + + @Override + public String getKind() { + return DeviceEventListenNode.KIND; + } + + @Override + public Class getNodeClass() { + return DeviceEventListenNode.class; + } +} \ No newline at end of file diff --git a/ai-flow-node/src/main/java/com/inspur/edp/ai/flow/node/DeviceEventListenNodeExecutor.java b/ai-flow-node/src/main/java/com/inspur/edp/ai/flow/node/DeviceEventListenNodeExecutor.java new file mode 100644 index 0000000000000000000000000000000000000000..abc47512549be690aab463c046be0e0c26eb6a0c --- /dev/null +++ b/ai-flow-node/src/main/java/com/inspur/edp/ai/flow/node/DeviceEventListenNodeExecutor.java @@ -0,0 +1,26 @@ +package com.inspur.edp.ai.flow.node; + +import com.inspur.edp.ai.flow.engine.FlowContext; +import com.inspur.edp.ai.flow.metadata.AbsNode; +import com.inspur.edp.ai.flow.spi.NodeExecuteResult; +import com.inspur.edp.ai.flow.spi.NodeExecutor; + +import java.util.Map; + +/** + * @author xww + * @Description 设备监听执行器 + * @createTime 2026年01月23日 09:54:00 + */ +public class DeviceEventListenNodeExecutor implements NodeExecutor { + @Override + public String getKind() { + return DeviceEventListenNode.KIND; + } + + @Override + public NodeExecuteResult execute(FlowContext context, AbsNode node, Map argMap) { + //监听到设备发来的消息,当作开始节点开始执行 + return null; + } +} diff --git a/ai-flow-node/src/main/java/com/inspur/edp/ai/flow/node/DeviceNode.java b/ai-flow-node/src/main/java/com/inspur/edp/ai/flow/node/DeviceNode.java new file mode 100644 index 0000000000000000000000000000000000000000..fa329a4286a0851e729ad6ee00165f405548e260 --- /dev/null +++ b/ai-flow-node/src/main/java/com/inspur/edp/ai/flow/node/DeviceNode.java @@ -0,0 +1,97 @@ +package com.inspur.edp.ai.flow.node; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.databind.JsonNode; +import com.inspur.edp.ai.flow.metadata.AbsNode; +import com.inspur.edp.ai.flow.metadata.Parameter; +import com.inspur.edp.common.expr.evaluator.ExpressEvaluator; +import lombok.Data; + +import java.util.List; + +/** + * @author xww + * @Description 设备节点:各种类型的设备 + * @createTime 2026年01月22日 11:25:00 + */ +@Data +public class DeviceNode extends AbsNode{ + public static final String KIND = "device"; + private String id; + //设备类型:摄像头、咖啡机、机器人等 + private String kind; + + private String code; + + private String name; + + private String description; + /** + * 是否为有状态节点 + */ + private boolean stateful; + private boolean inputSelf; + + /** + * 输入参数列表 + */ + private List inputParams; + + /** + * 输出参数列表 + */ + private List outputParams; + + /** + * 输入端口列表 + */ + private List inputPorts; + + /** + * 输出端口列表 + */ + private List outputPorts; + + /** + * 前端画布属性,json结构如下: + * { + * "position": { + * "x": 820, + * "y": 33.30000000000001 + * } + * } + */ + private JsonNode graphMeta; + + /** + * 是否为返回值节点 + */ + private boolean isReturnNode; + /** + * 设备ID + */ + private String deviceId; + /** + * 请求url + */ + private String apiEndpoint; + /** + * 请求类型:POST/GET + */ + private String requestType; + /** + * 设备调用方法 + */ + private String deviceAction; + /** + * 超时时间 + */ + private int timeOut; + /** + * 各个设备节点其他属性信息 + */ + private JsonNode properties; + + @JsonIgnore + private List expressEvaluators; +} diff --git a/ai-flow-node/src/main/java/com/inspur/edp/ai/flow/node/DeviceNodeDeserializer.java b/ai-flow-node/src/main/java/com/inspur/edp/ai/flow/node/DeviceNodeDeserializer.java new file mode 100644 index 0000000000000000000000000000000000000000..9b921eb6f16f6ab0bd4d101c62bb4c800467dd7c --- /dev/null +++ b/ai-flow-node/src/main/java/com/inspur/edp/ai/flow/node/DeviceNodeDeserializer.java @@ -0,0 +1,21 @@ +package com.inspur.edp.ai.flow.node; + +import com.inspur.edp.ai.flow.spi.NodeDeserializer; + +/** + * @author lizhaorui + * @date 2025/9/24 + * @description + */ +public class DeviceNodeDeserializer implements NodeDeserializer { + + @Override + public String getKind() { + return DeviceNode.KIND; + } + + @Override + public Class getNodeClass() { + return DeviceNode.class; + } +} \ No newline at end of file diff --git a/ai-flow-node/src/main/java/com/inspur/edp/ai/flow/node/DeviceNodeExecutor.java b/ai-flow-node/src/main/java/com/inspur/edp/ai/flow/node/DeviceNodeExecutor.java new file mode 100644 index 0000000000000000000000000000000000000000..681424027dd5daa46635d39ce59579c723f9a8be --- /dev/null +++ b/ai-flow-node/src/main/java/com/inspur/edp/ai/flow/node/DeviceNodeExecutor.java @@ -0,0 +1,172 @@ +package com.inspur.edp.ai.flow.node; +import com.inspur.edp.ai.flow.engine.FlowContext; +import com.inspur.edp.ai.flow.engine.strategy.DefaultFlowStrategy; +import com.inspur.edp.ai.flow.metadata.AbsNode; +import com.inspur.edp.ai.flow.metadata.Parameter; +import com.inspur.edp.ai.flow.spi.NodeExecuteResult; +import com.inspur.edp.ai.flow.spi.NodeExecutor; +import com.inspur.edp.common.expr.evaluator.StringConstEvaluator; +import com.inspur.edp.common.expr.expresses.constant.StringConstExpr; +import org.springframework.web.context.request.RequestContextHolder; +import org.springframework.web.context.request.ServletRequestAttributes; + +import javax.servlet.http.Cookie; +import javax.servlet.http.HttpServletRequest; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.math.BigDecimal; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.zip.GZIPInputStream; + +/** + * @author xww + * @Description 设备统一执行器 + * @createTime 2026年01月22日 14:34:00 + */ +public class DeviceNodeExecutor implements NodeExecutor { + @Override + public String getKind() { + return "device"; + } + + @Override + public NodeExecuteResult execute(FlowContext context, AbsNode node, Map argMap) { + DeviceNode deviceNode = (DeviceNode) node; + //根据节点信息组织参数、请求http并获取返回值 + Object result = invokeRestService(deviceNode,argMap); + //返回结果 + NodeExecuteResult nodeExecuteResult = new NodeExecuteResult(); + DefaultFlowStrategy strategy = new DefaultFlowStrategy(node.getId(), node.getOutputPorts().get(0)); + nodeExecuteResult.setFlowStrategy(strategy); + List outputParams = node.getOutputParams(); + if (outputParams != null && !outputParams.isEmpty()) { + Map outputObjects = new HashMap<>(); + outputObjects.put(node.getOutputParams().get(0).getCode(), result); + nodeExecuteResult.setVariablesToAdd(outputObjects); + } + return nodeExecuteResult; + } + public static void main(String[] args){ + DeviceNode deviceNode=new DeviceNode(); + deviceNode.setApiEndpoint("http://139.196.239.110:5174/devices/command"); + deviceNode.setRequestType("POST"); + deviceNode.setInputParams(new ArrayList<>()); + deviceNode.setDeviceId("coffee001"); + deviceNode.setDeviceAction("makeCoffee"); + List inputParams=new ArrayList<>(); + Parameter parameter=new Parameter(); + parameter.setCode("coffee_type"); + parameter.setName("coffee_type"); + StringConstExpr stringConstExpr =new StringConstExpr(); + parameter.setValue("Espresso"); + parameter.setValueExpr(stringConstExpr); + parameter.setValueExprEvaluator(new StringConstEvaluator("Espresso")); + inputParams.add(parameter); + deviceNode.setInputParams(inputParams); + String result=(String)invokeRestService(deviceNode,null); + System.out.println(result); + } + private static Object invokeRestService(DeviceNode deviceNode,Map argMap) { + String apiEndPoint=deviceNode.getApiEndpoint(); + String action=deviceNode.getDeviceAction(); + String deviceId=deviceNode.getDeviceId(); + String requestType=deviceNode.getRequestType(); + //根据需要组织参数,目前传递设备id和执行动作 + StringBuffer params=new StringBuffer("{\"deviceId\":\""+deviceId+"\",\"action\":\""+action+"\""); + List parameters=deviceNode.getInputParams(); + params.append(",\"").append("params").append("\":").append("{"); + int i=0; + for (Parameter parameter : parameters) { + if(i>0){ + params.append(","); + } + Object paramValue=null; + if (parameter.getValueExprEvaluator() != null) { + paramValue = parameter.getValueExprEvaluator().evaluate(argMap); + } + if (paramValue instanceof Integer || paramValue instanceof Long || paramValue instanceof Float || paramValue instanceof Double ||paramValue instanceof Short || paramValue instanceof BigDecimal){ + params.append("\"").append(parameter.getCode()).append("\":").append(paramValue); + }else{ + params.append("\"").append(parameter.getCode()).append("\":").append("\"").append(paramValue).append("\""); + } + i++; + } + params.append("}"); + params.append("}"); + System.out.println(params); + int timeOut= deviceNode.getTimeOut(); + if(timeOut==0){//默认超时时间5分钟 + timeOut=5*60*1000; + } + HttpServletRequest request; + Cookie[] cookies = new Cookie[0]; + HttpURLConnection conn = null; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + conn = (HttpURLConnection) new URL(apiEndPoint).openConnection(); + if (RequestContextHolder.getRequestAttributes() != null) { + request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest(); + cookies = request.getCookies(); + } + StringBuilder cookieStr = new StringBuilder(); + if (cookies != null) { + for (Cookie cookie : cookies) { + if (cookieStr.length() > 0) { + cookieStr.append(";"); + } + cookieStr.append(cookie.getName()).append("=").append(cookie.getValue()); + } + } + conn.setRequestMethod(requestType); + conn.setRequestProperty("Charset", "UTF-8"); + conn.setRequestProperty("Content-Type", "application/json"); + conn.setRequestProperty("Cookie", cookieStr.toString()); + conn.setRequestProperty("Accept-Encoding", "gzip,deflate"); + conn.setFollowRedirects(true); + conn.setConnectTimeout(timeOut); + conn.setReadTimeout(timeOut); + conn.setDoInput(true); + conn.setDoOutput(true); + conn.setAllowUserInteraction(false); + try (OutputStream out = conn.getOutputStream()) { + out.write(params.toString().getBytes()); + out.flush(); + } + int responseCode = conn.getResponseCode(); + if (responseCode != 200) { + return "Error responseCode:" + responseCode+" deviceId is "+deviceNode.getDeviceId(); + } + String contentEncoding = conn.getHeaderField("Content-Encoding"); + try (InputStream inputStream = conn.getInputStream(); + InputStream decompressedStream = "gzip".equals(contentEncoding) ? new GZIPInputStream(inputStream) : inputStream; + ) { + byte[] buffer = new byte[1024]; + int bytesRead; + while ((bytesRead = decompressedStream.read(buffer)) != -1) { + baos.write(buffer, 0, bytesRead); + } + } + return new String(baos.toByteArray(), StandardCharsets.UTF_8); + } catch (Exception e) { + return "error, deviceId is "+deviceNode.getDeviceId() +"detail:"+e.getMessage(); + } finally { + if (conn != null) { + conn.disconnect(); + } + if(baos!=null){ + try { + baos.close(); + } catch (IOException e) { + } + } + } + } +} diff --git a/pom.xml b/pom.xml index e341df8b94b8cb2b19594c4809a24e03e8bc1a2c..e3a33b383df7c8f640e2b0193a0fc1c559aecda1 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,28 @@ common-type-ext common-function ai-flow-demo + ai-flow-mcp + + + org.java-websocket + Java-WebSocket + 1.5.2 + compile + + + com.fasterxml.jackson.core + jackson-databind + + + jakarta.ws.rs + jakarta.ws.rs-api + + + org.springframework + spring-context + + io.iec.edp