How to send a batch SQL update with Spring R2DBC

Posted on June 20, 2024

How to do a single-statement SQL update using r2dbc?

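Probably the most common way is the following:
import io.r2dbc.spi.ConnectionFactory;
import java.util.Map;
import org.springframework.r2dbc.core.DatabaseClient;
import reactor.core.publisher.Mono;

ConnectionFactory factory = ... // creation of the ConnectionFactory is elided

DatabaseClient client = DatabaseClient.create(factory);
// Named parameters (:fName, :lName) are bound by name rather than concatenated into the SQL.
// For an INSERT, fetch().rowsUpdated() reports the affected row count instead of a row.
Mono<Map<String, Object>> actor = client.sql("INSERT INTO t_actor (first_name, last_name) VALUES (:fName, :lName)")
    .bind("fName", "First")
    .bind("lName", "last")
    .fetch().first();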

Combined with Java's multi-line string support, and given that the bind method provides the usual protection against SQL injection, this is a reasonable interface to work with.
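
The multi-line support mentioned here presumably means Java text blocks (Java 15 and later); that reading is an assumption. A minimal sketch of the same INSERT written as a text block follows. The class and constant names are made up for illustration, and the resulting string would be passed to client.sql(...) exactly as before.

```java
public class ActorSql {

    // Text block (Java 15+): the SQL keeps its layout without string concatenation.
    // The named parameters are still bound later with bind("fName", ...) and bind("lName", ...).
    public static final String INSERT_ACTOR = """
            INSERT INTO t_actor (first_name, last_name)
            VALUES (:fName, :lName)
            """;

    public static void main(String[] args) {
        // Prints the SQL with the common leading indentation stripped.
        System.out.print(INSERT_ACTOR);
    }
}
```

The common indentation is removed by the compiler, so the SQL can be indented to match the surrounding code without sending extra whitespace to the database.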

Batch Update

Binding the same parameter multiple times on org.springframework.r2dbc.core.DatabaseClient.sql does not yield a batch update.

Luckily, DatabaseClient has an inConnectionMany method.

Combined with Statement, one can do a batch update in the following fashion.
import org.springframework.r2dbc.core.DatabaseClient;
import io.r2dbc.spi.Statement;
import reactor.core.publisher.Flux;

...
DatabaseClient databaseClient;
....
databaseClient.inConnectionMany(connection -> {
    // Placeholder syntax is driver-specific; this example uses the ?name form.
    Statement statement = connection.createStatement("INSERT INTO t_actor (first_name, last_name) VALUES (?fName, ?lName)");
    // First set of bindings.
    statement.bind("fName", ...)
        .bind("lName", ...);
    // Call statement.add() between sets of bindings, but not after the last set.
    statement.add();
    // Last set of bindings: no add() afterwards.
    statement.bind("fName", ...)
        .bind("lName", ...);
    return Flux.from(statement.execute());
});
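
When the rows come from a collection, the placement of add() turns into a loop: call it before every row except the first, so that it never follows the last row. The sketch below shows only that loop logic. RecordingStatement is a hypothetical stand-in for io.r2dbc.spi.Statement (it just records bindings, so the example runs without a database), and bindAll and the sample names are likewise made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchBindingSketch {

    /** Hypothetical stand-in for io.r2dbc.spi.Statement: it only records bindings. */
    static final class RecordingStatement {
        final List<Map<String, Object>> saved = new ArrayList<>();
        Map<String, Object> current = new LinkedHashMap<>();

        RecordingStatement bind(String name, Object value) {
            current.put(name, value);
            return this;
        }

        // Mirrors the idea of Statement.add(): save the current binding set, start a new one.
        RecordingStatement add() {
            saved.add(current);
            current = new LinkedHashMap<>();
            return this;
        }

        // All binding sets, including the final one that was never followed by add().
        List<Map<String, Object>> bindingSets() {
            List<Map<String, Object>> all = new ArrayList<>(saved);
            all.add(current);
            return all;
        }
    }

    /** Binds every row, calling add() between rows but not after the last one. */
    static RecordingStatement bindAll(RecordingStatement statement, List<String[]> actors) {
        boolean first = true;
        for (String[] actor : actors) {
            if (!first) {
                statement.add(); // separates the previous row from this one
            }
            statement.bind("fName", actor[0]).bind("lName", actor[1]);
            first = false;
        }
        return statement;
    }

    public static void main(String[] args) {
        // Assumes at least one row; an empty list would leave one empty binding set.
        List<String[]> actors = List.of(
                new String[] {"Ada", "Lovelace"},
                new String[] {"Alan", "Turing"},
                new String[] {"Grace", "Hopper"});
        System.out.println(bindAll(new RecordingStatement(), actors).bindingSets());
    }
}
```

With a real Statement the same loop would sit inside the inConnectionMany callback, followed by a single Flux.from(statement.execute()).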

Statement.add needs to be invoked correctly. Its documentation describes it as: "Save the current binding and create a new one to indicate the statement should be executed again with new bindings provided through subsequent calls to bind and bindNull."

If it is misplaced, java.lang.IllegalStateException: Not all parameter values are provided yet. might occur.

Why not use Batch

Batch doesn’t support bind; it only works with SQL strings. Unless the batch update involves only constants, which sounds unlikely, SQL injection is a concern, so using Statement is the safer approach.
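
To make the injection concern concrete, here is a minimal sketch of what feeding values into a string-only API amounts to. It uses no R2DBC classes at all: insertActor is a hypothetical helper that builds the kind of plain SQL string one would have to hand to Batch.add(String), and the crafted value is purely illustrative.

```java
public class StringBatchRisk {

    // Builds one INSERT as a plain string by concatenating the values into the SQL.
    static String insertActor(String firstName, String lastName) {
        return "INSERT INTO t_actor (first_name, last_name) VALUES ('"
                + firstName + "', '" + lastName + "')";
    }

    public static void main(String[] args) {
        // A harmless value produces the SQL we expect.
        System.out.println(insertActor("First", "Last"));
        // A crafted value closes the string literal early and appends a second statement.
        System.out.println(insertActor("First", "x'); DROP TABLE t_actor; --"));
    }
}
```

Whether a particular driver would actually run the appended statement is beside the point: the value has changed the structure of the SQL, which is what bound parameters on a Statement prevent.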

Compose batches

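Flux.thenMany can be used to compose two Flux.from(Statement.execute()) publishers: the second batch starts once the first has completed, and the elements emitted by the first are discarded.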

Transaction

One can add transaction management around the batch update in the following fashion.
import io.r2dbc.spi.Result;
import lombok.NonNull;
import lombok.Builder;
import lombok.Value;
import org.springframework.r2dbc.core.DatabaseClient;
import org.springframework.transaction.ReactiveTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.reactive.TransactionalOperator;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import reactor.core.publisher.Flux;

@Value
@Builder
class Test {
    @NonNull DatabaseClient databaseClient;
    @NonNull ReactiveTransactionManager tm;
    @Builder.Default int isolationLevel = TransactionDefinition.ISOLATION_REPEATABLE_READ;
    @Builder.Default int propagationBehavior = TransactionDefinition.PROPAGATION_REQUIRED;

    // Builds the transaction definition from the configured isolation level and propagation.
    TransactionDefinition getTxnDfn() {
        var txnDfn = new DefaultTransactionDefinition();
        txnDfn.setIsolationLevel(isolationLevel);
        txnDfn.setPropagationBehavior(propagationBehavior);
        return txnDfn;
    }

    // Runs the batch inside a transaction. batchOp() stands for the inConnectionMany
    // batch update from the Batch Update section; it is not defined here.
    Flux<Result> update() {
        return TransactionalOperator.create(tm, getTxnDfn())
            .execute(status -> batchOp());
    }
}
