Snook,
I wrote the function that can do median chat from RMark. As far as median chat is natural for parallel estimation use of these functions will be beneficial with many cores. I use 24 cores and it becomes really fast.
It has following limitations:
1. It works with Multistrata models only
2. It does not work with groups.
3. Watch your the RAM usage - RAM is not shared so every node will take part of it.
So the code is following:
- Code: Select all
## ver 0.4 from Nov 1, 2011
## 0.4 change Nov 1, 2011
## - changed function to: median.chat.parallel() and finalize.median.chat() to add port for extract.median.chat.output()
## - added list output and nice plot
## - finished error handler - now it will exclude failed model and will not fail BUT - will NOT let you know about it..
## 0.3 change - first version of error handling added
median.chat.parallel<-function(model, data, ddl, wd=getwd(), test.wd.name="median.chat", number.of.points=10, number.of.replicates=10, lower.bound=1, upper.bound=NA, external=T, parallel.R=F, parallel.mark=T, cpus=7, run=T, prefix="sim.model.", threads=1) {
cat("median chat parallel ver0.4 Nov, 1\n")
cat("PS do not use parallel.R=T as it need addition of random.seed handling between nodes\n")
if (any(list.files(getwd())[file.info(list.files(getwd()))$isdir]==test.wd.name)) stop(test.wd.name, " directory already exists, try another test.wd.name\n")
est.chat<-model$results$deviance/model$results$deviance.df
if(is.na(upper.bound)) upper.bound<-est.chat
cat ("estimated chat in the model is: ", est.chat, "\n")
if (est.chat<1) stop("my function or median chat test in general cannot work with highly underdispersed data\n")
chat.vec<-sort(rep((round(seq(lower.bound, upper.bound, length.out=number.of.points)*1000)/1000), number.of.replicates))
### create data.vec
data.list<-simulate.mstrata(model=model, orig.data=data, chat=chat.vec, parallel=parallel.R, cpus=cpus)
new.wd<-test.wd.name
dir.create(new.wd)
## ok now we can go for model.list creation
list.of.model.par<-lapply(data.list, FUN=function(x) {y=list(data=x, model.parameters=model$model.parameters, ddl=ddl, initial=model$results$beta[,1], wd=new.wd, parallel=parallel.R, run=T, invisible=FALSE, external=external, silent=FALSE, treads=threads); return(y)})
for (i in 1:length(list.of.model.par)) {
list.of.model.par[[i]]$model.name<-"model"
list.of.model.par[[i]]$filename<-paste("prefix", i, sep="")
}
if (parallel.mark) {
library(snowfall)
sfInit(parallel=parallel.mark, cpus=cpus)
cat("running models in parallel")
sfExport(list=c("make.run.mark.model.apply.int"), local=F)
list.of.models<-sfClusterApplyLB(list.of.model.par, make.run.mark.model.apply.int)
cat(" Done!\n")
#### here I am tring to exclude all possible errors
list.of.models<-list.of.models[!sapply(list.of.models, "[[", 1)=="error"]
if(length(list.of.models)==0) stop("all models failed to run\n")
## this will hopefully help
sfStop()
} else list.of.models<-lapply(list.of.model.par, make.run.mark.model.apply.int)
if(run) {
## estimate chat for each of the models:
if (external) {
main.model<-model # rename model object as models will be loaded with name = model
old.wd<-getwd()
setwd(new.wd)
chat.out<-data.frame(chat.in=NA, chat.est=NA, above=NA)
for (i in 1:length(list.of.models)) {
load(list.of.models[[i]])
chat.out[i,1]<-model$data$chat
chat.out[i,2]<-model$results$deviance/model$results$deviance.df
chat.out[i,3]<-ifelse((model$results$deviance/model$results$deviance.df)>(main.model$results$deviance/main.model$results$deviance.df), 1, 0)
rm(model)
}
setwd(old.wd)
model<-main.model
} else chat.out<-as.data.frame(t(rbind(sapply(list.of.models, function(x) cbind(chat.in=x$data$chat, chat.est=x$results$deviance/x$results$deviance.df, above=ifelse((x$results$deviance/x$results$deviance.df)>(model$results$deviance/model$results$deviance.df), 1, 0))))))
save(chat.out, file="chat.simulation.results.RData")
finalize.median.chat(chat.out, model) # this function willplot all the results
} else return(list.of.model.par)
}
finalize.median.chat<-function(chat.out=chat.out, model=model) {
plot(chat.out[,2]~chat.out[,1], type="n", xlim=c(min(chat.out[,1]-1),max(chat.out[,1]+1)), , xlab="simulated chat", ylab="deviance chat")
bxp(boxplot(chat.out[,2]~chat.out[,1], plot=F), at=unique(chat.out[,1]), show.names=F, add=T)
abline(h=model$results$deviance/model$results$deviance.df)
abline(v=1, lty=2, col="red")
abline(v=3, lty=2, col="red")
counts<-table(chat.out[,c(1,3)])
if (ncol(counts) <2)
stop("all of the simulated chats are ", ifelse((dimnames(counts)[[1]])=="0", "below", "above"), " range of estimated chat\n change simulation limits and try again\n")
dimnames(counts)<-list(simulated.chat=dimnames(counts)[[1]], count=c("below observed", "above.observed"))
print(counts)
proportions<-prop.table(table(chat.out[,c(1,3)]), margin=1) # here I get proportions
dimnames(proportions)<-list(simulated.chat=dimnames(proportions)[[1]], proportion=c("below observed", "above.observed"))
print(proportions)
num.points.used<-length(which(!proportions[,1] == "0" & !proportions[,1] == "1"))
if (num.points.used<=2) { warning("change input data to produce more meaningful points\n")
model.chat=NA
} else {
cat("estimation is based on", num.points.used, "chat values\n")
library(boot)
regr.model<-glm(counts~as.numeric(rownames(proportions)), family=binomial)
model.chat<-(inv.logit(0.5)-regr.model$coefficients[1])/regr.model$coefficients[2]
cat("estimated model chat is", model.chat, "\n")
abline(v=model.chat, lwd=2)
}
output<-list(model.chat=model.chat, chat.out=chat.out, counts=counts, proportions=proportions)
return(output)
}
# this function is exact copy of the one that I have in mark.wrapper.parallel
make.run.mark.model.apply.int<-function(x) {
old.wd<-getwd()
#require(RMark)
run=as.logical(as.character(x[["run"]]))
x<-x[-which(names(x)=="run")]
if (any(names(x)=="silent")) {
silent=as.logical(as.character(x[["silent"]]))
x<-x[-which(names(x)=="silent")]
}
wd=x[["wd"]]
x<-x[-which(names(x)=="wd")]
setwd(as.character(wd))
parallel=as.logical(as.character(x[["parallel"]]))
x<-x[-which(names(x)=="parallel")]
if(run) {
mymodel<-try(do.call(mark, x), silent=silent)
if(class(mymodel)[1]=="try-error") mymodel<-"error"
if (length(mymodel)>1)
if(is.null(mymodel$results))
mymodel<-"error"
}
else { names(x)[names(x)=="model.parameters"]<-"parameters"
mymodel<-try(do.call(make.mark.model, x),silent=silent) }
setwd(old.wd)
return(mymodel)
}
# test: median.chat.parallel(model, data, ddl, upper.bound=1.1, test.wd.name="median.chat.1", external=F, parallel=T)
Also the follwoing part is needed to simulate new data:
- Code: Select all
## Part 1.
## creating SPsi and p cubes
create.cubes.mstrata<-function(model) {
# ver 0.2 from Sep 6, 2001
call <- match.call()
require(abind)
s.list=get.real(model,"S")
psi.list<-get.real(model,"Psi")
from.strata<-unlist(lapply(psi.list, "[", i=2))
to.strata<-lapply(psi.list, "[", i=3)
alive.array<-simplify2array(sapply(s.list, "[", i=1))
dead.array<-1-alive.array
create.cube.internal<-function(strata) {
strata.array.temp.1<-simplify2array(sapply(psi.list[from.strata==strata], "[", i=1))
dimnames(strata.array.temp.1)[[3]]<-unlist(sapply(psi.list[from.strata==strata], "[", i=3))
## now we need to add subtract.stratum dimname
subtract.stratum.number<-which(!unique(from.strata) %in% dimnames(strata.array.temp.1)[[3]])
## now we need to generate additional layer for A to A transition
## 1-apply(z, c(1,2), FUN=sum) ## this a new layer and we need to add it to array
strata.array.temp.2<-abind(strata.array.temp.1, 1-apply(strata.array.temp.1, c(1,2), FUN=sum), along=3)
dimnames(strata.array.temp.2)[[3]][dimnames(strata.array.temp.2)[[3]]==""]<-subtract.stratum.number # here I add substract.str.number to new array dimnames
#### now we need to sort array by 3 dimension
strata.array.temp.3<-strata.array.temp.2[,,order(dimnames(strata.array.temp.2)[[3]])]
## ok, to normalize Psi by S we need to multiply Psi array by S array
strata.array.temp.4<- apply(strata.array.temp.3, 3, FUN=function(x, y) x*y, y=alive.array[,,strata])
temp1<-strata.array.temp.3*0
strata.array.temp.5<-temp1+as.vector(strata.array.temp.4)
#### ok, now we have a matrix of normalized probabilities for layer A
## we can add a layer of survival
strata.array.temp.6<-abind(dead.array[,,1], strata.array.temp.5, along=3)
dimnames(strata.array.temp.6)[[3]][dimnames(strata.array.temp.6)[[3]]==""]<-"0" # here I add 0 as a label for S
return(strata.array.temp.6)
# now we have all of the dimensions with correct names...
# > dimnames(strata.array.temp.6)
#[[1]] year
#[[2]] cohort
#[[3]] to layer [1] "0" "1" "2" "3" "4" "5" "6" "7"
}
# now we need to run this function with apply for all stratas
# in such way, that 4dimension will be from stratum..
big.cube.1<- abind(lapply(unique(from.strata), FUN=create.cube.internal), along=4)
# now we need to create correct dimension name for the 4 dim in the cube..
dimnames(big.cube.1)[[4]]<-unique(from.strata)
# ok, we need to create an additional layer for dead individuals
# we can copy 1 layer from dimension 1 - then set up 0 for all except s to s
dead.cube<-big.cube.1[,,,1]*0
dead.cube[,,1]<-1
big.cube.2<-abind(big.cube.1, dead.cube, along=4)
dimnames(big.cube.2)[[4]][dimnames(big.cube.2)[[4]]==""]<-"0"
big.cube.3<-big.cube.2[,,,order(dimnames(big.cube.2)[[4]])]
SPsi.cube<-aperm(big.cube.3, c(4,3,1,2)) # final is: from, to, cohort, year.
#################### p.cube#########################
p.list<-get.real(model, "p")
## it seems that this cube will be pretty simple to create
## p.cube[strata,cohort, year]
p.cube.1<-simplify2array(sapply(p.list, "[", i=1))
dimnames(p.cube.1)[[3]]<-unlist(sapply(p.list, "[", i=2))
p.cube.2<-aperm(p.cube.1, c(3,1,2))
## now we need to add 0 level for dead strata and order by first dimension
p.cube.3<-abind(p.cube.2, p.cube.2[1,,]*0, along=1)
dimnames(p.cube.3)[[1]][dimnames(p.cube.3)[[1]]==""]<-"0"
p.cube<-p.cube.3[order(dimnames(p.cube.3)[[1]]),,]
## it seems that the easiest way to solve our proble would be to add as additional
model$cubes.for.sim<-list(SPsi.cube=SPsi.cube, p.cube=p.cube)
assign(as.character(call[-1]), model, envir = parent.frame())
}
#####################################################################
## stage 2
## simulator
simulator.mstrata.int<-function(model, orig.data, chat=chat) {## we do assume that there are cubes in model object
## ver 0.2 from Sep 6, 2011
## 0.2 changes - chat option was added
## 1. create an input DF with columns: in.strata, in.time, cohort.
## let's take data file and extract data
starting.years<-apply(orig.data$data, 1, function(x) dimnames(model$cubes.for.sim$SPsi.cube)[[4]][which(!unlist(strsplit(x[1], ""))=="0")[1]])
starting.stratas<-apply(orig.data$data, 1, function(x) unlist(strsplit(x[1], ""))[which(!unlist(strsplit(x[1], ""))=="0")[1]])
orig.start<-data.frame(starting.years, starting.stratas,freq=orig.data$data$freq )
## now we need to multiply regarding to numbers in $data$freq
orig.start<-matrix(unlist(apply(orig.start, 1, FUN=function(x) rep.int(x, times=as.numeric(x[3])))), ncol=3, byrow=T)
orig.start<-orig.start[,-3]
dimnames(orig.start)[[2]]<-c("start.time","start.strata")
####################################################################
## 2. create an empty matrix for the result and create starting point for each individual
sim.input<-t(apply(orig.start, 1, FUN=function(x) rbind(x[1], which(model$strata.labels==x[2]))))
dimnames(sim.input)[[2]]<-c("start.time", "start.strata")
all.time<-unique(c(dimnames(model$cubes.for.sim$SPsi.cube)[[4]],dimnames(model$cubes.for.sim$p.cube)[[3]])) ##
vec<-as.array(rep(0, length(all.time)))
dimnames(vec)<-list(all.time)
all.res<-t(apply(sim.input, 1, FUN=function(x) {vec[x["start.time"]]=x["start.strata"]; return(vec)}))
## 4
## simulation with apply for each turn
simulator.1<-function(x) {
out.1<-cumsum(model$cubes.for.sim$SPsi.cube[as.character(x["stratas"]),,x["cohorts"],dimnames(all.res)[[2]][i]])
out.2<-names(out.1)[which.max(1/(out.1-as.numeric(x["sim.probs"])))]
return(out.2)
}
for (i in 1:(length(dimnames(all.res)[[2]])-1)) {
stratas<- all.res[,i][!all.res[,i]=="0"]
cohorts<- sim.input[,1][!all.res[,i]=="0"]
sim.probs<-runif(length(stratas))
in.data<-data.frame(stratas, cohorts, sim.probs)
all.res[,(i+1)][!all.res[,i]=="0"]<-unlist(apply(in.data, 1, FUN=simulator.1))
}
all.cohorts<-array(rep(sim.input[,1],dim(all.res)[2] ), dim=dim(all.res))
all.years<-t(matrix(rep(dimnames(all.res)[[2]],dim(all.res)[1]), nrow=ncol(all.res)))
all.res.array<-abind(all.res, all.cohorts, all.years, along=3)
all.res.array.temp<-all.res.array[,-1,] ## here is the problem - we do not have p fo rthe first year
all.res.array.temp.1<-apply(all.res.array.temp, c(1,2), FUN=function(x) ceiling(runif(1)-model$cubes.for.sim$p.cube[as.character(x[1]),as.character(x[2]),as.character(x[3])]))
all.res.array.temp.2<-abind((all.res.array.temp.1[,1]*NA), all.res.array.temp.1, along=2)
all.res.array.temp.2[is.na(all.res.array.temp.2)]<-1
all.fin<-array(as.vector(as.numeric(all.res))*as.vector(as.numeric(all.res.array.temp.2)), dim=dim(all.res), dimnames=dimnames(all.res))
########ok, done
# now we need to change numbers by letters
all.non.zero<-all.fin[all.fin>0]
all.fin[all.fin>0]<-sapply(all.fin[all.fin>0], FUN=function(x) model$strata.labels[x])
histories<-apply(all.fin, 1, FUN=function(x) paste(x, collapse=""))
##########
if (!chat==1) {
### changing number of individuals to create overdispersed dataset
mult<-floor(chat)
res<-chat-mult
histories<-c(rep(histories, mult), histories[runif(length(histories))<=res])
}
enc.hist<-data.frame(ch=histories, stringsAsFactors=F)
enc.hist$freq<-rep(1, nrow(enc.hist))
enc.hist.1<-aggregate(enc.hist[,2], by=list(enc.hist[,1]), FUN=sum)
names(enc.hist.1)<-c("ch", "freq")
enc.hist.1$ch<-as.character(enc.hist.1$ch)
##
sim.pr.data<-process.data(data=enc.hist.1, begin.time = orig.data$begin.time, model = orig.data$model, mixtures = orig.data$mixtures, groups = orig.data$groups, age.var = orig.data$age.var, initial.ages = orig.data$initial.ages ,age.unit = orig.data$age.unit, time.intervals = orig.data$time.intervals, nocc= orig.data$nocc, strata.labels= orig.data$strata.labels, counts= orig.data$counts, allgroups=TRUE)
## I am not sure about allgroups=TRUE - maybe need return to false
sim.pr.data$chat<-chat
return(sim.pr.data)
}
##################
## 3 - wrapper around first two
simulate.mstrata<-function(model, orig.data, number.of.simulations=NULL, chat=NULL) {
cat("version 0.2 from Sept 6, 2011\n")
require(RMark)
if (!"Multistrata" %in% class(model)) stop("dunction works only with multistata models\n")
if ( model$number.of.groups >1) stop("current version does not support groups, though you are welcome to change the code\n")
if (!length(names(model)[names(model)=="cubes.for.sim"])==1) create.cubes.mstrata(model)
if (is.null(number.of.simulations) & is.null(chat)) {number.of.simulations=1; chat=1}
if (is.null(number.of.simulations) & !is.null(chat)) number.of.simulations=length(chat)
if (!is.null(number.of.simulations) & is.null(chat)) chat=1
if (!number.of.simulations==length(chat))
if (length(chat)==1) {chat=rep(chat, length=number.of.simulations)
} else stop("you asked for ", length(number.of.simulations), "simulations and length of chat vactor was ", length(chat), "\n please, you need to specify only one of parameters or make chat length equal to number of simulations\n")
cat(" simulating ", number.of.simulations, "data sets with chats", chat )
out<-lapply(1:number.of.simulations, FUN=function(x) simulator.mstrata.int(model, orig.data, chat[x]))
cat(" ...Done\n")
return(out)
}
# example:
# q<-simulate.mstrata(model=S.time.plus.ageclass.p.fixed.Psi.stratum.tostratum.ageclass.distance, orig.data=orig.dat6str.proc, chat=c(0.3, 1, 3.3,10))
# str(q)
# or
# t<-simulate.mstrata(model=S.time.plus.ageclass.p.fixed.Psi.stratum.tostratum.ageclass.distance, orig.data=orig.dat6str.proc,number.of.simulations=3)
# str(t)
Eldar