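1、通过 pip 安装 RxPY 库。
在命令行（cmd）中输入：pip3 install rx
注意：下面的示例使用的是 RxPY 1.x 的 API（from rx import Observable, Observer、Observable.create、Observable.of）。RxPY 3.x 的 API 与 1.x 不兼容（例如改用 rx.create、rx.of），如果默认安装到了新版本，可以指定 1.x 版本，例如：pip3 install rx==1.6.1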

2、测试代码
# 测试RxPY
# Example 1: build an Observable with Observable.create and subscribe a custom Observer
from rx import Observable, Observer
def push_five_strings(ob):
    """Push five single-character strings to ``ob`` and then signal completion.

    Args:
        ob: Any object exposing ``on_next(value)`` and ``on_completed()``.
            Each character is delivered through ``on_next`` in order, and
            ``on_completed`` is called exactly once afterwards.
    """
    for char in ("蒋", "智", "昊", "来", "了"):
        ob.on_next(char)
    ob.on_completed()
class PrintObserver(Observer):
    """Observer that prints every notification it receives to stdout."""

    def on_next(self, value):
        """Print the emitted ``value`` prefixed with ``Received``."""
        print("Received {0}".format(value))

    def on_completed(self):
        """Print ``Done!`` when the sequence completes."""
        print("Done!")

    def on_error(self, error):
        """Print ``error`` prefixed with ``Error Occurred:``."""
        print("Error Occurred: {0}".format(error))
# Wrap the emitter function in an Observable; nothing is printed until an
# observer is subscribed on the last line.
source = Observable.create(push_five_strings)
printer = PrintObserver()
source.subscribe(printer)
# Example 2: build the same sequence with Observable.of instead of a custom emitter function
from rx import Observable, Observer
class PrintObserver(Observer):
    """Observer that prints every notification it receives to stdout."""

    def on_next(self, value):
        """Print the emitted ``value`` prefixed with ``Received``."""
        print("Received {0}".format(value))

    def on_completed(self):
        """Print ``Done!`` when the sequence completes."""
        print("Done!")

    def on_error(self, error):
        """Print ``error`` prefixed with ``Error Occurred:``."""
        print("Error Occurred: {0}".format(error))
# Build the Observable directly from the five literal values, then attach
# the printing observer to it.
source = Observable.of("蒋", "智", "昊", "来", "了")
printer = PrintObserver()
source.subscribe(printer)
# Example 3: subscribe with a plain callback instead of an Observer subclass
from rx import Observable
def _print_received(value):
    """Print a single emitted ``value`` prefixed with ``Received``."""
    print("Received {0}".format(value))


# Only a callback for emitted values is supplied here; no completion or
# error handler is passed to subscribe.
source = Observable.of("蒋", "智", "昊", "来", "了")
source.subscribe(_print_received)
3、参考资料
https://github.com/ReactiveX/RxPY