代码拉取完成,页面将自动刷新
"""
爬取 Instagram 一个博主所有图片视频
用法: 在代码中加上自己的cookie, 修改图片保存路径, 在命令行运行 python instagram.py user_name # 这里的user_name写上要爬的博主账号名称即可
"""
import os
import re
import sys
import json
import time
import random
import requests
from hashlib import md5
from pyquery import PyQuery as pq
url_base = 'https://www.instagram.com/'
uri = 'https://www.instagram.com/graphql/query/?query_hash=a5164aed103f24b03e7b7747a2d94e3c&variables=%7B%22id%22%3A%22{user_id}%22%2C%22first%22%3A12%2C%22after%22%3A%22{cursor}%22%7D'
headers = {
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36',
'cookie': '这里加上自己的cookie'
}
def get_html(url):
try:
response = requests.get(url, headers=headers)
if response.status_code == 200:
return response.text
else:
print('请求网页源代码错误, 错误状态码:', response.status_code)
except Exception as e:
print(e)
return None
def get_json(url):
try:
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
return response.json()
else:
print('请求网页json错误, 错误状态码:', response.status_code)
except Exception as e:
print(e)
time.sleep(60 + float(random.randint(1, 4000))/100)
return get_json(url)
def get_content(url):
try:
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
return response.content
else:
print('请求照片二进制流错误, 错误状态码:', response.status_code)
except Exception as e:
print(e)
return None
def get_urls(html):
urls = []
user_id = re.findall('"profilePage_([0-9]+)"', html, re.S)[0]
print('user_id:' + user_id)
doc = pq(html)
items = doc('script[type="text/javascript"]').items()
for item in items:
if item.text().strip().startswith('window._sharedData'):
js_data = json.loads(item.text()[21:-1], encoding='utf-8')
edges = js_data["entry_data"]["ProfilePage"][0]["graphql"]["user"]["edge_owner_to_timeline_media"]["edges"]
page_info = js_data["entry_data"]["ProfilePage"][0]["graphql"]["user"]["edge_owner_to_timeline_media"]['page_info']
cursor = page_info['end_cursor']
flag = page_info['has_next_page']
for edge in edges:
if edge['node']['display_url']:
display_url = edge['node']['display_url']
print(display_url)
urls.append(display_url)
print(cursor, flag)
while flag:
url = uri.format(user_id=user_id, cursor=cursor)
js_data = get_json(url)
infos = js_data['data']['user']['edge_owner_to_timeline_media']['edges']
cursor = js_data['data']['user']['edge_owner_to_timeline_media']['page_info']['end_cursor']
flag = js_data['data']['user']['edge_owner_to_timeline_media']['page_info']['has_next_page']
for info in infos:
if info['node']['is_video']:
video_url = info['node']['video_url']
if video_url:
print(video_url)
urls.append(video_url)
else:
if info['node']['display_url']:
display_url = info['node']['display_url']
print(display_url)
urls.append(display_url)
print(cursor, flag)
# time.sleep(4 + float(random.randint(1, 800))/200) # if count > 2000, turn on
return urls
def main(user):
url = url_base + user + '/'
html = get_html(url)
urls = get_urls(html)
dirpath = r'.\{0}'.format(user)
if not os.path.exists(dirpath):
os.mkdir(dirpath)
for i in range(len(urls)):
print('\n正在下载第{0}张: '.format(i) + urls[i], ' 还剩{0}张'.format(len(urls)-i-1))
try:
content = get_content(urls[i])
endw = 'mp4' if r'mp4?_nc_ht=scontent' in urls[i] else 'jpg'
file_path = r'.\{0}\{1}.{2}'.format(user, md5(content).hexdigest(), endw)
if not os.path.exists(file_path):
with open(file_path, 'wb') as f:
print('第{0}张下载完成: '.format(i) + urls[i])
f.write(content)
f.close()
else:
print('第{0}张照片已下载'.format(i))
except Exception as e:
print(e)
print('这张图片or视频下载失败')
if __name__ == '__main__':
user_name = sys.argv[1]
start = time.time()
main(user_name)
print('Complete!!!!!!!!!!')
end = time.time()
spend = end - start
hour = spend // 3600
minu = (spend - 3600 * hour) // 60
sec = spend - 3600 * hour - 60 * minu
print(f'一共花费了{hour}小时{minu}分钟{sec}秒')
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。