ACIL FM
Dark
Refresh
Current DIR:
/home/benbot/public_html/backup
/
home
benbot
public_html
backup
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
engine.backup.py
21.73 MB
chmod
View
DL
Edit
Rename
Delete
Edit file: /home/benbot/public_html/backup/engine.backup.py
# ========================================================= # engine.py | Kiwoom OpenAPI+ Auto-Trader (Engine + Strategy Dispatcher) # # 실행 목적: # - 엔진(로그인/브로커/루프 준비) + 전략 디스패처 # - 전략 키에 따라 외부 전략 모듈(예: boxer.py)의 run_strategy 호출 # - 기존 box_range_auto 내장 전략은 호환 유지 # # 변경 이력: # 2025-10-23 # ### [ADDED] 전략 디스패처 추가 (importlib로 모듈 로드, run_strategy 실행) # ### [MODIFIED] main(): strategy 키에 따라 분기 # ### [UNCHANGED] 기존 box_range_auto 내장 로직(run) 유지 (호환) # ========================================================= import sys import os import time import math import argparse import traceback from datetime import datetime, timedelta from typing import Tuple, Optional, Dict, List import pytz import numpy as np import pandas as pd from PyQt5.QtWidgets import QApplication from pykiwoom.kiwoom import Kiwoom # ----------------------------- # 상수 / 기본값 # ----------------------------- KST = pytz.timezone("Asia/Seoul") TF_LIST_MIN = [1, 3, 5, 10, 15, 30] # 스캔 대상 분봉 DEFAULT_BOX_N = 60 # 박스 계산에 사용할 캔들 개수 DEFAULT_FEES_BPS = 10.0 # 왕복 수수료(bps) 예: 10bps = 0.10% DEFAULT_SELL_TAX = 0.20 # 매도 측 거래세(%) 예: 0.20% DEFAULT_UNIT = 1 # 1회 주문 수량 DEFAULT_DAY_STOP = 200000 # 1일 손실 한도(원) DEFAULT_LOG_CSV = os.path.join("logs", "events.csv") DEFAULT_TRADE_WINDOW = "09:00-10:00" # KST 기준 MIN_BOX_WIDTH_TICKS = 2 # 너무 좁은 박스 필터 (틱 2개 이상) SLIPPAGE_TICKS = 1 # 체결 슬리피지 가정 (틱) DELTA_TICKS = 0 # 박스 근처 허용치(틱) # ----------------------------- # 유틸 # ----------------------------- def ensure_dir(path: str): d = os.path.dirname(path) if d and not os.path.exists(d): os.makedirs(d, exist_ok=True) def now_kst() -> datetime: return datetime.now(tz=KST) def within_kst_window(window_str: str) -> Tuple[bool, datetime, datetime]: """KST 윈도우 문자열(예: '09:00-10:00')을 해석하여, 현재가 그 범위 내인지와 시작/끝 시각을 반환""" try: start_s, end_s = window_str.split("-") today = now_kst().date() start_dt = KST.localize(datetime.strptime(f"{today} {start_s}", "%Y-%m-%d %H:%M")) end_dt = KST.localize(datetime.strptime(f"{today} {end_s}", "%Y-%m-%d %H:%M")) cur = now_kst() return (start_dt <= cur <= end_dt), start_dt, end_dt except Exception: return True, now_kst(), now_kst() + timedelta(hours=1) # 파싱 실패 시 항상 true로 가정(안전치 않음) def cp949_safe_print(*args, **kwargs): """윈도우 콘솔 인코딩 문제를 최소화: 실패 시 무조건 안전하게 출력""" s = " ".join(str(a) for a in args) try: print(s, **kwargs) except UnicodeEncodeError: print(s.encode("cp949", errors="ignore").decode("cp949", errors="ignore"), **kwargs) def tick_size(price: float) -> float: """KRX 일반 종목 단일가 호가단위(간략화). 실제는 세분화되어 있으니 필요시 보정.""" if price < 2000: return 1 if price < 5000: return 5 if price < 10000: return 10 if price < 50000: return 50 if price < 100000: return 100 return 500 def round_to_tick(price: float) -> float: ts = tick_size(price) return math.floor(price / ts) * ts def fees_and_tax(buy_price: float, sell_price: float, qty: int, fees_bps: float, sell_tax_pct: float) -> float: """왕복 수수료 + 매도거래세 원화 계산""" gross = (sell_price - buy_price) * qty fees = (buy_price * qty + sell_price * qty) * (fees_bps / 10000.0) sell_tax = (sell_price * qty) * (sell_tax_pct / 100.0) return fees + sell_tax def append_event(log_csv: str, row: Dict): ensure_dir(log_csv) header_needed = not os.path.exists(log_csv) df = pd.DataFrame([row]) if header_needed: df.to_csv(log_csv, index=False, mode="w", encoding="utf-8-sig") else: df.to_csv(log_csv, index=False, mode="a", header=False, encoding="utf-8-sig") # ----------------------------- # Kiwoom 래핑 # ----------------------------- class KiwoomBroker: def __init__(self, kiwoom: Kiwoom, account: str, code: str): self.k = kiwoom self.account = account self.code = code # 종목코드 (예: '454910') # ### [ADDED] 종목명 조회 헬퍼 def get_code_name(self) -> str: try: return self.k.GetMasterCodeName(self.code) except Exception: return "" # --- 시세/데이터 --- def get_minute_candles(self, minute: int, count: int = 200) -> pd.DataFrame: """ opt10080: 주식분봉차트조회요청 - 틱범위: 1/3/5/10/15/30/45/60... - 수정주가구분: 1(수정주가), 0(무수정) """ tr = "opt10080" self.k.SetInputValue("종목코드", self.code) self.k.SetInputValue("틱범위", minute) self.k.SetInputValue("수정주가구분", 1) out = self.k.block_request(tr, output="주식분봉차트조회", next=0) df = pd.DataFrame(out) if "체결시간" in df.columns: df["dt"] = pd.to_datetime(df["체결시간"], format="%Y%m%d%H%M%S") df = df.sort_values("dt").reset_index(drop=True) for col in ["시가", "고가", "저가", "현재가"]: if col in df.columns: df[col] = pd.to_numeric(df[col], errors="coerce").abs() if len(df) > count: df = df.iloc[-count:].copy() return df def get_best_quote(self) -> Tuple[Optional[float], Optional[float]]: """ opt10004: 주식호가요청 반환: (best_ask, best_bid) """ tr = "opt10004" self.k.SetInputValue("종목코드", self.code) out = self.k.block_request(tr, output="주식호가", next=0) df = pd.DataFrame(out) ask = None bid = None if "매도호가1" in df.columns: ask = pd.to_numeric(df["매도호가1"].iloc[0], errors="coerce") if "매수호가1" in df.columns: bid = pd.to_numeric(df["매수호가1"].iloc[0], errors="coerce") return (ask, bid) # --- 주문 --- def send_order(self, name: str, rqname: str, order_type: int, qty: int, price: int, hoga: str) -> int: """ 주문 전송 - order_type: 1(신규매수), 2(신규매도), 3(매수정정), 4(매도정정), 5(매수취소), 6(매도취소) - hoga: "00"(지정가), "03"(시장가) """ screen = "1000" ret = self.k.SendOrder(name, screen, self.account, order_type, self.code, qty, price, hoga, "") return ret # ----------------------------- # 박스권/스코어 계산 (내장 전략용 - 호환 유지) # ----------------------------- def compute_box(df: pd.DataFrame, n: int) -> Optional[Tuple[float, float]]: """최근 n개 캔들 기준 박스 상단(U), 하단(L) 계산""" if df is None or len(df) < n: return None sub = df.iloc[-n:] if not set(["고가", "저가"]).issubset(sub.columns): return None U = float(np.nanmax(sub["고가"].values)) L = float(np.nanmin(sub["저가"].values)) if not math.isfinite(U) or not math.isfinite(L) or U <= L: return None return (L, U) def ticks_between(p1: float, p2: float) -> int: t = tick_size((p1 + p2) / 2.0) return int(round(abs(p2 - p1) / t)) def select_best_timeframe(broker: KiwoomBroker, tfs: List[int], n: int, fees_bps: float, sell_tax_pct: float) -> Optional[Dict]: """ 각 분봉에 대해 박스 폭과 비용/스프레드 고려 "순엣지"를 평가하고 최대값 분봉을 선택 간단화한 순엣지: (U-L) - (spread + 슬리피지 + 비용버퍼), 양수 최대 """ best = None ask, bid = broker.get_best_quote() if ask is None or bid is None: return None spread = max(0.0, float(ask - bid)) mid = (ask + bid) / 2.0 ts = tick_size(mid) slippage_cost = SLIPPAGE_TICKS * ts # 매수/매도 체결 편향 1틱 가정 for tf in tfs: try: df = broker.get_minute_candles(tf, count=max(n * 2, 120)) box = compute_box(df, n) if not box: continue L, U = box width = U - L if ticks_between(L, U) < MIN_BOX_WIDTH_TICKS: continue fee_buf = fees_and_tax(buy_price=ask, sell_price=bid, qty=1, fees_bps=fees_bps, sell_tax_pct=sell_tax_pct) net_edge = width - (spread + slippage_cost + fee_buf) item = { "tf": tf, "L": L, "U": U, "width": width, "spread": spread, "fee_buf": fee_buf, "slippage": slippage_cost, "net_edge": net_edge, } if (best is None) or (item["net_edge"] > best["net_edge"]): best = item except Exception: continue return best # ----------------------------- # 내장 전략 루프 (box_range_auto) - 호환 유지 # ----------------------------- def run(params): # CSV 로그 준비 log_csv = params.log_csv or DEFAULT_LOG_CSV ensure_dir(log_csv) # Qt App app = QApplication.instance() or QApplication(sys.argv) # Login k = Kiwoom() cp949_safe_print("[INFO] Opening Kiwoom OpenAPI+ login window...") k.CommConnect(block=True) if k.GetConnectState() != 1: cp949_safe_print("[ERROR] Login failed.") return 2 user_id = k.GetLoginInfo("USER_ID") user_name = k.GetLoginInfo("USER_NAME") cp949_safe_print(f"[SUCCESS] Login success. USER_ID={user_id}") broker = KiwoomBroker(k, account=params.account, code=params.code) daily_pnl = 0.0 position = 0 avg_price = 0.0 # 거래 윈도우 window_str = params.trade_time_kst in_window, start_dt, end_dt = within_kst_window(window_str) cp949_safe_print(f"[INFO] Trade window (KST) = {window_str} | Now in window? {in_window}") cp949_safe_print(f"[INFO] Window starts at {start_dt} ends at {end_dt}") # 이벤트 로그 헤더가 없을 경우 대비 첫 기록(상태) append_event(log_csv, { "time": now_kst().isoformat(), "event": "start", "tf": "", "L": "", "U": "", "spread": "", "fee": "", "signal": "", "qty": "", "price": "", "pnl": daily_pnl, "pos": position, # ### [ADDED] "code": params.code, "name": broker.get_code_name(), }) try: while True: app.processEvents() # 윈도우 체크 in_window, start_dt, end_dt = within_kst_window(window_str) now = now_kst() if now < start_dt: # 대기 time.sleep(1.0) continue if now >= end_dt: # 거래 종료: 포지션 정리 후 종료 if position != 0: ask, bid = broker.get_best_quote() if position > 0 and bid: qty = min(position, params.unit) ret = broker.send_order("SELL_CLOSE", "rq_sell_close", 2, qty, 0, "03") cp949_safe_print(f"[INFO] Exit SELL market ret={ret}") pnl_est = (bid - avg_price) * qty - fees_and_tax(avg_price, bid, qty, params.fees_bps, params.sell_tax) daily_pnl += pnl_est position -= qty append_event(log_csv, { "time": now_kst().isoformat(), "event": "exit", "tf": "", "L": "", "U": "", "spread": "", "fee": "", "signal": "SELL_CLOSE", "qty": qty, "price": bid, "pnl": daily_pnl, "pos": position }) # 나머지 있으면 루프에서 계속 정리 continue cp949_safe_print("[INFO] Trade window ended. Shutting down.") break # 일손실 한도 체크 if daily_pnl <= -abs(params.risk_day_stop): cp949_safe_print(f"[WARN] Day stop reached ({daily_pnl}). Trading halted.") break # 활성 분봉 선정 best = select_best_timeframe( broker=broker, tfs=TF_LIST_MIN, n=DEFAULT_BOX_N, fees_bps=params.fees_bps, sell_tax_pct=params.sell_tax ) if not best or best["net_edge"] <= 0: # 유리한 박스가 없으면 휴식 time.sleep(1.0) continue tf = best["tf"] L, U = best["L"], best["U"] ask, bid = broker.get_best_quote() if ask is None or bid is None: time.sleep(0.5) continue mid = (ask + bid) / 2.0 ts = tick_size(mid) # 포지션 없을 때: L 근처에서 매수 if position == 0: buy_trigger = L + DELTA_TICKS * ts if mid <= buy_trigger: # 시장가 매수 1주 qty = params.unit ret = broker.send_order("BUY_ENTRY", "rq_buy", 1, qty, 0, "03") cp949_safe_print(f"[INFO] BUY market ret={ret} tf={tf} L={L} U={U} mid={mid}") # 추정 체결가를 ask로 가정 fill = ask cost = fees_and_tax(fill, fill, qty, params.fees_bps, 0.0) # 매수 시 세금 없음 daily_pnl -= cost position += qty avg_price = fill append_event(log_csv, { "time": now_kst().isoformat(), "event": "buy", "tf": tf, "L": L, "U": U, "spread": float(ask - bid), "fee": cost, "signal": "BUY_MKT", "qty": qty, "price": fill, "pnl": daily_pnl, "pos": position }) time.sleep(0.7) continue # 포지션 있을 때: U 근처에서 매도 if position > 0: sell_trigger = U - DELTA_TICKS * ts if mid >= sell_trigger: qty = min(position, params.unit) ret = broker.send_order("SELL_EXIT", "rq_sell", 2, qty, 0, "03") cp949_safe_print(f"[INFO] SELL market ret={ret} tf={tf} L={L} U={U} mid={mid}") # 추정 체결가를 bid로 가정 fill = bid fee = fees_and_tax(avg_price, fill, qty, params.fees_bps, params.sell_tax) pnl = (fill - avg_price) * qty - fee daily_pnl += pnl position -= qty append_event(log_csv, { "time": now_kst().isoformat(), "event": "sell", "tf": tf, "L": L, "U": U, "spread": float(ask - bid), "fee": fee, "signal": "SELL_MKT", "qty": qty, "price": fill, "pnl": daily_pnl, "pos": position }) time.sleep(0.7) continue # 가벼운 슬립 time.sleep(0.5) except KeyboardInterrupt: cp949_safe_print("[INFO] Interrupted by user.") except Exception as e: cp949_safe_print("[ERROR] Exception:", e) traceback.print_exc() finally: # 종료 로그 append_event(log_csv, { "time": now_kst().isoformat(), "event": "end", "tf": "", "L": "", "U": "", "spread": "", "fee": "", "signal": "", "qty": "", "price": "", "pnl": daily_pnl, "pos": position }) # ----------------------------- # 엔트리포인트 / 인자 파싱 # ----------------------------- def parse_args(): p = argparse.ArgumentParser(description="Kiwoom Auto Trader (Engine with Strategy Dispatcher)") p.add_argument("code", type=str, help="종목코드 (예: 454910)") p.add_argument("account", type=str, help="계좌번호 (예: 6341677010)") p.add_argument("strategy", type=str, help="전략키 (예: boxer 또는 box_range_auto)") p.add_argument("--fees_bps", type=float, default=DEFAULT_FEES_BPS, help="왕복 수수료(bps)") p.add_argument("--sell_tax", type=float, default=DEFAULT_SELL_TAX, help="매도 거래세(%)") p.add_argument("--unit", type=int, default=DEFAULT_UNIT, help="매매 수량(주)") p.add_argument("--risk_day_stop", type=float, default=DEFAULT_DAY_STOP, help="1일 손실 한도(원)") p.add_argument("--trade_time_kst", type=str, default=DEFAULT_TRADE_WINDOW, help="KST 거래시간 예: 09:00-10:00") p.add_argument("--log_csv", type=str, default=DEFAULT_LOG_CSV, help="이벤트 로그 CSV 경로") return p.parse_args() # ----------------------------- # 메인 (전략 디스패처) # ----------------------------- def main(): params = parse_args() # ### [ADDED] 전략 디스패처 매핑 strategy_key = (params.strategy or "").lower() STRATEGY_MAP = { # 외부 모듈 전략 "boxer": "boxer", # 호환 유지: 기존 내장 전략 "box_range_auto": "__internal_box_range_auto__", } if strategy_key not in STRATEGY_MAP: cp949_safe_print(f"[ERROR] Unknown strategy key: {params.strategy}. Supported: {list(STRATEGY_MAP.keys())}") sys.exit(1) target = STRATEGY_MAP[strategy_key] if target == "__internal_box_range_auto__": # ### [UNCHANGED PATH] 기존 내장 전략 실행 cp949_safe_print("[INFO] Using internal strategy: box_range_auto") run(params) return # ### [ADDED] 외부 전략 모듈(boxer 등) 경로: 엔진에서 로그인/브로커 준비 후 모듈 run_strategy 호출 cp949_safe_print(f"[INFO] Dispatching to external strategy module: {target}") # Qt App + Login app = QApplication.instance() or QApplication(sys.argv) k = Kiwoom() cp949_safe_print("[INFO] Opening Kiwoom OpenAPI+ login window...") k.CommConnect(block=True) if k.GetConnectState() != 1: cp949_safe_print("[ERROR] Login failed.") sys.exit(2) user_id = k.GetLoginInfo("USER_ID") cp949_safe_print(f"[SUCCESS] Login success. USER_ID={user_id}") broker = KiwoomBroker(k, account=params.account, code=params.code) # ### [ADDED] 외부 전략 실행 전, 로그 CSV를 반드시 초기화/보장하고 code/name을 포함한 init 레코드 기록 log_csv = params.log_csv or DEFAULT_LOG_CSV # ### [ADDED] ensure_dir(log_csv) # ### [ADDED] code_name = broker.get_code_name() # ### [ADDED] append_event(log_csv, { # ### [ADDED] "time": now_kst().isoformat(), "event": "init", "tf": "", "L": "", "U": "", "spread": "", "fee": "", "signal": "", "qty": "", "price": "", "pnl": 0, "pos": 0, "code": params.code, "name": code_name, }) cp949_safe_print(f"[INFO] Initialized log with code={params.code} name={code_name}") # ### [ADDED] # 모듈 import 및 실행 try: import importlib strat_mod = importlib.import_module(target) # StrategyParams가 있으면 구성해서 전달, 없으면 argparse params 그대로 전달 if hasattr(strat_mod, "StrategyParams"): sp = strat_mod.StrategyParams( code=params.code, fees_bps=params.fees_bps, sell_tax=params.sell_tax, unit=params.unit, risk_day_stop=params.risk_day_stop, trade_time_kst=params.trade_time_kst, log_csv=params.log_csv ) strat_mod.run_strategy(broker, sp) else: strat_mod.run_strategy(broker, params) except SystemExit: raise except Exception as e: cp949_safe_print("[ERROR] Strategy dispatch failed:", e) traceback.print_exc() sys.exit(3) if __name__ == "__main__": main()
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply