1、通过pip在Python中安装RxPY的library。
在cmd命令中输入:pip3 install rx
2、测试代码
# Test RxPY: three equivalent ways to emit five strings and print them.
#
# Written against the RxPY 3.x API (the version `pip3 install rx` installs):
# the factory functions are module-level (rx.create, rx.of) rather than
# static methods on Observable, and the Observer base class lives in rx.core.
import rx
from rx.core import Observer


def push_five_strings(ob, scheduler=None):
    """Push five strings to the observer, then signal completion.

    Args:
        ob: Observer that receives the items via on_next / on_completed.
        scheduler: Second argument supplied by rx.create when the source is
            subscribed to; not used here. Defaults to None so the function
            can still be called with the observer alone.
    """
    for item in ("蒋", "智", "昊", "来", "了"):
        ob.on_next(item)
    ob.on_completed()


class PrintObserver(Observer):
    """Observer that prints every notification it receives."""

    def on_next(self, value):
        """Print an emitted item."""
        print(f"Received {value}")

    def on_completed(self):
        """Print a marker once the source has finished."""
        print("Done!")

    def on_error(self, error):
        """Print the error reported by the source."""
        print(f"Error Occurred: {error}")


# 1: build the source from a custom subscribe function.
source = rx.create(push_five_strings)
source.subscribe(PrintObserver())

# 2: build the same source with the rx.of factory; PrintObserver is reused.
source = rx.of("蒋", "智", "昊", "来", "了")
source.subscribe(PrintObserver())

# 3: subscribe with a plain callable, which is used as the on_next handler.
source = rx.of("蒋", "智", "昊", "来", "了")
source.subscribe(lambda value: print(f"Received {value}"))
3、参考资料
https://github.com/ReactiveX/RxPY