
Author and Project:

– Author: XAVIER CAPDEPON

– Xavier was a student of the Data Science Bootcamp#2 (B002) – Data Science, Data Mining and Machine Learning – from June 1st to August 24th 2015. Teachers: Andrew, Bryan, Jason, Sam, Vivian.

– The post is based on his “Spark, Hadoop and Parallel computing” project final submission.

——————————————————————————————————–

The entire code in Python, Spark and R for the project is available here.

## I. Project Context:

The idea for the project came from a data science web review that our teacher Jason presented during the first month of the bootcamp. He showed us the following article called “Top 20 R package by popularity”.

Unfortunately, the title isn’t quite correct: the histogram is a simple count of R package downloads over the period from January to May 2015, and the download count of a package doesn’t always reflect its popularity.

Taking a second look at the names displayed on the graph, which of them sound familiar to an R user?

For the uninitiated, R is a programming language for statistical computing and graphics. It is widely used among statisticians and data miners. The capabilities of R are extended through user-created packages, which primarily provide specialized statistical techniques and graphical devices.

The goal of this project is to come up with a methodology to identify the most popular R packages. It involves Python, R, and Spark parallel computation.

## II. Methodology proposed:

On one hand, the CRAN website lists the available R packages on the following page.

Each package has its own page, such as the one for the “mgcv” package (see IV.A. below).

On the other hand, the CRAN package download logs are available at http://cran-logs.rstudio.com/:

The log files contain all hits to rstudio.com related to packages. The raw log files have been parsed into CSV and anonymised:

Since the daily logs provide a unique ID for each R user, it is possible to group the downloaded packages into one list per user per day and to identify one or more probable “root” packages within that list.
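To make the grouping step concrete, here is a minimal Python 3 sketch. The sample rows and the helper names `make_unique_id` and `group_packages_by_user` are made up for illustration; only the ID layout (date as YYYYMMDD followed by the daily IP id padded to six digits) mirrors what the final script later in this post does.

```python
from collections import defaultdict

# Hypothetical sample rows in the shape (date, ip_id, package); the values are made up.
sample_rows = [
    ("2015-01-01", 1, "ggplot2"),
    ("2015-01-01", 1, "plyr"),
    ("2015-01-01", 2, "mgcv"),
    ("2015-01-02", 1, "mgcv"),
    ("2015-01-01", 1, "ggplot2"),  # repeated download, collapsed by the set
]


def make_unique_id(date, ip_id):
    """Concatenate the date (YYYYMMDD) and the daily IP id padded to 6 digits."""
    return date.replace("-", "") + str(ip_id).zfill(6)


def group_packages_by_user(rows):
    """Map each date+ip identifier to the set of packages it downloaded."""
    packages_by_user = defaultdict(set)
    for date, ip_id, package in rows:
        packages_by_user[make_unique_id(date, ip_id)].add(package)
    return dict(packages_by_user)


packages_by_user = group_packages_by_user(sample_rows)
```

Using a set per identifier removes repeated downloads of the same package by the same user on the same day, which is what the methodology needs before looking for “root” packages.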

Methodology steps:

1. Packages available in R + dependency matrix construction:
   1. Scrape the CRAN website to retrieve all packages and dependency info
   2. Build a dependency matrix (+ Spark parallelization)
2. Retrieve relevant info from logs and generate unique ID (date + ip)
3. Save the info from logs into a SQL database (it wasn’t used)
4. Process logs with dependency matrix to extract “root” packages:
   1. Use the script (see #2) that generates the unique ID to build the list of downloaded packages by unique ID
   2. Use the matrix to simplify the list generated above to get the “root” packages
5. Data manipulation and visualization

## III. Project Sizes and Challenges:

In order to distinguish the most popular packages from the most downloaded packages, it is necessary:

1. To dig into the dependencies of all the packages by building a package dependency matrix of ~7055 x 7055 (over 49 million elements), which requires web-scraping about 7055 pages from the CRAN website.
2. To use the dependency matrix to analyze the daily package download logs over a period of 5 months and extract the “root” packages from the 47 million records.

In theory, the project isn’t very complicated, but the volume of web scraping and the number of records to process require time and a technology such as Spark to parallelize the work and get the results in a few hours instead of the few days that Python alone would take.

This project encompasses all the following technical challenges:

• Spark technology to parallelize the processes on the Linux server(s) using Linux commands.
• A specific design of the Python script in order to anticipate the parallel computation.
• The sparse matrix concept, using SciPy and NumPy in Python, to write and store the dependency matrix.
• Multiple test implementations before running the scripts and collecting results on the Spark servers.
• Scripts that must run overnight, because web traffic during the day generates numerous errors in either HTTP connections or memory loads.
• The CRAN website, on which the work is based, constantly updates its content by adding, deleting or updating packages, which makes capturing the information and the work more difficult.

## IV. Dependency matrix construction:

### A. Data Source:

Each package is displayed on a single page such as the “mgcv” package:

#1: “Depends:” and “Imports:”

Depends:        R (≥ 2.14.0), nlme (≥ 3.1-64)

Imports:         methods, stats, graphics, Matrix

#2: “Reverse depends:” and “Reverse imports:”

Reverse depends:      bgeva, CAM, dlmap, dsm, DSsim, eHOF, fda.usc, gamlss.add, gamlss.spatial, […]

Reverse imports:       AdaptFitOS, analogue, apsimr, car, cSFM, demography, discSurv, drsmooth, […]

In this example, “mgcv” depends on several base packages (stats, methods, graphics) as well as on nlme and Matrix. The two “reverse” fields list all the packages that rely on mgcv to run in R.

Since the base packages are not referenced by the CRAN website, only the nlme and Matrix packages are considered when building the mgcv dependencies. In this example, the mgcv row of the dependency matrix is all 0 except at the column indexes corresponding to nlme and Matrix.

All the packages listed under “Reverse imports” and “Reverse depends” depend on mgcv, and they can be included in the dependency matrix by swapping the row and column indexes, i.e. by taking the matrix transpose.

### B. Sparse matrix concept:

The sparse matrix, which is a computational rather than a mathematical concept, is particularly powerful in the current work.

Since the matrix is made of 0s and 1s, with far more 0s than 1s, it was much easier to handle it as a sparse matrix in Python (the scripts below use coo_matrix from scipy.sparse together with NumPy):

1. A sparse matrix takes significantly less memory.
2. The available functions allow easy access and manipulation, and the classes allow quick construction.

In short, since the matrix only contains 0s and 1s, we just need to compute the indexes of the 1s and collect all the row indexes and all the column indexes in two separate lists. Once the two lists are finalized, they can be combined into a sparse matrix.

For later computations, the indexes of the 1 entries can be retrieved by simply calling the .nonzero() method, which saves both programming effort and run time compared to a loop.
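As an illustration of the two-list construction and of `.nonzero()`, here is a small Python 3 sketch with a made-up universe of four packages. The index values are arbitrary; the only calls used are `coo_matrix` from `scipy.sparse` (the class the final script imports) and its `nonzero()`, `tocsr()` and `.T` members.

```python
import numpy as np
from scipy.sparse import coo_matrix

# Hypothetical 4-package universe; the indexes below are illustrative only.
n_packages = 4
rows = [0, 0, 2]  # row index = package that declares the dependency
cols = [1, 3, 1]  # column index = package it depends on
data = np.ones(len(rows), dtype=np.int8)

# Combine the two index lists into one sparse matrix.
dep = coo_matrix((data, (rows, cols)), shape=(n_packages, n_packages))

# Indexes of all the 1 entries, without an explicit loop.
nz_rows, nz_cols = dep.nonzero()

# Dependencies of package 0: column indexes of the non-zero entries in row 0.
deps_of_0 = dep.tocsr()[0].nonzero()[1]

# Reverse dependencies of package 1 are read from the transpose.
reverse_deps_of_1 = dep.T.tocsr()[1].nonzero()[1]
```

Only the three (row, column) pairs are stored, instead of the sixteen cells of the dense 4 x 4 matrix; the same ratio is what makes the 7055 x 7055 case manageable.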

### C. Python / Spark Script to build the dependency matrix:

First, the page https://cran.r-project.org/web/packages/available_packages_by_name.html is scraped using Python's Beautiful Soup to extract the list of 7055 current package names and package links.

To work with this matrix and switch back and forth between indexes and package names, we just need to build one dictionary mapping package name -> package index and a second one mapping package index -> package name.

The package links and the two dictionaries are built and saved as text files. The following script, written for Spark and Python, was tested several times on my computer with small data sets, given my configuration (6 GB of memory and 4 CPUs).
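A minimal Python 3 sketch of the two lookups, using a three-package subset for illustration. The names `name_to_index`, `index_to_name` and `names_to_indexes` are hypothetical and do not appear in the original scripts, which call the first dictionary `dictPackages_valueToIndex`.

```python
# Illustrative subset of package names; the real list has about 7055 entries.
package_names = ["Matrix", "mgcv", "nlme"]

# package name -> package index
name_to_index = {name: index for index, name in enumerate(package_names)}
# package index -> package name
index_to_name = {index: name for name, index in name_to_index.items()}


def names_to_indexes(names):
    """Translate names to matrix indexes, skipping names absent from the dictionary."""
    return [name_to_index[name] for name in names if name in name_to_index]


# Base packages such as "methods" or "stats" are not in the dictionary and drop out.
mgcv_dependencies = names_to_indexes(["methods", "stats", "graphics", "nlme", "Matrix"])
```

Skipping unknown names plays the same role as the `try`/`except` blocks in the script below, which ignore packages that are not referenced on CRAN.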

### corrected 8/23

#Before running, refresh:
# 1. list_package_url.txt (list of packages from web scrap)
# 2. and dictPackages_valueToIndex.txt (dictionnary of packages)

from pyspark import SparkContext
sc = SparkContext(master='local[10]', appName = 'learnSpark')

####################################################
### FINAL SCRIPT FOR SPARK FOR MATRIX CONSTRUCTION
####################################################
## All script to build depends/imports / Reverse matrix

import numpy as np
from bs4 import BeautifulSoup
import requests
import re
import csv

#definition of web scraping extracting function

def extract_url_content(urlv):
    # Request by url (the request call was missing from this listing; a plain GET is assumed)
    req = requests.get(urlv)
    demande = req.text
    # Request status code
    statut_demande = req.status_code
    if statut_demande // 100 in [4, 5]:
        return 'error on requests with error: ', statut_demande
    return BeautifulSoup(demande)

## modification of the definition above for the purpose of Spark parallelisation

## for the purpose of running through Spark, we change the structure of the sparse matrix into a list of tuples
## that will be easier to deal with using Spark

# definition of the lists that will serve to build a sparse matrix
depends_row = []
depends_col = []
depends_data = []

Reverse_row = []
Reverse_col = []
Reverse_data = []

depends_tuple = tuple()
Reverse_tuple = tuple()
result_depends_rev = []

package_name2 = package_name.group()
package_name3 = re.sub('(https://cran.r-project.org/web/packages/)|(/index.html)','',package_name2)
#print package_name3

j=dictPackages_valueToIndex[package_name3]

# web scraping of the link using beautiful soup
#print textpackageindex.prettify

# extract all tag tr
textpackage = textpackageindex.find_all('tr')
#print textpackage

# extract text from in the tr tag
textPackageIndex = []
for part in textpackage:
textPackageIndex.append(part.get_text())
#print textPackageIndex

# determine where the 'depends' information is located:
for text in textPackageIndex:
#print 'TEXT TO BE EXAMINED', text
if re.search('(Depends)|(Imports)', text)!=None: # detect if the word "depends" is mentionned in text, if so:

# text is cleaned and the package "depends" are stored in a list
depends = re.search('(Depends:\n.*)|(Imports:\n.*)', text)
depends2 = depends.group()
#print depends2
depends3 = re.sub('(Depends:\n)|(\([^)]*\))| |(Imports:\n)','',depends2)
depends4 = depends3.split(',')
#print depends3
#print depends4

for i in depends4:
try:
toto = dictPackages_valueToIndex[i]
depends_col.append(toto)
depends_row.append(j)
depends_data.append(1)
#print depends_col
new_items +=1
except:
pass
if len(depends_row)!=0:
depends_tuple = [((depends_row[i], depends_col[i]), depends_data[i]) for i in range(len(depends_row))]
#print depends_tuple

if re.search('(Reverse\xa0depends:\n.*)|(Reverse\xa0imports:\n.*)', text)!=None: # detect if the word "Reverse" is mentioned in text, if so:
# text is cleaned and the "Reverse" packages are stored in a list
Reverse= re.search('(Reverse\xa0depends:\n.*)|(Reverse\xa0imports:\n.*)', text)
Reverse2 = Reverse.group()
Reverse3 = re.sub('(Reverse\xa0depends:\n)| |(\([^)]*\))|(Reverse\xa0imports:\n)','',Reverse2)
Reverse4 = Reverse3.split(',')
#print Imports3
#print Reverse4

new_items = 0
for i in Reverse4:
try:
toto = dictPackages_valueToIndex[i]
Reverse_col.append(toto)
Reverse_row.append(j)
Reverse_data.append(1)
#print Imports_col
new_items +=1
except:
pass
if len(Reverse_row)!=0:
Reverse_tuple = [((Reverse_col[i],Reverse_row[i]), Reverse_data[i]) for i in range(len(Reverse_row))]
#print Reverse_tuple

if len(depends_tuple)!=0:
result_depends_rev.append(depends_tuple)
if len(Reverse_tuple)!=0:
result_depends_rev.append(Reverse_tuple)
if len(result_depends_rev)!=0:
return [val for sublist in result_depends_rev for val in sublist]

url_package_list = [line.strip() for line in open("/home/xaviercapdepon/cran/list_package_url.txt", 'r')] #/home/xaviercapdepon/cran/list_package_url.txt

packages_url=url_package_list

import csv
dictPackages_valueToIndex = dict(x for x in reader)

## pure python loop for verification of results against spark
#for i in url_package_list[:7]:
#    print i
#    print pull_sparse_matrix_Depends_Imports_url(i)

## Spark parelleling computation
doc = sc.parallelize(url_package_list)
doc_len = doc.map(pull_sparse_matrix_Depends_Imports_url).filter(lambda x: x is not None)
a = doc_len.flatMap(lambda x:x)
a.cache()
print a.collect()

rows_depends = a.map(lambda x: x[0][0]).collect()

# save the row indexes of the sparse matrix on disk in a text file
with open('rows_sparse_mat', 'w') as f:  # 'w' truncates any previous content
    for item in rows_depends:
        f.write("%s\n" % item)

cols_depends = a.map(lambda x: x[0][1]).collect()

# save the column indexes of the sparse matrix on disk in a text file
with open('cols_sparse_mat', 'w') as f:  # 'w' truncates any previous content
    for item in cols_depends:
        f.write("%s\n" % item)

For reasons I could not identify, the script above had to be run twice:

– once with only the “depends” detection loop: it generated 14,196 row and column indexes

– once with only the “reverse” detection loop: it generated 14,458 row and column indexes

Because of HTTP connection time-outs during the day, the script had to be run after 10 pm New York time, otherwise it ran into errors.

After 30 minutes to 1 hour per run, the server (66 MB of memory and 12 CPUs) returned two pairs of lists, which correspond to the sparse matrix rows and columns for future use. I estimate that each run would have taken about 8 to 12 hours on my small computer.

The lists are merged, and the dependency matrix appears for the first time.

Among all the indexes generated above, there were multiple duplicates: after rebuilding the entire matrix and using the .nonzero() function, only 14,435 entries turn out to be relevant.

There are effectively 14,435 “1” elements, and all the rest are “0” elements, in our 7055×7055 matrix containing a total of 49,773,025 elements.
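The merge and de-duplication step can be sketched in a few lines of Python 3. The function name `merge_index_pairs` and the sample pairs are hypothetical; the original scripts store rows and columns in separate text files rather than as pairs. De-duplicating matters because `coo_matrix` sums repeated (row, column) entries when it is converted to an array, so a dependency listed twice would show up as a 2 instead of a 1.

```python
def merge_index_pairs(depends_pairs, reverse_pairs):
    """Merge two lists of (row, col) pairs and drop duplicates.

    Returns the row indexes and the column indexes as two parallel lists,
    ready to be passed to a sparse matrix constructor.
    """
    unique_pairs = set(depends_pairs) | set(reverse_pairs)
    ordered = sorted(unique_pairs)
    rows = [row for row, _ in ordered]
    cols = [col for _, col in ordered]
    return rows, cols


# Made-up sample: (3, 1) and (1, 2) are reported by both runs.
depends_pairs = [(1, 2), (1, 0), (3, 1)]
reverse_pairs = [(3, 1), (4, 1), (1, 2)]
rows, cols = merge_index_pairs(depends_pairs, reverse_pairs)
```

Overlap between the two runs is expected: a dependency found on one package's page under “Depends” is also listed on the other package's page under “Reverse depends”.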

With this matrix, a preliminary network graph would look like this (work in progress!):

Fig 6.: the graph was generated with R by simply inputting the sparse rows and columns into the simpleNetwork function from the networkD3 package. As expected, the packages lattice, Matrix and MASS are among those with the most reverse dependencies.

Once the dependency matrix is built, we need to download all the CRAN package download logs for the period of January to May 2015, as in the original article.

Since there are about 150 days, and therefore about 150 files, we can also use Spark parallel computation to download the files to the server more quickly for future use.

The page ‘http://cran-logs.rstudio.com/’ lists all the package logs. Once it is fully scraped, we can keep only the paths of the log files that interest us in a text file that we will call url_log_full_paths.

# Before running, update url_log_full_paths file.

from pyspark import SparkContext
sc = SparkContext(master='local[7]', appName = 'learnSpark')

####################################################
####################################################

import numpy as np
from bs4 import BeautifulSoup
import requests
import re
import csv

import urllib

path_with_log_name = './data/'+ re.sub('(http://cran-logs.rstudio.com/[0-9]+/)','',url_log)
urllib.urlretrieve(url_log, path_with_log_name)

url_log_full_paths = [line.strip() for line in open("url_log_full_paths.txt", 'r')]

print 'Spark started'

doc_log= sc.parallelize(url_log_full_paths)  #1004:1054

print 'Spark finished'

## VI. Process logs with dependency matrix to extract “root” packages:

At first, I was going to pre-screen all the records in the logs and store them in a SQL database, as shown in the Python script available on GitHub. Because of the time pressure to deliver this project, the constraints of parallel computation and the processing time, I decided instead to process the logs directly and aggregate the results in one script, to save memory and disk space.

All the results are saved as integers that correspond to the package indexes defined earlier in the two dictionaries, in order to reduce the size of the result file, which is already around 100 MB.

I went through a long and tedious testing phase on my local computer to assess the accuracy of the methodology and of the script, and came up with the following final Spark script.

The idea of the final script is to generate a unique ID composed of the day tag and the IP_ID tag and then, for every unique ID, to extract the set (in order to remove duplicates) of all the packages downloaded by this single user. Then, for each item in the set of downloaded packages, we look up all the packages it depends on in the dependency matrix and subtract them from the set. At the end of the process, only the “root” packages remain.

The inputs of the script are: a. the sparse matrix rows and columns text files, b. the dictionary of the packages and c. the list of logs from January to May 2015.

The outputs of the script are: a. a single list of the root packages and b. a list of the number of line items in each log, in order to count them.
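Before the full script, the core idea can be sketched in plain Python 3 on a single user. The function `extract_root_packages` and the dictionary-based dependency table are assumptions made for readability: the actual script works on integer package ids and reads dependencies from rows of the matrix with `.nonzero()`. The sample dependencies (mgcv on nlme and Matrix, car on mgcv) come from the mgcv page shown in section IV.A.

```python
def extract_root_packages(downloaded, dependencies):
    """Return the downloaded packages that no other downloaded package depends on.

    downloaded   -- set of packages fetched by one user on one day
    dependencies -- dict mapping a package to the set of packages it depends on
    """
    roots = set(downloaded)
    for package in downloaded:
        # Packages unknown to the dependency table contribute nothing to subtract.
        roots -= dependencies.get(package, set())
    return roots


# Dependency table limited to what section IV.A. shows for mgcv.
dependencies = {
    "mgcv": {"nlme", "Matrix"},
    "car": {"mgcv"},
}

one_user_downloads = {"car", "mgcv", "nlme", "Matrix"}
roots = extract_root_packages(one_user_downloads, dependencies)
```

Here the user most likely asked for car only, and the three other packages came along as dependencies, so car is the single “root” package that remains.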

# Before running, need to update:
# 1. rows_matrix_simplified
#  + cols_matrix_simplified
# 2. dictPackages_valueToIndex
# 3. url_log_paths_jan_may_2015

###############################################################################################################################
### FINAL SPARK SCRIPT CODE TO REMOVE IMPORTED PACKAGES
###############################################################################################################################

from pyspark import SparkContext
sc = SparkContext(master='local[10]', appName = 'learnSpark')

import gzip
from time import strftime
from bs4 import BeautifulSoup
import numpy as np
import pandas as pd
import requests
import re

def extract_info_from_logs(filename):
    with gzip.open(filename, 'rb') as f:
        # the line that loads the log was lost from this listing; a plain CSV read is assumed
        log_content = pd.read_csv(f)

    log_content['date_str'] = pd.to_datetime(log_content.date).apply(lambda x: x.strftime('%Y%m%d')).map(str)
    log_content['ip_id'] = log_content.ip_id.map(str).map(lambda x: x.zfill(6))
    log_content['ip_date_id'] = log_content.date_str.map(str)+log_content.ip_id.map(str)

    logs_db = log_content[['package','ip_date_id']]
    return logs_db

def extract_package_id_after_mat_process(filename):

logs_db = extract_info_from_logs(filename)

## MATRIX DEFINITION
## definition of the sparse matrix
rows_matrix = [line.strip() for line in open("/home/xaviercapdepon/cran/rows_matrix_simplified", 'r')] #/home/xaviercapdepon/cran/rows_depends
cols_matrix = [line.strip() for line in open("/home/xaviercapdepon/cran/cols_matrix_simplified", 'r')] #/home/xaviercapdepon/cran/cols_depends
#print len(rows_matrix)==len(cols_matrix)
data_matrix = np.ones(len(rows_matrix))
#print len(rows_matrix)==len(data_matrix)

## need the full dimension of the matrix by looking at the number of packages available
import csv
dictPackages_valueToIndex = dict(x for x in reader)
total_nb_packages = len(dictPackages_valueToIndex)

## regrouping all info together to form the matrix
from scipy.sparse import coo_matrix
depends_matrix = coo_matrix((data_matrix, (rows_matrix, cols_matrix)), shape=(total_nb_packages,total_nb_packages)).toarray()
depends_matrix

## from log file
## constitue array of unique IP
unique_IP = logs_db.ip_date_id.unique()
#print unique_IP

for IP in unique_IP:
## contitute set package for each unique IP
packages_name_set_by_IP = logs_db.package[logs_db.ip_date_id==IP]
#print packages_name_set_by_IP

packages_id_set_by_IP = []
## change the package names into id
for i in packages_name_set_by_IP:
try:     ## the two loops try/excepts are implemented to take care of the packages that are not referenced anymore by CrAN
individual_packages_id_set_by_IP = int(dictPackages_valueToIndex[i])
packages_id_set_by_IP.append(individual_packages_id_set_by_IP)
except:
packages_id_set_by_IP.append(i)
pass

packages_id_set_by_IP = set(packages_id_set_by_IP)
## column index of packages to be imported for a given package
# a[x].nonzero()[0] where x is a package id

# define a final set of package id that will be
packages_id_set_by_IP_final = packages_id_set_by_IP

for package_id in packages_id_set_by_IP:
try:  ## the two loops try/excepts are implemented to take care of the packages that are not referenced anymore by CRAN
toto = set(depends_matrix[package_id].nonzero()[0])
packages_id_set_by_IP_final = set(list(packages_id_set_by_IP_final - toto))
except:
pass

#for file_name in ['2012-10-02.csv.gz','2012-10-03.csv.gz','2012-10-04.csv.gz']:
#    print extract_package_id_after_mat_process(file_name)

list_of_files = [line.strip() for line in open("/home/xaviercapdepon/cran/url_log_paths_jan_may_2015", 'r')]
list_of_files = ['/data1/crandata/'+ i for i in list_of_files]

doc_log = sc.parallelize(list_of_files)
doc_len = doc_log.map(extract_package_id_after_mat_process)#.filter(lambda x: x is not None)
#print doc_len.collect()

packages_roots = doc_len.map(lambda x: x[0]).flatMap(lambda x:x).collect()
#print packages_roots

# save the list of root package ids on disk in a text file
with open('packages_roots', 'w') as f:  # 'w' truncates any previous content
    for item in packages_roots:
        f.write("%s\n" % item)

# save list of package url on disk in text file
f.write("%s\n" % item)

Although the script makes no HTTP connections, it ran faster overnight, when nobody else was using the server. Curiously, the first file of 100 MB was generated after 5 hours and the second file of 500 KB after 10 hours, for no obvious reason.

The second file shows that the script went over 46,944,117 package downloads by users from January to May 2015.

The first file contains the list of 18,716,714 “root” packages, which would mean that about 60% of the packages (1 − 18,716,714 / 46,944,117) are downloaded in support of “root” packages.

## VII. Results and Visualization

I realized soon after the runs that the server had only one node and that, because of this configuration, all the results come out in the same order as the input files. The advantage is that I can use the data as is to plot the daily downloads over the period of January to May 2015:

It can also be noticed that the logs from 2012 each record about 50,000 line items, compared to an average of 300,000 in the first half of 2015, which shows the increasing popularity of R among programmers.

### B. Most popular R packages

Finally, the graph of the most popular R packages looks like this:

The results look quite different from the original blog post: packages such as RMySQL, RSQLite and RJSONIO now appear, and they are most likely packages that users actually intended to download.

### C. The “CRAC” package:

Following a question from Alex, my classmate, I took a closer look at the package “CRAC”, about cosmological research, developed by our excellent teacher Jason.

According to the script, “CRAC” was downloaded 1,423 times between January and May 2015 and is ranked 1,170th under the present methodology.

## VIII. Conclusion and remarks

The design took me longer than I was expecting, and the testing took far more time than I had anticipated. Following Jason’s advice at the beginning of the bootcamp, I started writing pieces of code for the different parts of the project whenever I had to think about one in particular; it helped a lot in terms of time and design, as long as I knew where I was going. One aspect that I did not take into account at the beginning of the testing is the constant changes made to the CRAN website: they disturb the scraping and the calculation if these are not done at exactly the same time, and they require constantly updating the entire Python script.

The results are not what I was expecting: I expected to see more common packages such as ggplot2 or plyr, but these are used by many other packages, so their download is often induced by other packages.

I am still a little skeptical about the final graph and, while writing the script, I realized that my methodology cannot be 100% accurate, for two reasons:

1. Old packages are no longer referenced, as the CRAN website is constantly changing. The script makes an exception for these packages, but they make the results differ slightly from reality.

2. R users are all over the world, and their time zone may differ from the UTC day used by the logs. If they download packages between 11pm on one day and 1am on the following day, their unique ID changes from the first day to the second, which introduces a small error in the end results.

This work is still in progress, including the design of the network graph for the package dependencies, and I would like to hear any comments, suggestions or corrections on my work.

## Thank you:

A particular thank you to our teacher Jason for all his advice, corrections and patience during this project.

Thanks to Sam, our teacher, for all his theoretical explanations and his advice about the importance of storage compression when using parallel computing.

Thanks to Bryan, our teacher, for his last-minute debugging advice in the evening before the presentation.

Thanks again to Shu, my classmate, for his support and for his idea regarding network graphs during the late night before the final presentation.