本文实例讲述了Python实现大文件排序的方法。分享给大家供大家参考。具体实现方法如下:
import gzip
import os
from multiprocessing import Process, Queue, Pipe, current_process, freeze_support
from datetime import datetime
def sort_worker(input,output):
while True:
lines = input.get().splitlines()
element_set = {}
for line in lines:
if line.strip() == 'STOP':
return
try:
element = line.split(' ')[0]
if not element_set.get(element): element_set[element] = ''
except:
pass
sorted_element = sorted(element_set)
#print sorted_element
output.put('\n'.join(sorted_element))
def write_worker(input, pre):
os.system('mkdir %s'%pre)
i = 0
while True:
content = input.get()
if content.strip() == 'STOP':
return
write_sorted_bulk(content, '%s/%s'%(pre, i))
i += 1
def write_sorted_bulk(content, filename):
f = file(filename, 'w')
f.write(content)
f.close()
def split_sort_file(filename, num_sort = 3, buf_size = 65536*64*4):
t = datetime.now()
pre, ext = os.path.splitext(filename)
if ext == '.gz':
file_file = gzip.open(filename, 'rb')
else:
file_file = open(filename)
bulk_queue = Queue(10)
sorted_queue = Queue(10)
NUM_SORT = num_sort
sort_worker_pool = []
for i in range(NUM_SORT):
sort_worker_pool.append( Process(target=sort_worker, args=(bulk_queue, sorted_queue)) )
sort_worker_pool[i].start()
NUM_WRITE = 1
write_worker_pool = []
for i in range(NUM_WRITE):
write_worker_pool.append( Process(target=write_worker, args=(sorted_queue, pre)) )
write_worker_pool[i].start()
buf = file_file.read(buf_size)
sorted_count = 0
while len(buf):
end_line = buf.rfind('\n')
#print buf[:end_line+1]
bulk_queue.put(buf[:end_line+1])
sorted_count += 1
if end_line != -1:
buf = buf[end_line+1:] + file_file.read(buf_size)
else:
buf = file_file.read(buf_size)
for i in range(NUM_SORT):
bulk_queue.put('STOP')
for i in range(NUM_SORT):
sort_worker_pool[i].join()
for i in range(NUM_WRITE):
sorted_queue.put('STOP')
for i in range(NUM_WRITE):
write_worker_pool[i].join()
print 'elasped ', datetime.now() - t
return sorted_count
from heapq import heappush, heappop
from datetime import datetime
from multiprocessing import Process, Queue, Pipe, current_process, freeze_support
import os
class file_heap:
def __init__(self, dir, idx = 0, count = 1):
files = os.listdir(dir)
self.heap = []
self.files = {}
self.bulks = {}
self.pre_element = None
for i in range(len(files)):
file = files[i]
if hash(file) % count != idx: continue
input = open(os.path.join(dir, file))
self.files[i] = input
self.bulks[i] = ''
heappush(self.heap, (self.get_next_element_buffered(i), i))
def get_next_element_buffered(self, i):
if len(self.bulks[i]) < 256:
if self.files[i] is not None:
buf = self.files[i].read(65536)
if buf:
self.bulks[i] += buf
else:
self.files[i].close()
self.files[i] = None
end_line = self.bulks[i].find('\n')
if end_line == -1:
end_line = len(self.bulks[i])
element = self.bulks[i][:end_line]
self.bulks[i] = self.bulks[i][end_line+1:]
return element
def poppush_uniq(self):
while True:
element = self.poppush()
if element is None:
return None
if element != self.pre_element:
self.pre_element = element
return element
def poppush(self):
try:
element, index = heappop(self.heap)
except IndexError:
return None
new_element = self.get_next_element_buffered(index)
if new_element:
heappush(self.heap, (new_element, index))
return element
def heappoppush(dir, queue, idx = 0, count = 1):
heap = file_heap(dir, idx, count)
while True:
d = heap.poppush_uniq()
queue.put(d)
if d is None: return
def heappoppush2(dir, queue, count = 1):
heap = []
procs = []
queues = []
pre_element = None
for i in range(count):
q = Queue(1024)
q_buf = queue_buffer(q)
queues.append(q_buf)
p = Process(target=heappoppush, args=(dir, q_buf, i, count))
procs.append(p)
p.start()
queues = tuple(queues)
for i in range(count):
heappush(heap, (queues[i].get(), i))
while True:
try:
d, i= heappop(heap)
except IndexError:
queue.put(None)
for p in procs:
p.join()
return
else:
if d is not None:
heappush(heap,(queues[i].get(), i))
if d != pre_element:
pre_element = d
queue.put(d)
def merge_file(dir):
heap = file_heap( dir )
os.system('rm -f '+dir+'.merge')
fmerge = open(dir+'.merge', 'a')
element = heap.poppush_uniq()
fmerge.write(element+'\n')
while element is not None:
element = heap.poppush_uniq()
fmerge.write(element+'\n')
class queue_buffer:
def __init__(self, queue):
self.q = queue
self.rbuf = []
self.wbuf = []
def get(self):
if len(self.rbuf) == 0:
self.rbuf = self.q.get()
r = self.rbuf[0]
del self.rbuf[0]
return r
def put(self, d):
self.wbuf.append(d)
if d is None or len(self.wbuf) > 1024:
self.q.put(self.wbuf)
self.wbuf = []
def diff_file(file_old, file_new, file_diff, buf = 268435456):
print 'buffer size', buf
from file_split import split_sort_file
os.system('rm -rf '+ os.path.splitext(file_old)[0] )
os.system('rm -rf '+ os.path.splitext(file_new)[0] )
t = datetime.now()
split_sort_file(file_old,5,buf)
split_sort_file(file_new,5,buf)
print 'split elasped ', datetime.now() - t
os.system('cat %s/* | wc -l'%os.path.splitext(file_old)[0])
os.system('cat %s/* | wc -l'%os.path.splitext(file_new)[0])
os.system('rm -f '+file_diff)
t = datetime.now()
zdiff = open(file_diff, 'a')
old_q = Queue(1024)
new_q = Queue(1024)
old_queue = queue_buffer(old_q)
new_queue = queue_buffer(new_q)
h1 = Process(target=heappoppush2, args=(os.path.splitext(file_old)[0], old_queue, 3))
h2 = Process(target=heappoppush2, args=(os.path.splitext(file_new)[0], new_queue, 3))
h1.start(), h2.start()
old = old_queue.get()
new = new_queue.get()
old_count, new_count = 0, 0
while old is not None or new is not None:
if old > new or old is None:
zdiff.write('< '+new+'\n')
new = new_queue.get()
new_count +=1
elif old < new or new is None:
zdiff.write('> '+old+'\n')
old = old_queue.get()
old_count +=1
else:
old = old_queue.get()
new = new_queue.get()
print 'new_count:', new_count
print 'old_count:', old_count
print 'diff elasped ', datetime.now() - t
h1.join(), h2.join()
希望本文所述对大家的Python程序设计有所帮助。
京东创始人刘强东和其妻子章泽天最近成为了互联网舆论关注的焦点。有关他们“移民美国”和在美国购买豪宅的传言在互联网上广泛传播。然而,京东官方通过微博发言人发布的消息澄清了这些传言,称这些言论纯属虚假信息和蓄意捏造。
日前,据博主“@超能数码君老周”爆料,国内三大运营商中国移动、中国电信和中国联通预计将集体采购百万台规模的华为Mate60系列手机。
据报道,荷兰半导体设备公司ASML正看到美国对华遏制政策的负面影响。阿斯麦(ASML)CEO彼得·温宁克在一档电视节目中分享了他对中国大陆问题以及该公司面临的出口管制和保护主义的看法。彼得曾在多个场合表达了他对出口管制以及中荷经济关系的担忧。
今年早些时候,抖音悄然上线了一款名为“青桃”的 App,Slogan 为“看见你的热爱”,根据应用介绍可知,“青桃”是一个属于年轻人的兴趣知识视频平台,由抖音官方出品的中长视频关联版本,整体风格有些类似B站。
日前,威马汽车首席数据官梅松林转发了一份“世界各国地区拥车率排行榜”,同时,他发文表示:中国汽车普及率低于非洲国家尼日利亚,每百户家庭仅17户有车。意大利世界排名第一,每十户中九户有车。
近日,一项新的研究发现,维生素 C 和 E 等抗氧化剂会激活一种机制,刺激癌症肿瘤中新血管的生长,帮助它们生长和扩散。
据媒体援引消息人士报道,苹果公司正在测试使用3D打印技术来生产其智能手表的钢质底盘。消息传出后,3D系统一度大涨超10%,不过截至周三收盘,该股涨幅回落至2%以内。
9月2日,坐拥千万粉丝的网红主播“秀才”账号被封禁,在社交媒体平台上引发热议。平台相关负责人表示,“秀才”账号违反平台相关规定,已封禁。据知情人士透露,秀才近期被举报存在违法行为,这可能是他被封禁的部分原因。据悉,“秀才”年龄39岁,是安徽省亳州市蒙城县人,抖音网红,粉丝数量超1200万。他曾被称为“中老年...
9月3日消息,亚马逊的一些股东,包括持有该公司股票的一家养老基金,日前对亚马逊、其创始人贝索斯和其董事会提起诉讼,指控他们在为 Project Kuiper 卫星星座项目购买发射服务时“违反了信义义务”。
据消息,为推广自家应用,苹果现推出了一个名为“Apps by Apple”的网站,展示了苹果为旗下产品(如 iPhone、iPad、Apple Watch、Mac 和 Apple TV)开发的各种应用程序。
特斯拉本周在美国大幅下调Model S和X售价,引发了该公司一些最坚定支持者的不满。知名特斯拉多头、未来基金(Future Fund)管理合伙人加里·布莱克发帖称,降价是一种“短期麻醉剂”,会让潜在客户等待进一步降价。
据外媒9月2日报道,荷兰半导体设备制造商阿斯麦称,尽管荷兰政府颁布的半导体设备出口管制新规9月正式生效,但该公司已获得在2023年底以前向中国运送受限制芯片制造机器的许可。
近日,根据美国证券交易委员会的文件显示,苹果卫星服务提供商 Globalstar 近期向马斯克旗下的 SpaceX 支付 6400 万美元(约 4.65 亿元人民币)。用于在 2023-2025 年期间,发射卫星,进一步扩展苹果 iPhone 系列的 SOS 卫星服务。
据报道,马斯克旗下社交平台𝕏(推特)日前调整了隐私政策,允许 𝕏 使用用户发布的信息来训练其人工智能(AI)模型。新的隐私政策将于 9 月 29 日生效。新政策规定,𝕏可能会使用所收集到的平台信息和公开可用的信息,来帮助训练 𝕏 的机器学习或人工智能模型。
9月2日,荣耀CEO赵明在采访中谈及华为手机回归时表示,替老同事们高兴,觉得手机行业,由于华为的回归,让竞争充满了更多的可能性和更多的魅力,对行业来说也是件好事。
《自然》30日发表的一篇论文报道了一个名为Swift的人工智能(AI)系统,该系统驾驶无人机的能力可在真实世界中一对一冠军赛里战胜人类对手。
近日,非营利组织纽约真菌学会(NYMS)发出警告,表示亚马逊为代表的电商平台上,充斥着各种AI生成的蘑菇觅食科普书籍,其中存在诸多错误。
社交媒体平台𝕏(原推特)新隐私政策提到:“在您同意的情况下,我们可能出于安全、安保和身份识别目的收集和使用您的生物识别信息。”
2023年德国柏林消费电子展上,各大企业都带来了最新的理念和产品,而高端化、本土化的中国产品正在不断吸引欧洲等国际市场的目光。
罗永浩日前在直播中吐槽苹果即将推出的 iPhone 新品,具体内容为:“以我对我‘子公司’的了解,我认为 iPhone 15 跟 iPhone 14 不会有什么区别的,除了序(列)号变了,这个‘不要脸’的东西,这个‘臭厨子’。