Using R and H2O Isolation Forest anomaly detection for data quality, further analysis.
[This article was first published on R-Analytics, and kindly contributed to R-bloggers]. (You can report issue about the content on this page here)
Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.
Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.
Introduction:
This is the second article on data quality, for the first part, please go to: http://laranikalranalytics.blogspot.com/2019/11/using-r-and-h2o-isolation-forest-for.html
Since Isolation Forest is building an ensemble of isolation trees, and these trees are created randomly, there is a lot of randomness in the isolation forest training, so, to have a more robust result, 3 isolation forest models will be trained for a better anomaly detection.
I will also use Apache Spark for data handling.
For a full example, testing data will be used after training the 3 IF(Isolation Forest) models.
This way of using Isolation Forest is kind of a general usage also for maintenance prediction.
I am working with data from file:
https://www.kaggle.com/bradklassen/pga-tour-20102018-data
https://www.kaggle.com/bradklassen/pga-tour-20102018-data
# Set Java parameters, enough memory for Java. options( java.parameters = c( "-Xmx40G" ) ) # 40GB Ram for Java # Loading libraries suppressWarnings(suppressMessages(library(sparklyr))) suppressWarnings(suppressMessages(library(h2o))) suppressWarnings(suppressMessages(library(dplyr))) suppressWarnings(suppressMessages(library(xts))) suppressWarnings(suppressMessages(library(rsparkling))) # Version 3.26.10-2.4 suppressWarnings(suppressMessages(library(DT))) suppressWarnings(suppressMessages(library(dygraphs))) # For interactive plotting Sys.setenv(TZ = "America/Chicago") # R environment time zone. # Connecting to Spark, local mode. # For reference go to: https://spark.rstudio.com/guides/connections/ # Set Spark Config Parameters config <- spark_config() config["sparklyr.shell.driver-memory"] = "40g" # I created more swap, I must have more memory available. config["sparklyr.cores.local"] = 4 # Using all cores on my Intel i5 config$sparklyr.cancellable = TRUE config$spark.executor.cores = 4 config$spark.cores.max = 4 config$spark.ext.h2o.nthreads = -1 # Ensure all threads when using H2O # Connecting to Spark. sc = spark_connect(master = "local", version = "2.4.3", hadoop_version="2.7", config=config) # Setting Java TimeZone to GMT after initializing spark allow us to have a better # date time data handling. sparklyr::invoke_static(sc, "java.util.TimeZone", "getTimeZone", "GMT") %>% sparklyr::invoke_static(sc, "java.util.TimeZone", "setDefault", .) ## NULL # Start importing data to Spark and doing some data cleaning startTime = Sys.time() # Start Time: startTime ## [1] "2019-12-16 12:56:51 CST" allDataF = spark_read_csv( sc, "allDataF" , path = "/home/ckassab/Development/R/DataQuality/Data/PGA_Tour_Golf_Data_2019_Kaggle.csv" , memory = FALSE # Map the file, but not make a copy of it in memory, this saves 1g ram. , header = TRUE , delimiter = "," , quote = "\"" , infer_schema = TRUE , null_value = NULL ) # Data cleaning allData = allDataF %>% na.omit() %>% # Dropping all NAs from dataset mutate(Date = as.Date(substr(Date,1,10))) # Set date format as needed. ## * Dropped 174167 rows with 'na.omit' (9720529 => 9546362) # End importing data to Spark and doing some data cleaning # End Time: Sys.time() ## [1] "2019-12-16 12:58:33 CST" # Total time: Sys.time() - startTime ## Time difference of 1.701981 mins # Inspect the H2OContext for our Spark connection # This will also start an H2O cluster h2o_context(sc) ## <jobj[55]> ## org.apache.spark.h2o.H2OContext ## ## Sparkling Water Context: ## * Sparkling Water Version: 3.26.10-2.4 ## * H2O name: sparkling-water-ckassab_local-1576522610564 ## * cluster size: 1 ## * list of used nodes: ## (executorId, host, port) ## ------------------------ ## (driver,127.0.0.1,54321) ## ------------------------ ## ## Open H2O Flow in browser: http://127.0.0.1:54321 (CMD + click in Mac OSX) ## ## h2o.removeAll() # Removes all data from h2o cluster, ensuring it is clean. h2o.no_progress() # Turn off progress bars for notebook readability # Setting H2O timezone for proper date data type handling h2o.setTimezone("US/Central") ## [1] "US/Central" # Convert dataset to H2O format. allData_hex = as_h2o_frame( sc, allData ) # Converting certain columns to factor. allData_hex[,1] = as.factor(allData_hex[,1]) allData_hex[,3] = as.factor(allData_hex[,3]) allData_hex[,4] = as.factor(allData_hex[,4]) allData_hex[,5] = as.factor(allData_hex[,5]) # Getting numeric codes from factors, so we can use them to build # IF(Isolation Forest) models, I am doing this because data has no codes # In a real model, the best is to have data with integer IDs. # Getting the codes using H2O is easier, becuase Spark does not have factor data type. allData_hex$Player_Code = as.numeric(allData_hex[,1]) allData_hex$Statistic_Code = as.numeric(allData_hex[,3]) allData_hex$Variable_Code = as.numeric(allData_hex[,4]) allData_hex$Value_Code = as.numeric(allData_hex[,5]) # split into train and validation sets allData_hex_split = h2o.splitFrame(data = allData_hex, ratios = 0.9, seed = 1234) trainData_hex = allData_hex_split[[1]] testData_hex = allData_hex_split[[2]] # Save training and testing datasets to kepp coded data backup. h2o.exportFile(trainData_hex , force = TRUE , sep = "|" , path = "/home/ckassab/Development/R/DataQuality/Data/PGA_Tour_trainData_hex.csv" ) h2o.exportFile(testData_hex , force = TRUE , sep = "|" , path = "/home/ckassab/Development/R/DataQuality/Data/PGA_Tour_testData_hex.csv" ) # Variable names to be used when creating models. featureNames = c( "Player_Code", "Statistic_Code", "Variable_Code", "Value_Code" ) ################################################################################ # Building 3 Isolation forest models: # http://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-science/if.html # Parameter values set: # sample_rate: # Specify the row sampling rate (x-axis). (Note that this method is sample without replacement.) # Without replacement meaning: # Each sample unit of the population has only one chance to be selected in the sample. # I understand you take a sample of the population and then take a new sample # without putting the first sample on the population, this means without replacement. # in this way you avoid taking the same individual(record) more than once. # Reference: # https://methods.sagepub.com/reference/encyclopedia-of-survey-research-methods/n516.xml # https://stats.stackexchange.com/questions/69744/why-at-all-consider-sampling-without-replacement-in-a-practical-application # The sample_rate range is 0.0 to 1.0. Higher values may improve training accuracy. # Test accuracy improves when either columns or rows are sampled. # For details, refer to “Stochastic Gradient Boosting” (Friedman, 1999). # If set to -1 (default), then sample_size parameter will be used instead. # # For this analysis I am setting up sample_rate=.8 # # From H2O docs:http://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-science/algo-params/sample_rate.html # # In GBM and XGBoost, this value defaults to 1; in DRF, this value defaults to 0.6320000291. # Row and column sampling (sample_rate and col_sample_rate) can improve generalization # and lead to lower validation and test set errors. # Good general values for large datasets are around 0.7 to 0.8 (sampling 70-80 percent of the data) # for both parameters, as higher values generally improve training accuracy. # max_depth: Specify the maximum tree depth. Higher values will make the model # more complex and can lead to overfitting. Setting this value to 0 specifies no limit. # This value defaults to 8. # http://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-science/algo-params/max_depth.html # seed: Specify the random number generator (RNG) seed for algorithm components # dependent on randomization. The seed is consistent for each H2O instance so # that you can create models with the same starting conditions in alternative configurations. # The meaning is fix a random number generator seed for reproducibility. # here I am creating 9 different models with 9 different seeds on the same data. # x: Specify a vector containing the names or indices of the predictor variables to use when building the model. ################################################################################ startTime = Sys.time() # Start Time: startTime ## [1] "2019-12-16 13:05:50 CST" trainingModel1 = h2o.isolationForest( training_frame = trainData_hex , x = featureNames , model_id = "trainingIFModel1" , sample_rate = 0.8 , max_depth = 32 , ntrees = 100 , seed = 1260 ) ## Warning in .h2o.startModelJob(algo, params, h2oRestApiVersion): Stopping tolerance is ignored for _stopping_rounds=0.. trainingModel2 = h2o.isolationForest( training_frame = trainData_hex , x = featureNames , model_id = "trainingIFModel2" , sample_rate = 0.8 , max_depth = 32 , ntrees = 100 , seed = 1634 ) ## Warning in .h2o.startModelJob(algo, params, h2oRestApiVersion): Stopping tolerance is ignored for _stopping_rounds=0.. trainingModel3 = h2o.isolationForest( training_frame = trainData_hex , x = featureNames , model_id = "trainingIFModel3" , sample_rate = 0.8 , max_depth = 32 , ntrees = 100 , seed = 1235 ) ## Warning in .h2o.startModelJob(algo, params, h2oRestApiVersion): Stopping tolerance is ignored for _stopping_rounds=0.. # End Time: Sys.time() ## [1] "2019-12-16 21:15:34 CST" # Total time to train IF(Isolation Forest) models: Sys.time() - startTime ## Time difference of 8.162204 hours # Saving models for possible use with some future testing data. h2o.saveModel( trainingModel1 , "/home/ckassab/Development/R/DataQuality/Models" , force = TRUE ) ## [1] "/home/ckassab/Development/R/DataQuality/Models/trainingIFModel1" h2o.saveModel( trainingModel2 , "/home/ckassab/Development/R/DataQuality/Models" , force = TRUE ) ## [1] "/home/ckassab/Development/R/DataQuality/Models/trainingIFModel2" h2o.saveModel( trainingModel3 , "/home/ckassab/Development/R/DataQuality/Models" , force = TRUE ) ## [1] "/home/ckassab/Development/R/DataQuality/Models/trainingIFModel3" ################################################################################ # Calculate scores. startTime = Sys.time() # Start Time: startTime ## [1] "2019-12-16 21:16:05 CST" score1 = h2o.predict( trainingModel1, trainData_hex ) score2 = h2o.predict( trainingModel2, trainData_hex ) score3 = h2o.predict( trainingModel3, trainData_hex ) # End Time: Sys.time() ## [1] "2019-12-16 22:33:48 CST" # Total time to get IF(Isolation Forest) models scores: Sys.time() - startTime ## Time difference of 1.295181 hours ################################################################################ # Setting desired threshold percentage. threshold = .999 # Let's say we want the .001% data different than the rest. # Using this threshold to get score limit to filter data anomalies. # These score limits will be also used to get testing data anomalies. scoreLimit1 = round( h2o.quantile( score1[,1], threshold ), 4 ) scoreLimit2 = round( h2o.quantile( score2[,1], threshold ), 4 ) scoreLimit3 = round( h2o.quantile( score3[,1], threshold ), 4 ) # Saving score limits to file. scoreLimitNames = c( "scoreLimit1", "scoreLimit2", "scoreLimit3" ) scoreLimitValues = c( scoreLimit1, scoreLimit2, scoreLimit3 ) scoreLimits = data.frame(scoreLimitNames, scoreLimitValues) write.table( scoreLimits , file = "/home/ckassab/Development/R/DataQuality/Data/scoreLimits.csv" , append = FALSE, quote = TRUE, sep = "|", row.names = FALSE ) ################################################################################ # Once we have our score limits, let's use them to get data anomalies. ################################################################################ # Add row score at the beginning of dataset trainData_hexScores = h2o.cbind( round( score1[,1], 4 ) , round( score2[,1], 4 ) , round( score3[,1], 4 ) , trainData_hex ) # Get data anomalies from training dataset. anomalies1 = trainData_hexScores[ trainData_hexScores[,1] > scoreLimits[1,2], ] anomalies2 = trainData_hexScores[ trainData_hexScores[,2] > scoreLimits[2,2], ] anomalies3 = trainData_hexScores[ trainData_hexScores[,3] > scoreLimits[3,2], ] ################################################################################ # All anomalies have been detected using 3 IF(Isolation Forest) models. # As mentioned, using Spark for data handling, easier than H2O data handling anomaliesS1 = as_spark_dataframe( sc, anomalies1, name = "anomaliesS1" ) anomaliesS2 = as_spark_dataframe( sc, anomalies2, name = "anomaliesS2" ) anomaliesS3 = as_spark_dataframe( sc, anomalies3, name = "anomaliesS3" ) # Grouping and counting anomalies anomaliesS1 = anomaliesS1 %>% group_by(Player_Code, Statistic_Code, Variable_Code, Value_Code) %>% select(Date, Player_Name, Statistic, Variable, Value) %>% mutate(AnomCount = count()) %>% mutate(ModelNumber = "1") anomaliesS2 = anomaliesS2 %>% group_by(Player_Code, Statistic_Code, Variable_Code, Value_Code) %>% select(Date, Player_Name, Statistic, Variable, Value) %>% mutate(AnomCount = count()) %>% mutate(ModelNumber = "2") anomaliesS3 = anomaliesS3 %>% group_by(Player_Code, Statistic_Code, Variable_Code, Value_Code) %>% select(Date, Player_Name, Statistic, Variable, Value) %>% mutate(AnomCount = count()) %>% mutate(ModelNumber = "3") anomaliesS = sdf_bind_rows( anomaliesS1, anomaliesS2, anomaliesS3 ) anomaliesS = sdf_sort(anomaliesS, c("Date", "Player_Code", "Variable_Code")) anomsInAllModels = anomaliesS %>% group_by(Player_Code, Statistic_Code, Variable_Code, Value_Code) %>% select(ModelNumber, AnomCount, Date, Player_Name, Statistic, Variable, Value) %>% mutate(TotalAnomalies = count()) %>% filter(TotalAnomalies==(AnomCount*3)) %>% # Filtering anomalies found in 3 models. collect() # Copy to R to create chart. # Save anomsInAllModels to pipe delimited file. write.table( anomsInAllModels , file = "/home/ckassab/Development/R/DataQuality/Data/anomsInAllModels_PGA_Tour_Golf_Data_2019_Kaggle.csv" , append = FALSE, quote = TRUE, sep = "|", row.names = FALSE ) # Just for reference and future study, getting anomalies not in all models. # The consideration here is that if the anomaly is present in less than 3 # models, it is more possible not to be a "real" anomaly. anomsNOtInAllModels = anomaliesS %>% group_by(Player_Code, Statistic_Code, Variable_Code, Value_Code) %>% select(ModelNumber, AnomCount, Date, Player_Name, Statistic, Variable, Value) %>% mutate(TotalAnomalies = count()) %>% filter(TotalAnomalies<(AnomCount*3)) %>% # Filtering anomalies found in less than 3 models. collect() # Copy to R to create chart. # Save anomsNOtInAllModels to pipe delimited file. write.table( anomsNOtInAllModels , file = "/home/ckassab/Development/R/DataQuality/Data/anomsNOtInAllModels_PGA_Tour_Golf_Data_2019_Kaggle.csv" , append = FALSE, quote = TRUE, sep = "|", row.names = FALSE ) # Since we have data processed with 3 models, it is needed to keep just unique values. distinctAnomalies = anomsInAllModels %>% distinct(Date, Player_Code, Player_Name, Statistic, Variable_Code, Variable, Value) write.table( distinctAnomalies , file = "/home/ckassab/Development/R/DataQuality/Data/distinctAnomalies_PGA_Tour_Golf_Data_2019_Kaggle.csv" , append = FALSE, quote = TRUE, sep = "|", row.names = FALSE ) cat( "Anomalies found in training dataset: ", dim(distinctAnomalies)[1] ) ## Anomalies found in training dataset: 3895 ################################################################################ # If anomalies found, create chart ################################################################################ if( dim(distinctAnomalies)[1] > 0 ) { # Creating a time series with player codes players_xts <- xts( distinctAnomalies$Player_Code, order.by=as.Date(distinctAnomalies$Date)) # Creating a time series with variable codes variables_xts <- xts( distinctAnomalies[,5], order.by=as.Date(distinctAnomalies$Date)) # Binding time series. allAnomalies_xts <- cbind(players_xts, variables_xts) # Displaying the chart. anomaliesGraph = dygraph( allAnomalies_xts, main = '' , xlab = "Date", ylab = "Player Code." ) %>% dyAxis("y", label = "Player Code.") %>% dyAxis("y2", label = "Variable Code.", independentTicks = TRUE) %>% dySeries( name = "players_xts", label = "Player Code", drawPoints = TRUE, pointShape = "dot" , color = "blue", pointSize = 2 ) %>% dySeries( name = "Variable_Code", label = "Variable Code", drawPoints = TRUE, pointShape = "dot" , color = "green", pointSize = 2, axis = 'y2' ) %>% dyRangeSelector() dyOptions( anomaliesGraph, digitsAfterDecimal = 0 ) }
*************************************************Checking Testing Data Anomalies.
*************************************************
# Calculate scores testScore1 = h2o.predict( trainingModel1, testData_hex ) testScore2 = h2o.predict( trainingModel2, testData_hex ) testScore3 = h2o.predict( trainingModel3, testData_hex ) # Add row scores at the beginning of dataset testData_hexScores = h2o.cbind( round( testScore1[,1], 4 ) , round( testScore2[,1], 4 ) , round( testScore3[,1], 4 ) , testData_hex ) # Get data anomalies by filtering using scorelimits. testAnomalies1 = testData_hexScores[ testData_hexScores[,1] > scoreLimits[1,2], ] testAnomalies2 = testData_hexScores[ testData_hexScores[,2] > scoreLimits[2,2], ] testAnomalies3 = testData_hexScores[ testData_hexScores[,3] > scoreLimits[3,2], ] # Convert H2O dataframes to spark dataframes. testAnomaliesS1 = as_spark_dataframe(sc, testAnomalies1) testAnomaliesS2 = as_spark_dataframe(sc, testAnomalies2) testAnomaliesS3 = as_spark_dataframe(sc, testAnomalies3) # Grouping and counting anomalies testAnomaliesS1 = testAnomaliesS1 %>% group_by(Player_Code, Statistic_Code, Variable_Code, Value_Code) %>% select(Date, Player_Name, Statistic, Variable, Value) %>% mutate(AnomCount = count()) %>% mutate(ModelNumber = "1") testAnomaliesS2 = testAnomaliesS2 %>% group_by(Player_Code, Statistic_Code, Variable_Code, Value_Code) %>% select(Date, Player_Name, Statistic, Variable, Value) %>% mutate(AnomCount = count()) %>% mutate(ModelNumber = "2") testAnomaliesS3 = testAnomaliesS3 %>% group_by(Player_Code, Statistic_Code, Variable_Code, Value_Code) %>% select(Date, Player_Name, Statistic, Variable, Value) %>% mutate(AnomCount = count()) %>% mutate(ModelNumber = "3") testAnomaliesS = sdf_bind_rows( testAnomaliesS1, testAnomaliesS2, testAnomaliesS3 ) testAnomaliesS = sdf_sort(testAnomaliesS, c("Date", "Player_Code", "Variable_Code")) testAnomsInAllModels = testAnomaliesS %>% group_by(Player_Code, Statistic_Code, Variable_Code, Value_Code) %>% select(ModelNumber, AnomCount, Date, Player_Name, Statistic, Variable, Value) %>% mutate(TotalAnomalies = count()) %>% filter(TotalAnomalies==(AnomCount*3)) %>% # Filtering anomalies found in 3 models. collect() # Copy to R to create chart. # Save anomsInAllModels to pipe delimited file. write.table( testAnomsInAllModels , file = "/home/ckassab/Development/R/DataQuality/Data/testAnomsInAllModels_PGA_Tour_Golf_Data_2019_Kaggle.csv" , append = FALSE, quote = TRUE, sep = "|", row.names = FALSE ) # Just for reference and future study, getting anomalies not in all models. # The consideration here is that if the anomaly is present in less than 3 # models, it is more possible not to be a "real" anomaly. testAnomsNOtInAllModels = testAnomaliesS %>% group_by(Player_Code, Statistic_Code, Variable_Code, Value_Code) %>% select(ModelNumber, AnomCount, Date, Player_Name, Statistic, Variable, Value) %>% mutate(TotalAnomalies = count()) %>% filter(TotalAnomalies<(AnomCount*3)) %>% # Filtering anomalies found in less than 3 models. collect() # Copy to R to create chart. # Save testAnomsNOtInAllModels to pipe delimited file. write.table( testAnomsNOtInAllModels , file = "/home/ckassab/Development/R/DataQuality/Data/testAnomsNOtInAllModels_PGA_Tour_Golf_Data_2019_Kaggle.csv" , append = FALSE, quote = TRUE, sep = "|", row.names = FALSE ) testDistinctAnomalies = testAnomsInAllModels %>% distinct(Date, Player_Code, Player_Name, Statistic, Variable_Code, Variable, Value) write.table( testDistinctAnomalies , file = "/home/ckassab/Development/R/DataQuality/Data/testDistinctAnomalies_PGA_Tour_Golf_Data_2019_Kaggle.csv" , append = FALSE, quote = TRUE, sep = "|", row.names = FALSE ) cat( "Anomalies found in testing dataset: ", dim(testDistinctAnomalies)[1] ) ## Anomalies found in testing dataset: 425 # Now we disconnect from Spark, this will result in the H2OContext being stopped as # well since it's owned by the spark shell process used by our Spark connection: spark_disconnect(sc) ## NULL ################################################################################ # If anomalies found, create chart ################################################################################ if( dim(testDistinctAnomalies)[1] > 0 ) { # Creating a time series with player codes testPlayers_xts <- xts( testDistinctAnomalies$Player_Code, order.by=as.Date(testDistinctAnomalies$Date)) # Creating a time series with variable codes testVariables_xts <- xts( testDistinctAnomalies[,5], order.by=as.Date(testDistinctAnomalies$Date)) # Binding time series. testAllAnomalies_xts <- cbind(testPlayers_xts, testVariables_xts) # Displaying the chart. anomaliesGraph = dygraph( testAllAnomalies_xts, main = '' , xlab = "Date", ylab = "Player Code." ) %>% dyAxis("y", label = "Player Code.") %>% dyAxis("y2", label = "Variable Code.", independentTicks = TRUE) %>% dySeries( name = "testPlayers_xts", label = "Player Code", drawPoints = TRUE, pointShape = "dot" , color = "blue", pointSize = 2 ) %>% dySeries( name = "Variable_Code", label = "Variable Code", drawPoints = TRUE, pointShape = "dot" , color = "green", pointSize = 2, axis = 'y2' ) %>% dyRangeSelector() dyOptions( anomaliesGraph, digitsAfterDecimal = 0 ) }
To leave a comment for the author, please follow the link and comment on their blog: R-Analytics.
R-bloggers.com offers daily e-mail updates about R news and tutorials about learning R and many other topics. Click here if you're looking to post or find an R/data-science job.
Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.