LLM Application Examples: Best-Practice Examples

LLM Application Development Pattern: Lightweight API Orchestration

LangChain uses a chain-of-thought approach to select the appropriate agent (Agent). In Co-mate we adopted a similar design: the functions are built locally, and the LLM then analyzes the user's input to decide which function is the right one to call.

Here is an example of our prompt:

    Answer the following questions as best you can. You have access to the following tools:

    introduce_system: introduce_system is a function to introduce a system.

    Use the following format:

    Question: the input question you must answer
    Thought: you should always think about what to do
    Action: the action to take, should be one of [introduce_system]
    Action Input: the input to the action
    Observation: the result of the action
    ... (this Thought/Action/Action Input/Observation can repeat N times)
    Thought: I now know the final answer
    Final Answer: the final answer to the original input question

    Begin!

    Question: Introduce the following system: https://github.com/archguard/ddd-monolithic-code-sample

Here, the Question is the user's input; the corresponding introduce_system function is then invoked to perform the analysis.
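On our side, handling the model's reply comes down to parsing out the Action and Action Input lines and dispatching to the matching local function. A minimal sketch of that dispatch (hypothetical names: dispatchAction and introduceSystem stand in for the real implementation):

    // Parse the Action / Action Input lines from the LLM reply and dispatch locally.
    fun dispatchAction(llmReply: String): String {
        val action = Regex("Action:\\s*(.+)").find(llmReply)?.groupValues?.get(1)?.trim()
        val input = Regex("Action Input:\\s*(.+)").find(llmReply)?.groupValues?.get(1)?.trim()

        return when (action) {
            "introduce_system" -> introduceSystem(input ?: "")
            else -> "Unknown action: $action"
        }
    }

    // Hypothetical stand-in for the real introduce_system tool.
    fun introduceSystem(repoUrl: String): String {
        return "Analyzing $repoUrl ..." // fetch and analyze the repository here
    }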

LLM Application Development Pattern: DSL Dynamic Runtime

We trust the LLM's orchestration ability more than its factual ability, so in Co-mate we use a DSL to orchestrate functions, which makes the orchestration more flexible.

To support this capability, we introduced Kotlin as the runtime for the DSL in Co-mate:

    // Initialize the runtime
    val repl = KotlinInterpreter()
    val mvcDslSpec = repl.evalCast<FoundationSpec>(InterpreterRequest(code = mvcFoundation))

    // Resolve the action from the user's input (shadowing the String parameter `action`)
    val action = ComateToolingAction.from(action.lowercase())

    // Attach the default DSL spec
    if (action == ComateToolingAction.FOUNDATION_SPEC_GOVERNANCE) {
        comateContext.spec = mvcDslSpec
    }

A corresponding DSL example (generated by ChatGPT from the DDD version of the spec):

    foundation {
        project_name {
            pattern("^([a-z0-9-]+)-([a-z0-9-]+)(-common)?\${'$'}")
            example("system1-webapp1")
        }

        layered {
            layer("controller") {
                pattern(".*\\.controller") { name shouldBe endsWith("Controller") }
            }
            layer("service") {
                pattern(".*\\.service") {
                    name shouldBe endsWith("DTO", "Request", "Response", "Factory", "Service")
                }
            }
            layer("repository") {
                pattern(".*\\.repository") { name shouldBe endsWith("Entity", "Repository", "Mapper") }
            }

            dependency {
                "controller" dependedOn "service"
                "controller" dependedOn "repository"
                "service" dependedOn "repository"
            }
        }

        naming {
            class_level {
                style("CamelCase")
                pattern(".*") { name shouldNotBe contains("${'$'}") }
            }
            function_level {
                style("CamelCase")
                pattern(".*") { name shouldNotBe contains("${'$'}") }
            }
        }
    }

LLM Application Development Pattern: Local Small Model

In Co-mate, we introduced SentenceTransformer locally to process the user's input: the input is first analyzed and matched on the local side. When a match is found, the corresponding local function is called directly; when no match is found, a remote handler takes over.

HuggingFace: https://huggingface.co/sentence-transformers

The approach mainly follows the implementations of GitHub Copilot and Bloop: a small local model handles the user's input first, and only then is the large remote model brought in.
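The local matching step itself can stay very simple. Below is a minimal dispatch sketch, not Co-mate's actual code: it embeds the input with the local model (the Semantic class from the Kotlin example below), compares it against precomputed intent embeddings by cosine similarity, and falls back to the remote LLM below a threshold. intentEmbeddings, localHandlers, callRemote, and the 0.75 threshold are all hypothetical:

    import kotlin.math.sqrt

    // Hypothetical tables: intent name -> precomputed embedding / local handler.
    val intentEmbeddings: Map<String, FloatArray> = mapOf()
    val localHandlers: Map<String, (String) -> String> = mapOf()

    fun cosine(a: FloatArray, b: FloatArray): Float {
        var dot = 0f
        var na = 0f
        var nb = 0f
        for (i in a.indices) {
            dot += a[i] * b[i]
            na += a[i] * a[i]
            nb += b[i] * b[i]
        }
        return dot / (sqrt(na) * sqrt(nb))
    }

    fun dispatch(input: String, semantic: Semantic, callRemote: (String) -> String): String {
        val query = semantic.embed(input)
        val best = intentEmbeddings.entries.maxByOrNull { cosine(query, it.value) }

        return if (best != null && cosine(query, best.value) >= 0.75f) {
            localHandlers.getValue(best.key)(input) // confident match: handle locally
        } else {
            callRemote(input) // no good match: defer to the remote LLM
        }
    }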

Rust Implementation Example

A related Rust example: https://github.com/unit-mesh/unit-agent

    pub fn embed(&self, sequence: &str) -> anyhow::Result<Embedding> {
        let tokenizer_output = self.tokenizer.encode(sequence, true).unwrap();

        let input_ids = tokenizer_output.get_ids();
        let attention_mask = tokenizer_output.get_attention_mask();
        let token_type_ids = tokenizer_output.get_type_ids();
        let length = input_ids.len();
        trace!("embedding {} tokens {:?}", length, sequence);

        let inputs_ids_array = ndarray::Array::from_shape_vec(
            (1, length),
            input_ids.iter().map(|&x| x as i64).collect(),
        )?;

        let attention_mask_array = ndarray::Array::from_shape_vec(
            (1, length),
            attention_mask.iter().map(|&x| x as i64).collect(),
        )?;

        let token_type_ids_array = ndarray::Array::from_shape_vec(
            (1, length),
            token_type_ids.iter().map(|&x| x as i64).collect(),
        )?;

        let outputs = self.session.run([
            InputTensor::from_array(inputs_ids_array.into_dyn()),
            InputTensor::from_array(attention_mask_array.into_dyn()),
            InputTensor::from_array(token_type_ids_array.into_dyn()),
        ])?;

        let output_tensor: OrtOwnedTensor<f32, _> = outputs[0].try_extract().unwrap();
        let sequence_embedding = &*output_tensor.view();
        let pooled = sequence_embedding.mean_axis(Axis(1)).unwrap();

        Ok(pooled.to_owned().as_slice().unwrap().to_vec())
    }

Kotlin Implementation Example

    class Semantic(val tokenizer: HuggingFaceTokenizer, val session: OrtSession, val env: OrtEnvironment) {
        fun embed(
            sequence: String,
        ): FloatArray {
            val tokenized = tokenizer.encode(sequence, true)

            val inputIds = tokenized.ids
            val attentionMask = tokenized.attentionMask
            val typeIds = tokenized.typeIds

            val tensorInput = OrtUtil.reshape(inputIds, longArrayOf(1, inputIds.size.toLong()))
            val tensorAttentionMask = OrtUtil.reshape(attentionMask, longArrayOf(1, attentionMask.size.toLong()))
            val tensorTypeIds = OrtUtil.reshape(typeIds, longArrayOf(1, typeIds.size.toLong()))

            val result = session.run(
                mapOf(
                    "input_ids" to OnnxTensor.createTensor(env, tensorInput),
                    "attention_mask" to OnnxTensor.createTensor(env, tensorAttentionMask),
                    "token_type_ids" to OnnxTensor.createTensor(env, tensorTypeIds),
                ),
            )

            val outputTensor = result.get(0) as OnnxTensor
            val output = outputTensor.floatBuffer.array()
            return output
        }

        companion object {
            fun create(): Semantic {
                val classLoader = Thread.currentThread().getContextClassLoader()

                val tokenizerPath = classLoader.getResource("model/tokenizer.json")!!.toURI()
                val onnxPath = classLoader.getResource("model/model.onnx")!!.toURI()

                try {
                    val env: Map<String, String> = HashMap()
                    val array: List<String> = tokenizerPath.toString().split("!")
                    FileSystems.newFileSystem(URI.create(array[0]), env)
                } catch (e: Exception) {
                    // e.printStackTrace()
                }

                val tokenizer = HuggingFaceTokenizer.newInstance(Paths.get(tokenizerPath))
                val ortEnv = OrtEnvironment.getEnvironment()
                val sessionOptions = OrtSession.SessionOptions()

                // load onnxPath as byte[]
                val onnxPathAsByteArray = Files.readAllBytes(Paths.get(onnxPath))
                val session = ortEnv.createSession(onnxPathAsByteArray, sessionOptions)

                return Semantic(tokenizer, session, ortEnv)
            }
        }
    }

LLM Application Development Pattern: Stream Wrapping

Server-side API call: Kotlin implementation

Mechanism: built on callbackFlow.

    fun stream(text: String): Flow<String> {
        val systemMessage = ChatMessage(ChatMessageRole.USER.value(), text)

        messages.add(systemMessage)

        val completionRequest = ChatCompletionRequest.builder()
            .model(openAiVersion)
            .temperature(0.0)
            .messages(messages)
            .build()

        return callbackFlow {
            withContext(Dispatchers.IO) {
                service.streamChatCompletion(completionRequest)
                    .doOnError(Throwable::printStackTrace)
                    .blockingForEach { response ->
                        val completion = response.choices[0].message
                        if (completion != null && completion.content != null) {
                            trySend(completion.content)
                        }
                    }

                close()
            }
        }
    }
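For completeness, a minimal consumption sketch (a hypothetical caller, not from Co-mate; it assumes stream(...) above is in scope): collect the returned Flow in a coroutine and render each chunk as it arrives:

    import kotlinx.coroutines.flow.collect
    import kotlinx.coroutines.runBlocking

    fun main() = runBlocking {
        stream("Introduce the system").collect { chunk ->
            print(chunk) // render each streamed fragment as soon as it arrives
        }
    }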

Client-side API call: TypeScript implementation

Mechanism: relies on Vercel's ai library, which provides the wrapper around the stream.

    import { Message, OpenAIStream, StreamingTextResponse } from 'ai'
    import { Configuration, OpenAIApi } from 'openai-edge'

    export async function stream(apiKey: string, messages: Message[], isStream: boolean = true) {
      let basePath = process.env.OPENAI_PROXY_URL
      if (basePath == null) {
        basePath = 'https://api.openai.com'
      }

      const configuration = new Configuration({
        apiKey: apiKey || process.env.OPENAI_API_KEY,
        basePath
      })

      const openai = new OpenAIApi(configuration)

      const res = await openai.createChatCompletion({
        model: 'gpt-3.5-turbo',
        messages,
        temperature: 0.7,
        stream: isStream
      })

      if (!isStream) {
        return res
      }

      const stream = OpenAIStream(res, {})

      return new StreamingTextResponse(stream)
    }

Client-side UI implementation: Fetch

    const decoder = new TextDecoder()

    export function decodeAIStreamChunk(chunk: Uint8Array): string {
      return decoder.decode(chunk)
    }

    await fetch("/api/action/tooling", {
      method: "POST",
      body: JSON.stringify(tooling),
    }).then(async response => {
      // the body can only be consumed once, so read it as a stream rather than via json()
      let result = ""
      const reader = response.body!.getReader()
      while (true) {
        const { done, value } = await reader.read()
        if (done) {
          break
        }

        result += decodeAIStreamChunk(value)
        onResult(result)
      }

      isPending = false
    });

Server-side forwarding: Java + Spring

WebFlux + Spring Boot

    @RestController
    public class ChatController {
        private WebClient webClient = WebClient.create();

        @PostMapping(value = "/api/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public SseEmitter chat(@RequestBody ChatInput input) throws IOException {
            SseEmitter emitter = new SseEmitter();

            webClient.post()
                    .uri(REMOTE_URL)
                    .bodyValue(input)
                    .exchangeToFlux(response -> {
                        if (response.statusCode().is2xxSuccessful()) {
                            return response.bodyToFlux(byte[].class)
                                    .map(String::new)
                                    .doOnNext(string -> {
                                        try {
                                            emitter.send(string);
                                        } catch (IOException e) {
                                            logger.error("Error while sending data: {}", e.getMessage());
                                            emitter.completeWithError(e);
                                        }
                                    })
                                    .doOnComplete(emitter::complete)
                                    .doOnError(emitter::completeWithError);
                        } else {
                            emitter.completeWithError(new RuntimeException("Error while calling remote service"));
                            // the lambda must return a Publisher in every branch
                            return Flux.empty();
                        }
                    })
                    .subscribe();

            return emitter;
        }
    }

Server-side forwarding: Python

FastAPI + OpenAI

    import openai
    from fastapi import HTTPException
    from fastapi.responses import Response, StreamingResponse

    # `app`, `ChatInput`, openai_model, temperature, max_tokens, max_responses
    # and error503 are module-level definitions elsewhere in the service.

    def generate_reply_stream(input_data: ChatInput):
        prompt = input_data.message
        try:
            response = openai.ChatCompletion.create(
                model=openai_model,
                temperature=temperature,
                max_tokens=max_tokens,
                n=max_responses,
                messages=[
                    {"role": "user", "content": prompt},
                ],
                stream=True,
            )
        except Exception as e:
            print("Error in creating campaigns from openAI:", str(e))
            raise HTTPException(503, error503)

        try:
            for chunk in response:
                current_content = chunk["choices"][0]["delta"].get("content", "")
                yield current_content
        except Exception as e:
            print("OpenAI Response (Streaming) Error: " + str(e))
            raise HTTPException(503, error503)


    @app.post("/api/chat", response_class=Response)
    async def chat(input_data: ChatInput):
        return StreamingResponse(generate_reply_stream(input_data), media_type="text/event-stream")