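# NOTE: stray hosting-page notice captured with the file (not part of the program):
# "Code pull complete; the page will refresh automatically."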
'''
Copyright (c) linlin152@foxmail.com. All rights reserved.
License: Apache 2.
'''
import sys
import time
import json
import os
import pychrome
# Module-wide pychrome client used by get_doc() to open and close tabs.
# Presumably requires a Chrome instance already started with remote debugging
# listening on 127.0.0.1:9222 -- TODO confirm the launch flags used.
browser = pychrome.Browser(url="http://127.0.0.1:9222")
def get_id(node):
    """Return the user id taken from a follow form found under *node*.

    The flat attribute list of *node* and its descendants is searched for an
    ``'action'`` entry; the entry right after it is taken as the form URL.

    Args:
        node: A DOM node dict as returned by ``DOM.getDocument``.

    Returns:
        The text after the first ``'target='`` in the URL, or ``None`` when
        there is no ``'action'`` entry, the URL does not mention ``'follow'``,
        or it carries no ``'target='`` part.
    """
    attrs = get_attrs(node, t='attributes')
    try:
        index = attrs.index('action')
    except ValueError:
        # Debug aid kept from the original: show what the node really had.
        print(attrs)
        return None
    # 'action' may be the very last entry (a value with no successor).
    if index + 1 >= len(attrs):
        return None
    url = attrs[index + 1]
    if 'follow' not in url:
        return None
    # Without a 'target=' part the split would hand back the whole URL,
    # which is not a usable user id.
    if 'target=' not in url:
        return None
    return url.split('target=', 1)[-1]
def get_attrs(node, t):
    """Gather the values stored under key *t* in *node* and all descendants.

    Descendants (under ``'children'``) are visited first, in order, so their
    values precede the node's own. A list value contributes its items, a
    non-empty string contributes itself, anything else is ignored.

    Args:
        node: A DOM node dict, optionally holding a ``'children'`` list.
        t: The key to collect, e.g. ``'attributes'`` or ``'nodeValue'``.

    Returns:
        A flat list of the collected values.
    """
    collected = []
    if 'children' in node:
        for child in node['children']:
            collected += get_attrs(child, t)

    own = node.get(t)
    if isinstance(own, list):
        collected += own
    elif isinstance(own, str) and own:
        collected.append(own)
    return collected
def is_wanted(node, locations):
    """Tell whether any text under *node* mentions one of *locations*.

    The comparison is a case-insensitive substring test against every
    ``'nodeValue'`` collected from *node* and its descendants.

    Args:
        node: A DOM node dict.
        locations: Iterable of location names to look for.

    Returns:
        ``True`` on the first match, ``False`` when nothing matches.
    """
    values = get_attrs(node, 'nodeValue')
    return any(
        loc.lower() in value.lower()
        for loc in locations
        for value in values
    )
def get_by_class(root, klass):
    """Find the nodes under *root* whose ``class`` attribute equals *klass*.

    ``'attributes'`` is read as a flat ``[name1, value1, name2, value2, ...]``
    list, so only the value paired with the ``class`` name is compared. The
    previous check (``klass in attrs``) also matched *klass* appearing as the
    value of some unrelated attribute.

    The match is on the complete attribute string, not on individual class
    tokens. A matching node is returned as-is and its subtree is not searched.

    Args:
        root: A DOM node dict; anything else yields an empty result.
        klass: The exact ``class`` attribute value to look for.

    Returns:
        A list of matching node dicts in document order (possibly empty).
    """
    if not isinstance(root, dict):
        return []

    attrs = root.get('attributes') or []
    # Even positions hold attribute names, odd positions their values.
    for name, value in zip(attrs[0::2], attrs[1::2]):
        if name == 'class' and value == klass:
            return [root]

    nodes = []
    for child in root.get('children') or ():
        nodes.extend(get_by_class(child, klass))
    return nodes
def get_followers(root, locations=('shenzhen', 'chengdu'), delay=5):
    """Extract id/email pairs for followers on one page that match a location.

    Every follower row (identified by its exact ``class`` string) whose text
    mentions one of *locations* is resolved to a user id and then to an e-mail
    address through ``get_email``. Rows without an id or e-mail are skipped.

    Args:
        root: Root DOM node dict of a followers page.
        locations: Location substrings to keep; defaults to the values that
            used to be hard-coded.
        delay: Seconds to sleep after each profile lookup (default 5, the
            former fixed value).

    Returns:
        A list of ``{'id': ..., 'email': ...}`` dicts.
    """
    klass = 'd-table table-fixed col-12 width-full py-4 border-bottom color-border-muted'
    users = []
    for row in get_by_class(root, klass):
        if not is_wanted(row, locations=locations):
            continue
        user = get_id(row)
        if not user:
            continue
        email = get_email(user)
        # Pause after every profile fetch, whether or not it had an e-mail.
        time.sleep(delay)
        if not email:
            continue
        users.append({'id': user, 'email': email})
    return users
def get_all_followers(user, total, timeout=5, offset=1):
    """Walk the follower pages of *user* and collect the matching followers.

    Args:
        user: Account whose followers are listed.
        total: Total number of followers, used to derive the page count.
        timeout: Unused; kept so existing callers keep working.
        offset: First page number to fetch (1-based).

    Returns:
        The concatenated results of ``get_followers`` for every page that
        loaded. Pages that fail to load are skipped without sleeping.
    """
    per_page = 50  # page size assumed by the original ``total//50`` arithmetic
    # Round up so a final, partially filled page is visited as well; the
    # former ``total//50 + 1`` upper bound excluded it.
    last_page = (total + per_page - 1) // per_page

    res = []
    for page in range(offset, last_page + 1):
        print("Page %s" % page)
        url = "https://github.com/{}?page={}&tab=followers".format(user, page)
        doc = get_doc(url)
        if not doc:
            continue
        found = get_followers(doc['root'])
        if found:
            print("found: %s" % found)
            res.extend(found)
        print("Page %s done, Sleep 20s" % page)
        time.sleep(20)
    return res
def get_doc(url, timeout=3):
    """Load *url* in a fresh browser tab and return its DOM document.

    Args:
        url: Address to navigate to.
        timeout: Value passed to ``tab.wait`` after navigation (presumably
            seconds given to the page to finish loading -- TODO confirm).

    Returns:
        The result of ``DOM.getDocument(depth=-1, pierce=True)``, or ``None``
        when navigation raised. Errors from the later steps still propagate.
    """
    tab = browser.new_tab()
    try:
        tab.start()
        try:
            try:
                tab.Page.navigate(url=url, _timeout=5)
            except Exception as ex:
                # Best-effort: a page that fails to load is reported and skipped.
                print("Found exception")
                print(ex)
                return None
            tab.wait(timeout)
            return tab.DOM.getDocument(depth=-1, pierce=True)
        finally:
            tab.stop()
    finally:
        # Always release the tab, including when wait/getDocument raise;
        # previously those paths left the tab open.
        browser.close_tab(tab)
def get_email(user):
    """Return the public e-mail address shown on *user*'s profile page.

    The profile is loaded through ``get_doc`` and the first element whose
    ``class`` is exactly ``'u-email Link--primary '`` is inspected.

    Args:
        user: Account name appended to ``https://github.com/``.

    Returns:
        The text after ``'mailto:'`` in that element's attributes, or ``None``
        when the page did not load, no such element exists, or none of its
        attributes is a mailto link.
    """
    url = 'https://github.com/{}'.format(user)
    doc = get_doc(url)
    if not doc:
        return None
    links = get_by_class(doc['root'], 'u-email Link--primary ')
    if not links:
        return None
    # Look for the mailto value explicitly instead of assuming it is the last
    # attribute, which returned unrelated text whenever that was not the case.
    for value in get_attrs(links[0], 'attributes'):
        if isinstance(value, str) and 'mailto:' in value:
            return value.split('mailto:')[-1]
    return None
def add_newly(db, new, user):
    """Merge freshly scraped records into *db* and report the new ones.

    A record whose ``'id'`` is already known only has *user* added to the
    stored entry's ``'following'`` list. An unknown record is stamped with the
    current local time, gets ``'following': [user]`` and is appended to *db*.
    Both *db* and the records in *new* are modified in place.

    Args:
        db: List of stored record dicts, each with an ``'id'`` key.
        new: List of scraped record dicts, each with an ``'id'`` key.
        user: Account whose follower list produced *new*.

    Returns:
        The list of records that were appended to *db*.
    """
    by_id = {item['id']: item for item in db}
    added = []
    stamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    for record in new:
        known = by_id.get(record['id'])
        if known is not None:
            # Tolerate stored rows that have no 'following' list yet.
            followed = known.setdefault('following', [])
            if user not in followed:
                followed.append(user)
            continue
        record['time'] = stamp
        record['following'] = [user]
        db.append(record)
        added.append(record)
        # Register the id so a repeat of it later in `new` is not added twice.
        by_id[record['id']] = record
    return added
def update2disk(res, user, filename="db.json"):
    """Merge *res* into the JSON database file and write it back.

    Args:
        res: Scraped record dicts to merge (see ``add_newly``).
        user: Account whose followers were scraped.
        filename: Path of the JSON database; created when missing.
    """
    db = []
    if os.path.exists(filename):
        with open(filename, 'r', encoding='utf-8') as f:
            db = json.load(f)
    added = add_newly(db, res, user)
    print("Found %d newly added." % len(added))
    # Write to a sibling file first and swap it in afterwards: opening the
    # database itself with 'w' would empty it before json.dump has succeeded.
    tmp_name = filename + '.tmp'
    with open(tmp_name, 'w', encoding='utf-8') as f:
        json.dump(db, f, indent=2)
    os.replace(tmp_name, filename)
# Command line: USER FOLLOWER_COUNT [START_PAGE]
if __name__ == '__main__':
    # Fail with a usage line instead of a bare IndexError.
    if len(sys.argv) < 3:
        sys.exit("usage: %s USER FOLLOWER_COUNT [START_PAGE]" % sys.argv[0])
    user = sys.argv[1]
    count = int(sys.argv[2])
    # The start page is honoured whenever a third argument is present.
    offset = int(sys.argv[3]) if len(sys.argv) > 3 else 1
    res = get_all_followers(user, count, offset=offset)
    update2disk(res, user)
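# NOTE: stray hosting-page notices captured with the file (not part of the program):
# "This content may be unsuitable for display and is hidden; you can review and edit it yourself."
# "If you confirm it contains no inappropriate, promotional, violent, vulgar, infringing, pirated,
#  false, worthless or unlawful content, you may submit an appeal and it will be handled promptly."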