Skip to content

7. Rx, the Composable Stream

Two things in your program refuse to sit still. An async HTTP call does not return a Response — it returns one later. A configuration value or a UI field changes while the app runs. Both are "a value that isn't simply here right now," and Uni models both with one type: Rx.

This chapter is about that type and the handful of operators that make it compose.

A value that changes

The simplest Rx is a variable you can watch:

scala
import wvlet.uni.rx.Rx

val count = Rx.variable(0)

count.subscribe { v =>
  println(s"count is now ${v}")
}

count := 1
count := 2
bash
$ sbt run
count is now 0
count is now 1
count is now 2

subscribe runs your function once with the current value and again on every change. := sets a new value. You are not polling and you are not wiring callbacks by hand — you describe what should happen when the value changes, and Rx makes it happen.

Walking the code

Rx[A] is a description, not a result. An Rx[A] says "this will produce zero or more A values over time." It does nothing until someone runs it — subscribe (react to every value) or run (run an effect) or await (block for the value, JVM only). Nothing fires until then, which means you can build an Rx pipeline freely and pay only when you attach a consumer.

subscribe hands you a Cancelable. Watching a stream is a resource: it keeps a subscription alive. The return value is a Cancelable you call when you're done. This is the one piece of bookkeeping Rx asks of you — hold the handle, cancel it when the work that needed it ends.

scala
val sub = count.subscribe(v => println(v))
// ... later, when you no longer care ...
sub.cancel

Composing without unwrapping

The point of Rx is that you transform the stream, not the values one at a time. The operators look exactly like the ones on Option and List, because they mean the same thing:

scala
import wvlet.uni.rx.Rx

val count = Rx.variable(0)

val label: Rx[String] =
  count
    .filter(_ % 2 == 0)        // keep even values
    .map(n => s"even: ${n}")   // turn each into a label

label.subscribe(println)

count := 1   // filtered out — nothing prints
count := 2   // prints: even: 2

map transforms each value, filter drops the ones you don't want, and flatMap lets one stream depend on another (run a second Rx for each value of the first). You build a pipeline once; every value that flows through count takes the whole path. The operators reference lists the full set, including zip to combine streams and recover to handle errors.

Where Rx shows up: async results

Back in Chapter 2 the synchronous client returned an HttpResponse directly. The async client returns the value-over-time version of exactly that:

scala
import wvlet.uni.http.{Http, Request}

val client = Http.client.newAsyncClient

val title: Rx[String] =
  client
    .send(Request.get("https://example.com"))   // Rx[HttpResponse]
    .map(_.status.toString)                      // Rx[String]

title.subscribe(println)

The HTTP response is "a value that isn't here yet," so the async client gives you an Rx[HttpResponse]. You compose it with the same map you used on the counter — there is no separate callback API, no second mental model for "async" versus "reactive." One type covers both, which is why Rx is the backbone of async code across Uni: HTTP, WebSockets, and server-sent events all surface as Rx.

Why Rx instead of Future or callbacks

A Future starts running the moment you create it and can only ever deliver one value. A callback delivers many values but composes badly — nest two and you have a pyramid. Rx is chosen to avoid both traps:

  • Lazy. Building an Rx runs nothing. You assemble the pipeline, then decide when (and whether) to start it. A Future has already left the gate.
  • One-shot and streaming. The same type and operators serve a single async result and an endless event stream. You learn one thing.
  • Cancellable. subscribe/run return a Cancelable, so a stream you no longer need stops cleanly — no leaked timer, no listener firing into a dead component.
  • Cross-platform. Rx runs the same on the JVM, in the browser, and on Native, so async code moves between platforms unchanged.

That cancellation handle is also how Rx cooperates with Design: give a session ownership of a subscription and closing the session cancels it, so streams wind down with the objects that created them. The concurrency reference covers the threading and cancellation model in depth.

What you have, what comes next

You can now describe values that change and results that arrive later with one tool:

  • Rx[A] is a lazy description of zero or more A over time; nothing runs until you subscribe/run/await.
  • Rx.variable + := is a value you can watch; map / filter / flatMap compose the stream.
  • subscribe returns a Cancelable — hold it, cancel it when done.
  • The async HTTP client returns Rx[HttpResponse], so async and reactive code share one model.

Next, Chapter 8 is about what happens when those calls fail — retries, circuit breakers, and resource lifetimes that clean up after themselves.

← 6. Data In, Data Out | Next → 8. Living With Failure

Released under the Apache 2.0 License.