有用的Kotlin版Rx代码片段
只是几个随机的,可能有用的Rx片段(kotlin版)。
计时器
是的,只是一个简单的计时器,运行5秒。
Observable.interval(1, TimeUnit.SECONDS, Schedulers.newThread())
        .take(5)
        .subscribeBy (
                onNext = {  },
                onComplete = {  }
        )
网络调用重试
如果发生故障,重试网络调用或任何长时间运行的进程(本例为三次),并且每次都增加延迟时间。 (说明)
Observable.fromPublisher<String> {
    it.onNext("Doing a network call!")
    Thread.sleep(1000)      // 长时间运行
    it.onError(Exception()) // 抛出错误异常
}.retryWhen { errors ->
    errors.zipWith(Observable.range(1, 3)   
            .concatMap { retryCount ->
                Observable.timer(retryCount.toLong() * 10, TimeUnit.SECONDS)
            }
    )
}.blockingSubscribeBy(
        onNext = { println(it) },
        onError = { println(it) },
        onComplete = { println("Complete") }
)
链接请求
链接请求或长时间运行的进程。
Observable.fromPublisher<String> {
    Thread.sleep(1000)           // 长时间运行
    it.onNext("First Response!") // 响应
}.flatMap { response ->
    Observable.fromPublisher<String> {
        println(response)        // 处理首次响应
        Thread.sleep(1000)       // 长时间运行
        it.onNext("Final Response!")
    }
}.subscribeBy(
        onNext = {
            println(it)          // 处理最后的响应
        }
)
轮询请求
轮询请求,直到某个条件成立。
  var count = 0
  Observable.fromPublisher<Int> {  
      count += 1       
      it.onNext(count)
      it.onComplete()               // pretend this is an api request
  }.repeatWhen {
      it.delay(3, TimeUnit.SECONDS) // poll after 3 seconds delay
  }.takeUntil {
      it == 3                       // condition to stop polling
  }.blockingSubscribeBy (
          onNext = { println(it) },             // 1 - (delay) - 2 - (delay) - 3
          onComplete = { println("Complete") }  // called when the condition is fulfilled 
  )
获取首字母缩写
从名字列表中获取首字母缩写。
Observable.fromArray("Some Name", "Some Other Name")
        .map { it.split(' ') }
        .flatMap { names ->
            Observable.fromIterable(names)
                    .filter { it.isNotEmpty() }
                    .takeLast(2)
                    .reduce("", { acc: String, element: String ->
                        "$acc${element[0]}"
                    })
                    .map { it.toUpperCase() }
                    .filter { it.isNotEmpty() }
                    .toObservable()
        }
        .subscribeBy(
                onNext = {
                    println(it) //SN, ON
                }
        )
获取RecyclerView可见项(RxAndroid)
用户完成滚动后,稍微延迟一下,从RecyclerView(使用RxBinding)获取所有可见项。
RxRecyclerView.scrollEvents(recyclerView)
        .debounce(1, TimeUnit.SECONDS)
        .map {
            (layoutManager.findFirstVisibleItemPosition()..layoutManager.findLastVisibleItemPosition()).map { index ->
               items[index]
            } 
        }.flatMap {
            Observable.fromIterable(it) // list to single item
        }.subscribeBy(onNext = {
                // Item1, Item2... 
        })
 
             
             
             
             
            