@@ -6,8 +6,8 @@ defmodule B4.KeydirOwner do
66 alias B4 . { Files , Keydir , Writer }
77
defmodule State do
  @moduledoc false
  # GenServer state for the keydir owner.
  #
  # :directory        - database directory whose files back the keydir
  # :tid              - keydir table id (also published via :persistent_term
  #                     so readers can find it without calling the server)
  # :target_file_size - byte threshold at which a merge rotates to a fresh
  #                     write file
  @enforce_keys [:directory, :tid, :target_file_size]
  defstruct [:directory, :tid, :target_file_size]
end
1212
1313 def start_link ( % { directory: directory } = args ) do
@@ -23,7 +23,7 @@ defmodule B4.KeydirOwner do
2323 end
2424
2525 @ impl GenServer
26- def init ( % { directory: directory } = _init_arg ) do
26+ def init ( % { directory: directory , options: [ target_file_size: target_file_size ] } = _init_arg ) do
2727 tid = Keydir . new ( )
2828
2929 :ok = :persistent_term . put ( { :tid , directory } , tid )
@@ -34,12 +34,15 @@ defmodule B4.KeydirOwner do
3434 Files . apply_file_to_keydir ( path , tid )
3535 end )
3636
37- { :ok , % State { directory: directory , tid: tid } }
37+ { :ok , % State { directory: directory , tid: tid , target_file_size: target_file_size } }
3838 end
3939
@impl GenServer
def handle_call(
      :merge,
      _from,
      %State{directory: directory, tid: tid, target_file_size: target_file_size} = state
    ) do
  # Merge compacts the read-only data files.
  #
  # At any given time there can be only ONE live entry for a given key
  # (the one the keydir points at); every other on-disk entry for that key
  # is therefore old and free for deletion. We copy only live entries into
  # fresh merge files, repoint the keydir, then delete the old files.
  current_write_file_id = Writer.write_file_id(directory)

  read_only_database_files = Files.read_only_database_files(directory, current_write_file_id)

  {:ok, %{write_file: merge_write_file, file_id: merge_write_file_id}} =
    Writer.new_write_file(directory)

  initial_acc = %{
    merge_write_file: merge_write_file,
    merge_write_file_id: merge_write_file_id,
    merge_write_file_position: 0,
    merge_file_ids: MapSet.new()
  }

  # Single pass over every entry of every read-only file. The write-file
  # state is threaded through ONE accumulator for the whole pass: resetting
  # the file/position per input file would record keydir offsets that do
  # not match where the bytes actually landed.
  read_only_database_files
  |> Stream.flat_map(&Files.stream_entries/1)
  |> Enum.reduce(initial_acc, fn %{entry: %{entry_id: on_disk_entry_id} = entry, meta: %{}},
                                 acc ->
    key = :erlang.binary_to_term(entry.key_bytes)

    case Keydir.fetch(tid, key) do
      {:ok, {_key, _file_id, _entry_size, _file_position, keydir_entry_id}}
      when keydir_entry_id == on_disk_entry_id ->
        # Ids match: this on-disk entry is the live one — carry it over.
        copy_live_entry(entry, key, acc, directory, tid, target_file_size)

      # The entry isn't in the keydir, so it isn't live anymore — skip it.
      :error ->
        acc

      # The ids for the given key didn't match, so this is an old version
      # of that key — ignore it.
      _ ->
        acc
    end
  end)

  # Deleting these files happens AFTER the keydir has been updated with the
  # new on-disk location for every live entry, so this data is "dead" and
  # there are no readers of it — barring some reader that initiated an
  # incredibly slow read prior to the merge starting.
  #
  # TODO: that race is a possibility, if a small one; guard for it in some
  # way in the future.
  Enum.each(read_only_database_files, &File.rm!/1)

  {:reply, :ok, state}
end

# Appends one still-live entry to the current merge file — rotating to a
# fresh merge file first if `target_file_size` has been reached — and
# repoints the keydir at the entry's new location. Returns the updated
# accumulator with the advanced write position.
defp copy_live_entry(entry, key, acc, directory, tid, target_file_size) do
  %{
    crc32: crc32,
    entry_id: entry_id,
    key_size: key_size,
    value_size: value_size,
    key_bytes: key_bytes,
    value_bytes: value_bytes
  } = entry

  # Rotate to a new merge file once the current one is full. The rebound
  # `acc` (not the caller's) is used below so the position really restarts
  # at 0 after a rotation.
  acc =
    if acc.merge_write_file_position >= target_file_size do
      {:ok, %{write_file: write_file, file_id: file_id}} = Writer.new_write_file(directory)

      %{
        acc
        | merge_write_file: write_file,
          merge_write_file_id: file_id,
          merge_write_file_position: 0
      }
    else
      acc
    end

  iodata = [
    Writer.int_to_u32_bytes(crc32),
    Writer.int_to_u128_bytes(entry_id),
    Writer.int_to_u32_bytes(key_size),
    Writer.int_to_u32_bytes(value_size),
    key_bytes,
    value_bytes
  ]

  :ok = :file.write(acc.merge_write_file, iodata)

  entry_size = :erlang.iolist_size(iodata)

  # Repoint the keydir at the entry's new home before the old files are
  # deleted, keeping the entry readable throughout the merge.
  true =
    Keydir.insert(
      tid,
      key,
      acc.merge_write_file_id,
      entry_size,
      acc.merge_write_file_position,
      entry_id
    )

  %{
    acc
    | merge_write_file_position: acc.merge_write_file_position + entry_size,
      merge_file_ids: MapSet.put(acc.merge_file_ids, acc.merge_write_file_id)
  }
end
0 commit comments