[UniRx] 操作符 - 创建可观察对象(Creating Observables) 第一篇
- #UniRx
- #教程
- Yaoming
在 UniRx 中,创建可观察对象是响应式编程的核心部分。可观察对象(IObservable<T>
)是数据流的来源,它可以发出数据、完成信号或错误信号。UniRx 提供了多种方式来创建可观察对象,适用于不同的场景。
以下是 UniRx 中创建可观察对象的常见方法和使用场景:
1. 使用 Observable.Create
- 用途:手动创建一个自定义的可观察对象。
- 实现方式:通过传入一个委托,定义如何发出数据、完成或抛出错误。
示例:
var observable = Observable.Create<int>(observer =>
{
// 发出数据
observer.OnNext(1);
observer.OnNext(2);
observer.OnNext(3);
// 发出完成信号
observer.OnCompleted();
// 返回一个 IDisposable,用于清理资源
return Disposable.Empty;
});
// 订阅观察者
observable.Subscribe(
x => Debug.Log($"OnNext: {x}"),
ex => Debug.LogError($"OnError: {ex}"),
() => Debug.Log("OnCompleted")
);
2. 使用 Observable.Defer
- 延迟创建:
Defer
会等到有订阅者时才执行创建逻辑,而不是在声明时立即执行。 - 动态生成流:每次订阅时,
Defer
都会重新调用创建逻辑,生成一个新的可观察对象。 - 避免共享状态:如果你的可观察对象依赖某些动态数据或状态,
Defer
可以确保每次订阅时都使用最新的状态。
错误示例:
假设我们有一个方法 GetRandomNumber
,每次调用都会生成一个随机数。如果直接创建一个 Observable
,随机数会在声明时生成,而不是在订阅时生成:
// 错误示例:随机数在声明时生成
var randomObservable = Observable.Return(UnityEngine.Random.Range(1, 100));
randomObservable.Subscribe(x => Debug.Log($"Subscription 1: {x}"));
randomObservable.Subscribe(x => Debug.Log($"Subscription 2: {x}"));
错误输出:
Subscription1:42
Subscription2:42
使用 Defer
可以确保每次订阅时都生成新的随机数:
// 正确示例:随机数在订阅时生成
var randomObservable = Observable.Defer(() =>
{
return Observable.Return(UnityEngine.Random.Range(1, 100));
});
randomObservable.Subscribe(x => Debug.Log($"Subscription 1: {x}"));
randomObservable.Subscribe(x => Debug.Log($"Subscription 2: {x}"));
正确输出:
Subscription 1: 42
Subscription 2: 87
注意事项:
- 每次订阅都会重新创建流:
Defer
的核心特点是每次订阅时都会重新调用创建逻辑,因此适合需要动态生成数据流的场景。如果不需要动态生成,可以直接使用Observable.Return
或其他方法。 - 避免不必要的复杂性:如果数据流是固定的,或者不需要动态生成,使用
Defer
可能会增加不必要的复杂性。
与其他创建方法的对比:
3. Never
/Throw/Empty
Observable.Empty:
- 用途:创建一个不发出任何数据但会立即完成的可观察对象。
- 实现方式:适用于需要一个空的完成信号的场景。
Observable.Never:
- 用途:创建一个永远不会发出数据或完成信号的可观察对象。
- 实现方式:适用于测试或需要一个“静默”流的场景。
Observable.Throw:
- 用途:创建一个立即发出错误信号的可观察对象。
- 实现方式:适用于模拟错误的场景。
4. 使用 Observable.FromEvent
- 用途:将自定义的 C# 事件,可以使用
Observable.FromEvent
将其转换为IObservable
。
示例:
/// <summary>
/// 转换前的事件
/// </summary>
public event Action OnCustomEvent;
[Button("RunFrom")]
private void RunFrom()
{
// 将自定义事件转换为 Observable
Observable.FromEvent(
handler => OnCustomEvent += handler, // 添加事件监听器
handler => OnCustomEvent -= handler // 移除事件监听器
)
.Subscribe(_ =>
{
Debug.Log("自定义事件被触发!");
})
.AddTo(this);//和当前MonoBehaviour绑定生命周期,用于销毁时取消监听
// 模拟触发事件
Invoke(nameof(TriggerCustomEvent), 2f); // 等待2秒后触发事件
}
void TriggerCustomEvent()
{
OnCustomEvent?.Invoke(); // 触发事件
}
输出:
在运行 2 秒后,控制台会打印 "自定义事件被触发!"。
UniRx 提供了对 Unity 的 UnityEvent
的扩展方法,可以直接将类似 Button.onClick
的事件转换为 IObservable
,无需手动使用 Observable.FromEvent
示例:
public UnityEngine.UI.Button myButton; // 在 Inspector 中拖入按钮
[Button("RunFromUnityEvent")]
private void RunFromUnityEvent()
{
// 使用 UniRx 提供的扩展方法监听按钮点击事件
myButton.OnClickAsObservable()
.Subscribe(_ =>
{
Debug.Log("Button clicked!");
})
.AddTo(this); // 确保订阅在 GameObject 销毁时自动取消
}

4. 使用 Observable.Interval
- 用途:创建一个定时发出数据的可观察对象。
- 实现方式:适用于需要周期性触发的场景。
示例:
//每秒发送一次
var observable = Observable
.Interval(TimeSpan.FromSeconds(1))
.Select(x=> $"Interval: {x}");
observable
.Subscribe(x => Debug.Log(x))
.AddTo(this);
输出:
Interval: 0
Interval: 1
Interval: 2
Interval: 3
.........
5. 使用 Observable.Return
在 UniRx 中并没有 Just
的方法。实际上,UniRx 使用的是 Observable.Return
来实现类似功能。Observable.Return
和其他响应式编程库(如 Rx.NET)中的 Just
是等价的,它们的作用完全相同:创建一个只发射单个值的 IObservable
。
示例:
// 使用 Return 创建一个流,并进行链式操作
Observable.Return(5)
.Select(x => x * 2) // 将值乘以 2
.Subscribe(
value => Debug.Log($"OnNext: {value}"), // 输出结果
() => Debug.Log("OnCompleted")
);
输出:
OnNext: 10
OnCompleted
6. 使用 Observable.Range
- 用途:创建一个发出一系列整数的可观察对象。
- 实现方式:适用于需要生成一系列数据的场景。
示例:
// 创建一个发出一系列整数的可观察对象
var observable = Observable.Range(1, 5);
observable.Subscribe(x => Debug.Log($"OnNext: {x}"));
输出:
OnNext: 1
OnNext: 2
OnNext: 3
OnNext: 4
OnNext: 5
7. 使用 Observable.Repeat
- 用途:创建一个可观察对象,该对象重复发出特定项或项序列。例如,多次重复访问网络请求
示例1:
[Button("RunRepeat1")]
private void RunRepeat1()
{
// 创建一个流,发射随机1-5
var stream = Observable
.Repeat(UnityEngine.Random.Range(1, 5),
5, // 重复发射这个流 5 次
Scheduler.ThreadPool);//后台线程
//这里是主线程
Debug.Log($"MainThreadId:{Thread.CurrentThread.ManagedThreadId}");
stream.Subscribe(
//这里会执行5此在后台线程,但线程并不一定相同
value => Debug.Log($"OnNext: {value} ThreadId:{Thread.CurrentThread.ManagedThreadId}"),
//这里也在后台线程
() => Debug.Log($"OnCompleted ThreadId:{Thread.CurrentThread.ManagedThreadId}")
);
}
输出:
MainThreadId:1
OnNext: 4 ThreadId:116
OnNext: 4 ThreadId:113
OnNext: 4 ThreadId:113
OnNext: 4 ThreadId:113
OnNext: 4 ThreadId:113
OnCompleted ThreadId:113
结合 TakeWhile
或 TakeUntil
,可以实现条件停止的重复任务。
示例2:
bool isPlayerAlive = true;
//创建一个只发射单个值的流
Observable.Return(1)
.Delay(System.TimeSpan.FromSeconds(10))//延迟10秒
.Subscribe(x => {
isPlayerAlive = false;//将玩家设置死亡
});
Observable.Timer(System.TimeSpan.Zero, //第一次触发事件的延迟时间,这里指立即触发
System.TimeSpan.FromSeconds(2))//间隔时间
.Repeat()
.TakeWhile(x => isPlayerAlive) // 当玩家死亡时停止
.Subscribe(x =>//这里的x是事件触发的次数,而不是事件发生的时间戳
{
Debug.Log($"生成一个物品 {x}");
//Instantiate(itemPrefab, spawnPoint.position, Quaternion.identity);
},
() => Debug.Log("结束生成物品"));
输出:
生成一个物品0
生成一个物品1
生成一个物品2
生成一个物品3
生成一个物品4
结束生成物品
8. 使用 Observable.Start
Observable.Start
是一个用于将同步操作或耗时任务转换为异步 Observable
流的方法。它会在后台线程中执行指定的操作,并将结果作为 IObservable<T>
发射出来
示例:
// 在后台线程中执行一个耗时任务
Observable.Start(() =>
{
//这里是子线程
Debug.Log($"ThreadId:{Thread.CurrentThread.ManagedThreadId}");
// 模拟耗时任务
System.Threading.Thread.Sleep(2000); // 假设任务耗时 2 秒
return "Task Completed!";
})
//.ObserveOnMainThread() // 也可选择切换回到主线程
.Subscribe(
//这里是子线程
result => Debug.Log($"OnNext: {result} ThreadId:{Thread.CurrentThread.ManagedThreadId}"), // 任务完成后接收结果
//这里也是子线程
() => Debug.Log($"OnCompleted ThreadId:{Thread.CurrentThread.ManagedThreadId}") // 任务完成时触发
);
输出:
MainThreadId:1
ThreadId:216
OnNext: Task Completed! ThreadId:216
OnCompleted ThreadId:216
那么以上就是UniRx在创建数据流时,经常会用到的操作符。
这里是RuntimeCube.com,下一篇,我们会了解-转换可观察对象(Transforming Observables)。