素材牛VIP会员
如何提高大量的字符串比较速度
 he***ba  分类:Java代码  人气:1932  回帖:17  发布于6年前 收藏

补充数据链接:

candidates: https://pan.baidu.com/s/1nvGWbrV
bg_db: https://pan.baidu.com/s/1sllFLAd

每个字符串长度都是23,只要前面20个字符就行,由于数据太大,我只传了五分之一,大神们可以挑战一下,有速度快的可以贴一下代码,让小弟拜读一下,谢谢!

下面是正式的问题:

我现在有两个字符串数组,姑且称为candidates和bg_db,全部都是长度为20的短字符串,并且每个字符串的每个字符只有ATCG四种可能(没错!就是基因组序列啦!):

candidates = [
    'GGGAGCAGGCAAGGACTCTG',
    'GCTCGGGCTTGTCCACAGGA',
    '...',
    # 被你看出来啦,这些其实人类基因的片段
]

bg_db = [
    'CTGCTGACGGGTGACACCCA',
    'AGGAACTGGTGCTTGATGGC',
    '...',
    # 这个更多,有十亿左右
]

我的任务是对candidates的每一个candidate,找到bg_db中所有与其小于等于4个差异的记录,
举个例子来说:

# 上面一条为candidate,即candidates的一个记录
# 中间|代表相同,*代表不相同
# 下面一条代表bg_db的一条记录

A T C G A T C G A T C G A T C G A T C G
| | | | | | | | | | | | | | | | | | | |    # 差异为0
A T C G A T C G A T C G A T C G A T C G

A T C G A T C G A T C G A T C G A T C G
* | | | | | | | | | | | | | | | | | | |    # 差异为1
T T C G A T C G A T C G A T C G A T C G

A T C G A T C G A T C G A T C G A T C G
* | | | * | | | | | | | | | | | | | | |    # 差异为2
T T C G T T C G A T C G A T C G A T C G

A T C G A T C G A T C G A T C G A T C G
* | | | * | | | | | | * | | | | | | | |    # 差异为3
T T C G T T C G A T C C A T C G A T C G

A T C G A T C G A T C G A T C G A T C G
* | | | * | | | | | | * | | | * | | | |    # 差异为4
T T C G T T C G A T C C A T C A A T C G

我的问题是如果快速地找到:每一个candidate在bg_db中与之差异小于等于4的所有记录,
如果采用暴力遍历的话,以Python为例:

def align(candidate, record_from_bg_db):
    mismatches = 0
    for i in range(20):
        if candidate[i] != record_from_bg_db[i]:
            mismatches += 1
            if mismatches >= 4:
                return False
    return True

candidate = 'GGGAGCAGGCAAGGACTCTG'
record_from_bg_db = 'CTGCTGACGGGTGACACCCA'

align(candidate, record_from_bg_db) # 1.24微秒左右

# 总时间:

10000000 * 1000000000 * 1.24 / 1000 / 1000 / 60 / 60 / 24 / 365
# = 393
# 1千万个candidates,10亿条bg_db记录
# 耗时大约393年
# 完全无法忍受啊

我的想法是,bg_db是高度有序的字符串(长度固定,每个字符的可能只有四种),有没有什么算法,可以让candidate快速比较完所有的bg_db,各位大神,求赐教。

讨论这个帖子(17)垃圾回帖将一律封号处理……

Lv5 码农
us***es 职业无 6年前#1

首先,你的时间估算完全不对,这种大规模的数据量处理,好歹跑个几万条,持续十秒以上的时间,才能拿来做乘法算总时间,只算一条的话,这个时间几乎都是初始化进程的开销,而非关键的IO、CPU开销

以下正文

ACTG四种可能性相当于2bit,用一个字符表示一个基因位太过浪费了,一个字符8bit,可以放4个基因位

即使不用任何算法,只是把你的20个基因写成二进制形式,也能节省5倍时间

另外,循环20次,CPU的指令数是20*n条,n估计至少有3,但对于二进制来说,做比较的异或运算直接是cpu指令,指令数是1

Lv5 码农
zh***ao 职业无 6年前#2

算法不是很了解 但是就经验来说 复杂的算法反而耗时更久 不如这种简单粗暴来的迅速

可以考虑下多线程和集群来处理数据

对了 还有汉明距离貌似可以算这个

Lv6 码匠
lo***ou 职业无 6年前#3

坐等高手出现

Lv5 码农
夏***t 移动开发工程师 6年前#4

抱歉,今天看到还有人回复。
仔细看了一下问题,发现我以前以为只是匹配。
所以我提出用ac自动机。

但是题主是为了找到序列的差异。
这就是找两者的编辑距离。
wiki:编辑距离
wiki:来文斯坦距离

以前刷OJ的时候是使用DP(动态规划)来找一个字符串转换成另外一个字符串的最少编辑次数。

for(i:0->len)
    for(j:0->len)
        str1[i]==str2[j] ? cost=0 : cost=1
        dp[i,j]=min(
            dp[i-1, j  ] + 1,     // 刪除
            dp[i  , j-1] + 1,     // 插入
            dp[i-1, j-1] + cost   // 替換
        )

比如 :

str1    "abcdef"
str2    "acddff"

str2 转换为 str1

插入b 算一次
删除d 算一次
修改f 算一次

对于题主的ATCG基因序列来说,是不是只需要找到修改的就行了。
然而像这种
ATCGATCG
TCGATCGA
这样应该怎么算。

如果仅仅是找到修改的话,直接比较 str1[i] 和 str2[i] 就可以了。

for(i:0->len)
if(str1[i]!=str2[i] )++count;

受到@rockford 的启发。
我们可以对 原始数据 进行预处理。

candidates 中的串
GGGAGCAGGCAAGGACTCTG
A5 T2 C4 G9

进行处理之后的额外数据 A5 T2 C4 G9

bg_db 中的串
CTGCTGACGGGTGACACCCA
A4 T3 C7 G6

进行处理之后的额外数据 A4 T3 C7 G6

A5 -> A4 记作 -1
T2 -> T3 记作 +1
C4 -> C7 记作 +3
G9 -> G6 记作 -3

很明显 A 如果修改只能变成 TCG。
同理,我们只需要统计所有的+ 或者所有的 -
就可以知道他们的至少有多少不同之处。
大于4的都可以不进行比较。

通过先对比预处理的额外数据,然后再通过单次的比较算法来 进行比对。
(星期六加班-ing,下班后写一下)

Lv7 码师
雪***狐 职业无 6年前#5

可以尝试用字典树来保存所有的字符串。然后在查询的时候就可以用在字典树上遍历的方法。
在树上遍历的时候,可以维护一个当前节点的序列,这个序列里保存着当前遍历到的节点和对应节点mismatch的数量。
在遍历下一个节点的时候,要把当前序列里所有的节点都尝试向下,并形成新的节点序列。
好处是可以把很多个串的当前位放在一起进行比较,可以节约一些时间。由于每个位置选择不多,mismatch也不大,所有应该不会出现当前节点序列膨胀过大的情况。(这是猜想… 没太认真验证过…)

def match(candidate, root):
  nset = [[],[]]
  currentId = 0
  nset[currentId].append((root, 0))
  for ch in candidate:
    nextId = 1 - currentId
    for item in nset[currentId]:
      node, mismatch = item
      for nx in node.child:
        if nx.key == ch:
          nset[nextId].append((nx, mismatch))
        else:
          if mismatch:
            nset[nextId].append((nx, mismatch - 1))
    nset[currentId].clear()
    currentId = 1 - currentId
  return nset[currentId]

上面的代码就是一个大概的意思。如果用C++写的话会再快很多。
整个过程都可以用集群做分布式计算。

Lv3 码奴
gu***di 软件测试工程师 6年前#6

算法白痴一个, 不过刚好手上有可以用的计算资源, 就按照原来的思路(暴力无脑并行计算流)做一下题主这个例子:

run.py

from multiprocessing import Pool, cpu_count


def do_align(item):
    with open("bg_db.txt") as fh, open("result.txt", "a") as rh:
        db_line = fh.readline().strip()
        while db_line:
            counts = 0
            for i in [(i, j) for (i, j) in zip(db_line, item)]:
                if i[0] != i[1]:
                    counts += 1
                if counts >= 4:
                    break
            if counts < 4:
                rh.write("{}\n".format(db_line))
            db_line = fh.readline().strip()


def main():
    pool = Pool(cpu_count())
    with open("candidates.txt") as fh:
        pool.map(do_align, map(str.strip, fh))
    pool.close()
    pool.join()


if __name__ == "__main__":
    main()

简单先生成点数据

import random
import string


def id_generator(size=8, chars=string.ascii_letters + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))


with open("candidates.txt", "w") as fh:
    for i in range(10000):
        fh.write("{}\n".format(id_generator(20, "ATCG")))

with open("bg_db.txt", "w") as fh:
    for i in range(1000000):
        fh.write("{}\n".format(id_generator(20, "ATCG")))

嗯, 造了10000行的candidates.txt1000000行的bg_db.txt
运行看下时间:

$time python run.py

real    15m45.445s
user    1362m41.712s
sys     1m12.099s

题主实际的数据是千万行的candidates.txt和十亿行的bg_db.txt, 简单估算下时间
16*1000/(60*24) = 11.11
也就是11天, 这是我用的一个计算节点的配置

CPU Utilization:    1.0    0.0    98.9
user    sys    idle
Hardware
CPUs: 96 x 2.10 GHz
Memory (RAM): 1009.68 GB
Local Disk: Using 351.623 of 941.596 GB
Most Full Disk Partition: 53.5% used.

时间确实好长, 急需神优化

Lv5 码农
转***鬼 职业无 6年前#7

基本算法的复杂度是 O(M*N*c)
M =candidates数量
N = bg_db 数量

比较操作看做常数时间


优化:
    1 把20个字符分为4组 每组5个 ,每组可能的类型有 4^5=1024种 。把bg_db 先按照 第一个组 聚合 ,再按照第二个组聚合...构成一个4层的树结构
    2 建立 1024*1024 的表 表示 两个5个字符的串的差异数 内存 1MB
    2 匹配的时候,如果第一个组差异已经超过4个了,就不需要看它的子节点了,可以立刻排除大量节点

每组的长度也可以不确定,最好第一组有足够的区分性,这样可以一下子排除大量数据
Lv1 新人
何***孽 软件测试工程师 6年前#8

同样没有使用算法,暴力解法,用c写的

在我的机器上(CPU: Core 2 Duo E7500, RAM: 4G, OS: Fedora 19),测试结果

candidates    bg.db        cost
10000    1000000    318758165微秒
500      1000000    14950302微秒 

如果换成题主的24核CPU,怎么也得有20倍的性能提升,然后再加上48台机器一起运算,5000W次运算为15s, 时间为
10000000 * 1000000000 / 500 / 1000000 * 15 / 20 / 48 / 3600 / 24 = 3.616898 天

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <sys/time.h>

#define START_CC(flag)  \
    struct timeval st_##flag##_beg; \
    gettimeofday(&st_##flag##_beg, 0)

#define STOP_CC(flag)   \
    struct timeval st_##flag##_end; \
    gettimeofday(&st_##flag##_end, 0)

#define PRINT_CC(flag)  \
    double cost_##flag = 0.0L; \
    cost_##flag = (double)(st_##flag##_end.tv_sec - st_##flag##_beg.tv_sec); \
    cost_##flag = cost_##flag * 1000000L + (double)(st_##flag##_end.tv_usec - st_##flag##_beg.tv_usec);    \
    printf(#flag" cost time %.6lf microsecond.\n", cost_##flag);

#define GENEORDER_CODE_LENGTH 20 + 1

typedef struct GeneOrder
{
    char code[GENEORDER_CODE_LENGTH];
}GeneOrder, *GeneOrderPtr;

typedef struct GOArray
{
    size_t          capacity;
    size_t          length;
    GeneOrderPtr    data;
}GOArray;

GOArray createGOAarray(size_t capacity)
{
    GOArray goa;

    goa.capacity    = capacity;
    goa.length      = 0;
    goa.data        = (GeneOrderPtr)malloc(capacity * sizeof(GeneOrder));

    return goa;
}

void destroyGOArray(GOArray* goa)
{
    if (goa->capacity > 0) {
        free(goa->data);
    }
}

bool readGOFile(char const *file, GOArray *goarray)
{
    FILE* fp = NULL;

    if ((fp = fopen(file, "r+")) == NULL) {
        return false;
    }

    char buff[64];

    while (fgets(buff, 64, fp) != NULL) {
        if (goarray->length < goarray->capacity) {
            memcpy(goarray->data[goarray->length].code,
                buff,
                GENEORDER_CODE_LENGTH * sizeof(char)
            );
            goarray->data[goarray->length].code[GENEORDER_CODE_LENGTH - 1] = '\0';
            goarray->length ++;
        } else {
            fclose(fp);
            return true;
        }
    }

    fclose(fp);
    return true;
}

int main(int argc, char* argv[])
{
    (void)argc;

    GOArray condgo  = createGOAarray(10000);
    GOArray bggo    = createGOAarray(1000000);

    printf("loading ...\n");

    START_CC(loading);
    if (!readGOFile(argv[1], &condgo) || !readGOFile(argv[2], &bggo)) {
        destroyGOArray(&condgo);
        destroyGOArray(&bggo);
        return -1;
    }
    STOP_CC(loading);
    PRINT_CC(loading);


    int count = 0;

    START_CC(compare);
    for (size_t i = 0;i < 500;i ++) {
        const GeneOrderPtr gop = condgo.data + i;
        for (size_t j = 0;j < bggo.length;j ++) {
            const GeneOrderPtr inner_gop = bggo.data + j;
            int inner_count = 0;

            for (size_t k = 0;k < 20;k ++) {
                if (gop->code[k] != inner_gop->code[k]) {
                    if (++inner_count > 4) {
                        break;
                    }
                }
            }

            if (inner_count <= 4) {
            #ifdef DBGPRINT
                printf("%d %s - %s\n", i, gop->code, inner_gop->code);
            #endif
                count++;
            }
        }
    }
    STOP_CC(compare);
    PRINT_CC(compare);

    printf("result = %d\n", count);

    destroyGOArray(&condgo);
    destroyGOArray(&bggo);

    return 0;
}

编译参数&运行

gcc -Wall -Wextra -o ccs main.c -std=c99 -Os && ./ccs candidate.list bg.db

如果改成多线程的话速度会更快一些,在我的机器改为2个线程简单使用500条candidates测试,速度可以提升到9040257微秒,线程增加到4个性能提升就不是很大了,但是较新的CPU都具有超线程技术,速度估计会更好一些。。。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <sys/time.h>
#include <pthread.h>

#define START_CC(flag)              \
    struct timeval st_##flag##_beg; \
    gettimeofday(&st_##flag##_beg, 0)

#define STOP_CC(flag)               \
    struct timeval st_##flag##_end; \
    gettimeofday(&st_##flag##_end, 0)

#define PRINT_CC(flag)                                                                                  \
    double cost_##flag = 0.0L;                                                                          \
    cost_##flag = (double)(st_##flag##_end.tv_sec - st_##flag##_beg.tv_sec);                            \
    cost_##flag = cost_##flag * 1000000L + (double)(st_##flag##_end.tv_usec - st_##flag##_beg.tv_usec); \
    printf(#flag " cost time %.6lf microsecond.\n", cost_##flag);

#define GENEORDER_CODE_LENGTH 20 + 1

typedef struct GeneOrder {
    char code[GENEORDER_CODE_LENGTH];
} GeneOrder, *GeneOrderPtr;

typedef struct GOArray {
    size_t capacity;
    size_t length;
    GeneOrderPtr data;
} GOArray;

GOArray createGOAarray(size_t capacity)
{
    GOArray goa;

    goa.capacity = capacity;
    goa.length = 0;
    goa.data = (GeneOrderPtr)malloc(capacity * sizeof(GeneOrder));

    return goa;
}

void destroyGOArray(GOArray* goa)
{
    if (goa->capacity > 0) {
        free(goa->data);
    }
}

bool readGOFile(char const* file, GOArray* goarray)
{
    FILE* fp = NULL;

    if ((fp = fopen(file, "r+")) == NULL) {
        return false;
    }

    char buff[64];

    while (fgets(buff, 64, fp) != NULL) {
        if (goarray->length < goarray->capacity) {
            memcpy(goarray->data[goarray->length].code, buff,
                GENEORDER_CODE_LENGTH * sizeof(char));
            goarray->data[goarray->length].code[GENEORDER_CODE_LENGTH - 1] = '\0';
            goarray->length++;
        }
        else {
            fclose(fp);
            return true;
        }
    }

    fclose(fp);
    return true;
}

typedef struct ProcessST {
    GOArray* pcond;
    GOArray* pbg;
    size_t beg;
    size_t end; // [beg, end)
} ProcessST;

void* processThread(void* parg)
{
    ProcessST* ppst = (ProcessST*)parg;
    GOArray* pcond = ppst->pcond;
    GOArray* pbg = ppst->pbg;
    int count = 0;

    for (size_t i = ppst->beg; i < ppst->end; i++) {
        const GeneOrderPtr gop = pcond->data + i;
        for (size_t j = 0; j < pbg->length; j++) {
            const GeneOrderPtr inner_gop = pbg->data + j;
            int inner_count = 0;

            for (size_t k = 0; k < 20; k++) {
                if (gop->code[k] != inner_gop->code[k]) {
                    if (++inner_count > 4) {
                        break;
                    }
                }
            }

            if (inner_count <= 4) {
#ifdef DBGPRINT
                printf("%d %s - %s\n", i, gop->code, inner_gop->code);
#endif
                count++;
            }
        }
    }

    return (void*)count;
}

int main(int argc, char* argv[])
{
    (void)argc;

    GOArray condgo = createGOAarray(10000);
    GOArray bggo = createGOAarray(1000000);

    printf("loading ...\n");

    START_CC(loading);
    if (!readGOFile(argv[1], &condgo) || !readGOFile(argv[2], &bggo)) {
        destroyGOArray(&condgo);
        destroyGOArray(&bggo);
        return -1;
    }
    STOP_CC(loading);
    PRINT_CC(loading);

    size_t range[] = { 0, 250, 500 };
    pthread_t thr[2] = { 0 };
    ProcessST pst[2];

    START_CC(compare);

    for (size_t i = 0; i < 2; i++) {
        pst[i].pcond = &condgo;
        pst[i].pbg = &bggo;
        pst[i].beg = range[i];
        pst[i].end = range[i + 1];
        pthread_create(&thr[i], NULL, processThread, &pst[i]);
    }

    int count = 0;
    int* ret = NULL;

    for (size_t i = 0; i < 2; i++) {
        pthread_join(thr[i], (void**)&ret);
        count += (int)ret;
    }

    STOP_CC(compare);
    PRINT_CC(compare);

    printf("result = %d\n", count);

    destroyGOArray(&condgo);
    destroyGOArray(&bggo);

    return 0;
}

编译测试

gcc -Wall -Wextra -o ccs main.c -std=c99 -O3 -lpthread && ./ccs candidate.list bg.db
Lv3 码奴
空***子 职业无 6年前#9

我用blast blastn-short

:)

Lv5 码农
夏***t 移动开发工程师 6年前#10

写一个思路

candidates = [
    'GGGAGCAGGCAAGGACTCTG',
    'GCTCGGGCTTGTCCACAGGA',
    '...',
    # 被你看出来啦,这些其实人类基因的片段
]

bg_db = [
    'CTGCTGACGGGTGACACCCA',
    'AGGAACTGGTGCTTGATGGC',
    '...',
    # 这个更多,有十亿左右
]

因为你的数据其实是很有特点的,这里可以进行精简。
因为所有的字符串都是20个字符长度,且都由ATCG四个字符组成。那么可以把它们变换为整数来进行比较。
二进制表现形式如下

A  ==>  00
T  ==>  01
C  ==>  10
G  ==>  11

因为一个字符串长度固定,每个字符可以由2个比特位表示,所以每个字符串可以表示为一个40位的整数。可以表示为32+8的形式,也可以直接使用64位整形。建议使用C语言来做。

再来说说比较。
因为要找到每一个candidate在bg_db中与之差异小于等于4的所有记录,所以只要两个整数一做^按位异或操作,结果中二进制中1不超过8个,且这不超过8个1最多只能分为4个组的才有可能是符合要求的(00^11=11,10^01=11)。
把结果的40个比特位分作20个组,那么就是说最多只有4个组为b01 b10 b11这三个值,其余的全部为b00
那么比较算法就很好写了。
可以对每个字节(四个组)获取其中有几个组是为三个非零值的,来简介获取整体的比较结果。
因为每个字节只有256种可能的值,而符合条件的值只有3^4=81,所以可以先将结果存储起来,然后进行获取。
这里给出一个函数,来获取结果中有几个是非零组。

/*****************下面table中值的生成******//**
  int i;
  for( i=0;i<256;++i){
    int t =0;
    t += (i&0x01 || i&0x02)?1:0;
    t += (i&0x04 || i&0x08)?1:0;
    t += (i&0x10 || i&0x20)?1:0;
    t += (i&0x40 || i&0x80)?1:0;
    printf("%d,",t);
    if(i%10 ==9){putchar('\n');}
  }
********************************************//

int table[] = {
0,1,1,1,1,2,2,2,1,2,
2,2,1,2,2,2,1,2,2,2,
2,3,3,3,2,3,3,3,2,3,
3,3,1,2,2,2,2,3,3,3,
2,3,3,3,2,3,3,3,1,2,
2,2,2,3,3,3,2,3,3,3,
2,3,3,3,1,2,2,2,2,3,
3,3,2,3,3,3,2,3,3,3,
2,3,3,3,3,4,4,4,3,4,
4,4,3,4,4,4,2,3,3,3,
3,4,4,4,3,4,4,4,3,4,
4,4,2,3,3,3,3,4,4,4,
3,4,4,4,3,4,4,4,1,2,
2,2,2,3,3,3,2,3,3,3,
2,3,3,3,2,3,3,3,3,4,
4,4,3,4,4,4,3,4,4,4,
2,3,3,3,3,4,4,4,3,4,
4,4,3,4,4,4,2,3,3,3,
3,4,4,4,3,4,4,4,3,4,
4,4,1,2,2,2,2,3,3,3,
2,3,3,3,2,3,3,3,2,3,
3,3,3,4,4,4,3,4,4,4,
3,4,4,4,2,3,3,3,3,4,
4,4,3,4,4,4,3,4,4,4,
2,3,3,3,3,4,4,4,3,4,
4,4,3,4,4,4
};

int getCount(uint64_t cmpresult)
{
    uint8_t* pb = &cmpresult;    // 这里假设是小段模式,且之前比较结果是存在低40位
    return table[pb[0]]+table[pb[1]]+table[pb[2]]+table[pb[3]]+table[pb[4]];
}
上一页12下一页
 文明上网,理性发言!   😉 阿里云幸运券,戳我领取