No Description

mpi_text_evolve.py 18KB


  1. # -*- coding: utf-8 -*-
  2. ################################################################################
  3. import os
  4. import sys
  5. import time
  6. from optparse import OptionParser
  7. from threading import Thread
  8. #from multiprocessing import Process
  9. from random import randint
  10. from mpi4py import MPI
  11. try:
  12. sys.path.append('generic')
  13. except:
  14. pass
  15. try:
  16. sys.path.append('../generic')
  17. except:
  18. pass
  19. try:
  20. sys.path.append('/mnt/cluster/dev/evolutionary/genetic_algorithm/generic/')
  21. except:
  22. pass
  23. import evolve
  24. from calcobj import CalcObj
  25. ################################################################################
  # Command line interface.
  # The defaults of the numeric options are taken from the generic ``evolve``
  # module; the script later compares the parsed values against those defaults
  # to decide whether the user overrode them.
  parser = OptionParser()
  parser.add_option(
      '--mt', '--maxtry',
      type=int,
      default=evolve.getDefaultTryMax(),
      dest='maxtry',
      help='The maximum number of tries before the search for a solution is stopped.',
  )
  parser.add_option(
      '--mr', '--mutationrate',
      type=float,
      default=evolve.getDefaultMutRate(),
      dest='mutationrate',
      help='The probability as % that one chromosome gets mutated after the DNA recombination.',
  )
  parser.add_option(
      '--pm', '--populationmax',
      type=int,
      default=evolve.getDefaultPopMax(),
      dest='populationmax',
      help='The number of DNA structures that are generated per try.',
  )
  parser.add_option(
      '--d', '--dna',
      type=str,
      default="",
      dest='dna',
      help='The destination text (DNA) the algorithm should evolve towards.',
  )
  # TODO implement that feature
  parser.add_option(
      '--hi', '--historytoinfluxdb',
      action='store_true',
      default=False,
      dest='historytoinfluxdb',
      help='Write the history to InfluxDB (not implemented yet).',
  )
  parser.add_option(
      '--sh', '--savehistory',
      action='store_true',
      default=False,
      dest='savehistory',
      help='Let the evolve module store its history; its size is reported in the summary.',
  )
  parser.add_option(
      '--nt', '--noterminaloutput',
      action='store_true',
      default=False,
      dest='noterminaloutput',
      help='Print sparse progress output (e.g. for log files) instead of the in-place progress line.',
  )
  58. ################################################################################
  # --- DNA model ---------------------------------------------------------------
  # Index range into DNA_DICTIONARY; CHROMOSOME_MAX_VALUE, DNA_LEN and
  # DNA_DICTIONARY are recalculated by init() from DEST_DNA.
  CHROMOSOME_MIN_VALUE = 0
  CHROMOSOME_MAX_VALUE = 0
  DNA_LEN = 0
  # The text to evolve towards, stored as a list of character codes.
  DEST_DNA = [ord(ch) for ch in "Hello World - genetic algorithm"]
  # Sorted set of the character codes which occur in DEST_DNA (filled by init()).
  DNA_DICTIONARY = []
  # --- MPI state (assigned in the __main__ section) -------------------------------
  mpi_comm = None
  mpi_rank = None  # the own rank
  mpi_size = None  # the number of mpi processes
  # Number of slave ranks used per task; both are computed by getBestMPIRankNum().
  evolve_mpi_size = None
  genDNAPop_mpi_size = None
  70. ##################################################################################
  def hasSolution_MP_Wrap(d):
      """Worker callback: run ``evolve.hasSolution`` on the chunk in ``d['data']``.

      No further partitioning happens here because the data set was already
      split before this wrapper gets called.
      """
      chunk = d['data']
      return evolve.hasSolution(chunk)
  74. ####################################################################################
  def genDNAPop_MP_Wrap(d):
      """Worker callback: generate this worker's share of a DNA population.

      ``d`` is a dict with the keys
        * ``'param'``   - total number of DNA objects to create,
        * ``'threads'`` - number of workers the job is split across,
        * ``'thread'``  - zero based index of the calling worker.
      (presumably filled in by CalcObj - verify against calcobj.py)

      Every worker creates ``param // threads`` objects; the last worker
      additionally creates the remainder so that the sum equals ``param``.
      Explicit floor division is used so the counts stay integers even when
      ``/`` means true division.
      """
      share, rest = divmod(d['param'], d['threads'])
      isLastWorker = d['thread'] == d['threads'] - 1
      if share < d['param'] and isLastWorker:
          # add the rest to the last process
          return genDNAPop(share + rest)
      return genDNAPop(share)
  80. ################################################################################
  def evolve_MP_Wrap(d):
      """Worker callback: evolve the DNA chunk ``d['data']``.

      The population maximum ``d['param']`` is divided by the number of workers
      ``d['threads']`` (floor division, so the value stays an integer even when
      ``/`` means true division) and handed to ``evolve.evolve``.

      Returns a dict with the evolved list under ``'dnaList'`` and the current
      value of ``evolve.getMutCnt()`` under ``'mutCnt'``.
      """
      perWorkerMax = d['param'] // d['threads']
      evolved = evolve.evolve(d['data'], perWorkerMax)
      # also save the mutation counter
      # TODO is the value of evolve.getMutCnt() the count of this single worker
      #      or a partial sum of all counted mutations?
      return {'dnaList': evolved, 'mutCnt': evolve.getMutCnt()}
  88. ##############################################################################
  def mpi_hasSolution(dnaList):
      """Distribute ``dnaList`` to the slave ranks for a 'hasSolution' check.

      The list is cut into ``evolve_mpi_size`` chunks which are sent to the
      ranks 1..evolve_mpi_size. Afterwards the answers are collected in rank
      order, and a 'showProgress' message is sent to the last rank
      (``mpi_size - 1``).

      Edge cases:
        * the last rank used receives everything that is left, so no DNA is
          dropped when the length is not a multiple of the chunk size;
        * with fewer DNA objects than ranks the chunk size is 1 and only as many
          ranks as there are objects are contacted (and waited for);
        * an empty list contacts no rank at all.

      Returns ``{'found': bool, 'dnaList': list}`` where ``found`` is True if
      any rank reported ``found == True`` and ``dnaList`` is the concatenation
      of the lists returned by the ranks.
      """
      global mpi_size
      global evolve_mpi_size
      total = len(dnaList)
      # never use a step of 0 for the range below
      chunk = max(1, total // evolve_mpi_size)
      used = 0  # number of slave ranks which got a message
      for start in range(0, total, chunk):
          used += 1
          isLastRank = used == evolve_mpi_size
          if isLastRank:
              part = dnaList[start:]
          else:
              part = dnaList[start:start + chunk]
          mpi_comm.send({'cmd': 'hasSolution', 'dnaList': part}, dest=used, tag=11)
          if isLastRank:
              # all data were sent to the mpi slaves
              break
      # wait for the data from exactly those ranks which got a message
      found = False
      outList = []
      for r in range(1, used + 1):
          tmp = mpi_comm.recv(source=r, tag=11)
          outList.extend(tmp['dnaList'])
          if tmp['found'] == True:
              found = True
      mpi_comm.send({'cmd': 'showProgress',
                     'progress': evolve.getProgress(),
                     'bestDNA': evolve.getBestDNA()},
                    dest=mpi_size - 1, tag=11)
      return {'found': found, 'dnaList': outList}
  114. ############################################################################
  def mpi_evolve(dnaList, mx):
      """Distribute ``dnaList`` to the slave ranks for one 'evolve' step.

      ``mx`` is not used in this function; it is part of the callback
      signature handed to ``evolve.init``.

      The list is cut into ``evolve_mpi_size`` chunks which are sent to the
      ranks 1..evolve_mpi_size together with ``'max'`` = length of the chunk
      (the rank uses it as its local population maximum). The evolved lists are
      collected in rank order and returned as one concatenated list.

      Edge cases:
        * the last rank used receives everything that is left, so no DNA is
          dropped when the length is not a multiple of the chunk size;
        * with fewer DNA objects than ranks the chunk size is 1 and only as many
          ranks as there are objects are contacted (and waited for);
        * an empty list contacts no rank and returns ``[]``.
      """
      global mpi_size
      global evolve_mpi_size
      total = len(dnaList)
      # never use a step of 0 for the range below
      chunk = max(1, total // evolve_mpi_size)
      used = 0  # number of slave ranks which got a message
      # 1. send the chunks to the mpi ranks
      for start in range(0, total, chunk):
          used += 1
          isLastRank = used == evolve_mpi_size
          if isLastRank:
              part = dnaList[start:]
          else:
              part = dnaList[start:start + chunk]
          mpi_comm.send({'cmd': 'evolve', 'dnaList': part, 'max': len(part)},
                        dest=used, tag=11)
          if isLastRank:
              # all data were sent to the mpi slaves
              break
      # 2. wait for the data from exactly those ranks which got a message
      outList = []
      for r in range(1, used + 1):
          tmp = mpi_comm.recv(source=r, tag=11)
          outList.extend(tmp['dnaList'])
      return outList
  136. ################################################################################
  def mpi_genDNAPop(cnt):
      """Let the slave ranks generate ``cnt`` DNA objects and collect them.

      The work is spread over the ranks 1..genDNAPop_mpi_size. Each rank is
      asked for ``cnt // genDNAPop_mpi_size`` objects; the first
      ``cnt % genDNAPop_mpi_size`` ranks create one more so that the requested
      total is reached even when ``cnt`` is not evenly divisible.

      Returns the concatenation of the ``'dnaList'`` entries of all answers, in
      rank order.
      """
      global genDNAPop_mpi_size
      share, rest = divmod(cnt, genDNAPop_mpi_size)
      # 1. tell every slave rank how many objects it has to create
      for r in range(1, genDNAPop_mpi_size + 1):
          n = share + 1 if r <= rest else share
          mpi_comm.send({'cmd': 'genDNAPop', 'cnt': n}, dest=r, tag=11)
      # 2. wait until every rank has delivered its part
      outList = []
      for r in range(1, genDNAPop_mpi_size + 1):
          tmp = mpi_comm.recv(source=r, tag=11)
          outList.extend(tmp['dnaList'])
      return outList
  154. ################################################################################
  def genDNAPop(cnt):
      """Create ``cnt`` random DNA objects.

      Each object is a dict ``{'data': [...], 'fit': 0}`` whose ``'data'`` list
      holds ``DNA_LEN`` entries drawn from ``DNA_DICTIONARY``.

      ``randint`` includes both bounds and ``CHROMOSOME_MAX_VALUE`` is already
      the last valid dictionary index (see ``init``), so the index is drawn
      from ``[CHROMOSOME_MIN_VALUE, CHROMOSOME_MAX_VALUE]`` - the same range
      ``mutateDNA`` uses. Subtracting one more would exclude the last
      dictionary entry and fail for a dictionary with a single entry.

      Returns an empty list for ``cnt <= 0``.
      """
      global CHROMOSOME_MIN_VALUE
      global CHROMOSOME_MAX_VALUE
      global DNA_DICTIONARY
      if cnt <= 0:
          return []
      population = []
      for _ in range(cnt):
          chromosomes = []
          for _ in range(DNA_LEN):
              idx = randint(CHROMOSOME_MIN_VALUE, CHROMOSOME_MAX_VALUE)
              chromosomes.append(DNA_DICTIONARY[idx])
          population.append({'data': chromosomes, 'fit': 0})
      return population
  173. ################################################################################
  174. # mutate
  def mutateDNA(dna):
      """Possibly mutate one chromosome of ``dna`` in place and return ``dna``.

      ``evolve.shouldBeMutated()`` decides - anew for every DNA strand -
      whether a mutation takes place. If so, the chromosome position chosen by
      ``evolve.chooseRandomChromosome`` is overwritten with a random entry of
      ``DNA_DICTIONARY``.
      """
      global DNA_DICTIONARY
      global CHROMOSOME_MIN_VALUE
      global CHROMOSOME_MAX_VALUE
      if evolve.shouldBeMutated() != True:
          # no mutation for this strand
          return dna
      pos = evolve.chooseRandomChromosome(dna['data'])
      pick = randint(CHROMOSOME_MIN_VALUE, CHROMOSOME_MAX_VALUE)
      dna['data'][pos] = DNA_DICTIONARY[pick]
      #dna['mut'] = pos
      return dna
  186. ################################################################################
  187. # crossover
  def recombineDNA(a, b):
      """Cross over the DNA objects ``a`` and ``b`` and return five children.

      Both ``'data'`` lists are cut at ``len(a['data']) // 2``. With
      ``a = ab`` and ``b = cd`` (left/right halves) the children are, in order:

          ac, ad, bc, bd, cb

      Every child is a new dict ``{'data': [...], 'fit': 0}``; the parents are
      not modified. The split index is computed with floor division so that it
      is an integer even when ``/`` means true division (a float cannot be used
      as a slice index).
      """
      half = len(a['data']) // 2
      aLeft, aRight = a['data'][:half], a['data'][half:]
      bLeft, bRight = b['data'][:half], b['data'][half:]
      pairs = (
          (aLeft, bLeft),    # ab cd => ac
          (aLeft, bRight),   # ab cd => ad
          (aRight, bLeft),   # ab cd => bc
          (aRight, bRight),  # ab cd => bd
          (bLeft, aRight),   # ab cd => cb
      )
      return [{'data': first + second, 'fit': 0} for first, second in pairs]
  212. ################################################################################
  213. # evaluate
  def fitness(dna):
      """Rate ``dna`` against ``DEST_DNA`` and store the score in ``dna['fit']``.

      Positions are compared one by one. The k-th matching position (counted
      from 1) adds ``2 ** (k + 1)`` to the score, so every additional match is
      worth more than the one before.

      Returns True - and sets ``dna['sol'] = True`` - when every position
      matches, otherwise False.
      """
      global DEST_DNA
      chromosomes = dna['data']
      hits = 0
      score = 0
      for pos, value in enumerate(chromosomes):
          if value == DEST_DNA[pos]:
              hits += 1
              score += 2 ** (hits + 1)
      dna['fit'] = score
      if hits != len(chromosomes):
          return False
      dna['sol'] = True
      return True
  227. ################################################################################
  def percentualSolved(dna):
      """Return how many percent of the positions of ``dna`` match ``DEST_DNA``.

      The result is a float between 0.0 and 100.0.
      """
      global DEST_DNA
      chromosomes = dna['data']
      hits = 0
      for pos, value in enumerate(chromosomes):
          if value == DEST_DNA[pos]:
              hits += 1
      return (100.0 * hits) / len(chromosomes)
  235. ################################################################################
  def dnaToString(dna):
      """Return the text encoded by the character codes in ``dna['data']``.

      If ``dna`` cannot be converted (not a dict like object, no ``'data'``
      entry, or entries which are not valid character codes) an error message
      is printed and the process exits with status -10. Only conversion errors
      are handled that way; KeyboardInterrupt and SystemExit pass through.
      """
      try:
          return "".join(chr(code) for code in dna['data'])
      except (KeyError, TypeError, ValueError, OverflowError):
          print("ERROR: There is a problem, the bestDNA is set to '" + str(dna) + "'")
          # sys.exit instead of the interactive helper ``exit`` which only
          # exists when the site module was loaded
          sys.exit(-10)
  245. ################################################################################
  def strToList(s):
      """Return the character codes (``ord``) of the characters of ``s`` as a list."""
      return [ord(ch) for ch in s]
  248. ################################################################################
  def init():
      """Derive the DNA settings from ``DEST_DNA`` and pick the MPI rank counts.

      * pads ``DEST_DNA`` in place with a space if its length is odd,
      * sets ``DNA_DICTIONARY`` to the sorted distinct values of ``DEST_DNA``,
      * sets ``DNA_LEN`` to the length of ``DEST_DNA``,
      * sets ``CHROMOSOME_MAX_VALUE`` to the last index of ``DNA_DICTIONARY``.

      Returns the result of ``getBestMPIRankNum()``.
      """
      global DNA_DICTIONARY
      global DEST_DNA
      global DNA_LEN
      global CHROMOSOME_MAX_VALUE
      if len(DEST_DNA) % 2:
          # odd length: append a space so the length becomes even
          DEST_DNA.append(ord(" "))
      DNA_LEN = len(DEST_DNA)
      DNA_DICTIONARY = sorted(set(DEST_DNA))
      CHROMOSOME_MAX_VALUE = len(DNA_DICTIONARY) - 1
      # recalculate the number of mpi ranks to use
      return getBestMPIRankNum()
  262. ################################################################################
  def getBestMPIRankNum():
      """Choose how many slave ranks are used for evolving and for generating.

      With ``pm = evolve.getPopMax()`` and ``mpi_size - 1`` available slave
      ranks, searching from the largest rank count downwards:

      * ``evolve_mpi_size`` becomes the first count ``i`` for which ``pm`` is
        even and ``pm / i`` is an even whole number,
      * ``genDNAPop_mpi_size`` becomes the first count ``i`` which divides
        ``pm`` without remainder.

      A value for which no count is found keeps whatever it was before.
      Returns False (after printing both values) if one of them is None
      afterwards, otherwise True.
      """
      global mpi_size
      global evolve_mpi_size
      global genDNAPop_mpi_size
      slaveRanks = mpi_size - 1
      popMax = float(evolve.getPopMax())
      # number of ranks for the evolve / hasSolution data processing
      if popMax % 2 == 0:
          for ranks in range(slaveRanks, 0, -1):
              if (popMax / ranks) % 2 == 0:
                  evolve_mpi_size = ranks
                  break
      # number of ranks for the generation of the initial population
      for ranks in range(slaveRanks, 0, -1):
          if popMax % ranks == 0:
              genDNAPop_mpi_size = ranks
              break
      if genDNAPop_mpi_size is None or evolve_mpi_size is None:
          print("evolve_mpi_size = " + str(evolve_mpi_size))
          print("genDNAPop_mpi_size = " + str(genDNAPop_mpi_size))
          return False
      return True
  286. ############################################################################
  if __name__ == '__main__':
      # Script entry point, executed by every MPI process.
      #   * all ranks: parse and validate the options, initialise the evolve
      #     module and the DNA settings
      #   * rank 0 (master): runs evolve.autorun in a thread, then collects the
      #     mutation counters, stops the slaves and prints the summary
      #   * other ranks (slaves): answer the commands of the master until 'exit'
      # Exit codes: -1 population, -2 mutation rate, -3 max tries, -4 init,
      #             -5 summary failed.

      # reopen stdout unbuffered so the "\r" progress line is shown immediately
      sys.stdout.flush()
      sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
      mpi_comm = MPI.COMM_WORLD
      mpi_rank = mpi_comm.Get_rank()  # the own rank
      mpi_size = mpi_comm.Get_size()  # the number of mpi processes
      if mpi_rank == 0:
          # echo the command line once
          for a in sys.argv:
              sys.stdout.write(a + " ")
          print("\n====")
      pm = evolve.getDefaultPopMax()
      mr = evolve.getDefaultMutRate()
      tm = evolve.getDefaultTryMax()
      #DEST_DNA = strToList("Hello World - genetic algorithm")
      #DEST_DNA = strToList("Hello World")
      DEST_DNA = strToList("1234")
      #DEST_DNA = [ 0, 1, 2 ]
      ##########
      # parse sys.argv; values which differ from the defaults are validated
      (options, args) = parser.parse_args()
      if evolve.getDefaultPopMax() != options.populationmax:
          if options.populationmax <= 0 or options.populationmax % 2 != 0:
              print("ERROR Population maximum is wrong (>0 and %2 == 0)!")
              sys.exit(-1)
          pm = options.populationmax
      if evolve.getDefaultMutRate() != options.mutationrate:
          if options.mutationrate <= 0:
              print("ERROR Mutation rate is wrong (>0)!")
              sys.exit(-2)
          mr = options.mutationrate
      if evolve.getDefaultTryMax() != options.maxtry:
          if options.maxtry <= 0:
              print("ERROR Maximum try number is wrong (>0)!")
              sys.exit(-3)
          tm = options.maxtry
      if "" != options.dna:
          DEST_DNA = strToList(options.dna)
      ##########
      evolve.init(fitness, mutateDNA, recombineDNA, mpi_genDNAPop, pm, mr, tm, mpi_evolve, mpi_hasSolution)
      evolve.setStoreHistory(options.savehistory)
      ##########
      if init() == False:
          print("ERROR Init calculations failed!")
          sys.exit(-4)
      if mpi_rank == 0:
          ### mpi master rank activities ###
          print("Destination DNA: " + str(DEST_DNA) + " => '" + dnaToString({'data': DEST_DNA}) + "'")
          print("DNA dictionary: " + str(DNA_DICTIONARY) + " => '" + dnaToString({'data': DNA_DICTIONARY}) + "'")
          calPosSol = len(DNA_DICTIONARY) ** len(DEST_DNA)
          print("Possible solutions: " + str(calPosSol))
          print("----------")
          ##########
          # run the evolve process in a separate thread
          t = Thread(target=evolve.autorun)
          #t = Process(target=evolve.autorun) # the process is not a good idea
          t.start()
          print("Generate initial DNA population...")
          sys.stdout.write("Processing data...\r")
          # poll until a solution was found and/or the maximum tries are done
          while t.is_alive():
              time.sleep(1)
          print("")
          # wait for the thread to exit
          t.join()
          # get the best solution
          bestDNA = evolve.getBestDNA()
          print("----------")
          print("Get mutation counter...")
          # 1. ask every slave rank for its mutation counter
          for r in range(1, mpi_size):
              mpi_comm.send({'cmd': 'getMutCnt'}, dest=r, tag=11)
          # 2. add the answers to the counter of the master
          for r in range(1, mpi_size):
              tmp = mpi_comm.recv(source=r, tag=11)
              evolve.setMutCnt(evolve.getMutCnt() + tmp['mutCnt'])
          print("----------")
          print("Exiting all mpi slave ranks")
          # tell all slave ranks to quit
          for r in range(1, mpi_size):
              mpi_comm.send({'cmd': 'exit'}, dest=r, tag=11)
          print("----------")
          ##########
          # print summary
          print("Best DNA is " + str(bestDNA) + " => '" + dnaToString(bestDNA) + "'")
          try:
              if 'sol' in bestDNA and bestDNA['sol'] == True:
                  print("A solution was found")
              else:
                  print("Best solution has reached %0.2f" % (percentualSolved(bestDNA)) + "%")
          except Exception:
              # bestDNA has an unexpected form; report it and give up
              print("ERROR: There is a problem, the bestDNA is set to '" + str(bestDNA) + "'")
              sys.exit(-5)
          print(str(evolve.getTryCnt()) + " tries of max " + str(evolve.getTryMax()) + " allowed generations/tries - which is %0.2f" % (evolve.getProgress()) + "%")
          print(str(evolve.getPopMax()) + " DNA data objects per generation")
          print("Mutation rate: " + str(evolve.getMutRate()) + "%")
          print("Count of mutations: " + str(evolve.getMutCnt()))
          if evolve.getStoreHistory() == True:
              history = evolve.getStoredHistory()
              print("Generated " + str(len(history)) + " DNA objects")
          print("Number of MPI ranks used for evolve/hasSolution: " + str(evolve_mpi_size))
          print("Number of MPI ranks used for initial population generation: " + str(genDNAPop_mpi_size))
          ##########
          # end of rank 0
      else:
          ### other rank than 0 ###
          while True:
              data = mpi_comm.recv(source=0, tag=11)
              # messages without a command are ignored
              if 'cmd' not in data:
                  continue
              cmd = data['cmd']
              #####
              if cmd == 'exit':
                  # the master has sent the exit condition
                  break
              #####
              elif cmd == 'hasSolution':
                  # process these data with multiprocessing
                  o = CalcObj({'data': data['dnaList'], 'param': None}, hasSolution_MP_Wrap)
                  o.calc()
                  out = {'found': o.results['found'], 'dnaList': o.results['dnaList']}
                  mpi_comm.send(out, dest=0, tag=11)
              #####
              elif cmd == 'genDNAPop':
                  # generate the DNA population with multiprocessing
                  o = CalcObj({'data': None, 'param': data['cnt']}, genDNAPop_MP_Wrap)
                  o.calc()
                  out = {'dnaList': o.results}
                  mpi_comm.send(out, dest=0, tag=11)
              #####
              elif cmd == 'evolve':
                  dnaList = data['dnaList']
                  mx = data['max']  # use like the maximum population
                  evolve.setPopMax(mx)  # set the mpi rank local maximum population
                  evolve.recalcToMutatePerPop()
                  # process these data with multiprocessing
                  o = CalcObj({'data': dnaList, 'param': mx}, evolve_MP_Wrap)
                  o.calc()
                  # TODO check if adding the mutation counter again and again is
                  #      the root of the wrong mutation counter value problem
                  #evolve.setMutCnt(evolve.getMutCnt() + o.results['mutCnt'])
                  evolve.setMutCnt(o.results['mutCnt'])
                  # the mutation counter itself is fetched by the master with
                  # 'getMutCnt'; only the evolved list is sent back here
                  out = {'dnaList': o.results['dnaList']}
                  mpi_comm.send(out, dest=0, tag=11)
              #####
              elif cmd == 'getMutCnt':
                  out = {'mutCnt': evolve.getMutCnt()}
                  mpi_comm.send(out, dest=0, tag=11)
              #####
              elif cmd == 'showProgress':
                  if not options.noterminaloutput:
                      # normal terminal output: rewrite the same line
                      sys.stdout.write("Processing data %0.2f" % (data['progress']) + "%")
                      tmp = data['bestDNA']
                      if isinstance(tmp, dict):
                          sys.stdout.write(" - '" + dnaToString(tmp) + "'\r")
                      else:
                          sys.stdout.write("\r")
                  else:
                      # non terminal output e.g. into a logging file
                      if data['progress'] > 0 and data['progress'] % 10 == 0:
                          sys.stdout.write(" %0.2f" % (data['progress']) + "%")
          #print "rank: " + str(mpi_rank) + " exited"
  # end