Squashed 'src/leveldb/' changes from aca1ffc..ae6c262
ae6c262 Merge branch 'leveldb' into ripple-fork 28fa222 Looks like a bit more delay is needed to smooth the latency. a18f3e6 Tidy up JobQueue, add ripple_core module ab82e57 Release leveldb 1.12 02c6259 Release leveldb 1.11 5bbb544 Rate limit compactions with a 25ms pause after each complete file. 8c29c47 LevelDB issue 178 fix: cannot resize a level 0 compaction set 18b245c Added GNU/kFreeBSD kernel name (TARGET_OS) 8be9d12 CondVar::SignalAll was broken, leading to deadlocks on Windows builds. http://code.google.com/p/leveldb/issues/detail?id=149 c9fc070 Upgrade LevelDB to 1.10.0, mostly for better write stall logging. 8215b15 Tweak to variable name to support unity build git-subtree-dir: src/leveldb git-subtree-split: ae6c2620b2ef3d5c69e63dc0eda865d6a39fa061
This commit is contained in:
@@ -33,8 +33,11 @@ class AtomicCounter {
|
||||
public:
|
||||
AtomicCounter() : count_(0) { }
|
||||
void Increment() {
|
||||
IncrementBy(1);
|
||||
}
|
||||
void IncrementBy(int count) {
|
||||
MutexLock l(&mu_);
|
||||
count_++;
|
||||
count_ += count;
|
||||
}
|
||||
int Read() {
|
||||
MutexLock l(&mu_);
|
||||
@@ -45,6 +48,10 @@ class AtomicCounter {
|
||||
count_ = 0;
|
||||
}
|
||||
};
|
||||
|
||||
void DelayMilliseconds(int millis) {
|
||||
Env::Default()->SleepForMicroseconds(millis * 1000);
|
||||
}
|
||||
}
|
||||
|
||||
// Special Env used to delay background operations
|
||||
@@ -69,6 +76,7 @@ class SpecialEnv : public EnvWrapper {
|
||||
AtomicCounter random_read_counter_;
|
||||
|
||||
AtomicCounter sleep_counter_;
|
||||
AtomicCounter sleep_time_counter_;
|
||||
|
||||
explicit SpecialEnv(Env* base) : EnvWrapper(base) {
|
||||
delay_sstable_sync_.Release_Store(NULL);
|
||||
@@ -103,7 +111,7 @@ class SpecialEnv : public EnvWrapper {
|
||||
Status Flush() { return base_->Flush(); }
|
||||
Status Sync() {
|
||||
while (env_->delay_sstable_sync_.Acquire_Load() != NULL) {
|
||||
env_->SleepForMicroseconds(100000);
|
||||
DelayMilliseconds(100);
|
||||
}
|
||||
return base_->Sync();
|
||||
}
|
||||
@@ -174,8 +182,9 @@ class SpecialEnv : public EnvWrapper {
|
||||
|
||||
virtual void SleepForMicroseconds(int micros) {
|
||||
sleep_counter_.Increment();
|
||||
target()->SleepForMicroseconds(micros);
|
||||
sleep_time_counter_.IncrementBy(micros);
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
class DBTest {
|
||||
@@ -461,6 +470,20 @@ class DBTest {
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
bool DeleteAnSSTFile() {
|
||||
std::vector<std::string> filenames;
|
||||
ASSERT_OK(env_->GetChildren(dbname_, &filenames));
|
||||
uint64_t number;
|
||||
FileType type;
|
||||
for (size_t i = 0; i < filenames.size(); i++) {
|
||||
if (ParseFileName(filenames[i], &number, &type) && type == kTableFile) {
|
||||
ASSERT_OK(env_->DeleteFile(TableFileName(dbname_, number)));
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
TEST(DBTest, Empty) {
|
||||
@@ -611,7 +634,7 @@ TEST(DBTest, GetEncountersEmptyLevel) {
|
||||
}
|
||||
|
||||
// Step 4: Wait for compaction to finish
|
||||
env_->SleepForMicroseconds(1000000);
|
||||
DelayMilliseconds(1000);
|
||||
|
||||
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
|
||||
} while (ChangeOptions());
|
||||
@@ -1295,7 +1318,7 @@ TEST(DBTest, L0_CompactionBug_Issue44_a) {
|
||||
Reopen();
|
||||
Reopen();
|
||||
ASSERT_EQ("(a->v)", Contents());
|
||||
env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
|
||||
DelayMilliseconds(1000); // Wait for compaction to finish
|
||||
ASSERT_EQ("(a->v)", Contents());
|
||||
}
|
||||
|
||||
@@ -1311,7 +1334,7 @@ TEST(DBTest, L0_CompactionBug_Issue44_b) {
|
||||
Put("","");
|
||||
Reopen();
|
||||
Put("","");
|
||||
env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
|
||||
DelayMilliseconds(1000); // Wait for compaction to finish
|
||||
Reopen();
|
||||
Put("d","dv");
|
||||
Reopen();
|
||||
@@ -1321,7 +1344,7 @@ TEST(DBTest, L0_CompactionBug_Issue44_b) {
|
||||
Delete("b");
|
||||
Reopen();
|
||||
ASSERT_EQ("(->)(c->cv)", Contents());
|
||||
env_->SleepForMicroseconds(1000000); // Wait for compaction to finish
|
||||
DelayMilliseconds(1000); // Wait for compaction to finish
|
||||
ASSERT_EQ("(->)(c->cv)", Contents());
|
||||
}
|
||||
|
||||
@@ -1506,6 +1529,30 @@ TEST(DBTest, NoSpace) {
|
||||
ASSERT_GE(env_->sleep_counter_.Read(), 5);
|
||||
}
|
||||
|
||||
TEST(DBTest, ExponentialBackoff) {
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
Reopen(&options);
|
||||
|
||||
ASSERT_OK(Put("foo", "v1"));
|
||||
ASSERT_EQ("v1", Get("foo"));
|
||||
Compact("a", "z");
|
||||
env_->non_writable_.Release_Store(env_); // Force errors for new files
|
||||
env_->sleep_counter_.Reset();
|
||||
env_->sleep_time_counter_.Reset();
|
||||
for (int i = 0; i < 5; i++) {
|
||||
dbfull()->TEST_CompactRange(2, NULL, NULL);
|
||||
}
|
||||
env_->non_writable_.Release_Store(NULL);
|
||||
|
||||
// Wait for compaction to finish
|
||||
DelayMilliseconds(1000);
|
||||
|
||||
ASSERT_GE(env_->sleep_counter_.Read(), 5);
|
||||
ASSERT_LT(env_->sleep_counter_.Read(), 10);
|
||||
ASSERT_GE(env_->sleep_time_counter_.Read(), 10e6);
|
||||
}
|
||||
|
||||
TEST(DBTest, NonWritableFileSystem) {
|
||||
Options options = CurrentOptions();
|
||||
options.write_buffer_size = 1000;
|
||||
@@ -1519,7 +1566,7 @@ TEST(DBTest, NonWritableFileSystem) {
|
||||
fprintf(stderr, "iter %d; errors %d\n", i, errors);
|
||||
if (!Put("foo", big).ok()) {
|
||||
errors++;
|
||||
env_->SleepForMicroseconds(100000);
|
||||
DelayMilliseconds(100);
|
||||
}
|
||||
}
|
||||
ASSERT_GT(errors, 0);
|
||||
@@ -1567,6 +1614,24 @@ TEST(DBTest, ManifestWriteError) {
|
||||
}
|
||||
}
|
||||
|
||||
TEST(DBTest, MissingSSTFile) {
|
||||
ASSERT_OK(Put("foo", "bar"));
|
||||
ASSERT_EQ("bar", Get("foo"));
|
||||
|
||||
// Dump the memtable to disk.
|
||||
dbfull()->TEST_CompactMemTable();
|
||||
ASSERT_EQ("bar", Get("foo"));
|
||||
|
||||
Close();
|
||||
ASSERT_TRUE(DeleteAnSSTFile());
|
||||
Options options = CurrentOptions();
|
||||
options.paranoid_checks = true;
|
||||
Status s = TryReopen(&options);
|
||||
ASSERT_TRUE(!s.ok());
|
||||
ASSERT_TRUE(s.ToString().find("issing") != std::string::npos)
|
||||
<< s.ToString();
|
||||
}
|
||||
|
||||
TEST(DBTest, FilesDeletedAfterCompaction) {
|
||||
ASSERT_OK(Put("foo", "v2"));
|
||||
Compact("a", "z");
|
||||
@@ -1711,13 +1776,13 @@ TEST(DBTest, MultiThreaded) {
|
||||
}
|
||||
|
||||
// Let them run for a while
|
||||
env_->SleepForMicroseconds(kTestSeconds * 1000000);
|
||||
DelayMilliseconds(kTestSeconds * 1000);
|
||||
|
||||
// Stop the threads and wait for them to finish
|
||||
mt.stop.Release_Store(&mt);
|
||||
for (int id = 0; id < kNumThreads; id++) {
|
||||
while (mt.thread_done[id].Acquire_Load() == NULL) {
|
||||
env_->SleepForMicroseconds(100000);
|
||||
DelayMilliseconds(100);
|
||||
}
|
||||
}
|
||||
} while (ChangeOptions());
|
||||
|
||||
Reference in New Issue
Block a user