Skip to content
主页博客推荐博客

[UniRx] 操作符 - 创建可观察对象(Creating Observables) 第一篇

  • #UniRx
  • #教程
Read time: 1 minute
Yaoming
示例代码

代码由GPT-4o等多种AI参与生成,并通过人工审查确保可执行.

登录后下载

UniRx 中,创建可观察对象是响应式编程的核心部分。可观察对象(IObservable<T>)是数据流的来源,它可以发出数据、完成信号或错误信号。UniRx 提供了多种方式来创建可观察对象,适用于不同的场景。

以下是 UniRx 中创建可观察对象的常见方法和使用场景:

1. 使用 Observable.Create

  • 用途:手动创建一个自定义的可观察对象。
  • 实现方式:通过传入一个委托,定义如何发出数据、完成或抛出错误。

示例:

var observable = Observable.Create<int>(observer =>  
{  
    // 发出数据  
    observer.OnNext(1);  
    observer.OnNext(2);  
    observer.OnNext(3);  

    // 发出完成信号  
    observer.OnCompleted();  

    // 返回一个 IDisposable,用于清理资源  
    return Disposable.Empty;  
});  

// 订阅观察者  
observable.Subscribe(  
    x => Debug.Log($"OnNext: {x}"),   
    ex => Debug.LogError($"OnError: {ex}"),   
    () => Debug.Log("OnCompleted")  
);

2. 使用 Observable.Defer

  • 延迟创建Defer 会等到有订阅者时才执行创建逻辑,而不是在声明时立即执行。
  • 动态生成流:每次订阅时,Defer 都会重新调用创建逻辑,生成一个新的可观察对象。
  • 避免共享状态:如果你的可观察对象依赖某些动态数据或状态,Defer 可以确保每次订阅时都使用最新的状态。

错误示例:

假设我们有一个方法 GetRandomNumber,每次调用都会生成一个随机数。如果直接创建一个 Observable,随机数会在声明时生成,而不是在订阅时生成:

// 错误示例:随机数在声明时生成  
var randomObservable = Observable.Return(UnityEngine.Random.Range(1, 100));  
randomObservable.Subscribe(x => Debug.Log($"Subscription 1: {x}"));  
randomObservable.Subscribe(x => Debug.Log($"Subscription 2: {x}"));

错误输出:

Subscription1:42

Subscription2:42

使用 Defer 可以确保每次订阅时都生成新的随机数:

// 正确示例:随机数在订阅时生成  
var randomObservable = Observable.Defer(() =>  
{  
    return Observable.Return(UnityEngine.Random.Range(1, 100));  
});  

randomObservable.Subscribe(x => Debug.Log($"Subscription 1: {x}"));  
randomObservable.Subscribe(x => Debug.Log($"Subscription 2: {x}"));

正确输出:

Subscription 1: 42  
Subscription 2: 87
注意事项:
  • 每次订阅都会重新创建流Defer 的核心特点是每次订阅时都会重新调用创建逻辑,因此适合需要动态生成数据流的场景。如果不需要动态生成,可以直接使用 Observable.Return 或其他方法。
  • 避免不必要的复杂性:如果数据流是固定的,或者不需要动态生成,使用 Defer 可能会增加不必要的复杂性。

与其他创建方法的对比:
方法特点适用场景
Observable.Return立即创建一个发出单个值的可观察对象。值是固定的,不需要动态生成。
Observable.Create手动创建一个可观察对象,可以完全控制数据流的发出逻辑。需要复杂的自定义逻辑时使用。
Observable.Defer延迟创建可观察对象,每次订阅时重新执行创建逻辑。数据流依赖动态状态,或需要避免共享状态和副作用时使用。

3. Never/Throw/Empty

  • Observable.Empty:
    • 用途:创建一个不发出任何数据但会立即完成的可观察对象。
    • 实现方式:适用于需要一个空的完成信号的场景。
  • Observable.Never:
    • 用途:创建一个永远不会发出数据或完成信号的可观察对象。
    • 实现方式:适用于测试或需要一个“静默”流的场景。
  • Observable.Throw:
    • 用途:创建一个立即发出错误信号的可观察对象。
    • 实现方式:适用于模拟错误的场景。

4. 使用 Observable.FromEvent

  • 用途:将自定义的 C# 事件,可以使用 Observable.FromEvent 将其转换为 IObservable

示例:
 /// <summary>
    /// 转换前的事件
    /// </summary>
    public event Action OnCustomEvent;

    [Button("RunFrom")]
    private void RunFrom()
    {
        // 将自定义事件转换为 Observable  
        Observable.FromEvent(
            handler => OnCustomEvent += handler, // 添加事件监听器  
            handler => OnCustomEvent -= handler  // 移除事件监听器  
        )
        .Subscribe(_ =>
        {
            Debug.Log("自定义事件被触发!");
        })
        .AddTo(this);//和当前MonoBehaviour绑定生命周期,用于销毁时取消监听

        // 模拟触发事件  
        Invoke(nameof(TriggerCustomEvent), 2f); // 等待2秒后触发事件  
    }
    void TriggerCustomEvent()
    {
        OnCustomEvent?.Invoke(); // 触发事件  
    }

输出

在运行 2 秒后,控制台会打印 "自定义事件被触发!"。

UniRx 提供了对 Unity 的 UnityEvent 的扩展方法,可以直接将类似 Button.onClick 的事件转换为 IObservable,无需手动使用 Observable.FromEvent

示例:
public UnityEngine.UI.Button myButton; // 在 Inspector 中拖入按钮 

[Button("RunFromUnityEvent")]
private void RunFromUnityEvent()
{
    // 使用 UniRx 提供的扩展方法监听按钮点击事件  
    myButton.OnClickAsObservable()
        .Subscribe(_ =>
        {
            Debug.Log("Button clicked!");
        })
        .AddTo(this); // 确保订阅在 GameObject 销毁时自动取消  
}

4. 使用 Observable.Interval

  • 用途:创建一个定时发出数据的可观察对象。
  • 实现方式:适用于需要周期性触发的场景。
示例:
//每秒发送一次
var observable = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Select(x=> $"Interval: {x}");

observable
    .Subscribe(x => Debug.Log(x))
    .AddTo(this);

输出

Interval: 0
Interval: 1
Interval: 2
Interval: 3
.........

5. 使用 Observable.Return

UniRx 中并没有 Just 的方法。实际上,UniRx 使用的是 Observable.Return 来实现类似功能。Observable.Return 和其他响应式编程库(如 Rx.NET)中的 Just 是等价的,它们的作用完全相同:创建一个只发射单个值的 IObservable

示例:
// 使用 Return 创建一个流,并进行链式操作  
Observable.Return(5)
    .Select(x => x * 2) // 将值乘以 2  
    .Subscribe(
        value => Debug.Log($"OnNext: {value}"), // 输出结果  
        () => Debug.Log("OnCompleted")
    );

输出

OnNext: 10
OnCompleted

6. 使用 Observable.Range

  • 用途:创建一个发出一系列整数的可观察对象。
  • 实现方式:适用于需要生成一系列数据的场景。

示例:
// 创建一个发出一系列整数的可观察对象
var observable = Observable.Range(1, 5);

observable.Subscribe(x => Debug.Log($"OnNext: {x}"));

输出

OnNext: 1  
OnNext: 2  
OnNext: 3  
OnNext: 4  
OnNext: 5

7. 使用 Observable.Repeat

  • 用途:创建一个可观察对象,该对象重复发出特定项或项序列。例如,多次重复访问网络请求
示例1:
  [Button("RunRepeat1")]
    private void RunRepeat1()
    {
        // 创建一个流,发射随机1-5
        var stream = Observable
            .Repeat(UnityEngine.Random.Range(1, 5), 
                5, // 重复发射这个流 5 次  
                Scheduler.ThreadPool);//后台线程

        //这里是主线程
        Debug.Log($"MainThreadId:{Thread.CurrentThread.ManagedThreadId}");
        
        stream.Subscribe(
                //这里会执行5此在后台线程,但线程并不一定相同
                value => Debug.Log($"OnNext: {value} ThreadId:{Thread.CurrentThread.ManagedThreadId}"),
                //这里也在后台线程
                () => Debug.Log($"OnCompleted ThreadId:{Thread.CurrentThread.ManagedThreadId}")
            );
    }
输出:
MainThreadId:1
OnNext: 4 ThreadId:116
OnNext: 4 ThreadId:113
OnNext: 4 ThreadId:113
OnNext: 4 ThreadId:113
OnNext: 4 ThreadId:113
OnCompleted ThreadId:113

结合 TakeWhileTakeUntil,可以实现条件停止的重复任务。

示例2:
bool isPlayerAlive = true;

//创建一个只发射单个值的流
Observable.Return(1)
    .Delay(System.TimeSpan.FromSeconds(10))//延迟10秒
    .Subscribe(x => {
        isPlayerAlive = false;//将玩家设置死亡
    });


Observable.Timer(System.TimeSpan.Zero, //第一次触发事件的延迟时间,这里指立即触发
        System.TimeSpan.FromSeconds(2))//间隔时间
    .Repeat()
    .TakeWhile(x => isPlayerAlive) // 当玩家死亡时停止  
    .Subscribe(x =>//这里的x是事件触发的次数,而不是事件发生的时间戳
    {
        Debug.Log($"生成一个物品 {x}");
        //Instantiate(itemPrefab, spawnPoint.position, Quaternion.identity);
    },
    () => Debug.Log("结束生成物品"));
输出:
生成一个物品0
生成一个物品1
生成一个物品2
生成一个物品3
生成一个物品4
结束生成物品

8. 使用 Observable.Start

Observable.Start 是一个用于将同步操作或耗时任务转换为异步 Observable 流的方法。它会在后台线程中执行指定的操作,并将结果作为 IObservable<T> 发射出来

示例:

// 在后台线程中执行一个耗时任务  
Observable.Start(() =>
{
    //这里是子线程
    Debug.Log($"ThreadId:{Thread.CurrentThread.ManagedThreadId}");
    // 模拟耗时任务  
    System.Threading.Thread.Sleep(2000); // 假设任务耗时 2 秒  
    return "Task Completed!";
})
//.ObserveOnMainThread() // 也可选择切换回到主线程  
.Subscribe(
    //这里是子线程
    result => Debug.Log($"OnNext: {result} ThreadId:{Thread.CurrentThread.ManagedThreadId}"), // 任务完成后接收结果  
    //这里也是子线程
    () => Debug.Log($"OnCompleted  ThreadId:{Thread.CurrentThread.ManagedThreadId}")          // 任务完成时触发  
);

输出:

MainThreadId:1
ThreadId:216
OnNext: Task Completed! ThreadId:216
OnCompleted  ThreadId:216

那么以上就是UniRx在创建数据流时,经常会用到的操作符。

这里是RuntimeCube.com,下一篇,我们会了解-转换可观察对象(Transforming Observables)。

示例代码

代码由GPT-4o等多种AI参与生成,并通过人工审查确保可执行.

登录后下载