
Creating a WebSocket-based voice agent via the Realtime API

Written by Yandex Cloud
Updated on November 12, 2025
  • User-agent interaction diagram
  • Getting started
  • Create a voice agent

Realtime API is an event-driven AI Studio interface for real-time voice interaction between the server and the client over a WebSocket transport.
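
As a minimal sketch, connecting to this transport and reading the first event looks as follows; the endpoint and header format are the same as in the full example later in this guide, and the placeholders are yours to fill in:

    import asyncio

    import aiohttp

    FOLDER_ID = "<folder_ID>"
    API_KEY = "<service_account_API_key>"

    WS_URL = (
        "wss://rest-assistant.api.cloud.yandex.net/v1/realtime/openai"
        f"?model=gpt://{FOLDER_ID}/speech-realtime-250923"
    )

    async def probe():
        async with aiohttp.ClientSession() as session:
            async with session.ws_connect(
                WS_URL, headers={"Authorization": f"api-key {API_KEY}"}
            ) as ws:
                event = await ws.receive_json()  # expect a session.created event first
                print(event.get("type"))

    asyncio.run(probe())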

With Realtime API, you can create a voice agent that analyzes the audio stream from the microphone in real time, sends voice requests to the speech-realtime-250923 model for processing, collects the model's response, and plays it back through the speakers.

The model can invoke various tools to process user requests. Responses are returned both as synthesized speech and as text.

User-agent interaction diagram

In the example below, the user interacts with the agent as follows:

  1. The user speaks a request, the microphone records the stream, and the uplink() function sends audio fragments to the agent.

  2. The server (speech-realtime-250923) detects pauses in speech using voice activity detection (VAD) and returns the recognized text in the conversation.item.input_audio_transcription.completed event. The agent logs the resulting text.

  3. The model generates a response (response.created) and starts returning text deltas (response.output_text.delta) and audio deltas (response.output_audio.delta).

  4. The downlink() function outputs the response text to the screen and plays back the audio via AudioOut.

    If the user starts speaking a new request, playback of the current response is interrupted, and a new request-processing iteration begins.
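
A minimal sketch of this interruption ("barge-in") logic, using the same epoch-counter approach as the full example below; AudioOut and the event names are taken from that example:

    import base64

    class BargeIn:
        # Tracks which response is current and drops audio from superseded ones.
        def __init__(self, audio_out):
            self.audio_out = audio_out
            self.play_epoch = 0          # bumped each time the user starts speaking
            self.response_epoch = None   # epoch the current response belongs to

        def on_event(self, event):
            msg_type = event.get("type")
            if msg_type == "input_audio_buffer.speech_started":
                self.play_epoch += 1         # invalidate the response in progress
                self.response_epoch = None
                self.audio_out.clear()       # stop playback immediately
            elif msg_type == "response.created":
                self.response_epoch = self.play_epoch
            elif msg_type == "response.output_audio.delta":
                # Play only audio that belongs to the latest user request
                if self.response_epoch == self.play_epoch:
                    self.audio_out.write(base64.b64decode(event["delta"]))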

Getting started

To use the example:

Python
  1. Create a service account and assign it the ai.models.user role for the folder.

  2. Get an API key for the service account you are going to use to work with the Realtime API. For more information, see Setting up access to Yandex AI Studio with API keys.

  3. Install Python 3.10 or higher.

  4. Install Python venv to create isolated virtual environments in Python.

  5. Create a new Python virtual environment and activate it:

    python3 -m venv new-env
    source new-env/bin/activate
    
  6. Install the required libraries using the pip package manager:

    pip install numpy sounddevice aiohttp
    

    Additional info on installed libraries:

    • aiohttp: Asynchronous HTTP and WebSocket clients. Requires library version 3.9.0 or higher.
    • numpy: Working with audio arrays and transformations. Requires library version 1.26.0 or higher.
    • sounddevice: Recording and playing sound via PortAudio. Requires library version 0.4.6 or higher.
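
    To confirm that the installed versions meet these minimums, you can run a quick check with the standard library (a convenience sketch):

    from importlib.metadata import version

    for name, minimum in [
        ("aiohttp", "3.9.0"),
        ("numpy", "1.26.0"),
        ("sounddevice", "0.4.6"),
    ]:
        # Prints the installed version next to the documented minimum
        print(f"{name} {version(name)} (requires {minimum} or higher)")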

Create a voice agent

To create a WebSocket-based voice agent:

Python
  1. Create a file named voice-agent.py and paste the following code into it:

    Voice agent code
    import json
    import base64
    import asyncio
    import threading
    import time
    import random
    import sys
    from typing import Optional
    
    import numpy as np
    import sounddevice as sd
    import aiohttp
    
    # ==== Configuration ====
    YANDEX_CLOUD_API_KEY = "<service_account_API_key>"
    YANDEX_CLOUD_FOLDER_ID = "<folder_ID>"
    
    # Configuring tools
    VECTOR_STORE_ID = "<search_index_ID>"
    
    # Checking for key and folder ID
    assert (
        YANDEX_CLOUD_FOLDER_ID and YANDEX_CLOUD_API_KEY
    ), "YANDEX_CLOUD_FOLDER_ID and YANDEX_CLOUD_API_KEY are required"
    
    # API settings
    WS_URL = (
        f"wss://rest-assistant.api.cloud.yandex.net/v1/realtime/openai"
        f"?model=gpt://{YANDEX_CLOUD_FOLDER_ID}/speech-realtime-250923"
    )
    HEADERS = {"Authorization": f"api-key {YANDEX_CLOUD_API_KEY}"}
    
    # Configuring audio for a server
    IN_RATE = 24000
    OUT_RATE = 44100
    CHANNELS = 1
    VOICE = "dasha"
    
    # Configuring audio for a local device
    FRAME_MS = 20
    IN_SAMPLES = int(IN_RATE * FRAME_MS / 1000)
    OUT_BLOCK = int(OUT_RATE * 0.02)
    
    
    # ======== Auxiliary functions ========
    
    
    # Converts audio from `float32` to `PCM16`
    def float_to_pcm16(data: np.ndarray) -> bytes:
        data = np.clip(data, -1.0, 1.0)
        return (data * 32767).astype(np.int16).tobytes()
    
    
    # Decodes a base64 string into bytes
    def b64_decode(s: str) -> bytes:
        return base64.b64decode(s)
    
    
    # Encodes bytes into a base64 string
    def b64_encode(b: bytes) -> str:
        return base64.b64encode(b).decode("ascii")
    
    
    # Thread-safe log output
    def log(*args):
        print(*args)
        sys.stdout.flush()
    
    
    # Generates fake weather data JSONs
    def fake_weather(city: str) -> str:
        rng = random.Random(hash(city) & 0xFFFFFFFF)
    
        weather_data = {
            "city": city,
            "temperature_c": rng.randint(-10, 35),
            "condition": rng.choice(["clear", "cloudy", "rain", "snow", "thunderstorm", "fog"]),
            "wind_m_s": round(rng.uniform(0.5, 10.0), 1),
            "advice": rng.choice(
                [
                    "Take along a light jacket.",
                    "An umbrella will come in handy.",
                    "Drink water and avoid the sun.",
                    "The roads are slippery, so be careful.",
                    "It's windy, a hood will be a good idea.",
                ]
            ),
        }
    
        return json.dumps(weather_data, ensure_ascii=False)
    
    
    # ======== Audio output class ========
    class AudioOut:
    
        # Manages the output audio stream, buffered
        def __init__(self, rate: int = OUT_RATE):
            import queue
    
            self.queue = queue.Queue(maxsize=100)
            self.closed = False
            self.stream = sd.RawOutputStream(
                samplerate=rate,
                channels=1,
                dtype="int16",
                blocksize=OUT_BLOCK,
                latency="low",
                callback=self._audio_callback,
            )
            self.stream.start()
    
        # Audio stream callback
        def _audio_callback(self, outdata, frames, time_info, status):
            try:
                chunk = self.queue.get_nowait()
            except Exception:
                outdata[:] = b"\x00" * (frames * 2)
                return
    
            need = frames * 2
    
            if len(chunk) >= need:
                outdata[:] = chunk[:need]
                rest = chunk[need:]
                if rest:
                    try:
                        self.queue.queue.appendleft(rest)
                    except Exception:
                        pass
            else:
                outdata[: len(chunk)] = chunk
                outdata[len(chunk) :] = b"\x00" * (need - len(chunk))
    
        # Adds audio data to the output queue
        def write(self, pcm16_bytes: bytes):
            if self.closed or not pcm16_bytes:
                return
    
            try:
                self.queue.put_nowait(pcm16_bytes)
            except Exception:
                self.clear()
    
        # Clears the audio buffer
        def clear(self):
            try:
                with self.queue.mutex:
                    self.queue.queue.clear()
            except Exception:
                pass
    
        # Closes the audio stream
        def close(self):
            if self.closed:
                return
    
            self.closed = True
            try:
                self.stream.stop()
                self.stream.close()
            except Exception:
                pass
    
    
    # ======== Microphone recording class ========
    class MicStreamer:
        # Sends audio from a microphone to an async queue
        def __init__(self, output_queue: asyncio.Queue, can_stream_event: asyncio.Event):
            self.output_queue = output_queue
            self.running = False
            self.loop: Optional[asyncio.AbstractEventLoop] = None
            self.can_stream_event = can_stream_event
    
        # Starts a microphone recording stream in a separate thread
        def start(self):
            self.loop = asyncio.get_running_loop()
            self.running = True
            threading.Thread(target=self._streaming_loop, daemon=True).start()
    
        # Main loop for recording audio from a microphone
        def _streaming_loop(self):
            try:
                with sd.InputStream(
                    samplerate=IN_RATE,
                    channels=1,
                    dtype="float32",
                    blocksize=IN_SAMPLES,
                    latency="low",
                ) as stream:
                    while self.running:
                        if not self.can_stream_event.is_set():
                            time.sleep(0.01)
                            continue
    
                        data, _ = stream.read(IN_SAMPLES)
                        pcm = float_to_pcm16(data.reshape(-1))
    
                        if self.loop and not self.loop.is_closed():
                            future = asyncio.run_coroutine_threadsafe(
                                self.output_queue.put(pcm), self.loop
                            )
                            try:
                                future.result(timeout=0.2)
                            except Exception:
                                pass
            except Exception as e:
                log("[MIC ERROR]", e)
    
        # Stops microphone recording
        def stop(self):
            self.running = False
    
    
    # ======== Main app ========
    
    
    # Main loop of the app
    async def main():
        session = aiohttp.ClientSession()
        ws = await session.ws_connect(WS_URL, headers=HEADERS, heartbeat=20.0)
        log("Connected to Realtime API.")
    
        # Configuring the session
        await ws.send_json(
            {
                "type": "session.update",
                "session": {
                    "instructions": (
                        "You are a polite English-speaking assistant. Answer briefly."
                        "If asked to tell the news, use the web_search function."
                        "If asked about the weather, call the get_weather function."
                        "If asked about {voice profiler}, use the file_search function"
                    ),
                    # Response format configuration: response can contain both text and audio
                    "output_modalities": ["text", "audio"],
                    "audio": {
                        "input": {
                            # Input audio format
                            "format": {
                                "type": "audio/pcm",
                                "rate": IN_RATE,
                                "channels": CHANNELS,
                            },
                            # Server VAD configuration
                            "turn_detection": {
                                "type": "server_vad",  # enabling sever VAD
                                "threshold": 0.5,  # sensitivity
                                "silence_duration_ms": 400,  # duration of silence to complete a speech fragment
                            },
                        },
                        # Output audio format
                        "output": {
                            "format": {"type": "audio/pcm", "rate": OUT_RATE},
                            "voice": VOICE,
                        },
                    },
                    # "tool_choice": {"type": "auto"}
                    # Tools for the agent
                    "tools": [
                        # Model function to demo function calls
                        {
                            "type": "function",
                            "name": "get_weather",
                            "description": "Get a brief weather report for the city.",
                            "parameters": {
                                "type": "object",
                                "properties": {"city": {"type": "string"}},
                                "required": ["city"],
                                "additionalProperties": False,
                            },
                        },
                        # Internet search tool
                        {
                            "type": "function",
                            "name": "web_search",
                            "description": "Web search",
                            "parameters": "{}",
                        },
                        # File system search tool
                        {
                            "type": "function",
                            "name": "file_search",  # required function name
                            "description": VECTOR_STORE_ID,  # index ID
                            "parameters": "{}",
                        },
                    ],
                },
            }
        )
    
        # Initializing audio components
        mic_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=50)
        can_stream_input = asyncio.Event()
        can_stream_input.set()
    
        mic = MicStreamer(mic_queue, can_stream_input)
        mic.start()
    
        audio_out = AudioOut()
        session_id = None
    
        # Managing text-to-speech conversion "epochs"
        play_epoch = 0
        current_response_epoch = None
    
        # Sending audio from microphone to the server
        async def uplink():
            try:
                while True:
                    await can_stream_input.wait()
                    pcm = await mic_queue.get()
                    await ws.send_json(
                        {"type": "input_audio_buffer.append", "audio": b64_encode(pcm)}
                    )
            except asyncio.CancelledError:
                pass
    
        # Receiving and processing messages from the server
        async def downlink():
            nonlocal session_id, play_epoch, current_response_epoch
    
            async for msg in ws:
                if msg.type != aiohttp.WSMsgType.TEXT:
                    continue
    
                message = json.loads(msg.data)
                msg_type = message.get("type")
    
                if msg_type not in {
                    "input_audio_buffer.commit",
                    "response.output_text.delta",
                    "conversation.item.input_audio_transcription.completed",
                    "session.created",
                }:
                    log(f"on_message: {msg_type}")
    
                # User's recognized text
                if msg_type == "conversation.item.input_audio_transcription.completed":
                    transcript = message.get("transcript", "")
                    if transcript:
                        log(f"on_message: {msg_type} [user (transcript): {transcript}]")
                    continue
    
                # Text the server sends for text-to-speech conversion
                if msg_type == "response.output_text.delta":
                    delta = message.get("delta", "")
                    if delta:
                        log(f"on_message: {msg_type} [agent (text): {delta}]")
                    continue
    
                # Logging the session ID
                if msg_type == "session.created":
                    session_id = (message.get("session") or {}).get("id")
                    log(f"on_message: {msg_type} [session.id = {session_id}]")
                    continue
    
                # Interrupting the agent's current response if the user starts speaking
                if msg_type == "input_audio_buffer.speech_started":
                    play_epoch += 1
                    current_response_epoch = None
                    audio_out.clear()
                    continue
    
                # Start of the agent's new response
                if msg_type == "response.created":
                    current_response_epoch = play_epoch
                    continue
    
                # Arrival of audio data from the agent
                if msg_type == "response.output_audio.delta":
                    if current_response_epoch == play_epoch:
                        audio_out.write(b64_decode(message["delta"]))
                    continue
    
                # Ending a function call
                if msg_type == "response.output_item.done":
                    item = message.get("item") or {}
    
                    if item.get("type") == "function_call":
                        call_id = item.get("call_id")
                        args_text = item.get("arguments") or "{}"
    
                        try:
                            args = json.loads(args_text)
                        except Exception:
                            args = {}
    
                        city = (args.get("city") or "Moscow").strip()
                        weather_json = fake_weather(city)
    
                        payload_item = {
                            "type": "conversation.item.create",
                            "item": {
                                "type": "function_call_output",
                                "call_id": call_id,
                                "output": weather_json,
                            },
                        }
    
                        log(
                            "[conversation.item.create(function_call_output):",
                            json.dumps(payload_item, ensure_ascii=False),
                        )
                        await ws.send_json(payload_item)
    
                        # Request for a new agent response
                        log("Sending response.create after function")
                        await ws.send_json(
                            {
                                "type": "response.create",
                                "response": {
                                    "modalities": ["audio", "text"],
                                    "conversation": "default",
                                },
                            }
                        )
                    continue
    
                # Handling errors
                if msg_type == "error":
                    log("SERVER ERROR:", json.dumps(message, ensure_ascii=False))
                    continue
    
            log("WS connection closed")
    
        log("Speak (server VAD). Press Ctrl+C to exit.")
    
        uplink_task = asyncio.create_task(uplink())
        downlink_task = asyncio.create_task(downlink())
    
        try:
            while True:
                await asyncio.sleep(3600)
        except KeyboardInterrupt:
            pass
        finally:
            uplink_task.cancel()
            downlink_task.cancel()
            mic.stop()
            audio_out.close()
            await ws.close()
            await session.close()
            log("\nExit.")
    
    
    if __name__ == "__main__":
        asyncio.run(main())
    

    Where:

    • Configuration:

      • YANDEX_CLOUD_API_KEY: API key of the service account used to call Realtime API methods.
      • YANDEX_CLOUD_FOLDER_ID: ID of the folder the Realtime API is called in.
      • VECTOR_STORE_ID: Search index ID for built-in search through knowledge base files.
      • WS_URL: Realtime API WebSocket endpoint.
      • IN_RATE and OUT_RATE: Audio input and output sampling rates.
      • VOICE: Voice used for the agent's response.
    • Auxiliary functions:

      • float_to_pcm16(): Converts audio data from float32 (microphone) to PCM16.
      • b64_encode() and b64_decode(): Base64 encoding and decoding audio fragments for transmission over WebSocket.
      • log(): Thread-safe log output.
      • fake_weather(): Demo function simulating a weather API. Returns a response containing weather information and advice in JSON format.
    • Classes:

      • The AudioOut class manages sound playback. It creates a sounddevice.RawOutputStream stream and a buffer to which PCM blocks received from the server are added. If the queue is empty, silence is played back.
      • The MicStreamer class captures audio from the microphone in a separate thread and sends audio fragments to an asynchronous queue (asyncio.Queue). From this queue, the uplink() function sends the data to the server.
    • Asynchronous functions:

      • uplink(): Continuously receives audio fragments from the queue and sends them to the server in the following format:

        {"type": "input_audio_buffer.append", "audio": "<base64>"}
        
      • downlink(): Processes incoming messages from Realtime API:

        • conversation.item.input_audio_transcription.completed: Recognized text of the user's request.
        • response.output_text.delta: Fragment of the agent's response in text format.
        • response.output_audio.delta: Fragment of the agent's response in audio format (synthesized speech).
        • response.output_item.done: Results of a completed function call (in the example, get_weather).
        • error: Error messages.
        • input_audio_buffer.speech_started: Start of a new user request (stops playing back the agent's current response).
      • The main() function:

        • Connects to Realtime API.
        • Sets up the session: specifies the model, audio parameters, and tools.
        • Starts two background tasks (asynchronous uplink and downlink functions).
        • Processes Ctrl + C events (graceful termination of connection).
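
    The example handles a single model function, get_weather. If you register several functions, the function_call branch in downlink() can dispatch on the function name instead. A hypothetical sketch (handle_function_call is an illustrative name; the two send_json calls follow the same protocol as the code above):

    async def handle_function_call(ws, item):
        call_id = item.get("call_id")
        try:
            args = json.loads(item.get("arguments") or "{}")
        except Exception:
            args = {}

        # Route by function name (get_weather is the only local tool here)
        if item.get("name") == "get_weather":
            output = fake_weather((args.get("city") or "Moscow").strip())
        else:
            output = json.dumps({"error": f"unknown tool: {item.get('name')}"})

        # Return the tool result to the model...
        await ws.send_json(
            {
                "type": "conversation.item.create",
                "item": {
                    "type": "function_call_output",
                    "call_id": call_id,
                    "output": output,
                },
            }
        )
        # ...and request a new agent response
        await ws.send_json(
            {
                "type": "response.create",
                "response": {"modalities": ["audio", "text"], "conversation": "default"},
            }
        )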
  2. Run the file you created:

    python3 voice-agent.py
    

    Once you run the code, the agent waits for voice requests and responds to them with text on the screen and simultaneous audio playback through the computer's speakers.

    To interrupt code execution, use the Ctrl + C keyboard shortcut.
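
    If you prefer not to keep credentials in voice-agent.py, a minimal variation reads them from environment variables instead; export YANDEX_CLOUD_API_KEY and YANDEX_CLOUD_FOLDER_ID in your shell first (the variable names here are illustrative):

    import os

    # Fails fast with a KeyError if a variable is not set
    YANDEX_CLOUD_API_KEY = os.environ["YANDEX_CLOUD_API_KEY"]
    YANDEX_CLOUD_FOLDER_ID = os.environ["YANDEX_CLOUD_FOLDER_ID"]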

See also

  • Voice agents
