Eino 学习总结

并行处理

Parallel 的输出结构为map[string]any

key 是在AddLambda方法中指定的 outputKey

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package main

import (
"context"
"fmt"
"log"
"os"
"time"

"github.com/cloudwego/eino-ext/components/model/openai"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
"github.com/joho/godotenv"
)

var GlobalClient *openai.ChatModel

func initGlobalClient() {
err := godotenv.Load()
if err != nil {
panic(err)
}
key := os.Getenv("OPENAI_API_KEY")
model := os.Getenv("OPENAI_MODEL")
baseURL := os.Getenv("OPENAI_BASE_URL")
GlobalClient, err = openai.NewChatModel(context.Background(), &openai.ChatModelConfig{
ByAzure: false,
BaseURL: baseURL,
APIKey: key,
Model: model,
})
if err != nil {
panic(err)
}
}

func main() {
ctx := context.Background()
initGlobalClient()

// 创建并行节点, 准备添加4个并行任务
parallel := compose.NewParallel()

// 情感分析
parallel.AddLambda("current_emotion", compose.InvokableLambda(func(ctx context.Context, input map[string]any) (string, error) {
userInput, ok := input["user_input"].(string)
if !ok {
return "neutral", nil
}

switch {
case contains(userInput, []string{"开心", "高兴", "愉快", "happy", "good"}):
return "positive", nil
case contains(userInput, []string{"难过", "沮丧", "生气", "sad", "angry"}):
return "negative", nil
default:
return "neutral", nil
}
}))

// 获取历史信息
parallel.AddLambda("context_history", compose.InvokableLambda(func(ctx context.Context, input map[string]any) ([]string, error) {
history := []string{
"用户: 我失恋了怎么办啊",
"助手: 时间会治愈一切, 专注自我成长, 你会遇到更好的。",
"用户: 可是我放不下她啊",
}
return history, nil
}))

// 获取当前时间
parallel.AddLambda("current_time", compose.InvokableLambda(func(ctx context.Context, input map[string]any) (string, error) {
return time.Now().Format("2006-01-02 15:04:05"), nil
}))

// 保留原始输入
parallel.AddLambda("user_input", compose.InvokableLambda(func(ctx context.Context, input map[string]any) (string, error) {
if val, exists := input["user_input"]; exists {
if val != nil {
if str, ok := val.(string); ok {
return str, nil
}
}
}
return "未知哦", nil
}))

// 创建链, 作为工作流主干
chain := compose.NewChain[map[string]any, *schema.Message]()

// 添加Parallel节点
chain.AppendParallel(parallel)

// 拼接并行结果, 构造出调用参数
chain.AppendLambda(compose.InvokableLambda(func(ctx context.Context, input map[string]any) ([]*schema.Message, error) {
emotion := input["current_emotion"].(string)
history := input["context_history"].([]string)
currentTime := input["current_time"].(string)
userInput := input["user_input"].(string)

systemPrompt := fmt.Sprintf(
"你是一个智能助手, 根据以下信息来回答用户问题:\n\n"+
"1. 用户当前情绪状态: %s\n"+
"2. 最近的对话历史:\n %s\n"+
"3. 当前时间: %s\n\n"+
"请综合考虑以上信息, 给出恰当的回答。",
emotion,
fmt.Sprintf("%s", history),
currentTime,
)

messages := []*schema.Message{
schema.SystemMessage(systemPrompt),
schema.UserMessage(userInput),
}

return messages, nil
}))

// 调用ai节点
chain.AppendLambda(compose.InvokableLambda(func(ctx context.Context, messages []*schema.Message) (*schema.Message, error) {
resp, err := GlobalClient.Generate(ctx, messages)
if err != nil {
return nil, fmt.Errorf("调用LLM失败: %w", err)
}
return resp, nil
}))

// 编译Chain
r, err := chain.Compile(ctx)
if err != nil {
log.Fatalf("编译chain失败, err=%v", err)
return
}

// 准备输入数据
input := make(map[string]any)
input["user_input"] = string("夜深了, 还是想她")

// 执行Chain
result, err := r.Invoke(ctx, input)
if err != nil {
log.Fatalf("执行chain失败, err=%v", err)
return
}
fmt.Println("最终输出:")
fmt.Println(result.Content)
}

// 辅助函数:检查字符串是否包含指定词汇
func contains(text string, keywords []string) bool {
for _, keyword := range keywords {
if len(keyword) > 0 && len(text) >= len(keyword) {
for i := 0; i <= len(text)-len(keyword); i++ {
if text[i:i+len(keyword)] == keyword {
return true
}
}
}
}
return false
}

流程图

graph TD
    A[Start] --> C[创建 Parallel 节点]

    C --> D[添加 current_emotion 任务]
    C --> E[添加 context_history 任务]
    C --> F[添加 current_time 任务]
    C --> G[添加 user_input 任务]

    D --> H[创建 Chain]
    E --> H
    F --> H
    G --> H

    H --> I[AppendParallel - 执行并行任务]
    I --> J[数据整合 - 构造系统提示词]
    J --> K[调用AI模型生成回复]
    K --> O[输出结果]

Chain 理解

1
compose.NewChain[map[string]any, *schema.Message]()

这个NewChain[I, O]一个是输入类型, 一个是输出类型

输入类型与下一个AppendLambda的类型要一致(每一步的类型都是环环相扣的), 否则会遇到

例:

1
2
compose.NewChain[string, string]() // 输入是string
func(ctx context.Context, question int) // 传递过去接收int
1
编译stringChain失败: graph edge[start]-[node_0]: start node's output type[string] and end node's input type[int] mismatch

那么对于单链的来说, 大致流程

1
NewChain -> AppendLambda -> ...n个函数 -> Compile -> Invoke

分支

Branch 节点只能从一个节点接收输入

1
2
3
4
5
6
7
// 创建分支条件函数
branchCondition := func(ctx context.Context, input int) (string, error)
// 对应注册的处理函数是按名字来的, 所以分支函数的返回值一定是string
// 下面把small打错, 直接走了默认情况
branch.AddLambda("smallll", compose.InvokableLambda(func(ctx context.Context, input int) (string, error) {
return fmt.Sprintf("小数字: %d", input), nil
}))

大致流程

1
输入数据 -> 分支条件函数 -> 返回分支key -> 查找对应处理函数 -> 执行对应分支

调试

在 main 函数顶上初始化devops.Init(), 默认在 52538 端口, 可以devops.WithDevServerPort("指定端口")

注意: 编排产物至少执行过一次 Compile() (比如聊天类型的, 要先对话一次才能看到可视化的)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func main() {
ctx := context.Background()
// Init eino devops server
err := devops.Init(ctx)
if err != nil {
logs.Errorf("[eino dev] init failed, err=%v", err)
return
}

// xxx 你的代码
chain.Compile(ctx) // 根据编译的函数来注册调试

// Blocking process exits
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs

// Exit
logs.Infof("[eino dev] shutting down\n")
}

在 vscode 插件安装eino.eino-dev插件, 点击Graph调试配置地址和端口即可

调试的 json 值设置

有些类型插件能自行推导, any 类型不能自动推导

输入 map[string]interface{}

1
2
3
4
5
6
7
8
9
10
// user_input => string key
// {} => interface value
// _value => 真实的value
// _eino_go_type => go的类型
{
"user_input": {
"_value": "你好!今天天气真好, 我感到很开心!",
"_eino_go_type": "string"
}
}

输入 any

1
2
3
4
{
"_value": "This is a plain string",
"_eino_go_type": "string"
}

自定义类型注册

使用devops.AppendType()

1
2
3
4
5
6
// 自定义类型
type NodeInfo struct {
Message string `json:"message"`
Count int `json:"count"`
}
devops.AppendType(&NodeInfo{})

调试 json

1
2
3
4
5
6
7
{
"_value": {
"message": "Hello from NodeInfo",
"count": 42
},
"_eino_go_type": "*main.NodeInfo"
}

触发模式

  • 需要更灵活的执行控制, 选择 AnyPredecessor
  • 需要确保所有前置条件都满足后再执行, 选择 AllPredecessor

rag

处理&召回

统一一个位置写配置, 哪些索引, 返回什么

redis 协议版本一定要对齐

pg

用 redis 的来建图, 然后修改 indexer 和 retriever