2014-07-23 5 views
6

Ich verstehe nicht wirklich, wie ich einen solchen Input erzeugen kann, der Seekable und PositionedReadable ist ... Wie kann ein FSDataInputStream mit einem rohen InputStream instanziiert werden?

// Obtain a plain InputStream for the resource named "somefile".
Resource resource = new ClassPathResource("somefile"); 
InputStream bla = resource.getInputStream(); 
// This line fails with "IllegalArgumentException: In is not an instance of
// Seekable or PositionedReadable": the stream handed to FSDataInputStream
// does not implement the interfaces the constructor checks for.
FSDataInputStream inputStream = new FSDataInputStream (bla); 

In der Zeile mit FSDataInputStream wird folgende Exception geworfen:

java.lang.IllegalArgumentException: In is not an instance of Seekable or PositionedReadable 

Ich muss Mocks erstellen, und das ist ein Blocker für mich.

Antwort

2

Der Konstruktor von FSDataInputStream (wie unten gezeigt, definiert in FSDataInputStream.java) erwartet, dass der InputStream-Parameter sowohl eine Instanz von Seekable als auch von PositionedReadable ist.

/**
 * Wraps the given stream after handing it to the superclass constructor.
 *
 * @param in the stream to wrap; it must implement both {@code Seekable}
 *           and {@code PositionedReadable}
 * @throws IOException declared by the signature; nothing in this body
 *                     throws it directly
 * @throws IllegalArgumentException if {@code in} is missing either interface
 */
public FSDataInputStream(InputStream in) throws IOException 
{ 
    super(in); 
    // Rejects the stream if it lacks EITHER interface, i.e. both are required,
    // even though the message below is worded with "or".
    if(!(in instanceof Seekable) || !(in instanceof PositionedReadable)) { 
     throw new IllegalArgumentException(
      "In is not an instance of Seekable or PositionedReadable"); 
    } 
} 

Ich hoffe, die folgende Lösung hilft dir.

import java.io.*; 

import org.apache.hadoop.fs.FSDataInputStream; 
import org.apache.hadoop.fs.PositionedReadable; 
import org.apache.hadoop.fs.Seekable; 
import org.springframework.core.io.ClassPathResource; 
import org.springframework.core.io.Resource; 

/**
 * Demonstrates how to create an {@link FSDataInputStream} from an ordinary
 * {@link InputStream}: the source is drained into a byte array, which is then
 * exposed through a stream that implements both {@link Seekable} and
 * {@link PositionedReadable}.
 */
public class SeekableTest {

    /** Size of the scratch buffer used while draining the source stream. */
    private static final int COPY_BUFFER_SIZE = 1024;

    /**
     * Loads the classpath resource "somefile" into memory and wraps it in an
     * {@link FSDataInputStream}.
     *
     * @param args unused
     * @throws IOException if the resource cannot be opened or read
     */
    public static void main(String[] args) throws IOException
    {
        Resource resource = new ClassPathResource("somefile");

        byte[] data;
        InputStream in = resource.getInputStream();
        try {
            data = readAll(in);
        } finally {
            // The source stream is no longer needed once its content is buffered.
            in.close();
        }

        SeekableByteArrayInputStream bais = new SeekableByteArrayInputStream(data);
        FSDataInputStream in2 = new FSDataInputStream(bais);
        try {
            // in2 is ready for use here (seek / positional reads).
        } finally {
            in2.close();
        }
    }

    /** Reads {@code in} until end of stream and returns everything that was read. */
    private static byte[] readAll(InputStream in) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        byte[] chunk = new byte[COPY_BUFFER_SIZE];
        int read;
        // -1 is the end-of-stream marker defined by InputStream.read(byte[]).
        while ((read = in.read(chunk)) != -1) {
            baos.write(chunk, 0, read);
        }
        return baos.toByteArray();
    }

    /**
     * A {@link ByteArrayInputStream} that additionally supports seeking and
     * positional reads, so it can back an {@link FSDataInputStream}.
     */
    static class SeekableByteArrayInputStream extends ByteArrayInputStream implements Seekable, PositionedReadable {

        /**
         * @param buf the bytes to serve; the array is used directly, not copied
         */
        public SeekableByteArrayInputStream(byte[] buf)
        {
            super(buf);
        }

        /** Returns the current read offset within the backing array. */
        @Override
        public synchronized long getPos() throws IOException {
            return pos;
        }

        /**
         * Moves the read offset to {@code pos}. Any mark that was set is left
         * untouched.
         *
         * @param pos the new offset, between 0 and the data length inclusive
         * @throws EOFException if {@code pos} is negative or beyond the end of the data
         */
        @Override
        public synchronized void seek(long pos) throws IOException {
            if (pos < 0) {
                throw new EOFException("Cannot seek to negative offset " + pos);
            }
            if (pos > count) {
                throw new EOFException("Cannot seek to " + pos + ": data length is " + count);
            }
            // Set the inherited position field directly instead of reset() + skip(),
            // which only worked while no mark had been set.
            this.pos = (int) pos;
        }

        /** Always returns {@code false}: there is only one copy of the data. */
        @Override
        public boolean seekToNewSource(long targetPos) throws IOException {
            return false;
        }

        /**
         * Copies up to {@code length} bytes starting at {@code position} into
         * {@code buffer} without changing the stream's own read offset.
         *
         * @param position offset in the data to read from; must not be negative
         * @param buffer   destination array
         * @param offset   first index in {@code buffer} to write to
         * @param length   maximum number of bytes to copy
         * @return the number of bytes copied, {@code 0} if {@code length} is zero,
         *         or {@code -1} if {@code position} is at or past the end of the data
         * @throws IllegalArgumentException if {@code position} is negative or
         *         {@code offset}/{@code length} do not fit inside {@code buffer}
         */
        @Override
        public int read(long position, byte[] buffer, int offset, int length) throws IOException {
            if (position < 0) {
                throw new IllegalArgumentException("Negative position: " + position);
            }
            if (offset < 0 || length < 0 || length > buffer.length - offset) {
                throw new IllegalArgumentException(
                    "Invalid range: offset=" + offset + ", length=" + length
                        + ", buffer.length=" + buffer.length);
            }
            if (length == 0) {
                return 0;
            }
            if (position >= count) {
                return -1; // end of data
            }

            // Short read when fewer than 'length' bytes remain after 'position'.
            int toCopy = (int) Math.min((long) length, count - position);
            System.arraycopy(buf, (int) position, buffer, offset, toCopy);
            return toCopy;
        }

        /**
         * Fills the whole of {@code buffer} with data starting at {@code position}.
         *
         * @throws EOFException if the data ends before {@code buffer} is full
         */
        @Override
        public void readFully(long position, byte[] buffer) throws IOException {
            readFully(position, buffer, 0, buffer.length);
        }

        /**
         * Copies exactly {@code length} bytes starting at {@code position} into
         * {@code buffer} at {@code offset}.
         *
         * @throws EOFException if the data ends before {@code length} bytes were copied
         */
        @Override
        public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
            int done = 0;
            while (done < length) {
                int n = read(position + done, buffer, offset + done, length - done);
                if (n < 0) {
                    throw new EOFException(
                        "End of data reached after " + done + " of " + length + " bytes");
                }
                done += n;
            }
        }
    }
}

Referenz: accumulo

Verwandte Themen