290 lines
12 KiB
Python
290 lines
12 KiB
Python
# agv_path_planner.py
|
|
# ------------------------------------------------------------
|
|
# AGV 경로 계획 + F/B(전/후진) 주석 출력
|
|
# - 입력: 현재 RFID, 직전 RFID, 마지막 모터(F/B), 목적지 RFID
|
|
# - MapData.json(첨부 데이터)에서 맵/좌표/도킹/연결 파싱
|
|
# - 결정 규칙: 목적지 근처 TP(갈림길) 우선 + 포크 루프(Left>Right>Straight) + 최종 도킹 모터 강제
|
|
# - 출력: RFID 경로 + F/B 주석 (항상 단일 경로)
|
|
# ------------------------------------------------------------
|
|
import re, math, argparse, sys, json
|
|
from collections import defaultdict, deque
|
|
from pathlib import Path
|
|
|
|
# ============= Map 파서 =============
|
|
def parse_map_text(text: str):
|
|
parts = re.split(r'\bNodeId\s+', text)
|
|
nodes = {}
|
|
for part in parts:
|
|
part = part.strip()
|
|
if not part:
|
|
continue
|
|
m = re.match(r'(\S+)', part)
|
|
if not m:
|
|
continue
|
|
nid = m.group(1)
|
|
|
|
def find_num(key):
|
|
m = re.search(fr'\b{key}\s+(-?\d+)', part)
|
|
return int(m.group(1)) if m else None
|
|
|
|
def find_str(key):
|
|
m = re.search(fr'\b{key}\s+([^\s\r\n]+)', part)
|
|
return m.group(1) if m else None
|
|
|
|
# Position
|
|
mpos = re.search(r'\bPosition\s+(-?\d+)\s*,\s*(-?\d+)', part)
|
|
pos = (int(mpos.group(1)), int(mpos.group(2))) if mpos else None
|
|
|
|
# ConnectedNodes (원문은 단방향처럼 보일 수 있으나 그래프는 양방향로 구축)
|
|
mconn = re.search(r'\bConnectedNodes\s+([^\r\n]+)', part)
|
|
conns = []
|
|
if mconn:
|
|
seg = mconn.group(1)
|
|
seg = re.split(r'\b(CanRotate|StationId|StationType|CreatedDate|ModifiedDate|IsActive|DisplayColor|RfidId|RfidStatus|RfidDescription|LabelText|FontFamily|FontSize|FontStyle|ForeColor|BackColor|ShowBackground|ImagePath|Scale|Opacity|Rotation|DisplayText|Name|Type|DockDirection|Position|NodeId)\b', seg)[0]
|
|
conns = re.findall(r'N\d+', seg)
|
|
|
|
nodes[nid] = {
|
|
'NodeId': nid,
|
|
'RfidId': find_str('RfidId'),
|
|
'Type': find_num('Type'), # 2=Station/Buffer, 3=Charger 등
|
|
'DockDirection': find_num('DockDirection'), # 1=전면(F), 2=후면(B)
|
|
'Position': pos,
|
|
'ConnectedNodes': conns,
|
|
}
|
|
|
|
# 양방향 인접 리스트
|
|
adj = defaultdict(set)
|
|
for u, d in nodes.items():
|
|
for v in d['ConnectedNodes']:
|
|
if v in nodes:
|
|
adj[u].add(v)
|
|
adj[v].add(u)
|
|
|
|
nid2rfid = {nid: nd['RfidId'] for nid, nd in nodes.items() if nd['RfidId']}
|
|
rfid2nid = {rfid: nid for nid, rfid in nid2rfid.items()}
|
|
return nodes, adj, nid2rfid, rfid2nid
|
|
|
|
|
|
# ============= 기하/도움함수 =============
|
|
def degree_map(adj):
|
|
return {n: len(adj[n]) for n in adj}
|
|
|
|
def is_fork(adj, n):
|
|
return len(adj[n]) >= 3 # 갈림길 기준
|
|
|
|
def vec(nodes, a, b):
|
|
ax, ay = nodes[a]['Position']; bx, by = nodes[b]['Position']
|
|
return (bx-ax, by-ay)
|
|
|
|
def angle_between(u, v):
|
|
ux, uy = u; vx, vy = v
|
|
du = max((ux*ux+uy*uy)**0.5, 1e-9); dv = max((vx*vx+vy*vy)**0.5, 1e-9)
|
|
ux/=du; uy/=du; vx/=dv; vy/=dv
|
|
dot = max(-1.0, min(1.0, ux*vx+uy*vy))
|
|
ang = math.degrees(math.acos(dot))
|
|
cross = ux*vy - uy*vx
|
|
return ang, cross
|
|
|
|
def classify_at_fork(nodes, adj, fork, came_from):
|
|
"""
|
|
진입 벡터(came_from -> fork) 기준으로 fork의 각 출구를
|
|
직진/좌/우로 분류. 좌표계(y-down) 관례를 반영해 cross>0 => Left.
|
|
"""
|
|
vin = vec(nodes, fork, came_from)
|
|
cand=[]
|
|
for nb in adj[fork]:
|
|
if nb == came_from:
|
|
continue
|
|
v = vec(nodes, fork, nb)
|
|
ang, cross = angle_between(vin, v)
|
|
dev = abs(180 - ang) # 180°에 가까울수록 '직진'
|
|
side = 'left' if cross > 0 else 'right' # 보정: cross>0 => left
|
|
cand.append((nb, dev, side))
|
|
if not cand:
|
|
return {'straight':None, 'left':None, 'right':None}
|
|
straight = min(cand, key=lambda x:x[1])[0]
|
|
lefts = [x for x in cand if x[2]=='left' and x[0]!=straight]
|
|
rights= [x for x in cand if x[2]=='right'and x[0]!=straight]
|
|
left = min(lefts, key=lambda x:x[1])[0] if lefts else None
|
|
right = min(rights, key=lambda x:x[1])[0] if rights else None
|
|
return {'straight': straight, 'left': left, 'right': right}
|
|
|
|
def shortest_path(adj, s, t):
|
|
q=deque([s]); prev={s:None}
|
|
while q:
|
|
u=q.popleft()
|
|
if u==t: break
|
|
for v in adj[u]:
|
|
if v not in prev:
|
|
prev[v]=u; q.append(v)
|
|
if t not in prev:
|
|
return None
|
|
path=[]; cur=t
|
|
while cur is not None:
|
|
path.append(cur); cur=prev[cur]
|
|
return list(reversed(path))
|
|
|
|
def desired_final_motor(nodes, goal_nid):
|
|
dd = nodes[goal_nid]['DockDirection']
|
|
ty = nodes[goal_nid]['Type']
|
|
if dd in (1,2):
|
|
return 'F' if dd==1 else 'B'
|
|
# fallback: Charger(Type=3)=>F, Station/Buffer(Type=2)=>B
|
|
if ty==3: return 'F'
|
|
if ty==2: return 'B'
|
|
return 'F'
|
|
|
|
# ============= TP(터닝포인트) 선택(목표 근접 우선 + 동률/우선순위 보정) =============
|
|
def choose_turning_point(nodes, adj, nid2rfid, rfid2nid, goal_nid, current_nid, preferred_tp_rfids=('004','039','040','038','005')):
|
|
forks = [n for n in adj if is_fork(adj, n) and n != goal_nid and n != current_nid]
|
|
if not forks:
|
|
return None
|
|
# 목표에서의 BFS 거리
|
|
q=deque([goal_nid]); dist={goal_nid:0}
|
|
while q:
|
|
u=q.popleft()
|
|
for v in adj[u]:
|
|
if v not in dist:
|
|
dist[v]=dist[u]+1; q.append(v)
|
|
pref_index = {rfid2nid[rf]:i for i,rf in enumerate(preferred_tp_rfids) if rf in rfid2nid}
|
|
best=None; best_key=None
|
|
for n in forks:
|
|
if n in dist:
|
|
key=(dist[n], pref_index.get(n, 9999)) # 1)목표와의 거리 2)사전선호
|
|
if best_key is None or key<best_key:
|
|
best_key=key; best=n
|
|
return best
|
|
|
|
# ============= 메인 플래너(단일 해답, F/B 주석) =============
|
|
def plan_route_with_fb(nodes, adj, nid2rfid, rfid2nid, current_rfid, prev_rfid, last_motor, goal_rfid,
|
|
preferred_tp_rfids=('004','039','040','038','005')):
|
|
"""
|
|
반환:
|
|
- path_rfid: RFID 시퀀스
|
|
- annotated: [(from, 'F'|'B', to), ...]
|
|
규칙:
|
|
- 마지막 모터 != 도킹 모터 => TP(목표근접)까지 F 전진, TP에서 '포크 루프' 수행(Left>Right>Straight),
|
|
TP 재진입 후에는 최종 모터로 goal까지 진행.
|
|
- 마지막 모터 == 도킹 모터 => 최단경로를 해당 모터로 진행(마지막 홉 모터 일치 보장).
|
|
"""
|
|
cur = rfid2nid[current_rfid]
|
|
prv = rfid2nid[prev_rfid]
|
|
goal= rfid2nid[goal_rfid]
|
|
|
|
sp = shortest_path(adj, cur, goal)
|
|
if not sp:
|
|
raise RuntimeError('경로 없음')
|
|
|
|
final_motor = desired_final_motor(nodes, goal)
|
|
last_motor = last_motor.upper()[0]
|
|
annotated=[]
|
|
|
|
def push(a,b,m):
|
|
annotated.append((nid2rfid[a], m, nid2rfid[b]))
|
|
|
|
# --- 모터 일치: 바로 최단경로 (마지막 홉 모터=final_motor 보장) ---
|
|
if last_motor == final_motor:
|
|
for i in range(len(sp)-1):
|
|
a,b = sp[i], sp[i+1]
|
|
motor = final_motor # 전체 구간 동일 모터로 통일
|
|
push(a,b,motor)
|
|
return [nid2rfid[n] for n in sp], annotated
|
|
|
|
# --- 모터 불일치: TP(목표근접) 사용 + 포크 루프 ---
|
|
tp = choose_turning_point(nodes, adj, nid2rfid, rfid2nid, goal, cur, preferred_tp_rfids)
|
|
if tp is None:
|
|
# TP가 없으면(희박) 최단경로 그대로, 마지막 홉만 강제(참고: 물리 제약상 권장X)
|
|
for i in range(len(sp)-1):
|
|
a,b = sp[i], sp[i+1]
|
|
motor = final_motor if i==len(sp)-2 else last_motor
|
|
push(a,b,motor)
|
|
return [nid2rfid[n] for n in sp], annotated
|
|
|
|
# A) 현재 -> TP는 'F(전진)'로
|
|
path_to_tp = shortest_path(adj, cur, tp)
|
|
for i in range(len(path_to_tp)-1):
|
|
a,b = path_to_tp[i], path_to_tp[i+1]
|
|
push(a,b,'F')
|
|
|
|
# B) TP 포크 루프: 분기 결정(결정적)
|
|
# - exit_to_goal: TP에서 goal로 향하는 최단경로의 다음 노드
|
|
# - came_from : TP에 들어올 때 직전 노드
|
|
path_back = shortest_path(adj, tp, goal)
|
|
exit_to_goal = path_back[1] if len(path_back)>=2 else None
|
|
came_from = path_to_tp[-2] if len(path_to_tp)>=2 else None
|
|
|
|
# 분류: TP에 came_from 기준으로 진입
|
|
cls = classify_at_fork(nodes, adj, tp, came_from) if is_fork(adj, tp) and came_from else {'left':None, 'right':None, 'straight':None}
|
|
# 분기 후보: (Left > Right > Straight), 단 exit_to_goal/came_from 제외
|
|
candidates = [cls.get('left'), cls.get('right'), cls.get('straight')]
|
|
loop_branch = None
|
|
for nb in candidates:
|
|
if nb and nb != exit_to_goal and nb != came_from:
|
|
loop_branch = nb; break
|
|
# 그래도 없으면 TP의 임의 다른 이웃 선택(출구/진입 제외)
|
|
if loop_branch is None:
|
|
for nb in adj[tp]:
|
|
if nb != exit_to_goal and nb != came_from:
|
|
loop_branch = nb; break
|
|
# (안전장치) 여전히 없으면 루프 생략
|
|
if loop_branch:
|
|
# TP -> loop_branch : F
|
|
push(tp, loop_branch, 'F')
|
|
# loop_branch -> TP : B
|
|
push(loop_branch, tp, 'B')
|
|
|
|
# C) TP -> goal 은 최종 모터로
|
|
path_back = shortest_path(adj, tp, goal) # (루프 후 동일)
|
|
for i in range(len(path_back)-1):
|
|
a,b = path_back[i], path_back[i+1]
|
|
push(a,b, final_motor)
|
|
|
|
# RFID 경로 구성 (루프를 포함시켜야 하므로 annotated에서 꺼냄)
|
|
rpath = [annotated[0][0]]
|
|
for (_,_,to_rfid) in annotated:
|
|
rpath.append(to_rfid)
|
|
return rpath, annotated
|
|
|
|
# ============= 유틸: 출력 포맷 =============
|
|
def format_annotated(annotated):
|
|
return " ".join([f"{a} -({m})-> {b}" for (a,m,b) in annotated])
|
|
|
|
# ============= CLI =============
|
|
def main():
|
|
p = argparse.ArgumentParser(description="AGV RFID 경로 계획 + F/B 주석")
|
|
p.add_argument("--map", default="MapData.json", help="맵 데이터 파일 (기본: MapData.json)")
|
|
p.add_argument("--current", required=True, help="현재 RFID (예: 032)")
|
|
p.add_argument("--prev", required=True, help="직전 RFID (예: 033)")
|
|
p.add_argument("--last", required=True, choices=['F','B','f','b','forward','backward','Forward','Backward'],
|
|
help="마지막 모터(F/B 또는 forward/backward)")
|
|
p.add_argument("--goal", required=True, help="목표 RFID (예: 040)")
|
|
p.add_argument("--tp-order", default="004,039,040,038,005", help="TP 우선순위 RFID(쉼표구분, 기본: 004,039,040,038,005)")
|
|
args = p.parse_args()
|
|
|
|
map_path = Path(args.map)
|
|
if not map_path.exists():
|
|
print(f"[오류] 맵 파일이 없습니다: {map_path.resolve()}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
text = map_path.read_text(encoding="utf-8", errors="ignore")
|
|
nodes, adj, nid2rfid, rfid2nid = parse_map_text(text)
|
|
|
|
# 입력 RFID 검증
|
|
for rf in (args.current, args.prev, args.goal):
|
|
if rf not in rfid2nid:
|
|
print(f"[오류] 미등록 RFID: {rf}", file=sys.stderr); sys.exit(2)
|
|
|
|
last_motor = 'F' if args.last.lower().startswith('f') else 'B'
|
|
tp_pref = tuple([rf.strip() for rf in args.tp_order.split(",") if rf.strip()])
|
|
|
|
rpath, annotated = plan_route_with_fb(nodes, adj, nid2rfid, rfid2nid,
|
|
args.current, args.prev, last_motor, args.goal,
|
|
preferred_tp_rfids=tp_pref)
|
|
|
|
print("\n=== 결과 ===")
|
|
print("RFID Path :", " → ".join(rpath))
|
|
print("F/B Path :", format_annotated(annotated))
|
|
|
|
if __name__ == "__main__":
|
|
main()
|