2016-02-23 8 views
8

Ich habe eine Funktion, die binäre Daten von einem Bereich zu einem anderen kopiert, aber nur wenn die Bytes von einem bestimmten Wert abweichen. Hier ist ein Codebeispiel:Fast byteweise ersetzen, wenn

void copy_if(char* src, char* dest, size_t size, char ignore) 
{ 
    for (size_t i = 0; i < size; ++i) 
    { 
    if (src[i] != ignore) 
     dest[i] = src[i]; 
    } 
} 

Das Problem ist, dass dies für meinen aktuellen Bedarf zu langsam ist. Gibt es eine Möglichkeit, das gleiche Ergebnis schneller zu erhalten?

Update: Basierend auf Antworten, die ich zwei neue Implementierungen versucht:

void copy_if_vectorized(const uint8_t* src, uint8_t* dest, size_t size, char ignore) 
{ 
    for (size_t i = 0; i < size; ++i) 
    { 
     char temps = src[i]; 
     char tempd = dest[i]; 
     dest[i] = temps == ignore ? tempd : temps; 
    } 
} 

void copy_if_SSE(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore) 
{ 
    const __m128i vignore = _mm_set1_epi8(ignore); 

    size_t i; 

    for (i = 0; i + 16 <= size; i += 16) 
    { 
     __m128i v = _mm_loadu_si128((__m128i *)&src[i]); 
     __m128i vmask = _mm_cmpeq_epi8(v, vignore); 
     vmask = _mm_xor_si128(vmask, _mm_set1_epi8(-1)); 
     _mm_maskmoveu_si128(v, vmask, (char *)&dest[i]); 
    } 
    for (; i < size; ++i) 
    { 
     if (src[i] != ignore) 
      dest[i] = src[i]; 
    } 

} 

Und ich habe folgende Ergebnisse:

Naive: 
Duration: 2.04844s 
Vectorized: 
Pass: PASS 
Duration: 3.18553s 
SIMD: 
Pass: PASS 
Duration: 0.481888s 

Ich denke, meine Compiler (letzte MSVC) vektorisiert gescheitert , aber die SIMD Lösung ist gut genug, danke!

Update (bis) ich es mit einigen Pragma Anweisungen für meine Kompilierung (MSVC) vektorisiert verwaltet und in der Tat ist es tatsächlich schneller, dass SIMD, hier der letzte Code ist:

void copy_if_vectorized(const uint8_t* src, uint8_t* dest, size_t size, char ignore) 
{ 

#pragma loop(hint_parallel(0)) 
#pragma loop(ivdep) 

for (int i = 0; i < size; ++i) // Sadly no parallelization if i is unsigned, but more than 2Go of data is very unlikely 
{ 
    char temps = src[i]; 
    char tempd = dest[i]; 
    dest[i] = temps == ignore ? tempd : temps; 
} 
} 
+0

Was passiert mit den Bytes in 'dest', die nicht zugewiesen sind? – dbush

+1

Paralelisierung? Ein paar Threads, die jeweils einen Teil des Kopiervorgangs bearbeiten? –

+0

Sie sind bereits einem Standardwert zugewiesen. –

Antwort

6

Meine gcc 4.8.4 vektorisiert den folgenden Code ein:

#include <stddef.h> 
void copy_if(char* src, char* dest, size_t size, char ignore) 
{ 
    for (size_t i = 0; i < size; ++i) 
    { 
    char temps = src[i]; 
    char tempd = dest[i]; 
    dest[i] = temps == ignore ? tempd : temps; 
    } 
} 

Beachten Sie, dass sowohl die Last Von- und die Zuordnung zu dest[i] keine Bedingungen geknüpft sind, so dass der Compiler durch das Verbot nicht gegen Erfinden beschränkt speichert in einem Multi-Thread-Programm.

Für -march=core-avx2, die erzeugte Baugruppe enthält diese vektorisierten Schleife, zu einer Zeit auf 32 Bytes arbeiten:

.L9: 
    vmovdqu (%rdi,%rcx), %ymm1 
    addq $1, %r10 
    vmovdqu (%rsi,%rcx), %ymm2 
    vpcmpeqb %ymm0, %ymm1, %ymm3 
    vpblendvb %ymm3, %ymm2, %ymm1, %ymm1 
    vmovdqu %ymm1, (%rsi,%rcx) 
    addq $32, %rcx 
    cmpq %r10, %r8 
    ja .L9 

Für generischen x86-64, die erzeugte Baugruppe enthält diese vektorisierten Schlinge mit einem auf 16 Byte Arbeits time:

.L9: 
    movdqu (%rdi,%r8), %xmm3 
    addq $1, %r10 
    movdqa %xmm3, %xmm1 
    movdqu (%rsi,%r8), %xmm2 
    pcmpeqb %xmm0, %xmm1 
    pand %xmm1, %xmm2 
    pandn %xmm3, %xmm1 
    por %xmm2, %xmm1 
    movdqu %xmm1, (%rsi,%r8) 
    addq $16, %r8 
    cmpq %r9, %r10 
    jb .L9 

Für armv7l-Neon, clang-3.7 erzeugt die folgende Schleife, auf 16 Bytes zu einer Zeit arbeiten:

.LBB0_9:        @ %vector.body 
             @ =>This Inner Loop Header: Depth=1 
     vld1.8 {d18, d19}, [r5]! 
     subs.w lr, lr, #16 
     vceq.i8 q10, q9, q8 
     vld1.8 {d22, d23}, [r4] 
     vbsl q10, q11, q9 
     vst1.8 {d20, d21}, [r4]! 
     bne  .LBB0_9 

So ist der Code nicht nur lesbarer als Assembly oder intrinsics, es ist auch portable zu mehreren Architekturen und Compilern. Neue Architekturen und Befehlssatzerweiterungen können leicht durch Neukompilierung verwendet werden.

+0

Mit dem Intel-Compiler 13.1.3.198 musste ich die Zeiger "restrict" markieren, um die Vektorisierung zu bekommen. Sonst klagt es über "Vektorabhängigkeit", zu Recht, denke ich (beachte 'dst == src + 1'). Meine Empfehlung wäre also, den Zeiger den Modifikator 'restrict' hinzuzufügen, um die Wahrscheinlichkeit der Vektorisierung über Plattformen und Compiler zu erhöhen. – njuffa

+0

Einige Compiler können dies ohne Verwendung von 'restrict' vektorisieren, wenn ein Flag" kein Aliasing annehmen "angegeben wird (entweder explizit in der Befehlszeile oder implizit als Teil bestimmter Optimierungsebenen). – njuffa

+1

@njuffa: Nun, GCC überprüft einfach zur Laufzeit die Überlappung. Es dauert ~ 8 einfache Anweisungen ('lea',' or', 'setcc'). Sie können sie loswerden, indem Sie die Zeiger "einschränken", aber warum stören? – EOF

0

Die folgende wäre eine Verbesserung, obwohl Compiler dies selbst entwickeln könnten.

void copy_if(char* src, char* dest, size_t size, char ignore) 
{ 
    while (size--) 
    { 
    if (*src != ignore) 
     *dest = *src; 
    src++; dest++; 
    } 
} 
+0

Ihre Implementierung unterscheidet sich von der ursprünglichen - Sie "Löcher" im Ziel-Array nicht verlassen, wenn Sie Bytes in der Kopie ignorieren. (Und die size_t in der While-Schleife ist offensichtlich ein Tippfehler) .- Und ich bin bei dir, dass die meisten modernen Compiler das selbst ausarbeiten sollten. – tofro

+0

@tofro Sorry, aber ich bin neugierig. Kannst du mir sagen, wo das von der ursprünglichen Frage abweicht? Für mich sehen sie so aus, als ob sie das Gleiche machen. – muXXmit2X

+0

Das Original inkrementierte den Zielzeiger, auch wenn das Zeichen nicht kopiert, sondern ignoriert wurde - also "Löcher" im Zielarray erzeugte. Pauls Version inkrementiert es in diesem Fall nicht, so verschiebt Array-Inhalt mit jeder übersprungenen Kopie. Und ich sehe es jetzt anscheinend bearbeitet worden, so dass das nicht mehr der Fall ist. Also ignoriere bitte meinen Kommentar. – tofro

5

Hier ist ein Beispiel mit SSE2-Instrinsics zur Ausnutzung der maskmovdqu-Anweisung.Die SIMD-Version scheint bei etwa 2x die Geschwindigkeit der Original-Version auf einem Haswell CPU (Code kompiliert mit Klirren) auszuführen:

#include <stdio.h> 
    #include <string.h> 
    #include <emmintrin.h> // SSE2 
    #include <sys/time.h> // gettimeofday 

    void copy_if_ref(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore) 
    { 
     for (size_t i = 0; i < size; ++i) 
     { 
      if (src[i] != ignore) 
       dest[i] = src[i]; 
     } 
    } 

    void copy_if_SSE(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore) 
    { 
     const __m128i vignore = _mm_set1_epi8(ignore); 

     size_t i; 

     for (i = 0; i + 16 <= size; i += 16) 
     { 
      __m128i v = _mm_loadu_si128((__m128i *)&src[i]); 
      __m128i vmask = _mm_cmpeq_epi8(v, vignore); 
      vmask = _mm_xor_si128(vmask, _mm_set1_epi8(-1)); 
      _mm_maskmoveu_si128 (v, vmask, (char *)&dest[i]); 
     } 
     for (; i < size; ++i) 
     { 
      if (src[i] != ignore) 
       dest[i] = src[i]; 
     } 
    } 

    #define TIME_IT(init, copy_if, src, dest, size, ignore) \ 
    do { \ 
     const int kLoops = 1000; \ 
     struct timeval t0, t1; \ 
     double t_ms = 0.0; \ 
    \ 
     for (int i = 0; i < kLoops; ++i) \ 
     { \ 
      init; \ 
      gettimeofday(&t0, NULL); \ 
      copy_if(src, dest, size, ignore); \ 
      gettimeofday(&t1, NULL); \ 
      t_ms += ((double)(t1.tv_sec - t0.tv_sec) + (double)(t1.tv_usec - t0.tv_usec) * 1.0e-6) * 1.0e3; \ 
     } \ 
     printf("%s: %.3g ns/element\n", #copy_if, t_ms * 1.0e6/(double)(kLoops * size)); \ 
    } while (0) 

    int main() 
    { 
     const size_t N = 10000000; 

     uint8_t *src = malloc(N); 
     uint8_t *dest_ref = malloc(N); 
     uint8_t *dest_init = malloc(N); 
     uint8_t *dest_test = malloc(N); 

     for (size_t i = 0; i < N; ++i) 
     { 
      src[i] = (uint8_t)rand(); 
      dest_init[i] = (uint8_t)rand(); 
     } 

     memcpy(dest_ref, dest_init, N); 
     copy_if_ref(src, dest_ref, N, 0x42); 

     memcpy(dest_test, dest_init, N); 
     copy_if_SSE(src, dest_test, N, 0x42); 
     printf("copy_if_SSE: %s\n", memcmp(dest_ref, dest_test, N) == 0 ? "PASS" : "FAIL"); 

     TIME_IT(memcpy(dest_test, dest_init, N), copy_if_ref, src, dest_ref, N, 0x42); 
     TIME_IT(memcpy(dest_test, dest_init, N), copy_if_SSE, src, dest_test, N, 0x42); 

     return 0; 
    } 

Compile und Test:

$ gcc -Wall -msse2 -O3 copy_if.c && ./a.out 
copy_if_SSE: PASS 
copy_if_ref: 0.416 ns/element 
copy_if_SSE: 0.239 ns/element 

(Hinweis: frühere Version von diese Antwort hatte einen Streufaktor von 16 in dem Zeitcode, so frühere Zahlen waren 16x höher als sie sein sollten.)


UPDATE

Inspiriert durch Lösung und Compiler generierte Code des @ EOF habe ich versucht, einen anderen Ansatz mit SSE4 und bekam viel bessere Ergebnisse:

#include <stdio.h> 
#include <string.h> 
#include <smmintrin.h> // SSE4 
#include <sys/time.h> // gettimeofday 

void copy_if_ref(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore) 
{ 
    for (size_t i = 0; i < size; ++i) 
    { 
     if (src[i] != ignore) 
      dest[i] = src[i]; 
    } 
} 

void copy_if_EOF(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore) 
{ 
    for (size_t i = 0; i < size; ++i) 
    { 
     char temps = src[i]; 
     char tempd = dest[i]; 
     dest[i] = temps == ignore ? tempd : temps; 
    } 
} 

void copy_if_SSE(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore) 
{ 
    const __m128i vignore = _mm_set1_epi8(ignore); 

    size_t i; 

    for (i = 0; i + 16 <= size; i += 16) 
    { 
     __m128i vsrc = _mm_loadu_si128((__m128i *)&src[i]); 
     __m128i vdest = _mm_loadu_si128((__m128i *)&dest[i]); 
     __m128i vmask = _mm_cmpeq_epi8(vsrc, vignore); 
     vdest = _mm_blendv_epi8(vsrc, vdest, vmask); 
     _mm_storeu_si128 ((__m128i *)&dest[i], vdest); 
    } 
    for (; i < size; ++i) 
    { 
     if (src[i] != ignore) 
      dest[i] = src[i]; 
    } 
} 

#define TIME_IT(init, copy_if, src, dest, size, ignore) \ 
do { \ 
    const int kLoops = 1000; \ 
    struct timeval t0, t1; \ 
    double t_ms = 0.0; \ 
\ 
    for (int i = 0; i < kLoops; ++i) \ 
    { \ 
     init; \ 
     gettimeofday(&t0, NULL); \ 
     copy_if(src, dest, size, ignore); \ 
     gettimeofday(&t1, NULL); \ 
     t_ms += ((double)(t1.tv_sec - t0.tv_sec) + (double)(t1.tv_usec - t0.tv_usec) * 1.0e-6) * 1.0e3; \ 
    } \ 
    printf("%s: %.3g ns/element\n", #copy_if, t_ms * 1.0e6/(double)(kLoops * size)); \ 
} while (0) 

int main() 
{ 
    const size_t N = 10000000; 

    uint8_t *src = malloc(N); 
    uint8_t *dest_ref = malloc(N); 
    uint8_t *dest_init = malloc(N); 
    uint8_t *dest_test = malloc(N); 

    for (size_t i = 0; i < N; ++i) 
    { 
     src[i] = (uint8_t)rand(); 
     dest_init[i] = (uint8_t)rand(); 
    } 

    memcpy(dest_ref, dest_init, N); 
    copy_if_ref(src, dest_ref, N, 0x42); 

    memcpy(dest_test, dest_init, N); 
    copy_if_EOF(src, dest_test, N, 0x42); 
    printf("copy_if_EOF: %s\n", memcmp(dest_ref, dest_test, N) == 0 ? "PASS" : "FAIL"); 

    memcpy(dest_test, dest_init, N); 
    copy_if_SSE(src, dest_test, N, 0x42); 
    printf("copy_if_SSE: %s\n", memcmp(dest_ref, dest_test, N) == 0 ? "PASS" : "FAIL"); 

    TIME_IT(memcpy(dest_test, dest_init, N), copy_if_ref, src, dest_ref, N, 0x42); 
    TIME_IT(memcpy(dest_test, dest_init, N), copy_if_EOF, src, dest_test, N, 0x42); 
    TIME_IT(memcpy(dest_test, dest_init, N), copy_if_SSE, src, dest_test, N, 0x42); 

    return 0; 
} 

Compile und Test:

$ gcc -Wall -msse4 -O3 copy_if_2.c && ./a.out 
copy_if_EOF: PASS 
copy_if_SSE: PASS 
copy_if_ref: 0.419 ns/element 
copy_if_EOF: 0.114 ns/element 
copy_if_SSE: 0.114 ns/element 

Schlussfolgerung: während _mm_maskmoveu_si128 scheint eine gute Lösung für dieses Problem aus Sicht der Funktionalität scheint es nicht so ef Genau wie das Verwenden expliziter Lasten, Maskieren und Speichern. Außerdem scheint der vom Compiler generierte Code (siehe @ EOFs Antwort) in diesem Fall genauso schnell zu sein wie der explizit codierte SIMD.

+0

Wenn ich die 'SSE' Version mit meiner C-Version auf meinem' Kern-avx2' ersetzen, erhalte ich 'copy_if_vec: PASS copy_if_SSE_unrolled: copy_if_ref PASS: 11,7 ns/Element copy_if_vec: 1,7 ns/Element copy_if_SSE_unrolled: 4,16 ns/Element', mehr als 2x schneller als Ihre abgerollte Version. Auf x86-64 ('SSE2'), erhalte ich' copy_if_vec: PASS copy_if_SSE_unrolled: PASS copy_if_ref: 11,7 ns/Element copy_if_vec: 1,89 ns/Element copy_if_SSE_unrolled: 4,54 ns/element', 2x auch schneller. – EOF

+1

@EOF: Ich habe gerade festgestellt, dass mein Timing-Code um den Faktor 16 deaktiviert war (ich benutzte einen alten Code und habe ihn vorher nicht bemerkt). Ich habe die ursprüngliche SSE2-Implementierung oben aktualisiert und eine SSE4-Version hinzugefügt, die vom Compilercode für Ihre Lösung inspiriert wurde. Das Timing stimmt jetzt ziemlich gut überein. Ich denke, der Vektorcode wird für diesen speziellen Fall schwer zu schlagen sein. –

+2

Vergessen Sie nicht zu erwähnen, dass "maskmovdqu" einen nicht-temporalen Hinweis hat, so dass das Ziel aus dem Cache entfernt wird, wenn es fertig ist. Es ist ein 10-Uop-Befehl mit einem Durchsatz von einem pro 6 Zyklen auf Haswell. 'vmaskmovps' /' pd' und 'vpmaskmovd/q' haben keinen NT-Hinweis und sind schneller (besonders wenn Sie das Ziel erneut lesen, solange es sich noch im Cache befindet). Sie maskieren jedoch nur bei einer Granularität von 32b oder 64b. Es gibt kein AVX2 'vmaskmovdqu ymm', nur die 128b-Version, die noch einen NT-Hinweis hat. –

0

Wenn die Häufigkeit der Ignorierung nicht zu hoch ist, kann ein Memcpy-Code wie unten hilfreich sein.

size_t copy_if(char* src, char* dest, size_t size, char ignore) 
{ 
    size_t i=0, count =0 , res= 0; 
    while (count < size) 
    { 
    while (*src != ignore){ 
     count++; 
     if (count > size) 
      break; 
     src++; 
     i++; 
     res++; 
    } 
    count++; 
    if (i> 0){ 
     memcpy(dest,src-i, i); 
     dest += i; 
    } 
    i = 0; 
    src++; 
    } 
return res; 
} 
+0

Mit den meisten (allen?) Implementierungen von memcpy übergeben Sie das Array jetzt zweimal. Einmal zum Suchen nach Vorkommen von Ignorieren, das zweite Mal (versteckt) innerhalb von memcpy(). Wahrscheinlich nicht besser als vorher;) – tofro