From b90efcb363931ba8cc025f81a03be2af65eb00bf Mon Sep 17 00:00:00 2001 From: Sebastien Deleuze Date: Wed, 2 Dec 2015 11:08:29 +0100 Subject: [PATCH] Reactive Postgres support with pgasync 0.7 --- build.gradle | 2 +- .../postgres/PostgresPersonRepository.java | 37 ++++--------------- 2 files changed, 9 insertions(+), 30 deletions(-) diff --git a/build.gradle b/build.gradle index f095f83..c3f4e70 100644 --- a/build.gradle +++ b/build.gradle @@ -39,7 +39,7 @@ dependencies { compile "org.mongodb:mongodb-driver-reactivestreams:1.1.0" compile "com.couchbase.client:java-client:2.2.2" - compile "com.github.alaisi.pgasync:postgres-async-driver:0.6" + compile "com.github.alaisi.pgasync:postgres-async-driver:0.7" compile "org.slf4j:slf4j-jcl:1.7.12" compile "org.slf4j:jul-to-slf4j:1.7.12" diff --git a/src/main/java/playground/postgres/PostgresPersonRepository.java b/src/main/java/playground/postgres/PostgresPersonRepository.java index 673a500..82c8a86 100644 --- a/src/main/java/playground/postgres/PostgresPersonRepository.java +++ b/src/main/java/playground/postgres/PostgresPersonRepository.java @@ -16,12 +16,7 @@ package playground.postgres; -import java.util.Arrays; -import java.util.function.Consumer; - import com.github.pgasync.Db; -import com.github.pgasync.ResultSet; -import com.github.pgasync.Row; import playground.Person; import rx.Observable; @@ -43,32 +38,16 @@ public PostgresPersonRepository(Db db) { public Observable insert(Observable personStream) { return personStream.flatMap(p -> - Observable.create(subscriber -> { - Consumer onSuccess = result -> { - subscriber.onNext(result); - subscriber.onCompleted(); - }; - Consumer onError = throwable -> subscriber.onError(throwable); - db.query("insert into persons(firstname, lastname, address, postalCode, city) values($1, $2, $3, $4, $5)", - Arrays.asList(p.getFirstname(), p.getFirstname(), p.getAddress(), p.getPostalCode(), p.getCity()), onSuccess, onError); - // See https://github.com/ReactiveX/RxJava/issues/3037 about Observable - })).flatMap(document -> Observable.empty()); + db.querySet("insert into persons(firstname, lastname, address, postalCode, city) values($1, $2, $3, $4, $5)", + p.getFirstname(), p.getFirstname(), p.getAddress(), p.getPostalCode(), p.getCity()) + // See https://github.com/ReactiveX/RxJava/issues/3037 about Observable + ).flatMap(document -> Observable.empty()); } public Observable list() { - return Observable.create(subscriber -> { - Consumer onSuccess = resultSet -> { - // We can only get the multiple rows at the same time, it would be better to retreive them - // with an Observable or a Publisher. More details on the related feature request at - // https://github.com/alaisi/postgres-async-driver/issues/4#issuecomment-160980796 - for (Row row : resultSet) { - subscriber.onNext(new Person(row.getString("firstname"), row.getString("lastname"), - row.getString("address"), row.getString("postalCode"), row.getString("city"))); - } - subscriber.onCompleted(); - }; - Consumer onError = throwable -> subscriber.onError(throwable); - db.query("select * from persons", onSuccess, onError); - }); + return db.queryRows("select * from persons").map(row -> + new Person(row.getString("firstname"), row.getString("lastname"), + row.getString("address"), row.getString("postalCode"), row.getString("city")) + ); } }