Rxjava2_Flowable_Sqlite_Android数据库访问实例

网友投稿 315 2023-02-18


Rxjava2_Flowable_Sqlite_Android数据库访问实例

一、使用Rxjava访问数据库的优点:

1.随意的线程控制,数据库操作在一个线程,返回数据处理在ui线程

2.随时订阅和取消订阅,而不必再使用回调函数

3.对读取的数据用rxjava进行过滤,流式处理

4.使用sqlbrite可以原生返回rxjava的格式,同时是响应式数据库框架

(有数据添加和更新时自动调用之前订阅了的读取函数,达到有数据添加自动更新ui的效果,

同时这个特性没有禁止的方法,只能通过取消订阅停止这个功能,对于有的框架这反而是一种累赘)

二、接下来之关注实现过程:

本次实现用rxjava2的Flowable,有被压支持(在不需要被压支持的情况建议使用Observable)

实现一个稳健的的可灵活切换其他数据库的结构,当然是先定义数据库访问接口。然后跟具不同的数据库实现接口的方法

定义接口:(对于update,delete,insert,可以选择void类型,来简化调用代码,但缺少了执行结果判断)

public interface DbSource {

//String sql = "insert into table_task (tid,startts) values(tid,startts)";

FlowabnEyOwEale insertNewTask(int tid, int startts);

//String sql = "select * from table_task";

Flowable> getAllTask();

//String sql = "select * from table_task where endts = 0";

Flowable> getRunningTask();

//String sql = "update table_task set isuploadend=isuploadend where tid=tid";

Flowable markUploadEnd(int tid, boolean isuploadend);

//String sql = "delete from table_task where tid=tid and endts>0";

Flowable deleteTask(int tid);

}

三、用android原生的Sqlite实现数据库操作

public class SimpleDb implements DbSource {

private static SimpleDb sqlite;

private SqliteHelper sqliteHelper;

private SimpleDb(Context context) {

this.sqliteHelper = new SqliteHelper(context);

}

public static synchronized SimpleDb getInstance(Context context) {

if (sqlite == null )

sqlite = new SimpleDb(context);

return sqlite;

}

Flowable insertNewTask(int tid, int startts) {

return Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter e) throws Exception {

//这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法

ContentValues values = new ContentValues();

values.put(“tid”, 1);

values.put(“startts”,13233);

if(sqliteHelper.getWriteableDatabase().insert(TABLE_NAME, null, values) != -1)

e.onNext(true);

else

e.onNext(false);

e.onComplete();

}

}, BackpressureStrategy.BUFFER);

}

Flowable> getAllTask() {

return Flowable.create(new FlowableOnSubscribe>() {

@Override

public void subscribe(FlowableEmitter> e) throws Exception {

List taskList = new ArrayList<>();

StringBuilder sql = new StringBuilder(100);

sql.append("select * from ");

sql.append(SqliteHelper.TABLE_NAME_TASK);

SQLiteDatabase sqLiteDatabase = sqliteHelper.getReadableDatabase();

Cursor cursor = sqLiteDatabase.rawQuery(sql.toString(), null);

if (cursor.moveToFirst()) {

int count = cursor.getCount();

for (int a = 0; a < count; a ++) {

TaskItem item = new TaskItem();

item.setTid(cursor.getInt(1));

item.setStartts(cursor.getInt(2));

item.setEndts(cursor.getInt(3));

taskList.add(item);

cursor.move(1);

}

}

cursor.close();

sqLiteDatabase.close();

e.onNext(taskList);

e.onComplete();

}

}, BackpressureStrategy.BUFFER);

}

Flowable> getRunningTask() {

return Flowable.create(new FlowableOnSubscribe>() {

@Override

public void subscribe(FlowableEmitter> e) throws Exception {

TaskItem item = null;

StringBuilder sql = new StringBuilder(100);

sql.append("select * from ");

sql.append(SqliteHelper.TABLE_NAME_TASK);

sql.append(" where endts=0 limit 1");

SQLiteDatabase sqLiteDatabase = sqliteHelper.getReadableDatabase();

Cursor cursor = sqLiteDatabase.rawQuery(sql.toString(), null);

if (cursor.moveToFirst()) {

int count = cursor.getCount();

if (count == 1) {

item = new TaskItem();

item.setId(cursor.getInt(0));

item.setTid(cursor.getInt(1));

item.setStartts(cursor.getInt(2));

item.setEndts(cursor.getInt(3));

}

}

cursor.close();

sqLiteDatabase.close();

e.onNext(Optional.fromNullable(item)); //import com.google.common.base.Optional;//安全检查,待会看调用的代码,配合rxjava很好

e.onComplete();

}

}, BackpressureStrategy.BUFFER);

}

Flowable markUploadEnd(int tid, boolean isuploadend) {

return Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter e) throws Exception {

//这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法

//数据库操作代码

e.onNext(false);//返回结果

e.onComplete();//返回结束

}

}, BackpressureStrategy.BUFFER);

}

Flowable deleteTask(int tid) {

return Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter e) throws Exception {

//这里数据库操作只做示例代码,主要关注rxjava的Flowable使用方法

//数据库操作代码

e.onNext(false);//返回结果

e.onComplete();//返回结束

}

}, BackpressureStrategy.BUFFER);

}

}

四、同一个接口使用sqlbrite的实现方式

public class BriteDb implements DbSource {

@NonNull

protected final BriteDatabase mDatabaseHelper;

@NonNull

private Function mTaskMapperFunction;

@NonNull

private Function mPoiMapperFunction;

@NonNull

private Function mInterestPoiMapperFunction;

// Prevent direct instantiation.

private BriteDb(@NonNull Context context) {

DbHelper dbHelper = new DbHelper(context);

SqlBrite sqlBrite = new SqlBrite.Builder().build();

mDatabaseHelper = sqlBrite.wrapDatabaseHelper(dbHelper, Schedulers.io();

mTaskMapperFunction = this::getTask;

mPoiMapperFunction = this::getPoi;

mInterestPoiMapperFunction = this::getInterestPoi;

}

@Nullable

private static BriteDb INSTANCE;

public static BriteDb getInstance(@NonNull Context context) {

if (INSTANCE == null) {

INSTANCE = new BriteDb(context);

}

return INSTANCE;

}

@NonNull

private TaskItem getTask(@NonNull Cursor c) {

TaskItem item = new TaskItem();

item.setId(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_ID)));

item.setTid(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_TID)));

item.setStartts(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_STARTTS)));

item.setEndts(c.getInt(c.getColumnIndexOrThrow(PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS)));

return item;

}

@Override

public void insertNewTask(int tid, int startts) {

ContentValues values = new ContentValues();

values.put(PersistenceContract.TaskEntry.COLUMN_TASK_TID, tid);

values.put(PersistenceContracnEyOwEat.TaskEntry.COLUMN_TASK_STARTTS, startts);

mDatabaseHelper.insert(PersistenceContract.TaskEntry.TABLE_NAME_TASK, values, SQLiteDatabase.CONFLICT_REPLACE);

}

@Override

public Flowable> getAllTask() {

String sql = String.format("SELECT * FROM %s", PersistenceContract.TaskEntry.TABLE_NAME_TASK);//TABLE_NAME_TASK表的名字字符串

return mDatabaseHelper.createQuery(PersistenceContract.TaskEntry.TABLE_NAME_TASK, sql)

.mapToList(mTaskMapperFunction)

.toFlowable(BackpressureStrategy.BUFFER);

}

@Override

public Flowable> getRunningTask() {

String sql = String.format("SELECT * FROM %s WHERE %s = ? limit 1",

PersistenceContract.TaskEntry.TABLE_NAME_TASK, PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS);

return mDatabaseHelper.createQuery(PersistenceContract.TaskEntry.TABLE_NAME_TASK, sql, "0")

.mapToOne(cursor -> Optional.fromNullable(mTaskMapperFunction.apply(cursor)))

.toFlowable(BackpressureStrategy.BUFFER);

}

@Override

public Flowable markUploadEnd(int tid, boolean isuploadend) {

return Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter e) throws Exception {

ContentValues values = new ContentValues();

if(isuploadend) {

values.put(PersistenceContract.TaskEntry.COLUMN_TASK_ISUPLOADEND, 1);

} else {

values.put(PersistenceContract.TaskEntry.COLUMN_TASK_ISUPLOADEND, 0);

}

String selection = PersistenceContract.TaskEntry.COLUMN_TASK_TID + " = ?";

//String[] selectionArgs = {String.valueOf(tid)};

String selectionArgs = String.valueOf(tid);

int res = mDatabaseHelper.update(PersistenceContract.TaskEntry.TABLE_NAME_TASK, values, selection, selectionArgs);

if (res > 0) {

e.onNext(true);//返回结果

} else {

e.onNext(false);//返回结果

}

e.onComplete();//返回结束

}

}, BackpressureStrategy.BUFFER);

}

@Override

public Flowable deleteTask(int tid) {

return Flowable.create(new FlowableOnSubscribe() {

@Override

public void subscribe(FlowableEmitter e) throws Exception {

String selection = PersistenceContract.TaskEntry.COLUMN_TASK_TID + " = ? AND "+

PersistenceContract.TaskEntry.COLUMN_TASK_ENDTS + " > 0";

String[] selectionArgs = new String[1];

selectionArgs[0] = String.valueOf(tid);

int res = mDatabaseHelper.delete(PersistenceContract.TaskEntry.TABLE_NAME_TASK, selection, selectionArgs);

if (res > 0) {

e.onNext(true);//返回结果

} else {

e.onNext(false);//返回结果

nEyOwEa }

e.onComplete();//返回结束

}

}, BackpressureStrategy.BUFFER);

}

}

五、数据库调用使用方法

使用了lambda简化了表达式进一步简化代码:

简化方法:在/app/build.gradle里面加入如下内容:(defaultConfig的外面)

compileOptions {

sourceCompatibility JavaVersion.VERSION_1_8

targetCompatibility JavaVersion.VERSION_1_8

}

接口调用(获得数据库实例):

//全局定义的实例获取类,以后想要换数据库,只需在这个类里切换即可

public class Injection {

public static DbSource getDbSource(Context context) {

//choose one of them

//return BriteDb.getInstance(context);

return SimpleDb.getInstance(context);

}

}

DbSource db = Injection.getInstance(mContext);

disposable1 = db.getAllTask()

.flatMap(Flowable::fromIterable)

.filter(task -> { //自定义过滤

if (!task.getIsuploadend()) {

return true;

} else {

return false;

}

})

.subscribe(taskItems -> //这里是使用了lambda简化了表达式

doTaskProcess(taskItems)

, throwable -> {

throwable.printStackTrace();

},// onCompleted

() -> {

if (disposable1 != null && !disposable1.isDisposed()) {

disposable1.dispose();

}

});

disposable1 = db.getRunningTask()

.filter(Optional::isPresent) //判断是否为空,为空的就跳过

.map(Optional::get) //获取到真的参数

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(taskItem -> { //onNext()

//has running task

mTid = taskItem.getTid();

}, throwable -> throwable.printStackTrace() //onError()

, () -> disposable1.dispose()); //onComplete()

disposable1 = db.markUploadEnd(tid, isuploadend)

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(status -> { //onNext()

if (status) {

//dosomething

}

}, throwable -> throwable.printStackTrace() //onError()

, () -> disposable1.dispose()); //onComplete()

disposable1 = db.deleteTask(tid)

.subscribeOn(Schedulers.io())

.observeOn(AndroidSchedulers.mainThread())

.subscribe(status -> { //onNext()

if (status) {

//dosomething

}

}, throwable -> throwable.printStackTrace() //onError()

, () -> disposable1.dispose()); //onComplete()


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:java+mysql实现商品抢购功能
下一篇:VUE 使用中踩过的坑
相关文章

 发表评论

暂时没有评论,来抢沙发吧~