2013-07-24 5 views
5

In meinem Code muss ich "Demaskierung" von Websocket-Paketen behandeln, was im Wesentlichen XOR'ing nicht ausgerichtete Daten beliebiger Länge bedeutet. Dank SO (Websocket data unmasking/multi byte xor) habe ich bereits herausgefunden, wie man dies (hoffentlich) mit SSE2/AVX2-Erweiterungen beschleunigen kann, aber wenn ich es jetzt betrachte, scheint mir mein Umgang mit nicht ausgerichteten Daten absolut nicht optimal zu sein. Gibt es eine Möglichkeit, meinen Code zu optimieren oder ihn bei gleicher Leistung zumindest einfacher zu machen, oder ist mein Code bereits der beste?optimieren unausgerichtete SSE2/AVX2 XOR

Hier ist der wichtige Teil des Codes (für die Frage nehme ich an, dass Daten immer mindestens genug sein werden, um den AVX2-Zyklus einmal auszuführen, aber zur gleichen Zeit wird es meistens nur ein paar Mal laufen) :

// circular shift left for uint32 
int cshiftl_u32(uint32_t num, uint8_t shift) { 
    return (num << shift) | (num >> (32 - shift));                  
}                              

// circular shift right for uint32 
int cshiftr_u32(uint32_t num, uint8_t shift) { 
    return (num >> shift) | (num << (32 - shift));                  
}                              

void optimized_xor_32(uint32_t mask, uint8_t *ds, uint8_t *de) { 
    if (ds == de) return; // zero data len -> nothing to do 

    uint8_t maskOffset = 0; 

// process single bytes till 4 byte alignment (<= 3) 
    for (; ds < de && ((uint64_t)ds & (uint64_t)3); ds++) { 
     *ds ^= *((uint8_t *)(&mask) + maskOffset); 
     maskOffset = (maskOffset + 1) & (uint8_t)3; 
    } 

    if (ds == de) return; // done, return 

    if (maskOffset != 0) { // circular left-shift mask around so it works for other instructions 
     mask = cshiftl_u32(mask, maskOffset); 

     maskOffset = 0; 
    } 

// process 4 byte block till 8 byte alignment (<= 1) 
    uint8_t *de32 = (uint8_t *)((uint64_t)de & ~((uint64_t)31)); 

    if (ds < de32 && ((uint64_t)de & (uint64_t)7)) { 
     *(uint32_t *)ds ^= mask; // mask is uint32_t 

     if (++ds == de) return; 
    } 

// process 8 byte block till 16 byte alignment (<= 1) 
    uint64_t mask64 = mask | (mask << 4); 
    uint8_t *de64 = (uint8_t *)((uint64_t)de & ~((uint64_t)63)); 

    if (ds < de64 && ((uint64_t)ds & (uint64_t)15)) { 
     *(uint64_t *)ds ^= mask64; 

     if (++ds == de) return; // done, return 
    } 


// process 16 byte block till 32 byte alignment (<= 1) (if supported) 
#ifdef CPU_SSE2 
    __m128i v128, v128_mask; 
    v128_mask = _mm_set1_epi32(mask); 

    uint8_t *de128 = (uint8_t *)((uint64_t)de & ~((uint64_t)127)); 

    if (ds < de128 && ((uint64_t)ds & (uint64_t)31)) { 
     v128 = _mm_load_si128((__m128i *)ds); 
     v128 = _mm_xor_si128(v128, v128_mask); 
     _mm_store_si128((__m128i *)ds, v128); 

     if (++ds == de) return; // done, return 
    } 

#endif 
#ifdef CPU_AVX2 // process 32 byte blocks (if supported -> haswell upwards) 
    __m256i v256, v256_mask; 
    v256_mask = _mm256_set1_epi32(mask); 

    uint8_t *de256 = (uint8_t *)((uint64_t)de & ~((uint64_t)255)); 

    for (; ds < de256; ds+=32) { 
     v256 = _mm256_load_si256((__m256i *)ds); 
     v256 = _mm256_xor_si256(v256, v256_mask); 
     _mm256_store_si256((__m256i *)ds, v256); 
    } 

    if (ds == de) return; // done, return 
#endif 
#ifdef CPU_SSE2 // process remaining 16 byte blocks (if supported) 
    for (; ds < de128; ds+=16) { 
     v128 = _mm_load_si128((__m128i *)ds); 
     v128 = _mm_xor_si128(v128, v128_mask); 
     _mm_store_si128((__m128i *)ds, v128); 
    } 

    if (ds == de) return; // done, return 

#endif 
    // process remaining 8 byte blocks 
    // this should always be supported, so remaining can be assumed to be executed <= 1 times 
    for (; ds < de64; ds += 8) { 
     *(uint64_t *)ds ^= mask64; 
    } 

    if (ds == de) return; // done, return 

    // process remaining 4 byte blocks (<= 1) 
    if (ds < de32) { 
     *(uint32_t *)ds ^= mask; 

     if (++ds == de) return; // done, return 
    } 


    // process remaining bytes (<= 3) 

    for (; ds < de; ds ++) { 
     *ds ^= *((uint8_t *)(&mask) + maskOffset); 
     maskOffset = (maskOffset + 1) & (uint8_t)3; 
    } 

} 

PS: die Verwendung von #ifdef statt cpuid oder dergleichen für die CPU-Markierungserkennung Bitte ignorieren.

+0

Haben Sie versucht, Ihren Code zu takten? (Sie könnten auch das bitweise '&' in Ihren Bedingungen mit runden Klammern umschließen) –

+1

Timing würde nicht wirklich helfen, da ich nur Annahmen über die Daten machen kann, die ich als Eingabe erhalten werde, aber keine echten erhalten werde Eingang für ein paar Monate zu kommen. Außerdem würde ich nur eine absolute Zahl mit dem Timing bekommen, was mir nicht wirklich hilft, da mein Problem nicht herauszufinden ist, wie lange dieser Code benötigt, um mit xy-Eingabe auszuführen, sondern wie man es schneller macht, z. Ich habe keine Vorstellung davon, was ich ändern soll. S.S .: Bitweise eingepackt & zum leichteren Verständnis, danke für den Hinweis! – griffin

+1

Ich denke, Sie werden feststellen, dass die Datenabhängigkeit den Nutzen von Aligned/Unaligned überwiegt. Wenn Sie Ihre Schleifen 2x ausrollen können, sollten Sie eine deutliche Verbesserung feststellen. – BitBank

Antwort

2

Anders als es im Handbuch heißt, sind die meisten Intel-Prozessoren im Umgang mit nicht ausgerichteten Daten eigentlich recht gut. Da Sie Intels Compiler-Builtins für die Vektorverarbeitung verwenden, nehme ich an, dass Sie Zugriff auf eine relativ aktuelle Version von icc haben.

Wenn Sie Ihre Daten nicht natürlich ausrichten können, dann fürchte ich, dass das, was Sie tun, so nah wie möglich ist, um maximale Leistung zu erreichen. Um den Code lesbarer zu machen und auf Xeon Phi (64-Byte-Vektorregister)/zukünftigen längeren Vektorprozessoren einsetzbar zu machen, würde ich vorschlagen, dass Sie anfangen, Intel Cilk Plus zu verwenden.

Beispiel:

void intel_cilk_xor(uint32_t mask, uint8_t *d, size_t length) { 
    while (length & 0x3) { 
     *(d++) ^= mask; 
     asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left 
     length--; 
    } 

    // switch to 4 bytes per block 
    uint32_t _d = d; 
    length >>= 2; 

    // Intel Cilk Plus Array Notation 
    // Should expand automatically to the best possible SIMD instructions 
    // you are compiling for 
    _d[0:length] ^= mask; 
} 

Bitte beachten Sie, dass ich diesen Code testen habe ich Zugriff auf einen Intel-Compiler nicht jetzt haben. Wenn Sie auf Probleme stoßen, kann ich darüber sprechen, wenn ich nächste Woche wieder in meinem Büro bin.

Wenn Sie lieber intrinsics bevorzugen dann die ordnungsgemäße Verwendung von Präprozessormakros Ihr Leben erheblich erleichtern können:

#if defined(__MIC__) 
// intel Xeon Phi 
#define VECTOR_BLOCKSIZE 64 
// I do not remember the correct types/instructions right now 
#error "TODO: MIC handling" 
#elif defined(CPU_AVX2) 
#define VECTOR_BLOCKSIZE 32 
typedef __m256i my_vector_t; 
#define VECTOR_LOAD_MASK _mm256_set1_epi32 
#define VECTOR_XOR(d, mask) _mm_store_si256(d, _mm256_set1_epi32(_mm256_load_si256(d), mask)) 
#elif defined(CPU_SSE2) 
#define VECTOR_BLOCKSIZE 16 
typedef __m128i my_vector_t; 
#define VECTOR_LOAD_MASK _mm128_set1_epi32 
#define VECTOR_XOR(d, mask) _mm_store_si128(d, _mm128_set1_epi32(_mm128_load_si128(d), mask)) 
#else 
#define VECTOR_BLOCKSIZE 8 
#define VECTOR_LOAD_MASK(mask) ((mask) << 32 | (mask)) 
#define VECTOR_XOR(d, mask) (*(d)) ^= (mask) 
typedef uint64_t my_vector_t; 
#fi 

void optimized_xor_32(uint32_t mask, uint8_t *d, size_t length) { 
    size_t i; 

    // there really is no point in having extra 
    // branches for different vector lengths if they are 
    // executed at most once 
    // branch prediction is your friend here 
    // so we do one byte at a time until the block size 
    // is reached 

    while (length && (d & (VECTOR_BLOCKSIZE - 1))) { 
     *(d++) ^= mask; 
     asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left 
     length--; 
    } 

    my_vector_t * d_vector = (my_vector_t *)d; 
    my_vector_t vector_mask = VECTOR_LOAD_MASK(mask); 

    size_t vector_legth = length/VECTOR_BLOCKSIZE; // compiler will optimise this to a bitshift 
    length &= VECTOR_BLOCKSIZE -1; // remaining length 

    for (i = 0; i < vector_legth; i++) { 
     VECTOR_XOR(d_vector + i, vector_mask); 
    } 

    // process the tail 
    d = (uint8_t*)(d_vector + i); 
    for (i = 0; i < length; i++) { 
     d[i] ^= mask; 
     asm ("rold $8, %0" : "+g" (mask) :: "cc"); 
    } 

} 

Auf einer anderen Anmerkung: Sie sollten die x86 verwenden drehen Befehl statt Bit verschiebt mask zu drehen:

#define asm_rol(var, bits) asm ("rol %1, %0" : "+r" (var) : "c" ((uint8_t)bits) : "cc") 
+0

Ich benutze nicht icc nur gcc, und ich habe keinen besonderen Zugang zu icc. Wusste aber nicht über die rotate-Anweisung, muss nachsehen, was genau es tut, thx! – griffin

+0

@griffin OK, ich hatte den Eindruck, dass '_mm_load_si128' und family ein 'icc' eingebaut ist. In diesem Fall solltest du mein zweites Code-Snippet nehmen, nur ohne das Teil für das MIC. Leider gibt es keine intrinsischen für die Rotationsanweisungen, ich weiß jedoch, dass zum Beispiel "htons" die 2-Byte-Drehung verwendet. –

+0

Upvoted, aber ich werde es ausprobieren müssen, wenn ich Zeit habe, was wahrscheinlich nicht so früh passieren wird, aber ich werde sicherstellen, dass ich das akzeptiere, wenn ich es getestet habe und gut funktioniert. Danke für die Zwischenzeit! – griffin