Skip to content

How to send batch SQL update with spring r2dbc

How to do a single statement SQL update using r2dbc ?

Probably, the most common way is the following

import org.springframework.r2dbc.core.DatabaseClient;
ConnectionFactory factory =
DatabaseClient client = DatabaseClient.create(factory);
Mono<Map<String, Object>> actor = client.sql("INSERT INTO t_actor (first_name, last_name ) VALUES (:fName, :lName")
.bind("fName", "First")
.bind("lNane", "last")
.fetch().first();

With java multi-line support, and bind method has common sql-injection protection. This is a reasonable interface to work with.

Batch Update

Bind the same parm multiple times org.springframework.r2dbc.core.DatabaseClient.sql wouldn’t yield a batch update.

Luckily, DatabaseClient has inConnectionMany method.

Combining with Statement, one can do batch in the following fashion.

import org.springframework.r2dbc.core.DatabaseClient;
import io.r2dbc.spi.Statement;
import reactor.core.publisher.Flux;
...
DatabaseClient databaseClient;
....
databaseClient.inConnectionMany(connection -> {
Statement statement = connection.createStatement("INSERT INTO t_actor (first_name, last_name ) VALUES (?fName, ?lName"));
statement.bind("fName", ...)
.bind("lName", ...);
// statement.add need to called for non-head non-tail element
statement.add();
statement.bind("fName", ...)
.bind("lName", ...);
return Flux.from(statments.execute());
});

Statement.add needs to invoked correctly.

Save the current binding and create a new one to indicate the statement should be executed again with new bindings provided through subsequent calls to bind and bindNull.

Otherwise, java.lang.IllegalStateException: Not all parameter values are provided yet. might occur.

Why not use Batch

Batch doesn’t support bind. It only can work with string. Unless the batch update only involve constant, which sounds unlikely, sql-injection should be a concern, using Statement should be a safer approach.

Compose batches

Flux.thenMany could be use to compose two Flux.from(Statement.execute)

Transaction

One can add transaction management around the batch update in the follow fashion.

import lombok.NonNull;
import lombok.Builder;
import lombok.Value;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.reactive.TransactionalOperator;
import org.springframework.transaction.support.DefaultTransactionDefinition;
@Value
@Builder
class Test {
@NonNull DatabaseClient databaseClient;
@NonNull ReactiveTransactionManager tm;
@Builder.Default int isolationLevel = TransactionDefinition.ISOLATION_REPEATABLE_READ;
@Builder.Default int propagationBehavior = TransactionDefinition.PROPAGATION_REQUIRED;
TransactionDefinition getTxnDfn() {
var txnDfn = new DefaultTransactionDefinition();
txnDfn.setIsolationLevel(config.isolationLevel);
txnDfn.setPropagationBehavior(config.propagationBehavior);
return txnDfn;
}
Flux<Result> update() {
return TransactionalOperator.create(tm, getTxnDfn())
.execute(
status -> {
return batchOp();
});
}
}