2016-04-03 11 views
4

Für ein riesiges Projekt mit vielen Entitäten habe ich eine gemeinsame Methode geschrieben: save().Zend Framework 2/Doctrine 2/Massenoperationen und Ereignisse triggern

Diese Methode wird in einem abstrakten Service gespeichert und wird im gesamten Projekt zum Speichern des Entitätsstatus verwendet.

Abstract :: save() sieht wie folgt aus:

public function save($entity) 
{ 
    $transactionStarted = $this->beginTransaction(); 

    try 
    { 
     $action = $entity->getId() ? self::UPDATE : self::CREATION; 

     $this->getEventManager()->trigger('save.pre', $entity, ['action' => $action]); 

     $this->getEntityManager()->persist($entity); 
     $this->getEntityManager()->flush(); 

     $this->getEventManager()->trigger('save.post', $entity, ['action' => $action]); 

     if ($transactionStarted) 
     { 
      $this->commitTransaction(); 
     } 
    } catch (\Exception $e) 
    { 
     if ($transactionStarted) 
     { 
      $this->rollbackTransaction(); 
     } 

     throw new Exception('Unable to save entity', $e); 
    } 

    return true; 
} 

public function beginTransaction() 
{ 
    if (!$this->getEntityManager()->getConnection()->isTransactionActive()) 
    { 
     $this->getEntityManager()->getConnection()->beginTransaction(); 

     return true; 
    } 

    return false; 
} 

public function commitTransaction() 
{ 
    $this->getEntityManager()->getConnection()->commit(); 

    return $this; 
} 

public function rollbackTransaction() 
{ 
    $this->getEntityManager()->getConnection()->rollBack(); 

    return $this; 
} 

In meinem Fall, wenn ein Mitglied (neu Member Einheit) beim Aufruf des Member Service (erweiterte AbstractService) eingesetzt wird, wird eine E-Mail gesendet werden (zB) durch das Ereignis save.post. Oder eine andere Aktion, die sich auf eine andere Methode zum Speichern von Diensten bezieht, kann ebenfalls fortgesetzt werden.

Beispiel des "Kind" Mitgliederservice :: save() -Methode

MemberService 

public function save(Member $member) 
{ 
    // some stuff, e.g set a property 
    $member->setFirstName('John'); 

    return parent::save($member); 
} 

Beispiel für getriggerte Ereignis

$sharedEventManager->attach(MemberService::class, 'save.post', [$this, 'onMembersCreation']); 

public function onMembersCreation(EventInterface $event) 
{ 
    // send an email 

    // anything else ... update another entity ... (call AnotherService::save() too) 
} 

, die für einen einfachen Speichervorgang groß ist.

Aber jetzt möchte ich eine Menge Mitglieder massiv importieren, mit Kreationen, Updates, ... Und um das zu erreichen, lese ich das Doctrine-Dokument zu Bulk-Importen. Doc here

Aber wie meinen Code richtig zu aktualisieren, um "Massenspeicherung" und "Einzelspeicherung" zu behandeln? Und behalten Sie Transaktionssicherheit und Ereignisse?

+0

Was sind "viele Mitglieder"? 1k? 1M? Ihre Antwort wird definieren, welche Strategie Sie adaopt – JesusTheHun

+0

Hallo JesusTheHun, ich danke Ihnen, die erste, die von meinem Problem interessiert sein :) "eine Menge Mitglieder" ist von 4k bis 10k – ceadreak

+0

Ist dies ein One-Shot-Import oder muss es oft laufen? Zugrunde liegende Frage ist: Ist Leistung wichtig? – JesusTheHun

Antwort

1

Im Grunde schlage ich vor, dass Sie die Doctrine \ Common \ Collections \ Collection-Schnittstelle implementieren, vielleicht ArrayCollection erweitern, und erstellen Sie eine Methode speichern, die tun, was das Dokument Ihnen gesagt hat.

<?php 

class MyDirtyCollection extends \Doctrine\Common\Collections\ArrayCollection { 

    public function __construct(AbstractService $abstractService) 
    { 
     $this->service = $abstractService; 
    } 

    public function save() 
    { 
     foreach ($this as $entity) { 
      $this->service->save($entity); 
     } 
    } 
} 

class MyCollection extends \Doctrine\Common\Collections\ArrayCollection { 

    public $bulkSize = 500; 

    protected $eventManager; 
    protected $entityManager; 

    public function __construct(EntityManager $entityManager, EventManager $eventManager) 
    { 
     $this->entityManager = $entityManager; 
     $this->eventManager = $eventManager; 
    } 

    public function getEventManager() 
    { 
     return $this->eventManager; 
    } 

    public function getEntityManager() 
    { 
     return $this->entityManager; 
    } 

    public function setBulkSize(int $bulkSize) 
    { 
     $this->bulkSize = $bulkSize; 
    } 

    public function save() 
    { 
     $transactionStarted = $this->getEntityManager()->getConnection()->beginTransaction(); 

     try { 
      foreach ($this as $entity) { 
       $action = $entity->getId() ? self::UPDATE : self::CREATION; 
       $this->getEventManager()->trigger('save.pre', $entity, ['action' => $action]); 
      } 

      $i = 0; 
      foreach ($this as $entity) { 
       $i++; 

       $this->getEntityManager()->persist($entity); 

       if (($i % $this->bulkSize) === 0) { 
        $this->getEntityManager()->flush(); 
        $this->getEntityManager()->clear(); 
       } 
      } 

      $this->getEntityManager()->flush(); 
      $this->getEntityManager()->clear(); 

      foreach ($this as $entity) { 
       $action = $entity->getId() ? self::UPDATE : self::CREATION; 
       $this->getEventManager()->trigger('save.post', $entity, ['action' => $action]); 
      } 

      if ($transactionStarted) { 
       $this->getEntityManager()->getConnection()->commitTransaction(); 
      } 

     } catch (Exception $e) { 
      $this->getEntityManager()->rollbackTransaction(); 
     } 
    } 
} 

So etwas;) Wenn Sie Ihre Daten können Sie Ihre Sammlung Hydrat holen, dann beschäftigen Sie Ihr Unternehmen und rufen schließlich $collection->save();

EDIT: Insert Klasse hinzufügen und verwenden Fall unter:

Die Leistung hier wird niedrig sein, aber immer noch besser als commit by commit. Dennoch sollten Sie Doctrine DBAL anstelle von ORM verwenden, wenn Sie nach hgih-Leistung suchen. Hier habe ich mit Ihnen meine DBAL Klasse für Bulk-Insert teilen:

<?php 

namespace JTH\Doctrine\DBAL; 

use Doctrine\DBAL\Query\QueryBuilder; 
use Exception; 
use InvalidArgumentException; 
use Traversable; 
use UnderflowException; 

class Insert extends QueryBuilder 
{ 
    const CALLBACK_FAILURE_SKIP = 0; 
    const CALLBACK_FAILURE_BREAK = 1; 

    protected $callbackFailureStrategy = self::CALLBACK_FAILURE_BREAK; 

    public static $defaultBulkSize = 500; 

    public $ignore = false; 
    public $onDuplicate = null; 

    public function values(array $values) 
    { 
     $this->resetQueryPart('values'); 
     $this->addValues($values); 
    } 

    public function addValues(array $values) 
    { 
     $this->add('values', $values, true); 
    } 

    public function setCallbackFailureStrategy($strategy) 
    { 
     if ($strategy == static::CALLBACK_FAILURE_BREAK) { 
      $this->callbackFailureStrategy = static::CALLBACK_FAILURE_BREAK; 
     } elseif ($strategy == static::CALLBACK_FAILURE_SKIP) { 
      $this->callbackFailureStrategy = static::CALLBACK_FAILURE_SKIP; 
     } else { 
      $class = self::class; 
      throw new InvalidArgumentException(
       "Invalid failure behaviour. See $class::CALLBACK_FAILURE_SKIP and $class::CALLBACK_FAILURE_BREAK" 
      ); 
     } 
    } 

    public function getCallbackFailureStrategy() 
    { 
     return $this->callbackFailureStrategy; 
    } 

    public function execute() 
    { 
     return $this->getConnection()->executeUpdate(
      $this->getSQLForInsert(), 
      $this->getParameters(), 
      $this->getParameterTypes() 
     ); 
    } 

    /** 
    * Converts this instance into an INSERT string in SQL. 
    * @return string 
    * @throws \Exception 
    */ 
    private function getSQLForInsert() 
    { 
     $count = sizeof($this->getQueryPart('values')); 

     if ($count == 0) { 
      throw new UnderflowException("No values ready for INSERT"); 
     } 

     $values = current($this->getQueryPart('values')); 
     $ignore = $this->ignore ? 'IGNORE' : '' ; 
     $sql = "INSERT $ignore INTO " . $this->getQueryPart('from')['table'] . 
      ' (' . implode(', ', array_keys($values)) . ')' . ' VALUES '; 

     foreach ($this->getQueryPart('values') as $values) { 
      $sql .= '(' ; 

      foreach ($values as $value) { 
       if (is_array($value)) { 
        if ($value['raw']) { 
         $sql .= $value['value'] . ','; 
        } else { 
         $sql .= $this->expr()->literal($value['value'], $value['type']) . ','; 
        } 
       } else { 
        $sql .= $this->expr()->literal($value) . ','; 
       } 
      } 

      $sql = substr($sql, 0, -1); 
      $sql .= '),'; 
     } 

     $sql = substr($sql, 0, -1); 

     if (!is_null($this->onDuplicate)) { 
      $sql .= ' ON DUPLICATE KEY UPDATE ' . $this->onDuplicate . ' '; 
     } 

     return $sql; 
    } 

    /** 
    * @param $loopable array | Traversable An array or object to loop over 
    * @param $callable Callable A callable that will be called before actually insert the row. 
    * two parameters will be passed : 
    * - the key of the current row 
    * - the row values (Array) 
    * An array of rows to insert must be returned 
    * @param $bulkSize int How many rows will be inserted at once 
    * @param bool $transactionnal 
    * @throws \Doctrine\DBAL\ConnectionException 
    * @throws \Exception 
    */ 
    public function bulk($loopable, callable $callable, $bulkSize = null, $transactionnal = true) 
    { 
     if (!is_array($loopable) and !($loopable instanceof Traversable)) { 
      throw new InvalidArgumentException("\$loppable must be either an array or a traversable object"); 
     } 

     $bulkSize = $bulkSize ?? static::$defaultBulkSize; 

     $this->getConnection()->getConfiguration()->setSQLLogger(null); // Avoid MonoLog memory overload 

     if ($transactionnal) { 
      $this->getConnection()->beginTransaction(); 
     } 

     $this->resetQueryPart('values'); 

     foreach ($loopable as $key => $values) { 
      try { 
       $callbackedValues = $callable($key, $values); 

       if (sizeof($callbackedValues) > 0) { 
        foreach ($callbackedValues as $callbackedValuesRow) { 
         $this->addValues($callbackedValuesRow); 
        } 
       } 
      } catch (Exception $e) { 
       /* 
       * If a callback exception must break the transaction, then throw the exception to the call stack 
       * Else, skip the row insertion 
       */ 
       if ($this->callbackFailureStrategy == static::CALLBACK_FAILURE_BREAK) { 
        throw $e; 
       } else { 
        continue; 
       } 
      } 

      $count = count($this->getQueryPart('values')); 

      if ($count >= $bulkSize) { 
       $this->execute(); 
       $this->resetQueryPart('values'); 
      } 
     } 

     $count = count($this->getQueryPart('values')); 

     if ($count > 0) { 
      $this->execute(); 
     } 

     $this->resetQueryPart('values'); 

     if ($transactionnal) { 
      $this->getConnection()->commit(); 
     } 
    } 

    /** 
    * @return boolean 
    */ 
    public function isIgnore() 
    { 
     return $this->ignore; 
    } 

    /** 
    * @param boolean $ignore 
    */ 
    public function setIgnore(bool $ignore) 
    { 
     $this->ignore = $ignore; 
    } 

    /** 
    * @return null|string 
    */ 
    public function getOnDuplicate() : string 
    { 
     return $this->onDuplicate; 
    } 

    /** 
    * @param null $onDuplicate 
    */ 
    public function setOnDuplicate($onDuplicate) 
    { 
     $this->onDuplicate = $onDuplicate; 
     $this->ignore = false; 
    } 


} 

Anwendungsfall:

try { 
     $i = new Insert($this->getDoctrine()->getConnection('myDB')); 
     $i->insert('myTable'); 
     $i->setOnDuplicate('col1 = VALUES(col1), updated_last = NOW()'); 
     $i->setCallbackFailureStrategy(Insert::CALLBACK_FAILURE_BREAK); 
     $i->bulk($myArrayOfRows, function ($key, $row) { 

      // Some pre-insert processing 

      $rowset[] = $row; 

      return $rowset; 

     }, 500, true); 

     $this->addFlash('success', 'Yay !'); 

    } catch (DBALException $e) { 
     $this->addFlash('error', 'Damn, error : ' . $e->getMessage()); 
    } 
+0

Mhhh Ich bin mir nicht sicher, ich versuchte eine sehr ähnliche Methode mit Array, nicht Sammlung aber hatte Probleme mit Transaktionen ... Ich werde wieder mit einer Sammlung testen, ob das Problem das gleiche ist – ceadreak

+0

Was war das Problem mit den Transaktionen? Überprüfen Sie mein Update auch – JesusTheHun

+0

@ JesusTheHun Ich werde Ihre Lösung dieses p.m testen, danke für die Zeit – ceadreak

0

Schließlich habe ich merge Lehre Methode, und es scheint sehr gut zu funktionieren.

schrieb ich ein separates AbstractService::saveBulk() Verfahren eine hohe Anzahl von Member Entitäten zu speichern, wie:

/** 
    * @param ArrayCollection $entities 
    * 
    * @return bool 
    * @throws Exception 
    */ 
    public function saveBulk(ArrayCollection $entities) 
    { 
     $batchSize = 100; 
     $i   = 0; 

     foreach ($entities as $entity) 
     { 
      $transactionStarted = $this->beginTransaction(); 

      try 
      { 
       $action = $entity->getId() ? self::UPDATE : self::CREATION; 

       $this->getEventManager()->trigger('save.pre', $entity, ['action' => $action]); 

       $entity = $this->getEntityManager()->merge($entity); 

       $this->getEntityManager()->persist($entity); 
       $this->getEntityManager()->flush(); 

       $this->getEventManager()->trigger('save.post', $entity, ['action' => $action]); 

       if (($i % $batchSize) === 0) 
       { 
        $this->getEntityManager()->clear(); 
       } 

       if ($transactionStarted) 
       { 
        $this->commitTransaction(); 
       } 
      } catch (\Exception $e) 
      { 
       if ($transactionStarted) 
       { 
        $this->rollbackTransaction(); 
       } 

       throw new Exception(Exception::UNEXPECTED_ERROR, 'Unable to save entity', $e); 
      } 
     } 

     $this->getEntityManager()->clear(); 

     return true; 
    } 

entgegen die Doctrine2 Dokumentation, ich nenne nur clear() und nicht flush() + clear() für jede Charge, weil für einige Ereignisse genannt Ich muss wissen, ob die Entität eine Datenbankkennung hat.

@JesusTheHun danke für Ihre Kommentare, die mir sehr helfen.

+0

Sie starten eine Transaktion für jede Entität statt einmal für Ihre Bulk. Warum verwendest du nicht die Methode, die ich dir in der Klasse MyCollection gegeben habe? (http://stackoverflow.com/questions/36390063/zend-framework-2-doctrine-2-bulk-operations-and-events-triggering/36602128#36602128) Wenn Sie auf jede Entität bündeln, macht es keinen Sinn, Sie müssen jede $ BatchSize spülen. Die Stapelgröße var muss ein Parameter, eine Klassenkonstante oder eine Objekteigenschaft sein, die auf dem Konstrukt oder einer statischen Eigenschaft initialisiert wurde. Wenn Sie die Modulo-Bedingung verwenden und beispielsweise 121 Entitäten haben, werden Sie die letzten 21 Entitäten nicht binden. – JesusTheHun

+0

Ich habe versucht, Ihre Methode, aber ohne Zusammenführung, immer noch den Fehler "Eine neue Entität wurde durch die Beziehung gefunden ...".'beginTransaction()' Methode überprüft, ob eine Transaktion bereits Starter ist, starte ich keine Transaktion für jede Entität – ceadreak

+0

Ich habe versucht, Ihre Methode erneut mit einer Zusammenführung vor persistance zu verwenden, und es funktioniert jetzt. bei 500 Entitäten dauert es 4 Sekunden weniger als bei meinen Methoden;). Antwort akzeptiert +1 – ceadreak