1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.wal;
20
21 import java.io.EOFException;
22 import java.io.FileNotFoundException;
23 import java.io.IOException;
24 import java.io.InterruptedIOException;
25 import java.text.ParseException;
26 import java.util.ArrayList;
27 import java.util.Collections;
28 import java.util.HashSet;
29 import java.util.LinkedList;
30 import java.util.List;
31 import java.util.Map;
32 import java.util.NavigableSet;
33 import java.util.Set;
34 import java.util.TreeMap;
35 import java.util.TreeSet;
36 import java.util.UUID;
37 import java.util.concurrent.Callable;
38 import java.util.concurrent.CompletionService;
39 import java.util.concurrent.ConcurrentHashMap;
40 import java.util.concurrent.ExecutionException;
41 import java.util.concurrent.ExecutorCompletionService;
42 import java.util.concurrent.Future;
43 import java.util.concurrent.ThreadFactory;
44 import java.util.concurrent.ThreadPoolExecutor;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.concurrent.atomic.AtomicLong;
48 import java.util.concurrent.atomic.AtomicReference;
49 import java.util.regex.Matcher;
50 import java.util.regex.Pattern;
51
52 import org.apache.commons.logging.Log;
53 import org.apache.commons.logging.LogFactory;
54 import org.apache.hadoop.conf.Configuration;
55 import org.apache.hadoop.fs.FileAlreadyExistsException;
56 import org.apache.hadoop.fs.FileStatus;
57 import org.apache.hadoop.fs.FileSystem;
58 import org.apache.hadoop.fs.Path;
59 import org.apache.hadoop.fs.PathFilter;
60 import org.apache.hadoop.hbase.Cell;
61 import org.apache.hadoop.hbase.CellScanner;
62 import org.apache.hadoop.hbase.CellUtil;
63 import org.apache.hadoop.hbase.CoordinatedStateException;
64 import org.apache.hadoop.hbase.CoordinatedStateManager;
65 import org.apache.hadoop.hbase.HBaseConfiguration;
66 import org.apache.hadoop.hbase.HConstants;
67 import org.apache.hadoop.hbase.HRegionInfo;
68 import org.apache.hadoop.hbase.HRegionLocation;
69 import org.apache.hadoop.hbase.RemoteExceptionHandler;
70 import org.apache.hadoop.hbase.ServerName;
71 import org.apache.hadoop.hbase.TableName;
72 import org.apache.hadoop.hbase.TableNotFoundException;
73 import org.apache.hadoop.hbase.TableStateManager;
74 import org.apache.hadoop.hbase.classification.InterfaceAudience;
75 import org.apache.hadoop.hbase.client.ConnectionUtils;
76 import org.apache.hadoop.hbase.client.Delete;
77 import org.apache.hadoop.hbase.client.Durability;
78 import org.apache.hadoop.hbase.client.HConnection;
79 import org.apache.hadoop.hbase.client.HConnectionManager;
80 import org.apache.hadoop.hbase.client.Mutation;
81 import org.apache.hadoop.hbase.client.Put;
82 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
83 import org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination;
84 import org.apache.hadoop.hbase.exceptions.RegionOpeningException;
85 import org.apache.hadoop.hbase.io.HeapSize;
86 import org.apache.hadoop.hbase.master.SplitLogManager;
87 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
88 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
89 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
90 import org.apache.hadoop.hbase.protobuf.RequestConverter;
91 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
92 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
93 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse;
94 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
95 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
96 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
97 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
98 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
99 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
100 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
101 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
102 import org.apache.hadoop.hbase.regionserver.HRegion;
103 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
104
105 import org.apache.hadoop.hbase.regionserver.wal.FSHLog;
106 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
107 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
108 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
109 import org.apache.hadoop.hbase.regionserver.wal.WALEditsReplaySink;
110 import org.apache.hadoop.hbase.util.Bytes;
111 import org.apache.hadoop.hbase.util.CancelableProgressable;
112 import org.apache.hadoop.hbase.util.ClassSize;
113 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
114 import org.apache.hadoop.hbase.util.FSUtils;
115 import org.apache.hadoop.hbase.util.Pair;
116 import org.apache.hadoop.hbase.util.Threads;
117 import org.apache.hadoop.hbase.wal.WAL.Entry;
118 import org.apache.hadoop.hbase.wal.WAL.Reader;
119 import org.apache.hadoop.hbase.wal.WALProvider.Writer;
120 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
121 import org.apache.hadoop.io.MultipleIOException;
122
123 import com.google.common.annotations.VisibleForTesting;
124 import com.google.common.base.Preconditions;
125 import com.google.common.collect.Lists;
126 import com.google.protobuf.ServiceException;
127
128
129
130
131
132
133 @InterfaceAudience.Private
134 public class WALSplitter {
  static final Log LOG = LogFactory.getLog(WALSplitter.class);

  /** By default, abort the whole split on errors instead of skipping bad edits. */
  public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;

  // Parameters for the split process
  protected final Path rootDir;
  protected final FileSystem fs;
  protected final Configuration conf;

  // Major subcomponents of the split process.
  // These are separated into inner classes to make testing easier.
  PipelineController controller;
  OutputSink outputSink;
  EntryBuffers entryBuffers;

  // Tables known to be disabled/disabling; populated from the coordination
  // manager in splitLogFile() when one is available.
  private Set<TableName> disablingOrDisabledTables =
      new HashSet<TableName>();
  private BaseCoordinatedStateManager csm;
  private final WALFactory walFactory;

  // Task-monitor entry for the currently running split; null when idle.
  private MonitoredTask status;

  // For checking the latest flushed sequence id of a region (may be null).
  protected final LastSequenceId sequenceIdChecker;

  // True only when recovery mode is LOG_REPLAY and a coordination manager exists.
  protected boolean distributedLogReplay;

  // Map of encoded region name -> last flushed sequence id for that region.
  protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();

  // Map of encoded region name -> (store family name -> max sequence id in that store).
  protected Map<String, Map<byte[], Long>> regionMaxSeqIdInStores =
      new ConcurrentHashMap<String, Map<byte[], Long>>();

  // Name of the server whose WAL is being split; "" when it cannot be derived.
  protected String failedServerName = "";

  // Number of writer threads feeding the output sink.
  private final int numWriterThreads;

  // Minimum batch size when replaying WAL edits (distributed log replay).
  private final int minBatchSize;
178
179 WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
180 FileSystem fs, LastSequenceId idChecker,
181 CoordinatedStateManager csm, RecoveryMode mode) {
182 this.conf = HBaseConfiguration.create(conf);
183 String codecClassName = conf
184 .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
185 this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
186 this.rootDir = rootDir;
187 this.fs = fs;
188 this.sequenceIdChecker = idChecker;
189 this.csm = (BaseCoordinatedStateManager)csm;
190 this.walFactory = factory;
191 this.controller = new PipelineController();
192
193 entryBuffers = new EntryBuffers(controller,
194 this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
195 128*1024*1024));
196
197
198
199 this.minBatchSize = this.conf.getInt("hbase.regionserver.wal.logreplay.batch.size", 64);
200 this.distributedLogReplay = (RecoveryMode.LOG_REPLAY == mode);
201
202 this.numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
203 if (csm != null && this.distributedLogReplay) {
204 outputSink = new LogReplayOutputSink(controller, entryBuffers, numWriterThreads);
205 } else {
206 if (this.distributedLogReplay) {
207 LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
208 }
209 this.distributedLogReplay = false;
210 outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
211 }
212
213 }
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231 public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
232 Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
233 CoordinatedStateManager cp, RecoveryMode mode, final WALFactory factory) throws IOException {
234 WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker, cp, mode);
235 return s.splitLogFile(logfile, reporter);
236 }
237
238
239
240
241
242 public static List<Path> split(Path walRootDir, Path logDir, Path oldLogDir,
243 FileSystem walFs, Configuration conf, final WALFactory factory) throws IOException {
244 final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
245 Collections.singletonList(logDir), null);
246 List<Path> splits = new ArrayList<Path>();
247 if (logfiles != null && logfiles.length > 0) {
248 for (FileStatus logfile: logfiles) {
249 WALSplitter s = new WALSplitter(factory, conf, walRootDir, walFs, null, null,
250 RecoveryMode.LOG_SPLITTING);
251 if (s.splitLogFile(logfile, null)) {
252 finishSplitLogFile(walRootDir, oldLogDir, logfile.getPath(), conf);
253 if (s.outputSink.splits != null) {
254 splits.addAll(s.outputSink.splits);
255 }
256 }
257 }
258 }
259 if (!walFs.delete(logDir, true)) {
260 throw new IOException("Unable to delete src dir: " + logDir);
261 }
262 return splits;
263 }
264
265
266
267
268
  /**
   * Splits one WAL file into per-region outputs (recovered.edits files, or
   * direct replay when distributed log replay is enabled).
   *
   * @param logfile  status of the file to split; must be a regular file
   * @param reporter progress callback; may be null. If it returns false the
   *                 split aborts and this method returns false.
   * @return true on success, false if progress reporting failed
   * @throws IOException on unrecoverable read/write errors
   */
  boolean splitLogFile(FileStatus logfile, CancelableProgressable reporter) throws IOException {
    // A splitter instance is single-use: status must not have been set yet.
    Preconditions.checkState(status == null);
    Preconditions.checkArgument(logfile.isFile(),
        "passed in file status is for something other than a regular file.");
    boolean isCorrupted = false;
    boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
      SPLIT_SKIP_ERRORS_DEFAULT);
    int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
    Path logPath = logfile.getPath();
    boolean outputSinkStarted = false;
    boolean progress_failed = false;
    int editsCount = 0;
    int editsSkipped = 0;

    status =
        TaskMonitor.get().createStatus(
          "Splitting log file " + logfile.getPath() + "into a temporary staging area.");
    Reader in = null;
    try {
      long logLength = logfile.getLen();
      LOG.info("Splitting wal: " + logPath + ", length=" + logLength);
      LOG.info("DistributedLogReplay = " + this.distributedLogReplay);
      status.setStatus("Opening log file");
      if (reporter != null && !reporter.progress()) {
        progress_failed = true;
        return false;
      }
      try {
        in = getReader(logfile, skipErrors, reporter);
      } catch (CorruptedLogFileException e) {
        // Mark the file corrupted (via ZK marker) and continue; there is
        // nothing to split.
        LOG.warn("Could not get reader, corrupted log file " + logPath, e);
        ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
        isCorrupted = true;
      }
      if (in == null) {
        // Empty or vanished file: treated as successfully split.
        LOG.warn("Nothing to split in log file " + logPath);
        return true;
      }
      if (csm != null) {
        // Learn which tables are disabled/disabling so their edits can be
        // handled specially by the output sink.
        try {
          TableStateManager tsm = csm.getTableStateManager();
          disablingOrDisabledTables = tsm.getTablesInStates(
            ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
        } catch (CoordinatedStateException e) {
          throw new IOException("Can't get disabling/disabled tables", e);
        }
      }
      int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
      int numOpenedFilesLastCheck = 0;
      outputSink.setReporter(reporter);
      outputSink.startWriterThreads();
      outputSinkStarted = true;
      Entry entry;
      Long lastFlushedSequenceId = -1L;
      ServerName serverName = DefaultWALProvider.getServerNameFromWALDirectoryName(logPath);
      failedServerName = (serverName == null) ? "" : serverName.getServerName();
      // Main loop: read each WAL entry, drop those already flushed, buffer the rest.
      while ((entry = getNextLogLine(in, logPath, skipErrors)) != null) {
        byte[] region = entry.getKey().getEncodedRegionName();
        String key = Bytes.toString(region);
        lastFlushedSequenceId = lastFlushedSequenceIds.get(key);
        if (lastFlushedSequenceId == null) {
          // First entry seen for this region: look up its last flushed seq id,
          // either from the coordination layer (replay) or the local checker.
          if (this.distributedLogReplay) {
            RegionStoreSequenceIds ids =
                csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
                  key);
            if (ids != null) {
              lastFlushedSequenceId = ids.getLastFlushedSequenceId();
            }
          } else if (sequenceIdChecker != null) {
            RegionStoreSequenceIds ids = sequenceIdChecker.getLastSequenceId(region);
            Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
            for (StoreSequenceId storeSeqId : ids.getStoreSequenceIdList()) {
              maxSeqIdInStores.put(storeSeqId.getFamilyName().toByteArray(),
                storeSeqId.getSequenceId());
            }
            regionMaxSeqIdInStores.put(key, maxSeqIdInStores);
            lastFlushedSequenceId = ids.getLastFlushedSequenceId();
          }
          if (lastFlushedSequenceId == null) {
            lastFlushedSequenceId = -1L;
          }
          lastFlushedSequenceIds.put(key, lastFlushedSequenceId);
        }
        if (lastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
          // Edit was already flushed to the store files; skip it.
          editsSkipped++;
          continue;
        }
        entryBuffers.appendEntry(entry);
        editsCount++;
        int moreWritersFromLastCheck = this.getNumOpenWriters() - numOpenedFilesLastCheck;
        // Periodically report progress: every 'interval' edits, or whenever
        // the number of open writers has grown past the reporting threshold.
        if (editsCount % interval == 0
            || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
          numOpenedFilesLastCheck = this.getNumOpenWriters();
          String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
              + " edits, skipped " + editsSkipped + " edits.";
          status.setStatus("Split " + countsStr);
          if (reporter != null && !reporter.progress()) {
            progress_failed = true;
            return false;
          }
        }
      }
    } catch (InterruptedException ie) {
      // Convert to an interruptible IOException so callers see a single type.
      IOException iie = new InterruptedIOException();
      iie.initCause(ie);
      throw iie;
    } catch (CorruptedLogFileException e) {
      LOG.warn("Could not parse, corrupted log file " + logPath, e);
      // NOTE(review): csm can be null when invoked via the static split() path;
      // confirm this catch cannot NPE here — TODO verify against callers.
      csm.getSplitLogWorkerCoordination().markCorrupted(rootDir,
        logfile.getPath().getName(), fs);
      isCorrupted = true;
    } catch (IOException e) {
      e = RemoteExceptionHandler.checkIOException(e);
      throw e;
    } finally {
      LOG.debug("Finishing writing output logs and closing down.");
      try {
        if (null != in) {
          in.close();
        }
      } catch (IOException exception) {
        LOG.warn("Could not close wal reader: " + exception.getMessage());
        LOG.debug("exception details", exception);
      }
      try {
        if (outputSinkStarted) {
          // Set progress_failed to true in advance: if finishWritingAndClose()
          // throws, the flag already has the right (failed) value; otherwise
          // the next statement overwrites it with the real outcome.
          progress_failed = true;
          progress_failed = outputSink.finishWritingAndClose() == null;
        }
      } finally {
        String msg =
            "Processed " + editsCount + " edits across " + outputSink.getNumberOfRecoveredRegions()
                + " regions; edits skipped=" + editsSkipped + "; log file=" + logPath +
                ", length=" + logfile.getLen() +
                ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
        LOG.info(msg);
        status.markComplete(msg);
      }
    }
    return !progress_failed;
  }
413
414
415
416
417
418
419
420
421
422
423
424
425 public static void finishSplitLogFile(String logfile,
426 Configuration conf) throws IOException {
427 Path rootdir = FSUtils.getWALRootDir(conf);
428 Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
429 Path logPath;
430 if (FSUtils.isStartingWithPath(rootdir, logfile)) {
431 logPath = new Path(logfile);
432 } else {
433 logPath = new Path(rootdir, logfile);
434 }
435 finishSplitLogFile(rootdir, oldLogDir, logPath, conf);
436 }
437
438 static void finishSplitLogFile(Path rootdir, Path oldLogDir,
439 Path logPath, Configuration conf) throws IOException {
440 List<Path> processedLogs = new ArrayList<Path>();
441 List<Path> corruptedLogs = new ArrayList<Path>();
442 FileSystem fs;
443 fs = rootdir.getFileSystem(conf);
444 if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
445 corruptedLogs.add(logPath);
446 } else {
447 processedLogs.add(logPath);
448 }
449 archiveLogs(corruptedLogs, processedLogs, oldLogDir, fs, conf);
450 Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
451 fs.delete(stagingDir, true);
452 }
453
454
455
456
457
458
459
460
461
462
463
464
465
466 private static void archiveLogs(
467 final List<Path> corruptedLogs,
468 final List<Path> processedLogs, final Path oldLogDir,
469 final FileSystem fs, final Configuration conf) throws IOException {
470 final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), conf.get(
471 "hbase.regionserver.hlog.splitlog.corrupt.dir", HConstants.CORRUPT_DIR_NAME));
472
473 if (!fs.mkdirs(corruptDir)) {
474 LOG.info("Unable to mkdir " + corruptDir);
475 }
476 fs.mkdirs(oldLogDir);
477
478
479
480 for (Path corrupted : corruptedLogs) {
481 Path p = new Path(corruptDir, corrupted.getName());
482 if (fs.exists(corrupted)) {
483 if (!fs.rename(corrupted, p)) {
484 LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
485 } else {
486 LOG.warn("Moved corrupted log " + corrupted + " to " + p);
487 }
488 }
489 }
490
491 for (Path p : processedLogs) {
492 Path newPath = FSHLog.getWALArchivePath(oldLogDir, p);
493 if (fs.exists(p)) {
494 if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
495 LOG.warn("Unable to move " + p + " to " + newPath);
496 } else {
497 LOG.info("Archived processed log " + p + " to " + newPath);
498 }
499 }
500 }
501 }
502
503
504
505
506
507
508
509
510
511
512
513
514
  /**
   * Computes the path of the temporary recovered.edits file for the region
   * named in {@code logEntry}, e.g.
   * {@code <root>/<table>/<region>/recovered.edits/<seqid>.temp}, optionally
   * creating the recovered.edits directory.
   *
   * @param fs       filesystem to check/create directories on
   * @param logEntry entry whose table/region/seqid determine the path
   * @param rootDir  HBase root dir
   * @param isCreate if true, create the recovered.edits dir if missing
   * @return path for the temp edits file, or null if the region directory no
   *         longer exists (region was likely already split)
   * @throws IOException on filesystem errors
   */
  @SuppressWarnings("deprecation")
  static Path getRegionSplitEditsPath(final FileSystem fs,
      final Entry logEntry, final Path rootDir, boolean isCreate)
  throws IOException {
    Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTablename());
    String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
    Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
    Path dir = getRegionDirRecoveredEditsDir(regiondir);

    if (!fs.exists(regiondir)) {
      // Region dir gone: edits are for an already-split region and can be dropped.
      LOG.info("This region's directory doesn't exist: "
          + regiondir.toString() + ". It is very likely that it was" +
        " already split so it's safe to discard those edits.");
      return null;
    }
    if (fs.exists(dir) && fs.isFile(dir)) {
      // Legacy layout: recovered.edits used to be a file, not a directory.
      // Sideline the old file so a directory can take its place.
      // NOTE(review): sidelines into a literal "/tmp" on this filesystem —
      // TODO confirm that is intended rather than a path under the root dir.
      Path tmp = new Path("/tmp");
      if (!fs.exists(tmp)) {
        fs.mkdirs(tmp);
      }
      tmp = new Path(tmp,
        HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
      LOG.warn("Found existing old file: " + dir + ". It could be some "
        + "leftover of an old installation. It should be a folder instead. "
        + "So moving it to " + tmp);
      if (!fs.rename(dir, tmp)) {
        LOG.warn("Failed to sideline old file " + dir);
      }
    }

    if (isCreate && !fs.exists(dir)) {
      if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
    }

    // Name the file after the entry's sequence id, with a .temp suffix until complete.
    String fileName = formatRecoveredEditsFileName(logEntry.getKey().getLogSeqNum());
    fileName = getTmpRecoveredEditsFileName(fileName);
    return new Path(dir, fileName);
  }
554
  /**
   * Appends the in-progress suffix (".temp") to a recovered-edits file name;
   * the suffix is removed (by rename) once writing completes — see
   * {@link #getCompletedRecoveredEditsFilePath}.
   */
  static String getTmpRecoveredEditsFileName(String fileName) {
    return fileName + RECOVERED_LOG_TMPFILE_SUFFIX;
  }
558
559
560
561
562
563
564
565
566
567 static Path getCompletedRecoveredEditsFilePath(Path srcPath,
568 Long maximumEditLogSeqNum) {
569 String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum);
570 return new Path(srcPath.getParent(), fileName);
571 }
572
573 static String formatRecoveredEditsFileName(final long seqid) {
574 return String.format("%019d", seqid);
575 }
576
  /** Recovered-edits file names are (possibly negative) decimal numbers. */
  private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+");
  /** Suffix carried by recovered-edits files that are still being written. */
  private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp";
579
580
581
582
583
584
585
  /**
   * @param regiondir the region's directory in the filesystem
   * @return the directory under {@code regiondir} that holds recovered-edits files
   */
  public static Path getRegionDirRecoveredEditsDir(final Path regiondir) {
    return new Path(regiondir, HConstants.RECOVERED_EDITS_DIR);
  }
589
590
591
592
593
594
595
596
597
598
599 public static NavigableSet<Path> getSplitEditFilesSorted(final FileSystem fs,
600 final Path regiondir) throws IOException {
601 NavigableSet<Path> filesSorted = new TreeSet<Path>();
602 Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
603 if (!fs.exists(editsdir))
604 return filesSorted;
605 FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
606 @Override
607 public boolean accept(Path p) {
608 boolean result = false;
609 try {
610
611
612
613
614 Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName());
615 result = fs.isFile(p) && m.matches();
616
617
618 if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) {
619 result = false;
620 }
621
622 if (isSequenceIdFile(p)) {
623 result = false;
624 }
625 } catch (IOException e) {
626 LOG.warn("Failed isFile check on " + p);
627 }
628 return result;
629 }
630 });
631 if (files == null) {
632 return filesSorted;
633 }
634 for (FileStatus status : files) {
635 filesSorted.add(status.getPath());
636 }
637 return filesSorted;
638 }
639
640
641
642
643
644
645
646
647
648
649 public static Path moveAsideBadEditsFile(final FileSystem fs, final Path edits)
650 throws IOException {
651 Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
652 + System.currentTimeMillis());
653 if (!fs.rename(edits, moveAsideName)) {
654 LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
655 }
656 return moveAsideName;
657 }
658
  /** Current suffix for region open-sequence-id marker files. */
  private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid";
  /** Legacy suffix for sequence-id marker files, still recognized on read. */
  private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid";
  private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length();
662
663
664
665
666 @VisibleForTesting
667 public static boolean isSequenceIdFile(final Path file) {
668 return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX)
669 || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX);
670 }
671
672
673
674
675
676
677
678
679
680
  /**
   * Writes a region open-sequence-id marker file under recovered.edits and
   * removes any older marker files. The id written is
   * {@code max(newSeqId, highest existing marker) + saftyBumper}.
   *
   * @param fs          filesystem to write to
   * @param regiondir   region directory
   * @param newSeqId    proposed new sequence id
   * @param saftyBumper amount added on top as a safety margin
   *                    (NOTE(review): parameter name is a typo of "safetyBumper";
   *                    kept to preserve the public signature)
   * @return the sequence id actually recorded
   * @throws IOException if the marker file cannot be created
   */
  public static long writeRegionSequenceIdFile(final FileSystem fs, final Path regiondir,
      long newSeqId, long saftyBumper) throws IOException {

    Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir);
    long maxSeqId = 0;
    FileStatus[] files = null;
    if (fs.exists(editsdir)) {
      // Scan existing marker files to find the highest recorded sequence id.
      files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
        @Override
        public boolean accept(Path p) {
          return isSequenceIdFile(p);
        }
      });
      if (files != null) {
        for (FileStatus status : files) {
          String fileName = status.getPath().getName();
          try {
            Long tmpSeqId = Long.parseLong(fileName.substring(0, fileName.length()
                - SEQUENCE_ID_FILE_SUFFIX_LENGTH));
            maxSeqId = Math.max(tmpSeqId, maxSeqId);
          } catch (NumberFormatException ex) {
            LOG.warn("Invalid SeqId File Name=" + fileName);
          }
        }
      }
    }
    // Never move the recorded id backwards.
    if (maxSeqId > newSeqId) {
      newSeqId = maxSeqId;
    }
    newSeqId += saftyBumper;

    // Write the new marker (skipped if it would be identical to the current max).
    Path newSeqIdFile = new Path(editsdir, newSeqId + SEQUENCE_ID_FILE_SUFFIX);
    if (newSeqId != maxSeqId) {
      try {
        if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) {
          throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("Wrote region seqId=" + newSeqIdFile + " to file, newSeqId=" + newSeqId
              + ", maxSeqId=" + maxSeqId);
        }
      } catch (FileAlreadyExistsException ignored) {
        // Another writer created the same marker concurrently; that is fine.
      }
    }
    // Clean up all older marker files, keeping only the one just written.
    if (files != null) {
      for (FileStatus status : files) {
        if (newSeqIdFile.equals(status.getPath())) {
          continue;
        }
        fs.delete(status.getPath(), false);
      }
    }
    return newSeqId;
  }
738
739
740
741
742
743
744
745
746
  /**
   * Opens a {@link Reader} on the WAL file, first recovering its lease in case
   * the writing server died mid-write.
   *
   * @param file       WAL file to open
   * @param skipErrors if true, open failures become CorruptedLogFileException
   *                   instead of propagating
   * @param reporter   progress callback passed to lease recovery
   * @return a reader, or null for empty/missing files (nothing to split)
   * @throws IOException              when skipErrors is false, or on interruption
   * @throws CorruptedLogFileException when skipErrors is true and the open failed
   */
  protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
      throws IOException, CorruptedLogFileException {
    Path path = file.getPath();
    long length = file.getLen();
    Reader in;

    // A zero-length file may simply still be open by a (dead) writer; note it
    // and continue — the EOF handling below decides what to do.
    if (length <= 0) {
      LOG.warn("File " + path + " might be still open, length is 0");
    }

    try {
      FSUtils.getInstance(fs, conf).recoverFileLease(fs, path, conf, reporter);
      try {
        in = getReader(path, reporter);
      } catch (EOFException e) {
        if (length <= 0) {
          // Zero-length file that also EOFs on open: treat as empty, skip it.
          LOG.warn("Could not open " + path + " for reading. File is empty", e);
          return null;
        } else {
          // EOF with nonzero reported length: also treated as nothing to read.
          return null;
        }
      }
    } catch (IOException e) {
      if (e instanceof FileNotFoundException) {
        // File disappeared (e.g. already archived); nothing to split.
        LOG.warn("File " + path + " doesn't exist anymore.", e);
        return null;
      }
      if (!skipErrors || e instanceof InterruptedIOException) {
        throw e;
      }
      CorruptedLogFileException t =
        new CorruptedLogFileException("skipErrors=true Could not open wal " +
            path + " ignoring");
      t.initCause(e);
      throw t;
    }
    return in;
  }
795
  /**
   * Reads the next entry from the WAL, classifying read failures.
   *
   * @param in         open WAL reader
   * @param path       path of the file being read (for log messages)
   * @param skipErrors if true, unparseable data raises CorruptedLogFileException
   *                   rather than the raw IOException
   * @return the next entry, or null at end-of-file or on a benign parse error
   * @throws IOException               unclassified read failure (skipErrors=false)
   * @throws CorruptedLogFileException read failure with skipErrors=true
   */
  static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
  throws CorruptedLogFileException, IOException {
    try {
      return in.next();
    } catch (EOFException eof) {
      // Normal termination: the file simply ended.
      LOG.info("EOF from wal " + path + ". continuing");
      return null;
    } catch (IOException e) {
      // A trailing partial entry (parse or checksum failure) is expected when a
      // server died mid-write; treat it like EOF rather than corruption.
      if (e.getCause() != null &&
          (e.getCause() instanceof ParseException ||
           e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
        LOG.warn("Parse exception " + e.getCause().toString() + " from wal "
           + path + ". continuing");
        return null;
      }
      if (!skipErrors) {
        throw e;
      }
      CorruptedLogFileException t =
        new CorruptedLogFileException("skipErrors=true Ignoring exception" +
            " while parsing wal " + path + ". Marking as corrupted");
      t.initCause(e);
      throw t;
    }
  }
824
825
826
827
828
  /**
   * Creates a {@link Writer} for a recovered-edits output file via the WAL
   * factory. Caller is responsible for closing it.
   */
  protected Writer createWriter(Path logfile)
      throws IOException {
    return walFactory.createRecoveredEditsWriter(fs, logfile);
  }
833
834
835
836
837
  /**
   * Creates a {@link Reader} for the given WAL file via the WAL factory,
   * reporting open progress through {@code reporter}.
   */
  protected Reader getReader(Path curLogFile, CancelableProgressable reporter) throws IOException {
    return walFactory.createReader(fs, curLogFile, reporter);
  }
841
842
843
844
845 private int getNumOpenWriters() {
846 int result = 0;
847 if (this.outputSink != null) {
848 result += this.outputSink.getNumOpenWriters();
849 }
850 return result;
851 }
852
853
854
855
856 public static class PipelineController {
857
858
859 AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
860
861
862
863 public final Object dataAvailable = new Object();
864
865 void writerThreadError(Throwable t) {
866 thrown.compareAndSet(null, t);
867 }
868
869
870
871
872 void checkForErrors() throws IOException {
873 Throwable thrown = this.thrown.get();
874 if (thrown == null) return;
875 if (thrown instanceof IOException) {
876 throw new IOException(thrown);
877 } else {
878 throw new RuntimeException(thrown);
879 }
880 }
881 }
882
883
884
885
886
887
888
889
  /**
   * Buffers WAL entries per region between the single reader thread and the
   * pool of writer threads, enforcing a heap-usage cap: the producer blocks in
   * {@link #appendEntry} while more than {@code maxHeapUsage} bytes are
   * buffered, and writer threads pull the largest available region buffer via
   * {@link #getChunkToWrite()}.
   */
  public static class EntryBuffers {
    PipelineController controller;

    // Per-region buffers of pending entries, keyed by encoded region name.
    Map<byte[], RegionEntryBuffer> buffers =
      new TreeMap<byte[], RegionEntryBuffer>(Bytes.BYTES_COMPARATOR);

    // Regions whose buffer is currently checked out by a writer thread; their
    // (new) buffers are not handed out again until doneWriting() is called, so
    // a region's edits are never written by two threads concurrently.
    Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);

    // Total heap bytes buffered across all regions (guarded by controller.dataAvailable).
    long totalBuffered = 0;
    long maxHeapUsage;

    public EntryBuffers(PipelineController controller, long maxHeapUsage) {
      this.controller = controller;
      this.maxHeapUsage = maxHeapUsage;
    }

    /**
     * Appends a log entry into the corresponding region buffer.
     * Blocks if the total heap usage is crossing the upper bound; in that case
     * the producer waits until writer threads have drained some data.
     *
     * @throws InterruptedException if interrupted while waiting for buffer space
     * @throws IOException          if a writer thread has already failed
     */
    public void appendEntry(Entry entry) throws InterruptedException, IOException {
      WALKey key = entry.getKey();

      RegionEntryBuffer buffer;
      long incrHeap;
      synchronized (this) {
        buffer = buffers.get(key.getEncodedRegionName());
        if (buffer == null) {
          buffer = new RegionEntryBuffer(key.getTablename(), key.getEncodedRegionName());
          buffers.put(key.getEncodedRegionName(), buffer);
        }
        incrHeap= buffer.appendEntry(entry);
      }

      // Apply backpressure: wait while over the cap, unless a writer thread
      // has already errored out (then fall through so checkForErrors throws).
      synchronized (controller.dataAvailable) {
        totalBuffered += incrHeap;
        while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
          LOG.debug("Used " + totalBuffered + " bytes of buffered edits, waiting for IO threads...");
          controller.dataAvailable.wait(2000);
        }
        controller.dataAvailable.notifyAll();
      }
      controller.checkForErrors();
    }

    /**
     * @return the largest buffer not currently being written, removing it from
     *         the map and marking its region as in-flight; null if none available
     */
    synchronized RegionEntryBuffer getChunkToWrite() {
      long biggestSize = 0;
      byte[] biggestBufferKey = null;

      for (Map.Entry<byte[], RegionEntryBuffer> entry : buffers.entrySet()) {
        long size = entry.getValue().heapSize();
        if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) {
          biggestSize = size;
          biggestBufferKey = entry.getKey();
        }
      }
      if (biggestBufferKey == null) {
        return null;
      }

      RegionEntryBuffer buffer = buffers.remove(biggestBufferKey);
      currentlyWriting.add(biggestBufferKey);
      return buffer;
    }

    /**
     * Called by a writer thread after it has flushed a chunk; releases the
     * region for future chunks and credits the freed heap back to the cap.
     */
    void doneWriting(RegionEntryBuffer buffer) {
      synchronized (this) {
        boolean removed = currentlyWriting.remove(buffer.encodedRegionName);
        assert removed;
      }
      long size = buffer.heapSize();

      synchronized (controller.dataAvailable) {
        totalBuffered -= size;
        // Wake the producer, which may be blocked waiting for buffer space.
        controller.dataAvailable.notifyAll();
      }
    }

    synchronized boolean isRegionCurrentlyWriting(byte[] region) {
      return currentlyWriting.contains(region);
    }

    /**
     * Blocks until all buffered edits have been written out (totalBuffered == 0),
     * returning early (with the interrupt flag cleared) if interrupted.
     */
    public void waitUntilDrained() {
      synchronized (controller.dataAvailable) {
        while (totalBuffered > 0) {
          try {
            controller.dataAvailable.wait(2000);
          } catch (InterruptedException e) {
            LOG.warn("Got intrerrupted while waiting for EntryBuffers is drained");
            Thread.interrupted();
            break;
          }
        }
      }
    }
  }
997
998
999
1000
1001
1002
1003
1004 public static class RegionEntryBuffer implements HeapSize {
1005 long heapInBuffer = 0;
1006 List<Entry> entryBuffer;
1007 TableName tableName;
1008 byte[] encodedRegionName;
1009
1010 RegionEntryBuffer(TableName tableName, byte[] region) {
1011 this.tableName = tableName;
1012 this.encodedRegionName = region;
1013 this.entryBuffer = new LinkedList<Entry>();
1014 }
1015
1016 long appendEntry(Entry entry) {
1017 internify(entry);
1018 entryBuffer.add(entry);
1019 long incrHeap = entry.getEdit().heapSize() +
1020 ClassSize.align(2 * ClassSize.REFERENCE) +
1021 0;
1022 heapInBuffer += incrHeap;
1023 return incrHeap;
1024 }
1025
1026 private void internify(Entry entry) {
1027 WALKey k = entry.getKey();
1028 k.internTableName(this.tableName);
1029 k.internEncodedRegionName(this.encodedRegionName);
1030 }
1031
1032 @Override
1033 public long heapSize() {
1034 return heapInBuffer;
1035 }
1036
1037 public byte[] getEncodedRegionName() {
1038 return encodedRegionName;
1039 }
1040
1041 public List<Entry> getEntryBuffer() {
1042 return entryBuffer;
1043 }
1044
1045 public TableName getTableName() {
1046 return tableName;
1047 }
1048 }
1049
  /**
   * Consumer thread: repeatedly pulls the largest region buffer from
   * {@link EntryBuffers} and appends it to the {@link OutputSink}. On any
   * throwable it records the error in the {@link PipelineController} and exits.
   * Shutdown protocol: {@link #finish()} sets {@code shouldStop}; the thread
   * keeps draining until there is no chunk AND {@code outputSink.flush()}
   * reports nothing left, then returns.
   */
  public static class WriterThread extends Thread {
    private volatile boolean shouldStop = false;
    private PipelineController controller;
    private EntryBuffers entryBuffers;
    private OutputSink outputSink = null;

    WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i){
      super(Thread.currentThread().getName() + "-Writer-" + i);
      this.controller = controller;
      this.entryBuffers = entryBuffers;
      outputSink = sink;
    }

    @Override
    public void run()  {
      try {
        doRun();
      } catch (Throwable t) {
        // Surface the failure to the controlling thread instead of dying silently.
        LOG.error("Exiting thread", t);
        controller.writerThreadError(t);
      }
    }

    private void doRun() throws IOException {
      LOG.debug("Writer thread " + this + ": starting");
      while (true) {
        RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
        if (buffer == null) {
          // No data to write right now: either stop (if shutting down and the
          // sink has nothing buffered) or wait briefly for more data.
          synchronized (controller.dataAvailable) {
            if (shouldStop && !this.outputSink.flush()) {
              return;
            }
            try {
              controller.dataAvailable.wait(500);
            } catch (InterruptedException ie) {
              // Interruption is only expected as part of shutdown.
              if (!shouldStop) {
                throw new RuntimeException(ie);
              }
            }
          }
          continue;
        }

        assert buffer != null;
        try {
          writeBuffer(buffer);
        } finally {
          // Always release the region so other chunks for it can be scheduled.
          entryBuffers.doneWriting(buffer);
        }
      }
    }

    private void writeBuffer(RegionEntryBuffer buffer) throws IOException {
      outputSink.append(buffer);
    }

    // Ask the thread to stop once the buffers are drained; wakes it if waiting.
    void finish() {
      synchronized (controller.dataAvailable) {
        shouldStop = true;
        controller.dataAvailable.notifyAll();
      }
    }
  }
1114
1115
1116
1117
1118
1119 public static abstract class OutputSink {
1120
1121 protected PipelineController controller;
1122 protected EntryBuffers entryBuffers;
1123
1124 protected Map<byte[], SinkWriter> writers = Collections
1125 .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));;
1126
1127 protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
1128 .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
1129
1130 protected final List<WriterThread> writerThreads = Lists.newArrayList();
1131
1132
1133 protected final Set<byte[]> blacklistedRegions = Collections
1134 .synchronizedSet(new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR));
1135
1136 protected boolean closeAndCleanCompleted = false;
1137
1138 protected boolean writersClosed = false;
1139
1140 protected final int numThreads;
1141
1142 protected CancelableProgressable reporter = null;
1143
1144 protected AtomicLong skippedEdits = new AtomicLong();
1145
1146 protected List<Path> splits = null;
1147
1148 public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
1149 numThreads = numWriters;
1150 this.controller = controller;
1151 this.entryBuffers = entryBuffers;
1152 }
1153
1154 void setReporter(CancelableProgressable reporter) {
1155 this.reporter = reporter;
1156 }
1157
1158
1159
1160
1161 public synchronized void startWriterThreads() {
1162 for (int i = 0; i < numThreads; i++) {
1163 WriterThread t = new WriterThread(controller, entryBuffers, this, i);
1164 t.start();
1165 writerThreads.add(t);
1166 }
1167 }
1168
1169
1170
1171
1172
1173 void updateRegionMaximumEditLogSeqNum(Entry entry) {
1174 synchronized (regionMaximumEditLogSeqNum) {
1175 Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
1176 .getEncodedRegionName());
1177 if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
1178 regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
1179 .getLogSeqNum());
1180 }
1181 }
1182 }
1183
1184 Long getRegionMaximumEditLogSeqNum(byte[] region) {
1185 return regionMaximumEditLogSeqNum.get(region);
1186 }
1187
1188
1189
1190
1191 int getNumOpenWriters() {
1192 return this.writers.size();
1193 }
1194
1195 long getSkippedEdits() {
1196 return this.skippedEdits.get();
1197 }
1198
1199
1200
1201
1202
1203
1204 protected boolean finishWriting(boolean interrupt) throws IOException {
1205 LOG.debug("Waiting for split writer threads to finish");
1206 boolean progress_failed = false;
1207 for (WriterThread t : writerThreads) {
1208 t.finish();
1209 }
1210 if (interrupt) {
1211 for (WriterThread t : writerThreads) {
1212 t.interrupt();
1213 }
1214 }
1215
1216 for (WriterThread t : writerThreads) {
1217 if (!progress_failed && reporter != null && !reporter.progress()) {
1218 progress_failed = true;
1219 }
1220 try {
1221 t.join();
1222 } catch (InterruptedException ie) {
1223 IOException iie = new InterruptedIOException();
1224 iie.initCause(ie);
1225 throw iie;
1226 }
1227 }
1228 controller.checkForErrors();
1229 LOG.info("Split writers finished");
1230 return (!progress_failed);
1231 }
1232
1233 public abstract List<Path> finishWritingAndClose() throws IOException;
1234
1235
1236
1237
1238 public abstract Map<byte[], Long> getOutputCounts();
1239
1240
1241
1242
1243 public abstract int getNumberOfRecoveredRegions();
1244
1245
1246
1247
1248
1249 public abstract void append(RegionEntryBuffer buffer) throws IOException;
1250
1251
1252
1253
1254
1255 public boolean flush() throws IOException {
1256 return false;
1257 }
1258 }
1259
1260
1261
1262
  /**
   * Sink that writes the recovered edits for each region into a per-region
   * recovered-edits file. One {@link WriterAndPath} is kept open per region
   * until {@link #finishWritingAndClose()} is called.
   */
  class LogRecoveredEditsOutputSink extends OutputSink {

    public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers,
        int numWriters) {
      super(controller, entryBuffers, numWriters);
    }

    /**
     * Wait for the writer threads, then close and finalize all edits files.
     * @return the list of recovered-edits paths on success, null otherwise
     */
    @Override
    public List<Path> finishWritingAndClose() throws IOException {
      boolean isSuccessful = false;
      List<Path> result = null;
      try {
        isSuccessful = finishWriting(false);
      } finally {
        // Close the files even if finishWriting failed, so nothing leaks.
        result = close();
        List<IOException> thrown = closeLogWriters(null);
        if (thrown != null && !thrown.isEmpty()) {
          throw MultipleIOException.createIOException(thrown);
        }
      }
      if (isSuccessful) {
        splits = result;
      }
      return splits;
    }

    /**
     * Close all open writers in parallel: delete files that received no edits
     * and rename the rest to their final names (stamped with the region's
     * maximum edit sequence number).
     * @return the final paths, or null if the progress reporter was cancelled
     */
    private List<Path> close() throws IOException {
      Preconditions.checkState(!closeAndCleanCompleted);

      final List<Path> paths = new ArrayList<Path>();
      final List<IOException> thrown = Lists.newArrayList();
      ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
        TimeUnit.SECONDS, new ThreadFactory() {
          private int count = 1;

          @Override
          public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "split-log-closeStream-" + count++);
            return t;
          }
        });
      CompletionService<Void> completionService =
          new ExecutorCompletionService<Void>(closeThreadPool);
      for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
        LOG.debug("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
        completionService.submit(new Callable<Void>() {
          @Override
          public Void call() throws Exception {
            WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
            LOG.debug("Closing " + wap.p);
            try {
              wap.w.close();
            } catch (IOException ioe) {
              LOG.error("Couldn't close log at " + wap.p, ioe);
              thrown.add(ioe);
              return null;
            }
            if (LOG.isDebugEnabled()) {
              LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
                  + " edits, skipped " + wap.editsSkipped + " edits in "
                  + (wap.nanosSpent / 1000 / 1000) + "ms");
            }
            if (wap.editsWritten == 0) {
              // Nothing useful was written; remove the empty file.
              if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
                LOG.warn("Failed deleting empty " + wap.p);
                throw new IOException("Failed deleting empty " + wap.p);
              }
              return null;
            }

            Path dst = getCompletedRecoveredEditsFilePath(wap.p,
                regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
            try {
              if (!dst.equals(wap.p) && fs.exists(dst)) {
                // Leftover from a previous failed split attempt; remove it
                // before renaming, otherwise the rename could fail.
                LOG.warn("Found existing old edits file. It could be the "
                    + "result of a previous failed split attempt. Deleting " + dst + ", length="
                    + fs.getFileStatus(dst).getLen());
                if (!fs.delete(dst, false)) {
                  LOG.warn("Failed deleting of old " + dst);
                  throw new IOException("Failed deleting of old " + dst);
                }
              }

              // Only rename when the temporary file still exists.
              if (fs.exists(wap.p)) {
                if (!fs.rename(wap.p, dst)) {
                  throw new IOException("Failed renaming " + wap.p + " to " + dst);
                }
                LOG.info("Rename " + wap.p + " to " + dst);
              }
            } catch (IOException ioe) {
              LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
              thrown.add(ioe);
              return null;
            }
            paths.add(dst);
            return null;
          }
        });
      }

      boolean progress_failed = false;
      try {
        // Wait for all close/rename tasks, reporting progress as they finish.
        for (int i = 0, n = this.writers.size(); i < n; i++) {
          Future<Void> future = completionService.take();
          future.get();
          if (!progress_failed && reporter != null && !reporter.progress()) {
            progress_failed = true;
          }
        }
      } catch (InterruptedException e) {
        IOException iie = new InterruptedIOException();
        iie.initCause(e);
        throw iie;
      } catch (ExecutionException e) {
        throw new IOException(e.getCause());
      } finally {
        closeThreadPool.shutdownNow();
      }

      if (!thrown.isEmpty()) {
        throw MultipleIOException.createIOException(thrown);
      }
      writersClosed = true;
      closeAndCleanCompleted = true;
      if (progress_failed) {
        return null;
      }
      return paths;
    }

    /**
     * Stop any still-running writer threads and close every remaining writer,
     * collecting (rather than immediately throwing) IOExceptions.
     * @param thrown list to append errors to; a new list is created if null
     * @return the (possibly newly created) list of collected errors
     */
    private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
      if (writersClosed) {
        return thrown;
      }

      if (thrown == null) {
        thrown = Lists.newArrayList();
      }
      try {
        for (WriterThread t : writerThreads) {
          while (t.isAlive()) {
            t.shouldStop = true;
            t.interrupt();
            try {
              t.join(10);
            } catch (InterruptedException e) {
              IOException iie = new InterruptedIOException();
              iie.initCause(e);
              throw iie;
            }
          }
        }
      } finally {
        synchronized (writers) {
          WriterAndPath wap = null;
          for (SinkWriter tmpWAP : writers.values()) {
            try {
              wap = (WriterAndPath) tmpWAP;
              wap.w.close();
            } catch (IOException ioe) {
              // Keep closing the rest; collect the error.
              LOG.error("Couldn't close log at " + wap.p, ioe);
              thrown.add(ioe);
              continue;
            }
            LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
                + (wap.nanosSpent / 1000 / 1000) + "ms)");
          }
        }
        writersClosed = true;
      }

      return thrown;
    }

    /**
     * Get the writer for the entry's region, creating it on first use.
     * @return null if the region is blacklisted (no writer could be created
     *         for it), in which case its edits are skipped
     */
    private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
      byte region[] = entry.getKey().getEncodedRegionName();
      WriterAndPath ret = (WriterAndPath) writers.get(region);
      if (ret != null) {
        return ret;
      }
      // Regions we could not open a writer for are remembered and skipped
      // from then on, instead of retrying (and failing) per entry.
      if (blacklistedRegions.contains(region)) {
        return null;
      }
      ret = createWAP(region, entry, rootDir);
      if (ret == null) {
        blacklistedRegions.add(region);
        return null;
      }
      writers.put(region, ret);
      return ret;
    }

    /**
     * Create a recovered-edits writer for the given region, deleting any
     * leftover edits file from a previous failed split attempt.
     * @return null if no edits path could be determined for the entry
     */
    private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
      Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
      if (regionedits == null) {
        return null;
      }
      if (fs.exists(regionedits)) {
        LOG.warn("Found old edits file. It could be the "
            + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
            + fs.getFileStatus(regionedits).getLen());
        if (!fs.delete(regionedits, false)) {
          LOG.warn("Failed delete of old " + regionedits);
        }
      }
      Writer w = createWriter(regionedits);
      LOG.info("Creating writer path=" + regionedits + " region=" + Bytes.toStringBinary(region));
      return (new WriterAndPath(regionedits, w));
    }

    /**
     * Drop cells whose store (column family) already has a flushed sequence
     * number at or above this entry's, keeping metadata-family cells always.
     */
    private void filterCellByStore(Entry logEntry) {
      Map<byte[], Long> maxSeqIdInStores =
          regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
      if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) {
        return;
      }
      ArrayList<Cell> keptCells = new ArrayList<Cell>(logEntry.getEdit().getCells().size());
      for (Cell cell : logEntry.getEdit().getCells()) {
        if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
          keptCells.add(cell);
        } else {
          byte[] family = CellUtil.cloneFamily(cell);
          Long maxSeqId = maxSeqIdInStores.get(family);
          // Keep the cell when its store has no recorded flush id, or the
          // cell's sequence number is newer than what was flushed.
          if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getLogSeqNum()) {
            keptCells.add(cell);
          }
        }
      }

      // Replace the edit's cells with the surviving subset (possibly empty,
      // in which case append() skips the entry).
      logEntry.getEdit().setCells(keptCells);
    }

    /**
     * Append a chunk of edits (all for one region) to that region's
     * recovered-edits writer, filtering out already-flushed cells.
     */
    @Override
    public void append(RegionEntryBuffer buffer) throws IOException {
      List<Entry> entries = buffer.entryBuffer;
      if (entries.isEmpty()) {
        LOG.warn("got an empty buffer, skipping");
        return;
      }

      WriterAndPath wap = null;

      long startTime = System.nanoTime();
      try {
        int editsCount = 0;

        for (Entry logEntry : entries) {
          if (wap == null) {
            wap = getWriterAndPath(logEntry);
            if (wap == null) {
              // Region is blacklisted: none of this buffer's edits are written.
              if (LOG.isDebugEnabled()) {
                LOG.debug("getWriterAndPath decided we don't need to write edits for " + logEntry);
              }
              return;
            }
          }
          filterCellByStore(logEntry);
          if (!logEntry.getEdit().isEmpty()) {
            wap.w.append(logEntry);
            this.updateRegionMaximumEditLogSeqNum(logEntry);
            editsCount++;
          } else {
            wap.incrementSkippedEdits(1);
          }
        }
        // Pass along summary statistics.
        wap.incrementEdits(editsCount);
        wap.incrementNanoTime(System.nanoTime() - startTime);
      } catch (IOException e) {
        e = RemoteExceptionHandler.checkIOException(e);
        LOG.fatal(" Got while writing log entry to log", e);
        throw e;
      }
    }

    /**
     * @return a map from encoded region name to the number of edits written
     *         to that region's file so far
     */
    @Override
    public Map<byte[], Long> getOutputCounts() {
      TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
      synchronized (writers) {
        for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
          ret.put(entry.getKey(), entry.getValue().editsWritten);
        }
      }
      return ret;
    }

    @Override
    public int getNumberOfRecoveredRegions() {
      return writers.size();
    }
  }
1590
1591
1592
1593
1594 public abstract static class SinkWriter {
1595
1596 long editsWritten = 0;
1597
1598 long editsSkipped = 0;
1599
1600 long nanosSpent = 0;
1601
1602 void incrementEdits(int edits) {
1603 editsWritten += edits;
1604 }
1605
1606 void incrementSkippedEdits(int skipped) {
1607 editsSkipped += skipped;
1608 }
1609
1610 void incrementNanoTime(long nanos) {
1611 nanosSpent += nanos;
1612 }
1613 }
1614
1615
1616
1617
1618
1619 private final static class WriterAndPath extends SinkWriter {
1620 final Path p;
1621 final Writer w;
1622
1623 WriterAndPath(final Path p, final Writer w) {
1624 this.p = p;
1625 this.w = w;
1626 }
1627 }
1628
1629
1630
1631
  /**
   * Sink for distributed log replay: edits are grouped by the server that
   * currently hosts their region and replayed there through a
   * {@link WALEditsReplaySink}. Edits belonging to disabling/disabled tables
   * are instead diverted to a nested {@link LogRecoveredEditsOutputSink} and
   * written as ordinary recovered-edits files.
   */
  class LogReplayOutputSink extends OutputSink {
    // Fraction of the entry buffers' max heap above which a batch is flushed
    // even if no queue has reached minBatchSize yet.
    private static final double BUFFER_THRESHOLD = 0.35;
    // Separator used in location keys of the form "hostnameport#tablename".
    private static final String KEY_DELIMITER = "#";

    private long waitRegionOnlineTimeOut;
    // Encoded names of regions we have routed edits for.
    private final Set<String> recoveredRegions = Collections.synchronizedSet(new HashSet<String>());
    private final Map<String, RegionServerWriter> writers =
        new ConcurrentHashMap<String, RegionServerWriter>();
    // Cache of encoded region name -> location, to avoid repeated lookups.
    private final Map<String, HRegionLocation> onlineRegions =
        new ConcurrentHashMap<String, HRegionLocation>();

    private Map<TableName, HConnection> tableNameToHConnectionMap = Collections
        .synchronizedMap(new TreeMap<TableName, HConnection>());
    // Edits pending replay, keyed by "hostnameport#tablename".
    private Map<String, List<Pair<HRegionLocation, Entry>>> serverToBufferQueueMap =
        new ConcurrentHashMap<String, List<Pair<HRegionLocation, Entry>>>();
    private List<Throwable> thrown = new ArrayList<Throwable>();

    // Receives edits of disabling/disabled tables; those are written as
    // recovered-edits files instead of being replayed.
    private LogRecoveredEditsOutputSink logRecoveredEditsOutputSink;
    private boolean hasEditsInDisablingOrDisabledTables = false;

    public LogReplayOutputSink(PipelineController controller, EntryBuffers entryBuffers,
        int numWriters) {
      super(controller, entryBuffers, numWriters);
      this.waitRegionOnlineTimeOut = conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT,
        ZKSplitLogManagerCoordination.DEFAULT_TIMEOUT);
      this.logRecoveredEditsOutputSink = new LogRecoveredEditsOutputSink(controller,
        entryBuffers, numWriters);
      this.logRecoveredEditsOutputSink.setReporter(reporter);
    }

    @Override
    public void append(RegionEntryBuffer buffer) throws IOException {
      List<Entry> entries = buffer.entryBuffer;
      if (entries.isEmpty()) {
        LOG.warn("got an empty buffer, skipping");
        return;
      }

      // Divert edits of disabling/disabled tables to recovered-edits files.
      if (disablingOrDisabledTables.contains(buffer.tableName)) {
        logRecoveredEditsOutputSink.append(buffer);
        hasEditsInDisablingOrDisabledTables = true;
        // Still record the region as recovered even though nothing is replayed.
        addToRecoveredRegions(Bytes.toString(buffer.encodedRegionName));
        return;
      }

      // Route each entry into the queue of the server hosting its region.
      groupEditsByServer(entries);

      // Find the largest pending queue; replay it only once it is big enough
      // or the entry buffers are under memory pressure.
      String maxLocKey = null;
      int maxSize = 0;
      List<Pair<HRegionLocation, Entry>> maxQueue = null;
      synchronized (this.serverToBufferQueueMap) {
        for (String key : this.serverToBufferQueueMap.keySet()) {
          List<Pair<HRegionLocation, Entry>> curQueue = this.serverToBufferQueueMap.get(key);
          if (curQueue.size() > maxSize) {
            maxSize = curQueue.size();
            maxQueue = curQueue;
            maxLocKey = key;
          }
        }
        if (maxSize < minBatchSize
            && entryBuffers.totalBuffered < BUFFER_THRESHOLD * entryBuffers.maxHeapUsage) {
          // Batch not big enough yet and plenty of buffer headroom: defer.
          return;
        } else if (maxSize > 0) {
          // Claim the queue while holding the lock; replay happens outside it.
          this.serverToBufferQueueMap.remove(maxLocKey);
        }
      }

      if (maxSize > 0) {
        processWorkItems(maxLocKey, maxQueue);
      }
    }

    private void addToRecoveredRegions(String encodedRegionName) {
      if (!recoveredRegions.contains(encodedRegionName)) {
        recoveredRegions.add(encodedRegionName);
      }
    }

    /**
     * Group the given entries into serverToBufferQueueMap by the location of
     * the region currently serving each entry's row, dropping cells (and whole
     * entries) whose data is already persisted on the target region.
     */
    private void groupEditsByServer(List<Entry> entries) throws IOException {
      Set<TableName> nonExistentTables = null;
      Long cachedLastFlushedSequenceId = -1l;
      for (Entry entry : entries) {
        WALEdit edit = entry.getEdit();
        TableName table = entry.getKey().getTablename();
        // Drop the key's scopes; nothing below reads them.
        entry.getKey().setScopes(null);
        String encodeRegionNameStr = Bytes.toString(entry.getKey().getEncodedRegionName());
        // Skip tables already found to be gone.
        if (nonExistentTables != null && nonExistentTables.contains(table)) {
          this.skippedEdits.incrementAndGet();
          continue;
        }

        Map<byte[], Long> maxStoreSequenceIds = null;
        boolean needSkip = false;
        HRegionLocation loc = null;
        String locKey = null;
        List<Cell> cells = edit.getCells();
        List<Cell> skippedCells = new ArrayList<Cell>();
        HConnection hconn = this.getConnectionByTableName(table);

        for (Cell cell : cells) {
          byte[] row = CellUtil.cloneRow(cell);
          byte[] family = CellUtil.cloneFamily(cell);
          boolean isCompactionEntry = false;
          if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
            // Of the metadata cells, only compaction markers carrying a
            // region name are replayed; everything else is dropped.
            CompactionDescriptor compaction = WALEdit.getCompaction(cell);
            if (compaction != null && compaction.hasRegionName()) {
              try {
                byte[][] regionName = HRegionInfo.parseRegionName(compaction.getRegionName()
                  .toByteArray());
                row = regionName[1];
                family = compaction.getFamilyName().toByteArray();
                isCompactionEntry = true;
              } catch (Exception ex) {
                LOG.warn("Unexpected exception received, ignoring " + ex);
                skippedCells.add(cell);
                continue;
              }
            } else {
              skippedCells.add(cell);
              continue;
            }
          }

          try {
            loc =
                locateRegionAndRefreshLastFlushedSequenceId(hconn, table, row,
                  encodeRegionNameStr);
            // Skip a compaction marker whose region has been replaced by one
            // with a different encoded name (the marker is for an old region).
            if (isCompactionEntry && !encodeRegionNameStr.equalsIgnoreCase(
              loc.getRegionInfo().getEncodedName())) {
              LOG.info("Not replaying a compaction marker for an older region: "
                  + encodeRegionNameStr);
              needSkip = true;
            }
          } catch (TableNotFoundException ex) {
            // Table was deleted; skip this entry and all further edits of the
            // table, and mark the region as fully flushed.
            LOG.info("Table " + table + " doesn't exist. Skip log replay for region "
                + encodeRegionNameStr);
            lastFlushedSequenceIds.put(encodeRegionNameStr, Long.MAX_VALUE);
            if (nonExistentTables == null) {
              nonExistentTables = new TreeSet<TableName>();
            }
            nonExistentTables.add(table);
            this.skippedEdits.incrementAndGet();
            needSkip = true;
            break;
          }

          cachedLastFlushedSequenceId =
              lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());
          if (cachedLastFlushedSequenceId != null
              && cachedLastFlushedSequenceId >= entry.getKey().getLogSeqNum()) {
            // The region already flushed past this entry; skip the whole entry.
            this.skippedEdits.incrementAndGet();
            needSkip = true;
            break;
          } else {
            if (maxStoreSequenceIds == null) {
              maxStoreSequenceIds =
                  regionMaxSeqIdInStores.get(loc.getRegionInfo().getEncodedName());
            }
            if (maxStoreSequenceIds != null) {
              Long maxStoreSeqId = maxStoreSequenceIds.get(family);
              if (maxStoreSeqId == null || maxStoreSeqId >= entry.getKey().getLogSeqNum()) {
                // Skip this cell: its family has no recorded store (NOTE:
                // null means skip here, unlike filterCellByStore) or the
                // store already flushed past this sequence number.
                skippedCells.add(cell);
                continue;
              }
            }
          }
        }

        // Skip the entry entirely when no location resolved or it was marked
        // skippable above.
        if (loc == null || needSkip) continue;

        if (!skippedCells.isEmpty()) {
          cells.removeAll(skippedCells);
        }

        synchronized (serverToBufferQueueMap) {
          locKey = loc.getHostnamePort() + KEY_DELIMITER + table;
          List<Pair<HRegionLocation, Entry>> queue = serverToBufferQueueMap.get(locKey);
          if (queue == null) {
            queue =
                Collections.synchronizedList(new ArrayList<Pair<HRegionLocation, Entry>>());
            serverToBufferQueueMap.put(locKey, queue);
          }
          queue.add(new Pair<HRegionLocation, Entry>(loc, entry));
        }
        // Track the region so it can be reported as recovered.
        addToRecoveredRegions(loc.getRegionInfo().getEncodedName());
      }
    }

    /**
     * Locate the region currently serving the given row and refresh the
     * cached last-flushed sequence ids for it; results are cached in
     * onlineRegions so subsequent lookups are cheap.
     * @param originalEncodedRegionName encoded name of the region the edit
     *          was originally written for
     */
    private HRegionLocation locateRegionAndRefreshLastFlushedSequenceId(HConnection hconn,
        TableName table, byte[] row, String originalEncodedRegionName) throws IOException {
      // Fast path: location already cached for the original region.
      HRegionLocation loc = onlineRegions.get(originalEncodedRegionName);
      if(loc != null) return loc;

      loc = hconn.getRegionLocation(table, row, true);
      if (loc == null) {
        throw new IOException("Can't locate location for row:" + Bytes.toString(row)
            + " of table:" + table);
      }
      // The located region differs from the one the edit was written for,
      // so the original region no longer exists: mark it fully flushed so
      // its remaining edits are skipped.
      if (!originalEncodedRegionName.equalsIgnoreCase(loc.getRegionInfo().getEncodedName())) {
        lastFlushedSequenceIds.put(originalEncodedRegionName, Long.MAX_VALUE);
        HRegionLocation tmpLoc = onlineRegions.get(loc.getRegionInfo().getEncodedName());
        if (tmpLoc != null) return tmpLoc;
      }

      Long lastFlushedSequenceId = -1l;
      AtomicBoolean isRecovering = new AtomicBoolean(true);
      loc = waitUntilRegionOnline(loc, row, this.waitRegionOnlineTimeOut, isRecovering);
      if (!isRecovering.get()) {
        // Region is online but not in recovering state: its edits must not
        // be replayed, so record it as fully flushed.
        lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), Long.MAX_VALUE);
        LOG.info("logReplay skip region: " + loc.getRegionInfo().getEncodedName()
            + " because it's not in recovering.");
      } else {
        Long cachedLastFlushedSequenceId =
            lastFlushedSequenceIds.get(loc.getRegionInfo().getEncodedName());

        // Retrieve the region's flushed sequence ids from the split-log
        // coordination service and record the per-store maxima.
        RegionStoreSequenceIds ids =
            csm.getSplitLogWorkerCoordination().getRegionFlushedSequenceId(failedServerName,
              loc.getRegionInfo().getEncodedName());
        if (ids != null) {
          lastFlushedSequenceId = ids.getLastFlushedSequenceId();
          Map<byte[], Long> storeIds = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
          List<StoreSequenceId> maxSeqIdInStores = ids.getStoreSequenceIdList();
          for (StoreSequenceId id : maxSeqIdInStores) {
            storeIds.put(id.getFamilyName().toByteArray(), id.getSequenceId());
          }
          regionMaxSeqIdInStores.put(loc.getRegionInfo().getEncodedName(), storeIds);
        }

        // Keep the highest last-flushed id seen for the region.
        if (cachedLastFlushedSequenceId == null
            || lastFlushedSequenceId > cachedLastFlushedSequenceId) {
          lastFlushedSequenceIds.put(loc.getRegionInfo().getEncodedName(), lastFlushedSequenceId);
        }
      }

      onlineRegions.put(loc.getRegionInfo().getEncodedName(), loc);
      return loc;
    }

    /**
     * Replay one claimed queue of edits through the region server writer for
     * the given location key, accumulating statistics on the writer.
     */
    private void processWorkItems(String key, List<Pair<HRegionLocation, Entry>> actions)
        throws IOException {
      RegionServerWriter rsw = null;

      long startTime = System.nanoTime();
      try {
        rsw = getRegionServerWriter(key);
        rsw.sink.replayEntries(actions);

        // Pass along summary statistics.
        rsw.incrementEdits(actions.size());
        rsw.incrementNanoTime(System.nanoTime() - startTime);
      } catch (IOException e) {
        e = RemoteExceptionHandler.checkIOException(e);
        LOG.fatal(" Got while writing log entry to log", e);
        throw e;
      }
    }

    /**
     * Wait until the region at loc is fully online, retrying with backoff and
     * re-resolving the location on most errors.
     * @param loc initial location of the region
     * @param row a row in the region, used when re-resolving the location
     * @param timeout maximum time to wait, in milliseconds
     * @param isRecovering set to the region's recovering flag on success
     * @return the (possibly refreshed) location of the online region
     * @throws IOException if interrupted, or if the timeout elapses
     */
    private HRegionLocation waitUntilRegionOnline(HRegionLocation loc, byte[] row,
        final long timeout, AtomicBoolean isRecovering)
        throws IOException {
      final long endTime = EnvironmentEdgeManager.currentTime() + timeout;
      final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
      boolean reloadLocation = false;
      TableName tableName = loc.getRegionInfo().getTable();
      int tries = 0;
      Throwable cause = null;
      while (endTime > EnvironmentEdgeManager.currentTime()) {
        try {
          // Ask the hosting server for the region's info; a non-null answer
          // means the region is online there.
          HConnection hconn = getConnectionByTableName(tableName);
          if(reloadLocation) {
            loc = hconn.getRegionLocation(tableName, row, true);
          }
          BlockingInterface remoteSvr = hconn.getAdmin(loc.getServerName());
          HRegionInfo region = loc.getRegionInfo();
          try {
            GetRegionInfoRequest request =
                RequestConverter.buildGetRegionInfoRequest(region.getRegionName());
            GetRegionInfoResponse response = remoteSvr.getRegionInfo(null, request);
            if (HRegionInfo.convert(response.getRegionInfo()) != null) {
              // Default to "recovering" when the server didn't say either way.
              isRecovering.set((response.hasIsRecovering()) ? response.getIsRecovering() : true);
              return loc;
            }
          } catch (ServiceException se) {
            throw ProtobufUtil.getRemoteException(se);
          }
        } catch (IOException e) {
          cause = e.getCause();
          if(!(cause instanceof RegionOpeningException)) {
            // The region may have moved; refresh the location next attempt.
            reloadLocation = true;
          }
        }
        long expectedSleep = ConnectionUtils.getPauseTime(pause, tries);
        try {
          Thread.sleep(expectedSleep);
        } catch (InterruptedException e) {
          throw new IOException("Interrupted when waiting region " +
              loc.getRegionInfo().getEncodedName() + " online.", e);
        }
        tries++;
      }

      throw new IOException("Timeout when waiting region " + loc.getRegionInfo().getEncodedName() +
        " online for " + timeout + " milliseconds.", cause);
    }

    /**
     * Replay one non-empty pending queue, if any.
     * @return true if a queue was found and replayed; false when nothing was
     *         buffered
     */
    @Override
    public boolean flush() throws IOException {
      String curLoc = null;
      int curSize = 0;
      List<Pair<HRegionLocation, Entry>> curQueue = null;
      synchronized (this.serverToBufferQueueMap) {
        // Pick the first non-empty queue and claim it under the lock.
        for (String locationKey : this.serverToBufferQueueMap.keySet()) {
          curQueue = this.serverToBufferQueueMap.get(locationKey);
          if (!curQueue.isEmpty()) {
            curSize = curQueue.size();
            curLoc = locationKey;
            break;
          }
        }
        if (curSize > 0) {
          this.serverToBufferQueueMap.remove(curLoc);
        }
      }

      if (curSize > 0) {
        this.processWorkItems(curLoc, curQueue);
        // Notify threads waiting on dataAvailable that work was consumed.
        synchronized(controller.dataAvailable) {
          controller.dataAvailable.notifyAll();
        }
        return true;
      }
      return false;
    }

    void addWriterError(Throwable t) {
      thrown.add(t);
    }

    @Override
    public List<Path> finishWritingAndClose() throws IOException {
      try {
        if (!finishWriting(false)) {
          return null;
        }
        // Recovered-edits files only exist for disabling/disabled tables;
        // otherwise there are no split file paths to report.
        if (hasEditsInDisablingOrDisabledTables) {
          splits = logRecoveredEditsOutputSink.finishWritingAndClose();
        } else {
          splits = new ArrayList<Path>();
        }

        return splits;
      } finally {
        // Close our writers/connections regardless of the outcome above.
        List<IOException> thrown = closeRegionServerWriters();
        if (thrown != null && !thrown.isEmpty()) {
          throw MultipleIOException.createIOException(thrown);
        }
      }
    }

    @Override
    int getNumOpenWriters() {
      // Include the writers of the nested recovered-edits sink.
      return this.writers.size() + this.logRecoveredEditsOutputSink.getNumOpenWriters();
    }

    /**
     * Stop the writer threads, close all region-server writers and the cached
     * connections, collecting rather than throwing IOExceptions.
     * @return the collected errors, or null if writers were already closed
     */
    private List<IOException> closeRegionServerWriters() throws IOException {
      List<IOException> result = null;
      if (!writersClosed) {
        result = Lists.newArrayList();
        try {
          for (WriterThread t : writerThreads) {
            while (t.isAlive()) {
              t.shouldStop = true;
              t.interrupt();
              try {
                t.join(10);
              } catch (InterruptedException e) {
                IOException iie = new InterruptedIOException();
                iie.initCause(e);
                throw iie;
              }
            }
          }
        } finally {
          synchronized (writers) {
            for (String locationKey : writers.keySet()) {
              RegionServerWriter tmpW = writers.get(locationKey);
              try {
                tmpW.close();
              } catch (IOException ioe) {
                LOG.error("Couldn't close writer for region server:" + locationKey, ioe);
                result.add(ioe);
              }
            }
          }

          // Close the cached connections, clearing their region caches first.
          synchronized (this.tableNameToHConnectionMap) {
            for (TableName tableName : this.tableNameToHConnectionMap.keySet()) {
              HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
              try {
                hconn.clearRegionCache();
                hconn.close();
              } catch (IOException ioe) {
                result.add(ioe);
              }
            }
          }
          writersClosed = true;
        }
      }
      return result;
    }

    /**
     * @return a map from location key ("hostnameport#tablename", as bytes) to
     *         the number of edits replayed through that writer
     */
    @Override
    public Map<byte[], Long> getOutputCounts() {
      TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
      synchronized (writers) {
        for (Map.Entry<String, RegionServerWriter> entry : writers.entrySet()) {
          ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
        }
      }
      return ret;
    }

    @Override
    public int getNumberOfRecoveredRegions() {
      return this.recoveredRegions.size();
    }

    /**
     * Get (or lazily create) the writer for the given location key.
     * @param loc location key as built in groupEditsByServer
     * @throws IOException when the key cannot be parsed into a table name
     */
    private RegionServerWriter getRegionServerWriter(String loc) throws IOException {
      RegionServerWriter ret = writers.get(loc);
      if (ret != null) {
        return ret;
      }

      TableName tableName = getTableFromLocationStr(loc);
      if(tableName == null){
        throw new IOException("Invalid location string:" + loc + " found. Replay aborted.");
      }

      HConnection hconn = getConnectionByTableName(tableName);
      synchronized (writers) {
        // Re-check under the lock before creating a new writer.
        ret = writers.get(loc);
        if (ret == null) {
          ret = new RegionServerWriter(conf, tableName, hconn);
          writers.put(loc, ret);
        }
      }
      return ret;
    }

    /** Get (or lazily create and cache) a connection for the given table. */
    private HConnection getConnectionByTableName(final TableName tableName) throws IOException {
      HConnection hconn = this.tableNameToHConnectionMap.get(tableName);
      if (hconn == null) {
        synchronized (this.tableNameToHConnectionMap) {
          hconn = this.tableNameToHConnectionMap.get(tableName);
          if (hconn == null) {
            hconn = HConnectionManager.getConnection(conf);
            this.tableNameToHConnectionMap.put(tableName, hconn);
          }
        }
      }
      return hconn;
    }
    /**
     * Extract the table name from a "hostnameport#tablename" location key.
     * @return the table name, or null if the key is malformed
     */
    private TableName getTableFromLocationStr(String loc) {
      String[] splits = loc.split(KEY_DELIMITER);
      if (splits.length != 2) {
        return null;
      }
      return TableName.valueOf(splits[1]);
    }
  }
2163
2164
2165
2166
2167
  /**
   * Wraps a {@link WALEditsReplaySink} used to replay edits to a region
   * server, together with the statistics counters from {@link SinkWriter}.
   */
  private final static class RegionServerWriter extends SinkWriter {
    final WALEditsReplaySink sink;

    RegionServerWriter(final Configuration conf, final TableName tableName, final HConnection conn)
        throws IOException {
      this.sink = new WALEditsReplaySink(conf, tableName, conn);
    }

    void close() throws IOException {
      // No-op: the underlying HConnection is shared and closed elsewhere
      // (see LogReplayOutputSink.closeRegionServerWriters).
    }
  }
2179
  /** Exception indicating a corrupted log (WAL) file. */
  static class CorruptedLogFileException extends Exception {
    private static final long serialVersionUID = 1L;

    CorruptedLogFileException(String s) {
      super(s);
    }
  }
2187
2188
  /**
   * Carries one mutation recovered from a WAL entry — its type, nonce group
   * and nonce — for replay.
   */
  public static class MutationReplay {
    public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) {
      this.type = type;
      this.mutation = mutation;
      if(this.mutation.getDurability() != Durability.SKIP_WAL) {
        // Replayed mutations are downgraded to ASYNC_WAL durability.
        // NOTE(review): presumably to avoid a WAL sync per replayed edit —
        // confirm against the replay sink's expectations.
        this.mutation.setDurability(Durability.ASYNC_WAL);
      }
      this.nonceGroup = nonceGroup;
      this.nonce = nonce;
    }

    public final MutationType type;
    public final Mutation mutation;
    public final long nonceGroup;
    public final long nonce;
  }
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
  /**
   * Rebuilds the client mutations (Puts and Deletes) carried by a single replayed WAL entry.
   * Cells are streamed from {@code cells}; consecutive cells sharing the same row and type
   * byte are folded into one {@link Mutation}. When {@code logEntry} is non-null it is also
   * filled in with a reconstructed {@link WALKey} (first) / {@link WALEdit} (second) pair.
   *
   * @param entry protobuf WAL entry whose key metadata and associated cell count drive the
   *          reconstruction; a null entry yields an empty list
   * @param cells scanner expected to yield exactly {@code entry.getAssociatedCellCount()} cells
   * @param logEntry out-parameter: when non-null, receives the rebuilt key/edit pair
   * @param durability durability applied to every rebuilt mutation
   * @return mutations in WAL order; may be empty, never null
   * @throws IOException if the cell scanner fails while advancing
   */
  public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells,
      Pair<WALKey, WALEdit> logEntry, Durability durability)
          throws IOException {
    if (entry == null) {
      // Nothing to replay; return an empty, mutable list.
      return new ArrayList<MutationReplay>();
    }

    // Prefer the original sequence number when the entry carries one; otherwise fall back
    // to the entry's own log sequence number.
    long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
      entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber();
    int count = entry.getAssociatedCellCount();
    List<MutationReplay> mutations = new ArrayList<MutationReplay>();
    Cell previousCell = null;
    Mutation m = null;
    WALKey key = null;
    WALEdit val = null;
    // Only materialize a WALEdit when the caller asked for the key/edit pair.
    if (logEntry != null) val = new WALEdit();

    for (int i = 0; i < count; i++) {
      // Running out of cells early means the entry's associated cell count disagrees
      // with the cell stream.
      if (!cells.advance()) {
        throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
      }
      Cell cell = cells.current();
      if (val != null) val.add(cell);

      // Start a new Mutation whenever the row or the cell type byte changes; subsequent
      // cells with the same row/type are accumulated into the current Mutation.
      boolean isNewRowOrType =
          previousCell == null || previousCell.getTypeByte() != cell.getTypeByte()
              || !CellUtil.matchingRow(previousCell, cell);
      if (isNewRowOrType) {
        if (CellUtil.isDelete(cell)) {
          m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
          // Deletes carry no nonces.
          mutations.add(new MutationReplay(
              MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE));
        } else {
          m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
          // Puts take the entry's nonce group/nonce when the key carries them.
          long nonceGroup = entry.getKey().hasNonceGroup()
              ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE;
          long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE;
          mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce));
        }
      }
      if (CellUtil.isDelete(cell)) {
        ((Delete) m).addDeleteMarker(cell);
      } else {
        ((Put) m).add(cell);
      }
      // m is always assigned on the first iteration (previousCell == null forces
      // isNewRowOrType); this guard is defensive only.
      if (m != null) {
        m.setDurability(durability);
      }
      previousCell = cell;
    }

    // Reconstruct the WALKey for the caller, if requested.
    if (logEntry != null) {
      org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKeyProto = entry.getKey();
      List<UUID> clusterIds = new ArrayList<UUID>(walKeyProto.getClusterIdsCount());
      for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) {
        clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits()));
      }
      // NOTE(review): unlike the per-mutation nonces above, nonceGroup/nonce here are read
      // without has*() checks — presumably the proto defaults are acceptable for the key;
      // confirm against HLogKey's expectations.
      key = new HLogKey(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf(
              walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(),
              clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce());
      logEntry.setFirst(key);
      logEntry.setSecond(val);
    }

    return mutations;
  }
2291 }