ÑAAS

Por motivos que no vienen al caso un compañero del curro necesita hacer en spark una regresión lineal para los datos de cada cliente y extraer el coeficiente de una variable. Así que vamos a hacer algo que denomino ÑASS (Ñapas As A Service)

Cómo lo haríamos en R

library(tidyverse)
## ── Attaching packages ────────────────────── tidyverse 1.2.1 ──
## ✔ ggplot2 3.1.0     ✔ purrr   0.3.0
## ✔ tibble  2.0.1     ✔ dplyr   0.7.8
## ✔ tidyr   0.8.2     ✔ stringr 1.3.1
## ✔ readr   1.3.1     ✔ forcats 0.3.0
## ── Conflicts ───────────────────────── tidyverse_conflicts() ──
## ✖ dplyr::filter() masks stats::filter()
## ✖ dplyr::lag()    masks stats::lag()
# simulto datos
x <- rnorm(100, 2, 10)
y <- 2 + 3 * x + rnorm(length(x), 1, 3)
df <- data.frame(id = rep(1:100, each = 3),
                 y = y ,
                 x = x)

Hacemos un group_by y un do, ya sé que en R haríamos un lm(y ~ factor(id) * x, data =df ) y punto, pero es por algo lo de hacerlo de otra forma.

coeficientes <- df %>%
  group_by(id) %>%
  do(coef_x = coef(lm(y ~ x, .))[2]) %>%
  unnest(coef_x)
coeficientes
## # A tibble: 100 x 2
##       id coef_x
##    <int>  <dbl>
##  1     1   3.73
##  2     2   2.00
##  3     3   3.08
##  4     4   2.56
##  5     5   2.92
##  6     6   2.75
##  7     7   3.26
##  8     8   2.74
##  9     9   2.97
## 10    10   3.11
## # … with 90 more rows

En Spark

Iniciamos sparklyr y creamos el spark dataframe a partir de df

library(sparklyr)
## 
## Attaching package: 'sparklyr'
## The following object is masked from 'package:purrr':
## 
##     invoke
sc <- spark_connect(master = "local")
df_tbl <- sc %>% sdf_copy_to(df, name = "df_spark", overwrite = TRUE)
df_tbl
## # Source: spark<df_spark> [?? x 3]
##       id      y        x
##    <int>  <dbl>    <dbl>
##  1     1   8.95   2.35  
##  2     1  12.5    2.14  
##  3     1  21.4    5.03  
##  4     2  33.3   11.1   
##  5     2  40.8   11.1   
##  6     2  27.3    6.28  
##  7     3  -6.29  -2.95  
##  8     3   1.98   0.0571
##  9     3 -15.8   -5.70  
## 10     4 -11.3   -6.87  
## # … with more rows

Y usamos ahora casi la misma sintaxis que antes, pero en vez de usar lm usamos ml_linear_regression de MLlib.

coeficientes_spark <-  df_tbl %>% group_by(id) %>% 
  do(coeficientes = coef(ml_linear_regression(., y ~ x))[[2]])  %>% 
  unnest(coeficientes)

coeficientes_spark
## # A tibble: 100 x 2
##       id coeficientes
##    <int>        <dbl>
##  1    12         3.04
##  2    13         3.53
##  3    14         2.90
##  4    18         2.39
##  5    25         2.84
##  6    37         2.64
##  7    38         2.96
##  8    46         3.47
##  9    50         2.98
## 10    52         2.85
## # … with 90 more rows

Y lo llamo “ÑAAS” porque aunque es verdad que me hace los modelos en spark para cada id el resultado de aplicar coef al modelo me devuelve el coeficiente pero en estructura de datos de R, no en spark y lo que queremos es que los coeficientes estén en otro sparkdataframes. Así que si alguien sabe como hacer esto en scala usando UDF’s o UDAF’s que lo comente por aquí.

 
comments powered by Disqus