Streaming OpenAI responses

[Illustration: a robot stuck in a bottle with a cork on top, hands reaching out to it. Image by Annie Ruygt]

This post is about using Elixir to stream OpenAI chat responses in real time! If you want to keep your latency low, read on: you could be up and running in minutes.

Problem

You are building an application that interfaces with OpenAI’s ChatGPT and want to create a real-time, interactive experience just like the OpenAI Chat UI.

To do this we will need to work with the ChatGPT streaming API, which is built on HTTP Server-Sent Events. Elixir is great for real-time applications, so how can we use the streaming API from Elixir?

Solution

Server-Sent Events (SSE) are a streaming response protocol compatible with HTTP/1.1. A GET request is made to a server, which keeps the connection alive and sends messages in the format data: <message>\n\n until the connection closes. Browsers handle this by parsing the data line by line and handing you the message stream. If you are curious: yes, Plug does support it!
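Here’s what that looks like from the serving side: a minimal sketch of emitting Server-Sent Events with Plug (the module name and message contents are made up for illustration):

defmodule SSEDemo do
  import Plug.Conn

  def init(opts), do: opts

  def call(conn, _opts) do
    conn =
      conn
      |> put_resp_header("content-type", "text/event-stream")
      |> send_chunked(200)

    # Each event is a "data: <message>" line followed by a blank line.
    Enum.reduce_while(1..3, conn, fn i, conn ->
      case chunk(conn, "data: message #{i}\n\n") do
        {:ok, conn} -> {:cont, conn}
        {:error, _reason} -> {:halt, conn}
      end
    end)
  end
end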

Let’s start by adding the fantastic Req to your dependencies. Req is a high-level HTTP client library built by Elixir contributor Wojtek Mach. It builds on pure Elixir libraries and uses common Elixir idioms and patterns. It also comes with tons of developer UX niceties, such as handlers for common response types, streaming requests, and common header values.

Overall, if we want a “just works” HTTP client library, we use Req; if we want something a little lower level, we use Finch, which is what Req is built on top of. Today we will end up using both!

    {:req, github: "wojtekmach/req"}

We’re using the main branch here until a version newer than 0.3.6 is released: fine-grained control of streams was only just added to Req and will ship with the next version. We could have used the Finch library directly, but Req is handy enough that I still grabbed it!
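To see why Req is so handy, here’s what a plain, non-streaming completion call looks like: a sketch assuming the OPENAI_KEY environment variable is set.

# Req sets the JSON content type, encodes the body, adds the bearer auth
# header, and decodes the JSON response body for us.
Req.post!("https://api.openai.com/v1/chat/completions",
  json: %{
    model: "gpt-3.5-turbo-0301",
    messages: [%{role: "user", content: "Hello!"}]
  },
  auth: {:bearer, System.fetch_env!("OPENAI_KEY")}
).body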

We’re going to write a single function called gpt_stream that takes a prompt and a callback function. Luckily for us, the Req documentation has an example that handles exactly this case! So, building off of that:

defmodule OpenAI do
  def gpt_stream(prompt, cb) do
    # A custom Finch request step: stream the response, folding each
    # streamed message into a %Req.Response{} accumulator.
    fun = fn request, finch_request, finch_name, finch_options ->
      fun = fn
        {:status, status}, response ->
          %{response | status: status}

        {:headers, headers}, response ->
          %{response | headers: headers}

        # A chunk may contain several "data: <JSON>" lines: split them up,
        # decode each JSON payload, and hand it to the callback.
        {:data, data}, response ->
          body =
            data
            |> String.split("data: ")
            |> Enum.map(fn str ->
              str
              |> String.trim()
              |> decode_body(cb)
            end)
            # drop the :ok markers left by empty fragments and "[DONE]"
            |> Enum.filter(fn d -> d != :ok end)

          old_body = if response.body == "", do: [], else: response.body

          %{response | body: old_body ++ body}
      end

      case Finch.stream(finch_request, finch_name, Req.Response.new(), fun, finch_options) do
        {:ok, response} -> {request, response}
        {:error, exception} -> {request, exception}
      end
    end

    Req.post!("https://api.openai.com/v1/chat/completions",
      json: %{
        # Pick your model here
        model: "gpt-3.5-turbo-0301",
        messages: [%{role: "user", content: prompt}],
        stream: true
      },
      auth: {:bearer, System.fetch_env!("OPENAI_KEY")},
      finch_request: fun
    )
  end

  defp decode_body("", _), do: :ok
  defp decode_body("[DONE]", _), do: :ok
  defp decode_body(json, cb), do: cb.(Jason.decode!(json))
end

Some functions are easier to read from the bottom up, so let’s start there. Req.post!() takes the usual parameters:

  • URL
  • JSON body with arguments
  • auth header with our Bearer token
  • finch_request: this one requires some explaining. Req is a high-level HTTP library built on top of the lower-level Finch HTTP library. With this option, we can configure Finch’s request handling manually using a function callback. That’s what we’re doing here.

The Finch.stream/5 function takes a callback in which we decide how to handle the status, the headers, and each chunk of streamed data, returning the updated response each time (or an error). In our case we handle status by setting the status on the response, headers by setting the headers, and data by splitting and decoding each line and handing it to our callback (cb) function.
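If it helps to see Finch.stream/5 without the OpenAI specifics, here’s a tiny standalone sketch (the MyFinch pool name and the URL are purely illustrative):

# Fold each streamed message into a plain map accumulator.
{:ok, _} = Finch.start_link(name: MyFinch)

Finch.build(:get, "https://example.com")
|> Finch.stream(MyFinch, %{status: nil, bytes: 0}, fn
  {:status, status}, acc -> %{acc | status: status}
  {:headers, _headers}, acc -> acc
  {:data, data}, acc -> %{acc | bytes: acc.bytes + byte_size(data)}
end)
# => {:ok, %{status: 200, bytes: ...}}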

The Chat Completions API returns the streamed data in lines with the format data: <JSON>\n\ndata: <JSON>... until it sends data: [DONE]. That’s a little strange, since Server-Sent Events normally end when the connection closes, but so it goes! We handle both cases in decode_body, which pattern matches on empty strings and on [DONE].
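For reference, each decoded line is a map in the Chat Completions streaming format, with the text tucked under a delta key. A sketch of pulling it out (real chunks also carry id, model, and timestamp fields, which vary):

# An abridged chunk, as handed to the callback after Jason.decode!/1:
chunk = %{
  "object" => "chat.completion.chunk",
  "choices" => [%{"index" => 0, "delta" => %{"content" => "Hi"}, "finish_reason" => nil}]
}

# The first and last chunks carry a role or finish_reason instead of
# content, so match defensively:
case chunk do
  %{"choices" => [%{"delta" => %{"content" => content}} | _]} -> IO.write(content)
  _ -> :ok
end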

We also collect the data into the response body, in case we want it after the stream is complete. Strictly speaking, what gets appended is each callback’s return value, with the :ok results from empty and [DONE] lines filtered out, so have the callback return the chunk if you want to keep it.

And that’s basically it! We can call our function like so:

OpenAI.gpt_stream("How do I train a cat to shake hands?", fn data ->
  IO.inspect(data)
end)
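Because IO.inspect/1 returns its argument, each decoded chunk also lands in the response body (remember, :ok returns get filtered out), so the whole exchange is still there once the stream finishes:

# The return value is the finished %Req.Response{}; its body is the list
# of chunks our callback returned along the way.
resp = OpenAI.gpt_stream("How do I train a cat to shake hands?", &IO.inspect/1)
Enum.count(resp.body)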

You can do whatever you want with the data, such as sending it to a pid or broadcasting it over PubSub, but I will leave the details as an exercise for the reader!
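For instance, to get that exercise started, here’s a minimal sketch assuming a Phoenix app with a PubSub server named MyApp.PubSub and a subscriber on the "gpt:demo" topic (both names hypothetical):

# A subscribed LiveView or GenServer would receive {:gpt_chunk, content}
# messages in handle_info/2 as the tokens stream in.
OpenAI.gpt_stream("How do I train a cat to shake hands?", fn chunk ->
  with %{"choices" => [%{"delta" => %{"content" => content}} | _]} <- chunk do
    Phoenix.PubSub.broadcast(MyApp.PubSub, "gpt:demo", {:gpt_chunk, content})
  end

  # return the chunk so it is also kept in the response body
  chunk
end)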

Fly.io ❤️ Elixir

Fly.io is a great way to run your Phoenix LiveView app close to your users. It’s really easy to get started. You can be running in minutes.

Deploy a Phoenix app today!