diff --git a/competition/custom_utils.py b/competition/custom_utils.py new file mode 100644 index 000000000..1b709a781 --- /dev/null +++ b/competition/custom_utils.py @@ -0,0 +1,189 @@ +"""Utility module. + +For the IROS competition + +""" + +from math import sqrt, sin, cos, atan2, pi +import numpy as np + +def cubic_interp(inv_t, x0, xf, dx0, dxf): + dx = xf - x0 + return np.flip(( + x0, + dx0, + (3*dx*inv_t - 2*dx0 - dxf)*inv_t, + (-2*dx*inv_t + (dxf + dx0))*inv_t*inv_t)) + +def quintic_interp(inv_t, x0, xf, dx0, dxf, d2x0, d2xf): + dx = xf - x0 + return np.flip(( + x0, + dx0, + d2x0*0.5, + ((10*dx*inv_t - (4*dxf+6*dx0))*inv_t - 0.5*(3*d2x0-d2xf))*inv_t, + ((-15*dx*inv_t + (7*dxf+8*dx0))*inv_t + 0.5*(3*d2x0-2*d2xf))*inv_t*inv_t, + ((6*dx*inv_t - 3*(dxf+dx0))*inv_t - 0.5*(d2x0-d2xf))*inv_t*inv_t*inv_t)) + + +def f(coeffs, t): + return np.polyval(coeffs, t) + +def df(coeffs, t): + return np.polyval(np.polyder(coeffs), t) + +def d2f(coeffs, t): + return np.polyval(np.polyder(coeffs,2), t) + +def d3f(coeffs, t): + return np.polyval(np.polyder(coeffs,3), t) + +def df_idx(length, ts, x_coeffs, y_coeffs, z_coeffs): + def df_(t): + for idx in range(length-1): + if (ts[idx+1] > t): + break + t -= ts[idx] + dx = df(x_coeffs[:, idx], t) + dy = df(y_coeffs[:, idx], t) + dz = df(z_coeffs[:, idx], t) + return sqrt(dx*dx+dy*dy+dz*dz) + return df_ + +# Fill s, v, a, t arrays fro S-curve using j and T +def fill(T, t, s, v, a, j): + for i in range(7): + dt = T[i+1] + t[i+1] = t[i] + dt + s[i+1] = ((j[i]/6*dt + a[i]/2)*dt + v[i])*dt+ s[i] + v[i+1] = (j[i]/2*dt + a[i] )*dt + v[i] + a[i+1] = j[i] *dt + a[i] + +# RPY angles to 3x3 rotation matrix +def rpy2rot(roll, pitch, yaw): + sr = sin(roll) + cr = cos(roll) + sp = sin(pitch) + cp = cos(pitch) + sy = sin(yaw) + cy = cos(yaw) + m = np.matrix(((cy*cp, -sy*cr+cy*sp*sr, sy*sr+cy*cr*sp), + (sy*cp, cy*cr+sy*sp*sr, -cy*sr+sp*sy*cr), + (-sp , cp*sr , cp*cr))) + return m + +def is_intersect(waypoints, obstacle, low, high): + [x,y] = obstacle[0:2] + x_min, x_max = x + low, x + high + y_min, y_max = y + low, y + high + for idx, waypoint in enumerate(waypoints): + [x, y] = waypoint[0:2] + # print((x,y), obstacle, (x_min, y_min), (x_max, y_max)) + if x_min <= x and x <= x_max and y_min <= y and y <= y_max: + return True, idx + return False, None + +# Distance is distance away from obstacle that vector between spline_waypoint and obstacle is projected +# returns new waypoint [x,y,z,yaw] +def project_point(spline_waypoint, obstacle, distance): + [x1,y1] = spline_waypoint[0:2] + [x2,y2] = obstacle[0:2] + vec = np.array([x1-x2, y1-y2]) + norm_vec = vec * (1/np.linalg.norm(vec)) + new_waypoint = norm_vec * distance + np.array(obstacle[0:2]) + z = spline_waypoint[2] + yaw = pi/2. + atan2(new_waypoint[1], new_waypoint[0]) + new_waypoint = [new_waypoint[0], new_waypoint[1], z, yaw] + print("new_waypoint: ", new_waypoint) + return new_waypoint + +# Get nearest path segment (in x y plane) +def get_nearest_path_segment(new_waypoint, waypoints): + if(len(waypoints) < 2): + return 0.0 + min_dist_to_path = np.Inf + min_idx = 0 + for idx in range(len(waypoints)-1): + vec = np.array(waypoints[idx+1] - waypoints[idx]) + main_vec = new_waypoint - waypoints[idx] + vec[2] = main_vec[2] = 0.0 + dist_to_path = np.dot(vec, main_vec) * (1/np.linalg.norm(vec)) + if(dist_to_path < min_dist_to_path): + min_dist_to_path = dist_to_path + min_idx = idx + print(idx) + return min_idx+1 + +def update_waypoints_avoid_obstacles(spline_waypoints, waypoints, obstacles, initial_info): + is_collision = False + + ### Determine range around obstacle that is dangerous (low,high) + obstacle_distrib_dict = initial_info['gates_and_obs_randomization'] + tolerance = 0.1 # magic number to say how near we can be to the obs. includes drone size as well + low = -initial_info['obstacle_dimensions']['radius'] - tolerance + high = initial_info['obstacle_dimensions']['radius'] + tolerance + if 'obstacles' in obstacle_distrib_dict: + obstacle_distrib_dict = obstacle_distrib_dict['obstacles'] + low -= obstacle_distrib_dict['low'] + high += obstacle_distrib_dict['high'] + + # Check if obstacle position uncertainty intersects with waypoint path + for obstacle in obstacles: + collision, idx = is_intersect(spline_waypoints, obstacle, low, high) + if collision: + is_collision = True + print("Collision!") + dist = max(-low, high) + 0.5 + new_waypoint = project_point(spline_waypoints[idx], obstacle, dist) + insertion_idx = get_nearest_path_segment(new_waypoint, waypoints) + waypoints = np.insert(waypoints, insertion_idx, new_waypoint, axis=0) + + return is_collision, waypoints + +def check_intersect_poly(x_coeff, y_coeff, dt, x, y, r): + # Function describing distance between spline and point (x,y) + x_poly = np.copy(x_coeff) + x_poly[-1] -= x + y_poly = np.copy(y_coeff) + y_poly[-1] -= y + LHS = np.polyadd(np.polymul(x_poly, x_poly), np.polymul(y_poly, y_poly)) + + # Find stationary points + dLHS = np.polyder(LHS) + dLHS_roots = np.real_if_close(np.roots(dLHS)) + + # Find stationarty points that lie within radius of obstacle + tolerance = 0.1 # parameter to say how near we can be to the obs. includes drone size as well + r_2 = (r + tolerance)**2 + selected = [] + for root in dLHS_roots: + if root.imag: + continue + root = root.real + if root < 0 or root > dt: + continue + LHS_curr = np.polyval(LHS, root) + if LHS_curr > r_2: + continue + dist = sqrt(LHS_curr) + selected.append((root, dist)) + + if not selected: + # no collision + return + + # Collect all collisions + projected_points = [] + for t, dist in selected: + spline_x = f(x_coeff, t) + spline_y = f(y_coeff, t) + spline_dx = df(x_coeff, t) + spline_dy = df(y_coeff, t) + spline_d2x = d2f(x_coeff, t) + spline_d2y = d2f(y_coeff, t) + scale = (r + 2*tolerance) / dist + # print("scale", scale) + projected_x = x + (spline_x - x) * scale + projected_y = y + (spline_y - y) * scale + projected_points.append((t, (projected_x, spline_dx, spline_d2x),(projected_y, spline_dy, spline_d2y))) + return projected_points \ No newline at end of file diff --git a/competition/edit_this.py b/competition/edit_this.py index e61f25d55..a17a07009 100644 --- a/competition/edit_this.py +++ b/competition/edit_this.py @@ -28,6 +28,8 @@ from collections import deque +from competition.custom_utils import update_waypoints_avoid_obstacles + try: from competition_utils import Command, PIDController, timing_step, timing_ep, plot_trajectory, draw_trajectory except ImportError: @@ -38,13 +40,15 @@ # REPLACE THIS (START) ## ######################### -# Optionally, create and import modules you wrote. -# Please refrain from importing large or unstable 3rd party packages. +import scipy.integrate +from math import sqrt, sin, cos, atan2, log1p, pi +from copy import deepcopy + try: - import example_custom_utils as ecu + import custom_utils as utils except ImportError: # PyTest import. - from . import example_custom_utils as ecu + from . import custom_utils as utils ######################### # REPLACE THIS (END) #### @@ -109,51 +113,281 @@ def __init__(self, # REPLACE THIS (START) ## ######################### - # Call a function in module `example_custom_utils`. - ecu.exampleFunction() + # Kinematic limits + # TODO determine better estimates from model + v_max = 1.5 + a_max = 0.5 + j_max = 2 + # Affect curve radius around waypoints, higher value means larger curve, smaller value means tighter curve + self.grad_scale = .4 * v_max - # Example: hardcode waypoints through the gates. - if use_firmware: - waypoints = [(self.initial_obs[0], self.initial_obs[2], initial_info["gate_dimensions"]["tall"]["height"])] # Height is hardcoded scenario knowledge. - else: - waypoints = [(self.initial_obs[0], self.initial_obs[2], self.initial_obs[4])] + self.mass = initial_info['nominal_physical_parameters']['quadrotor_mass'] + self.tall = initial_info["gate_dimensions"]["tall"]["height"] + self.low = initial_info["gate_dimensions"]["low"]["height"] + + ### Spline fitting between waypoints ### + + self.length = len(self.NOMINAL_GATES) + 2 + # end goal + waypoints = [[self.initial_obs[0], self.initial_obs[2], self.initial_obs[4], self.initial_obs[8]],] + + self.half_pi = 0.5*pi for idx, g in enumerate(self.NOMINAL_GATES): - height = initial_info["gate_dimensions"]["tall"]["height"] if g[6] == 0 else initial_info["gate_dimensions"]["low"]["height"] - if g[5] > 0.75 or g[5] < 0: - if idx == 2: # Hardcoded scenario knowledge (direction in which to take gate 2). - waypoints.append((g[0]+0.3, g[1]-0.3, height)) - waypoints.append((g[0]-0.3, g[1]-0.3, height)) - else: - waypoints.append((g[0]-0.3, g[1], height)) - waypoints.append((g[0]+0.3, g[1], height)) - else: - if idx == 3: # Hardcoded scenario knowledge (correct how to take gate 3). - waypoints.append((g[0]+0.1, g[1]-0.3, height)) - waypoints.append((g[0]+0.1, g[1]+0.3, height)) - else: - waypoints.append((g[0], g[1]-0.3, height)) - waypoints.append((g[0], g[1]+0.3, height)) - waypoints.append([initial_info["x_reference"][0], initial_info["x_reference"][2], initial_info["x_reference"][4]]) - - # Polynomial fit. - self.waypoints = np.array(waypoints) - deg = 6 - t = np.arange(self.waypoints.shape[0]) - fx = np.poly1d(np.polyfit(t, self.waypoints[:,0], deg)) - fy = np.poly1d(np.polyfit(t, self.waypoints[:,1], deg)) - fz = np.poly1d(np.polyfit(t, self.waypoints[:,2], deg)) - duration = 15 - t_scaled = np.linspace(t[0], t[-1], int(duration*self.CTRL_FREQ)) - self.ref_x = fx(t_scaled) - self.ref_y = fy(t_scaled) - self.ref_z = fz(t_scaled) + [x, y, _, _, _, yaw, typ] = g + yaw += self.half_pi + z = self.tall if typ == 0 else self.low + waypoints.append([x, y, z, yaw]) + # end goal + waypoints.append([initial_info["x_reference"][0], initial_info["x_reference"][2], initial_info["x_reference"][4], initial_info["x_reference"][8]]) + + # "time" for each waypoint + # time interval determined by eulcidean distance between waypoints along xyz plane + ts = np.zeros(self.length) + [x_prev, y_prev, z_prev, _] = waypoints[0] + for idx in range(1,self.length): + [x_curr, y_curr, z_curr, _] = waypoints[idx] + norm = sqrt((x_curr-x_prev)**2 + (y_curr-y_prev)**2 + (z_curr-z_prev)**2) + ts[idx] = ts[idx-1] + norm / v_max + + # Flip gates + # Selectively flip gate orientation based on vector from previous and to next waypoint, + # and current gate's orientation + [x_prev, y_prev, _, _] = waypoints[0] + [x_curr, y_curr, _, yaw_curr] = waypoints[1] + dt = ts[1] - ts[0] + for idx in range(1,self.length-1): + [x_next, y_next, _, yaw_next] = waypoints[idx+1] + dt_next = ts[idx+1] - ts[idx] + dxf = cos(yaw_curr) * self.grad_scale + dyf = sin(yaw_curr) * self.grad_scale + vx1 = np.array((x_curr - x_prev,dt)) + vx21 = np.array((dxf,1)) + vx22 = np.array((-dxf,1)) + vx3 = np.array((x_next - x_curr,dt_next)) + + vy1 = np.array((y_curr - y_prev,dt)) + vy21 = np.array((dyf,1)) + vy22 = np.array((-dyf,1)) + vy3 = np.array((y_next - y_curr,dt_next)) + choice_1 = vx1.dot(vx21)/np.linalg.norm(vx1)/np.linalg.norm(vx21)+vx21.dot(vx3)/np.linalg.norm(vx21)/np.linalg.norm(vx3) + choice_1 += vy1.dot(vy21)/np.linalg.norm(vy1)/np.linalg.norm(vy21)+vy21.dot(vy3)/np.linalg.norm(vy21)/np.linalg.norm(vy3) + choice_2 = vx1.dot(vx22)/np.linalg.norm(vx1)/np.linalg.norm(vx22)+vx22.dot(vx3)/np.linalg.norm(vx22)/np.linalg.norm(vx3) + choice_2 += vy1.dot(vy22)/np.linalg.norm(vy1)/np.linalg.norm(vy22)+vy22.dot(vy3)/np.linalg.norm(vy22)/np.linalg.norm(vy3) + if choice_2 > choice_1: + waypoints[idx][3] += pi + # print("flipping", idx) + x_prev, y_prev = x_curr, y_curr + x_curr, y_curr = x_next, y_next + yaw_curr = yaw_next + dt = dt_next + + # Slightly shift gate waypoint to deal with over/undershoot of PID controller + # E.g. for double left > shift gate to right + # for double right > shift gate to left + # for right then left > shift gate to right + # for left then right > shift gate to left + # For consecutive turns in same direction, distance shifted is larger + # shift_dist = .2 + # yaw_prev = waypoints[0][3] + # yaw_curr = waypoints[1][3] + # dyaw = yaw_curr - yaw_prev + # for idx in range(1,self.length-1): + # yaw_next = waypoints[idx+1][3] + # dyaw_next = yaw_next - yaw_curr + # scale = (((dyaw + dyaw_next) / 2. / pi + 1.) % 2.) - 1. + # if dyaw > 0: + # scale = -scale + # if dyaw * dyaw_next < 0: + # scale = -scale + # waypoints[idx][0] -= sin(yaw_curr) * shift_dist * scale + # waypoints[idx][1] -= cos(yaw_curr) * shift_dist * scale + # yaw_prev = yaw_curr + # yaw_curr = yaw_next + # dyaw = dyaw_next + + # Obstacle radius (including randomness) + r = initial_info['obstacle_dimensions']['radius'] + obstacle_distrib_dict = initial_info['gates_and_obs_randomization'] + if 'obstacles' in obstacle_distrib_dict: + r += max(abs(obstacle_distrib_dict['obstacles']['low']), abs(obstacle_distrib_dict['obstacles']['high'])) + + x_coeffs = [] + y_coeffs = [] + z_coeffs = [] + [x0, y0, z0, yaw0] = waypoints[0] + dx0 = cos(yaw0)*0.01 + dy0 = sin(yaw0)*0.01 + dz0 = v_max + dzf = 0 + d2x0 = 0 + d2y0 = 0 + d2z0 = a_max + d2xf = 0 + d2yf = 0 + d2zf = 0 + self.ts = [0,] + waypoints_added = [] + for idx in range(1, self.length): + waypoint = waypoints[idx] + [xf, yf, zf, yawf] = waypoint + waypoints_added.append(waypoint) + dt = ts[idx] - ts[idx - 1] + inv_t = 1/dt + dxf = cos(yawf) * self.grad_scale + dyf = sin(yawf) * self.grad_scale + if idx == self.length-1: # yaw of last goal not important + dxf = dx0 * 0.01 + dyf = dy0 * 0.01 + x_coeff = utils.quintic_interp(inv_t, x0, xf, dx0, dxf, d2x0, d2xf) + y_coeff = utils.quintic_interp(inv_t, y0, yf, dy0, dyf, d2y0, d2yf) + z_coeff = utils.quintic_interp(inv_t, z0, zf, dz0, dzf, d2z0, d2zf) + + # Obstacle avoidance by finding closest points on spline to obstacle + # Adds another waypoint that projects the spline outward from centre of obstacle + projected_points = [] + for obstacle in self.NOMINAL_OBSTACLES: + x = obstacle[0] + y = obstacle[1] + projected_point = utils.check_intersect_poly(x_coeff, y_coeff, dt, x, y, r) + if projected_point: + # print("projected points", projected_point) + projected_points += projected_point + if projected_points: + projected_points.sort(key = lambda x: x[0]) + prev_t = 0 + for t, (x, dx, d2x), (y, dy, d2y) in projected_points: + z = utils.f(z_coeff, t) + dz = utils.df(z_coeff, t) + d2z = utils.d2f(z_coeff, t) + inv_t = 1/(t - prev_t) + x_coeff = utils.quintic_interp(inv_t, x0, x, dx0, dx, d2x0, d2x) + y_coeff = utils.quintic_interp(inv_t, y0, y, dy0, dy, d2y0, d2y) + z_coeff = utils.quintic_interp(inv_t, z0, z, dz0, dz, d2z0, d2z) + x_coeffs.append(x_coeff) + y_coeffs.append(y_coeff) + z_coeffs.append(z_coeff) + self.ts.append(ts[idx - 1] + t) + waypoints_added.append((x, y, z, 0)) + + [x0, y0, z0] = [x, y, z] + [dx0, dy0, dz0] = [dx, dy, dz] + [d2x0, d2y0, d2z0] = [d2x, d2y, d2z] + prev_t = t + inv_t = 1/(ts[idx] - ts[idx - 1] - prev_t) + x_coeff = utils.quintic_interp(inv_t, x0, xf, dx0, dxf, d2x0, d2xf) + y_coeff = utils.quintic_interp(inv_t, y0, yf, dy0, dyf, d2y0, d2yf) + z_coeff = utils.quintic_interp(inv_t, z0, zf, dz0, dzf, d2z0, d2zf) + x_coeffs.append(x_coeff) + y_coeffs.append(y_coeff) + z_coeffs.append(z_coeff) + self.ts.append(ts[idx]) + + [x0, y0, z0] = [xf, yf, zf] + [dx0, dy0, dz0] = [dxf, dyf, dzf] + [d2x0, d2y0, d2z0] = [d2xf, d2yf, d2zf] + x_coeffs = np.array(x_coeffs).transpose() + y_coeffs = np.array(y_coeffs).transpose() + z_coeffs = np.array(z_coeffs).transpose() + waypoints = np.array(waypoints_added) + self.length = len(self.ts) + # z_coeffs = scipy.interpolate.PchipInterpolator(self.ts, waypoints[:,2], 0).c + + # modify endpoint gradient for smooth z ending (pchip iterpolator) + # z_coeffs[:,-1] = utils.cubic_interp(1/(self.ts[-1]-self.ts[-2]), + # waypoints[-2,2], + # waypoints[-1,2], + # z_coeffs[-2,-1], + # 0 + # ) + self.coeffs = [x_coeffs, y_coeffs, z_coeffs] + + # Integrate to get pathlength + pathlength = 0 + for idx in range(self.length-1): + pathlength += scipy.integrate.quad( + utils.df_idx(self.length, self.ts, x_coeffs, y_coeffs, z_coeffs), + 0, self.ts[idx+1] - self.ts[idx])[0] + self.scaling_factor = self.ts[-1] / pathlength if self.VERBOSE: + duration = self.ts[-1] - self.ts[0] + t_scaled = np.linspace(self.ts[0], self.ts[-1], int(duration*self.CTRL_FREQ)) + t_diff_scaled = np.zeros(t_scaled.shape) + waypoint_scaled = np.zeros(t_scaled.shape, dtype=np.ushort) + waypoint = 0 + print(self.ts) + print(x_coeffs) + print(y_coeffs) + print(z_coeffs) + print(waypoints) + for i, t in enumerate(t_scaled): + if waypoint < self.length and t > self.ts[waypoint+1]: + waypoint += 1 + t_diff_scaled[i] = t_scaled[i] - self.ts[waypoint] + waypoint_scaled[i] = waypoint + x_scaled = np.array(tuple(map( + lambda idx, t: utils.f(x_coeffs[:, idx], t), + waypoint_scaled, t_diff_scaled))) + y_scaled = np.array(tuple(map( + lambda idx, t: utils.f(y_coeffs[:, idx], t), + waypoint_scaled, t_diff_scaled))) + z_scaled = np.array(tuple(map( + lambda idx, t: utils.f(z_coeffs[:, idx], t), + waypoint_scaled, t_diff_scaled))) # Plot trajectory in each dimension and 3D. - plot_trajectory(t_scaled, self.waypoints, self.ref_x, self.ref_y, self.ref_z) - - # Draw the trajectory on PyBullet's GUI. - draw_trajectory(initial_info, self.waypoints, self.ref_x, self.ref_y, self.ref_z) + plot_trajectory(t_scaled, waypoints, x_scaled, y_scaled, z_scaled) + # Draw the trajectory on PyBullet's GUI + draw_trajectory(initial_info, waypoints, x_scaled, y_scaled, z_scaled) + + # spline_waypoints = np.dstack((x_scaled, y_scaled, z_scaled))[0] + # is_collision, waypoints = update_waypoints_avoid_obstacles(spline_waypoints, waypoints, self.NOMINAL_OBSTACLES, + # initial_info) + + ### S-curve ### + sf = pathlength + + s = np.zeros(8) + v = np.zeros(8) + a = np.zeros(8) + j = np.array((j_max, 0, -j_max, 0, -j_max, 0, j_max, 0)) + t = np.zeros(8) + T = np.zeros(8) + + T[1] = T[3] = T[5] = T[7] = min(sqrt(v_max/j_max), a_max/j_max) # min(wont't hit a_max, will hit a_max) + utils.fill(T, t, s, v, a, j) + ds = sf - s[-1] + if ds < 0: + # T[2] = T[4] = T[6] = 0 + T[1] = T[3] = T[5] = T[7] = (xf/(2*j_max))**(1./3) + else: + T_2_max = (v_max - 2*v[1])/a_max + added_T_2 = (-v[1]+sqrt(v[1]*v[1]/2 + a_max*ds))/a_max + if added_T_2 > T_2_max: + T[2] = T[6] = T_2_max + ds -= (2*v[1] + a_max*T_2_max)*T_2_max + T[4] = ds/v_max + else: + # T[4] = 0 + T[2] = T[6] = added_T_2 + utils.fill(T, t, s, v, a, j) + def s_curve(t_): + for i in range(7): + if t[i+1] > t_: + break + dt = t_ - t[i] + s_ = ((j[i]/6*dt + a[i]/2)*dt + v[i])*dt+ s[i] + v_ = (j[i]/2*dt + a[i] )*dt + v[i] + a_ = j[i] *dt + a[i] + j_ = j[i] + return [s_, v_, a_, j_] + self.s = s_curve + self.end_t = t[-1] + self.scaled_t = 0 + self.time_scale = 1 + self.at_gate = False + self.gate_no = 0 + self.run_coeffs = deepcopy(self.coeffs) + self.run_ts = np.copy(self.ts) ######################### # REPLACE THIS (END) #### @@ -194,64 +428,113 @@ def cmdFirmware(self, # REPLACE THIS (START) ## ######################### - # Handwritten solution for GitHub's getting_stated scenario. - - if iteration == 0: - height = 1 - duration = 2 - - command_type = Command(2) # Take-off. - args = [height, duration] - - elif iteration >= 3*self.CTRL_FREQ and iteration < 20*self.CTRL_FREQ: - step = min(iteration-3*self.CTRL_FREQ, len(self.ref_x) -1) - target_pos = np.array([self.ref_x[step], self.ref_y[step], self.ref_z[step]]) - target_vel = np.zeros(3) - target_acc = np.zeros(3) - target_yaw = 0. - target_rpy_rates = np.zeros(3) - - command_type = Command(1) # cmdFullState. - args = [target_pos, target_vel, target_acc, target_yaw, target_rpy_rates] - - elif iteration == 20*self.CTRL_FREQ: - command_type = Command(6) # Notify setpoint stop. - args = [] - - elif iteration == 20*self.CTRL_FREQ+1: - x = self.ref_x[-1] - y = self.ref_y[-1] - z = 1.5 - yaw = 0. - duration = 2.5 - - command_type = Command(5) # goTo. - args = [[x, y, z], yaw, duration, False] - - elif iteration == 23*self.CTRL_FREQ: - x = self.initial_obs[0] - y = self.initial_obs[2] - z = 1.5 - yaw = 0. - duration = 6 - - command_type = Command(5) # goTo. - args = [[x, y, z], yaw, duration, False] - - elif iteration == 30*self.CTRL_FREQ: - height = 0. - duration = 3 - - command_type = Command(3) # Land. - args = [height, duration] - - elif iteration == 33*self.CTRL_FREQ-1: - command_type = Command(-1) # Terminate command to be sent once the trajectory is completed. - args = [] - + if self.scaled_t < self.end_t: + if info and info['current_target_gate_in_range']: + if not self.at_gate: + # Local replan when near to goal + self.at_gate = True + [x, y, z, _, _, yaw] = info['current_target_gate_pos'] + + # slow down based on distance jumped by gate + prev_x = self.run_coeffs[0][-1,self.gate_no+1] + prev_y = self.run_coeffs[1][-1,self.gate_no+1] + prev_z = self.run_coeffs[2][-1,self.gate_no+1] + error = sqrt((x-prev_x)**2 + (y-prev_y)**2 + (z-prev_z)**2) + self.time_scale += -log1p(np.linalg.norm(error)-.25)*0.05 + self.time_scale = max(0.0, min(1.0, self.time_scale)) + + yaw += self.half_pi + dt = self.run_ts[self.gate_no + 1] - self.curve_t + inv_t = 1/dt + dx_gate = cos(yaw) * self.grad_scale + dy_gate = sin(yaw) * self.grad_scale + dz_gate = 0 + for i, (val, dval) in enumerate(zip([x,y,z], [dx_gate, dy_gate, dz_gate])): + self.run_coeffs[i] = np.insert(self.run_coeffs[i], self.gate_no+1, utils.quintic_interp( + inv_t, + self.target_pose[i], + val, + self.tangent[i], + dval, + self.dtangent[i], + 0 + ), axis=1) + dt = self.run_ts[self.gate_no + 2] - self.run_ts[self.gate_no + 1] + inv_t = 1/dt + for i, (val, dval) in enumerate(zip([x,y,z], [dx_gate, dy_gate, dz_gate])): + self.run_coeffs[i][:,self.gate_no+2] = utils.quintic_interp( + inv_t, + val, + utils.f(self.run_coeffs[i][:, self.gate_no+2], dt), + dval, + utils.df(self.run_coeffs[i][:, self.gate_no+2], dt), + 0, + 0 + ) + self.run_ts = np.insert(self.run_ts, self.gate_no+1, self.curve_t) + self.length += 1 + self.gate_no += 1 + else: + self.at_gate = False + + self.scaled_t += self.time_scale / self.CTRL_FREQ / self.scaling_factor + + [curve_t, curve_v, curve_a, curve_j] = self.s(self.scaled_t) + curve_t *= self.scaling_factor + curve_v *= self.scaling_factor * self.time_scale + curve_a *= self.scaling_factor * self.time_scale + curve_j *= self.scaling_factor * self.time_scale + self.curve_t = curve_t + + if (self.gate_no < self.length-2 and self.run_ts[self.gate_no+1] <= curve_t): + self.gate_no += 1 + + t = curve_t - self.run_ts[self.gate_no] + + self.target_pose = np.array([utils.f(coeffs[:, self.gate_no], t) for coeffs in self.run_coeffs]) + self.tangent = np.array([utils.df(coeffs[:, self.gate_no], t) for coeffs in self.run_coeffs]) + self.dtangent = np.array([utils.d2f(coeffs[:, self.gate_no], t) for coeffs in self.run_coeffs]) + self.d2tangent = np.array([utils.d3f(coeffs[:, self.gate_no], t) for coeffs in self.run_coeffs]) + + error = np.array((obs[0] - self.target_pose[0], obs[2] - self.target_pose[1], obs[4] - self.target_pose[2])) # positional error (world frame) + # print(error) + self.time_scale += -log1p(np.linalg.norm(error)-.25)*0.05 + self.time_scale = max(0.0, min(1.0, self.time_scale)) + # print(self.time_scale) + + target_yaw = atan2(self.tangent[1], self.tangent[0]) + target_vel = self.tangent * curve_v + target_acc = self.tangent * curve_a + self.dtangent * curve_v**2 + + # Roll, pitch rate + # Small angle approximation + target_jerk = self.tangent * curve_j + self.d2tangent * curve_v**3 + 3 * self.dtangent * curve_v * curve_a + Jinv = utils.rpy2rot(obs[6], obs[7], obs[8]).transpose() + body_jerk = np.matmul(Jinv, target_jerk.transpose()) + p = self.mass/9.8*body_jerk[0,1] # roll rate = mass / g * y_jerk + q = self.mass/9.8*body_jerk[0,0] # pitch rate = mass / g * x_jerk + + # Yaw rate + den = np.linalg.norm(self.tangent[:2]) + if den < 1e-9: + r = 0 + else: + num = self.tangent[0] * self.dtangent[1] - self.tangent[1] * self.dtangent[0] + r = num/den + r *= curve_v + + target_rpy_rates = np.array((p,q,r)) + # target_vel = np.zeros(3) + # target_acc = np.zeros(3) + # target_yaw = 0. + # target_rpy_rates = np.zeros(3) + self.args = [self.target_pose, target_vel, target_acc, target_yaw, target_rpy_rates] else: - command_type = Command(0) # None. - args = [] + self.args[1] = np.zeros(3) + self.args[2] = np.zeros(3) + self.args[4] = np.zeros(3) + command_type = Command(1) # cmdFullState. + args = self.args ######################### # REPLACE THIS (END) #### @@ -361,6 +644,16 @@ def interEpisodeLearn(self): _ = self.done_buffer _ = self.info_buffer + # Reset run variables + + self.scaled_t = 0 + self.time_scale = 1 + self.at_gate = False + self.gate_no = 0 + self.run_coeffs = deepcopy(self.coeffs) + self.run_ts = np.copy(self.ts) + self.length = len(self.ts) + ######################### # REPLACE THIS (END) #### ######################### diff --git a/competition/levelh.yaml b/competition/levelh.yaml new file mode 100644 index 000000000..7396ddacf --- /dev/null +++ b/competition/levelh.yaml @@ -0,0 +1,182 @@ +# Level 0 + +# | Evaluation Scenario | Constraints | Rand. Inertial Properties | Randomized Obstacles, Gates | Rand. Between Episodes | Notes | +# | :-----------------: | :----------: | :-----------------------: | :-------------------------: | :--------------------: | :---------------: | +# | `level0.yaml` | **Yes** | *No* | *No* | *No* | Perfect knowledge | + +# Configuration +num_episodes: 2 # Customizable number of episodes (NOTE: solutions that require fewer episode will receive higher scores) +use_firmware: True # Whether to use pycffirmware or not (NOTE: only set to False for debugging purposes) +verbose: True # Boolean passed to the Controller class, can be used to turn on and off additional printouts + +# Note: DSL's flyable/usable space in Toronto extends from -3 to +3 meters in x and y and 0 and 2 meters in z +# Thus, only scenarios that meet these constraints are suitable for sim2real experiments + +# Environment +quadrotor_config: + seed: 2222 # Random seed + reseed_on_reset: True # Whether to re-seed the random number generator between episodes + gui: True # Whether to spawn PyBullet's GUI + camera_view: [5, -40, -40, 0.5, -1, 0.5] # Distance, yaw, pitch, x, y, z target position + + ctrl_freq: 30 # Frequency of the controller's decision making + pyb_freq: 500 # If `use_firmware` == True, must be multiple of 500, otherwise it must be multiple of ctrl_freq + episode_len_sec: 33 # Maximum episode duration, in seconds + + done_on_violation: True # Whether `done` becomes True when a constraint is violated + done_on_completion: True # Whether `done` becomes True when the task is completed (traveled all gates and 2'' hover at stabilization_goal) + done_on_collision: True # Whether `done` becomes True when a collision happens + cost: competition # Sparse reward function, if you desire, re-implement it in Quadrotor._get_reward() + # github.com/utiasDSL/safe-control-gym/blob/beta-iros-competition/safe_control_gym/envs/gym_pybullet_drones/quadrotor.py + + task: stabilization + task_info: + stabilization_goal: [-0.5, 2.9, 0.75] # Final goal (target position in x-y-z) + stabilization_goal_tolerance: 0.15 + + # Nominal (pre-randomization) initial state + init_state: + init_x: -0.9 + init_x_dot: 0 + init_y: -2.9 + init_y_dot: 0 + init_z: 0.03 + init_z_dot: 0 + init_phi: 0 + init_theta: 0 + init_psi: 0 + init_p: 0 + init_q: 0 + init_r: 0 + + # Nominal (pre-randomization) positions of gates and obstacles + gates: + [ # x, y, z, r, p, y, type (0: `tall` obstacle, 1: `low` obstacle) + [0.5, -2.45, 0, 0, 0, -1.57, 0], + [2, -1.5, 0, 0, 0, 0, 1], + [0, 0.2, 0, 0, 0, 1.57, 1], + [-0.5, 1.5, 0, 0, 0, 0, 0] + ] + obstacles: + [ # x, y, z, r, p, y + [0, -2.5, 0, 0, 0, 0], + [1, -2, 0, 0, 0, 0], + [1.5, 0, 0, 0, 0, 0], + [-1, 0, 0, 0, 0, 0] + ] + + # Randomization of the initial state + randomized_init: False + # init_state_randomization_info: + # init_x: + # distrib: "uniform" + # low: -0.1 + # high: 0.1 + # init_y: + # distrib: "uniform" + # low: -0.1 + # high: 0.1 + # init_z: + # distrib: "uniform" + # low: 0.0 + # high: 0.02 + # init_phi: + # distrib: "uniform" + # low: -0.1 + # high: 0.1 + # init_theta: + # distrib: "uniform" + # low: -0.1 + # high: 0.1 + # init_psi: + # distrib: "uniform" + # low: -0.1 + # high: 0.1 + + # Randomization of the quadrotor inertial properties + randomized_inertial_prop: False + # inertial_prop_randomization_info: + # M: + # distrib: "uniform" + # low: -0.01 + # high: 0.01 + # Ixx: + # distrib: "uniform" + # low: -0.000001 + # high: 0.000001 + # Iyy: + # distrib: "uniform" + # low: -0.000001 + # high: 0.000001 + # Izz: + # distrib: "uniform" + # low: -0.000001 + # high: 0.000001 + + # Randomization of the gates and obstacles positions + randomized_gates_and_obstacles: False + # gates_and_obstacles_randomization_info: + # gates: + # distrib: "uniform" + # low: -0.1 + # high: 0.1 + # obstacles: + # distrib: "uniform" + # low: -0.1 + # high: 0.1 + + # Constraints + constraints: + # Input constraints + - constraint_form: default_constraint + constrained_variable: input + # State constraints + - constraint_form: bounded_constraint + constrained_variable: state + active_dims: [0, 2, 4] # The uncommented entries in the upper_bounds/lower_bounds vectors below + upper_bounds: + - 3 # x + # - 100 # x_dot + - 3 # y + # - 100 # y_dot + - 2 # z + # - 100 # z_dot + # - 3 # phi + # - 3 # theta + # - 3 # psi + # - 10 # p + # - 10 # q + # - 10 # r + lower_bounds: + - -3 # x + # - -100 # x_dot + - -3 # y + # - -100 # y_dot + - -0.1 # z + # - -100 # z_dot + # - -3 # phi + # - -3 # theta + # - -3 # psi + # - -10 # p + # - -10 # q + # - -10 # r + + # Disturbances + # disturbances: + # # Additive noise on the commanded input action + # action: + # - disturbance_func: white_noise + # std: 0.001 + # # Random external force applied to the quadrotor (e.g. a push or wind) + # dynamics: + # - disturbance_func: uniform + # low: [-0.1, -0.1, -0.1] + # high: [0.1, 0.1, 0.1] + + + + # Do NOT edit + quad_type: 3 # 3D quadrotor + info_in_reset: True # Info dictionary as second return value (after the initial observation) of env.reset() + normalized_rl_action_space: False + done_on_out_of_bound: False