Apache Spark – Lessons Learned

Wir haben uns in letzter Zeit sehr intensiv mit Apache Spark auseinandergesetzt und möchten in diesem Beitrag einen Überblick unserer bisherigen Erfahrung teilen.

Kleiner Überblick über das System 

Apache Spark ist ein schnelles und universelles Framework zur Berechnung von großen Datenmengen in einer Clusterumgebung. Neben MapReduce bietet Spark inzwischen eine Vielfalt an Optionen zur Datenverarbeitung, wie Spark SQL, eine eigene Machine Learning Libary, sowie Graphen und Streaming Methoden.

Spark bietet eine starke Vereinfachung von Operationen, welche über ein Cluster parallelisiert werden können. Dies geschieht fast automatisch. Hilfe bei der Verarbeitung bieten die RDDs (Resilent Distributed Datasets), auf denen das Konzept aufgebaut ist. Mit Spark 1.6 kamen auch Spark Dataframes hinzu, die die RDDs unterstützen sollen und eine Brücke zu Spark SQL schlagen. Die RRDs erlauben es Spark, ein Datenset in mehrere Partitionen aufzuteilen. Die Aufteilung wird an die Worker versendet, die dann ihre zugeteilte Partition aus der Datei lesen. Bei diesem Schritt ist es wichtig, dass alle Worker Zugriff auf die Datei (z.B. durch das HDFS) haben. So kann das Datenset parallel auf mehreren Knoten gleichzeitig verarbeitet werden. Außerdem können die RRDs In Memory gehalten werden, sodass ein noch höherer Grad an Verarbeitungsgeschwindigkeit geboten werden kann. Führt man auf diesem RDD eine oder mehrere Berechnungsaktionen aus, berechnet der JobScheduler die dafür benötigten Tasks und verteilt diese an die einzelnen Executors in den Knoten des Clusters, die daraufhin möglichst parallel den Job ausführen. 

Spark kann über Java, Scala, Python und R angesprochen werden und auf verschiedenen Umgebungen wie Yarn, Mesos, Kubernetes oder komplett Standalone laufen. 

Verwendet man keine Datenbank, sondern lokale Daten, sollte man - um diese auf alle Knoten verteilen zu können - ein verteiltes Dateisystem wie HDFS oder Alluxio nutzen, da die Dateien sonst manuell auf die Knoten verteilt werden müssen.  

Optimierung ist der Schlüssel zum Erfolg 

Wir nutzen Spark hauptsächlich im Umfeld von Hadoop mit Hilfe von Yarn und HDFS. Schnell haben wir gemerkt, dass wir die Ressourcen mit den Standardeinstellungen nicht so ausnutzen, wie wir es eigentlich möchten. Daher haben wir einen Weg gesucht, die Einstellungen so kalkulieren zu können, dass sie die gegebenen Ressourcen optimal nutzen. Bei unserer Recherche sind wir auf Seiten (https://umbertogriffo.gitbooks.io/apache-spark-best-practices-and-tuning/content/sparksqlshufflepartitions_draft.html) gestoßen, die uns Formeln zur Hand gegeben haben, wie wir anhand der von uns gegebenen Anzahl an Knoten und deren Ressourcen eine gute Parameterkonfiguration treffen können. 

Entscheidend hierfür sind folgende Parameter, die beim Submit des Sparkjobs übergeben werden oder vorher im Yarn-Cluster eingestellt werden müssen:

Einstellungen in Yarn 

yarn.nodemanager.resource.memory-mb:  
Die Größe des Speichers, der auf einem Knoten für Yarn bereitgestellt werden soll.  

Formel: (({Ram pro Knoten in GB} - 2 GB) * 1024) in MB 

yarn.scheduler.maximum-allocation-mb:  
Die maximale Größe des Speichers, die einem Container innerhalb des Yarn-Knotens bereitgestellt werden kann. Ein Yarn-Knoten kann mehrere Container ausführen. 

Formel: {yarn.nodemanager.resource.memory-mb} / 2 - 1GB 

 
yarn.nodemanager.resource.cpu-vcores:  
Die maximal verfügbare Anzahl an VCores innerhalb eines Yarn-Knoten. 

Formel: {Anzahl der VCores pro Knoten} 

Parameter für Spark-Submit 

num-Executors:  
Die Anzahl der Executors in der gesamten Session, welche über die Knoten verteilt werden. Hierfür berechnen wir zuerst die Anzahl der Executors die wir pro Knoten haben möchten und multiplizieren diese letztlich mit der Anzahl der uns verfügbaren Knoten. 

Formel (1): Anzahl der Executors pro Knoten = ({yarn.nodemanager.resource.cpu-vcores}) / 5) – 1 

Formel (2): {Anzahl der Executors pro Knoten} * {Anzahl der Knoten} 

executor-cores:  
Anzahl der VCores die pro Executor bereitgestellt werden.  

Formel: ({yarn.nodemanager.resource.cpu-vcores} – 5) / {Anzahl der Executors pro Knoten} 

executor-memory:
Größe des Speichers, die jeder Executor zur Verfügung gestellt bekommt. 

Formel: ({yarn.nodemanager.resource.memory-mb} - 1024) / ({Anzahl der Executors pro Knoten} + 1) 

driver-cores:
Anzahl der VCores die für den Treiber bereitgestellt werden. 

Formel: {driver-cores} = {executor-cores} 

driver-memory:
Größe des Speichers, die der Treiber zugeteilt bekommt. 

Formel: {driver-memory} = {executor-memory} 

Sonstige Erfahrungen 

Ein wichtiger Punkt bei der Nutzung von Spark in einer Clusterumgebung innerhalb des Spark-Submits ist das Versenden der verwendeten Bibliotheken an die Worker. Wir raten davon ab, grundsätzlich alle Projekt-Bibliotheken mitzusenden, um sich die Arbeit zu ersparen, die wenigen verwendeten aufzulisten. Hierbei können Konflikte zwischen einzelnen Bibliotheken entstehen und zu Fehlermeldungen führen.  

Bei der Verarbeitung von großen Daten-Paketen, kann es dazu kommen, dass durch die angewendeten Berechnungen der Overhead vollläuft und eine Fehlermeldung geworfen wird. Hierbei ist es möglich den Speicher der Optionen spark.executor.memoryOverhead und spark.driver.memoryOverhead zu erhöhen oder die Daten in den Executors kleiner zu halten.  

Verwendet man Spark in einer Clusterumgebung, empfiehlt es sich außerdem wie bereits erwähnt ein distributives Dateisystem (z.B. HDFS) zu nutzen, um die Datenpflege zwischen den Workern nicht manuell, durchführen zu müssen. 

Unser Fazit 

Apache Spark bietet für uns eine effiziente Technologie zur Berechnung von großen Datenmengen, welche ebenfalls im Streamingumfeld eingesetzt werden kann. Ein weiterer für uns sehr spannender Bereich ist der Einsatz SparkML auch im Hinblick auf zusätzliche Frameworks wie Spark Flow, welche uns Erlauben wertvolle Bibliotheken wie Tensorflow im Spark-Umfeld einzusetzen. Daher werden wir Spark auf jeden Fall weiterverfolgen und einsetzen.