From fe9710b47a9e6339b7b4048d9729bf256e49741b Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Wed, 4 Dec 2024 13:22:39 +0000
Subject: [PATCH 1/4] Allow db_id prefix to be empty when undefined (#464)

There is a valid db_id of 0 (for example, for vnode 0 in a riak
cluster) - so using 0 to name databases that have no id creates
confusion
---
 src/leveled_log.erl | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/src/leveled_log.erl b/src/leveled_log.erl
index af64b543..afa96425 100644
--- a/src/leveled_log.erl
+++ b/src/leveled_log.erl
@@ -41,7 +41,7 @@
 -record(log_options,
     {log_level = info :: log_level(),
         forced_logs = [] :: [atom()],
-        database_id = 0 :: non_neg_integer()}).
+        database_id :: non_neg_integer()|undefined}).
 
 -type log_level() :: debug | info | warning | error | critical.
 -type log_options() :: #log_options{}.
@@ -490,11 +490,15 @@ log_randomtimer(LogReference, Subs, StartTime, RandomProb) ->
             ok
     end.
 
--spec log_prefix(atom(), non_neg_integer(), pid()) -> io_lib:chars().
+-spec log_prefix(atom(), non_neg_integer()|undefined, pid()) -> io_lib:chars().
+log_prefix(LogRef, undefined, Pid) ->
+    ["log_ref=", atom_to_list(LogRef), " pid=", pid_to_list(Pid), " "];
 log_prefix(LogRef, DBid, Pid) ->
-    ["log_ref=", atom_to_list(LogRef),
+    [
+        "log_ref=", atom_to_list(LogRef),
         " db_id=", integer_to_list(DBid),
-        " pid=", pid_to_list(Pid), " "].
+        " pid=", pid_to_list(Pid), " "
+    ].
 
 -spec duration_text(erlang:timestamp()) -> io_lib:chars().
 duration_text(StartTime) ->

From 03a2092ec7e1639acca705d9f71ff4253a4bdd13 Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Wed, 15 Jan 2025 21:00:17 +0000
Subject: [PATCH 2/4] Mas d34 leveled.i465 stopfold (#467)

* Test and fix - issue with folding beyond JournalSQN

The test previously failed, as even on a fast machine the fold goes on
for 5s beyond the last object found. With the change to reduce the
batch size, and to stop when a batch goes beyond the JournalSQN -
success, with << 100ms spent folding after the last object is
discovered

* Wait after suite for delete_pending to close

https://github.com/martinsumner/leveled/issues/462

* Avoid processing key changes in object fold runner

As the key changes are going to be discarded
---
 include/leveled.hrl            |   2 +-
 rebar.config                   |   4 +-
 src/leveled_codec.erl          |   1 +
 src/leveled_inker.erl          | 104 ++++++++++++++++++++-------------
 src/leveled_runner.erl         |  18 +++---
 test/end_to_end/riak_SUITE.erl |  59 +++++++++++++++++++
 test/end_to_end/testutil.erl   |   3 +-
 7 files changed, 137 insertions(+), 54 deletions(-)

diff --git a/include/leveled.hrl b/include/leveled.hrl
index 5244e380..e6596330 100644
--- a/include/leveled.hrl
+++ b/include/leveled.hrl
@@ -35,7 +35,7 @@
 -define(MAX_SSTSLOTS, 256).
 -define(MAX_MERGEBELOW, 24).
 -define(LOADING_PAUSE, 1000).
--define(LOADING_BATCH, 1000).
+-define(LOADING_BATCH, 200).
 -define(CACHE_SIZE_JITTER, 25).
 -define(JOURNAL_SIZE_JITTER, 20).
 -define(LONG_RUNNING, 1000000).
diff --git a/rebar.config b/rebar.config
index 454f40fc..c9696cad 100644
--- a/rebar.config
+++ b/rebar.config
@@ -31,8 +31,8 @@
 ]}.
 
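+% forked deps (lz4, zstd) are tracked from the OpenRiak organisation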
{deps, [ - {lz4, ".*", {git, "https://github.com/nhs-riak/erlang-lz4", {branch, "nhse-develop-3.4"}}}, - {zstd, ".*", {git, "https://github.com/nhs-riak/zstd-erlang", {branch, "nhse-develop"}}}, + {lz4, ".*", {git, "https://github.com/OpenRiak/erlang-lz4", {branch, "openriak-3.4"}}}, + {zstd, ".*", {git, "https://github.com/OpenRiak/zstd-erlang", {branch, "openriak-3.2"}}}, {eqwalizer_support, {git_subdir, "https://github.com/OpenRiak/eqwalizer.git", {branch, "openriak-3.4"}, "eqwalizer_support"}} ]}. diff --git a/src/leveled_codec.erl b/src/leveled_codec.erl index 517f9c60..81797993 100644 --- a/src/leveled_codec.erl +++ b/src/leveled_codec.erl @@ -46,6 +46,7 @@ maybe_compress/2, create_value_for_journal/3, revert_value_from_journal/1, + revert_value_from_journal/2, generate_ledgerkv/5, get_size/2, get_keyandobjhash/2, diff --git a/src/leveled_inker.erl b/src/leveled_inker.erl index 9721267a..8d129c02 100644 --- a/src/leveled_inker.erl +++ b/src/leveled_inker.erl @@ -565,7 +565,8 @@ handle_call({fold, Folder = fun() -> fold_from_sequence( - StartSQN, + StartSQN, + State#state.journal_sqn, {FilterFun, InitAccFun, FoldFun}, Acc, Manifest @@ -1240,8 +1241,12 @@ start_new_activejournal(SQN, RootPath, CDBOpts) -> --spec fold_from_sequence(integer(), {fun(), fun(), fun()}, any(), list()) - -> any(). +-spec fold_from_sequence( + non_neg_integer(), + pos_integer(), + {fun(), fun(), fun()}, + any(), + list()) -> any(). %% @doc %% %% Scan from the starting sequence number to the end of the Journal. Apply @@ -1249,71 +1254,88 @@ start_new_activejournal(SQN, RootPath, CDBOpts) -> %% objects - and then apply the FoldFun to the batch once the batch is %% complete %% -%% Inputs - MinSQN, FoldFuns, OverallAccumulator, Inker's Manifest +%% Inputs - MinSQN, JournalSQN, FoldFuns, OverallAccumulator, Inker's Manifest %% %% The fold loops over all the CDB files in the Manifest. Each file is looped %% over in batches using foldfile_between_sequence/7. 
The batch is a range of %% sequence numbers (so the batch size may be << ?LOADING_BATCH) in compacted %% files -fold_from_sequence(_MinSQN, _FoldFuns, Acc, []) -> +fold_from_sequence(_MinSQN, _JournalSQN, _FoldFuns, Acc, []) -> Acc; -fold_from_sequence(MinSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest]) - when LowSQN >= MinSQN -> - {NextMinSQN, Acc0} = foldfile_between_sequence(MinSQN, - MinSQN + ?LOADING_BATCH, - FoldFuns, - Acc, - Pid, - undefined, - FN), - fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest); -fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) -> +fold_from_sequence( + MinSQN, JournalSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest]) + when LowSQN >= MinSQN -> + {NextMinSQN, Acc0} = + foldfile_between_sequence( + MinSQN, + MinSQN + ?LOADING_BATCH, + JournalSQN, + FoldFuns, + Acc, + Pid, + undefined, + FN + ), + fold_from_sequence(NextMinSQN, JournalSQN, FoldFuns, Acc0, Rest); +fold_from_sequence( + MinSQN, JournalSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) -> % If this file has a LowSQN less than the minimum, we can skip it if the % next file also has a LowSQN below the minimum {NextMinSQN, Acc0} = case Rest of [] -> - foldfile_between_sequence(MinSQN, - MinSQN + ?LOADING_BATCH, - FoldFuns, - Acc, - Pid, - undefined, - FN); + foldfile_between_sequence( + MinSQN, + MinSQN + ?LOADING_BATCH, + JournalSQN, + FoldFuns, + Acc, + Pid, + undefined, + FN + ); [{NextSQN, _NxtFN, _NxtPid, _NxtLK}|_Rest] when NextSQN > MinSQN -> - foldfile_between_sequence(MinSQN, - MinSQN + ?LOADING_BATCH, - FoldFuns, - Acc, - Pid, - undefined, - FN); + foldfile_between_sequence( + MinSQN, + MinSQN + ?LOADING_BATCH, + JournalSQN, + FoldFuns, + Acc, + Pid, + undefined, + FN + ); _ -> {MinSQN, Acc} end, - fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest). + fold_from_sequence(NextMinSQN, JournalSQN, FoldFuns, Acc0, Rest). -foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns, - Acc, CDBpid, StartPos, FN) -> +foldfile_between_sequence( + MinSQN, MaxSQN, JournalSQN, FoldFuns, Acc, CDBpid, StartPos, FN) -> {FilterFun, InitAccFun, FoldFun} = FoldFuns, InitBatchAcc = {MinSQN, MaxSQN, InitAccFun(FN, MinSQN)}, case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitBatchAcc, StartPos) of {eof, {AccMinSQN, _AccMaxSQN, BatchAcc}} -> {AccMinSQN, FoldFun(BatchAcc, Acc)}; + {_LastPosition, {AccMinSQN, _AccMaxSQN, BatchAcc}} + when AccMinSQN >= JournalSQN -> + {AccMinSQN, FoldFun(BatchAcc, Acc)}; {LastPosition, {_AccMinSQN, _AccMaxSQN, BatchAcc}} -> UpdAcc = FoldFun(BatchAcc, Acc), NextSQN = MaxSQN + 1, - foldfile_between_sequence(NextSQN, - NextSQN + ?LOADING_BATCH, - FoldFuns, - UpdAcc, - CDBpid, - LastPosition, - FN) + foldfile_between_sequence( + NextSQN, + NextSQN + ?LOADING_BATCH, + JournalSQN, + FoldFuns, + UpdAcc, + CDBpid, + LastPosition, + FN + ) end. - sequencenumbers_fromfilenames(Filenames, Regex, IntName) -> lists:foldl( fun(FN, Acc) -> diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index b1c3f219..c8d241de 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -322,7 +322,7 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) -> FilterFun = fun(JKey, JVal, _Pos, Acc, ExtractFun) -> {SQN, InkTag, LedgerKey} = JKey, - case {InkTag, leveled_codec:from_ledgerkey(Tag, LedgerKey)} of + case {InkTag, leveled_codec:from_ledgerkey(Tag, LedgerKey)} of {?INKT_STND, {B, K}} -> % Ignore tombstones and non-matching Tags and Key changes % objects. 
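
The hunk below picks up the third bullet of the commit message: the sqn_order
runner no longer rebuilds the key changes that its object fold immediately
discards. A before/after sketch (both calls appear in the hunk; the meaning of
the `true` flag - skip the key changes - is inferred from the commit message
rather than from any documented API):

    %% before: IdxSpecs always decoded from the journal value, then dropped
    {Obj, _IdxSpecs} = leveled_codec:revert_value_from_journal(VBin),
    %% after: the arity-2 form signals that the key changes are not required
    {Obj, _IdxSpecs} = leveled_codec:revert_value_from_journal(VBin, true),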
@@ -335,14 +335,14 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
                     _ ->
                         {VBin, _VSize} = ExtractFun(JVal),
                         {Obj, _IdxSpecs} =
-                            leveled_codec:revert_value_from_journal(VBin),
-                        ToLoop =
-                            case SQN of
-                                MaxSQN -> stop;
-                                _ -> loop
-                            end,
-                        {ToLoop,
-                            {MinSQN, MaxSQN, [{B, K, SQN, Obj}|BatchAcc]}}
+                            leveled_codec:revert_value_from_journal(
+                                VBin,
+                                true
+                            ),
+                        {
+                            case SQN of MaxSQN -> stop; _ -> loop end,
+                            {MinSQN, MaxSQN, [{B, K, SQN, Obj}|BatchAcc]}
+                        }
                 end;
             _ ->
                 {loop, Acc}
diff --git a/test/end_to_end/riak_SUITE.erl b/test/end_to_end/riak_SUITE.erl
index 8d31dd71..81c1f6fa 100644
--- a/test/end_to_end/riak_SUITE.erl
+++ b/test/end_to_end/riak_SUITE.erl
@@ -9,6 +9,7 @@
     fetchclocks_modifiedbetween/1,
     crossbucket_aae/1,
     handoff/1,
+    handoff_close/1,
     dollar_bucket_index/1,
     dollar_key_index/1,
     bigobject_memorycheck/1,
@@ -22,6 +23,7 @@ all() -> [
     fetchclocks_modifiedbetween,
     crossbucket_aae,
     handoff,
+    handoff_close,
     dollar_bucket_index,
     dollar_key_index,
     bigobject_memorycheck,
@@ -1697,6 +1699,63 @@ dollar_key_index(_Config) ->
     ok = leveled_bookie:book_close(Bookie1),
     testutil:reset_filestructure().
 
+handoff_close(_Config) ->
+    RootPath = testutil:reset_filestructure(),
+    KeyCount = 500000,
+    Bucket = {<<"BType">>, <<"BName">>},
+    StartOpts1 =
+        [
+            {root_path, RootPath},
+            {max_journalobjectcount, KeyCount + 1},
+            {max_pencillercachesize, 12000},
+            {sync_strategy, testutil:sync_strategy()}
+        ],
+    {ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
+    ObjList1 =
+        testutil:generate_objects(
+            KeyCount div 10,
+            {fixed_binary, 1}, [],
+            crypto:strong_rand_bytes(512),
+            fun() -> [] end,
+            Bucket
+        ),
+    ObjList2 =
+        testutil:generate_objects(
+            KeyCount - (KeyCount div 10),
+            {fixed_binary, KeyCount div 10 + 1}, [],
+            crypto:strong_rand_bytes(512),
+            fun() -> [] end,
+            Bucket
+        ),
+    testutil:riakload(Bookie1, ObjList1),
+    FoldObjectsFun =
+        fun(_, _, _, Acc) ->
+            [os:timestamp()|Acc]
+        end,
+    {async, Runner} =
+        leveled_bookie:book_objectfold(
+            Bookie1,
+            ?RIAK_TAG,
+            {FoldObjectsFun, []},
+            true,
+            sqn_order
+        ),
+    testutil:riakload(Bookie1, ObjList2),
+    TSList = Runner(),
+    QueryCompletionTime = os:timestamp(),
+    LastTS = hd(TSList),
+    io:format(
+        "Found ~w objects with Last TS ~w completion time ~w~n",
+        [length(TSList), LastTS, QueryCompletionTime]
+    ),
+    true = KeyCount div 10 == length(TSList),
+    TimeSinceLastObjectTouchedMS =
+        timer:now_diff(QueryCompletionTime, LastTS) div 1000,
+    true = TimeSinceLastObjectTouchedMS < 1000,
+    leveled_bookie:book_destroy(Bookie1),
+    testutil:reset_filestructure().
+
+
 %% @doc test that the riak specific $bucket indexes can be iterated
 %% using leveled's existing folders
 dollar_bucket_index(_Config) ->
diff --git a/test/end_to_end/testutil.erl b/test/end_to_end/testutil.erl
index e437b1bb..32a569de 100644
--- a/test/end_to_end/testutil.erl
+++ b/test/end_to_end/testutil.erl
@@ -143,7 +143,8 @@ end_per_suite(_Config) ->
     ok = logger:set_primary_config(level, notice),
     ok = logger:set_handler_config(default, level, all),
     ok = logger:set_handler_config(cth_log_redirect, level, all),
-    
+    % 10s delay to allow for any delete_pending files to close without crashing
+    timer:sleep(10000),
     ok.
 
 riak_object(Bucket, Key, Value, MetaData) ->

From 8960234b12c85fdb716e3e4e01ebe13ff48c2649 Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Thu, 16 Jan 2025 12:04:53 +0000
Subject: [PATCH 3/4] Test of sqn_order fold with compaction (#469)

* Test of sqn_order fold with compaction

Exposes issue with delete_pending not supporting cdb_scan.
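
A sketch of the sequence that trips it, condensed from the
handoff_withcompaction test added below (the API calls are as used in
that test; Bookie stands in for the started bookie pid):

    %% snapshot the fold up-front (SnapPreFold = true), but delay running it
    {async, Runner} =
        leveled_bookie:book_objectfold(
            Bookie, ?RIAK_TAG, {FoldObjectsFun, []}, true, sqn_order),
    %% journal compaction may now move some cdb files to delete_pending
    ok = leveled_bookie:book_compactjournal(Bookie, 30000),
    %% the deferred fold must still be able to cdb_scan those files
    Results = Runner().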
* Reduce default max_run_length to ensure compaction triggers --- src/leveled_cdb.erl | 2 + test/end_to_end/riak_SUITE.erl | 88 ++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+) diff --git a/src/leveled_cdb.erl b/src/leveled_cdb.erl index d53f8e92..f8e3860f 100644 --- a/src/leveled_cdb.erl +++ b/src/leveled_cdb.erl @@ -808,6 +808,8 @@ delete_pending( leveled_log:log(cdb05, [FN, delete_pending, cdb_close]), close_pendingdelete(IO, FN, State#state.waste_path), {stop_and_reply, normal, [{reply, From, ok}]}; +delete_pending({call, From}, Event, State) -> + handle_sync_event(Event, From, State); delete_pending( cast, delete_confirmed, State = #state{handle = IO, filename = FN}) when ?IS_DEF(FN), ?IS_DEF(IO) -> diff --git a/test/end_to_end/riak_SUITE.erl b/test/end_to_end/riak_SUITE.erl index 81c1f6fa..be2ed5a8 100644 --- a/test/end_to_end/riak_SUITE.erl +++ b/test/end_to_end/riak_SUITE.erl @@ -10,6 +10,7 @@ crossbucket_aae/1, handoff/1, handoff_close/1, + handoff_withcompaction/1, dollar_bucket_index/1, dollar_key_index/1, bigobject_memorycheck/1, @@ -24,6 +25,7 @@ all() -> [ crossbucket_aae, handoff, handoff_close, + handoff_withcompaction, dollar_bucket_index, dollar_key_index, bigobject_memorycheck, @@ -1756,6 +1758,92 @@ handoff_close(_Config) -> testutil:reset_filestructure(). +handoff_withcompaction(_Config) -> + RootPath = testutil:reset_filestructure(), + KeyCount = 100000, + Bucket = {<<"BType">>, <<"BName">>}, + StartOpts1 = + [ + {root_path, RootPath}, + {max_journalobjectcount, KeyCount div 4}, + {max_pencillercachesize, 12000}, + {sync_strategy, testutil:sync_strategy()}, + {max_run_length, 4} + ], + {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), + ObjList1 = + testutil:generate_objects( + KeyCount div 4, + {fixed_binary, 1}, [], + crypto:strong_rand_bytes(512), + fun() -> [] end, + Bucket + ), + testutil:riakload(Bookie1, ObjList1), + ObjList2 = + testutil:generate_objects( + KeyCount div 4, + {fixed_binary, (KeyCount div 4) + 1}, [], + crypto:strong_rand_bytes(512), + fun() -> [] end, + Bucket + ), + testutil:riakload(Bookie1, ObjList2), + ObjList3 = + testutil:generate_objects( + KeyCount div 4, + {fixed_binary, (KeyCount div 4) * 2 + 1}, [], + crypto:strong_rand_bytes(512), + fun() -> [] end, + Bucket + ), + testutil:riakload(Bookie1, ObjList3), + ObjList4 = + testutil:generate_objects( + KeyCount div 4, + {fixed_binary, (KeyCount div 4) * 3 + 1}, [], + crypto:strong_rand_bytes(512), + fun() -> [] end, + Bucket + ), + testutil:riakload(Bookie1, ObjList4), + % Now update some objects to prompt compaction + testutil:update_some_objects(Bookie1, ObjList1, KeyCount div 8), + testutil:update_some_objects(Bookie1, ObjList2, KeyCount div 8), + testutil:update_some_objects(Bookie1, ObjList3, KeyCount div 8), + testutil:update_some_objects(Bookie1, ObjList4, KeyCount div 8), + + % Setup a handoff-style fold to snapshot journal + FoldObjectsFun = + fun(_, K, _, Acc) -> + [K|Acc] + end, + {async, Runner} = + leveled_bookie:book_objectfold( + Bookie1, + ?RIAK_TAG, + {FoldObjectsFun, []}, + true, + sqn_order + ), + + % Now compact the journal, twice to be sure + ok = leveled_bookie:book_compactjournal(Bookie1, 30000), + testutil:wait_for_compaction(Bookie1), + ok = leveled_bookie:book_compactjournal(Bookie1, 30000), + testutil:wait_for_compaction(Bookie1), + + % Run the fold - some cdb files should now be delete_pending + {TC0, Results} = timer:tc(Runner), + io:format( + "Found ~w objects in ~w ms~n", + [length(Results), TC0 div 1000] + ), + true = KeyCount == 
length(Results),
+    leveled_bookie:book_destroy(Bookie1),
+    testutil:reset_filestructure().
+
+
 %% @doc test that the riak specific $bucket indexes can be iterated
 %% using leveled's existing folders
 dollar_bucket_index(_Config) ->

From 514526fafb702c2de075041eb644ca7be326b7a1 Mon Sep 17 00:00:00 2001
From: Martin Sumner
Date: Mon, 20 Jan 2025 18:56:50 +0000
Subject: [PATCH 4/4] Add ability to defer fetching a value in the fold - but
 without requiring Journal check (#472)

* Add ability to defer fetching a value in the fold - but without
requiring a presence check

Reduces the cost of using head folds in partition repairs, where the
check is not necessary as a fetch failure will be caught and will
prompt read repair.

* Update types for JournalCheck

* Fix tuple mistake (#473)

Otherwise leveled_pclerk will crash if there is a partial merge that
results in an empty addition
---
 src/leveled_bookie.erl         |  98 ++++++-----
 src/leveled_pclerk.erl         |   2 +-
 src/leveled_runner.erl         |  28 ++--
 test/end_to_end/riak_SUITE.erl | 295 ++++++++++++++++++++-------------
 4 files changed, 255 insertions(+), 168 deletions(-)

diff --git a/src/leveled_bookie.erl b/src/leveled_bookie.erl
index 459cc99d..5236d448 100644
--- a/src/leveled_bookie.erl
+++ b/src/leveled_bookie.erl
@@ -946,15 +946,22 @@ book_objectfold(Pid, Tag, Bucket, Limiter, FoldAccT, SnapPreFold) ->
 %% `Acc'. The ProxyObject is an object that only contains the
 %% head/metadata, and no object data from the journal. The `Acc' in
 %% the first call is that provided as the second element of `FoldAccT'
-%% and thereafter the return of the previous all to the fold fun. If
-%% `JournalCheck' is `true' then the journal is checked to see if the
-%% object in the ledger is present, which means a snapshot of the
-%% whole store is required, if `false', then no such check is
-%% performed, and onlt ledger need be snapshotted. `SnapPreFold' is a
-%% boolean that determines if the snapshot is taken when the folder is
-%% requested `true', or when when run `false'. `SegmentList' can be
-%% `false' meaning, all heads, or a list of integers that designate
-%% segments in a TicTac Tree.
+%% and thereafter the return of the previous call to the fold fun.
+%%
+%% If `JournalCheck' is `true' then the journal is checked to see if the
+%% object in the ledger is present, which means a snapshot of the whole store
+%% is required, if `false', then no such check is performed, and only ledger
+%% need be snapshotted. However, if the intention is to defer fetching the
+%% value, but the cost of checking the Journal during the fold is not wanted
+%% (e.g. as any exception will be handled later), then the `defer`
+%% option can be used. This will snapshot the Journal, but not check for
+%% presence. Note that the fetch must still be made within the timeframe of
+%% the fold (as the snapshot will expire with the fold).
+%%
+%% `SnapPreFold' is a boolean that determines if the snapshot is taken when
+%% the folder is requested `true', or when run `false'. `SegmentList' can
+%% be `false', meaning all heads, or a list of integers that designate segments
+%% in a TicTac Tree.
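+%%
+%% For illustration, a deferred head fold might be set up as below (the
+%% bookie pid, fold fun and accumulator are placeholders; the argument
+%% order is as in the spec that follows):
+%%   {async, Folder} =
+%%       leveled_bookie:book_headfold(
+%%           Bookie, ?RIAK_TAG, {FoldFun, InitAcc}, defer, true, false),
+%%   Acc = Folder().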
-spec book_headfold(pid(), Tag, FoldAccT, JournalCheck, SnapPreFold, SegmentList) -> {async, Runner} when Tag :: leveled_codec:tag(), @@ -964,7 +971,7 @@ book_objectfold(Pid, Tag, Bucket, Limiter, FoldAccT, SnapPreFold) -> Bucket :: term(), Key :: term(), Value :: term(), - JournalCheck :: boolean(), + JournalCheck :: boolean()|defer, SnapPreFold :: boolean(), SegmentList :: false | list(integer()), Runner :: fun(() -> Acc). @@ -999,7 +1006,7 @@ book_headfold(Pid, Tag, FoldAccT, JournalCheck, SnapPreFold, SegmentList) -> Bucket :: term(), Key :: term(), Value :: term(), - JournalCheck :: boolean(), + JournalCheck :: boolean()|defer, SnapPreFold :: boolean(), SegmentList :: false | list(integer()), Runner :: fun(() -> Acc). @@ -1032,7 +1039,7 @@ book_headfold(Pid, Tag, Limiter, FoldAccT, JournalCheck, SnapPreFold, SegmentLis Bucket :: term(), Key :: term(), Value :: term(), - JournalCheck :: boolean(), + JournalCheck :: boolean()|defer, SnapPreFold :: boolean(), SegmentList :: false | list(integer()), LastModRange :: false | leveled_codec:lastmod_range(), @@ -1989,7 +1996,7 @@ return_snapfun( fun() -> {ok, LS, JS, fun() -> ok end} end end. --spec snaptype_by_presence(boolean()) -> store|ledger. +-spec snaptype_by_presence(boolean()|defer) -> store|ledger. %% @doc %% Folds that traverse over object heads, may also either require to return %% the object, or at least confirm the object is present in the Ledger. This @@ -1998,6 +2005,8 @@ return_snapfun( %% rather than just the ledger. snaptype_by_presence(true) -> store; +snaptype_by_presence(defer) -> + store; snaptype_by_presence(false) -> ledger. @@ -2030,9 +2039,8 @@ get_runner(State, {keylist, Tag, Bucket, FoldAccT}) -> leveled_runner:bucketkey_query(SnapFun, Tag, Bucket, FoldAccT); get_runner(State, {keylist, Tag, Bucket, KeyRange, FoldAccT, TermRegex}) -> SnapFun = return_snapfun(State, ledger, no_lookup, true, true), - leveled_runner:bucketkey_query(SnapFun, - Tag, Bucket, KeyRange, - FoldAccT, TermRegex); + leveled_runner:bucketkey_query( + SnapFun, Tag, Bucket, KeyRange, FoldAccT, TermRegex); %% Set of runners for object or metadata folds get_runner(State, {foldheads_allkeys, @@ -2041,10 +2049,15 @@ get_runner(State, LastModRange, MaxObjectCount}) -> SnapType = snaptype_by_presence(JournalCheck), SnapFun = return_snapfun(State, SnapType, no_lookup, true, SnapPreFold), - leveled_runner:foldheads_allkeys(SnapFun, - Tag, FoldFun, - JournalCheck, SegmentList, - LastModRange, MaxObjectCount); + leveled_runner:foldheads_allkeys( + SnapFun, + Tag, + FoldFun, + JournalCheck, + SegmentList, + LastModRange, + MaxObjectCount + ); get_runner(State, {foldobjects_allkeys, Tag, FoldFun, SnapPreFold}) -> get_runner(State, @@ -2071,13 +2084,16 @@ get_runner(State, end, SnapType = snaptype_by_presence(JournalCheck), SnapFun = return_snapfun(State, SnapType, no_lookup, true, SnapPreFold), - leveled_runner:foldheads_bybucket(SnapFun, - Tag, - lists:map(KeyRangeFun, BucketList), - FoldFun, - JournalCheck, - SegmentList, - LastModRange, MaxObjectCount); + leveled_runner:foldheads_bybucket( + SnapFun, + Tag, + lists:map(KeyRangeFun, BucketList), + FoldFun, + JournalCheck, + SegmentList, + LastModRange, + MaxObjectCount + ); get_runner(State, {foldheads_bybucket, Tag, @@ -2088,13 +2104,16 @@ get_runner(State, {StartKey, EndKey, SnapQ} = return_ledger_keyrange(Tag, Bucket, KeyRange), SnapType = snaptype_by_presence(JournalCheck), SnapFun = return_snapfun(State, SnapType, SnapQ, true, SnapPreFold), - leveled_runner:foldheads_bybucket(SnapFun, - Tag, - 
[{StartKey, EndKey}], - FoldFun, - JournalCheck, - SegmentList, - LastModRange, MaxObjectCount); + leveled_runner:foldheads_bybucket( + SnapFun, + Tag, + [{StartKey, EndKey}], + FoldFun, + JournalCheck, + SegmentList, + LastModRange, + MaxObjectCount + ); get_runner(State, {foldobjects_bybucket, Tag, Bucket, KeyRange, @@ -2102,19 +2121,16 @@ get_runner(State, SnapPreFold}) -> {StartKey, EndKey, SnapQ} = return_ledger_keyrange(Tag, Bucket, KeyRange), SnapFun = return_snapfun(State, store, SnapQ, true, SnapPreFold), - leveled_runner:foldobjects_bybucket(SnapFun, - Tag, - [{StartKey, EndKey}], - FoldFun); + leveled_runner:foldobjects_bybucket( + SnapFun, Tag, [{StartKey, EndKey}], FoldFun); get_runner(State, {foldobjects_byindex, Tag, Bucket, {Field, FromTerm, ToTerm}, FoldObjectsFun, SnapPreFold}) -> SnapFun = return_snapfun(State, store, no_lookup, true, SnapPreFold), - leveled_runner:foldobjects_byindex(SnapFun, - {Tag, Bucket, Field, FromTerm, ToTerm}, - FoldObjectsFun); + leveled_runner:foldobjects_byindex( + SnapFun, {Tag, Bucket, Field, FromTerm, ToTerm},FoldObjectsFun); get_runner(State, {bucket_list, Tag, FoldAccT}) -> {FoldBucketsFun, Acc} = FoldAccT, SnapFun = return_snapfun(State, ledger, no_lookup, false, false), diff --git a/src/leveled_pclerk.erl b/src/leveled_pclerk.erl index 810c481a..245ce98b 100644 --- a/src/leveled_pclerk.erl +++ b/src/leveled_pclerk.erl @@ -421,7 +421,7 @@ do_merge( add_entry(empty, FileName, _TS1, Additions) -> leveled_log:log(pc013, [FileName]), - {[], [], Additions}; + {Additions, [], []}; add_entry({ok, Pid, Reply, Bloom}, FileName, TS1, Additions) -> {{KL1Rem, KL2Rem}, SmallestKey, HighestKey} = Reply, Entry = diff --git a/src/leveled_runner.erl b/src/leveled_runner.erl index c8d241de..43d96e1f 100644 --- a/src/leveled_runner.erl +++ b/src/leveled_runner.erl @@ -269,11 +269,14 @@ tictactree(SnapFun, {Tag, Bucket, Query}, JournalCheck, TreeSize, Filter) -> end, {async, Runner}. --spec foldheads_allkeys(snap_fun(), leveled_codec:tag(), - fold_objects_fun()|{fold_objects_fun(), foldacc()}, - boolean(), false|list(integer()), - false|leveled_codec:lastmod_range(), - false|pos_integer()) -> {async, runner_fun()}. +-spec foldheads_allkeys( + snap_fun(), + leveled_codec:tag(), + fold_objects_fun()|{fold_objects_fun(), foldacc()}, + boolean()|defer, + false|list(integer()), + false|leveled_codec:lastmod_range(), + false|pos_integer()) -> {async, runner_fun()}. %% @doc %% Fold over all heads in the store for a given tag - applying the passed %% function to each proxy object @@ -412,7 +415,7 @@ foldobjects_bybucket(SnapFun, Tag, KeyRanges, FoldFun) -> leveled_codec:tag(), list(key_range()), fold_objects_fun()|{fold_objects_fun(), foldacc()}, - boolean(), + boolean()|defer, false|list(integer()), false|leveled_codec:lastmod_range(), false|pos_integer()) @@ -501,7 +504,7 @@ foldobjects(SnapFun, Tag, KeyRanges, FoldObjFun, DeferredFetch, SegmentList) -> -spec foldobjects(snap_fun(), atom(), list(), fold_objects_fun()|{fold_objects_fun(), foldacc()}, - false|{true, boolean()}, + false|{true, boolean()|defer}, false|list(integer()), false|leveled_codec:lastmod_range(), false|pos_integer()) -> {async, runner_fun()}. @@ -609,9 +612,14 @@ get_hashaccumulator(JournalCheck, InkerClone, AddKeyFun) -> AccFun. 
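 %% Illustrative note on the widened types below: DeferredFetch may now
 %% be {true, defer} as well as {true, boolean()}. A {true, true} fold
 %% still checks the Journal for presence; {true, defer}, like
 %% {true, false}, hands back the proxy object and leaves any fetch (and
 %% any fetch failure) to the caller.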
-spec accumulate_objects - (fold_objects_fun(), pid(), leveled_head:object_tag(), false|{true, boolean()}) + (fold_objects_fun(), + pid(), + leveled_head:object_tag(), + false|{true, boolean()|defer}) -> objectacc_fun(); - (fold_objects_fun(), null, leveled_head:headonly_tag(), {true, false}) + (fold_objects_fun(), + null, leveled_head:headonly_tag(), + {true, false}) -> objectacc_fun(). accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) -> AccFun = @@ -652,7 +660,7 @@ accumulate_objects(FoldObjectsFun, InkerClone, Tag, DeferredFetch) -> missing -> Acc end; - {false, _} -> + _ -> FoldObjectsFun(B, K, ProxyObj, Acc) end; false -> diff --git a/test/end_to_end/riak_SUITE.erl b/test/end_to_end/riak_SUITE.erl index be2ed5a8..2446b785 100644 --- a/test/end_to_end/riak_SUITE.erl +++ b/test/end_to_end/riak_SUITE.erl @@ -1129,11 +1129,14 @@ crossbucket_aae(_Config) -> {max_pencillercachesize, 16000}, {sync_strategy, riak_sync}], {ok, Bookie1} = leveled_bookie:book_start(StartOpts1), - {B1, K1, V1, S1, MD} = {<<"Bucket">>, - <<"Key1.1.4567.4321">>, - <<"Value1">>, - [], - [{<<"MDK1">>, <<"MDV1">>}]}, + {B1, K1, V1, S1, MD} = + { + <<"Bucket">>, + <<"Key1.1.4567.4321">>, + <<"Value1">>, + [], + [{<<"MDK1">>, <<"MDV1">>}] + }, {TestObject, TestSpec} = testutil:generate_testobject(B1, K1, V1, S1, MD), ok = testutil:book_riakput(Bookie1, TestObject, TestSpec), testutil:check_forobject(Bookie1, TestObject), @@ -1151,12 +1154,15 @@ crossbucket_aae(_Config) -> GenList = [{binary, 2}, {binary, 40002}, {binary, 80002}, {binary, 120002}], - CLs = testutil:load_objects(40000, - GenList, - Bookie2, - TestObject, - fun testutil:generate_smallobjects/2, - 40000), + CLs = + testutil:load_objects( + 40000, + GenList, + Bookie2, + TestObject, + fun testutil:generate_smallobjects/2, + 40000 + ), %% Check all the objects are found - used to trigger HEAD performance log ok = testutil:checkhead_forlist(Bookie2, lists:nth(1, CLs)), @@ -1480,12 +1486,14 @@ handoff(_Config) -> GenList = [binary_uuid, binary_uuid, binary_uuid, binary_uuid], [CL0, CL1, CL2, CL3] = - testutil:load_objects(40000, - GenList, - Bookie1, - no_check, - fun testutil:generate_smallobjects/2, - 40000), + testutil:load_objects( + 40000, + GenList, + Bookie1, + no_check, + fun testutil:generate_smallobjects/2, + 40000 + ), % Update an delete some objects testutil:update_some_objects(Bookie1, CL0, 1000), @@ -1609,34 +1617,29 @@ handoff(_Config) -> %% leveled's existing folders dollar_key_index(_Config) -> RootPath = testutil:reset_filestructure(), - {ok, Bookie1} = leveled_bookie:book_start(RootPath, - 2000, - 50000000, - testutil:sync_strategy()), + {ok, Bookie1} = + leveled_bookie:book_start( + RootPath, 2000, 50000000, testutil:sync_strategy()), ObjectGen = testutil:get_compressiblevalue_andinteger(), IndexGen = fun() -> [] end, - ObjL1 = testutil:generate_objects(1300, - {fixed_binary, 1}, - [], - ObjectGen, - IndexGen, - <<"Bucket1">>), + ObjL1 = + testutil:generate_objects( + 1300, {fixed_binary, 1}, [], ObjectGen, IndexGen, <<"Bucket1">>), testutil:riakload(Bookie1, ObjL1), - FoldKeysFun = fun(_B, K, Acc) -> - [ K |Acc] - end, + FoldKeysFun = fun(_B, K, Acc) -> [ K |Acc] end, StartKey = testutil:fixed_bin_key(123), EndKey = testutil:fixed_bin_key(779), {async, Folder} = - leveled_bookie:book_keylist(Bookie1, - ?RIAK_TAG, - <<"Bucket1">>, - {StartKey, EndKey}, - {FoldKeysFun, []} - ), + leveled_bookie:book_keylist( + Bookie1, + ?RIAK_TAG, + <<"Bucket1">>, + {StartKey, EndKey}, + {FoldKeysFun, []} + ), ResLen = length(Folder()), 
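    % 657 keys expected: the inclusive range 123..779 (779 - 123 + 1)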
    io:format("Length of Result of folder ~w~n", [ResLen]),
    true = 657 == ResLen,
 
     {ok, REMatch} = re:compile("K.y"),
     {ok, REMiss} = re:compile("key"),
 
     {async, FolderREMatch} =
-        leveled_bookie:book_keylist(Bookie1,
-                                    ?RIAK_TAG,
-                                    <<"Bucket1">>,
-                                    {StartKey, EndKey},
-                                    {FoldKeysFun, []},
-                                    REMatch),
+        leveled_bookie:book_keylist(
+            Bookie1,
+            ?RIAK_TAG,
+            <<"Bucket1">>,
+            {StartKey, EndKey},
+            {FoldKeysFun, []},
+            REMatch
+        ),
     {async, FolderREMiss} =
-        leveled_bookie:book_keylist(Bookie1,
-                                    ?RIAK_TAG,
-                                    <<"Bucket1">>,
-                                    {StartKey, EndKey},
-                                    {FoldKeysFun, []},
-                                    REMiss),
+        leveled_bookie:book_keylist(
+            Bookie1,
+            ?RIAK_TAG,
+            <<"Bucket1">>,
+            {StartKey, EndKey},
+            {FoldKeysFun, []},
+            REMiss
+        ),
 
     true = 657 == length(FolderREMatch()),
     true = 0 == length(FolderREMiss()),
 
@@ -1666,34 +1673,39 @@ dollar_key_index(_Config) ->
     % $key index query
     DeleteFun =
         fun(KeyID) ->
-            ok = leveled_bookie:book_put(Bookie1,
-                                         <<"Bucket1">>,
-                                         testutil:fixed_bin_key(KeyID),
-                                         delete, [],
-                                         ?RIAK_TAG)
+            ok =
+                leveled_bookie:book_put(
+                    Bookie1,
+                    <<"Bucket1">>,
+                    testutil:fixed_bin_key(KeyID),
+                    delete,
+                    [],
+                    ?RIAK_TAG
+                )
         end,
     DelList = [200, 400, 600, 800, 1200],
     lists:foreach(DeleteFun, DelList),
 
     {async, DeleteFolder0} =
-        leveled_bookie:book_keylist(Bookie1,
-                                    ?RIAK_TAG,
-                                    <<"Bucket1">>,
-                                    {StartKey, EndKey},
-                                    {FoldKeysFun, []}
-                                    ),
+        leveled_bookie:book_keylist(
+            Bookie1,
+            ?RIAK_TAG,
+            <<"Bucket1">>,
+            {StartKey, EndKey},
+            {FoldKeysFun, []}
+        ),
     ResultsDeleteFolder0 = length(DeleteFolder0()),
     io:format("Length of Result of folder ~w~n", [ResultsDeleteFolder0]),
     true = 657 - 3 == ResultsDeleteFolder0,
 
     {async, DeleteFolder1} =
-        leveled_bookie:book_keylist(Bookie1,
-                                    ?RIAK_TAG,
-                                    <<"Bucket1">>,
-                                    {testutil:fixed_bin_key(1151),
-                                     testutil:fixed_bin_key(1250)},
-                                    {FoldKeysFun, []}
-                                    ),
+        leveled_bookie:book_keylist(
+            Bookie1,
+            ?RIAK_TAG,
+            <<"Bucket1">>,
+            {testutil:fixed_bin_key(1151), testutil:fixed_bin_key(1250)},
+            {FoldKeysFun, []}
+        ),
     ResultsDeleteFolder1 = length(DeleteFolder1()),
     io:format("Length of Result of folder ~w~n", [ResultsDeleteFolder1]),
     true = 100 -1 == ResultsDeleteFolder1,
@@ -1815,8 +1827,8 @@ handoff_withcompaction(_Config) ->
 
     % Setup a handoff-style fold to snapshot journal
     FoldObjectsFun =
-        fun(_, K, _, Acc) ->
-            [K|Acc]
+        fun(_B, _K, Obj, Acc) ->
+            [Obj|Acc]
         end,
     {async, Runner} =
         leveled_bookie:book_objectfold(
@@ -1836,10 +1848,69 @@ handoff_withcompaction(_Config) ->
     % Run the fold - some cdb files should now be delete_pending
     {TC0, Results} = timer:tc(Runner),
     io:format(
-        "Found ~w objects in ~w ms~n",
+        "Found ~w objects (sqn_order) in ~w ms~n",
         [length(Results), TC0 div 1000]
     ),
     true = KeyCount == length(Results),
+
+    FoldAndFetchFun =
+        fun(_B, _K, PO, Acc) ->
+            {
+                proxy_object,
+                _HeadBin,
+                _Size,
+                {FetchFun, Clone, Ref}
+            } = binary_to_term(PO),
+            Obj = FetchFun(Clone, Ref),
+            [Obj|Acc]
+        end,
+    {async, HeadFolder} =
+        leveled_bookie:book_headfold(
+            Bookie1,
+            ?RIAK_TAG,
+            {FoldAndFetchFun, []},
+            true,
+            false,
+            false
+        ),
+    {TC1, HeadWithFetchResults} = timer:tc(HeadFolder),
+    io:format(
+        "Found ~w objects (check_presence) in ~w ms~n",
+        [length(HeadWithFetchResults), TC1 div 1000]
+    ),
+    {async, KO_ObjRunner} =
+        leveled_bookie:book_objectfold(
+            Bookie1,
+            ?RIAK_TAG,
+            {FoldObjectsFun, []},
+            false,
+            key_order
+        ),
+    {TC2, ObjectInKeyOrderResults} = timer:tc(KO_ObjRunner),
+    io:format(
+        "Found ~w objects (key_order) in ~w ms~n",
+        [length(ObjectInKeyOrderResults), TC2 div 1000]
+    ),
+
+    true = ObjectInKeyOrderResults ==
HeadWithFetchResults, + + {async, HeadFolderDefer} = + leveled_bookie:book_headfold( + Bookie1, + ?RIAK_TAG, + {FoldAndFetchFun, []}, + defer, + false, + false + ), + {TC3, HeadWithDeferFetchResults} = timer:tc(HeadFolderDefer), + io:format( + "Found ~w objects (no check_presence) in ~w ms~n", + [length(HeadWithDeferFetchResults), TC3 div 1000] + ), + + true = HeadWithFetchResults == HeadWithDeferFetchResults, + leveled_bookie:book_destroy(Bookie1), testutil:reset_filestructure(). @@ -1848,45 +1919,31 @@ handoff_withcompaction(_Config) -> %% using leveled's existing folders dollar_bucket_index(_Config) -> RootPath = testutil:reset_filestructure(), - {ok, Bookie1} = leveled_bookie:book_start(RootPath, - 2000, - 50000000, - testutil:sync_strategy()), + {ok, Bookie1} = + leveled_bookie:book_start( + RootPath, 2000, 50000000, testutil:sync_strategy()), ObjectGen = testutil:get_compressiblevalue_andinteger(), IndexGen = fun() -> [] end, - ObjL1 = testutil:generate_objects(1300, - uuid, - [], - ObjectGen, - IndexGen, - <<"Bucket1">>), + ObjL1 = + testutil:generate_objects( + 1300, uuid, [], ObjectGen, IndexGen, <<"Bucket1">>), testutil:riakload(Bookie1, ObjL1), - ObjL2 = testutil:generate_objects(1700, - uuid, - [], - ObjectGen, - IndexGen, - <<"Bucket2">>), + ObjL2 = + testutil:generate_objects( + 1700, uuid, [], ObjectGen, IndexGen, <<"Bucket2">>), testutil:riakload(Bookie1, ObjL2), - ObjL3 = testutil:generate_objects(7000, - uuid, - [], - ObjectGen, - IndexGen, - <<"Bucket3">>), + ObjL3 = + testutil:generate_objects( + 7000, uuid, [], ObjectGen, IndexGen, <<"Bucket3">>), testutil:riakload(Bookie1, ObjL3), - FoldKeysFun = fun(B, K, Acc) -> - [{B, K}|Acc] - end, + FoldKeysFun = fun(B, K, Acc) -> [{B, K}|Acc] end, FoldAccT = {FoldKeysFun, []}, {async, Folder} = - leveled_bookie:book_keylist(Bookie1, - ?RIAK_TAG, - <<"Bucket2">>, - FoldAccT), + leveled_bookie:book_keylist( + Bookie1, ?RIAK_TAG, <<"Bucket2">>, FoldAccT), Results = Folder(), true = 1700 == length(Results), @@ -1897,26 +1954,32 @@ dollar_bucket_index(_Config) -> {ok, REMiss} = re:compile("no_key"), {async, FolderREMiss} = - leveled_bookie:book_keylist(Bookie1, - ?RIAK_TAG, - <<"Bucket2">>, - {null, null}, - {FoldKeysFun, []}, - REMiss), + leveled_bookie:book_keylist( + Bookie1, + ?RIAK_TAG, + <<"Bucket2">>, + {null, null}, + {FoldKeysFun, []}, + REMiss + ), {async, FolderRESingleMatch} = - leveled_bookie:book_keylist(Bookie1, - ?RIAK_TAG, - <<"Bucket2">>, - {null, null}, - {FoldKeysFun, []}, - RESingleMatch), + leveled_bookie:book_keylist( + Bookie1, + ?RIAK_TAG, + <<"Bucket2">>, + {null, null}, + {FoldKeysFun, []}, + RESingleMatch + ), {async, FolderREAllMatch} = - leveled_bookie:book_keylist(Bookie1, - ?RIAK_TAG, - <<"Bucket2">>, - {null, null}, - {FoldKeysFun, []}, - REAllMatch), + leveled_bookie:book_keylist( + Bookie1, + ?RIAK_TAG, + <<"Bucket2">>, + {null, null}, + {FoldKeysFun, []}, + REAllMatch + ), true = 0 == length(FolderREMiss()), true = 1 == length(FolderRESingleMatch()),