Skip to content
主页博客推荐博客

[UniRx] 操作符 - 合并可观察对象(Combining Observables) 第四篇

  • #UniRx
  • #教程
Read time: 1 minute
Yaoming

示例代码

代码由GPT-4o等多种AI参与生成,并通过人工审查确保可执行.

登录后下载

Combining Observables 用于将多个 Observable 合并或组合成一个新的 Observable。这些操作符非常适合处理多个事件流的场景,比如同时监听多个输入、合并多个数据流、等待多个任务完成等

1. Merge

Merge 会将多个 Observable 合并成一个 Observable,并按时间顺序发出所有流的事件。它不会组合事件,而是简单地将所有流的事件合并到一个流中。

> 这里的组合主要用于区分CombineLatest操作符

示例:

在游戏中,如果后端有两个协议会通知玩家血量的变化,那么可以将两个数据流合并到一起

//后端协议1,改变了玩家的血量
var healthStream1 = Observable.Interval(System.TimeSpan.FromSeconds(2))
    .Select(_ => health.Value -= 5);
//后端协议2,改变了玩家的血量
var healthStream2 = Observable.Interval(System.TimeSpan.FromSeconds(3))
    .Select(_ => health.Value -= 10);

// Merge 合并两个协议的数据流
Observable.Merge(healthStream1, healthStream2)
    .Subscribe(h => Debug.Log($"RefreshPlayer health:{h}"));

输出:

RefreshPlayer health:95
RefreshPlayer health:85
RefreshPlayer health:80
RefreshPlayer health:70
RefreshPlayer health:65
...

2. CombineLatest

CombineLatest 操作符非常适合处理多个状态或事件流的组合。它会在任意一个流发出新事件时,获取所有流的最新值并组合成一个新的事件

这在实时状态监控、UI 更新、玩家输入处理等场景中非常有用。

示例:

在游戏中,玩家的属性(如血量、魔法值、体力)可能会频繁变化,但你却希望把这些刷新UI属性的逻辑整合到一起,那么,我们可以使用 CombineLatest 来监听这些属性的变化,合并更新 UI

private ReactiveProperty<int> health = new ReactiveProperty<int>(100);
private ReactiveProperty<int> mana = new ReactiveProperty<int>(50);
private ReactiveProperty<float> stamina = new ReactiveProperty<float>(1.5f);

[Button("RunCombineLatest")]
void RunCombineLatest()
{
    // 模拟属性变化  
    Observable.Interval(System.TimeSpan.FromSeconds(1)).Subscribe(_ => health.Value -= 5);
    Observable.Interval(System.TimeSpan.FromSeconds(2)).Subscribe(_ => mana.Value -= 10);
    //注意这里耐力的属性类型是float
    Observable.Interval(System.TimeSpan.FromSeconds(3)).Subscribe(_ => stamina.Value += 0.5f);

    // 使用 CombineLatest 监听属性变化并更新 UI  
    Observable.CombineLatest(health, mana, stamina, RefreshPlayerUI)
        .Subscribe(stats => {
            //这里没有做处理
            Debug.Log("Combined stream");
        });
}

private UniRx.Unit RefreshPlayerUI(int health, int mana, float stamina)
{
    Debug.Log($"Health: {health}, Mana: {mana}, stamina: {stamina}");
    return Unit.Default;
}

输出:

Health: 100, Mana: 50, stamina: 1.5
Combined stream
Health: 95, Mana: 50, stamina: 1.5
Combined stream
Health: 95, Mana: 40, stamina: 1.5
Combined stream
Health: 90, Mana: 40, stamina: 1.5
Combined stream
Health: 90, Mana: 40, stamina: 2
Combined stream
...

3. StartWith

StartWith 操作符用于在序列开始之前,先发出一个或多个指定的初始值。这在需要提供默认值、初始化状态或在流开始时立即触发某些事件的场景中非常有用。

示例:

创建玩家伤害属性流,第一次的伤害为30点,之后5秒内,每秒连续收到10点伤害

// 创建玩家伤害属性流,第一次的伤害为30点,之后5秒内,每秒连续收到10点伤害
var damageStream = Observable.Interval(System.TimeSpan.FromSeconds(1))
    .Select(_ => -10)
    .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(5)))
    .StartWith(-30);


damageStream
    .Subscribe(value => Debug.Log($"Health: {value}"));

输出:

Health: -30
Health: -10
Health: -10
Health: -10
Health: -10

4.Switch

Switch 用于切换到最新的 Observable,并取消订阅之前的 Observable。

示例:通过一个控制流(例如 Subject),来回切换两个数据流。

// 数据流 A:每秒发出 "Stream A: x"  
var streamA = Observable.Interval(System.TimeSpan.FromSeconds(1))
    .Select(x => $"Stream A: {x}");

// 数据流 B:每秒发出 "Stream B: x"  
var streamB = Observable.Interval(System.TimeSpan.FromSeconds(2))
    .Select(x => $"Stream B: {x}");

// 控制流:用于切换数据流  
var toggleStream = new Subject<bool>();

// 根据控制流的值切换到对应的数据流  
toggleStream
    .Select(isStreamA => isStreamA ? streamA : streamB) // 根据控制流选择数据流  
    .Switch() // 切换到最新的数据流  
    .Subscribe(
        message => Debug.Log(message), // 输出当前流的消息  
        () => Debug.Log("Stream completed!") // 流完成时调用  
    );

bool togg = false;

Observable.EveryUpdate()
    .Where(x => Input.GetMouseButtonDown(0))//鼠标点击切换
    .Subscribe(_ =>
    {
        togg = !togg;
        toggleStream.OnNext(togg);
    });


输出:

Stream A: 0
Stream A: 1
Stream A: 2
//鼠标点击后切换
Stream B: 0
Stream B: 1
//鼠标点击后切换
Stream A: 0
Stream A: 1
Stream A: 2

5.Zip

当所有输入流都发出一个新值时,将这些值组合成一个新值并发出。它严格按照每个流的发出顺序进行组合

示例:当两个数据流之间有依赖关系,可以用Zip来等待两个流的数据都收到后一起处理

// 收到A流的数据
var streamA = Observable.Timer(System.TimeSpan.FromSeconds(1))
    .Select(x => $"A{x}");

// 收到B流的数据
var streamB = Observable.Timer(System.TimeSpan.FromSeconds(5))
    .Select(x => $"B{x}");

// 使用 Zip 组合两个流  
streamA
    .Zip(streamB, (a, b) => $"{a} - {b}") // 当两个流都发出值时,组合它们  
    .Timeout(TimeSpan.FromSeconds(3))
    //.Timeout(TimeSpan.FromSeconds(7))
    .Subscribe(
        result => Debug.Log(result),// 输出组合结果  
        e => Debug.Log($"Error! {e}"),
        () => Debug.Log("Complete!")
        ); 

输出:

如果设置超时3秒

Error! System.TimeoutException: The operation has timed out.

如果设置超时7秒

A0 - B0
Complete!

6.ZipLatest

当任意一个输入流发出新值时,ZipLatest 会等待其他流收到最新值,然后组合一起下发。注意和CombineLatest区别,CombineLatest并不会等待其他流的数据,会直接组合并下发。

示例:

这里使用了和CombineLatest相似的代码,但是结果不一样

// 模拟属性变化  
Observable.Interval(System.TimeSpan.FromSeconds(1)).Subscribe(_ => health.Value -= 5);
Observable.Interval(System.TimeSpan.FromSeconds(2)).Subscribe(_ => mana.Value -= 10);
//注意这里耐力的属性类型是float
Observable.Interval(System.TimeSpan.FromSeconds(3)).Subscribe(_ => stamina.Value += 0.5f);

// 使用 ZipLatest 监听属性变化并更新 UI  
Observable.ZipLatest(health, mana, stamina, RefreshPlayerUI)
    .Subscribe(stats => {
        //这里没有做处理
        Debug.Log("Combined stream");
    });

输出:

可以看到Health、Mana、stamina每个值,每次都不一样

Health: 100, Mana: 50, stamina: 1.5
Combined stream
Health: 90, Mana: 40, stamina: 2
Combined stream
Health: 75, Mana: 30, stamina: 2.5
Combined stream
...

以上就是合并流操作符的一些例子。

这里是RuntimeCube.com,下一篇,让我们来讲讲错误处理操作符(Error Handling Operators)

示例代码

代码由GPT-4o等多种AI参与生成,并通过人工审查确保可执行.

登录后下载