# Parallel processing simulations ################################################################################ # Simulate data options(width = 60) #Change width of output in R to help format my notes #Settings mu <- 2.713333 sigma <- 2.195581 sigma^2 n <- 9 num.sim <- 500 #Number of simulations - do not use R here because R is number of resamples in boot section #Simulate data all at once set.seed(9811) y.sim <- matrix(data = rnorm(n = n*num.sim, mean = mu, sd = sigma), nrow = num.sim, ncol = n) y.sim[1,] #Sample #1 var(y.sim[1,]) #t_1 ################################################################################ # PP #1 per.core1 <- function(X, alpha, R) { library(boot) calc.t2 <- function(data, i) { d2 <- data[i] var(d2) } calc.t <- function(data, i) { d <- data[i] n <- length(d) l.jack <- empinf(data = d, statistic = calc.t2, stype = "i", type = "jack") v.jack <- var.linear(L = l.jack) c(var(d), v.jack) } sim.func <- function(y, alpha, R) { n <- length(y) t <- var(y) normal.based <- (n - 1)*t / qchisq(p = c(1-alpha/2, alpha/2), df = n - 1) mu.hat4 <- 1/n*sum((y - mean(y))^4) asym <- t + qnorm(p = c(alpha/2, 1-alpha/2))*sqrt((mu.hat4 - t^2)/n) boot.res <- boot(data = y, statistic = calc.t, R = R, sim = "ordinary") save.int <- boot.ci(boot.out = boot.res, conf = 1-alpha, type = "all") basic <- c(save.int$basic[4], save.int$basic[5]) percentile <- c(save.int$perc[4], save.int$perc[5]) bca <- c(save.int$bca[4], save.int$bca[5]) student <- c(save.int$student[4], save.int$student[5]) c(normal.based, asym, basic, percentile, bca, student) } save.int<-t(apply(X = X, MARGIN = 1, FUN = sim.func, alpha, R)) save.int } library(parallel) #TESTING start.time<-proc.time() cl <- makeCluster(2) clusterSetRNGStream(cl = cl, 8881) sim.intervals.PP <- parLapply(cl = cl, X = list(y.sim[1:2,], y.sim[3:4,]), fun = per.core1, alpha = 0.05, R = 1999) save.all<-do.call(what = rbind, args = sim.intervals.PP) save.all stopCluster(cl) end.time<-proc.time() save.time<-end.time-start.time cat("\n Number of minutes running:", save.time[3]/60, "\n \n") ################################################################################ # PP #2 - perhaps a little better because most of the code for each core is an external program per.core2 <- function(my.data, alpha, R) { setwd(dir = "C:\\chris\\unl\\Dropbox\\NEW\\STAT_computing\\ParallelProcessing") source(file = "Run_on_each_core.R") save.int<-t(apply(X = my.data, MARGIN = 1, FUN = sim.func, alpha = alpha, R = R)) #Oddly, this code will not work if it is in the source program save.int } #Test it per.core2(my.data = y.sim[1:10,], alpha = 0.05, R = 1999) #TESTING start.time<-proc.time() cl <- makeCluster(2) clusterSetRNGStream(cl = cl, 8881) sim.intervals.PP <- parLapply(cl = cl, X = list(y.sim[1:2,], y.sim[3:4,]), fun = per.core2, alpha = 0.05, R = 1999) save.all<-do.call(what = rbind, args = sim.intervals.PP) head(save.all) stopCluster(cl) end.time<-proc.time() save.time<-end.time-start.time cat("\n Number of minutes running:", save.time[3]/60, "\n \n") #ALL 500 start.time<-proc.time() cl <- makeCluster(2) clusterSetRNGStream(cl = cl, 8881) sim.intervals.PP <- parLapply(cl = cl, X = list(y.sim[1:250,], y.sim[251:500,]), fun = per.core2, alpha = 0.05, R = 1999) save.all<-do.call(what = rbind, args = sim.intervals.PP) head(save.all) stopCluster(cl) end.time<-proc.time() save.time<-end.time-start.time cat("\n Number of minutes running:", save.time[3]/60, "\n \n") #NOTE: NO WARNING MESSAGES BECAUSE ON OTHER CORES #ALL 500 start.time<-proc.time() cl <- makeCluster(3) clusterSetRNGStream(cl = cl, 8881) sim.intervals.PP <- parLapply(cl = cl, X = list(y.sim[1:166,], y.sim[167:333,], y.sim[334:500,]), fun = per.core2, alpha = 0.05, R = 1999) save.all<-do.call(what = rbind, args = sim.intervals.PP) head(save.all) stopCluster(cl) end.time<-proc.time() save.time<-end.time-start.time cat("\n Number of minutes running:", save.time[3]/60, "\n \n") #ALL 500 start.time<-proc.time() cl <- makeCluster(4) clusterSetRNGStream(cl = cl, 8881) sim.intervals.PP <- parLapply(cl = cl, X = list(y.sim[1:125,], y.sim[126:250,], y.sim[251:375,], y.sim[376:500,]), fun = per.core2, alpha = 0.05, R = 1999) save.all<-do.call(what = rbind, args = sim.intervals.PP) head(save.all) stopCluster(cl) end.time<-proc.time() save.time<-end.time-start.time cat("\n Number of minutes running:", save.time[3]/60, "\n \n") ################################################################################ # PP #3 - can not get to work :( per.core3 <- function(my.data) { setwd(dir = "C:\\chris\\unl\\Dropbox\\NEW\\STAT_computing\\ParallelProcessing") source(file = "Run_on_each_core1.R") save.int<-t(apply(X = my.data, MARGIN = 1, FUN = sim.func, alpha = alpha, R = R)) save.int } cl <- makeCluster(2) clusterSetRNGStream(cl = cl, 8881) save.it<-parApply(cl = cl, X = y.sim[1:4,], MARGIN = 1, FUN = per.core3) head(save.it) stopCluster(cl) ################################################################################ # Summarize summarize <- function(low.up, sigma.sq) { true.conf <- mean(ifelse(test = sigma.sq > low.up[,1], yes = ifelse(test = sigma.sq < low.up[,2], yes = 1, no = 0), no = 0), na.rm = TRUE) exp.length <- mean(low.up[,2] - low.up[,1], na.rm = TRUE) exclude <- sum(is.na(low.up[,1])|is.na(low.up[,2])) data.frame(true.conf, exp.length, exclude) } normal.based <- summarize(low.up = save.all[,1:2], sigma.sq = sigma^2) asym <- summarize(low.up = save.all[,3:4], sigma.sq = sigma^2) basic <- summarize(low.up = save.all[,5:6], sigma.sq = sigma^2) percentile <- summarize(low.up = save.all[,7:8], sigma.sq = sigma^2) bca <- summarize(low.up = save.all[,9:10], sigma.sq = sigma^2) student <- summarize(low.up = save.all[,11:12], sigma.sq = sigma^2) interval.names <- c("Normal", "Asymptotic", "Basic", "Percentile", "BCa", "Student") data.frame(interval = interval.names, rbind(normal.based, asym, basic, percentile, bca, student)) ############################################################################### # Benchmark plot #Surface time.run<-c(13.56, 6.64, 5.4155, 5.150667) cores<-c(1, 2, 3, 4) #Dell time.run<-c(11.295, 6.254667, 3.964667, 3.259333, 2.668667, 2.338667) cores<-c(1, 2, 3, 4, 5, 6) plot(x = cores, y = time.run, main = "Comparisons of time", type = "o", col = "red", ylim = c(0, max(time.run)), ylab = "Time (minutes)", xlab = "Number of cores requested", panel.first = grid()) ################################################################################ # PP with the foreach package library(boot) calc.t2 <- function(data, i) { d2 <- data[i] var(d2) } calc.t <- function(data, i) { d <- data[i] n <- length(d) l.jack <- empinf(data = d, statistic = calc.t2, stype = "i", type = "jack") v.jack <- var.linear(L = l.jack) c(var(d), v.jack) } sim.func <- function(y, alpha, R) { n <- length(y) t <- var(y) normal.based <- (n - 1)*t / qchisq(p = c(1-alpha/2, alpha/2), df = n - 1) mu.hat4 <- 1/n*sum((y - mean(y))^4) asym <- t + qnorm(p = c(alpha/2, 1-alpha/2))*sqrt((mu.hat4 - t^2)/n) boot.res <- boot(data = y, statistic = calc.t, R = R, sim = "ordinary") save.int <- boot.ci(boot.out = boot.res, conf = 1-alpha, type = "all") basic <- c(save.int$basic[4], save.int$basic[5]) percentile <- c(save.int$perc[4], save.int$perc[5]) bca <- c(save.int$bca[4], save.int$bca[5]) student <- c(save.int$student[4], save.int$student[5]) c(normal.based, asym, basic, percentile, bca, student) } library(foreach) library(doParallel) cl<-makeCluster(spec = 2) #Don't need to load parallel package because doParallel does it registerDoParallel(cl = cl) start.time<-proc.time() clusterSetRNGStream(cl = cl, iseed = 9182) #Multiple streams of seeds #save.all2<-foreach(i = 1:4, .combine = rbind, .packages = "boot") %dopar% { # sim.func(y = y.sim[i,]) #} save.all2<-foreach(i = 1:500, .combine = rbind, .packages = "boot") %dopar% { sim.func(y = y.sim[i,]) } stopCluster(cl) end.time<-proc.time() save.time<-end.time-start.time cat("\n Number of minutes running:", save.time[3]/60, "\n \n") head(save.all2) normal.based <- summarize(low.up = save.all2[,1:2], sigma.sq = sigma^2) asym <- summarize(low.up = save.all2[,3:4], sigma.sq = sigma^2) basic <- summarize(low.up = save.all2[,5:6], sigma.sq = sigma^2) percentile <- summarize(low.up = save.all2[,7:8], sigma.sq = sigma^2) bca <- summarize(low.up = save.all2[,9:10], sigma.sq = sigma^2) student <- summarize(low.up = save.all2[,11:12], sigma.sq = sigma^2) interval.names <- c("Normal", "Asymptotic", "Basic", "Percentile", "BCa", "Student") data.frame(interval = interval.names, rbind(normal.based, asym, basic, percentile, bca, student)) getDoParWorkers() #Check number of workers #