Stream Operators
uni provides a rich set of operators for transforming and combining reactive streams.
Transformation
map
Transform each value:
scala
val doubled = numbers.map(_ * 2)flatMap
Transform to another stream:
scala
val expanded = numbers.flatMap(n => Rx.fromSeq(1 to n))filter
Keep matching values:
scala
val evens = numbers.filter(_ % 2 == 0)transform
Handle both success and failure:
scala
val handled = stream.transform {
case Success(v) => processValue(v)
case Failure(e) => handleError(e)
}Combining Streams
zip
Combine when both emit (waits for pairs):
scala
val a = Rx.fromSeq(1 to 3)
val b = Rx.fromSeq("a", "b", "c")
a.zip(b).subscribe { (num, letter) =>
println(s"${num}-${letter}")
}
// 1-a, 2-b, 3-cjoin
Combine on any change (latest values):
scala
val x = Rx.variable(1)
val y = Rx.variable(2)
x.join(y).subscribe { (a, b) =>
println(s"${a} + ${b} = ${a + b}")
}
x := 10 // Prints: 10 + 2 = 12
y := 20 // Prints: 10 + 20 = 30concat
Chain streams sequentially:
scala
val first = Rx.fromSeq(1 to 3)
val second = Rx.fromSeq(4 to 6)
first.concat(second).subscribe(println)
// 1, 2, 3, 4, 5, 6Error Handling
recover
Replace errors with values:
scala
val safe = risky.recover {
case _: NetworkError => defaultValue
}recoverWith
Replace errors with another stream:
scala
val withFallback = primary.recoverWith {
case _: NetworkError => fallbackStream
}transformFailure
Transform specific errors:
scala
val mapped = stream.transformFailure {
case e: IOException => new NetworkException(e)
}Flow Control
take
Limit emissions:
scala
val first5 = stream.take(5)throttleFirst
Emit first in time window:
scala
import java.util.concurrent.TimeUnit
// Prevent double-clicks
val throttled = clicks.throttleFirst(500, TimeUnit.MILLISECONDS)throttleLast / sample
Emit last in time window:
scala
val sampled = stream.sample(100, TimeUnit.MILLISECONDS)timeout
Fail if no value in time:
scala
val withTimeout = stream.timeout(5, TimeUnit.SECONDS)Caching
cache
Cache the last value:
scala
val cached = expensive.cache
cached.subscribe(println) // Computes
cached.subscribe(println) // Uses cachedexpireAfterWrite
TTL cache:
scala
val ttlCache = stream.cache.expireAfterWrite(5, TimeUnit.MINUTES)Side Effects
tap
Observe without modifying:
scala
val logged = stream.tap(v => logger.debug(s"Value: ${v}"))tapOn
Observe with Try:
scala
val logged = stream.tapOn {
case Success(v) => logger.info(s"Success: ${v}")
case Failure(e) => logger.error("Failed", e)
}tapOnFailure
Observe only errors:
scala
val logged = stream.tapOnFailure { e =>
logger.error("Error occurred", e)
}Starting Values
startWith
Prepend a value:
scala
val withDefault = stream.startWith(0)startWith (Seq)
Prepend multiple values:
scala
val withDefaults = stream.startWith(Seq(-2, -1, 0))Aggregation
lastOption
Get the last value as Option:
scala
val last: RxOption[Int] = stream.lastOptiontoSeq (JVM only)
Collect all values:
scala
val all: Seq[Int] = Rx.fromSeq(1 to 10).toSeqOperator Chaining
Combine operators fluently:
scala
val result = Rx.interval(100, TimeUnit.MILLISECONDS)
.take(100)
.filter(_ % 2 == 0)
.map(_ * 10)
.throttleLast(500, TimeUnit.MILLISECONDS)
.tap(v => logger.debug(s"Processing: ${v}"))
.recover { case _ => -1 }
.cacheBest Practices
- Order matters - filter before map when possible
- Use throttle for UI events
- Cache expensive computations
- Handle errors at appropriate levels
- Clean up subscriptions to avoid leaks
