# Streaming API

## Introduction

The Streaming API in the Koog framework lets you process structured data from Large Language Models (LLMs) as it arrives, rather than waiting for the entire response. This page explains how to use the Streaming API to efficiently handle structured data in Markdown format.

## Streaming API overview

The Streaming API enables real-time processing of structured data from LLM responses. Instead of waiting for the complete response, you can:
- Process data as it arrives in chunks
- Parse structured information on the fly
- Emit structured objects as they are completed
- Handle these objects immediately (collect them or pass to tools)
This approach is particularly useful because it provides the following benefits:
- Improved responsiveness in user interfaces
- Efficient processing of large responses
- Support for real-time data processing pipelines
The Streaming API lets you parse the output either as structured data that follows a Markdown definition or as a stream of plain text chunks.

## Working with a raw string stream

You can also parse the output by working directly with the raw string stream. This approach gives you more flexibility and control over the parsing process.

Here is an example that requests a raw string stream while providing a Markdown definition of the output structure:
```kotlin
fun markdownBookDefinition(): MarkdownStructuredDataDefinition {
    return MarkdownStructuredDataDefinition("name", schema = { /*...*/ })
}

val mdDefinition = markdownBookDefinition()

llm.writeSession {
    val stream = requestLLMStreaming(mdDefinition)
    // Access the raw string chunks directly
    stream.collect { chunk ->
        // Process each chunk of text as it arrives.
        // Together, the chunks form text structured according to the mdDefinition schema.
        println("Received chunk: $chunk")
    }
}
```
Here is an example of requesting a raw string stream without a definition:
```kotlin
llm.writeSession {
    val stream = requestLLMStreaming()
    // Access the raw string chunks directly
    stream.collect { chunk ->
        // Process each chunk of text as it arrives.
        // Without a definition, the chunks are not structured in any specific way.
        println("Received chunk: $chunk")
    }
}
```

## Working with a stream of structured data

Although it is possible to work with a raw string stream, it is often more convenient to work with structured data.
The structured data approach includes the following key components:
- `MarkdownStructuredDataDefinition`: a class that helps you define the schema and examples for structured data in Markdown format.
- `markdownStreamingParser`: a function that creates a parser, which processes a stream of Markdown chunks and emits events.
The sections below provide step-by-step instructions and code samples related to processing a stream of structured data.

### 1. Define your data structure

First, define a data class to represent your structured data:
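For example, a simple `Book` class with the fields used throughout this guide (the `@Serializable` annotation is needed later, in the tool example, which calls `Book.serializer()`):

```kotlin
import kotlinx.serialization.Serializable

@Serializable
data class Book(
    val bookName: String,
    val author: String,
    val description: String
)
```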

### 2. Define the Markdown structure

Create a definition that specifies how your data should be structured in Markdown, using the `MarkdownStructuredDataDefinition` class:
```kotlin
fun markdownBookDefinition(): MarkdownStructuredDataDefinition {
    return MarkdownStructuredDataDefinition("bookList", schema = {
        markdown {
            header(1, "bookName")
            bulleted {
                item("author")
                item("description")
            }
        }
    }, examples = {
        markdown {
            header(1, "The Great Gatsby")
            bulleted {
                item("F. Scott Fitzgerald")
                item("A novel set in the Jazz Age that tells the story of Jay Gatsby's unrequited love for Daisy Buchanan.")
            }
        }
    })
}
```

### 3. Create a parser for your data structure

The `markdownStreamingParser` function provides several handlers for different Markdown elements:
```kotlin
markdownStreamingParser {
    // Handle level 1 headings (the heading level can be from 1 to 6)
    onHeader(1) { headerText ->
        // Process heading text
    }

    // Handle bullet points
    onBullet { bulletText ->
        // Process bullet text
    }

    // Handle code blocks
    onCodeBlock { codeBlockContent ->
        // Process code block content
    }

    // Handle lines matching a regex pattern
    onLineMatching(Regex("pattern")) { line ->
        // Process matching lines
    }

    // Handle the end of the stream
    onFinishStream { remainingText ->
        // Process any remaining text or perform cleanup
    }
}
```
Using the defined handlers, you can implement a function that parses the Markdown stream and emits your data objects with the `markdownStreamingParser` function:
```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

fun parseMarkdownStreamToBooks(markdownStream: Flow<String>): Flow<Book> {
    return flow {
        markdownStreamingParser {
            var currentBookName = ""
            val bulletPoints = mutableListOf<String>()

            // Handle the event of receiving a Markdown header in the response stream
            onHeader(1) { headerText ->
                // If there was a previous book, emit it
                if (currentBookName.isNotEmpty() && bulletPoints.isNotEmpty()) {
                    val author = bulletPoints.getOrNull(0) ?: ""
                    val description = bulletPoints.getOrNull(1) ?: ""
                    emit(Book(currentBookName, author, description))
                }
                currentBookName = headerText
                bulletPoints.clear()
            }

            // Handle the event of receiving a Markdown bullet list item in the response stream
            onBullet { bulletText ->
                bulletPoints.add(bulletText)
            }

            // Handle the end of the response stream
            onFinishStream {
                // Emit the last book, if present
                if (currentBookName.isNotEmpty() && bulletPoints.isNotEmpty()) {
                    val author = bulletPoints.getOrNull(0) ?: ""
                    val description = bulletPoints.getOrNull(1) ?: ""
                    emit(Book(currentBookName, author, description))
                }
            }
        }.parseStream(markdownStream)
    }
}
```

### 4. Use the parser in your agent strategy

```kotlin
val agentStrategy = strategy("library-assistant") {
    // Describe the node containing the output stream parsing
    val getMdOutput by node<String, String> { input ->
        val books = mutableListOf<Book>()
        val mdDefinition = markdownBookDefinition()

        llm.writeSession {
            updatePrompt { user(input) }
            // Request the response stream structured according to `mdDefinition`
            val markdownStream = requestLLMStreaming(mdDefinition)
            // Parse the response stream and act on each emitted result
            parseMarkdownStreamToBooks(markdownStream).collect { book ->
                books.add(book)
                println("Parsed Book: ${book.bookName} by ${book.author}")
            }
        }

        // A custom function for output formatting (see the sketch below)
        formatOutput(books)
    }

    // Describe the agent's graph, making sure the node is accessible
    edge(nodeStart forwardTo getMdOutput)
    edge(getMdOutput forwardTo nodeFinish)
}
```
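The `formatOutput` call above refers to a custom function that is not part of the framework. A minimal sketch, assuming you only need a printable summary, might look like this:

```kotlin
// A hypothetical helper, not a framework API; adapt the formatting to your needs.
fun formatOutput(books: List<Book>): String =
    books.joinToString(separator = "\n") { book ->
        "${book.bookName} by ${book.author}: ${book.description}"
    }
```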

## Advanced usage: Streaming with tools

You can also use the Streaming API with tools to process data as it arrives. The following sections provide a brief step-by-step guide on how to define a tool and use it with streaming data.

### 1. Define a tool for your data structure

```kotlin
class BookTool : SimpleTool<Book>() {
    companion object {
        const val NAME = "book"
    }

    override suspend fun doExecute(args: Book): String {
        println("${args.bookName} by ${args.author}:\n ${args.description}")
        return "Done"
    }

    override val argsSerializer: KSerializer<Book>
        get() = Book.serializer()

    override val descriptor: ToolDescriptor
        get() = ToolDescriptor(
            name = NAME,
            description = "A tool to parse book information from Markdown",
            requiredParameters = listOf(),
            optionalParameters = listOf()
        )
}
```

### 2. Use the tool with streaming data

```kotlin
val agentStrategy = strategy("library-assistant") {
    val getMdOutput by node<String, String> { input ->
        val mdDefinition = markdownBookDefinition()

        llm.writeSession {
            updatePrompt { user(input) }
            val markdownStream = requestLLMStreaming(mdDefinition)

            parseMarkdownStreamToBooks(markdownStream).collect { book ->
                callToolRaw(BookTool.NAME, book)
                /* Other possible options:
                   callTool(BookTool::class, book)
                   callTool<BookTool>(book)
                   findTool(BookTool::class).execute(book)
                */
            }

            // Alternatively, make the tool calls in parallel
            parseMarkdownStreamToBooks(markdownStream).toParallelToolCallsRaw(BookTool::class).collect()
        }
        ""
    }

    edge(nodeStart forwardTo getMdOutput)
    edge(getMdOutput forwardTo nodeFinish)
}
```

### 3. Register the tool in your agent configuration

```kotlin
val toolRegistry = ToolRegistry {
    tool(BookTool())
}

val runner = AIAgent(
    promptExecutor = simpleOpenAIExecutor(token),
    toolRegistry = toolRegistry,
    strategy = agentStrategy,
    agentConfig = agentConfig
)
```
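Here, `agentConfig` is assumed to be defined elsewhere in your setup. A rough sketch of what it might look like (check the framework's current API for the exact parameter names and available models):

```kotlin
// A sketch only; the prompt text and model choice are placeholders.
val agentConfig = AIAgentConfig(
    prompt = prompt("library-assistant") {
        system("You are a library assistant. List books in the requested Markdown format.")
    },
    model = OpenAIModels.Chat.GPT4o,
    maxAgentIterations = 10
)
```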

## Best practices

- **Define clear structures**: create clear and unambiguous Markdown structures for your data.
- **Provide good examples**: include comprehensive examples in your `MarkdownStructuredDataDefinition` to guide the LLM.
- **Handle incomplete data**: always check for null or empty values when parsing data from the stream.
- **Clean up resources**: use the `onFinishStream` handler to clean up resources and process any remaining data.
- **Handle errors**: implement proper error handling for malformed Markdown or unexpected data (see the sketch after this list).
- **Testing**: test your parser with various input scenarios, including partial chunks and malformed input.
- **Parallel processing**: for independent data items, consider using parallel tool calls for better performance.
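As an example of the error-handling practice, standard `kotlinx.coroutines` flow operators can be layered onto the parser from step 3. This is a sketch, not a framework API; `parseMarkdownStreamToBooks` and `Book` are the names defined earlier on this page:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.filter

// Defensive collection around the parser from step 3:
// drop items with missing fields and keep one bad chunk from killing the stream.
fun booksWithErrorHandling(markdownStream: Flow<String>): Flow<Book> =
    parseMarkdownStreamToBooks(markdownStream)
        .filter { it.bookName.isNotBlank() && it.author.isNotBlank() }
        .catch { e -> println("Failed to parse Markdown stream: ${e.message}") }
```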