java

2 posts with the tag “java”

How to send batch SQL update with spring r2dbc

How to do a single-statement SQL update using r2dbc?

Probably the most common way is the following:

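import io.r2dbc.spi.ConnectionFactory;
import java.util.Map;
import org.springframework.r2dbc.core.DatabaseClient;
import reactor.core.publisher.Mono;

ConnectionFactory factory = ...; // how the factory is obtained is elided in the original
DatabaseClient client = DatabaseClient.create(factory);
// Named parameters (:fName, :lName) are bound by name; the names must match exactly
Mono<Map<String, Object>> actor = client.sql("INSERT INTO t_actor (first_name, last_name) VALUES (:fName, :lName)")
    .bind("fName", "First")
    .bind("lName", "last")
    .fetch().first();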

With Java's multi-line string support for the SQL text, and a bind method that protects against common SQL injection, this is a reasonable interface to work with.

Batch Update

Binding the same parameter multiple times on org.springframework.r2dbc.core.DatabaseClient.sql does not yield a batch update.

Luckily, DatabaseClient has an inConnectionMany method.

Combined with Statement, one can run a batch as follows.

import org.springframework.r2dbc.core.DatabaseClient;
import io.r2dbc.spi.Statement;
import reactor.core.publisher.Flux;
...
DatabaseClient databaseClient;
....
databaseClient.inConnectionMany(connection -> {
    // The ?name placeholders are kept from the original; placeholder syntax depends on the R2DBC driver
    Statement statement = connection.createStatement("INSERT INTO t_actor (first_name, last_name) VALUES (?fName, ?lName)");
    statement.bind("fName", ...)
        .bind("lName", ...);
    // statement.add() goes between binding sets: after every set except the last one
    statement.add();
    statement.bind("fName", ...)
        .bind("lName", ...);
    return Flux.from(statement.execute());
});

Statement.add needs to be invoked correctly. Its documentation reads:

Save the current binding and create a new one to indicate the statement should be executed again with new bindings provided through subsequent calls to bind and bindNull.

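Otherwise, java.lang.IllegalStateException: Not all parameter values are provided yet. may be thrown, for example when add is called after the last set of bindings and the new, empty binding is never filled.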
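For more than two rows, the placement of add can be expressed as a loop. The following is a minimal sketch, not the library's API: BindableStatement is a hypothetical stand-in that mirrors only the bind(String, Object) and add() methods of io.r2dbc.spi.Statement, and Actor and RecordingStatement are made-up helpers so the block runs without a driver.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class BatchBindingSketch {

    /** Stand-in for the two io.r2dbc.spi.Statement methods used here (hypothetical, illustration only). */
    interface BindableStatement {
        BindableStatement bind(String name, Object value);
        BindableStatement add();
    }

    /** Hypothetical row type holding one actor. */
    static final class Actor {
        final String firstName;
        final String lastName;

        Actor(String firstName, String lastName) {
            this.firstName = firstName;
            this.lastName = lastName;
        }
    }

    /** Binds every actor, calling add() only between binding sets and never after the last one. */
    static void bindAll(BindableStatement statement, List<Actor> actors) {
        boolean first = true;
        for (Actor actor : actors) {
            if (!first) {
                statement.add(); // save the previous binding set and start a new one
            }
            statement.bind("fName", actor.firstName).bind("lName", actor.lastName);
            first = false;
        }
    }

    /** In-memory recorder so the sketch runs without a database or driver. */
    static final class RecordingStatement implements BindableStatement {
        final List<Map<String, Object>> saved = new ArrayList<>();
        Map<String, Object> current = new LinkedHashMap<>();

        @Override
        public BindableStatement bind(String name, Object value) {
            current.put(name, value);
            return this;
        }

        @Override
        public BindableStatement add() {
            saved.add(current);
            current = new LinkedHashMap<>();
            return this;
        }
    }
}
```

With a real Statement, the same loop would run inside inConnectionMany, followed by Flux.from(statement.execute()).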

Why not use Batch

Batch doesn't support bind; it only works with SQL strings. Unless the batch update involves only constants, which sounds unlikely, SQL injection is a concern, so using Statement is the safer approach.
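To make the concern concrete, here is a small sketch of what happens when values are inlined into the kind of SQL string that Batch.add(String) accepts. The class and method names are hypothetical and no database is involved; it only shows how the string itself changes.

```java
final class BatchStringSketch {

    /** Builds a literal SQL string with the values concatenated in rather than bound (unsafe on purpose). */
    static String insertSql(String firstName, String lastName) {
        return "INSERT INTO t_actor (first_name, last_name) VALUES ('"
            + firstName + "', '" + lastName + "')";
    }
}
```

Passing a last name such as `x'); DROP TABLE t_actor; --` produces a string that contains a second statement, which is exactly what bound parameters avoid.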

Compose batches

Flux.thenMany can be used to compose two Flux.from(statement.execute()) publishers, so that the second batch runs after the first one completes.

Transaction

One can add transaction management around the batch update as follows.

import io.r2dbc.spi.Result;
import lombok.Builder;
import lombok.NonNull;
import lombok.Value;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.reactive.TransactionalOperator;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import reactor.core.publisher.Flux;

@Value
@Builder
class Test {
    @NonNull DatabaseClient databaseClient;
    @NonNull ReactiveTransactionManager tm;
    @Builder.Default int isolationLevel = TransactionDefinition.ISOLATION_REPEATABLE_READ;
    @Builder.Default int propagationBehavior = TransactionDefinition.PROPAGATION_REQUIRED;

    // Builds the transaction definition from the configured isolation and propagation
    TransactionDefinition getTxnDfn() {
        var txnDfn = new DefaultTransactionDefinition();
        txnDfn.setIsolationLevel(isolationLevel);
        txnDfn.setPropagationBehavior(propagationBehavior);
        return txnDfn;
    }

    // batchOp() is not defined in the original; it stands for the inConnectionMany batch update shown earlier
    Flux<Result> update() {
        return TransactionalOperator.create(tm, getTxnDfn())
            .execute(status -> batchOp());
    }
}

Project reactor - Mono zip with void

Mono<Void>

In Reactor, we sometimes want an operation to have a return type like Mono<Void>: we don't need any information from the operation, only to know that it succeeded. In this sense, we are using Void as a unit type.

awkward case of Unit type in java

a unit type is a type that allows only one value.

https://en.wikipedia.org/wiki/Unit_type

In Java, the unit type is Void and its only value is null.

https://docs.oracle.com/javase/8/docs/api/java/lang/Void.html

The Void class is an uninstantiable placeholder class to hold a reference to the Class object representing the Java keyword void.

So Void is supposed to be uninstantiable, but in practice people use it as a unit type, with null as its only value.
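A minimal sketch of that practice, using only the JDK: a side-effect-only operation is passed to a generic API that needs a type argument, so it is typed as Void and has to return null. The class and method names are hypothetical.

```java
import java.util.function.Supplier;

final class VoidAsUnitSketch {
    static int counter = 0;

    /** An operation that only has a side effect; the Void return type forces it to return null. */
    static Void increment() {
        counter++;
        return null; // the only value a Void reference can hold
    }

    /** A generic API that requires some result type T, even when the caller has nothing to return. */
    static <T> T run(Supplier<T> operation) {
        return operation.get();
    }
}
```

Calling `VoidAsUnitSketch.run(VoidAsUnitSketch::increment)` performs the side effect and hands back null, which is where the trouble with null-averse APIs starts.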

implication in project reactor

In Project Reactor, there is Mono.zip, documented as follows:

Aggregate given monos into a new Mono that will be fulfilled when all of the given Monos have produced an item, aggregating their values according to the provided combinator function. An error or empty completion of any source will cause other sources to be cancelled and the resulting Mono to immediately error or complete, respectively.

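This method doesn't work well with Mono<Void>: a Mono<Void> never emits an item and always completes empty, so zip completes empty as well and the combinator never runs.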

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

public class UnitTest {
    @DisplayName("prove Mono<Void> is not zippable, otherwise the pipeline would emit one element")
    @Test
    void voidIsNotZippable() {
        // A callable returning null yields an empty Mono, so zip completes without emitting
        StepVerifier.create(
                Mono.zip(Mono.<Void>fromCallable(() -> null), Mono.just(2))
                    .map(tuples -> 2))
            .verifyComplete();
        // then() also yields an empty Mono<Void>, with the same outcome
        StepVerifier.create(Mono.zip(Mono.just(2).then(), Mono.just(2)).map(tuples -> 2))
            .verifyComplete();
    }
}
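The same effect can be seen without Reactor. The sketch below is only an analogy, assuming java.util.Optional as a stand-in for a Mono that is either empty or holds one item; the zip helper is hypothetical and is not Reactor's implementation.

```java
import java.util.Optional;
import java.util.function.BiFunction;

final class ZipAnalogySketch {

    /** Combines two optionals; the result is empty as soon as either side is empty. */
    static <A, B, R> Optional<R> zip(Optional<A> left, Optional<B> right, BiFunction<A, B, R> combinator) {
        return left.flatMap(a -> right.map(b -> combinator.apply(a, b)));
    }

    /** A Void result can only be null, so wrapping it always gives an empty container. */
    static Optional<Void> voidResult() {
        return Optional.ofNullable((Void) null);
    }
}
```

Zipping `voidResult()` with `Optional.of(2)` is empty and the combinator is never called, which mirrors the test above.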

a simple workaround

We define our own unit type:

import java.io.Serializable;

/** There is only one value of type Unit, (). Void with null doesn't play well with Mono.zip. */
public class Unit implements Serializable {
    private Unit() {}

    public static final Unit INSTANCE = new Unit();
}

With Unit in place of Void, zip emits an element:

StepVerifier.create(
        Mono.zip(Mono.just(Unit.INSTANCE), Mono.just(Unit.INSTANCE)).map(tuples -> 2))
    .assertNext(val -> Assertions.assertThat(val).isEqualTo(2))
    .verifyComplete();
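One caveat with a Serializable singleton class is that default deserialization creates a new instance unless readResolve is defined, so the "only one value" property can be lost. A possible alternative, shown here as a sketch rather than as the approach used in this post, is an enum with a single constant; the roundTrip helper is hypothetical and exists only to demonstrate the point.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

/** Enum-based unit type: the JVM guarantees exactly one instance, even across serialization. */
enum Unit {
    INSTANCE;

    /** Serializes and deserializes a value in memory and returns the deserialized reference. */
    static Unit roundTrip(Unit value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Unit) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Since Unit.INSTANCE is a non-null value, it can be used with Mono.just in the same way as the class-based version.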