multithreading - MPI4PY Python Error 11 Creating too many threads -
I am working on some code in Python, and mpi4py is throwing a strange error. When I try to run the code below, it throws the following:
Error; return code from pthread_create() is 11. Error detail: Resource temporarily unavailable. sh: fork: retry: Resource temporarily unavailable. /home/sfortney/anaconda/lib/python2.7/site-packages/numexpr/cpuinfo.py:40: UserWarning: [Errno 11] Resource temporarily unavailable warnings.warn(str(e), UserWarning, stacklevel=stacklevel). I scaled my code down to a simpler, working mpi4py script, which I have posted below. My research on this error suggests that I am creating too many threads. This seems odd to me because I am not calling any threading, just multiple processors (my basic understanding was that threads are an intra-core phenomenon and wouldn't be touched if I was calling multiple cores and doing one thing on each. Sorry if this is not true.).
I can't make sense of why the code at the bottom works while the code below, which uses the same structure, does not. Why is the code below running into thread constraints? And where in the code am I calling multiple threads?
I have posted the whole code below for reproducibility of the error. If it is relevant, I am running this on a 32-core Linux box.
#to run call "mpiexec -n 10 python par_implement_wavefront.py" in terminal __future__ import division import pandas pd import numpy np import itertools import os itertools import chain, combinations operator import add collections import counter home="/home/sfortney" np.set_printoptions(precision=2, suppress=true) #choose dimensionality , granularity dim=2 gran=5 mpi4py import mpi mpi4py.mpi import any_source comm = mpi.comm_world rank = comm.get_rank() size = comm.get_size() command_buffer = np.zeros(3) # first entry boolean, second tuple objective function inputs, third array index result_buffer=np.zeros(3) # first position node, if rank==0: #defining of our functions need on root node first #makes ax1 axes of n dim array def axis_fitter(arr, dim, gran, start=1, stop=101): ax1=np.linspace(start,stop, num=gran) in range(dim): indexlist=[0]*dim indexlist[i]= slice(none) arr[indexlist]=ax1 return arr #this used make inital queues #fix me work nan's! def queue_init(arr): queue=[] queueposs=[] queuedone=np.argwhere(arr >0).tolist() return queue,queueposs,queuedone #this used in queue updating function def queue_sorter(queue): queue.sort(key=lambda x: np.linalg.norm(np.array(x))) # using l1 norm # queue.sort(key=lambda x: sum(x)) return queue #this finds indicies "back" of our box def back_index(dim): standardbasis=[] in range(dim): vec=[0]*dim vec[i]=vec[i]+1 standardbasis.append(vec) powerset=[] z in chain.from_iterable(combinations(standardbasis,r) r in range(len(standardbasis)+1)): powerset.append(z) powersetnew=[] in range(len(powerset)): powersetnew.append([sum(x) x in zip(*list(powerset[i]))]) powersetnew.remove([]) powersetnew=[[i*(-1) in x] x in powersetnew] return powersetnew #this takes completed index , updates our queue of possible values #as our done queue def queue_update(queue,queueposs,queuedone, arr,dim,comp_idx=[0,0]): queuedone.append(comp_idx) if comp_idx==[0,0]: init_index=[1]*dim queue.append(init_index) in range(dim): poss_index=[1]*dim 
poss_index[i]=2 queueposs.append(poss_index) return queue,queueposs,queuedone else: queuedone.append(comp_idx) try: queueposs.remove(comp_idx) except: pass in range(dim): new_idx=comp_idx[:] new_idx[i]=new_idx[i]+1 back_list=back_index(dim) back_list2=[] x in back_list: back_list2.append(list(np.add(np.asarray(new_idx),np.asarray(x)))) if set(tuple(x) x in back_list2).issubset(set(tuple(x) x in queuedone)): queueposs.append(new_idx) queueposs=list(set(tuple(x) x in queueposs)-set(tuple(x) x in queuedone)) queueposs=[list(x) x in queueposs] queueposs=queue_sorter(queueposs) try: x in range(len(queueposs)): queueappender=(queueposs).pop(x) queue.append(queueappender) except: print "queueposs empty" queue=queue_sorter(queue) return queue,queueposs,queuedone #this function makes dont have pass whole array through mpi pertinent information def objectivefuncprimer(arr, queue_elem, dim): inputs=back_index(dim) inputs2=[] x in inputs: inputs2.append(list(np.add(np.asarray(queue_elem),np.asarray(x)))) inputs3=[] x in range(len(inputs2)): inputs3.append(arr[tuple(inputs2[x])]) return inputs3 #this function takes value , index , assigns array value @ index def arrupdater(val,idx): arr[tuple(idx)]=val return arr, idx #########initializing all_finished=false #make our empty array sizer=tuple([gran]*dim) arr=np.zeros(shape=sizer) nodes_avail=range(1, size) # 0 not worker #assumes axes start @ same place ax1=np.linspace(20,30, num=gran) arr=axis_fitter(arr, dim, gran) #fitting axes , initializing queues arr=axis_fitter(arr, dim, gran, start=20, stop=30) queue,queueposs,queuedone =queue_init(arr) #running first updater queue,queueposs,queuedone=queue_update(queue,queueposs,queuedone,arr,dim) def sender(queue): send_num=min(len(queue),len(nodes_avail)) k in range(send_num): node=nodes_avail.pop() queue_elem=queue.pop(k) command_buffer[0]=int(all_finished) command_buffer[1]=queue_elem command_buffer[2]=objectivefuncprimer(arr,queue_elem,dim) comm.send(command_buffer, dest=node) 
while all_finished==false: sender(queue) comm.recv(result_buffer,source=mpi.any_source) arr,comp_idx=arrupdater(result_buffer[1],result_buffer[2]) queue,queueposs,queuedone=queue_update(queue,queueposs,queuedone,arr,dim,comp_idx) nodes_avail.append(result_buffer[0]) if len(queuedone)==gran**2: n in range(1, size): comm.send(np.array([true,0,0]), dest=n) all_finished=true print arr if rank>0: all_finished_worker=false #this test function work in 2d def objectivefunc2d_2(inputs): #this important more complicated functions later #backnum=(2**dim)-1 val=sum(inputs) return val while all_finished_worker==false: comm.recv(command_buffer, source=0) all_finished_worker=bool(command_buffer[0]) if all_finished_worker==false: result=objectivefunc2d_2(command_buffer[2]) # print str(result) +" "+str(rank) result_buffer=np.array([rank,result,command_buffer[1]]) comm.send(result_buffer, dest=0) this code works , of basic structure used above on much, more simple example.
from __future__ import division import numpy np import os itertools import chain, combinations mpi4py import mpi mpi4py.mpi import any_source comm = mpi.comm_world rank = comm.get_rank() size = comm.get_size() command_buffer = np.zeros(2) # first entry boolean, rest data result_buffer=np.zeros(2) # first position node, rest data if rank==0: all_finished=false nodes_avail=range(1, size) # 0 not worker arr=[] q=range(20) def primer(q): return int(all_finished),q def sender(q): send_num=min(len(q),len(nodes_avail)) k in range(send_num): node=nodes_avail.pop() queue_init=q.pop() command_buffer[0]=primer(queue_init)[0] command_buffer[1]=primer(queue_init)[1] comm.send(command_buffer, dest=node) while all_finished==false: sender(q) # update q comm.recv(result_buffer,source=mpi.any_source) arr.append(result_buffer[1]) nodes_avail.append(result_buffer[0]) if len(arr)==20: n in range(1, size): comm.send(np.array([true,0]), dest=n) all_finished=true print arr if rank>0: all_finished_worker=false while all_finished_worker==false: comm.recv(command_buffer, source=0) all_finished_worker=bool(command_buffer[0]) if all_finished_worker==false: result=command_buffer[1]*2 # print str(result) +" "+str(rank) result_buffer=np.array([rank,result]) comm.send(result_buffer, dest=0)
Comments
Post a Comment