Using R and H2O Isolation Forest anomaly detection for data quality, further analysis.

[This article was first published on R-Analytics, and kindly contributed to R-bloggers]. (You can report issue about the content on this page here)
Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.

This is the second article on data quality, for the first part, please go to: http://laranikalranalytics.blogspot.com/2019/11/using-r-and-h2o-isolation-forest-for.html
Since Isolation Forest is building an ensemble of isolation trees, and these trees are created randomly, there is a lot of randomness in the isolation forest training, so, to have a more robust result, 3 isolation forest models will be trained for a better anomaly detection.
I will also use Apache Spark for data handling.
For a full example, testing data will be used after training the 3 IF(Isolation Forest) models.
This way of using Isolation Forest is kind of a general usage also for maintenance prediction.
# Set Java parameters, enough memory for Java.
options( java.parameters = c( "-Xmx40G" ) ) # 40GB Ram for Java

# Loading libraries
suppressWarnings(suppressMessages(library(sparklyr)))
suppressWarnings(suppressMessages(library(h2o)))
suppressWarnings(suppressMessages(library(dplyr)))
suppressWarnings(suppressMessages(library(xts)))
suppressWarnings(suppressMessages(library(rsparkling))) # Version 3.26.10-2.4
suppressWarnings(suppressMessages(library(DT)))
suppressWarnings(suppressMessages(library(dygraphs))) # For interactive plotting

Sys.setenv(TZ = "America/Chicago") # R environment time zone.

# Connecting to Spark, local mode.
# For reference go to: https://spark.rstudio.com/guides/connections/

# Set Spark Config Parameters
config <- spark_config()
config["sparklyr.shell.driver-memory"] = "40g" # I created more swap, I must have more memory available.
config["sparklyr.cores.local"] = 4 # Using all cores on my Intel i5
config$sparklyr.cancellable = TRUE
config$spark.executor.cores = 4
config$spark.cores.max = 4
config$spark.ext.h2o.nthreads = -1 # Ensure all threads when using H2O

# Connecting to Spark.
sc = spark_connect(master = "local", version = "2.4.3", hadoop_version="2.7", config=config)

# Setting Java TimeZone to GMT after initializing spark allow us to have a better 
# date time data handling.
sparklyr::invoke_static(sc, "java.util.TimeZone",  "getTimeZone", "GMT") %>%
  sparklyr::invoke_static(sc, "java.util.TimeZone", "setDefault", .)
## NULL
# Start importing data to Spark and doing some data cleaning
startTime = Sys.time()
# Start Time:
startTime
## [1] "2019-12-16 12:56:51 CST"
allDataF = spark_read_csv( sc, "allDataF"
                          , path = "/home/ckassab/Development/R/DataQuality/Data/PGA_Tour_Golf_Data_2019_Kaggle.csv"
                          , memory = FALSE # Map the file, but not make a copy of it in memory, this saves 1g ram.
                          , header = TRUE
                          , delimiter = ","
                          , quote = "\""
                          , infer_schema = TRUE
                          , null_value = NULL )

# Data cleaning
allData = allDataF %>%
  na.omit() %>% # Dropping all NAs from dataset
  mutate(Date = as.Date(substr(Date,1,10))) # Set date format as needed.
## * Dropped 174167 rows with 'na.omit' (9720529 => 9546362)
# End importing data to Spark and doing some data cleaning
# End Time:
Sys.time()
## [1] "2019-12-16 12:58:33 CST"
# Total time:
Sys.time() - startTime
## Time difference of 1.701981 mins
# Inspect the H2OContext for our Spark connection
# This will also start an H2O cluster
h2o_context(sc)
## <jobj[55]>
##   org.apache.spark.h2o.H2OContext
##   
## Sparkling Water Context:
##  * Sparkling Water Version: 3.26.10-2.4
##  * H2O name: sparkling-water-ckassab_local-1576522610564
##  * cluster size: 1
##  * list of used nodes:
##   (executorId, host, port)
##   ------------------------
##   (driver,127.0.0.1,54321)
##   ------------------------
## 
##   Open H2O Flow in browser: http://127.0.0.1:54321 (CMD + click in Mac OSX)
## 
## 
h2o.removeAll() # Removes all data from h2o cluster, ensuring it is clean.
h2o.no_progress()  # Turn off progress bars for notebook readability

# Setting H2O timezone for proper date data type handling
h2o.setTimezone("US/Central")
## [1] "US/Central"
# Convert dataset to H2O format.
allData_hex = as_h2o_frame( sc, allData )

# Converting certain columns to factor.
allData_hex[,1] = as.factor(allData_hex[,1])
allData_hex[,3] = as.factor(allData_hex[,3])
allData_hex[,4] = as.factor(allData_hex[,4])
allData_hex[,5] = as.factor(allData_hex[,5])

# Getting numeric codes from factors, so we can use them to build
# IF(Isolation Forest) models, I am doing this because data has no codes
# In a real model, the best is to have data with integer IDs.
# Getting the codes using H2O is easier, becuase Spark does not have factor data type.
allData_hex$Player_Code = as.numeric(allData_hex[,1])
allData_hex$Statistic_Code = as.numeric(allData_hex[,3])
allData_hex$Variable_Code = as.numeric(allData_hex[,4])
allData_hex$Value_Code = as.numeric(allData_hex[,5])


# split into train and validation sets
allData_hex_split = h2o.splitFrame(data = allData_hex, ratios = 0.9, seed = 1234)
trainData_hex = allData_hex_split[[1]]
testData_hex = allData_hex_split[[2]]

# Save training and testing datasets to kepp coded data backup.
h2o.exportFile(trainData_hex
               , force = TRUE
               , sep = "|"
               , path = "/home/ckassab/Development/R/DataQuality/Data/PGA_Tour_trainData_hex.csv" )

h2o.exportFile(testData_hex
               , force = TRUE
               , sep = "|"
               , path = "/home/ckassab/Development/R/DataQuality/Data/PGA_Tour_testData_hex.csv" )

# Variable names to be used when creating models.
featureNames = c( "Player_Code", "Statistic_Code", "Variable_Code", "Value_Code" )

################################################################################
# Building 3 Isolation forest models:
# http://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-science/if.html
# Parameter values set:
  # sample_rate: 
  # Specify the row sampling rate (x-axis). (Note that this method is sample without replacement.) 
  # Without replacement meaning: 
  # Each sample unit of the population has only one chance to be selected in the sample.
  # I understand you take a sample of the population and then take a new sample
  # without putting the first sample on the population, this means without replacement.
  # in this way you avoid taking the same individual(record) more than once.
  # Reference: 
  # https://methods.sagepub.com/reference/encyclopedia-of-survey-research-methods/n516.xml
  # https://stats.stackexchange.com/questions/69744/why-at-all-consider-sampling-without-replacement-in-a-practical-application
  # The sample_rate range is 0.0 to 1.0. Higher values may improve training accuracy. 
  # Test accuracy improves when either columns or rows are sampled. 
  # For details, refer to “Stochastic Gradient Boosting” (Friedman, 1999). 
  # If set to -1 (default), then sample_size parameter will be used instead.
  # 
  # For this analysis I am setting up sample_rate=.8 
  #
  # From H2O docs:http://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-science/algo-params/sample_rate.html
  #
  # In GBM and XGBoost, this value defaults to 1; in DRF, this value defaults to 0.6320000291. 
  # Row and column sampling (sample_rate and col_sample_rate) can improve generalization 
  # and lead to lower validation and test set errors. 
  # Good general values for large datasets are around 0.7 to 0.8 (sampling 70-80 percent of the data) 
  # for both parameters, as higher values generally improve training accuracy.


  # max_depth: Specify the maximum tree depth. Higher values will make the model
  # more complex and can lead to overfitting. Setting this value to 0 specifies no limit. 
  # This value defaults to 8.
  # http://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-science/algo-params/max_depth.html

  # seed: Specify the random number generator (RNG) seed for algorithm components 
  # dependent on randomization. The seed is consistent for each H2O instance so 
  # that you can create models with the same starting conditions in alternative configurations.
  # The meaning is fix a random number generator seed for reproducibility.
  # here I am creating 9 different models with 9 different seeds on the same data.

  # x: Specify a vector containing the names or indices of the predictor variables to use when building the model.
################################################################################
startTime = Sys.time()
# Start Time:
startTime
## [1] "2019-12-16 13:05:50 CST"
trainingModel1 = h2o.isolationForest( training_frame = trainData_hex
                                      , x = featureNames
                                      , model_id = "trainingIFModel1"
                                      , sample_rate = 0.8
                                      , max_depth = 32
                                      , ntrees = 100
                                      , seed = 1260 )
## Warning in .h2o.startModelJob(algo, params, h2oRestApiVersion): Stopping tolerance is ignored for _stopping_rounds=0..
trainingModel2 = h2o.isolationForest( training_frame = trainData_hex
                                      , x = featureNames
                                      , model_id = "trainingIFModel2"
                                      , sample_rate = 0.8
                                      , max_depth = 32
                                      , ntrees = 100
                                      , seed = 1634 )
## Warning in .h2o.startModelJob(algo, params, h2oRestApiVersion): Stopping tolerance is ignored for _stopping_rounds=0..
trainingModel3 = h2o.isolationForest( training_frame = trainData_hex
                                      , x = featureNames
                                      , model_id = "trainingIFModel3"
                                      , sample_rate = 0.8
                                      , max_depth = 32
                                      , ntrees = 100
                                      , seed = 1235 )
## Warning in .h2o.startModelJob(algo, params, h2oRestApiVersion): Stopping tolerance is ignored for _stopping_rounds=0..
# End Time:
Sys.time()
## [1] "2019-12-16 21:15:34 CST"
# Total time to train IF(Isolation Forest) models:
Sys.time() - startTime
## Time difference of 8.162204 hours
# Saving models for possible use with some future testing data.
h2o.saveModel( trainingModel1
               , "/home/ckassab/Development/R/DataQuality/Models"
               , force = TRUE )
## [1] "/home/ckassab/Development/R/DataQuality/Models/trainingIFModel1"
h2o.saveModel( trainingModel2
               , "/home/ckassab/Development/R/DataQuality/Models"
               , force = TRUE )
## [1] "/home/ckassab/Development/R/DataQuality/Models/trainingIFModel2"
h2o.saveModel( trainingModel3
               , "/home/ckassab/Development/R/DataQuality/Models"
               , force = TRUE )
## [1] "/home/ckassab/Development/R/DataQuality/Models/trainingIFModel3"
################################################################################
# Calculate scores.
startTime = Sys.time()
# Start Time:
startTime
## [1] "2019-12-16 21:16:05 CST"
score1 = h2o.predict( trainingModel1, trainData_hex )
score2 = h2o.predict( trainingModel2, trainData_hex )
score3 = h2o.predict( trainingModel3, trainData_hex )
# End Time:
Sys.time()
## [1] "2019-12-16 22:33:48 CST"
# Total time to get IF(Isolation Forest) models scores:
Sys.time() - startTime
## Time difference of 1.295181 hours
################################################################################
# Setting desired threshold percentage.
threshold = .999 # Let's say we want the .001% data different than the rest.

# Using this threshold to get score limit to filter data anomalies.
# These score limits will be also used to get testing data anomalies.
scoreLimit1 = round( h2o.quantile( score1[,1], threshold ), 4 )
scoreLimit2 = round( h2o.quantile( score2[,1], threshold ), 4 )
scoreLimit3 = round( h2o.quantile( score3[,1], threshold ), 4 )

# Saving score limits to file.
scoreLimitNames = c( "scoreLimit1", "scoreLimit2", "scoreLimit3" )

scoreLimitValues = c( scoreLimit1, scoreLimit2, scoreLimit3 )

scoreLimits = data.frame(scoreLimitNames, scoreLimitValues)

write.table( scoreLimits
           , file = "/home/ckassab/Development/R/DataQuality/Data/scoreLimits.csv"
           , append = FALSE, quote = TRUE, sep = "|", row.names = FALSE )


################################################################################
# Once we have our score limits, let's use them to get data anomalies.
################################################################################

# Add row score at the beginning of dataset
trainData_hexScores = h2o.cbind( round( score1[,1], 4 )
                                 , round( score2[,1], 4 )
                                 , round( score3[,1], 4 )
                                 , trainData_hex )

# Get data anomalies from training dataset.
anomalies1 = trainData_hexScores[ trainData_hexScores[,1] > scoreLimits[1,2], ]
anomalies2 = trainData_hexScores[ trainData_hexScores[,2] > scoreLimits[2,2], ]
anomalies3 = trainData_hexScores[ trainData_hexScores[,3] > scoreLimits[3,2], ]


################################################################################
# All anomalies have been detected using 3 IF(Isolation Forest) models.

# As mentioned, using Spark for data handling, easier than H2O data handling

anomaliesS1 = as_spark_dataframe( sc, anomalies1, name = "anomaliesS1" )
anomaliesS2 = as_spark_dataframe( sc, anomalies2, name = "anomaliesS2" )
anomaliesS3 = as_spark_dataframe( sc, anomalies3, name = "anomaliesS3" )

# Grouping and counting anomalies
anomaliesS1 = anomaliesS1 %>%
  group_by(Player_Code, Statistic_Code, Variable_Code, Value_Code) %>%
  select(Date, Player_Name, Statistic, Variable, Value) %>% 
  mutate(AnomCount = count()) %>%
  mutate(ModelNumber = "1")


anomaliesS2 = anomaliesS2 %>%
  group_by(Player_Code, Statistic_Code, Variable_Code, Value_Code) %>%
  select(Date, Player_Name, Statistic, Variable, Value) %>% 
  mutate(AnomCount = count()) %>%
  mutate(ModelNumber = "2")


anomaliesS3 = anomaliesS3 %>%
  group_by(Player_Code, Statistic_Code, Variable_Code, Value_Code) %>%
  select(Date, Player_Name, Statistic, Variable, Value) %>% 
  mutate(AnomCount = count()) %>%
  mutate(ModelNumber = "3")

anomaliesS = sdf_bind_rows( anomaliesS1, anomaliesS2, anomaliesS3 )
anomaliesS = sdf_sort(anomaliesS, c("Date", "Player_Code", "Variable_Code"))

anomsInAllModels = anomaliesS %>%
  group_by(Player_Code, Statistic_Code, Variable_Code, Value_Code) %>%
  select(ModelNumber, AnomCount, Date, Player_Name, Statistic, Variable, Value) %>% 
  mutate(TotalAnomalies = count()) %>%
  filter(TotalAnomalies==(AnomCount*3)) %>% # Filtering anomalies found in 3 models.
  collect() # Copy to R to create chart.

# Save anomsInAllModels to pipe delimited file.
write.table( anomsInAllModels
           , file = "/home/ckassab/Development/R/DataQuality/Data/anomsInAllModels_PGA_Tour_Golf_Data_2019_Kaggle.csv"
           , append = FALSE, quote = TRUE, sep = "|", row.names = FALSE )

# Just for reference and future study, getting anomalies not in all models.
# The consideration here is that if the anomaly is present in less than 3
# models, it is more possible not to be a "real" anomaly.
anomsNOtInAllModels = anomaliesS %>%
  group_by(Player_Code, Statistic_Code, Variable_Code, Value_Code) %>%
  select(ModelNumber, AnomCount, Date, Player_Name, Statistic, Variable, Value) %>% 
  mutate(TotalAnomalies = count()) %>%
  filter(TotalAnomalies<(AnomCount*3)) %>% # Filtering anomalies found in less than 3 models.
  collect() # Copy to R to create chart.

# Save anomsNOtInAllModels to pipe delimited file.
write.table( anomsNOtInAllModels
           , file = "/home/ckassab/Development/R/DataQuality/Data/anomsNOtInAllModels_PGA_Tour_Golf_Data_2019_Kaggle.csv"
           , append = FALSE, quote = TRUE, sep = "|", row.names = FALSE )


# Since we have data processed with 3 models, it is needed to keep just unique values.
distinctAnomalies = anomsInAllModels %>%
  distinct(Date, Player_Code, Player_Name, Statistic, Variable_Code, Variable, Value)

write.table( distinctAnomalies
           , file = "/home/ckassab/Development/R/DataQuality/Data/distinctAnomalies_PGA_Tour_Golf_Data_2019_Kaggle.csv"
           , append = FALSE, quote = TRUE, sep = "|", row.names = FALSE )


cat( "Anomalies found in training dataset: ", dim(distinctAnomalies)[1] )
## Anomalies found in training dataset:  3895
################################################################################
# If anomalies found, create chart
################################################################################

if( dim(distinctAnomalies)[1] > 0 ) {

  # Creating a time series with player codes
  players_xts <- xts( distinctAnomalies$Player_Code, order.by=as.Date(distinctAnomalies$Date))
  
  # Creating a time series with variable codes
  variables_xts <- xts( distinctAnomalies[,5], order.by=as.Date(distinctAnomalies$Date))
  
  # Binding time series.
  allAnomalies_xts <- cbind(players_xts, variables_xts)
  
  # Displaying the chart.
  anomaliesGraph = dygraph( allAnomalies_xts, main = ''
                            , xlab = "Date", ylab = "Player Code." ) %>%
    dyAxis("y", label = "Player Code.") %>%
    dyAxis("y2", label = "Variable Code.", independentTicks = TRUE) %>%
    dySeries( name = "players_xts", label = "Player Code", drawPoints = TRUE, pointShape = "dot"
              , color = "blue", pointSize = 2 ) %>%
    dySeries( name = "Variable_Code", label = "Variable Code", drawPoints = TRUE, pointShape = "dot"
              , color = "green", pointSize = 2, axis = 'y2' ) %>%
    dyRangeSelector()
  dyOptions( anomaliesGraph, digitsAfterDecimal = 0 )

  
}








*************************************************Checking Testing Data Anomalies.

*************************************************

# Calculate scores
testScore1 = h2o.predict( trainingModel1, testData_hex )
testScore2 = h2o.predict( trainingModel2, testData_hex )
testScore3 = h2o.predict( trainingModel3, testData_hex )

# Add row scores at the beginning of dataset
testData_hexScores = h2o.cbind( round( testScore1[,1], 4 )
                                , round( testScore2[,1], 4 )
                                , round( testScore3[,1], 4 )
                                , testData_hex )

# Get data anomalies by filtering using scorelimits.
testAnomalies1 = testData_hexScores[ testData_hexScores[,1] > scoreLimits[1,2], ]
testAnomalies2 = testData_hexScores[ testData_hexScores[,2] > scoreLimits[2,2], ]
testAnomalies3 = testData_hexScores[ testData_hexScores[,3] > scoreLimits[3,2], ]

# Convert H2O dataframes to spark dataframes.
testAnomaliesS1 = as_spark_dataframe(sc, testAnomalies1)
testAnomaliesS2 = as_spark_dataframe(sc, testAnomalies2)
testAnomaliesS3 = as_spark_dataframe(sc, testAnomalies3)


# Grouping and counting anomalies
testAnomaliesS1 = testAnomaliesS1 %>%
  group_by(Player_Code, Statistic_Code, Variable_Code, Value_Code) %>%
  select(Date, Player_Name, Statistic, Variable, Value) %>% 
  mutate(AnomCount = count()) %>%
  mutate(ModelNumber = "1")


testAnomaliesS2 = testAnomaliesS2 %>%
  group_by(Player_Code, Statistic_Code, Variable_Code, Value_Code) %>%
  select(Date, Player_Name, Statistic, Variable, Value) %>% 
  mutate(AnomCount = count()) %>%
  mutate(ModelNumber = "2")


testAnomaliesS3 = testAnomaliesS3 %>%
  group_by(Player_Code, Statistic_Code, Variable_Code, Value_Code) %>%
  select(Date, Player_Name, Statistic, Variable, Value) %>% 
  mutate(AnomCount = count()) %>%
  mutate(ModelNumber = "3")


testAnomaliesS = sdf_bind_rows( testAnomaliesS1, testAnomaliesS2, testAnomaliesS3 )
testAnomaliesS = sdf_sort(testAnomaliesS, c("Date", "Player_Code", "Variable_Code"))

testAnomsInAllModels = testAnomaliesS %>%
  group_by(Player_Code, Statistic_Code, Variable_Code, Value_Code) %>%
  select(ModelNumber, AnomCount, Date, Player_Name, Statistic, Variable, Value) %>% 
  mutate(TotalAnomalies = count()) %>%
  filter(TotalAnomalies==(AnomCount*3)) %>% # Filtering anomalies found in 3 models.
  collect() # Copy to R to create chart.

# Save anomsInAllModels to pipe delimited file.
write.table( testAnomsInAllModels
           , file = "/home/ckassab/Development/R/DataQuality/Data/testAnomsInAllModels_PGA_Tour_Golf_Data_2019_Kaggle.csv"
           , append = FALSE, quote = TRUE, sep = "|", row.names = FALSE )

# Just for reference and future study, getting anomalies not in all models.
# The consideration here is that if the anomaly is present in less than 3
# models, it is more possible not to be a "real" anomaly.
testAnomsNOtInAllModels = testAnomaliesS %>%
  group_by(Player_Code, Statistic_Code, Variable_Code, Value_Code) %>%
  select(ModelNumber, AnomCount, Date, Player_Name, Statistic, Variable, Value) %>% 
  mutate(TotalAnomalies = count()) %>%
  filter(TotalAnomalies<(AnomCount*3)) %>% # Filtering anomalies found in less than 3 models.
  collect() # Copy to R to create chart.

# Save testAnomsNOtInAllModels to pipe delimited file.
write.table( testAnomsNOtInAllModels
           , file = "/home/ckassab/Development/R/DataQuality/Data/testAnomsNOtInAllModels_PGA_Tour_Golf_Data_2019_Kaggle.csv"
           , append = FALSE, quote = TRUE, sep = "|", row.names = FALSE )

testDistinctAnomalies = testAnomsInAllModels %>%
  distinct(Date, Player_Code, Player_Name, Statistic, Variable_Code, Variable, Value)

write.table( testDistinctAnomalies
           , file = "/home/ckassab/Development/R/DataQuality/Data/testDistinctAnomalies_PGA_Tour_Golf_Data_2019_Kaggle.csv"
           , append = FALSE, quote = TRUE, sep = "|", row.names = FALSE )


cat( "Anomalies found in testing dataset: ", dim(testDistinctAnomalies)[1] )
## Anomalies found in testing dataset:  425
# Now we disconnect from Spark, this will result in the H2OContext being stopped as
# well since it's owned by the spark shell process used by our Spark connection:
spark_disconnect(sc)
## NULL
################################################################################
# If anomalies found, create chart
################################################################################

if( dim(testDistinctAnomalies)[1] > 0 ) {

  # Creating a time series with player codes
  testPlayers_xts <- xts( testDistinctAnomalies$Player_Code, order.by=as.Date(testDistinctAnomalies$Date))
  
  # Creating a time series with variable codes
  testVariables_xts <- xts( testDistinctAnomalies[,5], order.by=as.Date(testDistinctAnomalies$Date))
  
  # Binding time series.
  testAllAnomalies_xts <- cbind(testPlayers_xts, testVariables_xts)
  
  # Displaying the chart.
  anomaliesGraph = dygraph( testAllAnomalies_xts, main = ''
                            , xlab = "Date", ylab = "Player Code." ) %>%
    dyAxis("y", label = "Player Code.") %>%
    dyAxis("y2", label = "Variable Code.", independentTicks = TRUE) %>%
    dySeries( name = "testPlayers_xts", label = "Player Code", drawPoints = TRUE, pointShape = "dot"
              , color = "blue", pointSize = 2 ) %>%
    dySeries( name = "Variable_Code", label = "Variable Code", drawPoints = TRUE, pointShape = "dot"
              , color = "green", pointSize = 2, axis = 'y2' ) %>%
    dyRangeSelector()
  dyOptions( anomaliesGraph, digitsAfterDecimal = 0 )

  
}






To leave a comment for the author, please follow the link and comment on their blog: R-Analytics.

R-bloggers.com offers daily e-mail updates about R news and tutorials about learning R and many other topics. Click here if you're looking to post or find an R/data-science job.
Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.

Never miss an update!
Subscribe to R-bloggers to receive
e-mails with the latest R posts.
(You will not see this message again.)

Click here to close (This popup will not appear again)