
## Introduction

This model is an approximate version of the k-nearest neighbours (kNN) model, which is difficult to apply to large data sets. Whereas kNN searches for the exact nearest neighbours, this model finds, with high probability, points that are close to each other. Spark provides two methods for finding approximate neighbours, depending on the type of data at hand: bucketed random projection for Euclidean distance and MinHash for Jaccard distance.

The first method projects the data onto lower-dimensional hashes, such that similar hash values indicate that the associated points (or observations) are close to each other. The mathematical basis of this technique is the following formula.

$h^{\vec x}(\vec\upsilon)=\left\lfloor \frac{\vec\upsilon\cdot\vec x}{w}\right\rfloor$

where $$h$$ is the hashing function, $$\vec\upsilon$$ is the feature vector, $$\vec x$$ is a standard normal random vector of the same length, $$w$$ is the width of the hashing bins, and $$\lfloor \cdot \rfloor$$ rounds the result down to an integer. The idea is simple: we take the dot product of each feature vector with a random vector, then group the resulting (random) projections into buckets, which are expected to contain similar points. This process can be repeated several times, with a different random vector each time, to refine the similarity.
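To make the formula concrete, here is a minimal base R sketch of the hashing step that needs no Spark connection. The function name `brp_hash`, the toy points and the parameter values are assumptions made for illustration; this is not how Spark implements the method internally.

```r
# Sketch of bucketed random projection hashing in base R (no Spark needed).
# brp_hash, the toy points and the parameter values are illustrative assumptions.
set.seed(444)

brp_hash <- function(v, x, w) {
  # project v on the random vector x, then bin the projection with width w
  floor(sum(v * x) / w)
}

d <- 3                    # number of features
w <- 3                    # bin width
x <- rnorm(d)             # one standard normal random vector

pts <- rbind(a   = c(1.0, 2.0, 3.0),
             b   = c(1.1, 2.1, 2.9),    # close to a
             far = c(-40, 55, -70))     # far from a

# one hash value per point
one_hash <- apply(pts, 1, brp_hash, x = x, w = w)

# repeating with 5 different random vectors gives 5 hash values per point
R <- matrix(rnorm(d * 5), nrow = d)
hash_tables <- floor(pts %*% R / w)

one_hash
hash_tables
```

Points that are close to each other, such as `a` and `b`, tend to share hash values across the columns of `hash_tables`, while a distant point such as `far` usually does not. Any single column can still separate two close points, which is why several hash tables are used.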

## Data Preparation

For those who are not familiar with sparklyr, see my article "Introduction to sparklyr".

First let’s load the sparklyr and tidyverse packages, then set up the connection to Spark and load the Titanic data.

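library(sparklyr, warn.conflicts = FALSE)
library(tidyverse, warn.conflicts = FALSE)
# connect to a local Spark instance
sc<-spark_connect(master="local")
# mydata is the Titanic data as a Spark table; the step that loads it
# (for example with spark_read_csv) is not shown
glimpse(mydata)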
Observations: ??
Variables: 12
Database: spark_connection
$ PassengerId <int> 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, ...
$ Survived    <int> 0, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 1, 0, 1, 0...
$ Pclass      <int> 3, 1, 3, 1, 3, 3, 1, 3, 3, 2, 3, 1, 3, 3, 3, 2, 3, 2, 3...
$ Name        <chr> "Braund, Mr. Owen Harris", "Cumings, Mrs. John Bradley ...
$ Sex         <chr> "male", "female", "female", "female", "male", "male", "...
$ Age         <dbl> 22, 38, 26, 35, 35, NaN, 54, 2, 27, 14, 4, 58, 20, 39, ...
$ SibSp       <int> 1, 1, 0, 1, 0, 0, 0, 3, 0, 1, 1, 0, 0, 1, 0, 0, 4, 0, 1...
$ Parch       <int> 0, 0, 0, 0, 0, 0, 0, 1, 2, 0, 1, 0, 0, 5, 0, 0, 1, 0, 0...
$ Ticket      <chr> "A/5 21171", "PC 17599", "STON/O2. 3101282", "113803", ...
$ Fare        <dbl> 7.2500, 71.2833, 7.9250, 53.1000, 8.0500, 8.4583, 51.86...
$ Cabin       <chr> NA, "C85", NA, "C123", NA, NA, "E46", NA, NA, NA, "G6",...
$ Embarked    <chr> "S", "C", "S", "S", "S", "Q", "S", "S", "S", "C", "S", ...

As you may notice, this data set is not large; we chose it intentionally for its familiarity and simplicity, which make the implementation of this model easy to follow. With very large data sets we would repeat the same basic steps.

Then we remove some variables that we think are not very relevant for our purpose, but we keep the PassengerId variable because we need it later (giving it a shorter name).

newdata<- mydata%>%
  select(c(1,2,3,5,6,7,8,10,12))%>%
  rename(id=PassengerId) %>%
  glimpse()
Observations: ??
Variables: 9
Database: spark_connection
$ id       <int> 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17,...
$ Survived <int> 0, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 1, 0, 1, 0, 1...
$ Pclass   <int> 3, 1, 3, 1, 3, 3, 1, 3, 3, 2, 3, 1, 3, 3, 3, 2, 3, 2, 3, 3...
$ Sex      <chr> "male", "female", "female", "female", "male", "male", "mal...
$ Age      <dbl> 22, 38, 26, 35, 35, NaN, 54, 2, 27, 14, 4, 58, 20, 39, 14,...
$ SibSp    <int> 1, 1, 0, 1, 0, 0, 0, 3, 0, 1, 1, 0, 0, 1, 0, 0, 4, 0, 1, 0...
$ Parch    <int> 0, 0, 0, 0, 0, 0, 0, 1, 2, 0, 1, 0, 0, 5, 0, 0, 1, 0, 0, 0...
$ Fare     <dbl> 7.2500, 71.2833, 7.9250, 53.1000, 8.0500, 8.4583, 51.8625,...
$ Embarked <chr> "S", "C", "S", "S", "S", "Q", "S", "S", "S", "C", "S", "S"...

Perhaps the first thing to do in exploratory analysis is to check for missing values.

newdata%>%
  mutate_all(is.na)%>%
  mutate_all(as.numeric)%>%
  summarise_all(sum)
# Source: spark<?> [?? x 9]
     id Survived Pclass   Sex   Age SibSp Parch  Fare Embarked
  <dbl>    <dbl>  <dbl> <dbl> <dbl> <dbl> <dbl> <dbl>    <dbl>
1     0        0      0     0   177     0     0     0        2

Since we have a large number of missing values, it is better to impute them than to remove the affected rows. For the numeric variable Age we replace them with the median using the sparklyr function ft_imputer, and for the categorical variable Embarked we use the most frequent level, which here is port S. Before this, however, we should split the data into training and testing sets, so that the testing set is completely isolated from the training set, and then impute each set separately. Since the data are slightly imbalanced, we split the survivors and the non-survivors separately, so that each set preserves the proportions of the Survived levels in the original data, and then bind the corresponding sets together again.

data_surv<-newdata%>%
  filter(Survived==1)
data_not<-newdata%>%
  filter(Survived==0)
partition_surv<-data_surv%>%
  sdf_random_split(training=0.8,test=0.2,seed = 123)
partition_not<-data_not%>%
  sdf_random_split(training=0.8,test=0.2,seed = 123)

train<-sdf_bind_rows(partition_surv$training,partition_not$training)%>%
  ft_imputer(input_cols = "Age",output_cols="Age",strategy="median")%>%
  na.replace(Embarked="S")%>%
  compute("train")

test<-sdf_bind_rows(partition_surv$test,partition_not$test)%>%
  ft_imputer(input_cols = "Age",output_cols="Age",strategy="median")%>%
  na.replace(Embarked="S")%>%
  compute("test")

Note that we use the compute function to cache the output in Spark memory. Before fitting any model, the data must be processed into a form that the model can consume.
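As an aside on the split-then-impute logic above, the same idea can be sketched in base R on a small made-up data frame. The data, the helper names `split_one_class` and `impute_median`, and the use of exact 80% sample sizes (sdf_random_split assigns rows at random, so its proportions are only approximate) are all assumptions for illustration.

```r
# Base R sketch of a stratified 80/20 split followed by median imputation.
# The toy data and helper names are assumptions for illustration only.
set.seed(123)
toy <- data.frame(
  id       = 1:25,
  Survived = rep(c(0, 1), times = c(15, 10)),
  Age      = c(22, NA, 26, 35, 35, NA, 54, 2, 27, 14, 4, 58, 20, 39, 14,
               55, NA, 31, 34, 15, 28, NA, 38, 19, 40)
)

# split one class into training and test rows
split_one_class <- function(df, p = 0.8) {
  idx <- sample(nrow(df), size = round(p * nrow(df)))
  list(training = df[idx, ], test = df[-idx, ])
}

# split each level of Survived separately, then bind the pieces back together
parts     <- lapply(split(toy, toy$Survived), split_one_class)
train_toy <- do.call(rbind, lapply(parts, `[[`, "training"))
test_toy  <- do.call(rbind, lapply(parts, `[[`, "test"))

# impute each set separately with its own median
impute_median <- function(v) {
  v[is.na(v)] <- median(v, na.rm = TRUE)
  v
}
train_toy$Age <- impute_median(train_toy$Age)
test_toy$Age  <- impute_median(test_toy$Age)

prop.table(table(train_toy$Survived))
prop.table(table(test_toy$Survived))
```

Both sets keep the 60/40 class proportions of the toy data, and no row appears in both sets.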
Like most machine learning models, ours requires numeric features, so we first convert the categorical variables to integers using the function ft_string_indexer, and then convert them to dummy variables using the function ft_one_hot_encoder_estimator, since the latter expects numeric inputs. For models built in sparklyr, the input variables must be stacked into a single vector column, which is easily done with the function ft_vector_assembler. This step does not prevent us from applying further transformations once the features are in one column. For instance, to make the model run efficiently we can put the variables on the same scale, using either standardization (as we do here) or normalization. It is good practice to save this processed set in Spark memory under an object name using the function compute.

trained<- train%>%
  ft_string_indexer(input_col = "Sex",output_col="Sex_indexed")%>%
  ft_string_indexer(input_col = "Embarked",output_col="Embarked_indexed")%>%
  ft_one_hot_encoder_estimator(
    input_cols = c("Pclass","Sex_indexed","Embarked_indexed"),
    output_cols=c("Pc_encod","Sex_encod","Emb_encod")
  )%>%
  ft_vector_assembler(input_cols = c("Pc_encod","Sex_encod","Age","SibSp",
                                     "Parch","Fare","Emb_encod"),
                      output_col="features")%>%
  ft_standard_scaler(input_col = "features",output_col="scaled",
                     with_mean=TRUE)%>%
  select(id,Survived,scaled)%>%
  compute("trained")

The same transformations are applied to the testing set test as follows.
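For readers who want to see what this pipeline does to the data, here is a rough base R analogue on a made-up data frame. It only illustrates the four steps (index, encode, assemble, standardize); the toy data and object names are assumptions, and the sketch keeps one dummy column per level, which may differ from the defaults of Spark's encoder.

```r
# Base R analogue of the preprocessing steps; toy data and names are assumptions.
toy <- data.frame(
  Sex  = c("male", "female", "female", "male", "female"),
  Age  = c(22, 38, 26, 35, 28),
  Fare = c(7.25, 71.28, 7.93, 53.10, 8.05)
)

# 1. string -> integer index (alphabetical levels: female = 0, male = 1)
sex_index <- as.integer(factor(toy$Sex)) - 1L

# 2. integer index -> dummy columns, one per level
sex_dummies <- sapply(sort(unique(sex_index)),
                      function(k) as.numeric(sex_index == k))
colnames(sex_dummies) <- c("Sex_0", "Sex_1")

# 3. assemble all features into one matrix (one row per observation)
features <- cbind(sex_dummies, Age = toy$Age, Fare = toy$Fare)

# 4. standardize: centre each column and divide by its standard deviation
scaled <- scale(features, center = TRUE, scale = TRUE)

round(colMeans(scaled), 10)   # all zero after centring
apply(scaled, 2, sd)          # all one after scaling
```

After standardization every column has mean 0 and standard deviation 1, so no single variable (Fare, for example) dominates the Euclidean distances used later.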
tested<-test%>%
  ft_string_indexer(input_col = "Sex",output_col="Sex_indexed")%>%
  ft_string_indexer(input_col = "Embarked",output_col="Embarked_indexed")%>%
  ft_one_hot_encoder_estimator(
    input_cols = c("Pclass","Sex_indexed","Embarked_indexed"),
    output_cols=c("Pc_encod","Sex_encod","Emb_encod")
  )%>%
  ft_vector_assembler(input_cols = c("Pc_encod","Sex_encod","Age","SibSp",
                                     "Parch","Fare","Emb_encod"),
                      output_col="features")%>%
  ft_standard_scaler(input_col = "features",output_col="scaled",
                     with_mean=TRUE)%>%
  select(id,Survived,scaled)%>%
  compute("tested")

Now we are ready to project the data onto the lower-dimensional hashes using the function ft_bucketed_random_projection_lsh, with a bucket length of 3 and 5 hash tables.

lsh_vector<-ft_bucketed_random_projection_lsh(sc,
  input_col = "scaled",
  output_col = "hash",
  bucket_length = 3,
  num_hash_tables = 5,
  seed=444)

To fit this model we pass the training data trained to the function ml_fit.

model_lsh<-ml_fit(lsh_vector,trained)

## Prediction

At the prediction stage, this classification model gives us two alternatives for defining the nearest neighbours:

• Define a threshold value that decides whether two observations are considered nearest neighbours or not; a small value yields a small number of neighbours. In sparklyr we can achieve this using the function ml_approx_similarity_join, specifying the threshold value for the distance. The distance used by this function is the classical Euclidean distance.
• Prespecify the number of nearest neighbours, regardless of the distance between observations. This second alternative can be achieved using ml_approx_nearest_neighbors.

Each has its advantages and drawbacks, depending on the problem at hand.
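The two alternatives listed above can be sketched with exact Euclidean distances in base R. The toy points, labels and function names below are assumptions for illustration, and the sketch computes exact rather than approximate neighbours, so it mirrors the logic but not the Spark implementation.

```r
# Toy comparison of the two neighbour definitions (exact distances, base R).
# Points, labels and function names are illustrative assumptions.
train_pts <- matrix(c(0, 0,
                      1, 0,
                      0, 1,
                      5, 5,
                      6, 5), ncol = 2, byrow = TRUE)
labels <- c(0, 0, 1, 1, 1)

# Euclidean distance from every training point to the query point q
euclid <- function(pts, q) sqrt(rowSums(sweep(pts, 2, q)^2))

# majority vote; NA when there are no neighbours to vote
majority <- function(lab) {
  if (length(lab) == 0) return(NA)
  as.numeric(mean(lab) > 0.5)
}

# option 1: all neighbours closer than a distance threshold
by_threshold <- function(q, threshold) {
  majority(labels[euclid(train_pts, q) < threshold])
}

# option 2: the k closest neighbours, whatever their distance
by_k <- function(q, k) {
  majority(labels[order(euclid(train_pts, q))[seq_len(k)]])
}

near  <- c(0.2, 0.2)   # sits inside the first cluster
alone <- c(20, 20)     # far from every training point

by_threshold(near, 1.5)    # 0: two of its three neighbours have label 0
by_threshold(alone, 1.5)   # NA: no neighbour within the threshold
by_k(alone, 3)             # 1: always predicts, but from distant neighbours
```

The isolated point gets no prediction under the threshold rule and a prediction based on far-away points under the fixed-k rule, which is exactly the trade-off between the two options.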
For instance, in medicine, if you are more interested in checking the similarities among patients at some level, the first option would be your choice, but you may not be able to predict new cases that are not similar to any of the training cases within this threshold. In contrast, if your goal is to predict all your new cases, you would opt for the second option, at the cost of including neighbours that are far away, since the number of neighbours is fixed. To better understand what happens with each option, let’s use the following data.

suppressPackageStartupMessages(library(plotrix))
X<-c(55,31,35,34,15,28,8,38,35,19,27,40,39,19,66,28,42,21,18,14,40,27,3,19,21,32,13,18,7,21,49)
Y<-c(16,18,26,13,8.0292,35.5,21.075,31.3875,7.225,263,7.8958,27.7208,146.5208,7.75,10.5,82.1708,52,8.05,18,11.2417,9.475,21,41.5792,7.8792,8.05,15.5,7.75,17.8,39.6875,7.8,76.7292)
Z<-factor(c(1,0,0,1,1,1,0,1,0,0,0,0,1,1,0,0,0,0,0,1,0,0,1,1,0,0,1,0,0,0,1))
plot(X,Y,col=Z,ylim = c(0,55),pch=16)
points(x=32,y=20,col="blue",pch=8)
draw.circle(x=32,y=20,nv=1000,radius = 6,lty=2)
points(x=55,y=42,col="green",pch=8)
draw.circle(x=55,y=42,nv=1000,radius = 6,lty=2)

We use the fake data above to illustrate the difference between the two methods. Setting the threshold at 6 for the first option, we see that the blue dot has 5 neighbours and would be predicted as black by majority vote. However, with this threshold the green dot has no neighbours around it, and hence it is left without a prediction.
plot(X,Y,pch=16,col=Z,ylim = c(0,55))
points(x=32,y=20,col="blue",pch=8)
points(x=55,y=42,col="green",pch=8)
draw.circle(x=55,y=42,nv=1000,radius = 21.8,lty=2)
draw.circle(x=32,y=20,nv=1000,radius = 6,lty=2)

In contrast to the plot above, with the second option the green dot can be predicted as black, since 3 of its 5 neighbours are black. The quality of this prediction is doubtful, however, since all the neighbours are far away from the dot of interest, and this is the major drawback of this method.

Note: In fact we can overcome the drawbacks of each method by tuning the hyperparameters. To get predictions for all the new cases with the first method, we can increase the distance threshold so that every case is predicted (but we may lose accuracy if there is even a single outlier). With the second method, we can reduce the number of nearest neighbours to get meaningful similarities (but we may lose accuracy when the dots are spread far apart).

### Similarity based on distance

To show the neighbours of each point we use the function ml_approx_similarity_join, which requires the data to have an id column; this is why we kept this id earlier.

approx_join<-ml_approx_similarity_join(model_lsh,
  trained,
  trained,
  threshold=1,
  dist_col="dist")
approx_join
# Source: spark<?> [?? x 3]
    id_a  id_b      dist
   <int> <int>     <dbl>
 1     2   376 0.813
 2    11    11 0
 3    16   773 0.189
 4    16   707 0.787
 5    20   368 0.0000809
 6    23   290 0.550
 7    23   157 0.0787
 8    24   873 0.707
 9    24   448 0.502
10    24    84 0.224
# ... with more rows

This function joins the data trained with itself to find similar observations. The threshold sets the distance below which two observations are considered similar. In other words, pairs with a dist value less than 1 are treated as similar. Let’s, for instance, pick some similar observations and check how similar they are.

train%>%
  filter(id %in% c(29,654,275,199,45))
# Source: spark<?> [?? x 9]
     id Survived Pclass Sex      Age SibSp Parch  Fare Embarked
  <int>    <int>  <int> <chr>  <dbl> <int> <int> <dbl> <chr>
1    29        1      3 female    28     0     0  7.88 Q
2    45        1      3 female    19     0     0  7.88 Q
3   199        1      3 female    28     0     0  7.75 Q
4   275        1      3 female    28     0     0  7.75 Q
5   654        1      3 female    28     0     0  7.83 Q

As we see, all these passengers are surviving females in the same class (third class), without children, parents or siblings, who embarked from the same port, paid approximately the same ticket price and have the same age (except for 45, aged 19), so they are highly likely to be friends travelling together.

To predict the test set tested we use the function ml_predict, then we extract the similarities with the function ml_approx_similarity_join.

hashed<-ml_predict(model_lsh,tested)%>%
  ml_approx_similarity_join(model_lsh,
    trained,
    .,
    threshold=1,
    dist_col="dist")
hashed
# Source: spark<?> [?? x 3]
    id_a  id_b  dist
   <int> <int> <dbl>
 1    12   863 0.904
 2    16   459 0.557
 3    29   728 0.266
 4    29    33 0.265
 5    37   245 0.479
 6    45    33 0.788
 7    48   728 0.265
 8    48   369 0.265
 9    48   187 0.848
10    54   519 0.564
# ... with more rows

We can now choose a particular person, say id_b=33, and find the similar persons in the training set. Using the majority vote, we decide whether that person survived or not.

m<-33
ids_train<-hashed%>%
  filter(id_b==m)%>%
  pull(id_a)
df1<-train%>%
  filter(id %in% ids_train)
df2<-test%>%
  filter(id==m)
df<-sdf_bind_rows(df1,df2)
df
# Source: spark<?> [?? x 9]
      id Survived Pclass Sex      Age SibSp Parch  Fare Embarked
   <int>    <int>  <int> <chr>  <dbl> <int> <int> <dbl> <chr>
 1    29        1      3 female    28     0     0  7.88 Q
 2    45        1      3 female    19     0     0  7.88 Q
 3    48        1      3 female    28     0     0  7.75 Q
 4    83        1      3 female    28     0     0  7.79 Q
 5   199        1      3 female    28     0     0  7.75 Q
 6   275        1      3 female    28     0     0  7.75 Q
 7   290        1      3 female    22     0     0  7.75 Q
 8   301        1      3 female    28     0     0  7.75 Q
 9   360        1      3 female    28     0     0  7.88 Q
10   574        1      3 female    28     0     0  7.75 Q
# ... with more rows

The last row in this table contains our test instance 33, which has 17 neighbours in the training data, a mixture of persons who died and persons who survived.

df%>%
  filter(id!=m)%>%
  select(Survived)%>%
  collect()%>%
  table()
## .
##  0  1
##  5 12

Using the majority vote, this person is classified as survived, since the number of non-survivors (5) is less than the number of survivors (12), and hence this person is correctly classified.

### The similarity based on the number of nearest neighbours

Using the same steps as above, but with the function ml_approx_nearest_neighbors, we can predict any point. For example, let’s take our previous passenger 33 in the testing set. First we have to extract the values for this person from the transformed testing set tested.

id_input <- tested %>%
  filter(id==m)%>%
  pull(scaled) %>%
  unlist()
id_input
## [1]  0.00000000 -0.56054485 -0.50652969 -1.42132034 -0.07744921 -0.49874843
## [7] -0.47508853 -0.54740838 -1.79973402 -0.41903250

These are the standardized feature values from the column scaled, which are used to find the closest neighbours in the training data; here we set the number of neighbours to 7.

knn<-ml_approx_nearest_neighbors(
  model_lsh,
  trained,
  key = id_input,
  dist_col = 'dist',
  num_nearest_neighbors = 7
)
knn
# Source: spark<?> [?? x 5]
     id Survived scaled     hash        dist
  <int>    <int> <list>     <list>     <dbl>
1   698        1 <dbl [10]> <list [5]> 0.265
2    48        1 <dbl [10]> <list [5]> 0.265
3   275        1 <dbl [10]> <list [5]> 0.265
4   199        1 <dbl [10]> <list [5]> 0.265
5   301        1 <dbl [10]> <list [5]> 0.265
6   574        1 <dbl [10]> <list [5]> 0.265
7   265        0 <dbl [10]> <list [5]> 0.265

These are the neighbours of our passenger, with their ids. We can get the fraction of survivors as follows.

n<-sdf_nrow(knn)
pred<-knn %>%
  select(Survived)%>%
  summarise(p=sum(Survived)/n)
pred
## # Source: spark<?> [?? x 1]
##       p
##   <dbl>
## 1 0.857

Since this proportion is greater than 0.5, we predict that this passenger survived, which is again a correct classification. In some cases, however, the two approaches may give different predictions. To get the accuracy on the whole testing set we use the following for loop, which takes a lot of computing time since at the end of each iteration we collect the results into R. Consequently, it is not practical for large data sets.

mypred<-numeric(0)
M <- tested %>% collect() %>% .$id
for (i in M) {
  id_input <- tested %>%
    filter(id==i)%>%
    pull(scaled) %>%
    unlist()
  knn<-ml_approx_nearest_neighbors(
    model_lsh,
    trained,
    key = id_input,
    dist_col = 'dist',
    num_nearest_neighbors = 7
  )
  n<-sdf_nrow(knn)
  pred<-knn%>%select(Survived)%>%
    summarise(p=sum(Survived)/n)%>%
    collect()
  mypred<-rbind(mypred,pred)
}
mypred
# A tibble: 200 x 1
       p
   <dbl>
 1 0.286
 2 1
 3 0.571
 4 0
 5 0.143
 6 0.857
 7 0.429
 8 1
 9 1
10 0.857
# ... with 190 more rows

First we convert the proportions into class labels, then we combine this data frame with the testing data, and finally we use the function confusionMatrix from the caret package.

tested_R<-tested%>%
  select(Survived)%>%
  collect()
new<-cbind(mypred,tested_R)%>%
  mutate(predicted=ifelse(p>0.5,"1","0"))
caret::confusionMatrix(as.factor(new$Survived),as.factor(new$predicted))
Confusion Matrix and Statistics

          Reference
Prediction   0   1
         0 109  12
         1  30  49

               Accuracy : 0.79
                 95% CI : (0.7269, 0.8443)
    No Information Rate : 0.695
    P-Value [Acc > NIR] : 0.001704

                  Kappa : 0.5425

 Mcnemar's Test P-Value : 0.008712

            Sensitivity : 0.7842
            Specificity : 0.8033
         Pos Pred Value : 0.9008
         Neg Pred Value : 0.6203
             Prevalence : 0.6950
         Detection Rate : 0.5450
   Detection Prevalence : 0.6050
      Balanced Accuracy : 0.7937

       'Positive' Class : 0


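The accuracy is fairly good at 79%. Note that caret::confusionMatrix expects the predicted classes as its first argument and the reference classes as its second; since the call above passes Survived first, the rows labelled Prediction hold the true classes and the columns labelled Reference hold the predictions. The accuracy is unaffected, but the reported sensitivity and specificity are swapped with the positive and negative predictive values.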
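As a quick check, the main statistics printed above can be recomputed by hand from the four cell counts. The sketch below simply follows the row and column labels as printed, with class 0 as the positive class; the object names are assumptions.

```r
# Recompute the reported statistics from the printed confusion matrix.
# Rows and columns follow the labels as printed; object names are assumptions.
cm <- matrix(c(109, 30, 12, 49), nrow = 2,
             dimnames = list(Prediction = c("0", "1"),
                             Reference  = c("0", "1")))

accuracy    <- sum(diag(cm)) / sum(cm)            # (109 + 49) / 200 = 0.79
sensitivity <- cm["0", "0"] / sum(cm[, "0"])      # 109 / 139
specificity <- cm["1", "1"] / sum(cm[, "1"])      # 49 / 61
ppv         <- cm["0", "0"] / sum(cm["0", ])      # 109 / 121
npv         <- cm["1", "1"] / sum(cm["1", ])      # 49 / 79
balanced    <- (sensitivity + specificity) / 2

round(c(accuracy = accuracy, sensitivity = sensitivity,
        specificity = specificity, ppv = ppv, npv = npv,
        balanced = balanced), 4)
# accuracy 0.7900, sensitivity 0.7842, specificity 0.8033,
# ppv 0.9008, npv 0.6203, balanced 0.7937
```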

Finally, do not forget to disconnect when your work is completed.

spark_disconnect(sc)

## Conclusion

The LSH model is an approximation of kNN for large data sets. We could improve the model's performance by tuning the threshold value or the number of neighbours.