Observer, observable

Got it! I will explain step by step, starting with pure Kotlin features, then gradually introducing RxJava concepts, and finally linking them together. I will provide separate Kotlin files for each concept so that you can experiment with them easily. The examples will revolve around car-related concepts to make learning intuitive.

Observer Pattern in Kotlin – Step-by-Step Explanation

The Observer Pattern is a design pattern where an object (the subject) maintains a list of dependents (the observers) and notifies them automatically of any state changes. In simpler terms, observers "subscribe" to a subject to get updates whenever something of interest changes. This pattern is commonly used for implementing event handling systems.

Below, we will build up the Observer Pattern in Kotlin from basic concepts to more advanced RxJava-based reactive streams. Each section provides a separate Kotlin file example with a car-related scenario (like monitoring speed, fuel level, etc.), so you can experiment with each concept individually.

1. Callback Mechanism in Kotlin (Using Interfaces and Lambdas)

Before diving into the formal Observer Pattern, let's start with callbacks, a fundamental concept in event handling. A callback is a function or interface that you pass into another function or class, so it can be "called back" (invoked) later when a certain event or condition occurs. This is one way to notify one piece of code about events happening elsewhere.

Example Scenario: Suppose we have a car speed monitor that should trigger an action whenever the car's speed exceeds a certain limit. We can implement this using a callback.

Using an Interface for Callbacks

We define an interface for the callback (listener) and a class that uses this callback:

// File: SpeedLimitMonitorInterface.kt
interface SpeedLimitListener {
    fun onSpeedLimitExceeded(currentSpeed: Int)
}

class SpeedLimitMonitor(private val speedLimit: Int) {
    private var listener: SpeedLimitListener? = null

    // Method for clients to register a callback
    fun setSpeedLimitListener(listener: SpeedLimitListener) {
        this.listener = listener
    }

    // Method to update speed; triggers callback if limit is exceeded
    fun updateSpeed(newSpeed: Int) {
        println("SpeedLimitMonitor: New speed = $newSpeed km/h")
        if (newSpeed > speedLimit) {
            // Notify the listener (invoke the callback method)
            listener?.onSpeedLimitExceeded(newSpeed)
        }
    }
}

// Usage
fun main() {
    val speedMonitor = SpeedLimitMonitor(speedLimit = 100)
    
    // Register a callback using an anonymous object implementing the interface
    speedMonitor.setSpeedLimitListener(object : SpeedLimitListener {
        override fun onSpeedLimitExceeded(currentSpeed: Int) {
            println("Callback: Speed limit exceeded! Current speed is $currentSpeed km/h")
        }
    })
    
    // Simulate speed updates
    speedMonitor.updateSpeed(80)   // Below limit, no callback
    speedMonitor.updateSpeed(120)  // Above limit, triggers callback
}

What’s happening?

  • SpeedLimitListener is an interface with a method onSpeedLimitExceeded. It defines what should happen when the speed limit is exceeded, but doesn't implement it (that’s up to whoever uses it).

  • SpeedLimitMonitor has a speedLimit threshold. It holds a listener (the callback) and provides setSpeedLimitListener to register a listener. When updateSpeed is called, it checks the speed. If the speed is above the limit, it calls listener?.onSpeedLimitExceeded(newSpeed) – this invokes the callback method on the listener, if one is set.

  • In main, we create a SpeedLimitMonitor for a speed limit of 100 km/h. We then register a listener using an anonymous object that implements SpeedLimitListener, printing a warning when the callback is triggered.

  • When we call updateSpeed(120), the monitor detects 120 > 100 and calls our callback, so we see the warning message printed.

Using a Lambda for Callbacks

Kotlin allows using lambda functions as callbacks, making the code more concise. Instead of defining an interface for a single function, we can use a function type. For example, we can modify the monitor to accept a function to call when the speed limit is exceeded:

How the lambda approach works:

  • In SpeedLimitMonitorLambda, we use a variable onSpeedLimitExceeded of type (Int) -> Unit (a function taking an Int and returning nothing). This will hold our callback function.

  • In updateSpeed, instead of calling an interface method, we simply invoke the lambda if it's not null.

  • In main, we assign a lambda to speedMonitor.onSpeedLimitExceeded. The lambda prints a message with the current speed. This lambda is our callback.

  • When updateSpeed(130) is called, the lambda is invoked, printing the warning.

Both the interface and lambda approaches achieve the same thing: a mechanism to call back into user-provided code when an event (speed limit exceeded) occurs. The interface approach is more verbose but clear, while the lambda approach is concise and idiomatic in Kotlin.

2. Implementing Observer Pattern in Pure Kotlin

The callback mechanism above works for a single listener. The Observer Pattern generalizes this: a subject can have multiple observers listening for updates. All observers will be notified when the subject’s state changes. In a car context, think of a car’s sensors (fuel sensor, speed sensor) as subjects, and various dashboard displays or alarms as observers. Multiple observers might be interested in the same data.

Let's implement a simple Observer Pattern without any external libraries:

  1. Subject – Maintains a list of observers and provides methods to attach/detach observers. It notifies all observers of changes.

  2. Observer – An interface that observers implement to receive updates (usually it has a method like update()).

We'll create a generic Subject class and an Observer interface. Then we'll use them in a car example for monitoring speed and fuel level.

Explanation:

  • Observer<T> interface declares update(newValue: T). Observers will implement this to define how they react to updates of type T. In our example, T is Int for both speed (km/h) and fuel level (%).

  • Subject<T> manages a list of Observer<T>. Observers can subscribe via addObserver and unsubscribe via removeObserver. When a relevant event or change happens, notifyObservers is called with the new value, and it in turn calls each observer’s update method with that value.

  • We create two subjects: speedSubject and fuelSubject, each handling Int updates. We then create different observers:

    • SpeedDisplay (prints the current speed whenever it updates),

    • SpeedAlert (alerts if speed is above a limit),

    • FuelDisplay (prints current fuel level),

    • LowFuelWarning (warns if fuel is below 15%).

  • We register the appropriate observers to each subject. For example, speedSubject gets SpeedDisplay and SpeedAlert, while fuelSubject gets FuelDisplay and LowFuelWarning. This means speed changes will notify the speed display and alert, and fuel changes will notify the fuel display and low fuel warning.

  • In main, we simulate car behavior: engine on with initial speed 0 and fuel 100 (we notify observers of these initial values). Then the car accelerates and fuel drops: we call notifyObservers with new speed and fuel values. Each call results in all subscribed observers reacting:

    • For instance, speedSubject.notifyObservers(120) calls SpeedDisplay.update(120) (printing speed) and SpeedAlert.update(120) (printing an alert because 120 > 100).

    • Similarly, fuelSubject.notifyObservers(10) calls FuelDisplay.update(10) (printing fuel) and LowFuelWarning.update(10) (printing a low fuel warning because 10 < 15).

  • This manual implementation illustrates a one-to-many relationship: one subject, many observers. Whenever the subject’s state changes (speed or fuel level in this case), all observers get notified. We could easily extend this to other car metrics (tire pressure, engine temperature, etc.) by adding new subjects and observers.

3. Introduction to RxJava (Using Observables and Observers)

Manually implementing the observer pattern (as above) is educational, but in real applications we often use libraries that provide this functionality. RxJava is a popular library for reactive programming, which essentially has the Observer Pattern built-in. It allows you to work with streams of data (Observables) and subscribe to them with Observers, along with a rich set of operators to manipulate these streams.

Key RxJava concepts in the context of the Observer Pattern:

  • Observable: A source of data that can emit sequence of values over time. It is like our Subject, but much more powerful and flexible (it can handle threading, backpressure, etc., which we won’t get into here).

  • Observer: An entity that receives the data emitted by an Observable. In RxJava, an Observer typically implements methods onNext(value), onError(e), and onComplete() to handle the stream of values and events.

  • Disposable: Represents the connection between an Observer and an Observable. If you want to stop receiving updates (unsubscribe), you dispose the Disposable.

Let's see a basic RxJava example. We will create an Observable that emits car speed readings and an Observer that listens to those readings.

Note: To run RxJava examples, you need to include RxJava in your project (for example, RxJava 2.x or 3.x library). The code below assumes RxJava 2.x (io.reactivex package).

Explanation:

  • We import io.reactivex.Observable and other RxJava types.

  • We create speedObservable using Observable.interval(...). This produces a sequence of Long values (0,1,2,3,...) at 1-second intervals. We use .map to convert those into Int speeds (multiplying by 20 to simulate a speed reading increasing by 20 km/h each second). We then use .take(5) to limit to 5 emissions for the demo. This means speedObservable will emit 5 speed values (0, 20, 40, 60, 80) and then call onComplete.

  • We then create speedObserver as an object implementing Observer<Int>. We override:

    • onSubscribe: called when subscription happens (we just print a message; Disposable d can be saved if we want to cancel later).

    • onNext: called each time the Observable emits a value. Here we print the new speed.

    • onError: called if an error occurs in the stream.

    • onComplete: called when the Observable has emitted all values and completes normally.

  • We subscribe speedObserver to speedObservable via speedObservable.subscribe(speedObserver). This starts the flow of data. As values are emitted, onNext in our observer will be called repeatedly.

  • We also show an alternative using lambdas: RxJava’s subscribe has overloads that accept lambda functions for onNext, onError, and onComplete. We call speedObservable.subscribe(...) with three lambdas. This returns a Disposable which we could use to unsubscribe if needed (not needed here since the stream completes on its own after 5 values).

  • The Thread.sleep(6000) is just to keep the main thread alive long enough to see the timed emissions (since interval runs on a background thread by default). In a real application (especially Android), you wouldn't use sleep like this, but for a simple demo in a main method, it ensures we see the output.

  • Output: As the program runs, you would see output like:

    This shows both our observer implementations receiving the same speed data stream.

This example demonstrates how RxJava's Observable/Observer model is akin to the observer pattern: speedObservable is like a subject that many observers can subscribe to. In fact, we subscribed two observers (the explicit speedObserver and the lambda-based one) to the same observable, and both received the emissions. RxJava handles the subscription and notification logic for us.

4. Using RxJava Subjects to Implement a Reactive Observer Pattern

RxJava not only has Observables that create and emit data, but also provides Subject classes which are a sort of bridge or proxy: they can act as an Observable and an Observer at the same time. Subjects are especially useful when you want to manually push values into an observable stream. They essentially embody the "Subject" from the Observer Pattern.

Different types of RxJava Subjects include:

  • PublishSubject: Starts empty and only emits new values to subscribers. Subscribers only get events that occur after they subscribe (no replay of old events).

  • BehaviorSubject: Remembers the most recent value and emits that immediately to any new subscriber, plus all subsequent events. You often create it with an initial value (so it has something to replay to new observers).

  • ReplaySubject: Remembers all values (or a specified buffer of values) and replays them to any new subscriber, so new subscribers get the full history of emissions.

  • AsyncSubject: Emits only the last value (and only after the source completes). It gives subscribers just one value (the final one).

Let's focus on PublishSubject and BehaviorSubject with car examples, as they are very common in reactive programming.

PublishSubject (Engine RPM example):

  • We create a PublishSubject<Int> named rpmSubject. Think of this as a stream of engine RPM readings (just an example event stream). Initially, it has no data.

  • We subscribe Observer1 to rpmSubject. (Using lambda for brevity: it prints the RPM value it receives.)

  • We then push some values using rpmSubject.onNext(1000) and onNext(2000). Observer1 will immediately receive these and print the RPMs.

  • Now Observer2 subscribes after those two values were already emitted. Because PublishSubject does not replay past emissions, Observer2 will not get the 1000 or 2000 RPM events – it will only get events from now on.

  • We emit two more values: 3000 and 4000. At this point, both Observer1 and Observer2 are subscribed, so both will receive 3000 and 4000. You would see output for Observer1 and Observer2 for those.

  • Finally, we call rpmSubject.onComplete() to indicate no more values will be emitted. (Observers could handle this in an onComplete if we had provided one in subscribe.)

The output sequence might look like:

Notice Observer2 only starts receiving from 3000 onward.

BehaviorSubject (Speed example):

  • We create a BehaviorSubject<Int> with an initial value 0 (speed 0 km/h). This subject always holds a latest value.

  • ObserverA subscribes immediately. As soon as it subscribes, BehaviorSubject will send it the current latest value (0) without us having to emit anything (this is the “behavior” – it remembers the last value). So ObserverA will print "Current speed = 0 km/h".

  • We push new speed values: 50, then 100 via onNext. ObserverA receives each of those and prints the updates. Now the latest value held by the subject is 100.

  • ObserverB then subscribes. BehaviorSubject will instantly emit the current value (100) to ObserverB upon subscription. So ObserverB will print "Current speed = 100 km/h" even though it missed the earlier 50 (it got the latest).

  • We emit another speed value 80. At this point, both ObserverA and ObserverB are subscribed, so both receive 80. Now latest is 80.

  • We call onComplete to finish. (If we had observers with onComplete handlers, they'd trigger.)

The output could be:

ObserverB did not see 0 or 50 because it subscribed later, but thanks to BehaviorSubject it did get 100 (the most recent at the time of subscribing).

Summary of Subjects:

  • Use PublishSubject for event streams where you don't want new subscribers to get old events (for example, a "live" event feed like engine RPM ticks or button clicks). Only current subscribers get the data.

  • Use BehaviorSubject when you always want the latest state available to observers. This is great for things like current sensor readings (speed, temperature, etc.) where if a new observer comes in, it should immediately get the current value.

  • ReplaySubject (not shown in code) could be used if, say, you wanted any new observer to get all past speed readings (not just the latest). In a car scenario, perhaps for a diagnostics system that gets the entire history of an event stream when it starts observing.

  • AsyncSubject would be uncommon in this scenario; it might be used for something like waiting for a final result (imagine a diagnostic test that only matters when it's done – observers would only get the final outcome).

5. Linking Everything Together (Building a Car Monitoring System)

Finally, let's build a simple reactive car monitoring system using RxJava Subjects to tie multiple streams together. In this system:

  • We will use a BehaviorSubject to represent the car’s current speed (because we always care about the latest speed).

  • We will use a BehaviorSubject for the fuel level (since that's also a continuously varying state we might want the latest of).

  • We will use a PublishSubject for engine on/off events (engine state changes are discrete events – new subscribers don't need past engine events by default).

We will create several observers (subscribers) to simulate components of a car’s monitoring system:

  • A speedometer display that shows the current speed.

  • A fuel gauge display that shows the current fuel percentage.

  • A speed alert that beeps or logs if speed goes above a certain threshold.

  • A low fuel warning that triggers when fuel goes below a threshold.

  • An engine monitor that logs when the engine is turned on or off, and resets speed to 0 when the engine is off (for safety).

Let's see how this can be implemented:

Walkthrough of the reactive system:

  • We create the subjects for speed, fuel, and engine state. speedSubject and fuelSubject are BehaviorSubjects with initial values 0 and 100, respectively. This means they hold a current state. engineSubject is a PublishSubject since engine state changes are events (no initial state; if a subscriber comes after an event, it won't retroactively get it).

  • We set up multiple subscriptions:

    • Dashboard Speedometer subscribes to speedSubject and simply prints the current speed. Because speedSubject is a BehaviorSubject with initial value 0, this subscriber will immediately receive 0 km/h once subscribed (simulating that the dashboard initially shows 0 when the car is off).

    • Dashboard Fuel Gauge subscribes to fuelSubject and prints the fuel level. It will immediately get 100% at subscribe (initial fuel).

    • Speed Alert subscribes to speedSubject and checks each speed value; if it exceeds speedLimit (120 km/h) it prints an overspeed warning.

    • Low Fuel Warning subscribes to fuelSubject and checks each fuel value; if it drops below 15%, it prints a low fuel warning.

    • Engine Monitor subscribes to engineSubject to log engine on/off events. Additionally, when it receives false (engine off), it triggers speedSubject.onNext(0) to reset the speed to 0. This simulates a rule that when the engine is off, the speed should go to 0 (and it will notify all speed observers of this change).

  • Now, we simulate a sequence of events by calling onNext on our subjects:

    • Engine starts (engineSubject.onNext(true)): This will cause the engine monitor observer to print "Engine turned ON". (The speedometer and fuel gauge still show the last known states: 0 km/h and 100% from initial BehaviorSubject values.)

    • As the car "drives", we push speed and fuel changes:

      • Speed 30 -> fuel 90 -> speed 100 -> fuel 50 -> speed 130 -> fuel 10 -> speed 80. Each of these onNext calls instantly notifies all subscribers of the respective subject:

        • Speed changes update the speedometer display each time, and the speed alert checks each time (only triggering on 130 because 130 > 120).

        • Fuel changes update the fuel gauge display each time, and the low fuel alert triggers on 10 because 10 < 15.

        • At speed 130, you'll see the overspeed alert. At fuel 10, you'll see the low fuel alert.

    • Finally, engine turns off (engineSubject.onNext(false)): Engine monitor prints "Engine turned OFF" and then pushes speedSubject.onNext(0). Pushing 0 into speedSubject means:

      • The speedometer will immediately update to 0 km/h.

      • The speed alert observer will see 0, which is not over the limit (no alert).

      • (Fuel is unchanged at 10, and engine off event doesn't directly affect fuelSubject in our example.)

This reactive system allows us to have multiple independent components (observers) respond to changes in the car's state, all driven by the Subjects. We didn't have to manually call each observer like in step 2; instead, each component just declared its interest, and the Subjects took care of notifying them. We could easily extend this by adding more observers (for example, a tire pressure monitor BehaviorSubject and corresponding observers for tire pressure warnings, etc.) or by adding more complex logic (combining streams, filtering, etc., which RxJava supports).

Each part of this step-by-step journey has built on the previous:

  • We started with simple callbacks (one observer, manual trigger).

  • Then expanded to a formal Observer Pattern with multiple observers in pure Kotlin.

  • Introduced RxJava’s Observables and how they generalize the pattern, providing powerful features.

  • Learned about RxJava’s Subjects to create our own observables that hold state or emit events.

  • Finally, combined Subjects to simulate a small reactive system for a car, with multiple data streams and observers.

Feel free to take each Kotlin file example, run it, and modify values or conditions to see how the observers react. This will solidify your understanding of the Observer Pattern and reactive programming in Kotlin. Happy coding!

Last updated