关于ReactiveX Python(RxPY)

关于ReactiveX Python(RxPY)

1、通过pip在Python中安装RxPY的library。
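在cmd命令行中输入:pip3 install "rx<3"
(注意:下面的示例代码使用的是RxPY 1.x的API(Observable.create、Observable.of);直接执行 pip3 install rx 会安装3.x版本,该版本已改用 rx.create、rx.of 等函数,下面的示例代码将无法直接运行。)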

2、测试代码

# 测试RxPY

# 1
from rx import Observable, Observer

def push_five_strings(ob):
    """Send five single-character strings to *ob*, then signal completion.

    Args:
        ob: Any object exposing ``on_next(value)`` and ``on_completed()``.
            ``on_next`` is called once per character, in order, and
            ``on_completed`` is called exactly once afterwards.
    """
    for character in ("蒋", "智", "昊", "来", "了"):
        ob.on_next(character)
    ob.on_completed()

class PrintObserver(Observer):
    """Observer that writes every notification it receives to stdout."""

    def on_next(self, value):
        """Print *value* prefixed with ``Received``."""
        message = "Received {0}".format(value)
        print(message)

    def on_error(self, error):
        """Print *error* prefixed with ``Error Occurred:``."""
        message = "Error Occurred: {0}".format(error)
        print(message)

    def on_completed(self):
        """Print ``Done!`` to mark the end of the sequence."""
        print("Done!")

# Build an observable from the push_five_strings function.
# NOTE(review): presumably RxPY invokes push_five_strings with the subscribing
# observer when subscribe() is called — confirm against the installed version.
source = Observable.create(push_five_strings)

# Subscribe a PrintObserver so each notification is printed.
source.subscribe(PrintObserver())

# 2
from rx import Observable, Observer

class PrintObserver(Observer):
    """Print each notification (value, error, completion) as it arrives."""

    def on_next(self, value):
        """Print the received *value* using the ``Received ...`` format."""
        line = "Received {0}".format(value)
        print(line)

    def on_error(self, error):
        """Print *error* using the ``Error Occurred: ...`` format."""
        line = "Error Occurred: {0}".format(error)
        print(line)

    def on_completed(self):
        """Print ``Done!``."""
        print("Done!")

# Build the observable from the five literal values via Observable.of,
# rather than from a hand-written push function.
source = Observable.of("蒋", "智", "昊", "来", "了")

# Subscribe a PrintObserver so each notification is printed.
source.subscribe(PrintObserver())


# 3
from rx import Observable

# Build the observable from the five literal values.
source = Observable.of("蒋", "智", "昊", "来", "了")
# Pass a plain lambda to subscribe() instead of an Observer subclass.
# NOTE(review): presumably the lambda is used as the on_next handler, with no
# explicit error/completion handlers — confirm against the RxPY version in use.
source.subscribe(lambda value: print("Received {0}".format(value)))

3、参考资料
https://github.com/ReactiveX/RxPY

发表回复

您的电子邮箱地址不会被公开。