ニコニコ関連スクリプトまとめ
コメント一覧取得
コメント増量の全表示機能をpythonで書き直しただけ
ログイン状態でないと過去ログが取得できないのでuser_sessionを用意してください
ソースコード(クリックで展開)
import requests, json, html, time, re, os
from bs4 import BeautifulSoup
from datetime import datetime, timezone
from typing import Optional, Dict, Any
def extract_video_id(url):
    """Extract a niconico video id (e.g. "sm45784612") from *url*.

    Raises:
        ValueError: if no "sm"-style id is present in the URL.
    """
    match = re.search(r"(sm\d+)", url)
    if match:
        return match.group(1)
    # Include the offending URL so a bad input is easy to diagnose.
    raise ValueError(f"no video id found in URL: {url!r}")
def get_server_response(session, video_url):
    """Fetch the watch page and decode its embedded server-response JSON."""
    response = session.get(video_url)
    response.raise_for_status()
    page = BeautifulSoup(response.text, "html.parser")
    tag = page.find("meta", attrs={"name": "server-response"})
    if tag is None or "content" not in tag.attrs:
        raise RuntimeError("server-response meta not found")
    # The JSON payload is HTML-escaped inside the meta tag's content attribute.
    return json.loads(html.unescape(tag["content"]))
def gen_payload(sv_rsp, fork=None, when=None):
    """Build the request body for the nvcomment threads API.

    Args:
        sv_rsp: parsed server-response data from the watch page.
        fork: if given, restrict the request to targets of that fork.
        when: unix-time cursor; when given, requests comments posted
            before that moment (used to page through the backlog).

    Returns:
        A dict ready to be POSTed as JSON to the threads endpoint.
    """
    additionals = {"res_from": -1000}
    # `is not None` rather than truthiness so a cursor of 0 is still sent.
    if when is not None:
        additionals["when"] = when
    nv = sv_rsp["data"]["response"]["comment"]["nvComment"]
    # Shallow-copy params before filtering so the caller's sv_rsp is never
    # mutated between paging iterations (the original wrote the filtered
    # target list back into the shared nested dict).
    params = dict(nv["params"])
    if fork:
        params["targets"] = [t for t in params["targets"] if t["fork"] == fork]
    return {
        "params": params,
        "threadKey": nv["threadKey"],
        "additionals": additionals,
    }
def get_comments(session, payload):
    """POST *payload* to the nvcomment threads API and return the parsed JSON.

    Raises:
        HTTP error from the session's raise_for_status on a bad status code.
    """
    apiurl = "https://public.nvcomment.nicovideo.jp/v1/threads"
    r = session.post(apiurl, json=payload)
    # Check the status before touching the body; the original had a debug
    # print that parsed the response JSON even for error responses, and
    # then parsed it a second time for the return value.
    r.raise_for_status()
    return r.json()
def get_cursor(comments):
    """Return a unix-time cursor one second before the oldest trunk comment.

    Returns None when no trunk comments are present (i.e. paging is done).
    """
    trunk_times = [
        datetime.fromisoformat(c["postedAt"])
        for thread in comments["data"]["threads"]
        for c in thread.get("comments", [])
        if c.get("source") == "trunk"
    ]
    if not trunk_times:
        return None
    # Step just past the oldest comment so the next request pages backwards.
    return int(min(trunk_times).timestamp()) - 1
def load_all_comments(dir_path):
    """Collect every comment from all per-page JSON dumps in *dir_path*.

    Skips non-JSON files and the merged output file itself.
    """
    collected = []
    for entry in os.listdir(dir_path):
        if entry == "merged.json" or not entry.endswith(".json"):
            continue
        with open(os.path.join(dir_path, entry), encoding="utf-8") as fp:
            page = json.load(fp)
        for thread in page.get("data", {}).get("threads", []):
            collected += thread.get("comments", [])
    return collected
def deduplicate_comments(comments):
    """Drop duplicate comments by id, keeping the last occurrence of each.

    Comments without an "id" field are discarded entirely.
    """
    # Dict insertion order preserves the position of the first occurrence
    # while later duplicates overwrite the stored value.
    by_id = {c["id"]: c for c in comments if c.get("id") is not None}
    return list(by_id.values())
def sort_by_posted_at(comments):
    """Return the comments ordered chronologically by their postedAt field."""
    def posted_at(comment):
        return datetime.fromisoformat(comment["postedAt"])
    return sorted(comments, key=posted_at)
def merge_and_sort(dir_path):
    """Load, deduplicate, and chronologically sort every dumped comment."""
    return sort_by_posted_at(deduplicate_comments(load_all_comments(dir_path)))
def main():
    """Download the full comment backlog for one video, then merge it.

    Pages backwards through the archive via the "when" cursor, writing one
    JSON dump per request into ./<video_id>/, then merges, deduplicates,
    and sorts everything into merged.json.

    NOTE(review): USER_SESSION (login cookie value) must be defined by the
    user before running — it is referenced here but never assigned in this
    script, so running without it raises NameError.
    """
    url = "https://www.nicovideo.jp/watch/sm45784612"
    dir_out = f"./{extract_video_id(url)}"
    os.makedirs(dir_out, exist_ok=True)
    # Headers mimic the official web player; presumably X-Frontend-Id is
    # required by the nvcomment endpoint — confirm against the live API.
    headers = {
        "Accept": "*/*",
        "Accept-Language": "ja",
        "Origin": "https://www.nicovideo.jp",
        "Referer": "https://www.nicovideo.jp/",
        "X-Frontend-Id": "6",
        "X-Frontend-Version": "0",
        "X-Niconico-Language": "ja-jp",
    }
    session = requests.Session()
    session.headers.update(headers)
    session.cookies.set(
        "user_session",
        USER_SESSION,
        domain=".nicovideo.jp"
    )
    sv_rsp = get_server_response(session, url)
    when = int(time.time())
    while True:
        path_out = os.path.join(dir_out, f"{when}.json")
        payload = gen_payload(sv_rsp, when=when)
        comments = get_comments(session, payload)
        with open(path_out, "w", encoding="utf-8") as f:
            json.dump(comments, f, indent=2, ensure_ascii=False)
        when_next = get_cursor(comments)
        # Stop when no older trunk comments remain or the cursor stalls.
        if (when_next is not None) and (when_next < when):
            when = when_next
        else:
            break
    merged = merge_and_sort(dir_out)
    # Plain string literal: the original used an f-string with no fields.
    path_out = os.path.join(dir_out, "merged.json")
    with open(path_out, "w", encoding="utf-8") as f:
        json.dump(merged, f, indent=2, ensure_ascii=False)
    print(f"saved {len(merged)} comments")


if __name__ == "__main__":
    main()
コメント投稿
user_sessionを用意してください
ソースコード(クリックで展開)
import requests, json, html, re
from bs4 import BeautifulSoup
def extract_video_id(url):
    """Extract a niconico video id (e.g. "sm12345") from *url*.

    Raises:
        ValueError: if no "sm"-style id is present in the URL.
    """
    match = re.search(r"(sm\d+)", url)
    if match:
        return match.group(1)
    # Include the offending URL so a bad input is easy to diagnose.
    raise ValueError(f"no video id found in URL: {url!r}")
def get_server_response(session, video_url):
    """Load the watch page and parse the JSON in its server-response meta tag."""
    resp = session.get(video_url)
    resp.raise_for_status()
    doc = BeautifulSoup(resp.text, "html.parser")
    node = doc.find("meta", attrs={"name": "server-response"})
    if node is None or "content" not in node.attrs:
        raise RuntimeError("server-response meta not found")
    # The attribute value is HTML-escaped JSON; unescape before parsing.
    payload = html.unescape(node["content"])
    return json.loads(payload)
def get_postKey(session, sv_rsp):
    """Fetch a one-shot post key for the video's first comment thread."""
    target = sv_rsp["data"]["response"]["comment"]["nvComment"]["params"]["targets"][0]
    url = f"https://nvapi.nicovideo.jp/v1/comment/keys/post?threadId={target['id']}"
    response = session.get(url)
    response.raise_for_status()
    return response.json()["data"]["postKey"]
def gen_payload(commentBody, postKey, videoId, vposMs=0, commands=None):
    """Build the JSON body for posting a comment.

    Args:
        commentBody: the comment text.
        postKey: one-shot key obtained from get_postKey.
        videoId: e.g. "sm12345".
        vposMs: playback position of the comment in milliseconds.
        commands: list of command strings; defaults to ["184"].

    Returns:
        Dict suitable for POSTing to the comment endpoint.
    """
    # Avoid the mutable default argument of the original: a shared ["184"]
    # list would leak mutations across calls. Build a fresh list per call.
    if commands is None:
        commands = ["184"]
    return {
        "body": commentBody,
        "postKey": postKey,
        "videoId": videoId,
        "vposMs": vposMs,
        "commands": commands,
    }
def post_comment(session, sv_rsp, payload):
    """POST *payload* to the comment endpoint of the video's first thread."""
    nv_params = sv_rsp["data"]["response"]["comment"]["nvComment"]["params"]
    thread_id = nv_params["targets"][0]["id"]
    endpoint = f"https://public.nvcomment.nicovideo.jp/v1/threads/{thread_id}/comments"
    resp = session.post(endpoint, json=payload)
    resp.raise_for_status()
    return resp.json()
def main():
    """Post a single comment to the configured video.

    NOTE(review): USER_SESSION (login cookie value) must be defined by the
    user before running — it is referenced here but never assigned in this
    script, so running without it raises NameError.
    """
    url = "https://www.nicovideo.jp/watch/smXXXXXX"
    commentBody = "おまどうま!"
    # Headers mimic the official web player.
    headers = {
        "Accept": "*/*",
        "Accept-Language": "ja",
        "Origin": "https://www.nicovideo.jp",
        "Referer": "https://www.nicovideo.jp/",
        "X-Frontend-Id": "6",
        "X-Frontend-Version": "0",
        "X-Niconico-Language": "ja-jp",
    }
    session = requests.Session()
    session.headers.update(headers)
    session.cookies.set("user_session", USER_SESSION, domain=".nicovideo.jp")
    sv_rsp = get_server_response(session, url)
    postKey = get_postKey(session, sv_rsp)
    payload = gen_payload(commentBody, postKey, extract_video_id(url), 0, ["184"])
    rsp = post_comment(session, sv_rsp, payload)
    print(json.dumps(rsp, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()
投稿動画一覧取得
ソースコード(クリックで展開)
import requests, json
def get_videos(session, userId):
    """Fetch all uploaded videos for *userId* from the nvapi listing endpoint.

    Pages through results 100 at a time until totalCount entries have been
    collected (or the API stops returning items).

    Returns:
        List of raw video item dicts as returned by the API.
    """
    videos = []
    pageNum = 1
    while True:
        apiurl = (
            f"https://nvapi.nicovideo.jp/v3/users/{userId}/videos"
            f"?sortKey=registeredAt&sortOrder=desc&sensitiveContents=mask"
            f"&pageSize=100&page={pageNum}"
        )
        print(f"[{pageNum}] {apiurl}")
        r = session.get(apiurl)
        r.raise_for_status()
        r_json = r.json()
        items = r_json["data"]["items"]
        videos += items
        # Also stop on an empty page: if totalCount counts videos the API
        # never lists (e.g. hidden/deleted), the original looped forever.
        if not items or len(videos) >= int(r_json["data"]["totalCount"]):
            break
        pageNum += 1
    return videos
def main():
    """Dump the full upload list for one user to ./user_<id>.json."""
    userId = "25799312"
    # Headers mimic the official web player.
    headers = {
        "Accept": "*/*",
        "Accept-Language": "ja",
        "Origin": "https://www.nicovideo.jp",
        "Referer": "https://www.nicovideo.jp/",
        "X-Frontend-Id": "6",
        "X-Frontend-Version": "0",
        "X-Niconico-Language": "ja-jp",
    }
    session = requests.Session()
    session.headers.update(headers)
    videos = get_videos(session, userId)
    out_path = f"./user_{userId}.json"
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(videos, f, indent=2, ensure_ascii=False)
    print(f"saved {len(videos)} videos")


if __name__ == "__main__":
    main()
動画IDのリストが欲しい場合
ソースコード(クリックで展開)
import json
def main():
    """Print just the video ids from a previously saved upload list."""
    userId = "25799312"
    with open(f"./user_{userId}.json", "r", encoding="utf-8") as f:
        videos = json.load(f)
    videoIds = [video["essential"]["id"] for video in videos]
    print(videoIds)


if __name__ == "__main__":
    main()
タグ:
#ニコニコ動画