lundi, décembre 4, 2023

meilleure interface dplyr, plus de fonctions sdf_* et routines de sérialisation basées sur RDS



Nous sommes ravis d’annoncer sparklyr 1.5 est maintenant disponible sur CRAN!

À installer sparklyr 1.5 de CRAN, exécutez

Dans cet article de blog, nous mettrons en évidence les aspects suivants de sparklyr 1.5 :

Meilleure interface dplyr

Une grande partie des demandes d’extraction introduites dans le sparklyr La version 1.5 visait à faire fonctionner les dataframes Spark avec divers dplyr verbes de la même manière que les dataframes R. La liste complète des dplyr-bogues liés et demandes de fonctionnalités qui ont été résolus dans
sparklyr 1.5 se trouve dans ici.

Dans cette section, nous présenterons trois nouvelles fonctionnalités dplyr livrées avec sparklyr 1.5.

Échantillonnage stratifié

L’échantillonnage stratifié sur une trame de données R peut être réalisé avec une combinaison de dplyr::group_by() suivi de
dplyr::sample_n() ou dplyr::sample_frac()où les variables de regroupement spécifiées dans le dplyr::group_by()
Les étapes sont celles qui définissent chaque strate. Par exemple, la requête suivante regroupera mtcars par nombre de cylindres et renvoyer un échantillon aléatoire pondéré de taille deux de chaque groupe, sans remplacement, et pondéré par le mpg colonne:

## # A tibble: 6 x 11
## # Groups:   cyl [3]
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
##   <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
## 1  33.9     4  71.1    65  4.22  1.84  19.9     1     1     4     1
## 2  22.8     4 108      93  3.85  2.32  18.6     1     1     4     1
## 3  21.4     6 258     110  3.08  3.22  19.4     1     0     3     1
## 4  21       6 160     110  3.9   2.62  16.5     0     1     4     4
## 5  15.5     8 318     150  2.76  3.52  16.9     0     0     3     2
## 6  19.2     8 400     175  3.08  3.84  17.0     0     0     3     2

A partir de sparklyr 1.5, la même chose peut également être faite pour les dataframes Spark avec Spark 3.0 ou supérieur, par exemple :

library(sparklyr)

sc <- spark_connect(master = "local", version = "3.0.0")
mtcars_sdf <- copy_to(sc, mtcars, replace = TRUE, repartition = 3)

mtcars_sdf %>%
  dplyr::group_by(cyl) %>%
  dplyr::sample_n(size = 2, weight = mpg, replace = FALSE) %>%
  print()
# Source: spark<?> [?? x 11]
# Groups: cyl
    mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
  <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
1  21       6 160     110  3.9   2.62  16.5     0     1     4     4
2  21.4     6 258     110  3.08  3.22  19.4     1     0     3     1
3  27.3     4  79      66  4.08  1.94  18.9     1     1     4     1
4  32.4     4  78.7    66  4.08  2.2   19.5     1     1     4     1
5  16.4     8 276.    180  3.07  4.07  17.4     0     0     3     3
6  18.7     8 360     175  3.15  3.44  17.0     0     0     3     2

ou

## # Source: spark<?> [?? x 11]
## # Groups: cyl
##     mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb
##   <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>
## 1  21       6 160     110  3.9   2.62  16.5     0     1     4     4
## 2  21.4     6 258     110  3.08  3.22  19.4     1     0     3     1
## 3  22.8     4 141.     95  3.92  3.15  22.9     1     0     4     2
## 4  33.9     4  71.1    65  4.22  1.84  19.9     1     1     4     1
## 5  30.4     4  95.1   113  3.77  1.51  16.9     1     1     5     2
## 6  15.5     8 318     150  2.76  3.52  16.9     0     0     3     2
## 7  18.7     8 360     175  3.15  3.44  17.0     0     0     3     2
## 8  16.4     8 276.    180  3.07  4.07  17.4     0     0     3     3

Sommes de lignes

Le rowSums() fonctionnalité offerte par dplyr est pratique lorsqu’il faut résumer un grand nombre de colonnes dans une trame de données R qui ne sont pas pratiques à énumérer individuellement. Par exemple, nous avons ici une base de données à six colonnes de nombres réels aléatoires, où le
partial_sum la colonne dans le résultat contient la somme des colonnes b à travers d dans chaque ligne :

## # A tibble: 5 x 7
##         a     b     c      d     e      f partial_sum
##     <dbl> <dbl> <dbl>  <dbl> <dbl>  <dbl>       <dbl>
## 1 0.781   0.801 0.157 0.0293 0.169 0.0978        1.16
## 2 0.696   0.412 0.221 0.941  0.697 0.675         2.27
## 3 0.802   0.410 0.516 0.923  0.190 0.904         2.04
## 4 0.200   0.590 0.755 0.494  0.273 0.807         2.11
## 5 0.00149 0.711 0.286 0.297  0.107 0.425         1.40

Commençant par sparklyr 1.5, la même opération peut être effectuée avec les dataframes Spark :

## # Source: spark<?> [?? x 7]
##         a     b     c      d     e      f partial_sum
##     <dbl> <dbl> <dbl>  <dbl> <dbl>  <dbl>       <dbl>
## 1 0.781   0.801 0.157 0.0293 0.169 0.0978        1.16
## 2 0.696   0.412 0.221 0.941  0.697 0.675         2.27
## 3 0.802   0.410 0.516 0.923  0.190 0.904         2.04
## 4 0.200   0.590 0.755 0.494  0.273 0.807         2.11
## 5 0.00149 0.711 0.286 0.297  0.107 0.425         1.40

En prime de la mise en œuvre du rowSums fonctionnalité pour les dataframes Spark,
sparklyr 1.5 offre désormais également une prise en charge limitée de l’opérateur de sous-ensemble de colonnes sur les dataframes Spark. Par exemple, tous les extraits de code ci-dessous renverront un sous-ensemble de colonnes du dataframe nommé sdf:

# select columns `b` through `e`
sdf[2:5]
# select columns `b` and `c`
sdf[c("b", "c")]
# drop the first and third columns and return the rest
sdf[c(-1, -3)]

Récapitulateur de moyenne pondérée

Semblable aux deux dplyr fonctions mentionnées ci-dessus, le weighted.mean() Le résumé est une autre fonction utile qui fait désormais partie du dplyr interface pour les dataframes Spark dans sparklyr 1.5. On peut le voir en action, par exemple, en comparant les résultats de ce qui suit

avec sortie de l’opération équivalente sur mtcars dans R :

les deux devraient évaluer ce qui suit :

##     cyl mpg_wm
##   <dbl>  <dbl>
## 1     4   25.9
## 2     6   19.6
## 3     8   14.8

De nouveaux ajouts au sdf_* famille de fonctions

sparklyr fournit un grand nombre de fonctions pratiques pour travailler avec les dataframes Spark, et toutes ont des noms commençant par sdf_ préfixe.

Dans cette section, nous mentionnerons brièvement quatre nouveaux ajouts et montrerons quelques exemples de scénarios dans lesquels ces fonctions sont utiles.

sdf_expand_grid()

Comme le nom le suggère, sdf_expand_grid() est simplement l’équivalent Spark de expand.grid(). Plutôt que de courir expand.grid() dans R et en important la trame de données R résultante dans Spark, on peut maintenant exécuter sdf_expand_grid(), qui accepte à la fois les vecteurs R et les trames de données Spark et prend en charge les astuces pour les jointures de hachage de diffusion. L’exemple ci-dessous montre sdf_expand_grid() création d’une grille de 100 x 100 x 10 x 10 dans Spark sur 1 000 partitions Spark, avec des conseils de jointure de hachage diffusés sur les variables avec de petites cardinalités :

library(sparklyr)

sc <- spark_connect(master = "local")

grid_sdf <- sdf_expand_grid(
  sc,
  var1 = seq(100),
  var2 = seq(100),
  var3 = seq(10),
  var4 = seq(10),
  broadcast_vars = c(var3, var4),
  repartition = 1000
)

grid_sdf %>% sdf_nrow() %>% print()
## [1] 1e+06

sdf_partition_sizes()

Comme sparklyr utilisateur @sbottelli suggéré iciune chose qui serait formidable d’avoir sparklyr est un moyen efficace d’interroger les tailles de partition d’une trame de données Spark. Dans sparklyr 1.5, sdf_partition_sizes() fait exactement ça :

library(sparklyr)

sc <- spark_connect(master = "local")

sdf_len(sc, 1000, repartition = 5) %>%
  sdf_partition_sizes() %>%
  print(row.names = FALSE)
##  partition_index partition_size
##                0            200
##                1            200
##                2            200
##                3            200
##                4            200

sdf_unnest_longer() et sdf_unnest_wider()

sdf_unnest_longer() et sdf_unnest_wider() sont les équivalents de
tidyr::unnest_longer() et tidyr::unnest_wider() pour les dataframes Spark.
sdf_unnest_longer() développe tous les éléments d’une colonne de structure en plusieurs lignes, et
sdf_unnest_wider() les développe en plusieurs colonnes. Comme illustré avec un exemple de trame de données ci-dessous,

library(sparklyr)

sc <- spark_connect(master = "local")
sdf <- copy_to(
  sc,
  tibble::tibble(
    id = seq(3),
    attribute = list(
      list(name = "Alice", grade = "A"),
      list(name = "Bob", grade = "B"),
      list(name = "Carol", grade = "C")
    )
  )
)
sdf %>%
  sdf_unnest_longer(col = record, indices_to = "key", values_to = "value") %>%
  print()

évalue à

## # Source: spark<?> [?? x 3]
##      id value key
##   <int> <chr> <chr>
## 1     1 A     grade
## 2     1 Alice name
## 3     2 B     grade
## 4     2 Bob   name
## 5     3 C     grade
## 6     3 Carol name

alors que

sdf %>%
  sdf_unnest_wider(col = record) %>%
  print()

évalue à

## # Source: spark<?> [?? x 3]
##      id grade name
##   <int> <chr> <chr>
## 1     1 A     Alice
## 2     2 B     Bob
## 3     3 C     Carol

Routines de sérialisation basées sur RDS

Certains lecteurs doivent se demander pourquoi un tout nouveau format de sérialisation devrait être implémenté dans sparklyr du tout. Pour faire court, la raison en est que la sérialisation RDS est un meilleur remplacement que son prédécesseur CSV. Il possède tous les attributs souhaitables du format CSV, tout en évitant un certain nombre d’inconvénients courants parmi les formats de données textuels.

Dans cette section, nous expliquerons brièvement pourquoi sparklyr doit prendre en charge au moins un format de sérialisation autre que arrowapprofondissez les problèmes liés à la sérialisation basée sur CSV, puis montrez comment la nouvelle sérialisation basée sur RDS est exempte de ces problèmes.

Pourquoi arrow ce n’est pas pour tout le monde ?

Pour transférer les données entre Spark et R correctement et efficacement, sparklyr doit s’appuyer sur un format de sérialisation de données bien pris en charge à la fois par Spark et R. Malheureusement, peu de formats de sérialisation satisfont à cette exigence, et parmi ceux qui le font figurent les formats textuels tels que CSV et JSON, et les formats binaires tels que Apache Arrow, Protobuf et, depuis peu, un petit sous-ensemble de RDS version 2. La situation est encore compliquée par la considération supplémentaire selon laquelle
sparklyr doit prendre en charge au moins un format de sérialisation dont la mise en œuvre peut être entièrement autonome au sein du sparklyr base de code, c’est-à-dire qu’une telle sérialisation ne doit dépendre d’aucun package R externe ou d’une bibliothèque système, afin qu’elle puisse accueillir les utilisateurs qui souhaitent utiliser sparklyr mais qui ne disposent pas nécessairement de la chaîne d’outils du compilateur C++ requise et d’autres dépendances système pour configurer les packages R tels que arrow ou
protolite. Avant sparklyr 1.5, la sérialisation basée sur CSV était l’alternative par défaut vers laquelle les utilisateurs ne disposaient pas de l’option de secours. arrow package installé ou lorsque le type de données transportées de R vers Spark n’est pas pris en charge par la version de arrow disponible.

Pourquoi le format CSV n’est-il pas idéal ?

Il y a au moins trois raisons de croire que le format CSV n’est pas le meilleur choix lorsqu’il s’agit d’exporter des données de R vers Spark.

L’une des raisons est l’efficacité. Par exemple, un nombre à virgule flottante double précision tel que .Machine$double.eps doit être exprimé comme "2.22044604925031e-16" au format CSV afin de ne subir aucune perte de précision, occupant ainsi 20 octets au lieu de 8 octets.

Mais les problèmes d’exactitude sont plus importants que l’efficacité. Dans une dataframe R, on peut stocker à la fois NA_real_ et
NaN dans une colonne de nombres à virgule flottante. NA_real_ devrait idéalement se traduire par null dans une trame de données Spark, alors que
NaN devrait continuer à être NaN lorsqu’il est transporté de R à Spark. Malheureusement, NA_real_ dans R devient impossible à distinguer de NaN une fois sérialisé au format CSV, comme le montre la démo rapide ci-dessous :

##     x is_nan
## 1  NA  FALSE
## 2 NaN   TRUE
csv_file <- "/tmp/data.csv"
write.csv(original_df, file = csv_file, row.names = FALSE)
deserialized_df <- read.csv(csv_file)
deserialized_df %>% dplyr::mutate(is_nan = is.nan(x)) %>% print()
##    x is_nan
## 1 NA  FALSE
## 2 NA  FALSE

Un autre problème d’exactitude très similaire à celui ci-dessus était le fait que
"NA" et NA dans une colonne de chaîne d’une trame de données R devient impossible à distinguer une fois sérialisé au format CSV, comme indiqué à juste titre dans
ce problème Github
par @caewok et d’autres.

RDS à la rescousse !

Le format RDS est l’un des formats binaires les plus utilisés pour sérialiser des objets R. Il est décrit de manière assez détaillée au chapitre 1, section 8 de
ce document. Parmi les avantages du format RDS figurent l’efficacité et la précision : il a une implémentation raisonnablement efficace en base R et prend en charge tous les types de données R.

Il convient également de noter le fait que lorsqu’une trame de données R contenant uniquement des types de données avec des équivalents raisonnables dans Apache Spark (par exemple, RAWSXP, LGLSXP, CHARSXP, REALSXPetc.) est enregistré à l’aide de RDS version 2 (par exemple, serialize(mtcars, connection = NULL, version = 2L, xdr = TRUE)), seul un petit sous-ensemble du format RDS sera impliqué dans le processus de sérialisation, et implémenter des routines de désérialisation dans Scala capables de décoder un sous-ensemble aussi restreint de constructions RDS est en fait une tâche raisonnablement simple et directe (comme le montre
ici
).

Enfin et surtout, le RDS étant un format binaire, il permet NA_character_, "NA",
NA_real_et NaN que tous soient codés de manière sans ambiguïté, permettant ainsi sparklyr
1.5 pour éviter tous les problèmes d’exactitude détaillés ci-dessus en non-arrow cas d’utilisation de la sérialisation.

Autres avantages de la sérialisation RDS

En plus des garanties d’exactitude, le format RDS offre également de nombreux autres avantages.

L’un des avantages est bien sûr la performance : par exemple, importer un ensemble de données de taille non triviale tel que nycflights13::flights de R à Spark en utilisant le format RDS dans sparklyr 1.5 est environ 40 à 50 % plus rapide que la sérialisation basée sur CSV dans sparklyr 1.4. La mise en œuvre actuelle basée sur RDS est encore loin d’être aussi rapide que arrow-la sérialisation basée sur (arrow est environ 3 à 4 fois plus rapide), donc pour les tâches sensibles aux performances impliquant une sérialisation importante, arrow devrait toujours être le premier choix.

Un autre avantage est qu’avec la sérialisation RDS, sparklyr peut importer des dataframes R contenant
raw colonnes directement dans des colonnes binaires dans Spark. Ainsi, des cas d’utilisation tels que celui ci-dessous fonctionneront dans sparklyr 1,5

Alors que la plupart sparklyr les utilisateurs ne trouveront probablement pas cette fonctionnalité d’importation de colonnes binaires dans Spark immédiatement utile dans leur travail habituel. sparklyr::copy_to() ou sparklyr::collect()
utilisations, il joue un rôle crucial dans la réduction des frais de sérialisation dans le système basé sur Spark.
foreach backend parallèle qui a été introduit pour la première fois dans sparklyr 1.2. En effet, les travailleurs Spark peuvent récupérer directement les fermetures R sérialisées à calculer à partir d’une colonne Spark binaire au lieu d’extraire ces octets sérialisés à partir de représentations intermédiaires telles que des chaînes codées en base64. De même, les résultats R issus de l’exécution des fermetures de travailleurs seront directement disponibles au format RDS qui peut être efficacement désérialisé dans R, plutôt que d’être livrés dans d’autres formats moins efficaces.

Reconnaissance

Par ordre chronologique, nous tenons à remercier les contributeurs suivants pour avoir intégré leurs demandes d’extraction dans sparklyr 1.5 :

Nous souhaitons également exprimer notre gratitude envers les nombreux rapports de bogues et demandes de fonctionnalités pour
sparklyr d’une fantastique communauté open source.

Enfin, l’auteur de ce billet de blog est redevable à
@javierluraschi,
@batpigandmeet @skeydan pour leurs précieuses contributions éditoriales.

Si vous souhaitez en savoir plus sur sparklyrvérifier sparklyr.ai,
spark.rstudio.comet certains des articles de la version précédente tels que
Sparklyr 1.4 et
Sparklyr 1.3.

Merci d’avoir lu!

Related Articles

LAISSER UN COMMENTAIRE

S'il vous plaît entrez votre commentaire!
S'il vous plaît entrez votre nom ici

Latest Articles