Changeset 472 for trunk/CifFile/__init__.py
- Timestamp:
- Feb 6, 2012 2:03:00 PM (13 years ago)
- File:
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
TabularUnified trunk/CifFile/__init__.py ¶
r471 r472 1 """ 2 1.This Software copyright \u00A9 Australian Synchrotron Research Program Inc, ("ASRP"). 3 4 2.Subject to ensuring that this copyright notice and licence terms 5 appear on all copies and all modified versions, of PyCIFRW computer 6 code ("this Software"), a royalty-free non-exclusive licence is hereby 7 given (i) to use, copy and modify this Software including the use of 8 reasonable portions of it in other software and (ii) to publish, 9 bundle and otherwise re-distribute this Software or modified versions 10 of this Software to third parties, provided that this copyright notice 11 and terms are clearly shown as applying to all parts of software 12 derived from this Software on each occasion it is published, bundled 13 or re-distributed. You are encouraged to communicate useful 14 modifications to ASRP for inclusion for future versions. 15 16 3.No part of this Software may be sold as a standalone package. 17 18 4.If any part of this Software is bundled with Software that is sold, 19 a free copy of the relevant version of this Software must be made 20 available through the same distribution channel (be that web server, 21 tape, CD or otherwise). 22 23 5.It is a term of exercise of any of the above royalty free licence 24 rights that ASRP gives no warranty, undertaking or representation 25 whatsoever whether express or implied by statute, common law, custom 26 or otherwise, in respect of this Software or any part of it. Without 27 limiting the generality of the preceding sentence, ASRP will not be 28 liable for any injury, loss or damage (including consequential loss or 29 damage) or other loss, loss of profits, costs, charges or expenses 30 however caused which may be suffered, incurred or arise directly or 31 indirectly in respect of this Software. 32 33 6. This Software is not licenced for use in medical applications. 
34 """ 35 36 from types import * 37 import re 38 import StarFile 39 import sys 40 class CifLoopBlock(StarFile.LoopBlock): 41 def __init__(self,data=(),dimension=0,**kwargs): 42 self.loopclass = CifLoopBlock 43 if dimension > 1: 44 raise CifError( 'Attempt to nest loops, loop level %d' % dimension) 45 StarFile.LoopBlock.__init__(self,data,dimension=dimension,**kwargs) 46 # self.__iter__ = self.recursive_iter 47 48 def __iter__(self): 49 return self.recursive_iter() 50 51 def AddLoopItem(self,data,precheck=False): 52 StarFile.LoopBlock.AddLoopItem(self,data,precheck,maxlength=75) 53 54 def insert_loop(self,newloop,**kwargs): 55 if newloop.dimension > 1: 56 raise CifError( 'Attempt to insert inner loop, loop level %d' % dimension) 57 StarFile.LoopBlock.insert_loop(self,newloop,**kwargs) 58 59 class CifBlock(CifLoopBlock): 60 def __init__(self,data = (), strict = 1, maxoutlength=2048,wraplength=80,overwrite=True,dimension=0): 61 self.strict = strict 62 CifLoopBlock.__init__(self,data=data,dimension=0,maxoutlength=maxoutlength,wraplength=wraplength,overwrite=overwrite) 63 if isinstance(data,(StarFile.StarBlock,CifBlock)): 64 self.saves = StarFile.BlockCollection(datasource=data["saves"],element_class=CifBlock,type_tag="save") 65 else: 66 self.saves = StarFile.BlockCollection(element_class=CifLoopBlock,type_tag="save") 67 if self.strict: 68 self.checklengths() 69 self.dictionary = None 70 71 def RemoveCifItem(self,itemname): 72 CifLoopBlock.RemoveLoopItem(self,itemname) 73 74 def __getitem__(self,key): 75 if key == "saves": 76 return self.saves 77 try: 78 rawitem = CifLoopBlock.__getitem__(self,key) 79 except KeyError: 80 if self.dictionary: 81 # send the dictionary the required key and a pointer to us 82 rawitem = self.dictionary.derive_item(key,self) 83 else: 84 raise KeyError, 'No such item: %s' % key 85 # we now have an item, we can try to convert it to a number if that is appropriate 86 if not self.dictionary or not self.dictionary.has_key(key): return rawitem 87 
return self.dictionary.change_type(key,rawitem) 88 89 def __setitem__(self,key,value): 90 if key == "saves": 91 self.saves[key] = value 92 else: 93 self.AddCifItem((key,value)) 94 95 def clear(self): 96 CifLoopBlock.clear(self) 97 self.saves = StarFile.BlockCollection(element_class=CifLoopBlock,type_tag="save_") 98 99 def copy(self): 100 newblock = CifLoopBlock.copy(self) 101 newblock.saves = self.saves.copy() 102 return self.copy.im_class(newblock) #catch inheritance 103 104 def has_key(self,key): 105 if key == "saves": return 1 106 else: return CifLoopBlock.has_key(self,key) 107 108 def __str__(self): 109 retstr = '' 110 for sb in self.saves.keys(): 111 retstr = retstr + '\nsave_%s\n\n' % sb 112 self.saves[sb].SetOutputLength(self.wraplength,self.maxoutlength) 113 retstr = retstr + str(self.saves[sb]) 114 retstr = retstr + '\nsave_\n\n' 115 return retstr + CifLoopBlock.__str__(self) 116 117 # this is not appropriate for save blocks. Instead, the save block 118 # should be accessed directly for update 119 120 def update(self,adict): 121 loopdone = [] 122 if not isinstance(adict,CifBlock): 123 raise TypeError 124 for key in adict.block.keys(): 125 self.AddCifItem((key,adict[key])) 126 for aloop in adict.loops: 127 self.insert_loop(aloop,audit=True) 128 129 def AddCifItem(self,data): 130 # we accept only tuples, strings and lists!! 131 if not (isinstance(data[0],(StringType,TupleType,ListType))): 132 raise TypeError, 'Cif datanames are either a string, tuple or list' 133 # single items passed straight through to underlying routine 134 # we catch single item loops as well... 135 if isinstance(data[0],StringType): 136 if isinstance(data[1],(TupleType,ListType)) and not isinstance(data[1],(StarFile.StarList,StarFile.StarTuple)): 137 CifLoopBlock.AddLoopItem(self,((data[0],),((data[1],)))) 138 else: 139 CifLoopBlock.AddLoopItem(self,data) 140 return 141 # otherwise, we unpack one level and send along. 
This is different 142 # to the StarBlock behaviour, which assumes that any tuples imply an 143 # inner loop. 144 keyvals = zip(data[0],data[1]) 145 map(lambda a:CifLoopBlock.AddLoopItem(self,a),keyvals) 146 147 def checklengths(self): 148 toolong = filter(lambda a:len(a)>75, self.keys()) 149 outstring = "" 150 for it in toolong: outstring += "\n" + it 151 if toolong: 152 raise CifError( 'Following data names too long:' + outstring) 153 154 def loopnames(self): 155 return map(lambda a:a.keys(),self.loops) 156 157 def assign_dictionary(self,dic): 158 if not dic.diclang=="DDLm": 159 print "Warning: ignoring dictionary %s" % dic.dic_as_cif.my_uri 160 return 161 self.dictionary = dic 162 163 def merge(self,new_block,mode="strict",match_att=[],match_function=None,nosaves=False, 164 rel_keys = []): 165 # deal with save frames 166 if not nosaves: 167 self["saves"].merge(new_block["saves"],mode,match_att=match_att, 168 match_function=match_function) 169 if mode == 'strict': 170 for key in new_block.item_order: 171 if self.has_key(key) and key not in match_att: 172 raise CifError( "Identical keys %s in strict merge mode" % key) 173 elif key not in match_att: #no change otherwise 174 if isinstance(key,StringType): 175 self[key] = new_block[key] 176 else: 177 self.insert_loop(key) 178 elif mode == 'replace': 179 newkeys = new_block.keys() 180 for ma in match_att: 181 try: 182 newkeys.remove(ma) #don't touch the special ones 183 except ValueError: 184 pass 185 for key in new_block.item_order: 186 if isinstance(key,StringType): 187 self[key] = new_block[key] 188 else: 189 self.insert_loop(key) #assume is a loop 190 elif mode == 'overlay': 191 for attribute in new_block.keys(): 192 if attribute in match_att: continue #ignore this one 193 new_value = new_block[attribute] 194 #non-looped items 195 if isinstance(new_value,StringType): 196 self[attribute] = new_value 197 these_atts = self.keys() 198 for newloop in new_block.loops: 199 newkeys = newloop.keys() 200 # note that the 
following line determines packet item order 201 overlaps = filter(lambda a: a in these_atts,newkeys) 202 if len(overlaps)< len(newloop):#completely new loop 203 self.insert_loop(newloop) 204 elif len(overlaps)==len(newloop): 205 # appending packets 206 # print "In overlay merge mode, found extra packet items:" 207 # print `overlaps` 208 # get key position 209 loop_keys = filter(lambda a:a in rel_keys,overlaps) 210 try: 211 newkeypos = map(lambda a:newkeys.index(a),loop_keys) 212 newkeypos = newkeypos[0] #one key per loop for now 213 loop_keys = loop_keys[0] 214 except (ValueError,IndexError): 215 newkeypos = [] 216 overlap_data = map(lambda a:listify(self[a]),overlaps) #old packet data 217 new_data = map(lambda a:new_block[a],overlaps) #new packet data 218 packet_data = transpose(overlap_data) 219 new_p_data = transpose(new_data) 220 # remove any packets for which the keys match between old and new; we 221 # make the arbitrary choice that the old data stays 222 if newkeypos: 223 # get matching values in new list 224 print "Old, new data:\n%s\n%s" % (`overlap_data[newkeypos]`,`new_data[newkeypos]`) 225 key_matches = filter(lambda a:a in overlap_data[newkeypos],new_data[newkeypos]) 226 # filter out any new data with these key values 227 new_p_data = filter(lambda a:a[newkeypos] not in key_matches,new_p_data) 228 if new_p_data: 229 new_data = transpose(new_p_data) 230 else: new_data = [] 231 # wipe out the old data and enter the new stuff 232 byebyeloop = self.GetLoop(overlaps[0]) 233 # print "Removing '%s' with overlaps '%s'" % (`byebyeloop`,`overlaps`) 234 # Note that if, in the original dictionary, overlaps are not 235 # looped, GetLoop will return the block itself. So we check 236 # for this case... 
237 if byebyeloop != self: 238 self.remove_loop(byebyeloop) 239 self.AddCifItem(((overlaps,),(overlap_data,))) #adding old packets 240 for pd in new_p_data: #adding new packets 241 if pd not in packet_data: 242 for i in range(len(overlaps)): 243 #don't do this at home; we are appending 244 #to something in place 245 self[overlaps[i]].append(pd[i]) 246 247 248 class CifFile(StarFile.StarFile): 249 def __init__(self,datasource=None,strict=1,maxinlength=2048,maxoutlength=0,**kwargs): 250 StarFile.StarFile.__init__(self,datasource=datasource,maxinlength=maxinlength,maxoutlength=maxoutlength,blocktype=CifBlock,**kwargs) 251 self.strict = strict 252 self.header_comment = \ 253 """#\\#CIF1.1 254 ########################################################################## 255 # Crystallographic Information Format file 256 # Produced by PyCifRW module 257 # 258 # This is a CIF file. CIF has been adopted by the International 259 # Union of Crystallography as the standard for data archiving and 260 # transmission. 
261 # 262 # For information on this file format, follow the CIF links at 263 # http://www.iucr.org 264 ########################################################################## 265 """ 266 def NewBlock(self,blockname,*nkwargs,**kwargs): 267 if len(blockname)>75: 268 raise CifError , 'Blockname %s is longer than 75 characters' % blockname 269 else: 270 StarFile.StarFile.NewBlock(self,blockname,*nkwargs,**kwargs) 271 272 273 class CifError(Exception): 274 def __init__(self,value): 275 self.value = value 276 def __str__(self): 277 return '\nCif Format error: '+ self.value 278 279 class ValidCifError(Exception): 280 def __init__(self,value): 281 self.value = value 282 def __str__(self): 283 return '\nCif Validity error: ' + self.value 284 285 class CifDic(StarFile.BlockCollection): 286 def __init__(self,dic,do_minimum=False,grammar='1.1'): 287 self.do_minimum = do_minimum 288 self.dic_as_cif = dic 289 self.template_cache = {} #for DDLm imports 290 self.ddlm_functions = {} #for DDLm functions 291 self.switch_numpy(False) #no Numpy arrays returned 292 if isinstance(dic,StringType): 293 self.dic_as_cif = CifFile(dic,grammar=grammar) 294 (self.dicname,self.diclang,self.defdata) = self.dic_determine(self.dic_as_cif) 295 StarFile.BlockCollection.__init__(self,element_class=CifBlock,datasource=self.defdata) 296 self.scopes_mandatory = {"dictionary":[],"category":[],"item":[]} 297 self.scopes_naughty = {"dictionary":[],"category":[],"item":[]} 298 # rename and expand out definitions using "_name" in DDL dictionaries 299 if self.diclang == "DDL1": 300 self.DDL1_normalise() #this removes any non-definition entries 301 self.ddl1_cat_load() 302 elif self.diclang == "DDL2": 303 self.DDL2_normalise() #iron out some DDL2 tricky bits 304 elif self.diclang == "DDLm": 305 self.ddlm_normalise() 306 self.ddlm_import() #recursively calls this routine 307 if not self.do_minimum: 308 print "Doing full dictionary initialisation" 309 self.ddlm_parse_valid() #extract validity information from 
data block 310 self.transform_drel() #parse the drel functions 311 self.add_drel_funcs() #put the drel functions into the namespace 312 self.add_category_info() 313 # initialise type information 314 self.typedic={} 315 self.primdic = {} #typecode<->primitive type translation 316 self.add_type_info() 317 self.item_validation_funs = [ 318 self.validate_item_type, 319 self.validate_item_esd, 320 self.validate_item_enum, # functions which check conformance 321 self.validate_enum_range, 322 self.validate_looping] 323 self.loop_validation_funs = [ 324 self.validate_loop_membership, 325 self.validate_loop_key, 326 self.validate_loop_references] # functions checking loop values 327 self.global_validation_funs = [ 328 self.validate_exclusion, 329 self.validate_parent, 330 self.validate_child, 331 self.validate_dependents, 332 self.validate_uniqueness] # where we need to look at other values 333 self.block_validation_funs = [ # where only a full block will do 334 self.validate_mandatory_category] 335 self.global_remove_validation_funs = [ 336 self.validate_remove_parent_child] # removal is quicker with special checks 337 self.optimize = False # default value 338 self.done_parents = [] 339 self.done_children = [] 340 self.done_keys = [] 341 # debug 342 # j = open("dic_debug","w") 343 # j.write(self.__str__()) 344 # j.close() 345 346 def dic_determine(self,cifdic): 347 if cifdic.has_key("on_this_dictionary"): 348 self.master_key = "on_this_dictionary" 349 self.type_spec = "_type" 350 self.enum_spec = "_enumeration" 351 self.cat_spec = "_category" 352 self.esd_spec = "_type_conditions" 353 self.must_loop_spec = "_list" 354 self.must_exist_spec = "_list_mandatory" 355 self.list_ref_spec = "_list_reference" 356 self.unique_spec = "_list_uniqueness" 357 self.child_spec = "_list_link_child" 358 self.parent_spec = "_list_link_parent" 359 self.related_func = "_related_function" 360 self.related_item = "_related_item" 361 self.primitive_type = "_type" 362 self.dep_spec = "xxx" 363 
self.cat_list = [] #to save searching all the time 364 name = cifdic["on_this_dictionary"]["_dictionary_name"] 365 version = cifdic["on_this_dictionary"]["_dictionary_version"] 366 return (name+version,"DDL1",cifdic) 367 elif len(cifdic.keys()) == 1: # DDL2/DDLm 368 self.master_key = cifdic.keys()[0] 369 name = cifdic[self.master_key]["_dictionary.title"] 370 version = cifdic[self.master_key]["_dictionary.version"] 371 if name != self.master_key: 372 print "Warning: DDL2 blockname %s not equal to dictionary name %s" % (self.master_key,name) 373 if cifdic[self.master_key].has_key("_dictionary.class"): #DDLm 374 self.unique_spec = "_category_key.generic" 375 return(name+version,"DDLm",cifdic[self.master_key]["saves"]) 376 #otherwise DDL2 377 self.type_spec = "_item_type.code" 378 self.enum_spec = "_item_enumeration.value" 379 self.esd_spec = "_item_type_conditions.code" 380 self.cat_spec = "_item.category_id" 381 self.loop_spec = "there_is_no_loop_spec!" 382 self.must_loop_spec = "xxx" 383 self.must_exist_spec = "_item.mandatory_code" 384 self.child_spec = "_item_linked.child_name" 385 self.parent_spec = "_item_linked.parent_name" 386 self.related_func = "_item_related.function_code" 387 self.related_item = "_item_related.related_name" 388 self.unique_spec = "_category_key.name" 389 self.list_ref_spec = "xxx" 390 self.primitive_type = "_type" 391 self.dep_spec = "_item_dependent.dependent_name" 392 return (name+version,"DDL2",cifdic[self.master_key]["saves"]) 393 else: 394 raise CifError, "Unable to determine dictionary DDL version" 395 396 def DDL1_normalise(self): 397 # add default type information in DDL2 style 398 # initial types and constructs 399 base_types = ["char","numb","null"] 400 prim_types = base_types[:] 401 base_constructs = [".*", 402 '(-?(([0-9]*[.][0-9]+)|([0-9]+)[.]?)([(][0-9]+[)])?([eEdD][+-]?[0-9]+)?)|\?|\.', 403 "\"\" "] 404 for key,value in self.dictionary.items(): 405 if value.has_key("_name"): 406 real_name = value["_name"] 407 if 
type(real_name) is ListType: #looped values 408 for looped_name in real_name: 409 new_value = value.copy() 410 new_value["_name"] = looped_name #only looped name 411 self.dictionary[looped_name] = new_value 412 else: self.dictionary[real_name] = value 413 # delete the old one 414 del self.dictionary[key] 415 # loop again to normalise the contents of each definition 416 for key,value in self.dictionary.items(): 417 # deal with a missing _list, _type_conditions 418 if not value.has_key("_list"): value["_list"] = 'no' 419 if not value.has_key("_type_conditions"): value["_type_conditions"] = 'none' 420 # deal with enumeration ranges 421 if value.has_key("_enumeration_range"): 422 max,min = self.getmaxmin(value["_enumeration_range"]) 423 if min == ".": 424 self.dictionary[key].AddLoopItem((("_item_range.maximum","_item_range.minimum"),((max,max),(max,min)))) 425 elif max == ".": 426 self.dictionary[key].AddLoopItem((("_item_range.maximum","_item_range.minimum"),((max,min),(min,min)))) 427 else: 428 self.dictionary[key].AddLoopItem((("_item_range.maximum","_item_range.minimum"),((max,max,min),(max,min,min)))) 429 #add any type construct information 430 if value.has_key("_type_construct"): 431 base_types.append(value["_name"]+"_type") #ie dataname_type 432 base_constructs.append(value["_type_construct"]+"$") 433 prim_types.append(value["_type"]) #keep a record 434 value["_type"] = base_types[-1] #the new type name 435 436 437 #make categories conform with ddl2 438 #note that we must remove everything from the last underscore 439 if value["_category"] == "category_overview": 440 last_under = value["_name"].rindex("_") 441 catid = value["_name"][1:last_under] 442 value["_category.id"] = catid #remove square bracks 443 if catid not in self.cat_list: self.cat_list.append(catid) 444 # we now add any missing categories before filling in the rest of the 445 # information 446 for key,value in self.dictionary.items(): 447 if self[key].has_key("_category"): 448 if 
self[key]["_category"] not in self.cat_list: 449 # rogue category, add it in 450 newcat = self[key]["_category"] 451 fake_name = "_" + newcat + "_[]" 452 newcatdata = CifBlock() 453 newcatdata["_category"] = "category_overview" 454 newcatdata["_category.id"] = newcat 455 newcatdata["_type"] = "null" 456 self[fake_name] = newcatdata 457 self.cat_list.append(newcat) 458 # write out the type information in DDL2 style 459 self.dic_as_cif[self.master_key].AddLoopItem(( 460 ("_item_type_list.code","_item_type_list.construct", 461 "_item_type_list.primitive_code"), 462 (base_types,base_constructs,prim_types) 463 )) 464 465 def DDL2_normalise(self): 466 listed_defs = filter(lambda a:isinstance(self[a].get('_item.name'),ListType),self.keys()) 467 # now filter out all the single element lists! 468 dodgy_defs = filter(lambda a:len(self[a]['_item.name']) > 1, listed_defs) 469 for item_def in dodgy_defs: 470 # print "DDL2 norm: processing %s" % item_def 471 thisdef = self[item_def] 472 packet_no = thisdef['_item.name'].index(item_def) 473 realcat = thisdef['_item.category_id'][packet_no] 474 realmand = thisdef['_item.mandatory_code'][packet_no] 475 # first add in all the missing categories 476 # we don't replace the entry in the list corresponding to the 477 # current item, as that would wipe out the information we want 478 for child_no in range(len(thisdef['_item.name'])): 479 if child_no == packet_no: continue 480 child_name = thisdef['_item.name'][child_no] 481 child_cat = thisdef['_item.category_id'][child_no] 482 child_mand = thisdef['_item.mandatory_code'][child_no] 483 if not self.has_key(child_name): 484 self[child_name] = CifBlock() 485 self[child_name]['_item.name'] = child_name 486 self[child_name]['_item.category_id'] = child_cat 487 self[child_name]['_item.mandatory_code'] = child_mand 488 self[item_def]['_item.name'] = item_def 489 self[item_def]['_item.category_id'] = realcat 490 self[item_def]['_item.mandatory_code'] = realmand 491 # go through any _item_linked 
tables 492 dodgy_defs = filter(lambda a:isinstance(self[a].get('_item_linked.child_name'),ListType),self.keys()) 493 dodgy_defs = filter(lambda a:len(self[a]['_item_linked.child_name']) > 1, dodgy_defs) 494 for item_def in dodgy_defs: 495 thisdef = self[item_def] 496 child_list = thisdef.get('_item_linked.child_name',[]) 497 parents = thisdef.get('_item_linked.parent_name',[]) 498 # zap the parents, they will confuse us!! 499 del thisdef['_item_linked.parent_name'] 500 if isinstance(child_list,StringType): 501 self[child_list]['_item_linked.parent_name'] = parents 502 self[parents]['_item_linked.child_name'] = child_list 503 else: 504 # for each parent, find the list of children. 505 family = map(None,parents,child_list) 506 notmychildren = family 507 while len(notmychildren): 508 # get all children of first entry 509 mychildren = filter(lambda a:a[0]==notmychildren[0][0],family) 510 # print "Parent %s: %d children" % (notmychildren[0][0],len(mychildren)) 511 for parent,child in mychildren: #parent is the same for all 512 self[child]['_item_linked.parent_name'] = parent 513 # put all the children into the parent 514 try: 515 del self[mychildren[0][0]]['_item_linked.child_name'] 516 except ValueError: pass 517 self[mychildren[0][0]]['_item_linked.child_name'] = map(lambda a:a[1],mychildren) 518 # now make a new,smaller list 519 notmychildren = filter(lambda a:a[0]!=mychildren[0][0],notmychildren) 520 # now flatten any single element lists 521 single_defs = filter(lambda a:len(self[a]['_item.name'])==1,listed_defs) 522 for flat_def in single_defs: 523 flat_keys = self[flat_def].GetLoop('_item.name').keys() 524 for flat_key in flat_keys: self[flat_def][flat_key] = self[flat_def][flat_key][0] 525 # now deal with the multiple lists 526 # next we do aliases 527 all_aliases = filter(lambda a:self[a].has_key('_item_aliases.alias_name'),self.keys()) 528 for aliased in all_aliases: 529 my_aliases = listify(self[aliased]['_item_aliases.alias_name']) 530 for alias in 
my_aliases: 531 self[alias] = self[aliased].copy() #we are going to delete stuff... 532 del self[alias]["_item_aliases.alias_name"] 533 534 def ddlm_normalise(self): 535 for key,value in self.dictionary.items(): 536 if value.has_key("_name.category_id"): 537 real_name = "_" + value["_name.category_id"] + "." + value["_name.object_id"] 538 self[real_name] = value 539 # delete the old one 540 del self[key] 541 542 def ddlm_parse_valid(self): 543 if not self.dic_as_cif[self.master_key].has_key("_dictionary_valid.scope"): 544 return 545 for scope_pack in self.dic_as_cif[self.master_key].GetLoop("_dictionary_valid.scope"): 546 scope = getattr(scope_pack,"_dictionary_valid.scope") 547 valid_info = getattr(scope_pack,"_dictionary_valid.attributes") 548 valid_info = valid_info.split() 549 for i in range(0,len(valid_info),2): 550 if valid_info[i]=="+": 551 self.scopes_mandatory[scope.lower()].append(valid_info[i+1].lower()) 552 elif valid_info[i]=="!": 553 self.scopes_naughty[scope.lower()].append(valid_info[i+1].lower()) 554 555 def ddlm_import(self): 556 import urllib 557 #first check the outermost datablocks. 
Note we expect our dREL 558 #machinery to create _import_list.id only if the individual values are available 559 #For this to happen, we need the ddl.dic to have been assigned 560 try: 561 to_be_imported = self.dic_as_cif[self.master_key]["_import_list.id"] 562 except KeyError: 563 pass 564 else: 565 # deal with foreshortened import blocks 566 for import_target in to_be_imported: 567 if len(import_target)==3: #default values have been left off 568 import_target.append('Exit') 569 import_target.append('Exit') 570 for scope,dict_block,file_loc,on_dupl,on_miss in to_be_imported: 571 scope = scope.lower() #work around capitalisation in draft dics 572 if scope == 'att' or scope == 'sta' or scope == 'val': 573 print 'Improper import directive at top level in %s: ignored' % self.master.key 574 continue 575 # resolve URI 576 full_uri = self.resolve_path(file_loc) 577 dic_as_cif = CifFile(urllib.urlopen(full_uri),grammar="DDLm") 578 import_from = CifDic(dic_as_cif,do_minimum=True) #this will recurse internal imports 579 # and now merge these definitions 580 if scope == "dic": 581 self.get_whole_dict(import_from,on_dupl,on_miss) 582 elif scope=="cat": 583 self.get_one_cat(import_from,dict_block,on_dupl,on_miss) 584 elif scope=="grp": 585 self.get_one_cat_with_children(import_from,dict_block,on_dupl,on_miss) 586 elif scope=="itm": #not clear what to do if category is missing 587 self.add_one_defn(import_from,dict_block,on_dupl,on_miss) 588 # it will never happen again... 
589 del self.dic_as_cif[self.master_key]["_import_list.id"] 590 # next we resolve per-definition imports 591 for one_def in self.keys(): 592 try: 593 to_be_imported = self[one_def]["_import_list.id"] 594 except KeyError: 595 pass 596 else: 597 if len(to_be_imported) == 5 and len(to_be_imported[0])!=5: 598 #catch an error in earlier versions of the dictionaries where 599 #the outer brackets were missing 600 to_be_imported = [to_be_imported] 601 # deal with foreshortened import blocks 602 for import_target in to_be_imported: 603 if len(import_target)==3: #default values have been left off 604 import_target.append('Exit') 605 import_target.append('Exit') 606 for scope,block,file_loc,on_dupl,on_miss in to_be_imported: 607 scope = scope.lower() #work around capitalisation in draft dics 608 if scope == 'dic' or scope == 'cat' or scope == 'grp' or scope == "itm": 609 print 'Improper import directive at definition level in %s: ignored' % self.master.key 610 continue 611 full_uri = self.resolve_path(file_loc) 612 if full_uri not in self.template_cache: 613 dic_as_cif = CifFile(urllib.urlopen(full_uri),grammar="DDLm") 614 self.template_cache[full_uri] = CifDic(dic_as_cif,do_minimum=True) #this will recurse internal imports 615 print 'Added %s to cached dictionaries' % full_uri 616 import_from = self.template_cache[full_uri] 617 if scope == 'att': 618 self.import_attributes(one_def,import_from,block,on_dupl,on_miss) 619 elif scope == 'sta': 620 self.import_loop(one_def,import_from,block,'_enumeration_set.state',on_miss) 621 elif scope == 'val': 622 self.import_loop(one_def,import_from,block,'_enumeration_default.value',on_miss) 623 else: 624 raise CifError, "Unrecognised import scope %s" % scope 625 # remove the import attribute 626 del self[one_def]["_import_list.id"] 627 628 def resolve_path(self,file_loc): 629 import urlparse 630 url_comps = urlparse.urlparse(file_loc) 631 if url_comps[0]: return file_loc #already full URI 632 new_url = 
urlparse.urljoin(self.dic_as_cif.my_uri,file_loc) 633 print "Transformed %s to %s for import " % (file_loc,new_url) 634 return new_url 635 636 def get_whole_dict(self,source_dict,on_dupl,on_miss): 637 print "Cat_map: `%s`" % source_dict.cat_map.values() 638 for source_cat in source_dict.cat_map.values(): 639 self.get_one_cat(source_dict,source_cat,on_dupl,on_miss) 640 641 def get_one_cat(self,source_dict,source_cat,on_dupl,on_miss): 642 ext_cat = source_dict.get(source_cat,"") 643 this_cat = self.get(source_cat,"") 644 print "Adding category %s" % source_cat 645 if not ext_cat: 646 if on_miss == "Ignore": 647 pass 648 else: 649 raise CifError, "Missing category %s" % source_cat 650 else: 651 all_ext_defns = source_dict.keys() 652 cat_list = filter(lambda a:source_dict[a].get("_name.category_id","").lower()==source_cat.lower(), 653 all_ext_defns) 654 print "Items: %s" % `cat_list` 655 if this_cat: # The category block itself is duplicated 656 if on_dupl=="Ignore": 657 pass 658 elif on_dupl == "Exit": 659 raise CifError, "Duplicate category %s" % source_cat 660 else: 661 self[source_cat] = ext_cat 662 else: 663 self[source_cat] = ext_cat 664 # now do all member definitions 665 for cat_defn in cat_list: 666 self.add_one_defn(source_dict,cat_defn,on_dupl) 667 668 def add_one_defn(self,source_dict,cat_defn,on_dupl): 669 if self.has_key(cat_defn): 670 if on_dupl == "Ignore": pass 671 elif on_dupl == "Exit": 672 raise CifError, "Duplicate definition %s" % cat_defn 673 else: self[cat_defn] = source_dict[cat_defn] 674 else: self[cat_defn] = source_dict[cat_defn] 675 print " "+cat_defn 676 677 def get_one_cat_with_children(self,source_dict,source_cat,on_dupl,on_miss): 678 self.get_one_cat(source_dict,source_cat,on_dupl,on_miss) 679 child_cats = filter(lambda a:source_dict[a]["_category.parent_id"]==source_dict[source_cat]["_definition.id"],source_dict.cat_map.values()) 680 for child_cat in child_cats: self.get_one_cat(source_dict,child_cat,on_dupl,on_miss) 681 682 def 
import_attributes(self,mykey,source_dict,source_def,on_dupl,on_miss): 683 # process missing 684 if not source_dict.has_key(source_def): 685 if on_miss == 'Exit': 686 raise CifError, 'Missing definition for import %s' % source_def 687 else: return #nothing else to do 688 # now do the import 689 print 'Adding attributes from %s to %s' % (source_def,mykey) 690 self[mykey].merge(source_dict[source_def],mode='replace',match_att= \ 691 ['_definition.id','_name.category_id','_name.object_id']) 692 693 def import_loop(self,mykey,source_dict,source_def,loop_name,on_miss): 694 # process imssing 695 if not source_dict.has_key(source_def): 696 if on_miss == 'Exit': 697 raise CifError, 'Missing definition for import %s' % source_def 698 else: return #nothing else to do 699 print 'Adding %s attributes from %s to %s' % (loop_name,source_def,mykey) 700 state_loop = source_dict[source_def].GetLoop(loop_name) 701 self[mykey].insert_loop(state_loop) 702 703 704 def ddl1_cat_load(self): 705 deflist = self.keys() #slight optimization 706 cat_mand_dic = {} 707 cat_unique_dic = {} 708 # a function to extract any necessary information from each definition 709 def get_cat_info(single_def): 710 if self[single_def].get(self.must_exist_spec)=='yes': 711 thiscat = self[single_def]["_category"] 712 curval = cat_mand_dic.get(thiscat,[]) 713 curval.append(single_def) 714 cat_mand_dic[thiscat] = curval 715 # now the unique items... 716 # cif_core.dic throws us a curly one: the value of list_uniqueness is 717 # not the same as the defined item for publ_body_label, so we have 718 # to collect both together. We assume a non-listed entry, which 719 # is true for all current (May 2005) ddl1 dictionaries. 
720 if self[single_def].get(self.unique_spec,None)!=None: 721 thiscat = self[single_def]["_category"] 722 new_unique = self[single_def][self.unique_spec] 723 uis = cat_unique_dic.get(thiscat,[]) 724 if single_def not in uis: uis.append(single_def) 725 if new_unique not in uis: uis.append(new_unique) 726 cat_unique_dic[thiscat] = uis 727 728 map(get_cat_info,deflist) # apply the above function 729 for cat in cat_mand_dic.keys(): 730 cat_entry = self.get_ddl1_entry(cat) 731 self[cat_entry]["_category_mandatory.name"] = cat_mand_dic[cat] 732 for cat in cat_unique_dic.keys(): 733 cat_entry = self.get_ddl1_entry(cat) 734 self[cat_entry]["_category_key.name"] = cat_unique_dic[cat] 735 736 # A helper function get find the entry corresponding to a given category name: 737 # yes, in DDL1 the actual name is different in the category block due to the 738 # addition of square brackets which may or may not contain stuff. 739 740 def get_ddl1_entry(self,cat_name): 741 chop_len = len(cat_name) 742 possibles = filter(lambda a:a[1:chop_len+3]==cat_name+"_[",self.keys()) 743 if len(possibles) > 1 or possibles == []: 744 raise ValidCifError, "Category name %s can't be matched to category entry" % cat_name 745 else: 746 return possibles[0] 747 748 def add_type_info(self): 749 if self.dic_as_cif[self.master_key].has_key("_item_type_list.construct"): 750 types = self.dic_as_cif[self.master_key]["_item_type_list.code"] 751 prim_types = self.dic_as_cif[self.master_key]["_item_type_list.primitive_code"] 752 constructs = map(lambda a: a + "$", self.dic_as_cif[self.master_key]["_item_type_list.construct"]) 753 # add in \r wherever we see \n, and change \{ to \\{ 754 def regex_fiddle(mm_regex): 755 brack_match = r"((.*\[.+)(\\{)(.*\].*))" 756 ret_match = r"((.*\[.+)(\\n)(.*\].*))" 757 fixed_regexp = mm_regex[:] #copy 758 # fix the brackets 759 bm = re.match(brack_match,mm_regex) 760 if bm != None: 761 fixed_regexp = bm.expand(r"\2\\\\{\4") 762 # fix missing \r 763 rm = 
re.match(ret_match,fixed_regexp) 764 if rm != None: 765 fixed_regexp = rm.expand(r"\2\3\\r\4") 766 #print "Regexp %s becomes %s" % (mm_regex,fixed_regexp) 767 return fixed_regexp 768 constructs = map(regex_fiddle,constructs) 769 packed_up = map(None,types,constructs) 770 for typecode,construct in packed_up: 771 self.typedic[typecode] = re.compile(construct,re.MULTILINE|re.DOTALL) 772 # now make a primitive <-> type construct mapping 773 packed_up = map(None,types,prim_types) 774 for typecode,primtype in packed_up: 775 self.primdic[typecode] = primtype 776 777 def add_category_info(self): 778 if self.diclang == "DDLm": 779 categories = filter(lambda a:self[a].get("_definition.scope","Item")=="Category",self.keys()) 780 category_ids = map(lambda a:self[a]["_definition.id"],categories) 781 782 783 else: 784 categories = filter(lambda a:self[a].has_key("_category.id"),self.keys()) 785 # get the category id 786 category_ids = map(lambda a:self[a]["_category.id"],categories) 787 788 # match ids and entries in the dictionary 789 catpairs = map(None,category_ids,categories) 790 self.cat_map = {} 791 for catid,cat in catpairs:self.cat_map[catid] = cat 792 793 def names_in_cat(self,cat): 794 nameblocks = filter(lambda a:self[a].get("_name.category_id","").lower() 795 ==cat.lower(),self.keys()) 796 return map(lambda a:"_" + self[a]["_name.category_id"]+"." 
+ self[a]["_name.object_id"],nameblocks) 797 798 def get_key_pack(self,category,value,data): 799 keyname = self[category][self.unique_spec] 800 onepack = data.GetPackKey(keyname,value) 801 return onepack 802 803 def get_number_with_esd(numstring): 804 import string 805 numb_re = '((-?(([0-9]*[.]([0-9]+))|([0-9]+)[.]?))([(][0-9]+[)])?([eEdD][+-]?[0-9]+)?)|(\?)|(\.)' 806 our_match = re.match(numb_re,numstring) 807 if our_match: 808 a,base_num,b,c,dad,dbd,esd,exp,q,dot = our_match.groups() 809 # print "Debug: %s -> %s" % (numstring, `our_match.groups()`) 810 else: 811 return None,None 812 if dot or q: return None,None #a dot or question mark 813 if exp: #has exponent 814 exp = string.replace(exp,"d","e") # mop up old fashioned numbers 815 exp = string.replace(exp,"D","e") 816 base_num = base_num + exp 817 #print "Debug: have %s for base_num from %s" % (base_num,numstring) 818 base_num = float(base_num) 819 # work out esd, if present. 820 if esd: 821 esd = float(esd[1:-1]) # no brackets 822 if dad: # decimal point + digits 823 esd = esd * (10 ** (-1* len(dad))) 824 if exp: 825 esd = esd * (10 ** (float(exp[1:]))) 826 return base_num,esd 827 828 def getmaxmin(self,rangeexp): 829 regexp = '(-?(([0-9]*[.]([0-9]+))|([0-9]+)[.]?)([eEdD][+-]?[0-9]+)?)*' 830 regexp = regexp + ":" + regexp 831 regexp = re.match(regexp,rangeexp) 832 try: 833 minimum = regexp.group(1) 834 maximum = regexp.group(7) 835 except AttributeError: 836 print "Can't match %s" % rangeexp 837 if minimum == None: minimum = "." 838 else: minimum = float(minimum) 839 if maximum == None: maximum = "." 
840 else: maximum = float(maximum) 841 return maximum,minimum 842 843 def transform_drel(self): 844 import drel_yacc 845 parser = drel_yacc.parser 846 my_namespace = self.keys() 847 my_namespace = dict(map(None,my_namespace,my_namespace)) 848 parser.loopable_cats = filter(lambda a:self[a].get("_definition.class","Datum")=="List",self.keys()) 849 parser.loopable_cats = map(lambda a:self[a]["_definition.id"],parser.loopable_cats) 850 parser.listable_items = filter(lambda a:"*" in self[a].get("_type.dimension",""),self.keys()) 851 derivable_list = filter(lambda a:self[a].has_key("_method.expression") and self[a].get("_definition.scope","")!='Category' and self[a].get("_name.category_id","")!= "function",self.keys()) 852 for derivable in derivable_list: 853 parser.target_id = derivable 854 # reset the list of visible names for parser 855 parser.special_id = [my_namespace] 856 # reset list of looped with statements 857 parser.withtable = {} 858 print "Target id: %s" % derivable 859 drel_expr = self[derivable]["_method.expression"] 860 if isinstance(drel_expr,ListType): 861 drel_expr = drel_expr[0] 862 print "Transforming %s" % drel_expr 863 # List categories are treated differently... 864 pyth_meth = parser.parse(drel_expr,debug=True) 865 self[derivable]["_loop_categories"] = pyth_meth[1].keys() 866 self[derivable]["_method.expression"] = drel_yacc.make_func(pyth_meth,"pyfunc",None) 867 print "Final result:\n " + self[derivable]["_method.expression"] 868 869 def add_drel_funcs(self): 870 import drel_yacc 871 funclist = filter(lambda a:self[a].get("_name.category_id","")=='function',self.keys()) 872 funcnames = map(lambda a:self[a]["_name.object_id"],funclist) 873 funcbodys = map(lambda a:self[a]["_method.expression"],funclist) 874 # create executable python code... 
875 parser = drel_yacc.parser 876 for funcname,funcbody in zip(funcnames,funcbodys): 877 parser.target_id = funcname 878 parser.special_id = [{}] #first element is always global namespace of dictionary 879 parser.withtable = {} 880 res,ww = parser.parse(funcbody[0]) 881 print 'dREL library function ->\n' + res 882 global_table = globals() 883 global_table.update(self.ddlm_functions) 884 exec res in global_table #add to namespace 885 print "All functions -> " + `self.ddlm_functions` 886 887 def switch_numpy(self,to_val): 888 if to_val: 889 self.recursive_numerify = self.numpy_numerify 890 else: 891 self.recursive_numerify = self.normal_numerify 892 893 def derive_item(self,key,cifdata,store_value = False): 894 # store any default value in case we have a problem 895 def_val = self[key].get("_enumeration.default","") 896 def_index_val = self[key].get("_enumeration.def_index_id","") 897 the_func = self[key].get("_method.expression","") 898 if def_val and not the_func : return def_val 899 if def_index_val and not the_func: #derive a default value 900 index_vals = self[key]["_enumeration_default.index"] 901 val_to_index = cifdata[def_index_val] #what we are keying on 902 # Handle loops 903 if isinstance(val_to_index,ListType): 904 keypos = map(lambda a:index_vals.index(a),val_to_index) 905 result = map(lambda a:self[key]["_enumeration_default.value"][a] ,keypos) 906 else: 907 keypos = index_vals.index(val_to_index) #value error if no such value available 908 result = self[key]["_enumeration_default.value"][keypos] 909 print "Indexed on %s to get %s for %s" % (def_index_val,`result`,`val_to_index`) 910 return result 911 # read it in 912 the_category = self[key]["_name.category_id"] 913 the_type = self[the_category]["_definition.class"] 914 global_table = globals() 915 global_table.update(self.ddlm_functions) 916 exec the_func in global_table,locals() #will access dREL functions, puts "pyfunc" in scope 917 print 'Executing following function' 918 print the_func 919 print 
'With following loop categories:' + `self[key].get("_loop_categories","")` 920 # print 'in following global environment: ' + `global_table` 921 if self[key].get("_loop_categories",""): 922 loop_category = self[key]["_loop_categories"][0] 923 loop_names = self.names_in_cat(loop_category) 924 no_of_packs = len(cifdata[loop_names[0]]) 925 packlist = [] 926 for pack_index in range(no_of_packs): 927 packlist.append(pyfunc(self,cifdata,pack_index)) 928 # now try to insert the new information into the right place 929 # find if items of this category already appear... 930 if store_value: 931 cat_names = filter(lambda a:self[a].get["_name.category_id",None]==the_category,self.keys()) 932 has_cat_names = filter(lambda a:cifdata.has_key(a),cat_names) 933 if len(has_cat_names)>0: 934 target_loop = cifdata.GetLoop(has_cat_names[0]) 935 target_loop[key] = packlist #lengths must match or else!! 936 else: 937 cifdata[key] = packlist 938 return packlist 939 else: # No looped categories 940 return pyfunc(self,cifdata) 941 942 def change_type(self,itemname,inval): 943 import numpy 944 # we need to iterate over the structure description. 
For now we deal only with 945 # Single and Array containers, with types that are a simple specification 946 item_type = self[itemname]["_type.contents"] 947 item_container = self[itemname]["_type.container"] 948 isnumeric = (item_type == "Real" or \ 949 item_type == "Float" or \ 950 item_type == "Count" or \ 951 item_type == "Integer" or \ 952 item_type == "Digit") 953 if not isnumeric: return inval # we don't attempt any changes 954 # even for a 'Single' container, it may be looped 955 # print 'Changing type for %s' % `inval` 956 if StarFile.get_dim(inval)[0] == 0: 957 if item_container == 'Single': return float_with_esd(inval) 958 if item_container == 'Array': 959 return self.recursive_numerify(inval) 960 else: 961 if item_container == 'Single': return map(float_with_esd,inval) 962 if item_container == 'Array': return map(self.recursive_numerify,inval) 963 964 # A utility function to recursively make all atomic values numeric 965 # All embedded values will be either StarTuples or StarLists 966 def normal_numerify(self,valarray): 967 # print 'Recursive evaluation of %s' % `valarray` 968 if isinstance(valarray,StarFile.StarTuple): 969 return StarFile.StarTuple(map(self.recursive_numerify,valarray)) 970 if isinstance(valarray,StarFile.StarList): 971 return StarFile.StarList(map(self.recursive_numerify,valarray)) 972 if isinstance(valarray,(StringType,IntType,LongType)): 973 return float_with_esd(valarray) 974 else: 975 return valarray #assume is OK 976 977 # Identical to the above except that a numpy array is returned. We 978 # do the normal_numerify call in order to perform the float conversion. 979 # 980 def numpy_numerify(self,valarray): 981 import numpy 982 return numpy.array(self.normal_numerify(valarray)) 983 984 def validate_item_type(self,item_name,item_value): 985 def mymatch(m,a): 986 res = m.match(a) 987 if res != None: return res.group() 988 else: return "" 989 target_type = self[item_name].get(self.type_spec) 990 if target_type == None: # e.g. 
a category definition 991 return {"result":True} # not restricted in any way 992 matchexpr = self.typedic[target_type] 993 item_values = listify(item_value) 994 #for item in item_values: 995 #print "Type match " + item_name + " " + item + ":", 996 #skip dots and question marks 997 check_all = filter(lambda a: a !="." and a != "?",item_values) 998 check_all = filter(lambda a: mymatch(matchexpr,a) != a, check_all) 999 if len(check_all)>0: return {"result":False,"bad_values":check_all} 1000 else: return {"result":True} 1001 1002 def validate_item_esd(self,item_name,item_value): 1003 if self[item_name].get(self.primitive_type) != 'numb': 1004 return {"result":None} 1005 can_esd = self[item_name].get(self.esd_spec,"none") == "esd" 1006 if can_esd: return {"result":True} #must be OK! 1007 item_values = listify(item_value) 1008 check_all = filter(lambda a: get_number_with_esd(a)[1] != None, item_values) 1009 if len(check_all)>0: return {"result":False,"bad_values":check_all} 1010 return {"result":True} 1011 1012 def validate_enum_range(self,item_name,item_value): 1013 if not self[item_name].has_key("_item_range.minimum") and \ 1014 not self[item_name].has_key("_item_range.maximum"): 1015 return {"result":None} 1016 minvals = self[item_name].get("_item_range.minimum",default = ["."]) 1017 maxvals = self[item_name].get("_item_range.maximum",default = ["."]) 1018 def makefloat(a): 1019 if a == ".": return a 1020 else: return float(a) 1021 maxvals = map(makefloat, maxvals) 1022 minvals = map(makefloat, minvals) 1023 rangelist = map(None,minvals,maxvals) 1024 item_values = listify(item_value) 1025 def map_check(rangelist,item_value): 1026 if item_value == "?" 
or item_value == ".": return True 1027 iv,esd = get_number_with_esd(item_value) 1028 if iv==None: return None #shouldn't happen as is numb type 1029 for lower,upper in rangelist: 1030 #check the minima 1031 if lower == ".": lower = iv - 1 1032 if upper == ".": upper = iv + 1 1033 if iv > lower and iv < upper: return True 1034 if upper == lower and iv == upper: return True 1035 # debug 1036 # print "Value %s fails range check %d < x < %d" % (item_value,lower,upper) 1037 return False 1038 check_all = filter(lambda a,b=rangelist: map_check(b,a) != True, item_values) 1039 if len(check_all)>0: return {"result":False,"bad_values":check_all} 1040 else: return {"result":True} 1041 1042 def validate_item_enum(self,item_name,item_value): 1043 try: 1044 enum_list = self[item_name][self.enum_spec][:] 1045 except KeyError: 1046 return {"result":None} 1047 enum_list.append(".") #default value 1048 enum_list.append("?") #unknown 1049 item_values = listify(item_value) 1050 #print "Enum check: %s in %s" % (`item_values`,`enum_list`) 1051 check_all = filter(lambda a: a not in enum_list,item_values) 1052 if len(check_all)>0: return {"result":False,"bad_values":check_all} 1053 else: return {"result":True} 1054 1055 def validate_looping(self,item_name,item_value): 1056 try: 1057 must_loop = self[item_name][self.must_loop_spec] 1058 except KeyError: 1059 return {"result":None} 1060 if must_loop == 'yes' and isinstance(item_value,StringType): # not looped 1061 return {"result":False} #this could be triggered 1062 if must_loop == 'no' and not isinstance(item_value,StringType): 1063 return {"result":False} 1064 return {"result":True} 1065 1066 1067 def validate_loop_membership(self,loop_names): 1068 try: 1069 categories = map(lambda a:self[a][self.cat_spec],loop_names) 1070 except KeyError: #category is mandatory 1071 raise ValidCifError( "%s missing from dictionary %s for item in loop containing %s" % (self.cat_spec,self.dicname,loop_names[0])) 1072 bad_items = filter(lambda a:a != 
categories[0],categories) 1073 if len(bad_items)>0: 1074 return {"result":False,"bad_items":bad_items} 1075 else: return {"result":True} 1076 1077 def validate_loop_key(self,loop_names): 1078 category = self[loop_names[0]][self.cat_spec] 1079 # find any unique values which must be present 1080 entry_name = self.cat_map[category] 1081 key_spec = self[entry_name].get("_category_mandatory.name",[]) 1082 for names_to_check in key_spec: 1083 if isinstance(names_to_check,StringType): #only one 1084 names_to_check = [names_to_check] 1085 for loop_key in names_to_check: 1086 if loop_key not in loop_names: 1087 #is this one of those dang implicit items? 1088 if self[loop_key].get(self.must_exist_spec,None) == "implicit": 1089 continue #it is virtually there... 1090 alternates = self.get_alternates(loop_key) 1091 if alternates == []: 1092 return {"result":False,"bad_items":loop_key} 1093 for alt_names in alternates: 1094 alt = filter(lambda a:a in loop_names,alt_names) 1095 if len(alt) == 0: 1096 return {"result":False,"bad_items":loop_key} # no alternates 1097 return {"result":True} 1098 1099 def validate_loop_references(self,loop_names): 1100 must_haves = map(lambda a:self[a].get(self.list_ref_spec,None),loop_names) 1101 must_haves = filter(lambda a:a != None,must_haves) 1102 # build a flat list. For efficiency we don't remove duplicates,as 1103 # we expect no more than the order of 10 or 20 looped names. 
1104 def flat_func(a,b): 1105 if isinstance(b,StringType): 1106 a.append(b) #single name 1107 else: 1108 a.extend(b) #list of names 1109 return a 1110 flat_mh = reduce(flat_func,must_haves,[]) 1111 group_mh = filter(lambda a:a[-1]=="_",flat_mh) 1112 single_mh = filter(lambda a:a[-1]!="_",flat_mh) 1113 res = filter(lambda a: a not in loop_names,single_mh) 1114 def check_gr(s_item, name_list): 1115 nl = map(lambda a:a[:len(s_item)],name_list) 1116 if s_item in nl: return True 1117 return False 1118 res_g = filter(lambda a:check_gr(a,loop_names),group_mh) 1119 if len(res) == 0 and len(res_g) == 0: return {"result":True} 1120 # construct alternate list 1121 alternates = map(lambda a: (a,self.get_alternates(a)),res) 1122 alternates = filter(lambda a:a[1] != [], alternates) 1123 # next two lines purely for error reporting 1124 missing_alts = filter(lambda a: a[1] == [], alternates) 1125 missing_alts = map(lambda a:a[0],missing_alts) 1126 if len(alternates) != len(res): 1127 return {"result":False,"bad_items":missing_alts} #short cut; at least one 1128 #doesn't have an altern 1129 #loop over alternates 1130 for orig_name,alt_names in alternates: 1131 alt = filter(lambda a:a in loop_names,alt_names) 1132 if len(alt) == 0: return {"result":False,"bad_items":orig_name}# no alternates 1133 return {"result":True} #found alternates 1134 1135 def get_alternates(self,main_name,exclusive_only=False): 1136 alternates = self[main_name].get(self.related_func,None) 1137 alt_names = [] 1138 if alternates != None: 1139 alt_names = self[main_name].get(self.related_item,None) 1140 if isinstance(alt_names,StringType): 1141 alt_names = [alt_names] 1142 alternates = [alternates] 1143 together = map(None,alt_names,alternates) 1144 if exclusive_only: 1145 alt_names = filter(lambda a:a[1]=="alternate_exclusive" \ 1146 or a[1]=="replace", together) 1147 else: 1148 alt_names = filter(lambda a:a[1]=="alternate" or a[1]=="replace",together) 1149 alt_names = map(lambda a:a[0],alt_names) 1150 # now 
do the alias thing 1151 alias_names = listify(self[main_name].get("_item_aliases.alias_name",[])) 1152 alt_names.extend(alias_names) 1153 # print "Alternates for %s: %s" % (main_name,`alt_names`) 1154 return alt_names 1155 1156 1157 def validate_exclusion(self,item_name,item_value,whole_block,provisional_items={},globals={}): 1158 alternates = map(lambda a:a.lower(),self.get_alternates(item_name,exclusive_only=True)) 1159 item_name_list = map(lambda a:a.lower(),whole_block.keys()) 1160 item_name_list.extend(map(lambda a:a.lower(),provisional_items.keys())) 1161 item_name_list.extend(map(lambda a:a.lower(),globals.keys())) 1162 bad = filter(lambda a:a in item_name_list,alternates) 1163 if len(bad)>0: 1164 print "Bad: %s, alternates %s" % (`bad`,`alternates`) 1165 return {"result":False,"bad_items":bad} 1166 else: return {"result":True} 1167 1168 # validate that parent exists and contains matching values 1169 def validate_parent(self,item_name,item_value,whole_block,provisional_items={},globals={}): 1170 parent_item = self[item_name].get(self.parent_spec) 1171 if not parent_item: return {"result":None} #no parent specified 1172 if isinstance(parent_item,ListType): 1173 parent_item = parent_item[0] 1174 if self.optimize: 1175 if parent_item in self.done_parents: 1176 return {"result":None} 1177 else: 1178 self.done_parents.append(parent_item) 1179 print "Done parents %s" % `self.done_parents` 1180 # initialise parent/child values 1181 if isinstance(item_value,StringType): 1182 child_values = [item_value] 1183 else: child_values = item_value[:] #copy for safety 1184 # track down the parent 1185 # print "Looking for %s parent item %s in %s" % (item_name,parent_item,`whole_block`) 1186 # if globals contains the parent values, we are doing a DDL2 dictionary, and so 1187 # we have collected all parent values into the global block - so no need to search 1188 # for them elsewhere. 
1189 # print "Looking for %s" % `parent_item` 1190 parent_values = globals.get(parent_item) 1191 if not parent_values: 1192 parent_values = provisional_items.get(parent_item,whole_block.get(parent_item)) 1193 if not parent_values: 1194 # go for alternates 1195 namespace = whole_block.keys() 1196 namespace.extend(provisional_items.keys()) 1197 namespace.extend(globals.keys()) 1198 alt_names = filter_present(self.get_alternates(parent_item),namespace) 1199 if len(alt_names) == 0: 1200 if len(filter(lambda a:a != "." and a != "?",child_values))>0: 1201 return {"result":False,"parent":parent_item}#no parent available -> error 1202 else: 1203 return {"result":None} #maybe True is more appropriate?? 1204 parent_item = alt_names[0] #should never be more than one?? 1205 parent_values = provisional_items.get(parent_item,whole_block.get(parent_item)) 1206 if not parent_values: # check global block 1207 parent_values = globals.get(parent_item) 1208 if isinstance(parent_values,StringType): 1209 parent_values = [parent_values] 1210 #print "Checking parent %s against %s, values %s/%s" % (parent_item, 1211 # item_name,`parent_values`,`child_values`) 1212 missing = self.check_parent_child(parent_values,child_values) 1213 if len(missing) > 0: 1214 return {"result":False,"bad_values":missing,"parent":parent_item} 1215 return {"result":True} 1216 1217 def validate_child(self,item_name,item_value,whole_block,provisional_items={},globals={}): 1218 try: 1219 child_items = self[item_name][self.child_spec][:] #copy 1220 except KeyError: 1221 return {"result":None} #not relevant 1222 # special case for dictionaries -> we check parents of children only 1223 if globals.has_key(item_name): #dictionary so skip 1224 return {"result":None} 1225 if isinstance(child_items,StringType): # only one child 1226 child_items = [child_items] 1227 if isinstance(item_value,StringType): # single value 1228 parent_values = [item_value] 1229 else: parent_values = item_value[:] 1230 # expand child list with 
list of alternates 1231 for child_item in child_items[:]: 1232 child_items.extend(self.get_alternates(child_item)) 1233 # now loop over the children 1234 for child_item in child_items: 1235 if self.optimize: 1236 if child_item in self.done_children: 1237 return {"result":None} 1238 else: 1239 self.done_children.append(child_item) 1240 print "Done children %s" % `self.done_children` 1241 if provisional_items.has_key(child_item): 1242 child_values = provisional_items[child_item][:] 1243 elif whole_block.has_key(child_item): 1244 child_values = whole_block[child_item][:] 1245 else: continue 1246 if isinstance(child_values,StringType): 1247 child_values = [child_values] 1248 # print "Checking child %s against %s, values %s/%s" % (child_item, 1249 # item_name,`child_values`,`parent_values`) 1250 missing = self.check_parent_child(parent_values,child_values) 1251 if len(missing)>0: 1252 return {"result":False,"bad_values":missing,"child":child_item} 1253 return {"result":True} #could mean that no child items present 1254 1255 #a generic checker: all child vals should appear in parent_vals 1256 def check_parent_child(self,parent_vals,child_vals): 1257 # shield ourselves from dots and question marks 1258 pv = parent_vals[:] 1259 pv.extend([".","?"]) 1260 res = filter(lambda a:a not in pv,child_vals) 1261 #print "Missing: %s" % res 1262 return res 1263 1264 def validate_remove_parent_child(self,item_name,whole_block): 1265 try: 1266 child_items = self[item_name][self.child_spec] 1267 except KeyError: 1268 return {"result":None} 1269 if isinstance(child_items,StringType): # only one child 1270 child_items = [child_items] 1271 for child_item in child_items: 1272 if whole_block.has_key(child_item): 1273 return {"result":False,"child":child_item} 1274 return {"result":True} 1275 1276 def validate_dependents(self,item_name,item_value,whole_block,prov={},globals={}): 1277 try: 1278 dep_items = self[item_name][self.dep_spec][:] 1279 except KeyError: 1280 return {"result":None} #not 
relevant 1281 if isinstance(dep_items,StringType): 1282 dep_items = [dep_items] 1283 actual_names = whole_block.keys() 1284 actual_names.extend(prov.keys()) 1285 actual_names.extend(globals.keys()) 1286 missing = filter(lambda a:a not in actual_names,dep_items) 1287 if len(missing) > 0: 1288 alternates = map(lambda a:[self.get_alternates(a),a],missing) 1289 # compact way to get a list of alternative items which are 1290 # present 1291 have_check = map(lambda b:[filter_present(b[0],actual_names), 1292 b[1]],alternates) 1293 have_check = filter(lambda a:len(a[0])==0,have_check) 1294 if len(have_check) > 0: 1295 have_check = map(lambda a:a[1],have_check) 1296 return {"result":False,"bad_items":have_check} 1297 return {"result":True} 1298 1299 def validate_uniqueness(self,item_name,item_value,whole_block,provisional_items={}, 1300 globals={}): 1301 category = self[item_name].get(self.cat_spec) 1302 if category == None: 1303 print "No category found for %s" % item_name 1304 return {"result":None} 1305 # print "Category %s for item %s" % (`category`,item_name) 1306 catentry = self.cat_map[category] 1307 # we make a copy in the following as we will be removing stuff later! 1308 unique_i = self[catentry].get("_category_key.name",[])[:] 1309 if isinstance(unique_i,StringType): 1310 unique_i = [unique_i] 1311 if item_name not in unique_i: #no need to verify 1312 return {"result":None} 1313 if isinstance(item_value,StringType): #not looped 1314 return {"result":None} 1315 # print "Checking %s -> %s -> %s ->Unique: " % (item_name,category,catentry) + `unique_i` 1316 # check that we can't optimize by not doing this check 1317 if self.optimize: 1318 if unique_i in self.done_keys: 1319 return {"result":None} 1320 else: 1321 self.done_keys.append(unique_i) 1322 val_list = [] 1323 # get the matching data from any other data items 1324 unique_i.remove(item_name) 1325 other_data = [] 1326 if len(unique_i) > 0: # i.e. 
do have others to think about 1327 for other_name in unique_i: 1328 # we look for the value first in the provisional dict, then the main block 1329 # the logic being that anything in the provisional dict overrides the 1330 # main block 1331 if provisional_items.has_key(other_name): 1332 other_data.append(provisional_items[other_name]) 1333 elif whole_block.has_key(other_name): 1334 other_data.append(whole_block[other_name]) 1335 elif self[other_name].get(self.must_exist_spec)=="implicit": 1336 other_data.append([item_name]*len(item_value)) #placeholder 1337 else: 1338 return {"result":False,"bad_items":other_name}#missing data name 1339 # ok, so we go through all of our values 1340 # this works by comparing lists of strings to one other, and 1341 # so could be fooled if you think that '1.' and '1' are 1342 # identical 1343 for i in range(len(item_value)): 1344 #print "Value no. %d" % i , 1345 this_entry = item_value[i] 1346 for j in range(len(other_data)): 1347 this_entry = " ".join([this_entry,other_data[j][i]]) 1348 #print "Looking for %s in %s: " % (`this_entry`,`val_list`) 1349 if this_entry in val_list: 1350 return {"result":False,"bad_values":this_entry} 1351 val_list.append(this_entry) 1352 return {"result":True} 1353 1354 1355 def validate_mandatory_category(self,whole_block,globals={},fake_mand=False): 1356 if fake_mand: 1357 return {"result":True} 1358 mand_cats = filter(lambda a:self[a].get("_category.mandatory_code","no")=="yes", 1359 self.keys()) 1360 # map to actual ids 1361 catlist = self.cat_map.items() 1362 # print "Mandatory categories - %s" % `mand_cats` 1363 all_keys = whole_block.keys() #non-save block keys 1364 if globals: # 1365 all_keys.extend(globals.abs_all_keys) 1366 for mand_cat in mand_cats: 1367 cat_id = filter(lambda a:a[1]==mand_cat,catlist)[0][0] 1368 no_of_items = len(filter(lambda a:self[a].get(self.cat_spec)==cat_id, 1369 all_keys)) 1370 if no_of_items == 0: 1371 return {"result":False,"bad_items":cat_id} 1372 return 
{"result":True} 1373 1374 def find_prob_cats(self,whole_block): 1375 mand_cats = filter(lambda a:self[a].get("_category.mandatory_code","no")=="yes", 1376 self.keys()) 1377 # map to actual ids 1378 catlist = self.cat_map.items() 1379 # find missing categories 1380 wbs = whole_block["saves"] 1381 abs_all_keys = whole_block.keys() 1382 abs_all_keys.extend(reduce(lambda a,b:a+(wbs[b].keys()),wbs.keys(),[])) 1383 prob_cats = [] 1384 for mand_cat in mand_cats: 1385 cat_id = filter(lambda a:a[1]==mand_cat,catlist)[0][0] 1386 1387 if len(filter(lambda a:self[a].get(self.cat_spec)==cat_id,abs_all_keys))==0: 1388 prob_cats.append(cat_id) 1389 if len(prob_cats) > 0: 1390 return (False,{'whole_block':[('validate_mandatory_category',{"result":False,"bad_items":problem_cats})]}) 1391 else: 1392 return (True,{}) 1393 1394 1395 def run_item_validation(self,item_name,item_value): 1396 return {item_name:map(lambda f:(f.__name__,f(item_name,item_value)),self.item_validation_funs)} 1397 1398 def run_loop_validation(self,loop_names): 1399 return {loop_names[0]:map(lambda f:(f.__name__,f(loop_names)),self.loop_validation_funs)} 1400 1401 def run_global_validation(self,item_name,item_value,data_block,provisional_items={},globals={}): 1402 results = map(lambda f:(f.__name__,f(item_name,item_value,data_block,provisional_items,globals)),self.global_validation_funs) 1403 return {item_name:results} 1404 1405 def run_block_validation(self,whole_block,globals={},fake_mand=False): 1406 results = map(lambda f:(f.__name__,f(whole_block,globals,fake_mand)),self.block_validation_funs) 1407 # fix up the return values 1408 return {"whole_block":results} 1409 1410 def optimize_on(self): 1411 self.optimize = True 1412 self.done_keys = [] 1413 self.done_children = [] 1414 self.done_parents = [] 1415 1416 def optimize_off(self): 1417 self.optimize = False 1418 self.done_keys = [] 1419 self.done_children = [] 1420 self.done_parents = [] 1421 1422 1423 class ValidCifBlock(CifBlock): 1424 def 
__init__(self,dic = None, diclist=[], mergemode = "replace",*args,**kwords): 1425 CifBlock.__init__(self,*args,**kwords) 1426 if dic and diclist: 1427 print "Warning: diclist argument ignored when initialising ValidCifBlock" 1428 if isinstance(dic,CifDic): 1429 self.fulldic = dic 1430 else: 1431 raise TypeError( "ValidCifBlock passed non-CifDic type in dic argument") 1432 if len(diclist)==0 and not dic: 1433 raise ValidCifError( "At least one dictionary must be specified") 1434 if diclist and not dic: 1435 self.fulldic = merge_dic(diclist,mergemode) 1436 if not self.run_data_checks()[0]: 1437 raise ValidCifError( self.report()) 1438 1439 def run_data_checks(self,verbose=False): 1440 self.v_result = {} 1441 self.fulldic.optimize_on() 1442 for dataname in self.keys(): 1443 update_value(self.v_result,self.fulldic.run_item_validation(dataname,self[dataname])) 1444 update_value(self.v_result,self.fulldic.run_global_validation(dataname,self[dataname],self)) 1445 for loop in self.loops: 1446 update_value(self.v_result,self.fulldic.run_loop_validation(loop.keys())) 1447 # now run block-level checks 1448 update_value(self.v_result,self.fulldic.run_block_validation(self)) 1449 # return false and list of baddies if anything didn't match 1450 self.fulldic.optimize_off() 1451 for test_key in self.v_result.keys(): 1452 #print "%s: %s" % (test_key,`self.v_result[test_key]`) 1453 self.v_result[test_key] = filter(lambda a:a[1]["result"]==False,self.v_result[test_key]) 1454 if len(self.v_result[test_key]) == 0: 1455 del self.v_result[test_key] 1456 isvalid = len(self.v_result)==0 1457 #if not isvalid: 1458 # print "Baddies:" + `self.v_result` 1459 return isvalid,self.v_result 1460 1461 def single_item_check(self,item_name,item_value): 1462 #self.match_single_item(item_name) 1463 if not self.fulldic.has_key(item_name): 1464 result = {item_name:[]} 1465 else: 1466 result = self.fulldic.run_item_validation(item_name,item_value) 1467 baddies = filter(lambda a:a[1]["result"]==False, 
result[item_name]) 1468 # if even one false one is found, this should trigger 1469 isvalid = (len(baddies) == 0) 1470 # if not isvalid: print "Failures for %s:" % item_name + `baddies` 1471 return isvalid,baddies 1472 1473 def loop_item_check(self,loop_names): 1474 in_dic_names = filter(lambda a:self.fulldic.has_key(a),loop_names) 1475 if len(in_dic_names)==0: 1476 result = {loop_names[0]:[]} 1477 else: 1478 result = self.fulldic.run_loop_validation(in_dic_names) 1479 baddies = filter(lambda a:a[1]["result"]==False,result[in_dic_names[0]]) 1480 # if even one false one is found, this should trigger 1481 isvalid = (len(baddies) == 0) 1482 # if not isvalid: print "Failures for %s:" % `loop_names` + `baddies` 1483 return isvalid,baddies 1484 1485 def global_item_check(self,item_name,item_value,provisional_items={}): 1486 if not self.fulldic.has_key(item_name): 1487 result = {item_name:[]} 1488 else: 1489 result = self.fulldic.run_global_validation(item_name, 1490 item_value,self,provisional_items = provisional_items) 1491 baddies = filter(lambda a:a[1]["result"]==False,result[item_name]) 1492 # if even one false one is found, this should trigger 1493 isvalid = (len(baddies) == 0) 1494 # if not isvalid: print "Failures for %s:" % item_name + `baddies` 1495 return isvalid,baddies 1496 1497 def remove_global_item_check(self,item_name): 1498 if not self.fulldic.has_key(item_name): 1499 result = {item_name:[]} 1500 else: 1501 result = self.fulldic.run_remove_global_validation(item_name,self,False) 1502 baddies = filter(lambda a:a[1]["result"]==False,result[item_name]) 1503 # if even one false one is found, this should trigger 1504 isvalid = (len(baddies) == 0) 1505 # if not isvalid: print "Failures for %s:" % item_name + `baddies` 1506 return isvalid,baddies 1507 1508 def AddToLoop(self,dataname,loopdata): 1509 # single item checks 1510 paired_data = loopdata.items() 1511 for name,value in paired_data: 1512 valid,problems = self.single_item_check(name,value) 1513 
self.report_if_invalid(valid,problems) 1514 # loop item checks; merge with current loop 1515 found = 0 1516 for aloop in self.block["loops"]: 1517 if aloop.has_key(dataname): 1518 loopnames = aloop.keys() 1519 for new_name in loopdata.keys(): 1520 if new_name not in loopnames: loopnames.append(new_name) 1521 valid,problems = self.looped_item_check(loopnames) 1522 self.report_if_invalid(valid,problems) 1523 prov_dict = loopdata.copy() 1524 for name,value in paired_data: 1525 del prov_dict[name] # remove temporarily 1526 valid,problems = self.global_item_check(name,value,prov_dict) 1527 prov_dict[name] = value # add back in 1528 self.report_if_invalid(valid,problems) 1529 CifBlock.AddToLoop(self,dataname,loopdata) 1530 1531 def AddCifItem(self,data): 1532 if isinstance(data[0],StringType): # single item 1533 valid,problems = self.single_item_check(data[0],data[1]) 1534 self.report_if_invalid(valid,problems,data[0]) 1535 valid,problems = self.global_item_check(data[0],data[1]) 1536 self.report_if_invalid(valid,problems,data[0]) 1537 elif isinstance(data[0],TupleType) or isinstance(data[0],ListType): 1538 paired_data = map(None,data[0],data[1]) 1539 for name,value in paired_data: 1540 valid,problems = self.single_item_check(name,value) 1541 self.report_if_invalid(valid,problems,name) 1542 valid,problems = self.loop_item_check(data[0]) 1543 self.report_if_invalid(valid,problems,data[0]) 1544 prov_dict = {} # for storing temporary items 1545 for name,value in paired_data: prov_dict[name]=value 1546 for name,value in paired_data: 1547 del prov_dict[name] # remove temporarily 1548 valid,problems = self.global_item_check(name,value,prov_dict) 1549 prov_dict[name] = value # add back in 1550 self.report_if_invalid(valid,problems,name) 1551 CifBlock.AddCifItem(self,data) 1552 1553 # utility function 1554 def report_if_invalid(self,valid,bad_list,data_name): 1555 if not valid: 1556 error_string = reduce(lambda a,b: a + "," + b[0], bad_list, "") 1557 error_string = `data_name` + 
" fails following validity checks: " + error_string 1558 raise ValidCifError( error_string) 1559 1560 def __delitem__(self,key): 1561 # we don't need to run single item checks; we do need to run loop and 1562 # global checks. 1563 if self.has_key(key): 1564 try: 1565 loop_items = self.GetLoop(key) 1566 except TypeError: 1567 loop_items = [] 1568 if loop_items: #need to check loop conformance 1569 loop_names = map(lambda a:a[0],loop_items) 1570 loop_names = filter(lambda a:a != key,loop_names) 1571 valid,problems = self.loop_item_check(loop_names) 1572 self.report_if_invalid(valid,problems) 1573 valid,problems = self.remove_global_item_check(key) 1574 self.report_if_invalid(valid,problems) 1575 self.RemoveCifItem(key) 1576 1577 1578 def report(self): 1579 import cStringIO 1580 outstr = cStringIO.StringIO() 1581 outstr.write( "Validation results\n") 1582 outstr.write( "------------------\n") 1583 print "%d invalid items found\n" % len(self.v_result) 1584 for item_name,val_func_list in self.v_result.items(): 1585 outstr.write("%s fails following tests:\n" % item_name) 1586 for val_func in val_func_list: 1587 outstr.write("\t%s\n") 1588 return outstr.getvalue() 1589 1590 1591 class ValidCifFile(CifFile): 1592 def __init__(self,dic=None,diclist=[],mergemode="replace",*args,**kwargs): 1593 if not diclist and not dic and not hasattr(self,'bigdic'): 1594 raise ValidCifError( "At least one dictionary is required to create a ValidCifFile object") 1595 if not dic and diclist: #merge here for speed 1596 self.bigdic = merge_dic(diclist,mergemode) 1597 elif dic and not diclist: 1598 self.bigdic = dic 1599 CifFile.__init__(self,*args,**kwargs) 1600 #for blockname in self.keys(): 1601 # self.dictionary[blockname]=ValidCifBlock(data=self.dictionary[blockname],dic=self.bigdic) 1602 1603 def NewBlock(self,blockname,blockcontents,**kwargs): 1604 CifFile.NewBlock(self,blockname,blockcontents,**kwargs) 1605 # dictionary[blockname] is now a CifBlock object. 
We 1606 # turn it into a ValidCifBlock object 1607 self.dictionary[blockname] = ValidCifBlock(dic=self.bigdic, 1608 data=self.dictionary[blockname]) 1609 1610 1611 class ValidationResult: 1612 """Represents validation result""" 1613 def __init__(self,results): 1614 """results is return value of validate function""" 1615 self.valid_result, self.no_matches = results 1616 1617 def report(self,use_html): 1618 """Return string with human-readable description of validation result""" 1619 return validate_report((self.valid_result, self.no_matches),use_html) 1620 1621 def is_valid(self,block_name=None): 1622 """Return True for valid CIF file, otherwise False""" 1623 if block_name is not None: 1624 block_names = [block_name] 1625 else: 1626 block_names = self.valid_result.iterkeys() 1627 for block_name in block_names: 1628 if not self.valid_result[block_name] == (True,{}): 1629 valid = False 1630 break 1631 else: 1632 valid = True 1633 return valid 1634 1635 def has_no_match_items(self,block_name=None): 1636 """Return true if some items are not found in dictionary""" 1637 if block_name is not None: 1638 block_names = [block_name] 1639 else: 1640 block_names = self.no_matches.iter_keys() 1641 for block_name in block_names: 1642 if self.no_matches[block_name]: 1643 has_no_match_items = True 1644 break 1645 else: 1646 has_no_match_items = False 1647 return has_no_match_items 1648 1649 1650 1651 def validate(ciffile,dic = "", diclist=[],mergemode="replace",isdic=False,fake_mand=True): 1652 check_file = CifFile(ciffile) 1653 if not dic: 1654 fulldic = merge_dic(diclist,mergemode) 1655 else: 1656 fulldic = dic 1657 no_matches = {} 1658 valid_result = {} 1659 if isdic: #assume one block only 1660 blockname = check_file.keys()[0] 1661 check_bc = check_file[blockname]["saves"] 1662 check_globals = check_file[blockname] 1663 # collect a list of parents for speed 1664 poss_parents = fulldic.get_all("_item_linked.parent_name") 1665 for parent in poss_parents: 1666 curr_parent = 
listify(check_globals.get(parent,[])) 1667 new_vals = check_bc.get_all(parent) 1668 new_vals.extend(curr_parent) 1669 if len(new_vals)>0: 1670 check_globals[parent] = new_vals 1671 # print "Added %s (len %d)" % (parent,len(check_globals[parent])) 1672 # next dictionary problem: the main DDL2 dictionary has what 1673 # I would characterise as a mandatory_category problem, but 1674 # in order to gloss over it, we allow a different 1675 # interpretation, which requires only a single check for one 1676 # block. 1677 if fake_mand: 1678 valid_result[blockname] = fulldic.find_prob_cats(check_globals) 1679 no_matches[blockname] = filter(lambda a:not fulldic.has_key(a),check_globals.keys()) 1680 else: 1681 check_bc = check_file 1682 check_globals = CifBlock() #empty 1683 for block in check_bc.keys(): 1684 #print "Validating block %s" % block 1685 no_matches[block] = filter(lambda a:not fulldic.has_key(a),check_bc[block].keys()) 1686 # remove non-matching items 1687 # print "Not matched: " + `no_matches[block]` 1688 for nogood in no_matches[block]: 1689 del check_bc[block][nogood] 1690 valid_result[block] = run_data_checks(check_bc[block],fulldic,globals=check_globals,fake_mand=fake_mand) 1691 return valid_result,no_matches 1692 1693 def validate_report(val_result,use_html=False): 1694 import cStringIO 1695 valid_result,no_matches = val_result 1696 outstr = cStringIO.StringIO() 1697 if use_html: 1698 outstr.write("<h2>Validation results</h2>") 1699 else: 1700 outstr.write( "Validation results\n") 1701 outstr.write( "------------------\n") 1702 if len(valid_result) > 10: 1703 suppress_valid = True #don't clutter with valid messages 1704 if use_html: 1705 outstr.write("<p>For brevity, valid blocks are not reported in the output.</p>") 1706 else: 1707 suppress_valid = False 1708 for block in valid_result.keys(): 1709 block_result = valid_result[block] 1710 if block_result[0]: 1711 out_line = "Block '%s' is VALID" % block 1712 else: 1713 out_line = "Block '%s' is INVALID" % 
block 1714 if use_html: 1715 if (block_result[0] and (not suppress_valid or len(no_matches[block])>0)) or not block_result[0]: 1716 outstr.write( "<h3>%s</h3><p>" % out_line) 1717 else: 1718 outstr.write( "\n %s\n" % out_line) 1719 if len(no_matches[block])!= 0: 1720 if use_html: 1721 outstr.write( "<p>The following items were not found in the dictionary") 1722 outstr.write(" (note that this does not invalidate the data block):</p>") 1723 outstr.write("<p><table>\n") 1724 map(lambda it:outstr.write("<tr><td>%s</td></tr>" % it),no_matches[block]) 1725 outstr.write("</table>\n") 1726 else: 1727 outstr.write( "\n The following items were not found in the dictionary:\n") 1728 outstr.write("Note that this does not invalidate the data block\n") 1729 map(lambda it:outstr.write("%s\n" % it),no_matches[block]) 1730 # now organise our results by type of error, not data item... 1731 error_type_dic = {} 1732 for error_item, error_list in block_result[1].items(): 1733 for func_name,bad_result in error_list: 1734 bad_result.update({"item_name":error_item}) 1735 try: 1736 error_type_dic[func_name].append(bad_result) 1737 except KeyError: 1738 error_type_dic[func_name] = [bad_result] 1739 # make a table of test name, test message 1740 info_table = {\ 1741 'validate_item_type':\ 1742 "The following data items had badly formed values", 1743 'validate_item_esd':\ 1744 "The following data items should not have esds appended", 1745 'validate_enum_range':\ 1746 "The following data items have values outside permitted range", 1747 'validate_item_enum':\ 1748 "The following data items have values outside permitted set", 1749 'validate_looping':\ 1750 "The following data items violate looping constraints", 1751 'validate_loop_membership':\ 1752 "The following looped data names are of different categories to the first looped data name", 1753 'validate_loop_key':\ 1754 "A required dataname for this category is missing from the loop\n containing the dataname", 1755 'validate_loop_references':\ 
1756 "A dataname required by the item is missing from the loop", 1757 'validate_parent':\ 1758 "A parent dataname is missing or contains different values", 1759 'validate_child':\ 1760 "A child dataname contains different values to the parent", 1761 'validate_uniqueness':\ 1762 "One or more data items do not take unique values", 1763 'validate_dependents':\ 1764 "A dataname required by the item is missing from the data block", 1765 'validate_exclusion': \ 1766 "Both dataname and exclusive alternates or aliases are present in data block", 1767 'validate_mandatory_category':\ 1768 "A required category is missing from this block"} 1769 1770 for test_name,test_results in error_type_dic.items(): 1771 if use_html: 1772 outstr.write(html_error_report(test_name,info_table[test_name],test_results)) 1773 else: 1774 outstr.write(error_report(test_name,info_table[test_name],test_results)) 1775 outstr.write("\n\n") 1776 return outstr.getvalue() 1777 1778 # A function to lay out a single error report. We are passed 1779 # the name of the error (one of our validation functions), the 1780 # explanation to print out, and a dictionary with the error 1781 # information. 
def error_report(error_name, error_explanation, error_dics):
    """Lay out a single error report as plain text.

    error_name        -- validation function name (unused in the text layout)
    error_explanation -- human-readable description of the failure class
    error_dics        -- list of per-failure dictionaries; the keys of the
                         first one determine which columns appear.
    No more than 50 characters of each bad value are printed.
    """
    retstring = "\n\n " + error_explanation + ":\n\n"
    headstring = "%-32s" % "Item name"
    bodystring = ""
    # the first error dictionary determines which columns are present
    if "bad_values" in error_dics[0]:
        headstring += "%-20s" % "Bad value(s)"
    if "bad_items" in error_dics[0]:
        headstring += "%-20s" % "Bad dataname(s)"
    if "child" in error_dics[0]:
        headstring += "%-20s" % "Child"
    if "parent" in error_dics[0]:
        headstring += "%-20s" % "Parent"
    headstring += "\n"
    for error in error_dics:
        bodystring += "\n%-32s" % error["item_name"]
        if "bad_values" in error:
            out_vals = [a[:50] for a in error["bad_values"]]  # truncate long values
            bodystring += "%-20s" % out_vals
        if "bad_items" in error:
            bodystring += "%-20s" % error["bad_items"]
        if "child" in error:
            bodystring += "%-20s" % error["child"]
        if "parent" in error:
            bodystring += "%-20s" % error["parent"]
    return retstring + headstring + bodystring

# This lays out an HTML error report

def html_error_report(error_name, error_explanation, error_dics, annotate=None):
    """Lay out a single error report as an HTML table (see error_report).

    annotate is currently unused; its old mutable default ([]) was replaced
    by None to avoid the shared-mutable-default pitfall.
    """
    retstring = "<h4>" + error_explanation + ":</h4>"
    retstring = retstring + "<table cellpadding=5><tr>"
    headstring = "<th>Item name</th>"
    bodystring = ""
    # the first error dictionary determines which columns are present
    if "bad_values" in error_dics[0]:
        headstring += "<th>Bad value(s)</th>"
    if "bad_items" in error_dics[0]:
        headstring += "<th>Bad dataname(s)</th>"
    if "child" in error_dics[0]:
        headstring += "<th>Child</th>"
    if "parent" in error_dics[0]:
        headstring += "<th>Parent</th>"
    headstring += "</tr>\n"
    for error in error_dics:
        bodystring += "<tr><td><tt>%s</tt></td>" % error["item_name"]
        if "bad_values" in error:
            bodystring += "<td>%s</td>" % error["bad_values"]
        if "bad_items" in error:
            bodystring += "<td><tt>%s</tt></td>" % error["bad_items"]
        if "child" in error:
            bodystring += "<td><tt>%s</tt></td>" % error["child"]
        if "parent" in error:
            bodystring += "<td><tt>%s</tt></td>" % error["parent"]
        bodystring += "</tr>\n"
    return retstring + headstring + bodystring + "</table>\n"

def run_data_checks(check_block, fulldic, globals={}, fake_mand=False):
    """Run item, loop and block validation of check_block against fulldic.

    globals holds enclosing-block values for cross-block checks; it is read
    only (never mutated), so the shared default dict is safe here.
    Returns (isvalid, failures) where failures maps test name -> failing results.
    """
    v_result = {}
    for key in check_block.keys():
        update_value(v_result, fulldic.run_item_validation(key, check_block[key]))
        update_value(v_result, fulldic.run_global_validation(key, check_block[key], check_block, globals=globals))
    for loop in check_block.loops:
        update_value(v_result, fulldic.run_loop_validation(loop.keys()))
    update_value(v_result, fulldic.run_block_validation(check_block, globals=globals, fake_mand=fake_mand))
    # keep only failing results; even one failure invalidates the block
    for test_key in list(v_result.keys()):
        v_result[test_key] = [a for a in v_result[test_key] if a[1]["result"] == False]
        if len(v_result[test_key]) == 0:
            del v_result[test_key]
    isvalid = len(v_result) == 0
    return isvalid, v_result


def get_number_with_esd(numstring):
    """Parse a CIF numeric string, returning (value, esd).

    Handles optional bracketed esds ("1.23(4)") and d/D/e/E exponents
    ("1.5e2", "4.2D-3").  Returns (None, None) for '?', '.', or anything
    that does not parse as a number.  The esd is scaled by the number of
    decimal places and by any exponent, per CIF convention.
    """
    numb_re = r'((-?(([0-9]*[.]([0-9]+))|([0-9]+)[.]?))([(][0-9]+[)])?([eEdD][+-]?[0-9]+)?)|(\?)|(\.)'
    our_match = re.match(numb_re, numstring)
    if our_match:
        a, base_num, b, c, dad, dbd, esd, exp, q, dot = our_match.groups()
    else:
        return None, None
    if dot or q:
        return None, None  # a dot or question mark
    if exp:  # has exponent
        # mop up old-fashioned 'd'/'D' exponents (str.replace, not the
        # deprecated string-module functions)
        exp = exp.replace("d", "e").replace("D", "e")
        base_num = base_num + exp
    base_num = float(base_num)
    # work out esd, if present.
    if esd:
        esd = float(esd[1:-1])  # strip the brackets
        if dad:  # decimal point + digits: esd is in units of the last place
            esd = esd * (10 ** (-1 * len(dad)))
        if exp:
            esd = esd * (10 ** (float(exp[1:])))
    return base_num, esd

def float_with_esd(inval):
    """Convert a CIF number to float, dropping any appended esd,
    e.g. '1.5(3)' -> 1.5.  Non-string input goes straight to float()."""
    if isinstance(inval, str):
        j = inval.find("(")
        if j >= 0:
            return float(inval[:j])
    return float(inval)



# A utility function to append to item values rather than replace them
def update_value(base_dict, new_items):
    """Merge new_items into base_dict, extending existing list values
    instead of replacing them."""
    for new_key in new_items.keys():
        if new_key in base_dict:
            base_dict[new_key].extend(new_items[new_key])
        else:
            base_dict[new_key] = new_items[new_key]

# Transpose the list of lists passed to us
def transpose(base_list):
    """Return the transpose of a list of equal-length lists."""
    new_lofl = []
    row_indices = range(len(base_list))
    for i in range(len(base_list[0])):
        new_lofl.append([base_list[j][i] for j in row_indices])
    return new_lofl

# listify strings - used surprisingly often
def listify(item):
    """Wrap a bare string in a list; pass any other value through unchanged."""
    if isinstance(item, str):
        return [item]
    return item

# given a list of search items, return a list of items
# actually contained in the given data block
def filter_present(namelist, datablocknames):
    """Return the members of namelist that occur in datablocknames."""
    return [a for a in namelist if a in datablocknames]
# merge ddl dictionaries. We should be passed filenames or CifFile
# objects
def merge_dic(diclist, mergemode="replace", ddlspec=None):
    """Merge the dictionaries in diclist left-to-right and return a CifDic.

    diclist entries may be filenames or CifFile objects.  DDL1-style
    dictionaries (those with an on_this_dictionary block) are matched on
    _name; single-block (DDL2-style) dictionaries are matched on _item.name
    using find_parent.  ddlspec is accepted for interface compatibility but
    currently unused.
    """
    dic_as_cif_list = []
    for dic in diclist:
        if not isinstance(dic, CifFile) and not isinstance(dic, str):
            # modern raise syntax; the old "raise TypeError, msg" form is
            # behaviourally identical in Python 2
            raise TypeError("Require list of CifFile names/objects for dictionary merging")
        if not isinstance(dic, CifFile):
            dic_as_cif_list.append(CifFile(dic))
        else:
            dic_as_cif_list.append(dic)
    # we now merge left to right
    basedic = dic_as_cif_list[0]
    if basedic.has_key("on_this_dictionary"):  # DDL1 style only
        for dic in dic_as_cif_list[1:]:
            basedic.merge(dic, mode=mergemode, match_att=["_name"])
    elif len(basedic.keys()) == 1:  # one block: DDL2 style
        for dic in dic_as_cif_list[1:]:
            basedic.merge(dic, mode=mergemode,
                          single_block=[basedic.keys()[0], dic.keys()[0]],
                          match_att=["_item.name"], match_function=find_parent)
    return CifDic(basedic)

def find_parent(ddl2_def):
    """Return the parent data item name from a DDL2 definition block.

    A single _item.name is its own parent; with multiple names, the parent
    is the unique name that is not listed in _item_linked.child_name.
    Returns None when no _item.name is present; raises CifError when the
    linkage information is missing or no unique parent exists.
    """
    if not ddl2_def.has_key("_item.name"):
        return None
    if isinstance(ddl2_def["_item.name"], str):
        return ddl2_def["_item.name"]
    if not ddl2_def.has_key("_item_linked.child_name"):
        raise CifError("Asked to find parent in block with no child_names")
    if not ddl2_def.has_key("_item_linked.parent_name"):
        raise CifError("Asked to find parent in block with no parent_names")
    result = [a for a in ddl2_def["_item.name"]
              if a not in ddl2_def["_item_linked.child_name"]]
    if len(result) != 1:
        raise CifError("Unable to find single unique parent data item")
    return result[0]


def ReadCif(filename, strict=1, maxlength=2048, scantype="standard", grammar="1.1"):
    """Read filename with the STAR reader and return it as a CifFile.

    Raises CifError if any block contains nested loops, which are legal
    STAR but illegal CIF.  Save-frame-reference and global-block checks
    are not yet implemented in PySTARRW.
    """
    proto_cif = StarFile.ReadStar(filename, maxlength, scantype=scantype, grammar=grammar)
    # convert to CifFile
    proto_cif = CifFile(proto_cif)
    # check for nested loops
    for bname, bvalue in proto_cif.items():
        nests = [a for a in bvalue.loops if len(a.loops) > 0]
        if len(nests) > 0:
            # BUGFIX: the offending block name was never interpolated into
            # the message, leaving a literal '%s' in the error text
            raise CifError("Block %s contains nested loops" % bname)
    # check for save frame references (not yet implemented in PySTARRW)
    # check for global blocks (not yet implemented in PySTARRW)
    return proto_cif
Note: See TracChangeset for help on using the changeset viewer.