vendredi, décembre 1, 2023

UDF Python optimisées pour les flèches dans Apache Spark™ 3.5


Dans Apache Spark™, ​​les fonctions définies par l’utilisateur (UDF) Python font partie des fonctionnalités les plus populaires. Ils permettent aux utilisateurs de créer un code personnalisé adapté à leurs besoins uniques en matière de traitement des données. Cependant, les UDF Python actuelles, qui s’appuient sur cloudpickle pour la sérialisation et la désérialisation, rencontrent des goulots d’étranglement en termes de performances, en particulier lorsqu’il s’agit de grandes entrées et sorties de données.

Dans Apache Spark 3.5 et Runtime Databricks 14.0, nous introduisons des UDF Python optimisées pour Arrow pour améliorer considérablement les performances. Au cœur de cette optimisation se trouve Flèche Apache, une représentation de données en mémoire en colonnes multilingues standardisée. En exploitant Arrow, ces UDF contournent les méthodes traditionnelles et plus lentes de (dé)sérialisation des données, conduisant à un échange de données rapide entre les processus JVM et Python. Grâce au système de types riche d’Apache Arrow, ces UDF optimisées offrent un moyen plus cohérent et standardisé de gérer la coercition de type.

L’optimisation des flèches pour les UDF Python est facultative et les utilisateurs peuvent contrôler s’ils souhaitent ou non activer l’optimisation des flèches pour les UDF individuelles à l’aide de l’option "useArrow" paramètre booléen de "functions.udf". Un exemple est le suivant :

>>> @udf(returnType='int', useArrow=True)  # An Arrow Python UDF
... def arrow_slen(s):
...   return len(s)
... 

De plus, les utilisateurs peuvent activer l’optimisation Arrow pour tous les UDF d’une SparkSession entière via une configuration Spark : "spark.sql.execution.pythonUDF.arrow.enabled"comme indiqué ci-dessous:

>>> spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", True)
>>> 
>>> @udf(returnType='int')  # An Arrow Python UDF
... def arrow_slen(s):
...   return len(s)
...

(Dé)sérialisation plus rapide

Apache Arrow est un format de données en mémoire en colonnes qui permet un échange de données efficace entre différents systèmes et langages de programmation. Contrairement à Pickle, qui sérialise une ligne entière en tant qu’objet, Arrow stocke les données dans un format orienté colonne, permettant une meilleure compression et une meilleure localisation de la mémoire, ce qui convient mieux aux charges de travail analytiques.

Le graphique ci-dessous montre les performances d’un UDF Python optimisé par Arrow effectuant une seule transformation avec un ensemble de données d’entrée de différentes tailles. Le cluster se compose de 3 nœuds de calcul et d’un pilote, et chaque machine du cluster dispose de 16 processeurs virtuels et de 122 Gio de mémoire. L’UDF Python optimisé pour Arrow est ~1,6 fois plus rapide que le Python UDF mariné.

UDF Python optimisé pour les flèches

L’UDF Python optimisé par flèche présente un avantage significatif dans le chaînage des UDF. Comme indiqué ci-dessous, dans le même cluster, un UDF Python optimisé par Arrow peut exécuter ~1,9 fois plus rapide qu’un UDF Python mariné sur un ensemble de données de 32 Go.

UDF Python optimisé pour les flèches

Voir ici pour un benchmark complet et des résultats.

Coercition de type standardisée

La coercition de type UDF pose des problèmes lorsque les valeurs Python renvoyées par l’UDF ne correspondent pas au type de retour spécifié par l’utilisateur. Malheureusement, la coercition de type par défaut de Python UDF a certaines limitations, telles que le recours à None comme solution de secours en cas d’incompatibilités de type, conduisant à une ambiguïté potentielle et à une perte de données. De plus, la conversion de date, datetime et tuples en chaînes peut donner des résultats ambigus. Les UDF Python optimisées par Arrow résolvent ces problèmes en tirant parti de l’ensemble de règles bien définies d’Arrow pour la coercition de type.

Comme indiqué ci-dessous, un UDF Python optimisé pour Arrow(useArrow=True) force avec succès les entiers stockés sous forme de chaîne à « int » comme spécifié, mais un UDF Python mariné (useArrow=False) revient à « NULL ».

>>> df = spark.createDataFrame(['1', '2'], schema='string')
>>> df.select(udf(lambda x: x, 'int', useArrow=True)('value').alias('str_to_int')).show()
+----------+                                                                    
|str_to_int|
+----------+
|         1|
|         2|
+----------+
>>> df.select(udf(lambda x: x, 'int', useArrow=False)('value').alias('str_to_int')).show()
+----------+
|str_to_int|
+----------+
|      NULL|
|      NULL|
+----------+

Un autre exemple est présenté ci-dessous, où un UDF Python optimisé pour Arrow (useArrow=True) a contraint correctement une date à une chaîne alors qu’un UDF Python mariné (useArrow=False) renvoie des résultats ambigus en exposant les objets Java sous-jacents.

>>> df = spark.createDataFrame([datetime.date(1970, 1, 1), datetime.date(1970, 1, 2)], schema='date')
>>> df.select(udf(lambda x: x, 'string', useArrow=True)('value').alias('date_in_string')).show()
+--------------+
|date_in_string|
+--------------+
|    1970-01-01|
|    1970-01-02|
+--------------+
>>> df.select(udf(lambda x: x, 'string', useArrow=False)('value').alias('date_in_string')).show()
+-----------------------------------------------------------------------+
|date_in_string                                                         |
+-----------------------------------------------------------------------+
|java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet..|
|java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet..|
+-----------------------------------------------------------------------+

Par rapport à Pickle, la coercition de type Arrow vise à conserver autant d’informations et de précision que possible pendant le processus de conversion.

Voir ici pour une comparaison complète entre les UDF Python Pickled et les UDF Python optimisées par Arrow concernant la coercition de type.

Conclusion

Les UDF Python optimisés par Arrow utilisent Apache Arrow pour la (dé)sérialisation des entrées et sorties UDF, ce qui entraîne une (dé)sérialisation beaucoup plus rapide par rapport à l’UDF Python mariné par défaut. De plus, il standardise les règles de coercition de type conformément aux spécifications Apache Arrow. Les UDF Python optimisées pour les flèches sont disponibles à partir de Spark 3.5 ; voir ÉTINCELLE-40307 pour plus d’informations.

Related Articles

LAISSER UN COMMENTAIRE

S'il vous plaît entrez votre commentaire!
S'il vous plaît entrez votre nom ici

Latest Articles