Skip to content
Snippets Groups Projects
Commit a8a8d311 authored by Timo Specht's avatar Timo Specht
Browse files

:construction_site: WIP de Casteljau no class

parent a6365ae4
No related branches found
No related tags found
1 merge request!9Merge chapter5 snippets into bezier_curve
......@@ -12,109 +12,6 @@ from curvepy.utilities import csv_read
from curvepy.types import bernstein_polynomial
class Tholder:
"""
Class holds Array with equidistant ts in [0,1] of length n
Parameters
----------
n: int
Numbers of ts to calculate
Attributes
-------
_tArray : np.ndarray
array in which all ts are stored
_pointer : int
pointer pointing on next t to get
lockMe: th.Lock
lock for multithreading
"""
def __init__(self, ts: np.ndarray = None) -> None:
self._tArray = ts
self._pointer = 0
self.lockMe = th.Lock() # variable used to control access of threads
def get_next_t(self) -> float:
"""
Method for threads to get next t
Returns
-------
float:
t to calculate the next De Casteljau step
"""
if self._pointer == len(self._tArray):
return -1
res = self._tArray[self._pointer]
self._pointer += 1
return res
class CasteljauThread(th.Thread):
"""
Thread class computing the De Casteljau algorithm
Parameters
----------
ts_holder: Tholder
Class which yields all ts
c: np.ndarray
Array with control points
f: Function
Function to transform t if necessary
Attributes
-------
_ts_holder : Tholder
instance of class Tholder so thread can get the ts for calculating the de Casteljau algorithm
_coords: np.ndarray
original control points
res: list
actual points on curve
_func: Function
function for transforming t
"""
def __init__(self, ts_holder: Tholder, c: np.ndarray, f: Callable[[float], float] = lambda x: x) -> None:
th.Thread.__init__(self)
self._ts_holder = ts_holder
self._coords = c
self.res = []
self._func = f
def de_caes(self, t: float, n: int) -> None:
"""
Method implementing the the De Casteljau algorithm
Parameters
----------
t: float
value at which to be evaluated at
n: int
number of iterations TODO find more describing phrasing together
"""
m = self._coords.copy()
t = self._func(t)
for r in range(n):
m[:, :(n - r - 1)] = (1 - t) * m[:, :(n - r - 1)] + t * m[:, 1:(n - r)]
self.res.append(m[:, 0])
def run(self) -> None:
"""
Method calculates points until depot is empty
"""
_, n = self._coords.shape
while True:
self._ts_holder.lockMe.acquire()
t = self._ts_holder.get_next_t()
self._ts_holder.lockMe.release()
if t == -1:
break
self.de_caes(t, n)
class AbstractBezierCurve(ABC):
"""
Abstract class for creating a Bezier Curve.
......@@ -521,6 +418,7 @@ class BezierCurveThreaded(AbstractBezierCurve):
return curve
class BezierCurveBernstein(AbstractBezierCurve):
def init_func(self, m: np.ndarray) -> Callable:
......@@ -561,6 +459,7 @@ class BezierCurveBernstein(AbstractBezierCurve):
return np.sum([m[:, i] * bernstein_polynomial(n - r - 1, i, t) for i in range(n - r)], axis=0)
def init() -> None:
m = csv_read('test.csv') # reads csv file with bezier points
b1 = BezierCurve(m)
......
from typing import Iterable, Union, Tuple
import concurrent.futures
from typing import List, Tuple
import numpy as np
from multiprocessing import cpu_count
class DeCasteljau:
def de_caes_one_step(m: np.ndarray, t: float = 0.5, interval: Tuple[int, int] = (0, 1)) -> np.ndarray:
"""
Method computing one round of de Casteljau
def __init__(self, m: np.ndarray, make_copy=True, interval: Tuple[int, int] = (0, 1)):
self.m = m.copy() if make_copy else m
self.interval = interval
Parameters
----------
m: np.ndarray:
array containing coefficients
def __call__(self, ts: Iterable[float]) -> np.ndarray: # get value
"""
Method computing r round of de Casteljau
t: float:
value for which point is calculated
Parameters
----------
t: float:
value for which point is calculated
interval: Tuple[int, int]:
if interval != (0,1) we need to transform t
Returns
-------
np.ndarray:
array containing calculated points with given t
"""
m = self.m
Returns
-------
np.ndarray:
array containing calculated points with given t
"""
for i, t in enumerate(ts):
m = self.step(t)
l, r = interval
t2 = (t - l) / (r - l) if interval != (0, 1) else t
t1 = 1 - t2
return m
m[:, :-1] = t1 * m[:, :-1] + t2 * m[:, 1:]
def __getitem__(self, t: float = 0.5):
return DeCasteljau(self.step(t)[:, :-1])
return m[:, :-1] if m.shape != (2, 1) else m
def step(self, t: float = 0.5) -> np.ndarray:
"""
Method computing one round of de Casteljau
Parameters
----------
t: float:
value for which point is calculated
def de_caes_n_steps(m: np.ndarray, t: float = 0.5, r: int = 1, interval: Tuple[int, int] = (0, 1)) -> np.ndarray:
"""
Method computing r round of de Casteljau
Returns
-------
np.ndarray:
array containing calculated points with given t
"""
m = self.m
l, r = self.interval
if m.shape[1] < 2:
raise Exception("At least two points are needed")
Parameters
----------
m: np.ndarray:
array containing coefficients
t2 = (t - l) / (r - l) if (l, r) != (0, 1) else t
t1 = 1 - t2
t: float:
value for which point is calculated
m[:, :-1] = t1 * m[:, :-1] + t2 * m[:, 1:]
r: int:
how many rounds of de Casteljau algorithm should be performed
return m if m.shape == (2, 1) else m[:, :-1]
interval: Tuple[int, int]:
if interval != (0,1) we need to transform t
def apply_many(self, ts: Iterable[float]):
return [self[t] for t in ts]
Returns
-------
np.ndarray:
array containing calculated points with given t
"""
for i in range(r):
m = de_caes_one_step(m, t, interval)
return m
def de_caes(m: np.ndarray, t: float = 0.5, make_copy: bool = False, interval: Tuple[int, int] = (0, 1)) -> np.ndarray:
"""
Method computing de Casteljau
Parameters
----------
m: np.ndarray:
array containing coefficients
t: float:
value for which point is calculated
make_copy: bool:
optional parameter if computation should not be in place
interval: Tuple[int, int]:
if interval != (0,1) we need to transform t
Returns
-------
np.ndarray:
array containing calculated points with given t
"""
_, n = m.shape
return de_caes_n_steps(m.copy(), t, n, interval) if make_copy else de_caes_n_steps(m, t, n, interval)
def de_caes_blossom(m: np.ndarray, ts: List[float], make_copy: bool = False,
interval: Tuple[int, int] = (0, 1)) -> np.ndarray:
"""
Method computing de Casteljau with different values of t in each step
Parameters
----------
m: np.ndarray:
array containing coefficients
ts: List[float]:
List containing all ts that are used in calculation
make_copy: bool:
optional parameter if computation should not be in place
interval: Tuple[int, int]:
if interval != (0,1) we need to transform t
Returns
-------
np.ndarray:
array containing calculated points with given t
"""
if m.shape[1] < 2:
raise Exception("At least two points are needed")
if len(ts) >= m.shape[1]:
raise Exception("Too many values to use!")
if not ts:
raise Exception("At least one element is needed!")
c = m.copy() if make_copy else m
for t in ts:
c = de_caes_one_step(c, t, interval)
return c
def parallel_decaes_unblossomed(m: np.ndarray, ts, interval: Tuple[int, int] = (0, 1)):
with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count()*2) as executor:
return executor.map(lambda t: de_caes(m, t, interval=interval), ts)
if __name__ == '__main__':
x = [0, 0, 8, 4]
y = [0, 2, 2, 0]
# x_1 = [0]
# y_1 = [1]
test = np.array([x, y], dtype=float)
ptmp = list(parallel_decaes_unblossomed(test, np.linspace(0, 1, 1000)))
tmp = [de_caes(test, t, make_copy=True) for t in np.linspace(0, 1, 1000)]
assert len(ptmp) == len(tmp)
print('Länge OK')
for a, b in zip(tmp, ptmp):
assert all(a == b)
print('Alle Elemente gleich')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment