Fragen? Jetzt Anruf vereinbaren!

Batch-Insert/Update mit Spring Data und JPA

Spring Data und JPA sind unglaublich praktische Java-Technologien, die den Entwickler von der mühsamen Aufgabe des manuellen Schreibens von SQL-Code befreien und die Verwendung von Java POJOs als Programmierschnittstelle ermöglichen. Wenn es jedoch um Batch-Operationen geht, kann es zu ernsthaften Leistungsproblemen kommen.

In diesem Artikel erfahren Sie, wie Sie diese Herausforderungen auf elegante und dennoch effiziente Weise lösen können.

Unser Szenario

Wir haben eine Datenbank mit etwa 1 Mio. Einträgen, verteilt auf 10 Tabellen. Jede Woche erhalten wir ein ZIP-Archiv mit neuen UND aktualisierten Einträgen. Die Anzahl der Einträge variiert, aber sie liegt bei etwa 50.000 Einträgen – jeder Eintrag ist eine einzelne Datei mit etwa 100kb Größe. Ein Eintrag im ZIP-Archiv entspricht einer Zeile in der Datenbank, in der die gesamte Datei zusammen mit einigen aus den Daten extrahierten Metadaten als zusätzliche Spalten gespeichert ist. Die Verbindung zwischen einem Eintrag in der Aktualisierungsdatei und der Datenbank ist ein, möglicherweise zusammengesetzter, Domänenschlüssel. Der Datenbankzugriff erfolgt über eine Spring-Boot-Anwendung unter Verwendung von Spring Data JPA und Hibernate, wobei der 2nd-Level-Cache aktiviert ist.

Der Anwendungsfall: Aktualisieren aller bereits vorhandenen Einträge in der Datenbank und Hinzufügen aller neuen Einträge. Nun, leichter gesagt als getan!

Die Datenbank

In unserem Szenario haben wir eine Datenbank mit etwa 10 einfachen Tabellen, jede mit

Diese Tabellen haben keine Spalten mit Nutzdaten gemeinsam, insbesondere nicht die Domänenschlüssel (nur die technischen Primärschlüssel). Der technische Primärschlüssel (die UUID) ist nur im Kontext des Systems bekannt. Er wird beim Anlegen einer Zeile in der Datenbank generiert und ist insbesondere nicht in den Daten der Aktualisierungsdatei enthalten. Wir werden gleich sehen, warum das eine Herausforderung ist.

Der Klarheit halber werden wir uns auf zwei Beispieltabellen konzentrieren. Die Inhaltsspalte dient als willkürlicher Platzhalter für die Nutzdaten, die in Wirklichkeit über mehrere verschiedene Spalten verteilt sind.

Das Modell

Die dazu passenden JPA Entities sind:

@Entity
@Table(name = "PartTable",indexes = @Index(columnList = "Number, Version", unique = true))
@Cacheable
public class PartEntity {

    @Id
    private String id;

    @Column(name = "Number")
    private String number;

    @Column(name = "Version")
    private int version;

    @Column(name = "Content")
    private String content;
}
@Entity
@Table(name = "LabelTable",indexes = @Index(columnList = "Name", unique = true))
@Cacheable
public class LabelEntity {

    @Id
    private String id;

    @Column(name = "Name")
    private String name;

    @Column(name = "Content")
    private String content;
}

Die Herausforderungen

Wie eingangs erwähnt, werden wir bei der Implementierung des Anwendungsfalls in diesem Szenario mit mehreren Herausforderungen konfrontiert. Die Herausforderungen sind:

  1. Wir müssen die Datenbank auf vorhandene Einträge prüfen, um entscheiden zu können, ob ein UPDATE oder ein INSERT erforderlich ist.
  2. Wir haben sehr viele Einträge und eine Leistungsanforderung, so dass ein naiver Ansatz zu einer inakzeptablen Laufzeit führen könnte.
  3. Was passiert mit unserem Arbeitsspeicher? Denken Sie daran, dass wir eine Datenmenge von 50.000 * ~100kb = ~ 4,7Gb haben.
  4. Und zu guter Letzt: kann all das auf eine nette, generische Art und Weise ohne viel duplizierten Code erledigt werden? (Spoiler: Ja!)

Die Lösung

Um die Lösung verständlicher zu machen, werden wir die verschiedenen Herausforderungen Schritt für Schritt lösen, bis wir die vollständige Lösung umgesetzt haben.

First Things First: Abfrage der Datenbank

Wie können wir die Datenbank effizient auf vorhandene Einträge in Spring Data JPA überprüfen? Wie bereits erwähnt, ist die einzige Verbindung zwischen den Importdaten und der Datenbank ein möglicherweise zusammengesetzter Domänenschlüssel. Der naivste und langsamste Weg, dies zu tun, wäre also eine Abfrage pro Entität. Etwa so:

SELECT * from PartTable where Number = entity1Number AND Version = entity1Version;

Wer schon einmal 50.000 Anweisungen – eine nach der anderen – ausgeführt hat, weiß: das ist kein Spaß! Zu dem reinen Datenbank-Roundtrip kommt noch hinzu, dass mit jeder Abfrage eine weitere Entität in der Hibernate-Sitzung behandelt werden muss. Dies führt zu zusätzlichem Overhead, z.B. durch automatische Dirty Checks vor jeder Abfrage. Dieser Overhead allein kann schon ein Performance-Killer sein.

Wie können wir es also besser machen? Idealerweise würden wir gerne genau eine Anweisung ausführen, um alle vorhandenen Entitäten abzufragen. Etwa so:

SELECT * from PartTable where (Number = ?1 AND Version = ?2) OR (Number = ?3 AND Version = ?4) OR (Number = ?5 AND Version = ?6)...;

Kann das irgendwie mit Spring Data JPA umgesetzt werden? Wir können die IN (…)-Anweisung verwenden, die leider nur für einzelne primitive Schlüssel funktioniert und NICHT für kombinierte Schlüssel wie die, die wir haben. Wenn wir jedoch nur primitive Schlüssel haben, z.B. eine einzelne String-Spalte, können wir einfach die folgende Methode zu unserem Spring Data Repository hinzufügen.

List<PartEntity> findAllWhereValueIn(final List<String> values);

In unserem Fall brauchen wir eine Art von dynamischen Abfragen. Die hässliche Art, dies zu tun, wäre, mit der Verkettung von HQL-Schnipseln herumzufummeln. Etwa so:

    StringBuilder builder = new StringBuilder("Select * from PartEntity where ");
    for (Entity entity : newOrUpdatedEntities){
        builder.append("(Number = :");
        builder.append(entity.getNumber());
        builder.append("AND Version = :");
        builder.append(entity.getVersion());
        builder.append(") OR ");
    }

Das funktioniert, aber:

  1. Es ist nicht cool!
  2. Wir könnten anfällig für HSQL / SQL Injection sein, oder wir brauchen eine komplexe Parameterbehandlung.
  3. Wir brechen das saubere Muster der Spring Data Repository Interfaces auf, da wir “echten” Code benötigen, um Abfragen auszuführen.
  4. Wir sind nun an eine bestimmte Spring Data-Implementierung (JPA) gebunden.

Gibt es einen besseren Weg?

Darf ich Ihnen vorstellen: der QuerydslPredicateExecutor

//package org.springframework.data.querydsl
public interface QuerydslPredicateExecutor<T> {
    Optional<T> findOne(Predicate predicate);

    Iterable<T> findAll(Predicate predicate);

    Iterable<T> findAll(Predicate predicate, Sort sort);

    Iterable<T> findAll(Predicate predicate, OrderSpecifier<?>... orders);

    Iterable<T> findAll(OrderSpecifier<?>... orders);

    Page<T> findAll(Predicate predicate, Pageable pageable);

    long count(Predicate predicate);

    boolean exists(Predicate predicate);
}

Aber was ist das?

Querydsl ist ein umfangreiches Java Framework, das die dynamische (Laufzeit-)Generierung von typsicheren Abfragen über eine Java API ermöglicht. Der QuerydslPredicateExecutor ist das Verbindungsstück, um Querydsl nahtlos in unsere Spring Data Repositories zu integrieren. Wie Sie sehen, können wir eine Art von Predicate (Querydsl) bereitstellen und erhalten dafür ein Iterable<T> zurück - großartig! Aber was ist ein Predicate? Ein Querydsl-Predicate ist die Basisschnittstelle für boolesche Ausdrücke. Im Grunde ist also jedes Kriterium, das Sie in Querydsl definieren können, ein Prädikat. Um den QuerydslPredicateExecutor zu verwenden, müssen wir nur die Schnittstelle in all unseren relevanten Repositories erweitern (Sie sollten vielleicht Ihr eigenes “abstraktes” Basis-Repository dafür erstellen, wie ich es unten getan habe).

@NoRepositoryBean // important! otherwise you'll get very weird errors
public interface BatchEnabledRepository<T, ID> extends CrudRepository<T, ID>, QuerydslPredicateExecutor<T> {
    //let this be your "base" repository for all entities you see fit.
}

Cool, aber wie kommt man an ein solches Prädikat? Nun, das erfordert ein wenig Arbeit, die wir in diesem Artikel nicht adressieren, da ausgezeichnete Tutorials zu diesem Thema im Internet zu finden sind. Ein guter Startpunkt ist zum Beispiel hier: Intro to Querydsl. Kurzum – wir brauchen Q-Klassen!

Alternativ: Das Gleiche kann mit der Criteria API und dem JpaSpecificationExecutor erreicht werden. Aber der QueryDslPredicateExecutor funktioniert auch für nicht JPA Repositories. Siehe Advanced Spring Data JPA - Specifications and Querydsl.

An diesem Punkt wären wir startklar, aber wohin mit dem “Prädikaterstellungscode”? Ich entschied mich in guter alter OOP-Manier, eine gemeinsame Schnittstelle zu definieren und jeder Entitätsklasse eine Methode hinzuzufügen, die uns ein für diese Entität spezifisches Prädikat liefert.

public interface UpdateableEntity {
    Predicate getKeyPredicate();
}

@Entity
@Table(name = "PartTable",indexes = @Index(columnList = "Number, Version", unique = true))
@Cacheable
public class PartEntity implements UpdateableEntity {
    
    @Id
    private String id;

    @Column(name = "Number")
    private String number;

    @Column(name = "Version")
    private int version;

    @Column(name = "Content")
    private String content;

    @Override
    public Predicate getKeyPredicate() {
        return QPartEntity.partEntity.number.eq(number).and(QPartEntity.partEntity.version.eq(version));
    }
}

Schließlich können wir mit folgendem Code alle vorhandenen Entitäten aus der Datenbank abfragen. Der Übersichtlichkeit halber haben wir den Code weggelassen, der notwendig ist, um die zu importierenden Einträge (in meinem Fall die ZIP-Datei) auszulesen. In unserem Fall stehen diese Einträge dann als losgelöste JPA-Entitäten zur Verfügung. Dies sind die List<T>-Elemente in dem folgenden Ausschnitt:

public <T extends UpdateableEntity> void updateOrInsert(List<T> elements, BatchEnabledRepository<T,?> repository) {
    final List<Predicate> predicates = elements.stream()
                .map(UpdateableEntity::getKeyPredicate)
                .collect(Collectors.toList());
    final Predicate predicate = com.querydsl.core.types.ExpressionUtils.anyOf(predicates.toArray(new Predicate[0]));
    final Iterable<T> alreadySaved = repository.findAll(predicate);
    final List<T> alreadySavedAsList = Lists.newArrayList(alreadySaved);
}

Zusätzlicher Hinweis: Die Methode gibt in unserem Fall nicht void zurück, sondern ein Objekt, das beschreibt, was passiert ist (hauptsächlich für die Protokollierung und das Benutzer-Feedback). Also wie viele Objekte aktualisiert, eingefügt und ignoriert wurden (weil sie final waren).

Toll, jetzt haben wir mit einer einzigen Abfrage alle existierenden Entitäten in der Datenbank gefunden! Schauen wir uns nun an, wie wir sie patchen können.

Vorhandene Entitäten mit neuem Inhalt patchen

Das nächste Problem ist: Wir erhalten z.B. 200 Objekte zurück - welche Importobjekte sind das?

Wie finden wir heraus, welche der Eingabeobjekte mit welchen der gefundenen Ergebnissen übereinstimmen? Eine einfache Iteration durch alle Elemente und ein Vergleich wäre wieder höchst ineffizient: O(n x m). Daher wollen wir eine Lookup-Map verwenden, und dafür brauchen wir ein Schlüsselobjekt für jede Entität. Das Überschreiben von hashcode & equals der Entitäten selbst wäre nicht sinnvoll, da dies unerwünschte Seiteneffekte zum Beispiel mit Hibernate / JPA haben würde.

Daher benötigen wir eine Möglichkeit, einen Schlüssel für jedes Objekt zu erstellen. Nachdem wir das getan haben, müssen wir außerdem die bestehende Entität mit dem neuen Inhalt patchen.

Das machen wir also als nächstes. Dazu erweitern wir unsere UpdateableEntity-Schnittstelle:

public interface UpdateableEntity<T> {
    Predicate getKeyPredicate();

    Object getKey();

    void patch(final T t);
}

@Entity
@Table(name = "PartTable",indexes = @Index(columnList = "Number, Version", unique = true))
@Cacheable
public class PartEntity implements UpdateableEntity<PartEntity> {

    @Id
    private String id;

    @Column(name = "Number")
    private String number;

    @Column(name = "Version")
    private int version;

    @Column(name = "Content")
    private String content;

    @Override
    public Predicate getKeyPredicate() {
        return QPartEntity.partEntity.number.eq(number).and(QPartEntity.partEntity.version.eq(version));
    }

    @Override
    public Object getKey() {
        // This could be any object, as long as hashCode() & equals() behaves as expected.
        return new ImmutablePair<>(partNumber, version);
    }

    @Override
    public void patch(final PartEntity entityToMerge) {
        content = entityToMerge.content;
        // As a reminder - in our real case this are more like a dozen fields
        // of course NOT the identifying fields (number and version in this case)
        // it wouldn't hurt, but it makes no sense to do it. 
        // The UUID is set only in "this" object.
    }
}

Easy! Geschafft! Nun wollen wir unsere Logik von oben erweitern.

public <T extends UpdateableEntity> void updateOrInsert(List<T> elements, BatchEnabledRepository<T,?> repository) {
    final List<Predicate> predicates = elements.stream()
                .map(UpdateableEntity::getKeyPredicate)
                .collect(Collectors.toList());
    final Predicate predicate = com.querydsl.core.types.ExpressionUtils.anyOf(predicates.toArray(new Predicate[0]));
    final Iterable<T> alreadySaved = repository.findAll(predicate);
    final List<T> alreadySavedAsList = Lists.newArrayList(alreadySaved);
    //new code
    final Map<Object, T> entitiesByKey = alreadySavedAsList.stream()
                .collect(Collectors.toMap(T::getKey, Function.identity()));
    final List<T> patchedBatchSet = elements.stream()
            .map(c -> {
                final T savedEntity = entitiesByKey.get(c.getKey());
                if (savedEntity != null) {
                    savedEntity.patch(c);
                    return savedEntity;
                }
                return c;
            })
            .collect(Collectors.toList());
    // This list contains now the patched entities from the database 
    // and the entities from import in case of new ones.
    repository.saveAll(patchedBatchSet);
}

Oh nein, wir haben kein RAM mehr!

Der verfügbare Arbeitsspeicher wird zu einem großen Problem. Wie bereits erwähnt, 50.000 Einträge * ~100kb = ~ 4,7Gb. Wenn jeder Eintrag bereits in der Datenbank vorhanden ist, werden wir bei der Ausführung unserer Anfrage mindestens 10 GB RAM benötigen. Warum ist das so? Das hängt mit der Sitzungsverwaltung in Hibernate / JPA zusammen. Hibernate verwaltet einen Verweis auf jedes Objekt, das während der Sitzung angefasst wurde (z. B. durch Abruf aus der Datenbank), bis die Sitzung beendet ist (die Transaktion ist abgeschlossen). Dies ermöglicht es Ihnen, Entitäten innerhalb einer Transaktion/Sitzung zu ändern und zu persistieren, ohne dass Sie sich darum kümmern müssen, jede Entität explizit zu speichern. Dies hat zur Folge, dass keine Entität gelöscht wird, solange die Sitzung läuft. Darüber hinaus speichert Hibernate den ursprünglichen Datenbankzustand jeder Entität, die aus der Datenbank abgerufen wird, um die Überprüfung des schmutzigen Zustands zu unterstützen. Daher rührt der doppelte Speicherbedarf.

Und warum bekomme ich StackOverflowExceptions, auch wenn ich genügend Speicher habe?

Nun, es stellt sich heraus, dass wir nicht EINE Abfrage mit 50.000 OR-Anweisungen erstellen können. Es gibt sowohl von Spring Data als auch von der DB Grenzen für die Größe von Abfragen.

Ok, wir schlagen zwei Fliegen mit einer Klappe! Wir teilen unsere einzelne Anweisung in Stapel kleinerer Größen auf. Erstens löst dies unser Problem der Abfragegröße. In meinem Fall habe ich eine Stapelgröße von 500 gewählt (da 1.000 bereits zu einem Absturz mit StackOverflowExceptions führen würde). Zweitens löst es unser Speicherproblem, wenn wir sicherstellen können, dass nur die Objekte des Stapels im Speicher verbleiben und alles andere in den Müll geworfen werden kann. Wie kann man das sicherstellen?

Es gibt mehrere Möglichkeiten, dieses Problem zu lösen.

  1. Wir könnten (und sollten wahrscheinlich) EntityManager.flush() und EntityManager.clear() nach jedem Stapel verwenden, um die Referenzen in den Heap zu entfernen (GC kümmert sich um den Rest)
  2. Wir könnten einzelne Transaktionen für jeden Batch verwenden.
  3. Wir könnten eine StatelessSession verwenden (Hibernate Native)

Ich habe mich für den zweiten Ansatz entschieden, da der erste bei mir nicht funktioniert hat. Ich habe nicht ganz verstanden, warum, aber aus irgendeinem Grund hielt die Transaktion immer noch einen Verweis auf die Heap-Objekte (wie ich in JProfiler sehen konnte). Die dritte Variante greift auf Hibernate Native zurück und schließt sich damit selbst aus.

Als Kompromiss verlieren wir bei diesem Ansatz die transaktionale Integrität zwischen den Batches. Wenn wir BATCH_SIZE z.B. auf 500 setzen und die ersten 20.000 Entitäten einwandfrei funktionieren, dann aber etwas schief geht, können wir die Transaktion nicht einfach zurücksetzen, da jeder Batch-Aufruf in einer eigenen Transaktion ausgeführt wird! Am Ende haben wir eine teilweise aktualisierte Datenbank und müssen möglicherweise unseren eigenen Code schreiben, um das zu tun, was hier erforderlich ist. In meinem Szenario ist das aber nicht wirklich ein Problem.

...
// split all elements in batch-size fragments
// call the method below with the fragments in a loop
// to prevent real "nested" transaction I disabled transaction creation on the outer methods 
// (in my case the REST controller) with
@Transactional(propagation = Propagation.NEVER)
...

// Important: The outer and the inner method have to be in different Spring Beans.
// Otherwise, the TransactionalAspect will not work on this method.
@Transactional(propagation = Propagation.REQUIRES_NEW) //creates a new transaction every time
public <T extends UpdateableEntity> void updateOrInsert(List<T> elements, BatchEnabledRepository<T,?> repository) {
    ...
}

Second Level Caching

In meinem Fall ist Second Level Caching aktiviert, was für diesen Anwendungsfall schlecht ist, da der Cache alle Objekte sammelt und den Speicher wieder aufbläht. Dies kann jedoch leicht vermieden werden, indem man es für die aktuelle Session ausschaltet.

    final Session session = entityManager.unwrap(Session.class);
    final CacheMode cacheMode = session.getCacheMode();
    try {
        LOGGER.info("Deactivating 2nd level caching.");
        session.setCacheMode(CacheMode.IGNORE);
        ...
        do the work
        ...
    } finally {
        LOGGER.info("Reactivating 2nd level caching.");
        session.setCacheMode(cacheMode);
        // you might wanna clear 2nd level cache here
    }

Weitere Überlegungen

In unserem realen Szenario haben wir eine zusätzliche Anforderung: für jede Zeile muss einzeln bestimmt werden, ob der Wert in der Datenbank überschrieben werden kann. Einige der Werte sind final. Um das zu erreichen, haben wir ein java.util.Predicate _overwrite_ zur obigen Methode hinzugefügt, um zu bestimmen, ob die geladene Entität mit dem neuen Wert aus der ZIP-Datei überschrieben werden soll.

Ich persönlich fand dieses Problem sehr schwer zu knacken und es hat mir einige graue Haare beschert, aber die endgültige Lösung gefällt mir sehr. Nur die Tatsache, dass ich “verschachtelte” Transaktionen verwenden musste, stört mich, auch wenn das in meinem Fall kein Problem ist, da das System idempotent ist.

Zusammen anpacken?

Leider habe ich nicht die Zeit gefunden, ein funktionierendes Beispiel zu erstellen und es auf github zu stellen. Wenn Sie dies tun möchten, helfe ich gerne!

Bildnachweis Titel: © Robert Kneschke, Adobe Stock
Rainer Ganß

Rainer Ganß

Software Architekt

ganss@4soft.de

Wie sind Ihre Erfahrungen mit Bulk-Operationen in Spring Data/JPA? Haben Sie andere Lösungsansätze? Ich freue mich über einen regen Austausch - kontaktieren Sie mich gern.