Retrofit+Rxjava下载文件进度的实现

网友投稿 521 2023-03-15


Retrofit+Rxjava下载文件进度的实现

前言

最近在学习Retrofit,虽然Retrofit没有提供文件下载进度的回调,但是Retrofit底层依赖的是OkHttp,实际上所需要的实现OkHttp对下载进度的监听,在OkHttp的官方Demo中,有一个Progress.java的文件,顾名思义。点我查看。

准备工作

本文采用Dagger2,Retrofit,RxJava。

compile'com.squareup.retrofit2:retrofit:2.0.2'

compile'com.squareup.retrofit2:converter-gson:2.0.2'

compile'com.squareup.retrofit2:adapter-rxjava:2.0.2'

//dagger2

compile'com.google.dagger:dagger:2.6'

apt'com.google.dagger:dagger-compiler:2.6'

//RxJava

compile'io.reactivex:rxandroid:1.2.0'

compile'io.reactivex:rxjava:1.1.5'

compile'com.jakewharton.rxbinding:rxbinding:0.4.0'

改造ResponseBody

okHttp3默认的ResponseBody因为不知道进度的相关信息,所以需要对其进行改造。可以使用接口监听进度信息。这里采用的是RxBus发送FileLoadEvent对象实现对下载进度的实时更新。这里先讲改造的ProgressResponseBody。

public class ProgressResponseBody extends ResponseBody {

private ResponseBody responseBody;

private BufferedSource bufferedSource;

public ProgressResponseBody(ResponseBody responseBody) {

this.responseBody = responseBody;

}

@Override

public MediaType contentType() {

return responseBody.contentType();

}

@Override

public long contentLength() {

return responseBody.contentLength();

}

@Override

public BufferedSource source() {

if (bufferedSource == null) {

bufferedSource = Okio.buffer(source(responseBody.source()));

}

return bufferedSource;

}

private Source source(Source source) {

return new ForwardingSource(source) {

long bytesReaded = 0;

@Override

public long read(Buffer sink, long byteCount) throws IOException {

long bytesRead = super.read(sink, byteCount);

bytesReaded += bytesRead == -1 ? 0 : bytesRead;

//实时发送当前已读取的字节和总字节

RxBus.getInstance().post(new FileLoadEvent(contentLength(), bytesReaded));

return bytesRead;

}

};

}

}

呃,OKIO相关知识我也正在学,这个是从官方Demo中copy的代码,只不过中间使用了RxBus实时发送FileLoadEvent对象。

FileLoadEvent

FileLoadEvent很简单,包含了当前已加载进度和文件总大小。

public class FileLoadEvent {

long total;

long bytesLoaded;

public long getBytesLoaded() {

return bytesLoaded;

}

public lohttp://ng getTotal() {

return total;

}

public FileLoadEvent(long total, long bytesLoaded) {

this.total = total;

this.bytesLoaded = bytesLoaded;

}

}

RxBus

RxBus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 RxJava 来实现了 EventBus ,而让你不再需要使用OTTO或者 EventBus。点我查看详情。

public class RxBus {

private static volatile RxBus mInstance;

private SerializedSubject mSubject;

private HashMap mSubscriptionMap;

/**

* PublishSubject只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者

* Subject同时充当了Observer和Observable的角色,Subject是非线程安全的,要避免该问题,

* 需要将 Subject转换为一个 SerializedSubject ,上述RxBus类中把线程非安全的PublishSubject包装成线程安全的Subject。

*/

private RxBus() {

mSubject = new SerializedSubject<>(PublishSubject.create());

}

/**

* 单例 双重锁

* @return

*/

public static RxBus getInstance() {

if (mInstance == null) {

synchronized (RxBus.class) {

if (mInstance == null) {

mInstance = new RxBus();

}

}

}

return mInstance;

}

/**

* 发送一个新的事件

* @param o

*/

public void post(Object o) {

mSubject.onNexhttp://t(o);

}

/**

* 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者

* @param type

* @param

* @return

*/

public Observable tObservable(final Class type) {

//ofType操作符只发射指定类型的数据,其内部就是filter+cast

return mSubject.ofType(type);

}

public Subscription doSubscribe(Class type, Action1 next, Action1 error) {

return tObservable(type)

.onBackpressureBuffer()

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(next, error);

}

public void addSubscription(Object o, Subscription subscription) {

if (mSubscriptionMap == null) {

mSubscriptionMap = new HashMap<>();

}

String key = o.getClass().getName();

if (mSubscriptionMap.get(key) != null) {

mSubscriptionMap.get(key).add(subscription);

} else {

CompositeSubscription compositeSubscription = new CompositeSubscription();

compositeSubscription.add(subscription);

mSubscriptionMap.put(key, compositeSubscription);

// Log.e("air", "addSubscription:订阅成功 " );

}

}

public void unSubscribe(Object o) {

if (mSubscriptionMap == null) {

return;

}

String key = o.getClass().getName();

if (!mSubscriptionMap.containsKey(key)) {

return;

}

if (mSubscriptionMap.get(key) != null) {

mSubscriptionMap.get(key).unsubscribe();

}

mSubscriptionMap.remove(key);

//Log.e("air", "unSubscribe: 取消订阅" );

}

}

FileCallBack

那么,重点来了。代码其实有5个方法需要重写,好吧,其实这些方法可以精简一下。其中progress()方法有两个参数,progress和total,分别表示文件已下载的大小和总大小,我们将这两个参数不断更新到UI上就行了。

public abstract class FileCallBack {

private String destFileDir;

private String destFileName;

public FileCallBack(String destFileDir, String destFileName) {

this.destFileDir = destFileDir;

this.destFileName = destFileName;

subscribeLoadProgress();

}

public abstract void onSuccess(T t);

public abstract void progress(long progress, long total);

public abstract void onStart();

public abstract void onCompleted();

public abstract void onError(Throwable e);

public void saveFile(ResponseBody body) {

InputStream is = null;

byte[] buf = new byte[2048];

int len;

FileOutputStream fos = null;

try {

is = body.byteStream();

File dir = new File(destFileDir);

if (!dir.exists()) {

dir.mkdirs();

}

File file = new File(dir, destFileName);

fos = new FileOutputStream(file);

while ((len = is.read(buf)) != -1) {

fos.write(buf, 0, len);

}

fos.flush();

unsubscribe();

//onCompleted();

} catch (FileNotFoundException e) {

e.printStackTrace();

} catch (IOException e) {

e.printStackTrace();

} finally {

try {

if (is != null) is.close();

if (fos != null) fos.close();

} catch (IOException e) {

Log.e("saveFile", e.getMessage());

}

}

}

/**

* 订阅加载的进度条

*/

public void subscribeLoadProgress() {

Subscription subscription = RxBus.getInstance().doSubscribe(FileLoadEvent.class, new Action1() {

@Override

public void call(FileLoadEvent fileLoadEvent) {

progress(fileLoadEvent.getBytesLoaded(),fileLoadEvent.getTotal());

}

}, new Action1() {

@Override

public void call(Throwable throwable) {

//TODO 对异常的处理

}

});

RxBus.getInstance().addSubscription(this, subscription);

}

/**

* 取消订阅,防止内存泄漏

*/

public void unsubscribe() {

RxBus.getInstance().unSubscribe(this);

}

}

开始下载

使用自己的ProgressResponseBody

通过OkHttpClient的拦截器去拦截Response,并将我们的ProgressReponseBody设置进去监听进度。

public class ProgressInterceptor implements Interceptor {

@Override

public Response intercept(Chain chain) throws IOException {

Response originalResponse = chain.proceed(chain.request());

return originalResponse.newBuilder()

.body(new ProgressResponseBody(originalResponse.body()))

.build();

}

}

构建Retrofit

@Module

public class ApiModule {

@Provides

@Singleton

public OkHttpClient provideClient() {

OkHttpClient client = new OkHttpClient.Builder()

.addInterceptor(new ProgressInterceptor())

.build();

return client;

}

@Provides

@Singleton

public Retrofit provideRetrofit(OkHttpClient client){

Retrofit retrofit = new Retrofit.Builder()

.client(client)

.baseUrl(Constant.HOST)

.addCallAdapterFactory(RxJavaCallAdapterFactory.create())

.addConverterFactory(GsonConverterFactory.create())

.build();

return retrofit;

}

@Provides

@Singleton

public ApiInfo provideApiInfo(Retrofit retrofit){

return retrofit.create(ApiInfo.class);

}

@Provides

@Singleton

public ApiManager provideApiManager(Application application, ApiInfo apiInfo){

return new ApiManager(application,apiInfo);

}

}

请求接口

publicofHxCQ interface ApiInfo {

@Streaming

@GET

Observable download(@Url String url);

}

执行请求

public void load(String url, final FileCallBack callBack){

apiInfo.download(url)

.subscribeOn(Schedulers.io())//请求网络 在调度者的io线程

.observeOn(Schedulers.io()) //指定线程保存文件

.doOnNext(new Action1() {

@Override

public void call(ResponseBody body) {

callBack.saveFile(body);

}

})

.observeOn(AndroidSchedulers.mainThread()) //在主线程中更新ui

.subscribe(new FileSubscriber(application,callBack));

}

在presenter层中执行网络请求。

通过V层依赖注入的presenter对象调用请求网络,请求网络后调用V层更新UI的操作。

public void load(String url){

String fileName = "app.apk";

String fileStoreDir = Environment.getExternalStorageDirectory().getAbsolutePath();

Log.e(TAG, "load: "+fileStoreDir.toString() );

FileCallBack callBack = new FileCallBack(fileStoreDir,fileName) {

@Override

public void onSuccess(final ResponseBody responseBody) {

Toast.makeText(App.getInstance(),"下载文件成功",Toast.LENGTH_SHORT).show();

}

@Override

public void progress(long progress, long total) {

iHomeView.update(total,progress);

}

@Override

public void onStart() {

iHomeView.showLoading();

}

@Override

public void onCompleted() {

iHomeView.hideLoading();

}

@Override

public void onError(Throwable e) {

//TODO: 对异常的一些处理

e.printStackTrace();

}

};

apiManager.load(url, callBack);

}

踩到的坑。

依赖的Retrofit版本一定要保持一致!!!说多了都是泪啊。

保存文件时要使用RxJava的doOnNext操作符,后续更新UI的操作切换到UI线程。

总结

看似代码很多,其实过程并不复杂:

在保存文件时,调用ForwardingSource的read方法,通过RxBus发送实时的FileLoadEvent对象。

FileCallBack订阅RxBus发送的FileLoadEvent。通过接收到FileLoadEvent中的下载进度和文件总大小对UI进行更新。

在下载保存文件完成后,取消订阅,防止内存泄漏。

Demo地址:https://github.com/AirMiya/DownloadDemo


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:路由器没有管理员密码(怎么改路由器的密码)
下一篇:详解Vue + Vuex 如何使用 vm.$nextTick
相关文章

 发表评论

暂时没有评论,来抢沙发吧~