- agv_path_planner.py: JSON 형식 맵 파일 지원 추가 * parse_map_json() 함수로 MapData.json/NewMap.agvmap 파싱 * RFID 정규화 및 별칭 지원 (007/7 등) * TP(터닝포인트) 기반 방향전환 알고리즘 검증 완료 - universal_pathfinder.py: 범용 패턴 기반 경로 탐색 * Q1-1, Q1-2, Q2-1, Q2-2 모든 시나리오 패턴 구현 * UniversalPathFormatter 방향전환 표기 수정 * 28개 테스트 케이스 중 20개 성공 (71.4%) - test_all_scenarios.py: 전체 테스트 케이스 검증 스크립트 * 4개 시나리오 × 7개 목표 = 28개 케이스 자동 검증 * 사용자 제공 정답과 비교 분석 - show_map_info.py: 맵 구조 분석 도구 * RFID 목록, 연결 정보, 갈림길 분석 * 노드 타입별 분류 및 통계 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
362 lines
13 KiB
Python
362 lines
13 KiB
Python
# agv_path_planner.py (v1.1)
|
|
# ------------------------------------------------------------
|
|
# AGV 경로 계획 + F/B(전/후진) 주석 출력 (단일 해답 보장)
|
|
# - 입력: 현재 RFID, 직전 RFID, 마지막 모터(F/B), 목표 RFID
|
|
# - MapData.json에서 맵/좌표/도킹/연결 파싱
|
|
# - 결정 규칙: 목적지 근접 TP(갈림길) 우선 + 포크 루프(Left>Right>Straight) + 최종 도킹 모터 강제
|
|
# - robust: utf-8-sig 로딩, RFID 정규화(007/7 등), --debug 모드
|
|
# ------------------------------------------------------------
|
|
import re, math, argparse, sys
|
|
from collections import defaultdict, deque
|
|
from pathlib import Path
|
|
|
|
# ---------- RFID 정규화 ----------
|
|
def norm_rfid_token(s: str) -> str:
|
|
"""
|
|
숫자만 추출하여 3자리 zero-pad (예: '7'->'007', '007'->'007').
|
|
숫자가 하나도 없으면 원문 반환.
|
|
"""
|
|
if s is None: return s
|
|
digits = re.sub(r'\D+', '', s)
|
|
return digits.zfill(3) if digits else s.strip()
|
|
|
|
def make_rfid_aliases(raw: str):
|
|
"""
|
|
한 RFID에 대해 가능한 별칭 세트를 만듭니다.
|
|
예: '007' -> {'007','7'}
|
|
'40' -> {'040','40'}
|
|
"""
|
|
if raw is None: return set()
|
|
raw = raw.strip()
|
|
z3 = norm_rfid_token(raw)
|
|
nz = raw.lstrip('0') or '0'
|
|
znz = z3.lstrip('0') or '0'
|
|
return {raw, z3, nz, znz}
|
|
|
|
# ---------- Map 파서 ----------
|
|
import json
|
|
|
|
def parse_map_json(text: str):
|
|
"""JSON 형식 맵 데이터 파싱 (MapData.json, NewMap.agvmap용)"""
|
|
try:
|
|
data = json.loads(text)
|
|
nodes = {}
|
|
|
|
for node_data in data.get("Nodes", []):
|
|
nid = node_data.get("NodeId")
|
|
if not nid:
|
|
continue
|
|
|
|
# Position 파싱 "x, y" → (x, y)
|
|
pos_str = node_data.get("Position", "0, 0")
|
|
try:
|
|
x, y = map(int, pos_str.split(", "))
|
|
pos = (x, y)
|
|
except:
|
|
pos = (0, 0)
|
|
|
|
nodes[nid] = {
|
|
'NodeId': nid,
|
|
'RfidId': node_data.get('RfidId'),
|
|
'Type': node_data.get('Type', 0),
|
|
'DockDirection': node_data.get('DockDirection', 0),
|
|
'Position': pos,
|
|
'ConnectedNodes': node_data.get('ConnectedNodes', []),
|
|
}
|
|
|
|
# 양방향 인접
|
|
adj = defaultdict(set)
|
|
for u, d in nodes.items():
|
|
for v in d['ConnectedNodes']:
|
|
if v in nodes:
|
|
adj[u].add(v); adj[v].add(u)
|
|
|
|
# RFID 인덱스(별칭 포함)
|
|
nid2rfid = {}
|
|
rfid2nid = {}
|
|
for nid, nd in nodes.items():
|
|
rf = nd.get('RfidId')
|
|
if rf:
|
|
nid2rfid[nid] = norm_rfid_token(rf) # 표준 3자리로 보관
|
|
for alias in make_rfid_aliases(rf):
|
|
rfid2nid[alias] = nid
|
|
|
|
return nodes, adj, nid2rfid, rfid2nid
|
|
|
|
except json.JSONDecodeError:
|
|
# JSON 파싱 실패 시 텍스트 파서로 폴백
|
|
return parse_map_text(text)
|
|
|
|
def parse_map_text(text: str):
|
|
parts = re.split(r'\bNodeId\s+', text)
|
|
nodes = {}
|
|
for part in parts:
|
|
part = part.strip()
|
|
if not part:
|
|
continue
|
|
m = re.match(r'(\S+)', part)
|
|
if not m:
|
|
continue
|
|
nid = m.group(1)
|
|
|
|
def find_num(key):
|
|
m = re.search(fr'\b{key}\s+(-?\d+)', part)
|
|
return int(m.group(1)) if m else None
|
|
|
|
def find_str(key):
|
|
m = re.search(fr'\b{key}\s+([^\s\r\n]+)', part)
|
|
return m.group(1).strip() if m else None
|
|
|
|
mpos = re.search(r'\bPosition\s+(-?\d+)\s*,\s*(-?\d+)', part)
|
|
pos = (int(mpos.group(1)), int(mpos.group(2))) if mpos else None
|
|
|
|
mconn = re.search(r'\bConnectedNodes\s+([^\r\n]+)', part)
|
|
conns = []
|
|
if mconn:
|
|
seg = mconn.group(1)
|
|
seg = re.split(r'\b(CanRotate|StationId|StationType|CreatedDate|ModifiedDate|IsActive|DisplayColor|RfidId|RfidStatus|RfidDescription|LabelText|FontFamily|FontSize|FontStyle|ForeColor|BackColor|ShowBackground|ImagePath|Scale|Opacity|Rotation|DisplayText|Name|Type|DockDirection|Position|NodeId)\b', seg)[0]
|
|
conns = re.findall(r'N\d+', seg)
|
|
|
|
nodes[nid] = {
|
|
'NodeId': nid,
|
|
'RfidId': find_str('RfidId'),
|
|
'Type': find_num('Type'), # 2=Station/Buffer, 3=Charger 등
|
|
'DockDirection': find_num('DockDirection'), # 1=전면(F), 2=후면(B)
|
|
'Position': pos,
|
|
'ConnectedNodes': conns,
|
|
}
|
|
|
|
# 양방향 인접
|
|
adj = defaultdict(set)
|
|
for u, d in nodes.items():
|
|
for v in d['ConnectedNodes']:
|
|
if v in nodes:
|
|
adj[u].add(v); adj[v].add(u)
|
|
|
|
# RFID 인덱스(별칭 포함)
|
|
nid2rfid = {}
|
|
rfid2nid = {}
|
|
for nid, nd in nodes.items():
|
|
rf = nd.get('RfidId')
|
|
if rf:
|
|
nid2rfid[nid] = norm_rfid_token(rf) # 표준 3자리로 보관
|
|
for alias in make_rfid_aliases(rf):
|
|
rfid2nid[alias] = nid
|
|
|
|
return nodes, adj, nid2rfid, rfid2nid
|
|
|
|
# ---------- 기하/유틸 ----------
|
|
def is_fork(adj, n): return len(adj[n]) >= 3
|
|
|
|
def vec(nodes, a, b):
|
|
ax, ay = nodes[a]['Position']; bx, by = nodes[b]['Position']
|
|
return (bx-ax, by-ay)
|
|
|
|
def angle_between(u, v):
|
|
ux,uy=u; vx,vy=v
|
|
du=max((ux*ux+uy*uy)**0.5,1e-9); dv=max((vx*vx+vy*vy)**0.5,1e-9)
|
|
ux/=du; uy/=du; vx/=dv; vy/=dv
|
|
dot=max(-1.0,min(1.0,ux*vx+uy*vy))
|
|
ang=math.degrees(math.acos(dot))
|
|
cross=ux*vy-uy*vx
|
|
return ang, cross
|
|
|
|
def classify_at_fork(nodes, adj, fork, came_from):
|
|
vin = vec(nodes, fork, came_from)
|
|
cand=[]
|
|
for nb in adj[fork]:
|
|
if nb==came_from: continue
|
|
v=vec(nodes, fork, nb)
|
|
ang,cross=angle_between(vin, v)
|
|
dev=abs(180-ang)
|
|
side='left' if cross>0 else 'right' # y-down 화면 기준 보정
|
|
cand.append((nb,dev,side))
|
|
if not cand:
|
|
return {'straight':None,'left':None,'right':None}
|
|
straight=min(cand, key=lambda x:x[1])[0]
|
|
lefts=[x for x in cand if x[2]=='left' and x[0]!=straight]
|
|
rights=[x for x in cand if x[2]=='right'and x[0]!=straight]
|
|
left=min(lefts, key=lambda x:x[1])[0] if lefts else None
|
|
right=min(rights, key=lambda x:x[1])[0] if rights else None
|
|
return {'straight':straight,'left':left,'right':right}
|
|
|
|
def shortest_path(adj, s, t):
|
|
q=deque([s]); prev={s:None}
|
|
while q:
|
|
u=q.popleft()
|
|
if u==t: break
|
|
for v in adj[u]:
|
|
if v not in prev:
|
|
prev[v]=u; q.append(v)
|
|
if t not in prev: return None
|
|
path=[]; cur=t
|
|
while cur is not None:
|
|
path.append(cur); cur=prev[cur]
|
|
return list(reversed(path))
|
|
|
|
def desired_final_motor(nodes, goal_nid):
|
|
dd = nodes[goal_nid].get('DockDirection')
|
|
ty = nodes[goal_nid].get('Type')
|
|
if dd in (1,2): return 'F' if dd==1 else 'B'
|
|
if ty==3: return 'F'
|
|
if ty==2: return 'B'
|
|
return 'F'
|
|
|
|
def choose_turning_point(nodes, adj, nid2rfid, rfid2nid, goal_nid, current_nid, preferred_tp_rfids=('004','039','040','038','005')):
|
|
forks=[n for n in adj if is_fork(adj,n) and n!=goal_nid and n!=current_nid]
|
|
if not forks: return None
|
|
# 목표에서 BFS 거리
|
|
q=deque([goal_nid]); dist={goal_nid:0}
|
|
while q:
|
|
u=q.popleft()
|
|
for v in adj[u]:
|
|
if v not in dist:
|
|
dist[v]=dist[u]+1; q.append(v)
|
|
pref_index={rfid2nid.get(rf, None):i for i,rf in enumerate(preferred_tp_rfids) if rf in rfid2nid}
|
|
best=None; best_key=None
|
|
for n in forks:
|
|
if n in dist:
|
|
key=(dist[n], pref_index.get(n, 9999))
|
|
if best_key is None or key<best_key:
|
|
best_key=key; best=n
|
|
return best
|
|
|
|
# ---------- 플래너(단일 해답) ----------
|
|
def plan_route_with_fb(nodes, adj, nid2rfid, rfid2nid, current_rfid, prev_rfid, last_motor, goal_rfid,
|
|
preferred_tp_rfids=('004','039','040','038','005')):
|
|
# 입력 RF 정규화
|
|
cur_key = norm_rfid_token(current_rfid)
|
|
prv_key = norm_rfid_token(prev_rfid)
|
|
goal_key= norm_rfid_token(goal_rfid)
|
|
|
|
# 노드 찾기(별칭 허용)
|
|
def resolve(rf):
|
|
if rf in rfid2nid: return rfid2nid[rf]
|
|
# 별칭 생성 후 재시도
|
|
for alias in make_rfid_aliases(rf):
|
|
if alias in rfid2nid: return rfid2nid[alias]
|
|
return None
|
|
|
|
cur = resolve(cur_key)
|
|
prv = resolve(prv_key)
|
|
goal = resolve(goal_key)
|
|
if cur is None: raise SystemExit(f"[오류] 미등록 RFID: {current_rfid}")
|
|
if prv is None: raise SystemExit(f"[오류] 미등록 RFID: {prev_rfid}")
|
|
if goal is None: raise SystemExit(f"[오류] 미등록 RFID: {goal_rfid}")
|
|
|
|
sp = shortest_path(adj, cur, goal)
|
|
if not sp: raise SystemExit("[오류] 최단경로가 존재하지 않습니다.")
|
|
|
|
final_motor = desired_final_motor(nodes, goal)
|
|
last_motor = 'F' if str(last_motor).lower().startswith('f') else 'B'
|
|
annotated=[]
|
|
|
|
def push(a,b,m):
|
|
annotated.append((nid2rfid[a], m, nid2rfid[b]))
|
|
|
|
# 모터 일치 -> 바로 최단경로
|
|
if last_motor == final_motor:
|
|
for i in range(len(sp)-1):
|
|
a,b=sp[i], sp[i+1]
|
|
push(a,b,final_motor)
|
|
rpath=[nid2rfid[n] for n in sp]
|
|
return rpath, annotated
|
|
|
|
# 모터 불일치 -> TP 사용 + 포크 루프
|
|
tp = choose_turning_point(nodes, adj, nid2rfid, rfid2nid, goal, cur, preferred_tp_rfids)
|
|
if tp is None:
|
|
# 비권장 fallback: 마지막 홉만 final_motor
|
|
for i in range(len(sp)-1):
|
|
a,b=sp[i], sp[i+1]
|
|
motor = final_motor if i==len(sp)-2 else last_motor
|
|
push(a,b,motor)
|
|
rpath=[nid2rfid[n] for n in sp]
|
|
return rpath, annotated
|
|
|
|
# A) cur -> TP : F
|
|
path_to_tp = shortest_path(adj, cur, tp)
|
|
for i in range(len(path_to_tp)-1):
|
|
a,b=path_to_tp[i], path_to_tp[i+1]
|
|
push(a,b,'F')
|
|
|
|
# B) TP 포크 루프 결정(결정적): exit_to_goal, came_from
|
|
path_back = shortest_path(adj, tp, goal)
|
|
exit_to_goal = path_back[1] if len(path_back)>=2 else None
|
|
came_from = path_to_tp[-2] if len(path_to_tp)>=2 else None
|
|
|
|
loop_branch=None
|
|
if is_fork(adj, tp) and came_from is not None:
|
|
cls = classify_at_fork(nodes, adj, tp, came_from)
|
|
for cand in [cls.get('left'), cls.get('right'), cls.get('straight')]:
|
|
if cand and cand != came_from and cand != exit_to_goal:
|
|
loop_branch = cand; break
|
|
if loop_branch is None:
|
|
for nb in adj[tp]:
|
|
if nb != came_from and nb != exit_to_goal:
|
|
loop_branch=nb; break
|
|
|
|
if loop_branch:
|
|
push(tp, loop_branch, 'F') # 가지로 전진
|
|
push(loop_branch, tp, 'B') # TP로 역진입
|
|
|
|
# C) TP -> goal : final_motor
|
|
path_back = shortest_path(adj, tp, goal)
|
|
for i in range(len(path_back)-1):
|
|
a,b=path_back[i], path_back[i+1]
|
|
push(a,b, final_motor)
|
|
|
|
# RFID 경로 구성 (annotated 기반)
|
|
rpath=[annotated[0][0]] if annotated else [nid2rfid[cur]]
|
|
for (_,_,to_rfid) in annotated: rpath.append(to_rfid)
|
|
return rpath, annotated
|
|
|
|
def format_annotated(annotated):
|
|
return " ".join([f"{a} -({m})-> {b}" for (a,m,b) in annotated])
|
|
|
|
# ---------- CLI ----------
|
|
def main():
|
|
ap = argparse.ArgumentParser(description="AGV RFID 경로 계획 + F/B 주석 (단일 해답)")
|
|
ap.add_argument("--map", default="MapData.json", help="맵 데이터 파일 (기본: MapData.json)")
|
|
ap.add_argument("--current", required=True, help="현재 RFID (예: 007)")
|
|
ap.add_argument("--prev", required=True, help="직전 RFID (예: 006)")
|
|
ap.add_argument("--last", required=True, help="마지막 모터(F/B 또는 forward/backward)")
|
|
ap.add_argument("--goal", required=True, help="목표 RFID (예: 040)")
|
|
ap.add_argument("--tp-order", default="004,039,040,038,005", help="TP 우선순위 RFID(쉼표구분)")
|
|
args = ap.parse_args()
|
|
|
|
# 파일 읽기: utf-8-sig (BOM 안전)
|
|
map_path = Path(args.map).resolve()
|
|
try:
|
|
text = map_path.read_text(encoding="utf-8-sig", errors="ignore")
|
|
except Exception as e:
|
|
print(f"[오류] 맵 파일을 읽을 수 없습니다: {map_path}\n{e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
nodes, adj, nid2rfid, rfid2nid = parse_map_json(text)
|
|
|
|
if getattr(args, "debug", False):
|
|
rfids = sorted({*rfid2nid.keys()})
|
|
# 숫자형 RFID만 보기 좋게 필터(3자리 패드)
|
|
rfids_num = sorted({norm_rfid_token(r) for r in rfids if re.fullmatch(r'\d+', re.sub(r'\D','','r'))})
|
|
print(">> DEBUG")
|
|
print(f" Map path : {map_path}")
|
|
print(f" RFID count : {len({nid2rfid[n] for n in nid2rfid})}")
|
|
print(f" Sample RFIDs (정규화) :", ", ".join(sorted({nid2rfid[n] for n in nid2rfid})[:20]))
|
|
print()
|
|
|
|
last_motor = 'F' if str(args.last).lower().startswith('f') else 'B'
|
|
tp_pref = tuple([x.strip() for x in args.tp_order.split(",") if x.strip()])
|
|
|
|
rpath, annotated = plan_route_with_fb(
|
|
nodes, adj, nid2rfid, rfid2nid,
|
|
args.current, args.prev, last_motor, args.goal,
|
|
preferred_tp_rfids=tp_pref
|
|
)
|
|
|
|
print("\n=== 결과 ===")
|
|
print("RFID Path :", " → ".join(rpath))
|
|
print("F/B Path :", format_annotated(annotated))
|
|
|
|
if __name__ == "__main__":
|
|
main()
|