[UniRx] 操作符 - 合并可观察对象(Combining Observables) 第四篇
- #UniRx
- #教程
- Yaoming
Combining Observables 用于将多个 Observable
合并或组合成一个新的 Observable
。这些操作符非常适合处理多个事件流的场景,比如同时监听多个输入、合并多个数据流、等待多个任务完成等
1. Merge
Merge
会将多个 Observable
合并成一个 Observable
,并按时间顺序发出所有流的事件。它不会组合事件,而是简单地将所有流的事件合并到一个流中。
> 这里的组合主要用于区分CombineLatest操作符
示例:
在游戏中,如果后端有两个协议会通知玩家血量的变化,那么可以将两个数据流合并到一起
//后端协议1,改变了玩家的血量
var healthStream1 = Observable.Interval(System.TimeSpan.FromSeconds(2))
.Select(_ => health.Value -= 5);
//后端协议2,改变了玩家的血量
var healthStream2 = Observable.Interval(System.TimeSpan.FromSeconds(3))
.Select(_ => health.Value -= 10);
// Merge 合并两个协议的数据流
Observable.Merge(healthStream1, healthStream2)
.Subscribe(h => Debug.Log($"RefreshPlayer health:{h}"));
输出:
RefreshPlayer health:95
RefreshPlayer health:85
RefreshPlayer health:80
RefreshPlayer health:70
RefreshPlayer health:65
...
2. CombineLatest
CombineLatest 操作符非常适合处理多个状态或事件流的组合。它会在任意一个流发出新事件时,获取所有流的最新值并组合成一个新的事件。
这在实时状态监控、UI 更新、玩家输入处理等场景中非常有用。
示例:
在游戏中,玩家的属性(如血量、魔法值、体力)可能会频繁变化,但你却希望把这些刷新UI属性的逻辑整合到一起,那么,我们可以使用 CombineLatest
来监听这些属性的变化,合并更新 UI
private ReactiveProperty<int> health = new ReactiveProperty<int>(100);
private ReactiveProperty<int> mana = new ReactiveProperty<int>(50);
private ReactiveProperty<float> stamina = new ReactiveProperty<float>(1.5f);
[Button("RunCombineLatest")]
void RunCombineLatest()
{
// 模拟属性变化
Observable.Interval(System.TimeSpan.FromSeconds(1)).Subscribe(_ => health.Value -= 5);
Observable.Interval(System.TimeSpan.FromSeconds(2)).Subscribe(_ => mana.Value -= 10);
//注意这里耐力的属性类型是float
Observable.Interval(System.TimeSpan.FromSeconds(3)).Subscribe(_ => stamina.Value += 0.5f);
// 使用 CombineLatest 监听属性变化并更新 UI
Observable.CombineLatest(health, mana, stamina, RefreshPlayerUI)
.Subscribe(stats => {
//这里没有做处理
Debug.Log("Combined stream");
});
}
private UniRx.Unit RefreshPlayerUI(int health, int mana, float stamina)
{
Debug.Log($"Health: {health}, Mana: {mana}, stamina: {stamina}");
return Unit.Default;
}
输出:
Health: 100, Mana: 50, stamina: 1.5
Combined stream
Health: 95, Mana: 50, stamina: 1.5
Combined stream
Health: 95, Mana: 40, stamina: 1.5
Combined stream
Health: 90, Mana: 40, stamina: 1.5
Combined stream
Health: 90, Mana: 40, stamina: 2
Combined stream
...
3. StartWith
StartWith 操作符用于在序列开始之前,先发出一个或多个指定的初始值。这在需要提供默认值、初始化状态或在流开始时立即触发某些事件的场景中非常有用。
示例:
创建玩家伤害属性流,第一次的伤害为30点,之后5秒内,每秒连续收到10点伤害
// 创建玩家伤害属性流,第一次的伤害为30点,之后5秒内,每秒连续收到10点伤害
var damageStream = Observable.Interval(System.TimeSpan.FromSeconds(1))
.Select(_ => -10)
.TakeUntil(Observable.Timer(TimeSpan.FromSeconds(5)))
.StartWith(-30);
damageStream
.Subscribe(value => Debug.Log($"Health: {value}"));
输出:
Health: -30
Health: -10
Health: -10
Health: -10
Health: -10
4.Switch
Switch
用于切换到最新的 Observable
,并取消订阅之前的 Observable。
示例:通过一个控制流(例如 Subject
),来回切换两个数据流。
// 数据流 A:每秒发出 "Stream A: x"
var streamA = Observable.Interval(System.TimeSpan.FromSeconds(1))
.Select(x => $"Stream A: {x}");
// 数据流 B:每秒发出 "Stream B: x"
var streamB = Observable.Interval(System.TimeSpan.FromSeconds(2))
.Select(x => $"Stream B: {x}");
// 控制流:用于切换数据流
var toggleStream = new Subject<bool>();
// 根据控制流的值切换到对应的数据流
toggleStream
.Select(isStreamA => isStreamA ? streamA : streamB) // 根据控制流选择数据流
.Switch() // 切换到最新的数据流
.Subscribe(
message => Debug.Log(message), // 输出当前流的消息
() => Debug.Log("Stream completed!") // 流完成时调用
);
bool togg = false;
Observable.EveryUpdate()
.Where(x => Input.GetMouseButtonDown(0))//鼠标点击切换
.Subscribe(_ =>
{
togg = !togg;
toggleStream.OnNext(togg);
});
输出:
Stream A: 0
Stream A: 1
Stream A: 2
//鼠标点击后切换
Stream B: 0
Stream B: 1
//鼠标点击后切换
Stream A: 0
Stream A: 1
Stream A: 2
5.Zip
当所有输入流都发出一个新值时,将这些值组合成一个新值并发出。它严格按照每个流的发出顺序进行组合
示例:当两个数据流之间有依赖关系,可以用Zip来等待两个流的数据都收到后一起处理
// 收到A流的数据
var streamA = Observable.Timer(System.TimeSpan.FromSeconds(1))
.Select(x => $"A{x}");
// 收到B流的数据
var streamB = Observable.Timer(System.TimeSpan.FromSeconds(5))
.Select(x => $"B{x}");
// 使用 Zip 组合两个流
streamA
.Zip(streamB, (a, b) => $"{a} - {b}") // 当两个流都发出值时,组合它们
.Timeout(TimeSpan.FromSeconds(3))
//.Timeout(TimeSpan.FromSeconds(7))
.Subscribe(
result => Debug.Log(result),// 输出组合结果
e => Debug.Log($"Error! {e}"),
() => Debug.Log("Complete!")
);
输出:
如果设置超时3秒
Error! System.TimeoutException: The operation has timed out.
如果设置超时7秒
A0 - B0
Complete!
6.ZipLatest
当任意一个输入流发出新值时,ZipLatest
会等待其他流收到最新值,然后组合一起下发。注意和CombineLatest区别,CombineLatest并不会等待其他流的数据,会直接组合并下发。
示例:
这里使用了和CombineLatest相似的代码,但是结果不一样
// 模拟属性变化
Observable.Interval(System.TimeSpan.FromSeconds(1)).Subscribe(_ => health.Value -= 5);
Observable.Interval(System.TimeSpan.FromSeconds(2)).Subscribe(_ => mana.Value -= 10);
//注意这里耐力的属性类型是float
Observable.Interval(System.TimeSpan.FromSeconds(3)).Subscribe(_ => stamina.Value += 0.5f);
// 使用 ZipLatest 监听属性变化并更新 UI
Observable.ZipLatest(health, mana, stamina, RefreshPlayerUI)
.Subscribe(stats => {
//这里没有做处理
Debug.Log("Combined stream");
});
输出:
可以看到Health、Mana、stamina每个值,每次都不一样
Health: 100, Mana: 50, stamina: 1.5
Combined stream
Health: 90, Mana: 40, stamina: 2
Combined stream
Health: 75, Mana: 30, stamina: 2.5
Combined stream
...
以上就是合并流操作符的一些例子。
这里是RuntimeCube.com,下一篇,让我们来讲讲错误处理操作符(Error Handling Operators)