优化处理非常大的文件

我的任务相对简单:对于输入文件中的每一行,测试该行是否满足给定的条件集,如果满足,则将该行的特定列写入新文件。我已经编写了一个python脚本来执行此操作,但我想要一些帮助1)提高速度,2)在列名称方面工作的最佳方式(因为列号可能因文件而异),以及3 )指定过滤条件和所需输出列的最佳方法。 1)我使用的文件包含天文图像的测光。每个文件大约1e6行,150列浮点数,通常超过1GB。我有一个旧的AWK脚本,将在大约1分钟内处理这样的文件;我的python脚本需要5到7分钟。我经常需要调整过滤条件并重新运行几次,直到输出文件是我想要的,所以速度绝对是可取的。我发现for循环很快;这就是我在循环中做的事情,它减慢了它的速度。使用itemgetter来挑选我想要的列比将整行读入内存有了很大的改进,但我不确定我能做些什么来进一步提高速度。这可能和AWK一样快吗? 2)我想在列名而不是列号方面工作,因为特定数量的列号(光子数,背景,信噪比等)可以在文件之间改变。在我的AWK脚本中,我总是需要在指定条件和输出列的位置检查列号是否正确,即使过滤和输出适用于相同的数量。我在python中的解决方案是创建一个字典,为每个数量分配一个列号。当文件具有不同的列时,我只需要指定一个新的字典。也许有更好的方法来做到这一点? 3)理想情况下,我只需要指定输入和输出文件的名称,过滤条件和要输出的所需列,它们可以在我的脚本顶部找到,所以我不需要去搜索代码只是为了调整一些东西。我的主要问题是未定义的变量。例如,典型的条件是'SNR> 4',但是在从测光文件开始读取行之前,实际上没有为'SNR'(信噪比)分配值。我的解决方案是使用字符串和eval / exec的组合。再说一次,也许有更好的方法? 我从未接受过计算机科学方面的培训(我是天文学的研究生) - 我通常只是将一些东西放在一起并调试直到它起作用。然而,就我的三点而言,优化对我的研究来说变得非常重要。我为冗长的帖子道歉,但我觉得细节会有所帮助。除了清理/编码风格之外,您对我的任何建议都将非常感激。 非常感谢, 可靠的人
#! /usr/bin/env python2.6

from operator import itemgetter


infile = 'ugc4305_1.phot'
outfile = 'ugc4305_1_filt.phot'

# names must belong to dicitonary
conditions = 'OBJ <= 2 and SNR1 > 4 and SNR2 > 4 and FLAG1 < 8 and FLAG2 < 8 and (SHARP1 + SHARP2)**2 < 0.075 and (CROWD1 + CROWD2) < 0.1'

input = 'OBJ, SNR1, SNR2, FLAG1, FLAG2, SHARP1, SHARP2, CROWD1, CROWD2'
    # should contain all quantities used in conditions

output = 'X, Y, OBJ, COUNTS1, BG1, ACS1, ERR1, CHI1, SNR1, SHARP1, ROUND1, CROWD1, FLAG1, COUNTS2, BG2, ACS2, ERR2, CHI2, SNR2, SHARP2, ROUND2, CROWD2, FLAG2'

# dictionary of col. numbers for the more important qunatities
columns = dict(EXT=0, CHIP=1, X=2, Y=3, CHI_GL=4, SNR_GL=5, SHARP_GL=6, ROUND_GL=7, MAJAX_GL=8, CROWD_GL=9, OBJ=10, COUNTS1=11, BG1=12, ACS1=13, STD1=14, ERR1=15, CHI1=16, SNR1=17, SHARP1=18, ROUND1=19, CROWD1=20, FWHM1=21, ELLIP1=22, PSFA1=23, PSFB1=24, PSFC1=25, FLAG1=26, COUNTS2=27, BG2=28, ACS2=29, STD2=30, ERR2=31, CHI2=32, SNR2=33, SHARP2=34, ROUND2=35, CROWD2=36, FWHM2=37, ELLIP2=38, PSFA2=39, PSFB2=40, PSFC2=41, FLAG2=42)



f = open(infile)
g = open(outfile, 'w')


# make string that extracts values for testing
input_items = []
for i in input.replace(',', ' ').split():
    input_items.append(columns[i])
input_items = ', '.join(str(i) for i in input_items)

var_assign = '%s = [eval(i) for i in itemgetter(%s)(line.split())]' % (input, input_items) 


# make string that specifies values for writing
output_items = []
for i in output.replace(',', ' ').split():
    output_items.append(columns[i])
output_items = ', '.join(str(i) for i in output_items)

output_values = 'itemgetter(%s)(line.split())' % output_items


# make string that specifies format for writing
string_format = []
for i in output.replace(',', ' ').split():
    string_format.append('%s')
string_format = ' '.join(string_format)+'n'


# main loop
for line in f:
   exec(var_assign)
   if eval(conditions):
      g.write(string_format % tuple(eval(output_values)))
f.close()
g.close()
    
已邀请:
我认为你没有提到它,但看起来你的数据是在csv中。您可能会因使用csv.DictReader而获得很多。您可以一次迭代文件1行(避免将整个内容加载到内存中)并按名称引用列。 你还应该看一下cProfile,Python的分析器,如果你还没有。它会告诉您程序的哪些位执行时间最长。     
我在这里迈出的第一步,就是摆脱
exec()
eval()
的召唤。每次eval一个字符串时,都必须编译,然后执行,在文件的每一行添加函数调用的开销。更不用说,
eval
往往会导致代码混乱,难以调试,并且通常应该避免。 您可以通过将逻辑放入一个易于理解的小功能中来开始重构。例如,你可以用一个函数替换
eval(conditions)
,例如:
def conditions(d):
    return (d[OBJ] <= 2 and
            d[SNRI] > 4 and
            d[SNR2] > 4 and
            d[FLAG1] < 8 and ...
提示:如果你的一些条件有更高的失败概率,先将它们放在第一位,python将跳过其余条件的评估。 我会删除列名字典,只需在文件顶部设置一堆变量,然后按
line[COLNAME]
引用列。这可以帮助您简化条件函数之类的某些部分,并且可以按名称引用列,而无需分配每个变量。     
这就是我将如何做这样的事情...... 这可以在约35秒内运行,而在我的机器上运行原始脚本约需3分钟。可以添加一些更优化(例如,我们只需要将一些列转换为浮点数),但这只会缩短运行时间的几秒钟。 你可以在这里轻松使用
csv.DictReader
,正如几个人所建议的那样。我正在避免它,因为你必须定义一个自定义方言,并且只有几个额外的行来做同样的事情而没有它。 (各种
csv
模块类还检查更复杂的行为(例如引用的字符串等),在这种特殊情况下你不需要担心。它们在很多情况下非常非常方便,但它们有点过度杀伤力在这种情况下。) 请注意,当您调用脚本而不是硬编码时,您也可以轻松地将infile和outfile名称添加为参数(即
infile = sys.argv[0]
等)。这也可以让你轻松地输入或输出数据......(你可以检查ѭ​​10ѭ的长度,然后将
infile
outfile
设置为
sys.stdin
和/或
sys.stdout
def main():
    infile = 'ugc4305_1.phot'
    outfile = 'ugc4305_1_filt.phot'
    process_data(infile, outfile)

def filter_conditions(row):
    for key, value in row.iteritems():
        row[key] = float(value)

    cond = (row['OBJ'] <= 2 and row['SNR1'] > 4 
       and row['SNR2'] > 4 and row['FLAG1'] < 8 
       and row['FLAG2'] < 8 
       and (row['SHARP1'] + row['SHARP2'])**2 < 0.075 
       and (row['CROWD1'] + row['CROWD2']) < 0.1
       )
    return cond

def format_output(row):
    output_columns = ('X', 'Y', 'OBJ', 'COUNTS1', 'BG1', 'ACS1', 'ERR1', 'CHI1', 
                     'SNR1', 'SHARP1', 'ROUND1', 'CROWD1', 'FLAG1', 'COUNTS2', 
                     'BG2', 'ACS2', 'ERR2', 'CHI2', 'SNR2', 'SHARP2', 'ROUND2', 
                     'CROWD2', 'FLAG2')
    delimiter = 't'
    return delimiter.join((row[name] for name in output_columns))

def process_data(infilename, outfilename):
    column_names = ('EXT', 'CHIP', 'X', 'Y', 'CHI_GL', 'SNR_GL', 'SHARP_GL', 
                    'ROUND_GL', 'MAJAX_GL', 'CROWD_GL', 'OBJ', 'COUNTS1', 
                    'BG1', 'ACS1', 'STD1', 'ERR1', 'CHI1', 'SNR1', 'SHARP1', 
                    'ROUND1', 'CROWD1', 'FWHM1', 'ELLIP1', 'PSFA1', 'PSFB1', 
                    'PSFC1', 'FLAG1', 'COUNTS2', 'BG2', 'ACS2', 'STD2', 
                    'ERR2', 'CHI2', 'SNR2', 'SHARP2', 'ROUND2', 'CROWD2', 
                    'FWHM2', 'ELLIP2', 'PSFA2', 'PSFB2', 'PSFC2', 'FLAG2')

    with open(infilename) as infile:
        with open(outfilename, 'w') as outfile:
            for line in infile:
                line = line.strip().split()
                row = dict(zip(column_names, line))
                if filter_conditions(row.copy()):
                    outfile.write(format_output(row) + 'n')

if __name__ == '__main__':
    main()
    
就像nmichaels所说,你可以使用
csv.DictReader
fieldnames
dialect
参数来读取这个文件。然后,对于每一行,您将拥有一本字典。使用字典,您不必使用
eval
,并且可以使用类似的语句
if data_item['OBJ'] <= 2 and data_item['SNR1']:
     g.write(data_item['X'], data_item['Y'], data_item['OBJ'])
你现在这样做的方式是缓慢而复杂的,因为所有的
eval
s。没有必要那么复杂。     
如果分析显示在实际读取和解析文件上花费了大量时间,并且您将多次处理相同的原始文件,则可以尝试创建针对使用Python进行读取而优化的中间文件格式。 尝试一件事就是读取文件一次,解析并用pickle / cPickle输出结果。然后在过滤器脚本中使用pickle / cpickle读取中间文件。 在不知道python足够好,以告诉这是否会比读取每一行并拆分它们更快。 (在c#中我会使用二进制序列化器,但我不知道它是否在python中可用)。 如果磁盘IO是瓶颈,您也可以尝试压缩输入文件并使用gzip模块读取它们。     
你有没有尝试过熊猫? 我相信OBJ,SNR1,...是列名,我希望你在所有行上应用相同的条件。 如果是这种情况,我建议你和熊猫一起去。 你的代码片段会是这样的......
import pandas as pd

infile = 'ugc4305_1.phot'
outfile = 'ugc4305_1_filt.phot'

df = pd.read_csv(infile)

condition = (df['OBJ'] <= 2) & (df['SRN1'] > 4) & (df['SRN2'] > 4) & (df['FLAG1'] < 8) & (df['FLAG2'] < 8) & ((df['SHARP1'] + df['SHARP2'])**2 < 0.075) & ((df['CROWD1'] + df['CROWD2']) < 0.1)
newDf = df[condition]

columnNames = ['col1', 'col2', ...] # column names you want in result

newDf = df[columnNames]

newDf.to_csv(outfile)
    

要回复问题请先登录注册