@@ -23,7 +23,7 @@ defmodule B4.KeydirOwner do
2323 end
2424
2525 @ impl GenServer
26- def init ( % { directory: directory , options: [ target_file_size: target_file_size ] } = _init_arg ) do
26+ def init ( % { directory: directory , options: options } = _init_arg ) do
2727 tid = Keydir . new ( )
2828
2929 :ok = :persistent_term . put ( { :b4_keydir_tid , directory } , tid )
@@ -34,7 +34,7 @@ defmodule B4.KeydirOwner do
3434 Files . apply_file_to_keydir ( path , tid )
3535 end )
3636
37- { :ok , % State { directory: directory , tid: tid , target_file_size: target_file_size } }
37+ { :ok , % State { directory: directory , tid: tid , target_file_size: options [ : target_file_size] } }
3838 end
3939
4040 @ impl GenServer
@@ -66,114 +66,126 @@ defmodule B4.KeydirOwner do
6666 { :ok , % { write_file: merge_write_file , file_id: merge_write_file_id } } =
6767 Writer . new_write_file ( directory )
6868
69- Enum . reduce ( read_only_database_files , % { merge_file_ids: MapSet . new ( ) } , fn path , outer_acc ->
70- acc_for_file =
71- path
72- |> Files . stream_entries ( )
73- |> Enum . reduce (
74- % {
75- merge_write_file: merge_write_file ,
76- merge_write_file_id: merge_write_file_id ,
77- merge_file_ids: MapSet . new ( ) ,
78- merge_write_file_position: 0
79- } ,
80- fn % {
81- entry: % {
82- crc32: crc32 ,
83- entry_id: on_disk_entry_id ,
84- key_size: key_size ,
85- value_size: value_size ,
86- key_bytes: key_bytes ,
87- value_bytes: value_bytes
69+ merge_state =
70+ Enum . reduce ( read_only_database_files , % { merge_file_ids: MapSet . new ( ) } , fn path , outer_acc ->
71+ acc_for_file =
72+ path
73+ |> Files . stream_entries ( )
74+ |> Enum . reduce (
75+ % {
76+ merge_write_file: merge_write_file ,
77+ merge_write_file_id: merge_write_file_id ,
78+ merge_file_ids: MapSet . new ( ) ,
79+ merge_write_file_position: 0
80+ } ,
81+ fn % {
82+ entry: % {
83+ crc32: crc32 ,
84+ entry_id: on_disk_entry_id ,
85+ key_size: key_size ,
86+ value_size: value_size ,
87+ key_bytes: key_bytes ,
88+ value_bytes: value_bytes
89+ } ,
90+ meta: % { }
8891 } ,
89- meta: % { }
90- } ,
91- % {
92- merge_write_file: merge_write_file ,
93- merge_write_file_id: merge_write_file_id ,
94- merge_write_file_position: merge_write_file_position
95- } =
96- acc ->
97- key = :erlang . binary_to_term ( key_bytes )
98-
99- case Keydir . fetch ( tid , key ) do
100- { :ok , { _key , _file_id , _entry_size , _file_position , keydir_entry_id } }
101- when keydir_entry_id == on_disk_entry_id ->
102- { :ok ,
103- % {
104- merge_write_file: merge_write_file ,
105- merge_write_file_id: merge_write_file_id ,
106- merge_write_file_position: merge_write_file_position
107- } } =
108- if merge_write_file_position >= target_file_size do
109- { :ok , % { write_file: merge_write_file , file_id: merge_write_file_id } } =
110- Writer . new_write_file ( directory )
111-
112- { :ok ,
113- % {
114- merge_write_file: merge_write_file ,
115- merge_write_file_id: merge_write_file_id ,
116- merge_write_file_position: 0
117- } }
118- else
119- { :ok ,
120- % {
121- merge_write_file: merge_write_file ,
122- merge_write_file_id: merge_write_file_id ,
123- merge_write_file_position: merge_write_file_position
124- } }
125- end
126-
127- entry =
128- [
129- Writer . int_to_u32_bytes ( crc32 ) ,
130- Writer . int_to_u128_bytes ( on_disk_entry_id ) ,
131- Writer . int_to_u32_bytes ( key_size ) ,
132- Writer . int_to_u32_bytes ( value_size ) ,
133- key_bytes ,
134- value_bytes
135- ]
136-
137- :ok = :file . write ( merge_write_file , entry )
138-
139- entry_size = :erlang . iolist_size ( entry )
140-
141- true =
142- Keydir . insert (
143- tid ,
144- key ,
145- merge_write_file_id ,
146- entry_size ,
147- merge_write_file_position ,
148- on_disk_entry_id
149- )
150-
151- % {
92+ % {
93+ merge_write_file: merge_write_file ,
94+ merge_write_file_id: merge_write_file_id ,
95+ merge_write_file_position: merge_write_file_position
96+ } =
97+ acc ->
98+ key = :erlang . binary_to_term ( key_bytes )
99+
100+ case Keydir . fetch ( tid , key ) do
101+ { :ok , { _key , _file_id , _entry_size , _file_position , keydir_entry_id } }
102+ when keydir_entry_id == on_disk_entry_id ->
103+ { :ok ,
104+ % {
105+ merge_write_file: merge_write_file ,
106+ merge_write_file_id: merge_write_file_id ,
107+ merge_write_file_position: merge_write_file_position
108+ } } =
109+ if merge_write_file_position >= target_file_size do
110+ :file . close ( merge_write_file )
111+
112+ { :ok , % { write_file: merge_write_file , file_id: merge_write_file_id } } =
113+ Writer . new_write_file ( directory )
114+
115+ { :ok ,
116+ % {
117+ merge_write_file: merge_write_file ,
118+ merge_write_file_id: merge_write_file_id ,
119+ merge_write_file_position: 0
120+ } }
121+ else
122+ { :ok ,
123+ % {
124+ merge_write_file: merge_write_file ,
125+ merge_write_file_id: merge_write_file_id ,
126+ merge_write_file_position: merge_write_file_position
127+ } }
128+ end
129+
130+ entry =
131+ [
132+ Writer . int_to_u32_bytes ( crc32 ) ,
133+ Writer . int_to_u128_bytes ( on_disk_entry_id ) ,
134+ Writer . int_to_u32_bytes ( key_size ) ,
135+ Writer . int_to_u32_bytes ( value_size ) ,
136+ key_bytes ,
137+ value_bytes
138+ ]
139+
140+ # TODO should we be fsyncing here?
141+ :ok = :file . write ( merge_write_file , entry )
142+
143+ entry_size = :erlang . iolist_size ( entry )
144+
145+ true =
146+ Keydir . insert (
147+ tid ,
148+ key ,
149+ merge_write_file_id ,
150+ entry_size ,
151+ merge_write_file_position ,
152+ on_disk_entry_id
153+ )
154+
155+ % {
156+ acc
157+ | merge_write_file: merge_write_file ,
158+ merge_write_file_id: merge_write_file_id ,
159+ merge_write_file_position: acc . merge_write_file_position + entry_size ,
160+ merge_file_ids: MapSet . put ( acc . merge_file_ids , merge_write_file_id )
161+ }
162+
163+ # the entry isn't in the keydir,
164+ # so it isn't live anymore,
165+ # so skip it
166+ :error ->
152167 acc
153- | merge_write_file: merge_write_file ,
154- merge_write_file_id: merge_write_file_id ,
155- merge_write_file_position: acc . merge_write_file_position + entry_size ,
156- merge_file_ids: MapSet . put ( acc . merge_file_ids , merge_write_file_id )
157- }
158-
159- # the entry isn't in the keydir,
160- # so it isn't live anymore,
161- # so skip it
162- :error ->
163- acc
164-
165- # the ids for the given key didn't match,
166- # so they are old version of that key,
167- # so we ignore them
168- _ ->
169- acc
168+
169+ # the ids for the given key didn't match,
170+ # so they are old version of that key,
171+ # so we ignore them
172+ _ ->
173+ acc
174+ end
170175 end
171- end
172- )
176+ )
173177
174- Map . update! ( outer_acc , :merge_file_ids , fn merge_file_ids ->
175- MapSet . union ( merge_file_ids , acc_for_file [ :merge_file_ids ] )
178+ Map . update! ( outer_acc , :merge_file_ids , fn merge_file_ids ->
179+ MapSet . union ( merge_file_ids , acc_for_file [ :merge_file_ids ] )
180+ end )
176181 end )
182+
183+ # fsync all merge files
184+ Enum . each ( merge_state [ :merge_file_ids ] , fn merge_file_id ->
185+ merge_file_path = Path . join ( [ directory , "#{ merge_file_id } .b4" ] )
186+ { :ok , f } = :file . open ( merge_file_path , [ :raw , :binary , :read ] )
187+ :ok = :file . sync ( f )
188+ :ok = :file . close ( f )
177189 end )
178190
179191 # |> IO.inspect(label: "merge file acc state")
0 commit comments