1.使用Beautiful Soup 爬取金融數據


import requests
import re
import json
def normalize_ticker(raw: str) -> str:
"""
如果使用者只輸入數字 (如 2330、2881),就加上 .TW
否則保留原樣 (可以是 AAPL、TSM、2330.TW...)
"""
t = raw.strip().upper()
if t.isdigit(): # 純數字就視為台灣股票
return f"{t}.TW"
return t
def get_stock_price_api(ticker: str) -> float:
"""
透過 Yahoo Finance Chart API 取得即時股價
"""
url = f"https://query1.finance.yahoo.com/v8/finance/chart/{ticker}"
params = {
"interval": "2m",
"range": "1d",
"includePrePost": "false"
}
headers = {"User-Agent": "Mozilla/5.0"}
resp = requests.get(url, params=params, headers=headers)
resp.raise_for_status()
data = resp.json()
return data["chart"]["result"][0]["meta"]["regularMarketPrice"]
def main():
raw = input("請輸入股票代號 (例如 2330.TW、2881、AAPL): ").strip()
if not raw:
print("未輸入股票代號,程式結束。")
return
ticker = normalize_ticker(raw)
print(f"\n=== 抓取 {ticker} 即時股價 ===")
try:
price1 = get_stock_price_api(ticker)
print(f" {ticker} 股價:{price1:.2f}")
except Exception as e:
print(f" 取得失敗:{e}")
if __name__ == "__main__":
main()
2.將爬取之金融數據並以csv 或json格式存檔,可以的話,用排程方式記錄兩筆



import yfinance as yf
import pandas as pd
import json
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
def normalize_ticker(raw: str) -> str:
"""
將純數字自動補上 .TW,其它代號原樣回傳
"""
t = raw.strip().upper()
return f"{t}.TW" if t.isdigit() else t
def fetch_history(ticker: str,
period: str = "1mo",
interval: str = "1d") -> pd.DataFrame:
"""
用 yfinance 拿歷史資料,回傳 pandas.DataFrame
"""
tk = yf.Ticker(ticker)
df = tk.history(period=period, interval=interval, auto_adjust=False)
if df.empty:
raise ValueError(f"No data found for {ticker}")
df = df.reset_index()
df["Date"] = df["Date"].dt.strftime("%Y-%m-%d")
return df
def save_csv(df: pd.DataFrame, filename: str):
df.to_csv(filename, index=False, encoding="utf-8-sig")
print(f"[{datetime.datetime.now()}] 已將資料存成 CSV:{filename}")
def save_json(df: pd.DataFrame, filename: str):
records = df.to_dict(orient="records")
with open(filename, "w", encoding="utf-8") as f:
json.dump(records, f, ensure_ascii=False, indent=4)
print(f"[{datetime.datetime.now()}] 已將資料存成 JSON:{filename}")
def job(ticker: str, fmt: str):
"""
真正在排程時呼叫的函式:抓取資料、加時間戳、存檔
"""
try:
df = fetch_history(ticker, period="1mo", interval="1d")
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{ticker.replace('.', '_')}_{ts}.{fmt}"
if fmt == "json":
save_json(df, filename)
else:
save_csv(df, filename)
except Exception as e:
print(f"[{datetime.datetime.now()}] 下載失敗:{e}")
def main():
raw = input("請輸入股票代號(如 2330、2330.TW、AAPL): ").strip()
if not raw:
print("未輸入任何代號,程式結束。")
return
ticker = normalize_ticker(raw)
fmt = input("請選擇儲存格式 (csv/json) [預設 csv]: ").strip().lower() or "csv"
scheduler = BlockingScheduler(timezone="Asia/Taipei")
# 立刻執行一次
scheduler.add_job(
job,
trigger='date',
run_date=datetime.datetime.now(),
args=[ticker, fmt]
)
# 每天 09:00 執行一次
scheduler.add_job(
job,
trigger='cron',
args=[ticker, fmt],
hour=9, minute=0
)
# 每天 15:30 再執行一次
scheduler.add_job(
job,
trigger='cron',
args=[ticker, fmt],
hour=15, minute=30
)
print(f"→ 已排程:立刻下載一次;之後每日 09:00 與 15:30 自動下載 {ticker} 並存成 {fmt} 檔")
scheduler.start()
if __name__ == "__main__":
main()
3.舉例將csv資料寫入sqlite資料庫並以sql語法設定條件呈現所篩選的數據



import sqlite3
import csv
import os
DB_FILE = "financial_data.db"
CSV_FILE = "2330_TW_20250418_100530.csv"
def import_csv_to_sqlite(csv_path: str, db_path: str):
"""匯入 CSV 到 SQLite(重複執行不會重複插入)。"""
conn = sqlite3.connect(db_path)
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS history (
Date TEXT PRIMARY KEY,
Open REAL,
High REAL,
Low REAL,
Close REAL,
Volume INTEGER
)""")
conn.commit()
with open(csv_path, newline='', encoding='utf-8-sig') as f:
reader = csv.DictReader(f)
rows = [
(r['Date'],
float(r['Open']),
float(r['High']),
float(r['Low']),
float(r['Close']),
int(r['Volume']))
for r in reader
]
# 使用 INSERT OR IGNORE 避免重複鍵錯誤
cur.executemany(
"INSERT OR IGNORE INTO history VALUES (?,?,?,?,?,?)",
rows
)
conn.commit()
print(f"已匯入 {len(rows)} 筆資料到 {db_path} 的 history 表。")
return conn, cur
def query_with_params(cur):
# 1) 查詢收盤價門檻
thresh = input("\n請輸入要篩選的「收盤價」下限: ").strip()
try:
thresh = float(thresh)
except ValueError:
print("門檻必須是數字,跳過此查詢。")
else:
sql1 = "SELECT Date, Close FROM history WHERE Close > ? ORDER BY Date"
rows1 = cur.execute(sql1, (thresh,)).fetchall()
print(f"\n>>> 收盤價 > {thresh} 的結果共 {len(rows1)} 筆:")
if rows1:
for d, c in rows1:
print(f"{d} 收盤:{c}")
else:
print("(無資料)")
# 2) 查詢日期區間
start = input("\n請輸入查詢起始日期(YYYY-MM-DD),或留空跳過: ").strip()
end = input("請輸入查詢結束日期(YYYY-MM-DD),或留空跳過: ").strip()
if start and end:
sql2 = "SELECT * FROM history WHERE Date BETWEEN ? AND ? ORDER BY Date"
rows2 = cur.execute(sql2, (start, end)).fetchall()
print(f"\n>>> {start} ~ {end} 區間內的資料共 {len(rows2)} 筆:")
if rows2:
for row in rows2:
# row = (Date, Open, High, Low, Close, Volume)
print(row)
else:
print("(無資料)")
else:
print("跳過日期區間查詢。")
# 3) 查詢成交量大於平均量
# 先計算全表平均量
avg_vol = cur.execute("SELECT AVG(Volume) FROM history").fetchone()[0]
sql3 = "SELECT Date, Volume FROM history WHERE Volume > ? ORDER BY Volume DESC"
rows3 = cur.execute(sql3, (avg_vol,)).fetchall()
print(f"\n>>> 成交量 > 全期平均 ({int(avg_vol):,}) 的資料共 {len(rows3)} 筆:")
for d, v in rows3:
print(f"{d} 量:{v:,}")
def main():
if not os.path.isfile(CSV_FILE):
print(f"找不到 CSV:{CSV_FILE},請確認檔案位置。")
return
conn, cur = import_csv_to_sqlite(CSV_FILE, DB_FILE)
query_with_params(cur)
conn.close()
print("\nDone. 已關閉資料庫連線。")
if __name__ == "__main__":
main()
4.範例

import twstock
import pandas as pd
# 導入twstock及pandas模組,pandas模組縮寫為pd
target_stock = '00940' #股票代號變數
stock = twstock.Stock(target_stock) #告訴twstock我們要查詢的股票
target_price = stock.fetch_from(2020, 5) #取用2020/05至今每天的交易資料
name_attribute = [
'Date', 'Capacity', 'Turnover', 'Open', 'High', 'Low', 'Close', 'Change',
'Transcation'
] #幫收集到的資料設定表頭
df = pd.DataFrame(columns=name_attribute, data=target_price)
#將twstock抓到的清單轉成Data Frame格式的資料表
filename = f'./data/{target_stock}_kay.csv'
#指定Data Frame轉存csv檔案的檔名與路徑
df.to_csv(filename)
#將Data Frame轉存為csv檔案