补充数据链接:
candidates: https://pan.baidu.com/s/1nvGWbrV
bg_db: https://pan.baidu.com/s/1sllFLAd
每个字符串长度都是23,只要前面20个字符就行,由于数据太大,我只传了五分之一,大神们可以挑战一下,有速度快的可以贴一下代码,让小弟拜读一下,谢谢!
我现在有两个字符串数组,姑且称为candidates和bg_db,全部都是长度为20的短字符串,并且每个字符串的每个字符只有ATCG四种可能(没错!就是基因组序列啦!):
candidates = [
'GGGAGCAGGCAAGGACTCTG',
'GCTCGGGCTTGTCCACAGGA',
'...',
# 被你看出来啦,这些其实人类基因的片段
]
bg_db = [
'CTGCTGACGGGTGACACCCA',
'AGGAACTGGTGCTTGATGGC',
'...',
# 这个更多,有十亿左右
]
我的任务是对candidates的每一个candidate,找到bg_db中所有与其小于等于4个差异的记录,
举个例子来说:
# 上面一条为candidate,即candidates的一个记录
# 中间|代表相同,*代表不相同
# 下面一条代表bg_db的一条记录
A T C G A T C G A T C G A T C G A T C G
| | | | | | | | | | | | | | | | | | | | # 差异为0
A T C G A T C G A T C G A T C G A T C G
A T C G A T C G A T C G A T C G A T C G
* | | | | | | | | | | | | | | | | | | | # 差异为1
T T C G A T C G A T C G A T C G A T C G
A T C G A T C G A T C G A T C G A T C G
* | | | * | | | | | | | | | | | | | | | # 差异为2
T T C G T T C G A T C G A T C G A T C G
A T C G A T C G A T C G A T C G A T C G
* | | | * | | | | | | * | | | | | | | | # 差异为3
T T C G T T C G A T C C A T C G A T C G
A T C G A T C G A T C G A T C G A T C G
* | | | * | | | | | | * | | | * | | | | # 差异为4
T T C G T T C G A T C C A T C A A T C G
我的问题是如果快速地找到:每一个candidate在bg_db中与之差异小于等于4的所有记录,
如果采用暴力遍历的话,以Python为例:
def align(candidate, record_from_bg_db):
mismatches = 0
for i in range(20):
if candidate[i] != record_from_bg_db[i]:
mismatches += 1
if mismatches >= 4:
return False
return True
candidate = 'GGGAGCAGGCAAGGACTCTG'
record_from_bg_db = 'CTGCTGACGGGTGACACCCA'
align(candidate, record_from_bg_db) # 1.24微秒左右
# 总时间:
10000000 * 1000000000 * 1.24 / 1000 / 1000 / 60 / 60 / 24 / 365
# = 393
# 1千万个candidates,10亿条bg_db记录
# 耗时大约393年
# 完全无法忍受啊
我的想法是,bg_db是高度有序的字符串(长度固定,每个字符的可能只有四种),有没有什么算法,可以让candidate快速比较完所有的bg_db,各位大神,求赐教。
抱歉,今天看到还有人回复。
仔细看了一下问题,发现我以前以为只是匹配。
所以我提出用ac自动机。
但是题主是为了找到序列的差异。
这就是找两者的编辑距离。
wiki:编辑距离
wiki:来文斯坦距离
以前刷OJ的时候是使用DP(动态规划)来找一个字符串转换成另外一个字符串的最少编辑次数。
for(i:0->len)
for(j:0->len)
str1[i]==str2[j] ? cost=0 : cost=1
dp[i,j]=min(
dp[i-1, j ] + 1, // 刪除
dp[i , j-1] + 1, // 插入
dp[i-1, j-1] + cost // 替換
)
比如 :
str1 "abcdef"
str2 "acddff"
str2 转换为 str1
插入b 算一次
删除d 算一次
修改f 算一次
对于题主的ATCG基因序列来说,是不是只需要找到修改的就行了。
然而像这种
ATCGATCG
TCGATCGA
这样应该怎么算。
如果仅仅是找到修改的话,直接比较 str1[i] 和 str2[i] 就可以了。
for(i:0->len)
if(str1[i]!=str2[i] )++count;
受到@rockford 的启发。
我们可以对 原始数据 进行预处理。
candidates 中的串
GGGAGCAGGCAAGGACTCTG
A5 T2 C4 G9
进行处理之后的额外数据 A5 T2 C4 G9
bg_db 中的串
CTGCTGACGGGTGACACCCA
A4 T3 C7 G6
进行处理之后的额外数据 A4 T3 C7 G6
A5 -> A4 记作 -1
T2 -> T3 记作 +1
C4 -> C7 记作 +3
G9 -> G6 记作 -3
很明显 A 如果修改只能变成 TCG。
同理,我们只需要统计所有的+ 或者所有的 -
就可以知道他们的至少有多少不同之处。
大于4的都可以不进行比较。
通过先对比预处理的额外数据,然后再通过单次的比较算法来 进行比对。
(星期六加班-ing,下班后写一下)
可以尝试用字典树来保存所有的字符串。然后在查询的时候就可以用在字典树上遍历的方法。
在树上遍历的时候,可以维护一个当前节点的序列,这个序列里保存着当前遍历到的节点和对应节点mismatch的数量。
在遍历下一个节点的时候,要把当前序列里所有的节点都尝试向下,并形成新的节点序列。
好处是可以把很多个串的当前位放在一起进行比较,可以节约一些时间。由于每个位置选择不多,mismatch也不大,所有应该不会出现当前节点序列膨胀过大的情况。(这是猜想… 没太认真验证过…)
def match(candidate, root):
nset = [[],[]]
currentId = 0
nset[currentId].append((root, 0))
for ch in candidate:
nextId = 1 - currentId
for item in nset[currentId]:
node, mismatch = item
for nx in node.child:
if nx.key == ch:
nset[nextId].append((nx, mismatch))
else:
if mismatch:
nset[nextId].append((nx, mismatch - 1))
nset[currentId].clear()
currentId = 1 - currentId
return nset[currentId]
上面的代码就是一个大概的意思。如果用C++写的话会再快很多。
整个过程都可以用集群做分布式计算。
算法白痴一个, 不过刚好手上有可以用的计算资源, 就按照原来的思路(暴力无脑并行计算流)做一下题主这个例子:
run.py
from multiprocessing import Pool, cpu_count
def do_align(item):
with open("bg_db.txt") as fh, open("result.txt", "a") as rh:
db_line = fh.readline().strip()
while db_line:
counts = 0
for i in [(i, j) for (i, j) in zip(db_line, item)]:
if i[0] != i[1]:
counts += 1
if counts >= 4:
break
if counts < 4:
rh.write("{}\n".format(db_line))
db_line = fh.readline().strip()
def main():
pool = Pool(cpu_count())
with open("candidates.txt") as fh:
pool.map(do_align, map(str.strip, fh))
pool.close()
pool.join()
if __name__ == "__main__":
main()
简单先生成点数据
import random
import string
def id_generator(size=8, chars=string.ascii_letters + string.digits):
return ''.join(random.choice(chars) for _ in range(size))
with open("candidates.txt", "w") as fh:
for i in range(10000):
fh.write("{}\n".format(id_generator(20, "ATCG")))
with open("bg_db.txt", "w") as fh:
for i in range(1000000):
fh.write("{}\n".format(id_generator(20, "ATCG")))
嗯, 造了10000
行的candidates.txt
和1000000
行的bg_db.txt
运行看下时间:
$time python run.py
real 15m45.445s
user 1362m41.712s
sys 1m12.099s
题主实际的数据是千万行的candidates.txt
和十亿行的bg_db.txt
, 简单估算下时间16*1000/(60*24) = 11.11
也就是11天, 这是我用的一个计算节点的配置
CPU Utilization: 1.0 0.0 98.9
user sys idle
Hardware
CPUs: 96 x 2.10 GHz
Memory (RAM): 1009.68 GB
Local Disk: Using 351.623 of 941.596 GB
Most Full Disk Partition: 53.5% used.
时间确实好长, 急需神优化
同样没有使用算法,暴力解法,用c写的
在我的机器上(CPU: Core 2 Duo E7500, RAM: 4G, OS: Fedora 19),测试结果
candidates bg.db cost
10000 1000000 318758165微秒
500 1000000 14950302微秒
如果换成题主的24核CPU,怎么也得有20倍的性能提升,然后再加上48台机器一起运算,5000W次运算为15s, 时间为
10000000 * 1000000000 / 500 / 1000000 * 15 / 20 / 48 / 3600 / 24 = 3.616898 天
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <sys/time.h>
#define START_CC(flag) \
struct timeval st_##flag##_beg; \
gettimeofday(&st_##flag##_beg, 0)
#define STOP_CC(flag) \
struct timeval st_##flag##_end; \
gettimeofday(&st_##flag##_end, 0)
#define PRINT_CC(flag) \
double cost_##flag = 0.0L; \
cost_##flag = (double)(st_##flag##_end.tv_sec - st_##flag##_beg.tv_sec); \
cost_##flag = cost_##flag * 1000000L + (double)(st_##flag##_end.tv_usec - st_##flag##_beg.tv_usec); \
printf(#flag" cost time %.6lf microsecond.\n", cost_##flag);
#define GENEORDER_CODE_LENGTH 20 + 1
typedef struct GeneOrder
{
char code[GENEORDER_CODE_LENGTH];
}GeneOrder, *GeneOrderPtr;
typedef struct GOArray
{
size_t capacity;
size_t length;
GeneOrderPtr data;
}GOArray;
GOArray createGOAarray(size_t capacity)
{
GOArray goa;
goa.capacity = capacity;
goa.length = 0;
goa.data = (GeneOrderPtr)malloc(capacity * sizeof(GeneOrder));
return goa;
}
void destroyGOArray(GOArray* goa)
{
if (goa->capacity > 0) {
free(goa->data);
}
}
bool readGOFile(char const *file, GOArray *goarray)
{
FILE* fp = NULL;
if ((fp = fopen(file, "r+")) == NULL) {
return false;
}
char buff[64];
while (fgets(buff, 64, fp) != NULL) {
if (goarray->length < goarray->capacity) {
memcpy(goarray->data[goarray->length].code,
buff,
GENEORDER_CODE_LENGTH * sizeof(char)
);
goarray->data[goarray->length].code[GENEORDER_CODE_LENGTH - 1] = '\0';
goarray->length ++;
} else {
fclose(fp);
return true;
}
}
fclose(fp);
return true;
}
int main(int argc, char* argv[])
{
(void)argc;
GOArray condgo = createGOAarray(10000);
GOArray bggo = createGOAarray(1000000);
printf("loading ...\n");
START_CC(loading);
if (!readGOFile(argv[1], &condgo) || !readGOFile(argv[2], &bggo)) {
destroyGOArray(&condgo);
destroyGOArray(&bggo);
return -1;
}
STOP_CC(loading);
PRINT_CC(loading);
int count = 0;
START_CC(compare);
for (size_t i = 0;i < 500;i ++) {
const GeneOrderPtr gop = condgo.data + i;
for (size_t j = 0;j < bggo.length;j ++) {
const GeneOrderPtr inner_gop = bggo.data + j;
int inner_count = 0;
for (size_t k = 0;k < 20;k ++) {
if (gop->code[k] != inner_gop->code[k]) {
if (++inner_count > 4) {
break;
}
}
}
if (inner_count <= 4) {
#ifdef DBGPRINT
printf("%d %s - %s\n", i, gop->code, inner_gop->code);
#endif
count++;
}
}
}
STOP_CC(compare);
PRINT_CC(compare);
printf("result = %d\n", count);
destroyGOArray(&condgo);
destroyGOArray(&bggo);
return 0;
}
编译参数&运行
gcc -Wall -Wextra -o ccs main.c -std=c99 -Os && ./ccs candidate.list bg.db
如果改成多线程的话速度会更快一些,在我的机器改为2
个线程简单使用500条candidates
测试,速度可以提升到9040257微秒
,线程增加到4
个性能提升就不是很大了,但是较新的CPU都具有超线程技术,速度估计会更好一些。。。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdbool.h>
#include <sys/time.h>
#include <pthread.h>
#define START_CC(flag) \
struct timeval st_##flag##_beg; \
gettimeofday(&st_##flag##_beg, 0)
#define STOP_CC(flag) \
struct timeval st_##flag##_end; \
gettimeofday(&st_##flag##_end, 0)
#define PRINT_CC(flag) \
double cost_##flag = 0.0L; \
cost_##flag = (double)(st_##flag##_end.tv_sec - st_##flag##_beg.tv_sec); \
cost_##flag = cost_##flag * 1000000L + (double)(st_##flag##_end.tv_usec - st_##flag##_beg.tv_usec); \
printf(#flag " cost time %.6lf microsecond.\n", cost_##flag);
#define GENEORDER_CODE_LENGTH 20 + 1
typedef struct GeneOrder {
char code[GENEORDER_CODE_LENGTH];
} GeneOrder, *GeneOrderPtr;
typedef struct GOArray {
size_t capacity;
size_t length;
GeneOrderPtr data;
} GOArray;
GOArray createGOAarray(size_t capacity)
{
GOArray goa;
goa.capacity = capacity;
goa.length = 0;
goa.data = (GeneOrderPtr)malloc(capacity * sizeof(GeneOrder));
return goa;
}
void destroyGOArray(GOArray* goa)
{
if (goa->capacity > 0) {
free(goa->data);
}
}
bool readGOFile(char const* file, GOArray* goarray)
{
FILE* fp = NULL;
if ((fp = fopen(file, "r+")) == NULL) {
return false;
}
char buff[64];
while (fgets(buff, 64, fp) != NULL) {
if (goarray->length < goarray->capacity) {
memcpy(goarray->data[goarray->length].code, buff,
GENEORDER_CODE_LENGTH * sizeof(char));
goarray->data[goarray->length].code[GENEORDER_CODE_LENGTH - 1] = '\0';
goarray->length++;
}
else {
fclose(fp);
return true;
}
}
fclose(fp);
return true;
}
typedef struct ProcessST {
GOArray* pcond;
GOArray* pbg;
size_t beg;
size_t end; // [beg, end)
} ProcessST;
void* processThread(void* parg)
{
ProcessST* ppst = (ProcessST*)parg;
GOArray* pcond = ppst->pcond;
GOArray* pbg = ppst->pbg;
int count = 0;
for (size_t i = ppst->beg; i < ppst->end; i++) {
const GeneOrderPtr gop = pcond->data + i;
for (size_t j = 0; j < pbg->length; j++) {
const GeneOrderPtr inner_gop = pbg->data + j;
int inner_count = 0;
for (size_t k = 0; k < 20; k++) {
if (gop->code[k] != inner_gop->code[k]) {
if (++inner_count > 4) {
break;
}
}
}
if (inner_count <= 4) {
#ifdef DBGPRINT
printf("%d %s - %s\n", i, gop->code, inner_gop->code);
#endif
count++;
}
}
}
return (void*)count;
}
int main(int argc, char* argv[])
{
(void)argc;
GOArray condgo = createGOAarray(10000);
GOArray bggo = createGOAarray(1000000);
printf("loading ...\n");
START_CC(loading);
if (!readGOFile(argv[1], &condgo) || !readGOFile(argv[2], &bggo)) {
destroyGOArray(&condgo);
destroyGOArray(&bggo);
return -1;
}
STOP_CC(loading);
PRINT_CC(loading);
size_t range[] = { 0, 250, 500 };
pthread_t thr[2] = { 0 };
ProcessST pst[2];
START_CC(compare);
for (size_t i = 0; i < 2; i++) {
pst[i].pcond = &condgo;
pst[i].pbg = &bggo;
pst[i].beg = range[i];
pst[i].end = range[i + 1];
pthread_create(&thr[i], NULL, processThread, &pst[i]);
}
int count = 0;
int* ret = NULL;
for (size_t i = 0; i < 2; i++) {
pthread_join(thr[i], (void**)&ret);
count += (int)ret;
}
STOP_CC(compare);
PRINT_CC(compare);
printf("result = %d\n", count);
destroyGOArray(&condgo);
destroyGOArray(&bggo);
return 0;
}
编译测试
gcc -Wall -Wextra -o ccs main.c -std=c99 -O3 -lpthread && ./ccs candidate.list bg.db
写一个思路
candidates = [
'GGGAGCAGGCAAGGACTCTG',
'GCTCGGGCTTGTCCACAGGA',
'...',
# 被你看出来啦,这些其实人类基因的片段
]
bg_db = [
'CTGCTGACGGGTGACACCCA',
'AGGAACTGGTGCTTGATGGC',
'...',
# 这个更多,有十亿左右
]
因为你的数据其实是很有特点的,这里可以进行精简。
因为所有的字符串都是20个字符长度,且都由ATCG
四个字符组成。那么可以把它们变换为整数来进行比较。
二进制表现形式如下
A ==> 00
T ==> 01
C ==> 10
G ==> 11
因为一个字符串长度固定,每个字符可以由2个比特位表示,所以每个字符串可以表示为一个40
位的整数。可以表示为32+8
的形式,也可以直接使用64
位整形。建议使用C
语言来做。
再来说说比较。
因为要找到每一个candidate在bg_db中与之差异小于等于4的所有记录
,所以只要两个整数一做^
按位异或操作,结果中二进制中1不超过8个,且这不超过8个1最多只能分为4个组
的才有可能是符合要求的(00^11=11,10^01=11)。
把结果的40
个比特位分作20个组,那么就是说最多只有4
个组为b01
b10
b11
这三个值,其余的全部为b00
。
那么比较算法就很好写了。
可以对每个字节(四个组)获取其中有几个组是为三个非零值的,来简介获取整体的比较结果。
因为每个字节只有256
种可能的值,而符合条件的值只有,所以可以先将结果存储起来,然后进行获取。3^4=81
种
这里给出一个函数,来获取结果中有几个是非零组。
/*****************下面table中值的生成******//**
int i;
for( i=0;i<256;++i){
int t =0;
t += (i&0x01 || i&0x02)?1:0;
t += (i&0x04 || i&0x08)?1:0;
t += (i&0x10 || i&0x20)?1:0;
t += (i&0x40 || i&0x80)?1:0;
printf("%d,",t);
if(i%10 ==9){putchar('\n');}
}
********************************************//
int table[] = {
0,1,1,1,1,2,2,2,1,2,
2,2,1,2,2,2,1,2,2,2,
2,3,3,3,2,3,3,3,2,3,
3,3,1,2,2,2,2,3,3,3,
2,3,3,3,2,3,3,3,1,2,
2,2,2,3,3,3,2,3,3,3,
2,3,3,3,1,2,2,2,2,3,
3,3,2,3,3,3,2,3,3,3,
2,3,3,3,3,4,4,4,3,4,
4,4,3,4,4,4,2,3,3,3,
3,4,4,4,3,4,4,4,3,4,
4,4,2,3,3,3,3,4,4,4,
3,4,4,4,3,4,4,4,1,2,
2,2,2,3,3,3,2,3,3,3,
2,3,3,3,2,3,3,3,3,4,
4,4,3,4,4,4,3,4,4,4,
2,3,3,3,3,4,4,4,3,4,
4,4,3,4,4,4,2,3,3,3,
3,4,4,4,3,4,4,4,3,4,
4,4,1,2,2,2,2,3,3,3,
2,3,3,3,2,3,3,3,2,3,
3,3,3,4,4,4,3,4,4,4,
3,4,4,4,2,3,3,3,3,4,
4,4,3,4,4,4,3,4,4,4,
2,3,3,3,3,4,4,4,3,4,
4,4,3,4,4,4
};
int getCount(uint64_t cmpresult)
{
uint8_t* pb = &cmpresult; // 这里假设是小段模式,且之前比较结果是存在低40位
return table[pb[0]]+table[pb[1]]+table[pb[2]]+table[pb[3]]+table[pb[4]];
}