-
Notifications
You must be signed in to change notification settings - Fork 1
/
Curves.py
782 lines (640 loc) · 33.4 KB
/
Curves.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
"""
Curves.py by John Dorsey.
Curves.py contains tools like the Spline class for creating curves and doing calculations related to them. The Spline class is mainly used by the Cell Elimination Run codec in PyCellElimRun.py for predicting missing samples of audio.
"""
import math
import itertools
import PyDeepArrTools
from PyDeepArrTools import enumerateDeeply
import SpiralMath
import PyGenTools
from PyGenTools import arrTakeOnly, seqsAreEqual, genTakeOnly
import TestingTools
# ---- Python 2/3 compatibility shims ---------------------------------------
# Python 3 renamed izip_longest to zip_longest; expose the old name so the
# rest of the module can keep using itertools.izip_longest on both versions.
if hasattr(itertools, "zip_longest"):
    itertools.izip_longest = itertools.zip_longest

# On Python 2, prefer the lazy xrange wherever this module says "range".
try:
    xrange
except NameError:
    pass  # Python 3: range is already lazy.
else:
    range = xrange

# Disabled debugging aid, kept for reference. It replaces print with a version
# that raises when asked to print None:
#     oldprint = print
#     def print(arg):
#         if arg == None:
#             raise ZeroDivisionError()
#         oldprint(arg)

DODBGPRINT = False  # print debug info.
DOVIS = False  # show pretty printing, mostly for debugging.
def dbgPrint(text,end="\n"): #only print if DODBGPRINT.
    """
    Write text to standard output when the module-level DODBGPRINT flag is set.

    text is converted with str() and followed by end (a newline by default).
    Writing through sys.stdout.write honours custom line endings on both
    Python 2 and Python 3, so no print_function import is needed.
    """
    if not DODBGPRINT:
        return
    import sys #local import keeps this helper self-contained.
    sys.stdout.write(str(text) + end)
def forceMonotonicSlopes(sur,slopes): #completely untested.
    """
    Clamp slopes in place so they do not oppose a monotonic trend in sur.

    sur stands for surroundings: a sequence of points (indexable, y at index 1)
    in which missing points may be None. None entries are skipped, so the
    rises are measured between consecutive known points.
    slopes is a mutable sequence of numbers and is modified in place.

    If every rise is <= 0, each slope is limited to at most 0. Otherwise, if
    every rise is >= 0, each slope is limited to at least 0. Otherwise slopes
    is left untouched. Returns None.
    """
    knownPoints = [point for point in sur if point is not None]
    #changes in y between each pair of neighboring known points.
    surRises = [after[1] - before[1] for before, after in zip(knownPoints, knownPoints[1:])]
    if all(rise <= 0 for rise in surRises):
        #non-increasing surroundings (this branch also runs when there are no rises at all).
        for i, slope in enumerate(slopes):
            slopes[i] = min(0, slope)
    elif all(rise >= 0 for rise in surRises):
        for i, slope in enumerate(slopes):
            slopes[i] = max(0, slope)
#these functions are used in constructing cubic hermite splines.
def hermite_h00(t):
    """Evaluate the polynomial 2t^3 - 3t^2 + 1 (value 1 at t=0, value 0 at t=1)."""
    cubicTerm = 2*t**3
    quadraticTerm = 3*t**2
    return cubicTerm - quadraticTerm + 1
def hermite_h10(t):
    """Evaluate the polynomial t^3 - 2t^2 + t (value 0 at both t=0 and t=1)."""
    cubicTerm = t**3
    quadraticTerm = 2*t**2
    return cubicTerm - quadraticTerm + t
def hermite_h01(t):
    """Evaluate the polynomial -2t^3 + 3t^2 (value 0 at t=0, value 1 at t=1)."""
    cubicTerm = -2*t**3
    quadraticTerm = 3*t**2
    return cubicTerm + quadraticTerm
def hermite_h11(t):
    """Evaluate the polynomial t^3 - t^2 (value 0 at both t=0 and t=1)."""
    cubicTerm = t**3
    quadraticTerm = t**2
    return cubicTerm - quadraticTerm
# ---- distance function tables ----------------------------------------------
# Each table maps a distance name to a callable. The tables differ only in how
# their arguments are supplied:
#   distance_2d_funs:       (x, y) axis offsets, assumed non-negative for the log variant.
#   point_distance_2d_funs: two points; only their first two coordinates are used.
#   point_distance_nd_funs: two points; the first len(p0) coordinates are used.

def _absolute_distance_2d(x, y):
    return (x**2 + y**2)**0.5

def _axial_log_distance_2d(x, y):
    return (math.log(x+1)**2 + math.log(y+1)**2)**0.5

def _manhattan_distance_2d(x, y):
    return x + y

def _point_absolute_distance_2d(p0, p1):
    return ((p0[0]-p1[0])**2 + (p0[1]-p1[1])**2)**0.5

def _point_axial_log_distance_2d(p0, p1):
    return (math.log(abs(p0[0]-p1[0])+1)**2 + math.log(abs(p0[1]-p1[1])+1)**2)**0.5

def _point_manhattan_distance_2d(p0, p1):
    return abs(p0[0]-p1[0]) + abs(p0[1]-p1[1])

def _point_absolute_distance_nd(p0, p1):
    return sum((p0[axis]-p1[axis])**2 for axis in range(len(p0)))**0.5

def _point_axial_log_distance_nd(p0, p1):
    return sum(math.log(abs(p0[axis]-p1[axis])+1)**2 for axis in range(len(p0)))**0.5

def _point_manhattan_distance_nd(p0, p1):
    return sum(abs(p0[axis]-p1[axis]) for axis in range(len(p0)))

distance_2d_funs = {
    "absolute_distance": _absolute_distance_2d,
    "bilog_distance": _axial_log_distance_2d,  # don't use this name anymore.
    "axial_log_distance": _axial_log_distance_2d,
    "manhattan_distance": _manhattan_distance_2d,
}

point_distance_2d_funs = {
    "absolute_distance": _point_absolute_distance_2d,
    "bilog_distance": _point_axial_log_distance_2d,  # don't use this name anymore.
    "axial_log_distance": _point_axial_log_distance_2d,
    "manhattan_distance": _point_manhattan_distance_2d,
}

point_distance_nd_funs = {
    "absolute_distance": _point_absolute_distance_nd,
    "axial_log_distance": _point_axial_log_distance_nd,
    "manhattan_distance": _point_manhattan_distance_nd,
}
def flatten_point(point_to_flatten, world_size):
    """
    Collapse an n-dimensional point into a single integer (mixed-radix encoding).

    point_to_flatten and world_size must have the same length. Starting from 0,
    each step multiplies the running result by the measure of the current
    dimension and adds the point's coordinate in that dimension.

    Returns 0 for an empty point.
    Raises AssertionError if the two sequences differ in length.
    """
    assert len(point_to_flatten) == len(world_size)
    result = 0
    #the lengths are known to match, so the builtin zip is enough; this avoids
    #depending on itertools.izip_longest, which only exists on Python 2 (or after patching).
    for pointElement, measure in zip(point_to_flatten, world_size):
        result = result * measure + pointElement
    return result
def hash_point_list(point_list_to_hash, world_size):
    """
    Combine a list of points (entries may be None) into one integer.

    Each point becomes flatten_point(point, world_size) + 1 and each None
    becomes 0. The resulting integers are sorted and then read as the digits of
    a number in a base large enough to hold any of them. Because of the sort,
    the order of the input list does not affect the result.

    Returns 0 for an empty list or a list containing only None.
    """
    if not len(point_list_to_hash): #avoid index error.
        return 0
    farthestCorner = [measure-1 for measure in world_size]
    uniform_base = flatten_point(farthestCorner, world_size) + 2 #the +2 accommodates Nones.
    #sorting happens after flattening because sorting integers is much faster than sorting tuples or lists.
    flattened_points = [(0 if point is None else flatten_point(point, world_size)+1) for point in point_list_to_hash]
    flattened_points.sort()
    if sum(flattened_points) == 0: #avoid possible index error.
        return 0
    assert flattened_points[-1] < uniform_base
    result = 0
    for digit in flattened_points:
        result = result * uniform_base + digit
    return result
def lookup_if_str(lookup_dict, potential_key, default=None):
    """
    Resolve potential_key through lookup_dict only when it is a string.

    If potential_key is not a str, it is returned unchanged (it is assumed to
    already be the wanted object). If it is a str, the matching entry of
    lookup_dict is returned; when the key is missing, default is returned if it
    is not None, otherwise KeyError is raised.
    """
    if not isinstance(potential_key, str):
        return potential_key
    if default is not None:
        #dict.get takes its fallback positionally; passing default= as a keyword raises TypeError.
        return lookup_dict.get(potential_key, default)
    return lookup_dict[potential_key]
def genFindNearest2d(startPoint,triggerFun,shape="circular",timeout=None):
    """
    Generate the coordinates around startPoint for which triggerFun returns a true value.

    Candidate coordinates come from SpiralMath: spiralCoordDecode for
    shape "square", genCircularSpiralCoords for shape "circular".
    timeout, when not None, limits how many candidate coordinates are examined
    (not how many are yielded); with None the search is unbounded.
    Because this is a generator, the shape assertion runs when iteration starts.
    """
    assert shape in ["circular","square"]
    unlimited = (timeout == None)
    if shape == "square":
        candidateIndices = itertools.count(0) if unlimited else range(0,timeout)
        for spiralIndex in candidateIndices:
            candidate = SpiralMath.spiralCoordDecode(spiralIndex,home=startPoint,rot=0,spin=1)
            if triggerFun(candidate):
                yield candidate
    elif shape == "circular":
        candidates = SpiralMath.genCircularSpiralCoords(home=startPoint,rot=0,spin=1)
        if not unlimited:
            candidates = genTakeOnly(candidates,timeout)
        for candidate in candidates:
            if triggerFun(candidate):
                yield candidate
    else:
        assert False
class SparseList(list):
    """
    A wrapper around a plain list whose unknown entries are None, with optional
    caches that speed up finding the nearest known entry ("bone") in a direction.

    The wrapped list is stored in self.data; the inherited list storage is not
    used (the class only subclasses list so isinstance(x, list) is true).

    Two mutually exclusive cache modes exist:
      useNearbyBoneLocationCache: for every index, the index of a nearest bone.
      useBoneDistanceAbsCache: for every index, the distance to a nearest bone.
    With neither enabled, lookups scan the data one step at a time.
    """

    def __init__(self, data, useNearbyBoneLocationCache, useBoneDistanceAbsCache):
        self.data = data
        assert isinstance(self.data, list)
        assert not isinstance(self.data, SparseList)
        self.useNearbyBoneLocationCache, self.useBoneDistanceAbsCache = useNearbyBoneLocationCache, useBoneDistanceAbsCache
        self.initializeCaches()

    def initializeCaches(self):
        """Create whichever cache is enabled and seed it from bones already present in self.data."""
        if self.useBoneDistanceAbsCache:
            self.boneDistanceAbs = [None] * len(self.data)
        if self.useNearbyBoneLocationCache:
            assert not self.useBoneDistanceAbsCache, "these caching modes can't be used simultaneously!"
            self.nearbyBoneLocation = [None] * len(self.data)
        if self.useNearbyBoneLocationCache or self.useBoneDistanceAbsCache:
            #without this, bones passed to the constructor would be invisible to cached lookups.
            for index, value in enumerate(self.data):
                if value is not None:
                    self._registerBone(index)

    def _normalizeIndex(self, index):
        """Convert a possibly negative integer index to its non-negative form, raising IndexError when out of range."""
        if index < 0:
            index += len(self.data)
        if not 0 <= index < len(self.data):
            raise IndexError("list assignment index out of range")
        return index

    def _registerBone(self, index):
        """Record in the enabled cache that a bone exists at the non-negative index."""
        sweeps = (range(index-1, -1, -1), range(index+1, len(self.data))) #leftward, then rightward.
        if self.useNearbyBoneLocationCache:
            self.nearbyBoneLocation[index] = index
            for sweep in sweeps:
                for x in sweep:
                    oldNearestBoneLocation = self.nearbyBoneLocation[x]
                    if oldNearestBoneLocation is not None and abs(x - index) >= abs(x - oldNearestBoneLocation):
                        break #an equally close or closer bone already owns everything from here on.
                    self.nearbyBoneLocation[x] = index
        elif self.useBoneDistanceAbsCache:
            self.boneDistanceAbs[index] = 0
            for sweep in sweeps:
                for x in sweep:
                    oldDist = self.boneDistanceAbs[x]
                    newDist = abs(x - index)
                    if oldDist is not None and newDist >= oldDist:
                        break
                    self.boneDistanceAbs[x] = newDist

    def getPointInDirection(self, location, direction, skipStart=True):
        """
        Find the nearest bone from location, moving in direction (-1 or 1).

        When skipStart is true the search begins one step away from location;
        otherwise location itself is a candidate.
        Returns (index, value) for the bone found, or None if the end of the
        data is reached (or the cache shows that no bone exists at all).
        """
        assert isinstance(direction, int)
        assert direction in [-1,1]
        assert 0 <= location < len(self.data)
        dataLength = len(self.data)
        location += skipStart*direction
        if self.useNearbyBoneLocationCache:
            while 0 <= location < dataLength:
                nearbyBoneLocation = self.nearbyBoneLocation[location]
                if nearbyBoneLocation is None:
                    return None #cache entries are only None while no bone exists anywhere.
                if nearbyBoneLocation == location:
                    return (location, self.data[location])
                nearbyBoneDisplacement = nearbyBoneLocation - location
                if nearbyBoneDisplacement*direction > 0: #if they have the same sign:
                    return (nearbyBoneLocation, self.data[nearbyBoneLocation])
                #the nearest bone is behind us, so nothing closer than that lies ahead: jump that far forward.
                location -= nearbyBoneDisplacement
            return None
        if self.useBoneDistanceAbsCache:
            while 0 <= location < dataLength:
                if self.data[location] is not None:
                    return (location, self.data[location])
                boneDistance = self.boneDistanceAbs[location]
                if boneDistance is None:
                    return None #no bone exists anywhere.
                #no bone is closer than boneDistance, so that many cells can be skipped (always at least one).
                location += direction * max(1, boneDistance)
            return None
        while 0 <= location < dataLength:
            if self.data[location] is not None:
                return (location, self.data[location])
            location += direction
        return None

    def __setitem__(self, index, value):
        """
        Store value at index and update the enabled cache.

        While a cache mode is enabled, value must not be None and index must be
        an integer (negative integers count from the end).
        """
        if self.useNearbyBoneLocationCache:
            assert value is not None, "None values can't be set while bone location caching is enabled."
            index = self._normalizeIndex(index)
            self._registerBone(index)
        elif self.useBoneDistanceAbsCache:
            assert value is not None, "None values can't be set while bone distance abs caching is enabled."
            index = self._normalizeIndex(index)
            self._registerBone(index)
        self.data[index] = value

    def __str__(self):
        return str(self.data)

    def __repr__(self):
        return "SparseList({})".format(repr(self.data))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        return self.data[index]

    def __iter__(self):
        return iter(self.data)
class Spline:
    """
    Holds sparse or complete records of the samples of a wave and uses the
    interpolation mode chosen at creation to provide guesses about the values
    of missing samples. It informs decisions about how likely a cell (a sample
    location combined with a sample value) is to be filled.

    self.size lists the length of every dimension; self.size[:-1] is the shape
    of the stored data and the remaining entry belongs to the value axis.
    Known samples are called bones.
    """
    SUPPORTED_INTERPOLATION_METHOD_NAMES = ["hold","nearest_neighbor","linear","sinusoidal","finite_difference_cubic_hermite","inverse_distance_weighted"]
    SUPPORTED_OUTPUT_FILTERS = ["span_clip","global_clip","round","monotonic"]
    CACHE_VALUES_2D = False #enabling offers a Testing.test() speedup from 16.8 to 14.0 seconds, not considering time spent on numberCodecs and other tasks.
    CACHE_VALUES_BY_SUR_HASH = True
    USE_BONE_DISTANCE_ABS_CACHE = False #only applies when valueCache misses.
    USE_NEARBY_BONE_LOCATION_CACHE = True #an alternative to caching bone distance abs.
    #in the following dictionary, integers represent points relative to the new point 0. 1 is the next anchor point to the right of the new point, 2 is the point to the right of that one, and the points -1 and -2 are similar. a tuple of two ints represents the span between those two points. Future interpolation methods might require more than this notation can offer, such as trig (fourier) requiring a keyword like "ENTIRE".
    VALUE_CACHE_UPDATE_TYPES = {
        "hold":[(0,1)],
        "nearest_neighbor":[(-1,0),(0,1)],
        "linear":[(-1,0),(0,1)],
        "sinusoidal":[(-1,0),(0,1)],
        "finite_difference_cubic_hermite":[(-2,-1),(-1,0),(0,1),(1,2)],
        "inverse_distance_weighted":[(-2,-1),(-1,0),(0,1),(1,2)]
    }
    #which entries of a surroundings list [second to left, left, right, second to right] each method needs.
    SURROUNDINGS_REQUIREMENTS = {
        "hold":[0,1,0,0],
        "nearest_neighbor":[0,1,1,0],
        "linear":[0,1,1,0],
        "sinusoidal":[0,1,1,0],
        "finite_difference_cubic_hermite":[1,1,1,1],
        "inverse_distance_weighted":[1,1,1,1],
        "unspecified":[0,0,0,0] #this exists to make it easier to set endpoints.
    }
    #expression templates; {} is where the value being filtered (or the next filter) is substituted.
    OUTPUT_FILTER_TEMPLATES = {
        "span_clip":"max(min({},max(sur_for_filter[1][1],sur_for_filter[2][1])),min(sur_for_filter[1][1],sur_for_filter[2][1]))",
        "global_clip":"max(min({},self.size[1]),0)",
        "round":"int(round({}))"
    }

    def __init__(self, interpolation_mode="unspecified", size=None, endpointInitMode=None, default_value="zero"):
        """
        interpolation_mode: a string "method" or "method&filter;filter", or a
            dict with optional keys "method_name", "power", "output_filters"
            and "distance_fun".
        size: sequence of dimension lengths (required).
        endpointInitMode: an init-value keyword, or a list of two of them.
        default_value: an init-value keyword string, or a literal value.
        """
        self.initializeByDefault()
        self.setInterpolationMode(interpolation_mode)
        self.setSizeAndEndpoints(size, endpointInitMode)
        self.setDefaultValue(default_value)
        self.initializeCaches()
        self.initializeData()
        self.finalizeEndpoints()
        assert seqsAreEqual(PyDeepArrTools.shape(self.data), self.size[:-1])

    def initializeByDefault(self):
        """Bind the module-level helper functions as instance attributes."""
        self.forceMonotonicSlopes = forceMonotonicSlopes
        self.hermite_h00 = hermite_h00
        self.hermite_h10 = hermite_h10
        self.hermite_h01 = hermite_h01
        self.hermite_h11 = hermite_h11

    def setInterpolationMode(self, interpolationMode):
        """
        Parse interpolationMode (see __init__) and build the output filter.

        Raises ValueError for an unsupported method or filter name given in
        string form, and TypeError when interpolationMode is neither a str nor
        a dict.
        """
        #defaults shared by both forms, so that string modes such as "inverse_distance_weighted" have everything solve_location reads.
        self.interpolation_power = 2
        self.interpolation_point_distance_nd_fun = point_distance_nd_funs["absolute_distance"]
        if isinstance(interpolationMode, str):
            modeParts = interpolationMode.split("&")
            self.interpolation_method_name = modeParts[0]
            self.output_filters = []
            if len(modeParts) > 1:
                self.output_filters.extend(modeParts[1].split(";")) #@ this is not ideal but it saves complexity in testing. It lets every configuration be described by a single string.
            if self.interpolation_method_name not in Spline.SUPPORTED_INTERPOLATION_METHOD_NAMES:
                if self.interpolation_method_name != "unspecified": #else let it slide.
                    raise ValueError("The interpolation_method_name " + self.interpolation_method_name + " is not supported.")
            for outputFilter in self.output_filters:
                if outputFilter not in Spline.SUPPORTED_OUTPUT_FILTERS:
                    raise ValueError("The outputFilter " + outputFilter + " is not supported.")
        elif isinstance(interpolationMode, dict):
            self.interpolation_method_name = interpolationMode.get("method_name","unspecified")
            self.interpolation_power = interpolationMode.get("power",2)
            self.output_filters = list(interpolationMode.get("output_filters",[]))
            self.interpolation_point_distance_nd_fun = lookup_if_str(point_distance_nd_funs, interpolationMode.get("distance_fun","absolute_distance"))
        else:
            raise TypeError("interpolationMode must be a str or a dict, not {}.".format(type(interpolationMode).__name__))
        self.findPerformanceWarnings()
        self.initializeOutputFilter()

    def findPerformanceWarnings(self):
        """Print a warning for each enabled output filter that cannot change the results of the chosen method."""
        def perfWarn(filterName,modeName):
            print("Curves.Spline.findPerformanceWarnings: outputFilter \"" + filterName + "\" and interpolationMode \"" + modeName + "\" are both enabled, but that filter doesn't affect the results of that method of interpolation.")
        ineffectiveCombinations = [
            (["span_clip", "global_clip", "monotonic"], ["hold", "nearest_neighbor", "linear", "sinusoidal"]),
            (["round"], ["hold", "nearest_neighbor"]),
        ]
        for filterNames, methodNames in ineffectiveCombinations:
            for outputFilter in filterNames:
                if outputFilter in self.output_filters and self.interpolation_method_name in methodNames:
                    perfWarn(outputFilter, self.interpolation_method_name)
        if "span_clip" in self.output_filters and "global_clip" in self.output_filters:
            print("Curves.Spline.findPerformanceWarnings: global_clip and span_clip are both enabled, but span_clip always does the job of global_clip, assuming no Spline bones are outside of the size of the spline.")

    def initializeOutputFilter(self):
        """
        Build self._output_filter_fun(value_to_filter, sur_for_filter) from self.output_filters.

        Templates are nested in list order, so the first listed filter is the
        outermost one and is therefore applied last. Filter names without a
        template (such as "monotonic") are skipped with a notice.
        """
        if "global_clip" in self.output_filters:
            print("Curves.Spline: warning: global clip incorrectly uses size. boundaries should be added to spline to properly support global clipping.")
        outputFilterTemplate = "lambda value_to_filter, sur_for_filter: {}"
        for outputFilterName in self.output_filters:
            if outputFilterName in Spline.OUTPUT_FILTER_TEMPLATES:
                outputFilterTemplate = outputFilterTemplate.format(Spline.OUTPUT_FILTER_TEMPLATES[outputFilterName])
            else:
                print("output filter {} is not in OUTPUT_FILTER_TEMPLATES, but that's probably fine.".format(outputFilterName))
        outputFilterTemplate = outputFilterTemplate.format("value_to_filter")
        #a lambda created by eval resolves free names as globals at call time, so self must be
        #supplied in the globals mapping; a local variable named self would not be visible to it.
        #only the class-level templates above are evaluated here, never caller-provided text.
        self._output_filter_fun = eval(outputFilterTemplate, {"self": self})

    def setSizeAndEndpoints(self, size, endpointInitMode):
        """
        Store size, derive dimensions and volume, and compute the two endpoints
        ((0, height), (size[0]-1, height)) from endpointInitMode.

        Raises TypeError when size is None and KeyError for an invalid endpoint keyword.
        """
        if size is None:
            raise TypeError("Curves.Spline.setSizeAndEndpoints: size must be a sequence of dimension lengths, not None.")
        self.size = size
        self.dimensions = len(self.size)
        self.volume = PyGenTools.accumulate(self.size, (lambda x,y: x*y))
        assert isinstance(endpointInitMode, (list, str))
        if isinstance(endpointInitMode, str):
            endpointInitMode = [str(endpointInitMode) for i in range(2)]
        assert len(endpointInitMode) == 2 #1 for each endpoint.
        tempEndpoints = [[0, None], [self.size[0]-1, None]]
        for i in [0,1]:
            if self.size[1] is None:
                tempEndpoints[i][1] = 0
                print("Curves.Spline.setSizeAndEndpoints: size[1]==None sets endpoint height to zero.")
                continue
            try:
                tempEndpoints[i][1] = self.resolveInitValueKeyword(endpointInitMode[i])
            except KeyError:
                raise KeyError("The endpointInitMode value {} at i={} is invalid!".format(repr(endpointInitMode[i]),i))
        self.endpoints = ((tempEndpoints[0][0], tempEndpoints[0][1]), (tempEndpoints[1][0], tempEndpoints[1][1]))
        assert len(self.endpoints) == 2
        assert self.endpoints[0][0] == 0, "sample ranges not starting at zero are not yet supported."
        assert self.endpoints[1][0] == size[0]-1, "sample ranges not ending at their second endpoint are not supported."

    def setDefaultValue(self, default_value):
        """Set self.default_value from an init-value keyword string or a literal value."""
        if isinstance(default_value, str):
            self.default_value = self.resolveInitValueKeyword(default_value)
        else:
            self.default_value = default_value
        if self.default_value is None:
            print("Curves.Spline.set_default_value: warning: default value is now None! this may cause problems!")

    def resolveInitValueKeyword(self, keyword):
        """
        Translate "middle", "zero" or "maximum" into a value based on self.size[1].

        Raises KeyError for any other keyword.
        NOTE(review): size[1] is the value axis only for 2d splines; for more
        dimensions the last entry of size looks like the intended axis - confirm
        before changing, since stored results depend on it.
        """
        if keyword == "middle":
            return self.size[1]>>1
        elif keyword == "zero":
            return 0
        elif keyword == "maximum":
            return self.size[1]-1
        else:
            raise KeyError("The init value keyword is invalid!")

    def initializeCaches(self):
        """Create the value caches selected by the class-level cache flags."""
        if Spline.CACHE_VALUES_BY_SUR_HASH:
            self.value_cache_by_surroundings_hash = dict()
        if Spline.CACHE_VALUES_2D:
            assert not Spline.CACHE_VALUES_BY_SUR_HASH, "these caching modes can't be used simultaneously!"
            if self.dimensions != 2:
                print("Curves.Spline.initializeCaches: warning: the regular value cache is disabled because self.dimensions != 2.")
            else:
                #sized from self.size because self.data is not created until initializeData runs.
                self.valueCache = [None] * self.size[0]

    def initializeData(self):
        """Create the all-None data store; 2d splines wrap it in a SparseList."""
        self.data = PyDeepArrTools.noneInitializer(self.size[:-1])
        if self.dimensions == 2:
            self.data = SparseList(self.data, useNearbyBoneLocationCache=Spline.USE_NEARBY_BONE_LOCATION_CACHE, useBoneDistanceAbsCache=Spline.USE_BONE_DISTANCE_ABS_CACHE)

    def finalizeEndpoints(self):
        """Write the endpoint values into the data (2d splines only)."""
        if self.dimensions == 2:
            self.__setitem__(0,self.endpoints[0][1])
            self.__setitem__(-1,self.endpoints[1][1])

    def toPrettyStr(self):
        """
        Render the (2d) spline as text, one row per occupied rounded value from
        highest to lowest; runs of unoccupied rows collapse into a single "...".
        """
        alphabet = [["~","-"],["%","#"]] #access like alphabet[known?][exact?]
        assert self.endpoints[0][0] == 0
        tempValues = [self.__getitem__(index) for index in range(self.endpoints[1][0]+1)]
        roundedTempValues = [int(round(value)) for value in tempValues]
        occupiedRows = set(roundedTempValues)
        pieces = ["Curves.Spline.toPrettyStr generated string representation."]
        previousRowWasGap = False
        for y in range(max(roundedTempValues), min(roundedTempValues)-1, -1):
            if y not in occupiedRows:
                if not previousRowWasGap:
                    pieces.append("\n...")
                    previousRowWasGap = True
                continue
            previousRowWasGap = False
            pieces.append("\n"+str(y).rjust(10," ")+": ")
            for index in range(self.size[0]):
                if roundedTempValues[index] != y:
                    pieces.append(" ")
                    continue
                known = (self.data[index] is not None)
                exact = (tempValues[index] == y)
                pieces.append(alphabet[known][exact])
        pieces.append("\nend of generated string representation.")
        return "".join(pieces)

    def interpolateMissingValues(self, data):
        """
        fill in missing values in another data structure using interpolation.
        Only None entries may change; a changed non-None entry fails an assertion.
        """
        changeCounter = 0
        for columnID,_ in enumerateDeeply(data):
            newValue = self.get_value_using_path(columnID)
            originalValue = PyDeepArrTools.setValueUsingPath(data, columnID, newValue)
            if originalValue != newValue:
                changeCounter += 1
                assert originalValue is None, "a non-none value ({}) was changed.".format(originalValue)
        if changeCounter > 0:
            print("Curves.Spline.interpolateMissingValues changed {} values.".format(changeCounter))

    def getPointInDirection(self, location, direction, skipStart=True):
        """Delegate to the data's getPointInDirection (2d only)."""
        assert len(self.size) == 2, "this is a 2d only method."
        return self.data.getPointInDirection(location, direction, skipStart=skipStart)

    def get_enclosing_surroundings(self,location):
        """
        Return the bones around location: for 2d the nearest bone on each side
        (as a 4-entry surroundings list), for 3d up to four bones found by
        genFindNearest2d, each as [coordinates..., value].
        """
        assert isinstance(location,list) or isinstance(location,tuple)
        assert len(location) + 1 == self.dimensions
        if self.dimensions == 2:
            return self.get_surroundings_2d(location[0],[0,1,1,0])
        elif self.dimensions == 3:
            triggerFun = (lambda testLocation: PyDeepArrTools.isInShape(testLocation,self.size[:-1]) and PyDeepArrTools.getValueUsingPath(self.data, testLocation) != None)
            surLocations = arrTakeOnly(genFindNearest2d(location,triggerFun,timeout=self.volume),4)
            return [list(surLocation) + [PyDeepArrTools.getValueUsingPath(self.data,surLocation)] for surLocation in surLocations]
        elif self.dimensions > 3:
            raise NotImplementedError("above 3d")
        else:
            assert False, "Invalid dimensions."

    def get_necessary_surroundings_2d(self,index):
        """
        creates a surroundings list [second item to left, item to left, item to right, second item to right] populated with only the values needed by the current interpolation mode. Values not needed will be None.
        """
        surroundingsRequest = Spline.SURROUNDINGS_REQUIREMENTS[self.interpolation_method_name]
        return self.get_surroundings_2d(index, surroundingsRequest)

    def get_surroundings_2d(self, index, surroundingsRequest):
        """
        creates a surroundings list [second item to left, item to left, item to right, second item to right] populated with only the values requested. Values not requested will be None.
        An outer item is only searched for when its inner neighbor was requested and found.
        """
        surroundings = [None, None, None, None]
        if surroundingsRequest[1]:
            surroundings[1] = self.getPointInDirection(index,-1)
            if surroundingsRequest[0]:
                if surroundings[1] is None:
                    print("Curves.Spline.get_surroundings_2d: at index {}, surroundings[1] was empty, and surroundings[0] won't be filled in.".format(index))
                else:
                    surroundings[0] = self.getPointInDirection(surroundings[1][0], -1)
        if surroundingsRequest[2]:
            surroundings[2] = self.getPointInDirection(index,1)
            if surroundingsRequest[3]:
                if surroundings[2] is None:
                    print("Curves.Spline.get_surroundings_2d: at index {}, surroundings[2] was empty, and surroundings[3] won't be filled in.".format(index))
                else:
                    surroundings[3] = self.getPointInDirection(surroundings[2][0], 1)
        return surroundings

    def __len__(self):
        assert self.dimensions == 2, "this is a 2d only method."
        return self.size[0]

    def __iter__(self):
        """
        Iterate over the value at every index exactly once (2d only).
        Needed because __getitem__ wraps indices and never raises IndexError,
        so the fallback sequence iteration protocol would never stop.
        """
        assert self.dimensions == 2, "this is a 2d only method."
        return (self[index] for index in range(len(self)))

    def __getitem__(self, index):
        """Return the known or interpolated value at index; indices wrap modulo the length (2d only)."""
        assert self.dimensions == 2, "this is a 2d only method."
        index = index % len(self)
        return self.get_at_location_cached((index,))

    def __setitem__(self, index, value):
        """Store a bone at index; indices wrap modulo the length (2d only)."""
        assert self.dimensions == 2, "this is a 2d only method."
        index = index % len(self)
        self.set_at_location_cached((index,), value)

    def get_value_using_path(self, path):
        """Return the known or interpolated value at path, which has one entry per data dimension."""
        assert len(path) + 1 == self.dimensions
        result = self.get_at_location_cached(path)
        assert result is not None, "result should never be None!" #@ slow
        return result

    def set_value_using_cell(self, cell):
        """Store a bone described by cell = location coordinates followed by the value."""
        assert len(cell) == self.dimensions
        self.set_at_location_cached(cell[:-1],cell[-1])

    def get_at_location_cached(self, location):
        """
        Return the value at location: the stored bone if there is one, else an
        interpolated value, using whichever value cache is enabled.
        """
        assert len(location) + 1 == self.dimensions
        if self.dimensions == 2:
            index = location[0]
            directAccessResult = self.data[index]
            if directAccessResult is not None: #no interpolation is ever done when the index in question has a known value.
                return directAccessResult
            if Spline.CACHE_VALUES_2D:
                cachedResult = self.valueCache[index]
                if cachedResult is not None:
                    return cachedResult
                result = self.solve_location(location, self.get_necessary_surroundings_2d(index))
                self.valueCache[index] = result
                return result
            sur = self.get_necessary_surroundings_2d(index)
        elif self.dimensions > 2:
            directAccessResult = PyDeepArrTools.getValueUsingPath(self.data,location)
            if directAccessResult is not None:
                return directAccessResult
            sur = self.get_enclosing_surroundings(location) #make sure to change this at the same time when changing the way surroundings are found in set_at_location.
        else:
            assert False, "invalid dimension count."
        if not Spline.CACHE_VALUES_BY_SUR_HASH:
            return self.solve_location(location, sur)
        surHash = hash_point_list(sur, self.size) #include cell heights.
        #one dict per distinct set of surroundings, mapping flattened location -> solved value.
        surHashEntryDict = self.value_cache_by_surroundings_hash.setdefault(surHash, dict())
        locationHash = flatten_point(location,self.size[:-1])
        if locationHash in surHashEntryDict:
            result = surHashEntryDict[locationHash]
            assert result is not None, "result should never be None!" #@ slow
            return result
        result = self.solve_location(location,sur)
        surHashEntryDict[locationHash] = result
        return result

    def solve_location(self, location, sur):
        """
        Interpolate the value at location from the surroundings sur using
        self.interpolation_method_name, then pass it through the output filter.

        Raises KeyError for an unsupported method name.
        """
        assert isinstance(location, list) or isinstance(location, tuple)
        assert sur is not None
        result = None
        interpolation_method_name = self.interpolation_method_name
        if interpolation_method_name == "hold":
            result = sur[1][1]
        elif interpolation_method_name == "nearest_neighbor":
            #when two neighbors are equal distances away, the one on the left will be chosen.
            t = float(location[0]-sur[1][0])/float(sur[2][0]-sur[1][0])
            resultPoint = sur[1] if (t <= 0.5) else sur[2]
            result = resultPoint[1]
        elif interpolation_method_name in ["linear", "sinusoidal"]:
            t = float(location[0]-sur[1][0])/float(sur[2][0]-sur[1][0])
            leftBone, rightBone = sur[1], sur[2]
            riseScale = rightBone[1] - leftBone[1]
            if interpolation_method_name == "linear":
                rise = riseScale * t
            else:
                rise = riseScale * 0.5 * (1-math.cos(math.pi*t))
            result = leftBone[1] + rise
        elif interpolation_method_name == "finite_difference_cubic_hermite":
            if sur[1] is None or sur[2] is None:
                assert False, "an important (inner) item is missing from the surroundings."
            t = float(location[0]-sur[1][0])/float(sur[2][0]-sur[1][0])
            #slope of the span containing the location; float() keeps Python 2 from using integer division.
            innerSlope = float(sur[2][1]-sur[1][1])/float(sur[2][0]-sur[1][0])
            slopes = [None,None]
            if sur[0] is None:
                slopes[0] = innerSlope
            else:
                slopes[0] = 0.5*(innerSlope+float(sur[1][1]-sur[0][1])/float(sur[1][0]-sur[0][0]))
            if sur[3] is None:
                slopes[1] = innerSlope
            else:
                slopes[1] = 0.5*(float(sur[3][1]-sur[2][1])/float(sur[3][0]-sur[2][0])+innerSlope)
            if "monotonic" in self.output_filters:
                self.forceMonotonicSlopes(sur, slopes) #might not have any effect anyway.
            #NOTE(review): the textbook form scales the two slope terms by the span width
            #(sur[2][0]-sur[1][0]); that is not done here. Left unchanged because previously
            #produced results depend on these exact values - confirm before altering.
            result = self.hermite_h00(t)*sur[1][1]+self.hermite_h10(t)*slopes[0]+self.hermite_h01(t)*sur[2][1]+self.hermite_h11(t)*slopes[1]
        elif interpolation_method_name == "inverse_distance_weighted":
            weightSum = 0.0
            workingResult = 0.0
            for surPoint in sur:
                if surPoint is None:
                    continue
                surPointValue = surPoint[-1]
                surPointDistance = self.interpolation_point_distance_nd_fun(location, surPoint)
                assert surPointDistance > 0, "this should have been detected earlier!"
                surPointWeight = surPointDistance**(-self.interpolation_power)
                workingResult += surPointValue * surPointWeight
                weightSum += surPointWeight
            if weightSum > 0: #avoid zero division error.
                result = workingResult / weightSum
            else:
                result = self.default_value
        else:
            raise KeyError("The interpolation_method_name {} isn't supported.".format(interpolation_method_name))
        result = self._output_filter_fun(result, sur)
        assert result is not None, "result should never be None!" #@ slow
        return result

    def set_at_location_cached(self, location, value):
        """Store a bone at location and invalidate the cached values it affects."""
        assert len(location) + 1 == self.dimensions
        if Spline.CACHE_VALUES_BY_SUR_HASH:
            sur = None
            if self.dimensions == 2:
                sur = self.get_necessary_surroundings_2d(location[0])
            elif self.dimensions > 2:
                sur = self.get_enclosing_surroundings(location) #@ make sure to change this at the same time as elsewhere.
            else:
                assert False, "invalid dimension count."
            #values solved from the surroundings this location had until now are about to become stale.
            self.clear_cache_entry_for_surroundings(sur)
        if self.dimensions == 2:
            index = location[0]
            self.data[index] = value
            if Spline.CACHE_VALUES_2D:
                #the following fast update version works by processing runs of non-None values. It is too simple to handle clearing a far region without a closer one first. The benefit of doing it this way is that bone distances don't need to be determined before work starts.
                if self.interpolation_method_name == "unspecified":
                    return
                updateType = Spline.VALUE_CACHE_UPDATE_TYPES[self.interpolation_method_name]
                if not isinstance(updateType, list):
                    raise ValueError("updateType definitions of types other than list are not supported yet.")
                self.valueCache[index] = None
                if (0,1) in updateType:
                    self.clearCacheInDirection(index, 1, times=(2 if (1,2) in updateType else 1))
                if (-1,0) in updateType:
                    self.clearCacheInDirection(index, -1, times=(2 if (-2,-1) in updateType else 1))
        elif self.dimensions > 2:
            PyDeepArrTools.setValueUsingPath(self.data, location, value)
            #the bone location and bone distance caches only exist for 2d data, so there is nothing more to update here.
        else:
            assert False, "invalid dimension count."

    def clearCacheInDirection(self, index, direction, times=1):
        """
        skips clearing the value at index. The arg _times_ indicates the number of times to skip over a value of None and continue.
        If interpolation with endpoint wrapping is added, this method will need to be changed to wrap as well.
        Clearing stops at either end of the cache; negative indices are never used, so it cannot wrap around.
        """
        cacheLength = len(self.valueCache)
        clearingIndex = index
        for i in range(times):
            clearingIndex += direction
            while 0 <= clearingIndex < cacheLength and self.valueCache[clearingIndex] is not None:
                self.valueCache[clearingIndex] = None
                clearingIndex += direction

    def clear_cache_entry_for_surroundings(self, sur):
        """Forget every value that was solved using exactly the surroundings sur."""
        assert all(len(item) == len(self.size) for item in sur if item is not None)
        surHash = hash_point_list(sur, self.size)
        #the old surroundings are no longer valid to search by; values cached under them must be regenerated.
        self.value_cache_by_surroundings_hash.pop(surHash, None)
# Import-time smoke test: a SparseList built without caches must report itself
# as both a SparseList and a list, keep its payload in a plain list, and
# forward len() and indexing to that payload.
print("testing Curves.SparseList...")
testSparseList = SparseList([5,6,7], False, False)
assert isinstance(testSparseList, SparseList) and isinstance(testSparseList, list)
assert isinstance(testSparseList.data, list) and not isinstance(testSparseList.data, SparseList)
TestingTools.assertEqual(testSparseList.__len__(), 3)
TestingTools.assertEqual(len(testSparseList), 3)
TestingTools.assertEqual(testSparseList[1], 6)