AI Workflows | Event-Driven Agent Workflow Design and Practice with Small-Parameter Models on Ollama + LlamaIndex
LlamaIndex Workflows in Detail
A workflow is an event-driven, step-based way of controlling execution flow. It decomposes an application into reusable "steps": each step is triggered by events and can emit events of its own, and by composing steps and events you can build complex yet maintainable execution logic.

Technology choices: llama-index as the agent framework, together with its workflow drawing utility for visualization.
```bash
pip install llama-index-core
pip install llama-index-utils-workflow
```
1. Core Concepts: Branches and Loops
The core logic of a workflow is built from "branches" and "loops":
- Branch: a step can return different event types depending on a condition, and the framework automatically routes each event to whichever step accepts that type;
- Loop: if a step returns an event whose type matches the input of itself (or an upstream step), the framework feeds the event back into that step, so it executes again.
Base imports
All workflow examples below depend on these core modules:
```python
from llama_index.core.workflow import (
    StartEvent,
    StopEvent,
    Workflow,
    step,
    Event,
    Context,
)
from llama_index.utils.workflow import draw_all_possible_flows
import asyncio
```
2. Single-Branch Linear Workflow
The most basic form of workflow: steps run in a fixed order, and each step's output becomes the next step's input.
Full code
```python
class FirstEvent(Event):
    first_output: str


class SecondEvent(Event):
    second_output: str


class MyWorkflow(Workflow):
    @step
    async def step_one(self, ev: StartEvent) -> FirstEvent:
        print(ev.first_input)
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ev: FirstEvent) -> SecondEvent:
        print(ev.first_output)
        return SecondEvent(second_output="Second step complete.")

    @step
    async def step_three(self, ev: SecondEvent) -> StopEvent:
        print(ev.second_output)
        return StopEvent(result="Workflow complete.")


async def main():
    w = MyWorkflow(timeout=10, verbose=False)
    draw_all_possible_flows(MyWorkflow, filename="multi_step_workflow.html")
    result = await w.run(first_input="Start the workflow.")
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
```
Execution output
```
Start the workflow.
First step complete.
Second step complete.
Workflow complete.
```
Workflow visualization (generated as `multi_step_workflow.html`).
3. Multi-Branch Selection Workflow
A step may return events of several types; the framework selects the branch to execute based on the type actually returned (the example simulates branch selection with a random number).
Full code
```python
import random
import sys


class BranchA1Event(Event):
    payload: str


class BranchA2Event(Event):
    payload: str


class BranchB1Event(Event):
    payload: str


class BranchB2Event(Event):
    payload: str


class BranchWorkflow(Workflow):
    @step
    async def start(self, ev: StartEvent) -> BranchA1Event | BranchB1Event:
        if random.randint(0, 1) == 0:
            print("Go to branch A")
            return BranchA1Event(payload="Branch A")
        else:
            print("Go to branch B")
            return BranchB1Event(payload="Branch B")

    @step
    async def step_a1(self, ev: BranchA1Event) -> BranchA2Event:
        print(ev.payload)
        return BranchA2Event(payload=ev.payload)

    @step
    async def step_b1(self, ev: BranchB1Event) -> BranchB2Event:
        print(ev.payload)
        return BranchB2Event(payload=ev.payload)

    @step
    async def step_a2(self, ev: BranchA2Event) -> StopEvent:
        print(ev.payload)
        return StopEvent(result="Branch A complete.")

    @step
    async def step_b2(self, ev: BranchB2Event) -> StopEvent:
        print(ev.payload)
        return StopEvent(result="Branch B complete.")


async def main():
    workflow = BranchWorkflow(timeout=10, verbose=True)
    draw_all_possible_flows(BranchWorkflow, filename="branch_workflow.html")
    result = await workflow.run()
    print(f"\nFinal result: {result}")


if __name__ == "__main__":
    # The selector event loop policy only exists (and is only needed) on Windows.
    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(main())
```
Workflow visualization (generated as `branch_workflow.html`).
4. Cross-Step Data Passing with Context (Stateful Workflows)
Custom events only pass data between adjacent steps. Sharing data across arbitrary steps requires workflow-wide state (similar in spirit to a framework-level cache, such as ThinkPHP's); LlamaIndex provides the Context object for this, and the example below approximates the same idea with a dictionary attribute on the workflow instance.
Full code
```python
import sys


class SetupEvent(Event):
    query: str


class StepTwoEvent(Event):
    payload: str


class StatefulFlow(Workflow):
    def __init__(self, timeout: int = 10, verbose: bool = True):
        super().__init__(timeout=timeout, verbose=verbose)
        # Shared state visible to every step of this workflow instance.
        self.context_data = {}

    @step
    async def start(self, ev: StartEvent) -> SetupEvent:
        db_data = self.context_data.get("some_database")
        print(f"start step - initial shared data: {db_data}")
        return SetupEvent(query=ev.query)

    @step
    async def setup(self, ev: SetupEvent) -> StepTwoEvent:
        self.context_data["some_database"] = [1, 2, 3]
        print("setup step - stored [1, 2, 3] in shared state")
        return StepTwoEvent(payload=ev.query)

    @step
    async def step_two(self, ev: StepTwoEvent) -> StopEvent:
        db_data = self.context_data.get("some_database")
        print(f"step_two - read shared data: {db_data}")
        return StopEvent(result=f"Result: {db_data}, query: {ev.payload}")


async def main():
    w = StatefulFlow(timeout=10, verbose=True)
    draw_all_possible_flows(StatefulFlow, filename="doc_check_workflow.html")
    result = await w.run(query="Some query")
    print(f"\nWorkflow result: {result}")


if __name__ == "__main__":
    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(main())
```
Workflow visualization (generated as `doc_check_workflow.html`).
5. Streaming: Event Streams and Streaming LLM Output
Complex workflows can take a long time to finish. With Context.write_event_to_stream() you can push progress updates and streamed LLM responses to the caller in real time, greatly improving the user experience (the example streams output from a local Ollama model).
Full code
```python
from llama_index.llms.ollama import Ollama


class FirstEvent(Event):
    first_output: str


class SecondEvent(Event):
    second_output: str
    response: str


class ProgressEvent(Event):
    msg: str


class MyWorkflow(Workflow):
    @step
    async def step_one(self, ctx: Context, ev: StartEvent) -> FirstEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="Step one is happening\n"))
        return FirstEvent(first_output="First step complete.")

    @step
    async def step_two(self, ctx: Context, ev: FirstEvent) -> SecondEvent:
        llm = Ollama(
            model="llama3.1:latest",
            base_url="http://localhost:11434",
            temperature=0.1,
        )
        full_response = ""
        generator = await llm.astream_complete(
            "Please give me the first 3 paragraphs of Moby Dick, a book in the public domain."
        )
        async for response in generator:
            if response.delta:
                full_response += response.delta
                # Forward each token to the event stream as it arrives.
                ctx.write_event_to_stream(ProgressEvent(msg=response.delta))
        return SecondEvent(
            second_output="Second step complete, full response attached",
            response=full_response,
        )

    @step
    async def step_three(self, ctx: Context, ev: SecondEvent) -> StopEvent:
        ctx.write_event_to_stream(ProgressEvent(msg="\nStep three is happening"))
        return StopEvent(result="Workflow complete.")


async def main():
    w = MyWorkflow(timeout=60, verbose=True)
    handler = w.run(start_event=StartEvent())
    # Consume streamed events while the workflow is still running.
    async for ev in handler.stream_events():
        if isinstance(ev, ProgressEvent):
            print(ev.msg, end="", flush=True)
    final_result = await handler
    print(f"\nFinal result: {final_result}")

    draw_all_possible_flows(MyWorkflow, filename="streaming_workflow.html")


if __name__ == "__main__":
    asyncio.run(main())
```
Workflow visualization (generated as `streaming_workflow.html`).
Summary
- Core mechanism: LlamaIndex workflows mark steps with @step and pass data via Event subclasses; the framework matches event types to steps and dispatches them automatically;
- Data passing: use custom Events between adjacent steps, and the Context object for global state shared across steps;
- Advanced capability: Context.write_event_to_stream() enables streaming LLM output, improving the user experience of long-running workflows.