# sparklyr 1.4: Weighted Sampling, Tidyr Verbs, Robust Scaler, RAPIDS, and more

**RStudio AI Blog**, and kindly contributed to R-bloggers]. (You can report issue about the content on this page here)

Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.

In this blog post, we will showcase the following much-anticipated new functionalities from the `sparklyr`

1.4 release:

- Parallelized Weighted Sampling with Spark
- Support for Tidyr Verbs on Spark Dataframes
`ft_robust_scaler`

as the R interface for RobustScaler from Spark 3.0- Option for enabling
`RAPIDS`

GPU acceleration plugin in`spark_connect()`

- Higher-order functions and
`dplyr`

-related improvements

## Parallelized Weighted Sampling

Readers familiar with `dplyr::sample_n()`

and `dplyr::sample_frac()`

functions may have noticed that both of them support weighted-sampling use cases on R dataframes, e.g.,

dplyr::sample_n(mtcars, size = 3, weight = mpg, replace = FALSE)

mpg cyl disp hp drat wt qsec vs am gear carb Fiat 128 32.4 4 78.7 66 4.08 2.200 19.47 1 1 4 1 Merc 280C 17.8 6 167.6 123 3.92 3.440 18.90 1 0 4 4 Mazda RX4 Wag 21.0 6 160.0 110 3.90 2.875 17.02 0 1 4 4

and

dplyr::sample_frac(mtcars, size = 0.1, weight = mpg, replace = FALSE)

mpg cyl disp hp drat wt qsec vs am gear carb Honda Civic 30.4 4 75.7 52 4.93 1.615 18.52 1 1 4 2 Merc 450SE 16.4 8 275.8 180 3.07 4.070 17.40 0 0 3 3 Fiat X1-9 27.3 4 79.0 66 4.08 1.935 18.90 1 1 4 1

will select some random subset of `mtcars`

using the `mpg`

attribute as the sampling weight for each row. If `replace = FALSE`

is set, then a row is removed from the sampling population once it gets selected, whereas when setting `replace = TRUE`

, each row will always stay in the sampling population and can be selected multiple times.

Now the exact same use cases are supported for Spark dataframes in `sparklyr`

1.4! For example:

library(sparklyr) sc <- spark_connect(master = "local") mtcars_sdf <- copy_to(sc, mtcars, repartition = 4L) dplyr::sample_n(mtcars_sdf, size = 5, weight = mpg, replace = FALSE)

will return a random subset of size 5 from the Spark dataframe `mtcars_sdf`

.

More importantly, the sampling algorithm implemented in `sparklyr`

1.4 is something that fits perfectly into the MapReduce paradigm: as we have split our `mtcars`

data into 4 partitions of `mtcars_sdf`

by specifying `repartition = 4L`

, the algorithm will first process each partition independently and in parallel, selecting a sample set of size up to 5 from each, and then reduce all 4 sample sets into a final sample set of size 5 by choosing records having the top 5 highest sampling priorities among all.

How is such parallelization possible, especially for the sampling without replacement scenario, where the desired result is defined as the outcome of a sequential process? A detailed answer to this question is in this blog post, which includes a definition of the problem (in particular, the exact meaning of sampling weights in term of probabilities), a high-level explanation of the current solution and the motivation behind it, and also, some mathematical details all hidden in one link to a PDF file, so that non-math-oriented readers can get the gist of everything else without getting scared away, while math-oriented readers can enjoy working out all the integrals themselves before peeking at the answer.

## Tidyr Verbs

The specialized implementations of the following `tidyr`

verbs that work efficiently with Spark dataframes were included as part of `sparklyr`

1.4:

`tidyr::fill`

`tidyr::nest`

`tidyr::unnest`

`tidyr::pivot_wider`

`tidyr::pivot_longer`

`tidyr::separate`

`tidyr::unite`

We can demonstrate how those verbs are useful for tidying data through some examples.

Let’s say we are given `mtcars_sdf`

, a Spark dataframe containing all rows from `mtcars`

plus the name of each row:

library(sparklyr) sc <- spark_connect(master = "local") mtcars_sdf <- cbind( data.frame(model = rownames(mtcars)), data.frame(mtcars, row.names = NULL) ) %>% copy_to(sc, ., repartition = 4L) print(mtcars_sdf, n = 5)

# Source: spark [?? x 12] model mpg cyl disp hp drat wt qsec vs am gear carb1 Mazda RX4 21 6 160 110 3.9 2.62 16.5 0 1 4 4 2 Mazda RX4 W… 21 6 160 110 3.9 2.88 17.0 0 1 4 4 3 Datsun 710 22.8 4 108 93 3.85 2.32 18.6 1 1 4 1 4 Hornet 4 Dr… 21.4 6 258 110 3.08 3.22 19.4 1 0 3 1 5 Hornet Spor… 18.7 8 360 175 3.15 3.44 17.0 0 0 3 2 # … with more rows

and we would like to turn all numeric attributes in `mtcar_sdf`

(in other words, all columns other than the `model`

column) into key-value pairs stored in 2 columns, with the `key`

column storing the name of each attribute, and the `value`

column storing each attribute’s numeric value. One way to accomplish that with `tidyr`

is by utilizing the `tidyr::pivot_longer`

functionality:

mtcars_kv_sdf <- mtcars_sdf %>% tidyr::pivot_longer(cols = -model, names_to = "key", values_to = "value") print(mtcars_kv_sdf, n = 5)

# Source: spark [?? x 3] model key value1 Mazda RX4 am 1 2 Mazda RX4 carb 4 3 Mazda RX4 cyl 6 4 Mazda RX4 disp 160 5 Mazda RX4 drat 3.9 # … with more rows

To undo the effect of `tidyr::pivot_longer`

, we can apply `tidyr::pivot_wider`

to our `mtcars_kv_sdf`

Spark dataframe, and get back the original data that was present in `mtcars_sdf`

:

tbl <- mtcars_kv_sdf %>% tidyr::pivot_wider(names_from = key, values_from = value) print(tbl, n = 5)

# Source: spark [?? x 12] model carb cyl drat hp mpg vs wt am disp gear qsec1 Mazda RX4 4 6 3.9 110 21 0 2.62 1 160 4 16.5 2 Hornet 4 Dr… 1 6 3.08 110 21.4 1 3.22 0 258 3 19.4 3 Hornet Spor… 2 8 3.15 175 18.7 0 3.44 0 360 3 17.0 4 Merc 280C 4 6 3.92 123 17.8 1 3.44 0 168. 4 18.9 5 Merc 450SLC 3 8 3.07 180 15.2 0 3.78 0 276. 3 18 # … with more rows

Another way to reduce many columns into fewer ones is by using `tidyr::nest`

to move some columns into nested tables. For instance, we can create a nested table `perf`

encapsulating all performance-related attributes from `mtcars`

(namely, `hp`

, `mpg`

, `disp`

, and `qsec`

). However, unlike R dataframes, Spark Dataframes do not have the concept of nested tables, and the closest to nested tables we can get is a `perf`

column containing named structs with `hp`

, `mpg`

, `disp`

, and `qsec`

attributes:

mtcars_nested_sdf <- mtcars_sdf %>% tidyr::nest(perf = c(hp, mpg, disp, qsec))

We can then inspect the type of `perf`

column in `mtcars_nested_sdf`

:

sdf_schema(mtcars_nested_sdf)$perf$type

[1] "ArrayType(StructType(StructField(hp,DoubleType,true), StructField(mpg,DoubleType,true), StructField(disp,DoubleType,true), StructField(qsec,DoubleType,true)),true)"

and inspect individual struct elements within `perf`

:

perf <- mtcars_nested_sdf %>% dplyr::pull(perf) unlist(perf[[1]])

hp mpg disp qsec 110.00 21.00 160.00 16.46

Finally, we can also use `tidyr::unnest`

to undo the effects of `tidyr::nest`

:

mtcars_unnested_sdf <- mtcars_nested_sdf %>% tidyr::unnest(col = perf) print(mtcars_unnested_sdf, n = 5)

# Source: spark [?? x 12] model cyl drat wt vs am gear carb hp mpg disp qsec1 Mazda RX4 6 3.9 2.62 0 1 4 4 110 21 160 16.5 2 Hornet 4 Dr… 6 3.08 3.22 1 0 3 1 110 21.4 258 19.4 3 Duster 360 8 3.21 3.57 0 0 3 4 245 14.3 360 15.8 4 Merc 280 6 3.92 3.44 1 0 4 4 123 19.2 168. 18.3 5 Lincoln Con… 8 3 5.42 0 0 3 4 215 10.4 460 17.8 # … with more rows

## Robust Scaler

RobustScaler is a new functionality introduced in Spark 3.0 (SPARK-28399). Thanks to a pull request by @zero323, an R interface for `RobustScaler`

, namely, the `ft_robust_scaler()`

function, is now part of `sparklyr`

.

It is often observed that many machine learning algorithms perform better on numeric inputs that are standardized. Many of us have learned in stats 101 that given a random variable \(X\), we can compute its mean \(\mu = E[X]\), standard deviation \(\sigma = \sqrt{E[X^2] - (E[X])^2}\), and then obtain a standard score \(z = \frac{X - \mu}{\sigma}\) which has mean of 0 and standard deviation of 1.

However, notice both \(E[X]\) and \(E[X^2]\) from above are quantities that can be easily skewed by extreme outliers in \(X\), causing distortions in \(z\). A particular bad case of it would be if all non-outliers among \(X\) are very close to \(0\), hence making \(E[X]\) close to \(0\), while extreme outliers are all far in the negative direction, hence dragging down \(E[X]\) while skewing \(E[X^2]\) upwards.

An alternative way of standardizing \(X\) based on its median, 1st quartile, and 3rd quartile values, all of which are robust against outliers, would be the following:

\(\displaystyle z = \frac{X - \text{Median}(X)}{\text{P75}(X) - \text{P25}(X)}\)

and this is precisely what RobustScaler offers.

To see `ft_robust_scaler()`

in action and demonstrate its usefulness, we can go through a contrived example consisting of the following steps:

- Draw 500 random samples from the standard normal distribution

sample_values <- rnorm(500) print(sample_values)

[1] -0.626453811 0.183643324 -0.835628612 1.595280802 0.329507772 [6] -0.820468384 0.487429052 0.738324705 0.575781352 -0.305388387 ...

- Inspect the minimal and maximal values among the \(500\) random samples:

print(min(sample_values))

[1] -3.008049

print(max(sample_values))

[1] 3.810277

- Now create \(10\) other values that are extreme outliers compared to the \(500\) random samples above. Given that we know all \(500\) samples are within the range of \((-4, 4)\), we can choose \(-501, -502, \ldots, -509, -510\) as our \(10\) outliers:

outliers <- -500L - seq(10)

- Copy all \(510\) values into a Spark dataframe named
`sdf`

library(sparklyr) sc <- spark_connect(master = "local", version = "3.0.0") sdf <- copy_to(sc, data.frame(value = c(sample_values, outliers)))

- We can then apply
`ft_robust_scaler()`

to obtain the standardized value for each input:

scaled <- sdf %>% ft_vector_assembler("value", "input") %>% ft_robust_scaler("input", "scaled") %>% dplyr::pull(scaled) %>% unlist()

- Plotting the result shows the non-outlier data points being scaled to values that still more or less form a bell-shaped distribution centered around \(0\), as expected, so the scaling is robust against influence of the outliers:

library(ggplot2) ggplot(data.frame(scaled = scaled), aes(x = scaled)) + xlim(-7, 7) + geom_histogram(binwidth = 0.2)

- Finally, we can compare the distribution of the scaled values above with the distribution of z-scores of all input values, and notice how scaling the input with only mean and standard deviation would have caused noticeable skewness – which the robust scaler has successfully avoided:

all_values <- c(sample_values, outliers) z_scores <- (all_values - mean(all_values)) / sd(all_values) ggplot(data.frame(scaled = z_scores), aes(x = scaled)) + xlim(-0.05, 0.2) + geom_histogram(binwidth = 0.005)

- From the 2 plots above, one can observe while both standardization processes produced some distributions that were still bell-shaped, the one produced by
`ft_robust_scaler()`

is centered around \(0\), correctly indicating the average among all non-outlier values, while the z-score distribution is clearly not centered around \(0\) as its center has been noticeably shifted by the \(10\) outlier values.

## RAPIDS

Readers following Apache Spark releases closely probably have noticed the recent addition of RAPIDS GPU acceleration support in Spark 3.0. Catching up with this recent development, an option to enable RAPIDS in Spark connections was also created in `sparklyr`

and shipped in `sparklyr`

1.4. On a host with RAPIDS-capable hardware (e.g., an Amazon EC2 instance of type ‘p3.2xlarge’), one can install `sparklyr`

1.4 and observe RAPIDS hardware acceleration being reflected in Spark SQL physical query plans:

library(sparklyr) sc <- spark_connect(master = "local", version = "3.0.0", packages = "rapids") dplyr::db_explain(sc, "SELECT 4")

== Physical Plan == *(2) GpuColumnarToRow false +- GpuProject [4 AS 4#45] +- GpuRowToColumnar TargetSize(2147483647) +- *(1) Scan OneRowRelation[]

## Higher-Order Functions and `dplyr`

-Related Improvements

All newly introduced higher-order functions from Spark 3.0, such as `array_sort()`

with custom comparator, `transform_keys()`

, `transform_values()`

, and `map_zip_with()`

, are supported by `sparklyr`

1.4.

In addition, all higher-order functions can now be accessed directly through `dplyr`

rather than their `hof_*`

counterparts in `sparklyr`

. This means, for example, that we can run the following `dplyr`

queries to calculate the square of all array elements in column `x`

of `sdf`

, and then sort them in descending order:

library(sparklyr) sc <- spark_connect(master = "local", version = "3.0.0") sdf <- copy_to(sc, tibble::tibble(x = list(c(-3, -2, 1, 5), c(6, -7, 5, 8)))) sq_desc <- sdf %>% dplyr::mutate(x = transform(x, ~ .x * .x)) %>% dplyr::mutate(x = array_sort(x, ~ as.integer(sign(.y - .x)))) %>% dplyr::pull(x) print(sq_desc)

[[1]] [1] 25 9 4 1 [[2]] [1] 64 49 36 25

## Acknowledgement

In chronological order, we would like to thank the following individuals for their contributions to `sparklyr`

1.4:

We also appreciate bug reports, feature requests, and valuable other feedback about `sparklyr`

from our awesome open-source community (e.g., the weighted sampling feature in `sparklyr`

1.4 was largely motivated by this Github issue filed by @ajing, and some `dplyr`

-related bug fixes in this release were initiated in #2648 and completed with this pull request by @wkdavis).

Last but not least, the author of this blog post is extremely grateful for fantastic editorial suggestions from @javierluraschi, @batpigandme, and @skeydan.

If you wish to learn more about `sparklyr`

, we recommend checking out sparklyr.ai, spark.rstudio.com, and also some of the previous release posts such as sparklyr 1.3 and sparklyr 1.2.

Thanks for reading!

**leave a comment**for the author, please follow the link and comment on their blog:

**RStudio AI Blog**.

R-bloggers.com offers

**daily e-mail updates**about R news and tutorials about learning R and many other topics. Click here if you're looking to post or find an R/data-science job.

Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.