Using Genkit Flows with Docker Model Runner to create a REST API with streaming

Prerequisites:
- Having read the previous blog post, First Contact with Genkit and Docker Model Runner
- Having this model: ai/qwen2.5:0.5B-F16 (use the command docker model pull ai/qwen2.5:0.5B-F16)
Introduction: what are Flows?
Genkit "flows" let you define AI workflows as Go functions, which makes it easier to build AI features into your application.
They orchestrate the steps of a workflow: context retrieval, calls to generative models, pre/post-model processing, response verification, etc.
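To make the idea of "orchestrating the steps" concrete, here is a minimal sketch in plain Go, with no Genkit dependency. Everything in it is hypothetical: describeNPC, generateFunc and fakeModel are names I made up for the illustration, and the fake model only echoes the prompt. It just shows the shape of a flow body: input in, a few steps in order, output or error out.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
)

// generateFunc is a hypothetical stand-in for a call to a generative model.
type generateFunc func(ctx context.Context, prompt string) (string, error)

// describeNPC has the shape of a flow body: it takes a context and an input,
// runs the steps in order, and returns an output or an error.
func describeNPC(ctx context.Context, request string, generate generateFunc) (string, error) {
	// Pre-processing: normalize and validate the input.
	request = strings.TrimSpace(request)
	if request == "" {
		return "", errors.New("empty request")
	}

	// Model call: delegated to whatever generateFunc the caller supplies.
	raw, err := generate(ctx, "Describe this NPC: "+request)
	if err != nil {
		return "", fmt.Errorf("generate: %w", err)
	}

	// Post-processing and verification: reject empty answers.
	answer := strings.TrimSpace(raw)
	if answer == "" {
		return "", errors.New("model returned an empty answer")
	}
	return answer, nil
}

// fakeModel echoes the prompt so the sketch runs without a real model.
func fakeModel(_ context.Context, prompt string) (string, error) {
	return "  [fake] " + prompt + "\n", nil
}

// runDemo wires the flow-shaped function to the fake model.
func runDemo(request string) (string, error) {
	return describeNPC(context.Background(), request, fakeModel)
}

func main() {
	out, err := runDemo("  a dwarf rogue ")
	fmt.Println(out, err)
}
```

Because the function only depends on its arguments, it can be called directly from a test, which is the property the first key characteristic relies on.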
Key characteristics
- It's possible to test, debug and trace each flow independently of the application.
- Any flow can be exposed as a web API endpoint.
- Each flow is an ordinary Go function.
What interests us today is the 2nd point: the ability to expose a flow as a web API endpoint.
Flows can also be used in streaming mode: the output is transmitted to the client as it's generated.
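Streaming mode boils down to a callback that is invoked once per chunk while the full text is still being assembled. The sketch below illustrates that pattern with the standard library only. The streamCallback type is a local stand-in I defined for the example (it is not the Genkit type), and streamWords is a hypothetical producer that emits a text word by word.

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// streamCallback is a local stand-in for a streaming callback: it receives
// one chunk at a time and can return an error to stop the stream.
type streamCallback func(ctx context.Context, chunk string) error

// streamWords is a hypothetical producer: it emits the text word by word
// through the callback and also returns the complete text at the end.
func streamWords(ctx context.Context, text string, cb streamCallback) (string, error) {
	var full strings.Builder
	for _, word := range strings.Fields(text) {
		chunk := word + " "
		full.WriteString(chunk)
		if cb != nil {
			if err := cb(ctx, chunk); err != nil {
				return "", fmt.Errorf("error sending chunk: %w", err)
			}
		}
	}
	return strings.TrimSpace(full.String()), nil
}

// collectChunks runs streamWords and records every chunk the callback sees.
func collectChunks(text string) ([]string, string, error) {
	var chunks []string
	full, err := streamWords(context.Background(), text,
		func(_ context.Context, chunk string) error {
			chunks = append(chunks, chunk)
			return nil
		})
	return chunks, full, err
}

func main() {
	chunks, full, err := collectChunks("roll for initiative")
	fmt.Printf("%d chunks, full=%q, err=%v\n", len(chunks), full, err)
}
```

A streaming client consumes the chunks as they arrive, while a non-streaming client only needs the value returned at the end.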
Flow definition with genkit.DefineStreamingFlow()
We'll define a flow using the genkit.DefineStreamingFlow function. I need a REST API that lets me send a user message to an AI model and receive the response in streaming mode.
I need to initialize a Genkit instance:
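ctx := context.Background()
g := genkit.Init(ctx, genkit.WithPlugins(&openai.OpenAI{
	// Placeholder API key for the local endpoint
	APIKey: "tada",
	Opts: []option.RequestOption{
		// OpenAI-compatible endpoint of the local model engine
		option.WithBaseURL("http://localhost:12434/engines/v1/"),
	},
}))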
Then the flow definition is rather simple. I'll reuse the streaming completion I already used in the previous blog post, but this time in a flow.
// Definition of a streaming flow
streamingChatFlow := genkit.DefineStreamingFlow(
	g,
	"streaming-chat",
	func(ctx context.Context, input *ChatRequest, callback core.StreamCallback[string]) (*ChatResponse, error) {
		fullResponse, err := genkit.Generate(ctx, g,
			ai.WithModelName("openai/ai/qwen2.5:0.5B-F16"),
			ai.WithMessages(
				ai.NewSystemTextMessage("You are the dungeon master of a D&D game."),
				ai.NewUserTextMessage(input.Message),
			),
			ai.WithConfig(map[string]any{"temperature": 0.7}),
			ai.WithStreaming(func(ctx context.Context, chunk *ai.ModelResponseChunk) error {
				// Display each chunk as it arrives
				fmt.Print(chunk.Text())
				if callback != nil {
					if err := callback(ctx, chunk.Text()); err != nil {
						return fmt.Errorf("error sending chunk: %w", err)
					}
				}
				return nil
			}),
		)
		if err != nil {
			return nil, err
		}
		// Return the complete response for non-streaming clients
		return &ChatResponse{Response: fullResponse.Text()}, nil
	},
)
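The flow refers to two types, ChatRequest and ChatResponse, that are not shown above. Here is a minimal sketch of what they could look like. The field names come from the flow code (input.Message and Response), but the JSON tags are my assumption, chosen to match the "message" key used in the request body later in this post. The decodeRequest and encodeResponse helpers are hypothetical and only there to show the JSON mapping.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// ChatRequest is the flow input. The json tag is an assumption that matches
// the "message" key sent by the client.
type ChatRequest struct {
	Message string `json:"message"`
}

// ChatResponse is the flow output. The json tag is also an assumption.
type ChatResponse struct {
	Response string `json:"response"`
}

// decodeRequest parses a JSON object into a ChatRequest.
func decodeRequest(body string) (ChatRequest, error) {
	var req ChatRequest
	err := json.Unmarshal([]byte(body), &req)
	return req, err
}

// encodeResponse serializes a ChatResponse to JSON.
func encodeResponse(text string) (string, error) {
	b, err := json.Marshal(ChatResponse{Response: text})
	return string(b), err
}

func main() {
	req, err := decodeRequest(`{"message":"Generate a D&D NPC"}`)
	fmt.Println(req.Message, err)

	out, err := encodeResponse("Here is your NPC")
	fmt.Println(out, err)
}
```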
How it works:
- Input: Receives a user message via input *ChatRequest
- Preparation: Uses the qwen2.5:0.5B-F16 model with:
  - A system prompt that sets the model's role (dungeon master of a D&D game)
  - The user message as input
  - A temperature of 0.7 for creativity
- Real-time streaming: Each generated text chunk is:
  - Displayed immediately (fmt.Print(chunk.Text()))
  - Sent to the client via the callback (callback core.StreamCallback[string]) for real-time updates
All that's left is to expose this flow as a web API endpoint.
Exposing the flow as a web API endpoint
This is extremely simple: just use genkit.Handler() to create an HTTP handler from the flow, then attach it to an HTTP router:
mux := http.NewServeMux()
mux.HandleFunc("POST /completion", genkit.Handler(streamingChatFlow))
All that's left is to start the HTTP server:
log.Println("Server started on http://127.0.0.1:3500")
log.Fatal(server.Start(ctx, "127.0.0.1:3500", mux))
Then, just launch the server with go run main.go
And finally, call the API with curl:
#!/bin/bash
SERVICE_URL=${SERVICE_URL:-http://localhost:3500/completion}

read -r -d '' USER_CONTENT <<- EOM
[Brief] Generate a D&D NPC Dwarf Rogue with a mysterious past
EOM

read -r -d '' DATA <<- EOM
{
  "data": {
    "message":"${USER_CONTENT}"
  }
}
EOM

# Remove newlines from DATA
DATA=$(echo ${DATA} | tr -d '\n')

callback() {
  echo -ne "$1"
}

unescape_quotes() {
  local str="$1"
  str="${str//\\\"/\"}" # Replace \" by "
  echo "$str"
}

remove_quotes() {
  local str="$1"
  str="${str%\"}" # remove " at the end
  str="${str#\"}" # remove " at start
  echo "$str"
}

curl --no-buffer --silent ${SERVICE_URL} \
  -H "Content-Type: application/json" \
  -H "Accept: text/event-stream" \
  -d "${DATA}" \
  | while IFS= read -r line; do
    if [[ $line == data:* ]]; then
      #echo "🤖> ${line#data: }"
      json_data="${line#data: }"
      content_chunk=$(echo "$json_data" | jq '.message // "null"' 2>/dev/null)
      result=$(remove_quotes "$content_chunk")
      clean_result=$(unescape_quotes "$result")
      callback "$clean_result"
    fi
  done
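The same parsing can be sketched in Go: read the stream line by line, keep the lines that start with "data:", and extract the "message" field from the JSON payload. The readChunks and joinChunks functions are hypothetical helpers, and the sample stream in main is made up for the illustration. It follows the shape the script above relies on, plus a final line without a "message" field, which is an assumption and gets skipped.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// readChunks scans an event stream line by line and calls onChunk for every
// "data:" line whose JSON payload has a string "message" field.
// All other lines are skipped.
func readChunks(r io.Reader, onChunk func(string)) error {
	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		payload, ok := strings.CutPrefix(scanner.Text(), "data:")
		if !ok {
			continue
		}
		var event struct {
			Message *string `json:"message"`
		}
		if err := json.Unmarshal([]byte(strings.TrimSpace(payload)), &event); err != nil {
			continue // not a JSON payload we understand
		}
		if event.Message != nil {
			onChunk(*event.Message)
		}
	}
	return scanner.Err()
}

// joinChunks concatenates all the chunks found in a stream.
func joinChunks(stream string) (string, error) {
	var sb strings.Builder
	err := readChunks(strings.NewReader(stream), func(chunk string) {
		sb.WriteString(chunk)
	})
	return sb.String(), err
}

func main() {
	// Made-up sample stream for the illustration.
	sample := "data: {\"message\":\"The Dwarf \"}\n\n" +
		"data: {\"message\":\"Rogue\"}\n\n" +
		"data: {\"result\":{}}\n"

	// Print each chunk as soon as it is parsed.
	err := readChunks(strings.NewReader(sample), func(chunk string) {
		fmt.Print(chunk)
	})
	fmt.Println()
	if err != nil {
		fmt.Println("error:", err)
	}
}
```

Decoding the JSON directly avoids the manual quote handling of the shell version. To use it against the server, you would pass the body of the HTTP response as the io.Reader.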
The response streams in as the model generates the text, like this:
The Dwarf Rogue, named **Erik**, has a long and storied past. Born into a noble family of dwarves, Erik was raised with a deep respect for the land and its history. As he grew older, he began to see the world differently, noticing the hidden dangers and mysteries of the dwarven realm.
Erik's most famous adventure was a quest to uncover the origins of a legendary treasure, which he believed to be guarded by a powerful guardian. The treasure was said to be the key to unlocking the hidden depths of the dwarven world.
However, Erik soon discovered that the treasure was actually a trap. The guardian, a fearsome figure with a menacing demeanor, had been set to destroy the treasure for a secret mission. As Erik fought the guardian, he was forced to confront the true nature of his family and the dark secrets that had been buried for generations.
Erik learned that his past had been marked by treachery and violence, and that his family had been responsible for the destruction of the dwarven world. He grew angry and determined to take revenge, even if it meant risking everything he had known.
With the help of his trusted friends, Erik embarked on a journey to find the true treasure and a way to break free from his family's control. As he journeyed through the treacherous lands of the dwarven world, Erik faced many challenges and dangers, but he remained steadfast in his determination to uncover the truth.
In the end, Erik emerged victorious and free from his family's control, having learned a valuable lesson about the importance of family and loyalty. He returned to his home, where he found himself facing an uncertain future, but with a newfound sense of purpose and a sense of belonging to a larger, more complex world.
Once again, we see that Genkit truly simplifies life when developing Generative AI applications. Next time we'll see how to use embedding models for similarity search.