@Test public void readWriteCycleThreaded() throws IOException { final MapperCore mapper = new MapperCore("/tmp/MemoryMap", BIG_SIZE); final AtomicInteger fails = new AtomicInteger(); final AtomicInteger done = new AtomicInteger(); Runnable r = new Runnable() { public void run() { try { // Set to 0 for sequential test long off = (long) ((BIG_SIZE - 1024) * Math.random()); System.out.println("Running new thread"); byte[] bOut = new byte[1024]; double counts = 10000000; for (long i = 0; i < counts; ++i) { ByteBuffer buf = ByteBuffer.wrap(bOut); long pos = (long) (((BIG_SIZE - 1024) * (i / counts)) + off) % (BIG_SIZE - 1024); // Align with 8 byte boundary pos = pos / 8; pos = pos * 8; for (int j = 0; j < 128; ++j) { buf.putLong(pos + j * 8); } mapper.put(pos, bOut); byte[] bIn = mapper.get(pos, 1024); buf = ByteBuffer.wrap(bIn); for (int j = 0; j < 128; ++j) { long val = buf.getLong(); if (val != pos + j * 8) { throw new RuntimeException("Error at " + (pos + j * 8) + " was " + val); } } } System.out.println("Thread Complete"); } catch (Throwable e) { e.printStackTrace(); fails.incrementAndGet(); } finally { done.incrementAndGet(); } } }; int nThreads = 128; for (int i = 0; i < nThreads; ++i) { new Thread(r).start(); } while (done.intValue() != nThreads) { try { Thread.sleep(1000); } catch (InterruptedException e) { // ignore } } if (fails.intValue() != 0) { throw new RuntimeException("It failed " + fails.intValue()); } } |