RxJava+Retrofit+OkHttp实现多文件下载之断点续传

网友投稿 766 2023-03-16


RxJava+Retrofit+OkHttp实现多文件下载之断点续传

背景

断点续传下载一直是移动开发中必不可少的一项重要的技术,同样的Rxjava和Retrofit的结合让这个技术解决起来更加的灵活,我们完全可以封装一个适合自的下载框架,简单而且安全!

效果

实现

下载和之前的http请求可以相互独立,所以我们单独给download建立一个工程moudel处理

1.创建service接口

和以前一样,先写接口

注意:Streaming是判断是否写入内存的标示,如果小文件可以考虑不写,一般情况必须写;下载地址需要通过@url动态指定(不适固定的),@head标签是指定下载的起始位置(断点续传的位置)

/*断点续传下载接口*/

@Streaming/*大文件需要加入这个判断,防止下载过程中写入到内存中*/

@GET

Observable download(@Header("RANGE") String start, @Url String url);

2.复写ResponseBody

和之前的上传封装一样,下载更加的需要进度,所以我们同样覆盖ResponseBody类,写入进度监听回调

/**

* 自定义进度的body

* @author wzg

*/

public class DownloadResponseBody extends ResponseBody {

private ResponseBody responseBody;

private DownloadProgressListener progressListener;

private BufferedSource bufferedSource;

public DownloadResponseBody(ResponseBody responseBody, DownloadProgressListener progressListener) {

this.responseBody = responseBody;

this.progressListener = progressListener;

}

@Override

public BufferedSource source() {

if (bufferedSource == null) {

bufferedSource = Okio.buffer(source(responseBody.source()));

}

return bufferedSource;

}

private Source source(Source source) {

return new ForwardingSource(source) {

long totalBytesRead = 0L;

@Override

public long read(Buffer sink, long byteCount) throws IOException {

long bytesRead = super.read(sink, byteCount);

// read() returns the number of bytes read, or -1 if this source is exhausted.

totalBytesRead += bytesRead != -1 ? bytesRead : 0;

if (null != progressListener) {

progressListener.update(totalBytesRead, responseBody.contentLength(), bytesRead == -1);

}

return bytesRead;

}

};

}

}

3.自定义进度回调接口

/**

* 成功回调处理

* Created by WZG on 2016/10/20.

*/

public interface DownloadProgressListener {

/**

* 下载进度

* @param read

* @param count

* @param done

*/

void update(long read, long count, boolean done);

}

4.复写Interceptor

复写Interceptor,可以将我们的监听回调通过okhttp的client方法addInterceptor自动加载我们的监听回调和ResponseBody

/**

* 成功回调处理

* Created by WZG on 2016/10/20.

*/

public class DownloadInterceptor implements Interceptor {

private DownloadProgressListener listener;

public DownloadInterceptor(DownloadProgressListener listener) {

this.listener = listener;

}

@Override

public Response intercept(Chain chain) throws IOException {

Response originalResponse = chain.proceed(chain.request());

return originalResponse.newBuilder()

.body(new DownloadResponseBody(originalResponse.body(), listener))

.build();

}

}

5.封装请求downinfo数据

这个类中的数据可自由扩展,用户自己选择需要保持到数据库中的数据,可以自由选择需要数据库第三方框架,demo采用greenDao框架存储数据

public class DownInfo {

/*存储位置*/

private String savePath;

/*下载url*/

private String url;

/*基础url*/

private String baseUrl;

/*文件总长度*/

private long countLength;

/*下载长度*/

privatMMrlExxe long readLength;

/*下载唯一的HttpService*/

private HttpService service;

/*回调监听*/

private HttpProgressOnNextListener listener;

/*超时设置*/

private int DEFAULT_TIMEOUT = 6;

/*下载状态*/

private DownState state;

}

6.DownState状态封装

很简单,和大多数封装框架一样

public enum DownState {

START,

DOWN,

PAUSE,

STOP,

ERROR,

FINISH,

}

7.请求HttpProgressOnNextListener回调封装类

注意:这里和DownloadProgressListener不同,这里是下载这个过程中的监听回调,DownloadProgressListener只是进度的监听

通过抽象类,可以自由选择需要覆盖的类,不需要完全覆盖!更加灵活

/**

* 下载过程中的回调处理

* Created by WZG on 2016/10/20.

*/

public abstract class HttpProgressOnNextListener {

/**

* 成功后回调方法

* @param t

*/

public abstract void onNext(T t);

/**

* 开始下载

*/

public abstract void onStart();

/**

* 完成下载

*/

public abstract void onComplete();

/**

* 下载进度

* @param readLength

* @param MMrlExxcountLength

*/

public abstract void updateProgress(long readLength, long countLength);

/**

* 失败或者错误方法

* 主动调用,更加灵活

* @param e

*/

public void onError(Throwable e){

}

/**

* 暂停下载

*/

public void onPuase(){

}

/**

* 停止下载销毁

*/

public void onStop(){

}

}

8.封装回调Subscriber

准备的工作做完,需要将回调和传入回调的信息统一封装到sub中,统一判断;和封装二的原理一样,我们通过自定义Subscriber来提前处理返回的数据,让用户字需要关系成功和失败以及向关心的数据,避免重复多余的代码出现在处理类中

sub需要继承DownloadProgressListener,和自带的回调一起组成我们需要的回调结果

传入DownInfo数据,通过回调设置DownInfo的不同状态,保存状态

通过Rxandroid将进度回调指定到主线程中(如果不需要进度最好去掉该处理避免主线程处理负担)

update进度回调在断点续传使用时,需要手动判断断点后加载的长度,因为指定断点下载长度下载后总长度=(物理长度-起始下载长度)

/**

* 用于在Http请求开始时,自动显示一个ProgressDialog

* 在Http请求结束是,关闭ProgressDialog

* 调用者自己对请求数据进行处理

* Created by WZG on 2016/7/16.

*/

public class ProgressDownSubscriber extends Subscriber implements DownloadProgressListener {

//弱引用结果回调

private WeakReference mSubscriberOnNextListener;

/*下载数据*/

private DownInfo downInfo;

public ProgressDownSubscriber(DownInfo downInfo) {

this.mSubscriberOnNextListener = new WeakReference<>(downInfo.getListener());

this.downInfo=downInfo;

}

/**

* 订阅开始时调用

* 显示ProgressDialog

*/

@Override

public void onStart() {

if(mSubscriberOnNextListener.get()!=null){

mSubscriberOnNextListener.get().onStart();

}

downInfo.setState(DownState.START);

}

/**

* 完成,隐藏ProgressDialog

*/

@Override

public void onCompleted() {

if(mSubscriberOnNextListener.get()!=null){

mSubscriberOnNextListener.get().onComplete();

}

downInfo.setState(DownState.FINISH);

}

/**

* 对错误进行统一处理

* 隐藏ProgressDialog

*

* @param e

*/

@Override

public void onError(Throwable e) {

/*停止下载*/

HttpDownManager.getInstance().stopDown(downInfo);

if(mSubscriberOnNextListener.get()!=null){

mSubscriberOnNextListener.get().onError(e);

}

downInfo.setState(DownState.ERROR);

}

/**

* 将onNext方法中的返回结果交给Activity或Fragment自己处理

*

* @param t 创建Subscriber时的泛型类型

*/

@Override

public void onNext(T t) {

if (mSubscriberOnNextListener.get() != null) {

mSubscriberOnNextListener.get().onNext(t);

}

}

@Override

public void update(long read, long count, boolean done) {

if(downInfo.getCountLength()>count){

read=downInfo.getCountLength()-count+read;

}else{

downInfo.setCountLength(count);

}

downInfo.setReadLength(read);

if (mSubscriberOnNextListener.get() != null) {

/*接受进度消息,造成UI阻塞,如果不需要显示进度可去掉实现逻辑,减少压力*/

rx.Observable.just(read).observeOn(AndroidSchedulers.mainThread())

.subscribe(new Action1() {

@Override

public void call(Long aLong) {

/*如果暂停或者停止状态延迟,不需要继续发送回调,影响显示*/

if(downInfo.getState()==DownState.PAUSE||downInfo.getState()==DownState.STOP)return;

downInfo.setState(DownState.DOWN);

mSubscriberOnNextListener.get().updateProgress(aLong,downInfo.getCountLength());

}

});

}

}

}

9.下载管理类封装HttpDownManager

单利获取

/**

* 获取单例

* @return

*/

public static HttpDownMahttp://nager getInstance() {

if (INSTANCE == null) {

synchronized (HttpDownManager.class) {

if (INSTANCE == null) {

INSTANCE = new HttpDownManager();

}

}

}

return INSTANCE;

}

因为单利所以需要记录正在下载的数据和回到sub

/*回调sub队列*/

private HashMap subMap;

/*单利对象*/

private volatile static HttpDownManager INSTANCE;

private HttpDownManager(){

downInfos=new HashSet<>();

subMap=new HashMap<>();

}

开始下载需要记录下载的service避免每次都重复创建,然后请求sercie接口,得到ResponseBody数据后将数据流写入到本地文件中(6.0系统后需要提前申请权限)

/**

* 开始下载

*/

public void startDown(DownInfo info){

/*正在下载不处理*/

if(info==null||subMap.get(info.getUrl())!=null){

return;

}

/*添加回调处理类*/

ProgressDownSubscriber subscriber=new ProgressDownSubscriber(info);

/*记录回调sub*/

subMap.put(info.getUrl(),subscriber);

/*获取service,多次请求公用一个sercie*/

HttpService httpService;

if(downInfos.contains(info)){

httpService=info.getService();

}else{

DownloadInterceptor interceptor = new DownloadInterceptor(subscriber);

OkHttpClient.Builder builder = new OkHttpClient.Builder();

//手动创建一个OkHttpClient并设置超时时间

builder.connectTimeout(info.getConnectionTime(), TimeUnit.SECONDS);

builder.addInterceptor(interceptor);

Retrofit retrofit = new Retrofit.Builder()

.client(builder.build())

.addConverterFactory(GsonConverterFactory.create())

.addCallAdapterFactory(RxJavaCallAdapterFactory.create())

.baseUrl(info.getBaseUrl())

.build();

httpService= retrofit.create(HttpService.class);

info.setService(httpService);

}

/*得到rx对象-上一次下載的位置開始下載*/

httpService.download("bytes=" + info.getReadLength() + "-",info.getUrl())

/*指定线程*/

.subscribeOn(Schedulers.io())

.unsubscribeOn(Schedulers.io())

/*失败后的retry配置*/

.retryWhen(new RetryWhenNetworkException())

/*读取下载写入文件*/

.map(new Func1() {

@Override

public DownInfo call(ResponseBody responseBody) {

try {

writeCache(responseBody,new File(info.getSavePath()),info);

} catch (IOException e) {

/*失败抛出异常*/

throw new HttpTimeException(e.getMessage());

}

return info;

}

})

/*回调线程*/

.observeOn(AndroidSchedulers.mainThread())

/*数据回调*/

.subscribe(subscriber);

}

写入文件

注意:一开始调用进度回调是第一次写入在进度回调之前,所以需要判断一次DownInfo是否获取到下载总长度,没有这选择当前ResponseBody 读取长度为总长度

/**

* 写入文件

* @param file

* @param info

* @throws IOException

*/

public void writeCache(ResponseBody responseBody,File file,DownInfo info) throws IOException{

if (!file.getParentFile().exists())

file.getParentFile().mkdirs();

long allLength;

if (info.getCountLength()==0){

allLength=responseBody.contentLength();

}else{

allLength=info.getCountLength();

}

FileChannel channelOut = null;

RandomAccessFile randomAccessFile = null;

randomAccessFile = new RandomAccessFile(file, "rwd");

channelOut = randomAccessFile.getChannel();

MappedByteBuffer mappedBuffer = channelOut.map(FileChannel.MapMode.READ_WRITE,

info.getReadLength(),allLength-info.getReadLength());

byte[] buffer = new byte[1024*8];

int len;

int record = 0;

while ((len = responseBody.byteStream().read(buffer)) != -1) {

mappedBuffer.put(buffer, 0, len);

record += len;

}

responseBody.byteStream().close();

if (channelOut != null) {

channelOut.close();

}

if (randomAccessFile != null) {

randomAccessFile.close();

}

}

停止下载

调用 subscriber.unsubscribe()解除监听,然后remove记录的下载数据和sub回调,并且设置下载状态(同步数据库自己添加)

/**

* 停止下载

*/

public void stopDown(DownInfo info){

if(info==null)return;

info.setState(DownState.STOP);

info.getListener().onStop();

if(subMap.containsKey(info.getUrl())) {

ProgressDownSubscriber subscriber=subMap.get(info.getUrl());

subscriber.unsubscribe();

subMap.remove(info.getUrl());

}

/*同步数据库*/

}

暂停下载

原理和停止下载原理一样

/**

* 暂停下载

* @param info

*/

public void pause(DownInfo info){

if(info==null)return;

info.setState(DownState.PAUSE);

info.getListener().onPuase();

if(subMap.containsKey(info.getUrl())){

ProgressDownSubscriber subscriber=subMap.get(info.getUrl());

subscriber.unsubscribe();

subMap.remove(info.getUrl());

}

/*这里需要讲info信息写入到数据中,可自由扩展,用自己项目的数据库*/

}

暂停全部和停止全部下载任务

/**

* 停止全部下载

*/

public void stopAllDown(){

for (DownInfo downInfo : downInfos) {

stopDown(downInfo);

}

subMap.clear();

downInfos.clear();

}

/**

* 暂停全部下载

*/

public void pauseAll(){

for (DownInfo downInfo : downInfos) {

pause(downInfo);

}

subMap.clear();

downInfos.clear();

}

整合代码HttpDownManager

同样使用了封装二中的retry处理和运行时异常自定义处理封装(不复述了)

补充

有同学说不知道数据库这块怎么替换,所以我加入了greenDao框架去优化数据库存储,在实际运用中可以将这块的逻辑替换成你项目的数据库框架(之前用的都是realm,这回正好练练手)

只需要替换DbUtil的方法即可

总结

到此我们的Rxjava+ReTrofit+okHttp深入浅出-封装就基本完成了,已经可以完全胜任开发和学习的全部工作,如果后续再使用过程中有任何问题欢迎留言给我,会一直维护!

1.Retrofit+Rxjava+okhttp基本使用方法

    2.统一处理请求数据格式

    3.统一的ProgressDialog和回调Subscriber处理

    4.取消http请求

    5.预处理http请求

    6.返回数据的统一判断

    7.失败后的retry封装处理

    8.RxLifecycle管理生命周期,防止泄露

    9.文件上传和文件下载(支持多文件断点续传)

源码:传送门-全部封装源码


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:网关api统计(api网关管理系统)
下一篇:详解使用spring boot admin监控spring cloud应用程序
相关文章

 发表评论

暂时没有评论,来抢沙发吧~