calcobj.py

# -*- coding: utf-8 -*-
###############################################################################
import multiprocessing
import sys

# TODO should be set dynamically to the number of cores, or be read from a
# configuration file.
MAX_PROC_NUM = 4
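# A possible dynamic default, matching the TODO above (a sketch only, left
# disabled here; multiprocessing.cpu_count() returns the number of cores):
#MAX_PROC_NUM = multiprocessing.cpu_count()
###############################################################################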
class CalcObj:
    ###############
    def __init__(self, d, func):
        self.data = d          # data to be processed by the calc function
        self.results = None    # the result set after the calculation
        self.jobs = []         # the list of running jobs
        self.calcFunc = func   # the function to use for the calculation
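    # Expected shape of d, inferred from calc() below: a dict of the form
    # {'data': <sequence or None>, 'param': <any picklable object>}; calc()
    # splits d['data'] across the worker processes.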
    ###############
    def calc(self):
        if self.data['data'] is not None:
            thread_num = MAX_PROC_NUM
            # TODO check here whether splitting across MAX_PROC_NUM processes
            # makes sense at all
            found = False
            while not found:
                if len(self.data['data']) % thread_num != 0:
                    thread_num -= 1
                else:
                    # check the number of elements per process
                    if (len(self.data['data']) // thread_num) % 2 != 0:
                        thread_num -= 1
                    else:
                        found = True
                # make sure that at least one process is used
                if thread_num < 1:
                    found = True
                    thread_num = 1
            #print("thread_num: " + str(thread_num))
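            # Example of the selection above: with 12 elements and
            # MAX_PROC_NUM = 4, 12 % 4 == 0 but 12 // 4 == 3 is odd, so
            # thread_num drops to 3; 12 % 3 == 0 and 12 // 3 == 4 is even,
            # so 3 processes are used.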
            # create the worker processes
            for i in range(0, thread_num):
                queue = multiprocessing.Queue()
                # define a new process
                p = multiprocessing.Process(target=self.worker, args=(queue,))
                # put the parameter and the process numbers into the container
                # that will be sent to the new process through its queue
                outdata = {'thread': i, 'threads': thread_num,
                           'param': self.data['param']}
                # split the data that should be sent to the new process
                if self.data['data'] is not None:
                    l = len(self.data['data']) // thread_num
                    if l == 0 or len(self.data['data']) - (l * thread_num) == 0 or i < thread_num - 1:
                        if l == 0:
                            outdata['data'] = self.data['data']
                        else:
                            #print(str(l) + " " + str(i))
                            # slice this process' share of the data into the
                            # new container dictionary
                            outdata['data'] = self.data['data'][i * l: l + i * l]
                    # the data is not evenly divisible, so the last process
                    # also gets the remainder
                    if i >= thread_num - 1:
                        if l > 0 and len(self.data['data']) - (l * thread_num) > 0:
                            #print(self.data['data'][i * l:])
                            outdata['data'] = self.data['data'][i * l:]
                # send the data to the process through its own queue
                queue.put(outdata)
                # start the new process
                p.start()
                self.jobs.append([p, queue])
                if l == 0:
                    break
        else:  # no data in data['data']
            # no payload has to be sent to the processes
            for i in range(0, MAX_PROC_NUM):
                queue = multiprocessing.Queue()
                # create a new process
                p = multiprocessing.Process(target=self.worker, args=(queue,))
                # only pass the process numbers and the parameter through the queue
                queue.put({'thread': i, 'threads': MAX_PROC_NUM, 'data': None,
                           'param': self.data['param']})
                p.start()
                self.jobs.append([p, queue])
        # wait for the jobs to finish (note: polling is_alive() before the
        # queues are drained can deadlock if a worker's result is too large
        # for the queue's underlying pipe buffer)
        running = True
        while running and len(self.jobs) > 0:
            tmp = False
            # check whether any job is still running
            for e in self.jobs:
                if e[0].is_alive():
                    tmp = True
            running = tmp
        first = True
        # collect the jobs' results, close the queues and merge the data from
        # the queues into one output data set
        for e in self.jobs:
            tdata = e[1].get()
            #print(tdata)
            if first:
                # initialize self.results to match the type of tdata
                if type(tdata) == list:
                    self.results = []
                elif type(tdata) == dict:
                    try:
                        if len(self.results['dnaList']) == 0:
                            pass
                    except (TypeError, KeyError):
                        self.results = {}
                        self.results['dnaList'] = []
                else:
                    self.results = None
                first = False
            # copy the data according to the types of self.results and tdata
            if type(tdata) == list:
                self.results = self.results + tdata  # concatenate the two lists
            elif type(tdata) == dict:
                self.results['dnaList'] = self.results['dnaList'] + tdata['dnaList']
                try:
                    # try to copy the 'found' key into the result set
                    if self.results['found'] == False:
                        self.results['found'] = tdata['found']
                except KeyError:
                    try:
                        # 'found' is not yet part of the result set, so try to add it
                        self.results['found'] = tdata['found']
                    except KeyError:
                        # There is no 'found' key in the tdata dictionary either.
                        # This happens when tdata was not produced by the
                        # hasSolution_MP_Wrap function, so we copy the 'mutCnt'
                        # key instead, because tdata then contains a result set
                        # produced by the evolve_MP_Wrap function.
                        # This part of the generic CalcObj class should be
                        # generalized much further.
                        try:
                            # TODO at this point the mutCnt value should be
                            # checked for summation problems.
                            #self.results['mutCnt'] = self.results['mutCnt'] + tdata['mutCnt']
                            self.results['mutCnt'] = tdata['mutCnt']
                        except KeyError:
                            # serious problem: neither a 'found' nor a 'mutCnt'
                            # key exists in tdata
                            print("Big problem...")
                            sys.exit(-1)
            # finally make sure that the process has finished and the queue is closed
            e[0].join()
            e[1].close()
            e[1].join_thread()
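        # Merge example for dict results: {'dnaList': ['a'], 'found': True}
        # and {'dnaList': ['b'], 'found': False} merge into
        # {'dnaList': ['a', 'b'], 'found': True}.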
    ###############
    def worker(self, queue):
        # receive the input container, run the calculation on it and send the
        # result back through the same queue
        d = queue.get()
        tmp = self.calcFunc(d)
        queue.put(tmp)
###############################################################################
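# Usage sketch: a minimal, hypothetical example of driving CalcObj. The worker
# receives the container dict built in calc(), with the keys 'thread',
# 'threads', 'param' and 'data', and returns a result that calc() merges
# across the processes (list results are concatenated). _square_chunk is an
# illustrative name, not part of the original module. The __main__ guard keeps
# child processes from re-running the demo on import; passing the bound
# method self.worker as a process target assumes the 'fork' start method
# (the default on Linux).
def _square_chunk(d):
    # 'data' holds this process' slice of the input sequence
    return [x * x for x in d['data']]

if __name__ == '__main__':
    obj = CalcObj({'data': list(range(16)), 'param': None}, _square_chunk)
    obj.calc()
    print(obj.results)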