Rxjava功能操作符的使用方法详解

网友投稿 470 2023-03-18


Rxjava功能操作符的使用方法详解

Rxjava功能个人感觉很好用,里面的一些操作符很方便,Rxjava有:被观察者,观察者,订阅者,

被观察者通过订阅者订阅观察者,从而实现观察者监听被观察者返回的数据

下面把Rxjava常用的模型代码列出来,还有一些操作符的运用:

依赖:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

// Because RxAndroid releases are few and far between, it is recommended you also

// explicitly depend on RxJava's latest version for bug fixes and new features.

compile 'io.reactivex.rxjava2:rxjava:2.1.5'

这个是另一种解析数据的方法,阿里巴巴旗下的,听说是解析最快的解析器。。。。

compile 'com.alibaba:fastjson:1.2.39'

import android.os.Bundle;

import android.support.v7.app.AppCompatActivity;

import android.view.View;

import android.widget.TextView;

import com.alibaba.fastjson.JSONObject;

import java.io.IOException;

import java.util.concurrent.TimeUnit;

import io.reactivex.BackpressureStrategy;

import io.reactivex.Flowable;

import io.reactivex.FlowableEmitter;

import io.reactivex.FlowableOnSubscribe;

import io.reactivex.Observable;

import io.reactivex.ObservableEmitter;

import io.reactivex.ObservableOnSubscribe;

import io.reactivex.Observer;

import io.reactivex.android.schedulers.AndroidSchedulers;

import io.reactivex.annotations.NonNull;

import io.reactivex.disposables.Disposable;

import io.reactivex.functions.BiFunction;

import io.reactivex.functions.Consumer;

import io.reactivex.functions.Function;

import io.reactivex.schedulers.Schedulers;

import okhttp3.Call;

import okhttp3.Callback;

import okhttp3.OkHttpClient;

import okhttp3.Request;

import okhttp3.Response;

public class MainActivity extends AppCompatActivity {

private TextView name;

@Override

protected void onCreate(Bundle savedInstanceState) {

super.onCreate(savedInstanceState);

setContentView(R.layout.activity_main);

name = (TextView) findViewById(R.id.name);

//用来调用下面的方法,监听。

name.setOnClickListener(new View.OnClickListener() {

@Override

public void onClick(View v) {

interval();

}

});

}

//例1:Observer

public void observer() {

//观察者

Observer observer = new Observer() {

@Override

public void onSubscribe(@NonNull Disposable d) {

}

@Override

public void onNext(@NonNull String s) {

//接收从被观察者中返回的数据

System.out.println("onNext :" + s);

}

@Override

public void onError(@NonNull Throwable e) {

}

@Override

public void onComplete() {

}

};

//被观察者

Observable observable = new Observable() {

@Override

protected void subscribeActual(Observer observer) {

observer.onNext("11111");

observer.onNext("22222");

observer.onComplete();

}

};

//产生了订阅

observable.subscribe(observer);

}

//例2:Flowable

private void flowable(){

//被观察者

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(@NonNull FlowableEmitter e) throws Exception {

for (int i = 0; i < 100; i++) {

e.onNext(i+"");

}

}

//背压的策略,buffer缓冲区 观察者

//背压一共给了五种策略

// BUFFER、

// DROP、打印前128个,后面的删除

// ERROR、

// LATEST、打印前128个和最后一个,其余删除

// MISSING

//这里的策略若不是BUFFER 那么,会出现著名的:MissingBackpressureException错误

}, BackpressureStrategy.BUFFER).subscribe(new Consumer() {

@Override

public void accept(String s) throws Exception {

System.out.println("subscribe accept"+s);

Thread.sleep(1000);

}

});

}

//例3:线程调度器 Scheduler

public void flowable1(){

Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(@NonNull FlowableEmitter e) throws Exception {

for (int i = 0; i < 100; i++) {

//输出在哪个线程

System.out.println("subscribe Thread.currentThread.getName = " + Thread.currentThread().getName());

e.onNext(i+"");

}

}

},BackpressureStrategy.BUFFER)

//被观察者一般放在子线程

.subscribeOn(Schedulers.io())

//观察者一般放在主线程

.observeOn(AndroidSchedulers.mainThread())

.subscribe(new Consumer() {

@Override

public void accept(String s) throws Exception {

System.out.println("s"+ s);

Thread.sleep(100);

//输出在哪个线程

System.out.println("subscribe Thread.currentThread.getName = " + Thread.currentThread().getName());

}

});

}

//例4:http请求网络,map转化器,fastjson解析器

public void map1(){

Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(@NonNull final ObservableEmitter e) throws Exception {

OkHttpClient client = new OkHttpClient();

Request request = new Request.Builder()

.url("https://qhb.2dyt.com/Bwei/login")

.build();

client.newCall(request).enqueue(new Callback() {

@Override

public void onFailure(Call call, IOException e) {

}

@Override

public void onResponse(Call call, Response response) throws IOException {

String result = response.body().string();

http:// e.onNext(result);

}

});

}

})

//map转换器 flatmap(无序),concatmap(有序)

.map(new Function() {

@Override

public Bean apply(@NonNull String s) throws Exception {

//用fastjson来解析数据

return JSONObject.parseObject(s,Bean.class);

}

}).subscribe(new Consumer() {

@Override

public void accept(Bean bean) throws Exception {

System.out.println("bean = "+ bean.toString() );

}

});

}

//常见rxjava操作符

//例 定时发送消息

public void interval(){

Observable.interval(2,1, TimeUnit.SECONDS)

.take(10)

.subscribe(new Consumer() {

@Override

public void accept(Long aLong) throws Exception {

System.out.println("aLong = " + aLong);

}

});

}

//例 zip字符串合并

public void zip(){

Observable observable1 = Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(@NonNull ObservableEmitter e) throws Exception {

e.onNext("1");

e.onNext("2");

e.onNext("3");

e.onNext("4");

e.onComplete();

}

});

Observable observable2 = Observable.create(new ObservableOnSubscribe() {

@Override

public void subscribe(@NonNull ObservableEmitter e) throws Exception {

e.onNext("A");

e.onNext("B");

e.onNext("C");

e.onNext("D");

e.onComplete();

}

});

Observable.zip(observable1, observable2, new BiFunction() {

@Override

public String apply(@NonNull String o, @NonNull String o2) throws ExceptioXuoBspGVn {

return o + o2;

}

}).subscribe(new Consumer() {

@Override

public void accept(String o) throws Exception {

System.out.println("o"+ o);

}

});

}

总结

以上就是本文关于Rxjava功能操作符的使用方法详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:Javaweb应用使用限流处理大量的并发请求详解、分享一个简单的java爬虫框架、Java线程之线程同步synchronized和volatile详解等,有什么问题可以随时留言,会及时回复大家的。感谢朋友们对本站的支持!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:接口管理平台推荐(接口管理是什么)
下一篇:开源api网关(开源api网关 无代码 字段映射统一 动态路由)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~