基于OllamaL + lamaIndex 的小参数模型的事件驱动型 Agent 工作流设计与实践

LlamaIndex 工作流(Workflow)详解

工作流是一种事件驱动、基于步骤的执行流程控制方式,能将应用程序拆解为可复用的「步骤(Steps)」,步骤由「事件(Events)」触发且自身也会发出事件,通过组合步骤与事件可构建复杂且易维护的执行逻辑。

技术选型:选用 llama-index 作为Agent框架,搭配绘图模块可视化工作流

1
2
pip install llama-index-core
pip install llama-index-utils-workflow

1. 核心概念:分支与循环

工作流的核心逻辑由「分支」和「循环」构成:

  • 分支:步骤可根据条件返回不同事件,框架会根据事件类型自动匹配并执行对应步骤;
  • 循环:某步骤返回的事件若匹配自身/上游步骤的输入类型,框架会将事件重新传入该步骤,实现重复执行。

基础依赖导入

所有工作流示例均依赖以下核心模块:

1
2
3
4
5
6
7
8
9
10
from llama_index.core.workflow import (
StartEvent, # 工作流起始事件(框架内置)
StopEvent, # 工作流终止事件(框架内置)
Workflow, # 自定义工作流基类
step, # 标记工作流步骤的装饰器
Event, # 步骤间数据传递的事件基类
Context, # 跨步骤数据传递的上下文对象(进阶用法)
)
from llama_index.utils.workflow import draw_all_possible_flows # 工作流可视化
import asyncio # 异步执行依赖

2. 单分支线性工作流

最基础的工作流形式:步骤按固定顺序执行,每个步骤的输出作为下一个步骤的输入。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class FirstEvent(Event):
first_output: str # 自定义事件的传递数据

class SecondEvent(Event):
second_output: str

class MyWorkflow(Workflow):
@step # 标记为工作流步骤
async def step_one(self, ev: StartEvent) -> FirstEvent:
print(ev.first_input)
return FirstEvent(first_output="First step complete.")

@step
async def step_two(self, ev: FirstEvent) -> SecondEvent:
print(ev.first_output)
return SecondEvent(second_output="Second step complete.")

@step
async def step_three(self, ev: SecondEvent) -> StopEvent:
print(ev.second_output)
return StopEvent(result="Workflow complete.")

async def main():
# 初始化工作流(超时时间10秒,关闭详细日志)
w = MyWorkflow(timeout=10, verbose=False)
# 生成工作流可视化文件
draw_all_possible_flows(MyWorkflow, filename="multi_step_workflow.html")
# 运行工作流并传入初始参数
result = await w.run(first_input="Start the workflow.")
print(result)

if __name__ == "__main__":
asyncio.run(main())

执行输出

1
2
3
4
Start the workflow.
First step complete.
Second step complete.
Workflow complete.

工作流可视化图:
单分支工作流

3. 多分支选择工作流

步骤可返回多个类型的事件,框架根据返回的事件类型自动选择执行分支(示例中通过随机数模拟分支选择)。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import random

# 定义不同分支的事件类型
class BranchA1Event(Event):
payload: str

class BranchA2Event(Event):
payload: str

class BranchB1Event(Event):
payload: str

class BranchB2Event(Event):
payload: str

class BranchWorkflow(Workflow):
@step
async def start(self, ev: StartEvent) -> BranchA1Event | BranchB1Event:
# 随机选择分支
if random.randint(0, 1) == 0:
print("Go to branch A")
return BranchA1Event(payload="Branch A")
else:
print("Go to branch B")
return BranchB1Event(payload="Branch B")

@step # 分支A第一步
async def step_a1(self, ev: BranchA1Event) -> BranchA2Event:
print(ev.payload)
return BranchA2Event(payload=ev.payload)

@step # 分支B第一步
async def step_b1(self, ev: BranchB1Event) -> BranchB2Event:
print(ev.payload)
return BranchB2Event(payload=ev.payload)

@step # 分支A终止
async def step_a2(self, ev: BranchA2Event) -> StopEvent:
print(ev.payload)
return StopEvent(result="Branch A complete.")

@step # 分支B终止
async def step_b2(self, ev: BranchB2Event) -> StopEvent:
print(ev.payload)
return StopEvent(result="Branch B complete.")

async def main():
workflow = BranchWorkflow(timeout=10, verbose=True)
draw_all_possible_flows(BranchWorkflow, filename="branch_workflow.html")
result = await workflow.run()
print(f"\n最终结果: {result}")

if __name__ == "__main__":
# Windows系统异步策略适配
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())

工作流可视化图:
多分支工作流

4. 基于Context的跨步骤数据传递(状态保持)

自定义事件仅能在相邻步骤传递数据,跨步骤传递需借助 Context 对象(类似ThinkPHP框架的缓存机制),可实现工作流内全局数据共享。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class SetupEvent(Event):
query: str

class StepTwoEvent(Event):
payload: str

class StateFulFollow(Workflow):
def __init__(self, timeout: int = 10, verbose: bool = True):
super().__init__(timeout=timeout, verbose=verbose)
self.context_data = {} # 全局上下文存储

@step
async def start(self, ev: StartEvent) -> SetupEvent:
# 读取上下文数据
db_data = self.context_data.get("some_database")
print(f"Start步骤 - 初始Context数据: {db_data}")
return SetupEvent(query=ev.query)

@step
async def setup(self, ev: SetupEvent) -> StepTwoEvent:
# 写入上下文数据
self.context_data["some_database"] = [1,2,3]
print(f"Setup步骤 - 已向Context存入数据: [1,2,3]")
return StepTwoEvent(payload=ev.query)

@step
async def step_two(self, ev: StepTwoEvent) -> StopEvent:
# 跨步骤读取上下文数据
db_data = self.context_data.get("some_database")
print(f"StepTwo步骤 - 从Context读取数据: {db_data}")
return StopEvent(result=f"最终结果:{db_data},查询词:{ev.payload}")

async def main():
w = StateFulFollow(timeout=10, verbose=True)
draw_all_possible_flows(StateFulFollow, filename="doc_check_workflow.html")
result = await w.run(query="Some query")
print(f"\n工作流最终结果: {result}")

if __name__ == "__main__":
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
asyncio.run(main())

工作流可视化图:
上下文传递工作流

5. 流式记忆:事件流与LLM流式输出

复杂工作流执行耗时较长,可通过 Context.write_event_to_stream() 实时输出进度/LLM流式响应,提升用户体验(示例基于Ollama本地模型实现流式输出)。

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
from llama_index.llms.ollama import Ollama  # 导入Ollama LLM依赖

# 自定义事件
class FirstEvent(Event):
first_output: str

class SecondEvent(Event):
second_output: str
response: str

class ProgressEvent(Event):
msg: str # 流式输出的进度信息

class MyWorkflow(Workflow):
@step
async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
# 写入流式事件(进度提示)
ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening\n"))
return FirstEvent(first_output="First step complete.")

@step
async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
# 初始化Ollama本地模型
llm = Ollama(
model="llama3.1:latest",
base_url="http://localhost:11434",
temperature=0.1
)

full_response = ""
# 流式生成LLM响应
generator = await llm.astream_complete(
"Please give me the first 3 paragraphs of Moby Dick, a book in the public domain."
)

# 逐块输出流式响应
async for response in generator:
if response.delta: # delta为当前流式片段
full_response += response.delta
ctx.write_event_to_stream(ProgressEvent(msg=response.delta))

return SecondEvent(
second_output="Second step complete, full response attached",
response=full_response,
)

@step
async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
ctx.write_event_to_stream(ProgressEvent(msg="\nStep three is happening"))
return StopEvent(result="Workflow complete.")

async def main():
w = MyWorkflow(timeout=60, verbose=True)
# 运行工作流并监听事件流
handler = w.run(start_event=StartEvent())
async for ev in handler.stream_events():
if isinstance(ev, ProgressEvent):
print(ev.msg, end="", flush=True) # 实时输出流式内容

final_result = await handler
print(f"\nFinal result: {final_result}")

# 生成工作流可视化文件
draw_all_possible_flows(MyWorkflow, filename="streaming_workflow.html")

if __name__ == "__main__":
asyncio.run(main())

工作流可视化图:
流式工作流1
流式工作流2


总结

  1. 核心机制:LlamaIndex工作流通过@step标记步骤、Event传递数据,框架根据事件类型自动匹配执行步骤;
  2. 数据传递:相邻步骤用自定义Event,跨步骤用Context对象实现全局状态保持;
  3. 进阶能力:通过Context.write_event_to_stream()可实现LLM流式输出,提升长耗时工作流的用户体验。