diff --git a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go index 64ec6d163..cc038c431 100644 --- a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go +++ b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux.go @@ -22,7 +22,7 @@ import ( "time" "github.com/euank/go-kmsg-parser/kmsgparser" - "k8s.io/klog/v2" + klog "k8s.io/klog/v2" "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types" logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" @@ -30,6 +30,15 @@ import ( "k8s.io/node-problem-detector/pkg/util/tomb" ) +const ( + // retryDelay is the time to wait before attempting to restart the kmsg parser. + retryDelay = 5 * time.Second + + // RestartOnErrorKey is the configuration key to enable restarting + // the kmsg parser when the channel closes due to an error. + RestartOnErrorKey = "restartOnError" +) + type kernelLogWatcher struct { cfg types.WatcherConfig startTime time.Time @@ -83,6 +92,12 @@ func (k *kernelLogWatcher) Stop() { k.tomb.Stop() } +// restartOnError checks if the restart on error configuration is enabled. +func (k *kernelLogWatcher) restartOnError() bool { + value, exists := k.cfg.PluginConfig[RestartOnErrorKey] + return exists && value == "true" +} + // watchLoop is the main watch loop of kernel log watcher. func (k *kernelLogWatcher) watchLoop() { kmsgs := k.kmsgParser.Parse() @@ -102,7 +117,27 @@ func (k *kernelLogWatcher) watchLoop() { case msg, ok := <-kmsgs: if !ok { klog.Error("Kmsg channel closed") - return + + // Only attempt to restart if configured to do so + if !k.restartOnError() { + return + } + + klog.Infof("Attempting to restart kmsg parser") + + // Close the old parser + if err := k.kmsgParser.Close(); err != nil { + klog.Errorf("Failed to close kmsg parser: %v", err) + } + + // Try to restart + var restarted bool + kmsgs, restarted = k.retryCreateParser() + if !restarted { + // Stopping was signaled + return + } + continue } klog.V(5).Infof("got kernel message: %+v", msg) if msg.Message == "" { @@ -122,3 +157,26 @@ func (k *kernelLogWatcher) watchLoop() { } } } + +// retryCreateParser attempts to create a new kmsg parser. +// It returns the new message channel and true on success, or nil and false if stopping was signaled. +func (k *kernelLogWatcher) retryCreateParser() (<-chan kmsgparser.Message, bool) { + for { + select { + case <-k.tomb.Stopping(): + klog.Infof("Stop watching kernel log during restart attempt") + return nil, false + case <-time.After(retryDelay): + } + + parser, err := kmsgparser.NewParser() + if err != nil { + klog.Errorf("Failed to create new kmsg parser, retrying in %v: %v", retryDelay, err) + continue + } + + k.kmsgParser = parser + klog.Infof("Successfully restarted kmsg parser") + return parser.Parse(), true + } +} diff --git a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux_test.go b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux_test.go index fac08aea1..4ddbcaf05 100644 --- a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux_test.go +++ b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher_linux_test.go @@ -17,6 +17,7 @@ limitations under the License. package kmsg import ( + "sync" "testing" "time" @@ -27,23 +28,44 @@ import ( "k8s.io/node-problem-detector/pkg/systemlogmonitor/logwatchers/types" logtypes "k8s.io/node-problem-detector/pkg/systemlogmonitor/types" "k8s.io/node-problem-detector/pkg/util" + "k8s.io/node-problem-detector/pkg/util/tomb" ) type mockKmsgParser struct { - kmsgs []kmsgparser.Message + kmsgs []kmsgparser.Message + closeAfterSend bool + closeCalled bool + mu sync.Mutex } func (m *mockKmsgParser) SetLogger(kmsgparser.Logger) {} -func (m *mockKmsgParser) Close() error { return nil } + +func (m *mockKmsgParser) Close() error { + m.mu.Lock() + defer m.mu.Unlock() + m.closeCalled = true + return nil +} + +func (m *mockKmsgParser) WasCloseCalled() bool { + m.mu.Lock() + defer m.mu.Unlock() + return m.closeCalled +} + func (m *mockKmsgParser) Parse() <-chan kmsgparser.Message { c := make(chan kmsgparser.Message) go func() { for _, msg := range m.kmsgs { c <- msg } + if m.closeAfterSend { + close(c) + } }() return c } + func (m *mockKmsgParser) SeekEnd() error { return nil } func TestWatch(t *testing.T) { @@ -169,3 +191,269 @@ func TestWatch(t *testing.T) { } } } + +func TestRestartOnErrorConfig(t *testing.T) { + testCases := []struct { + name string + pluginConfig map[string]string + expected bool + }{ + { + name: "nil config returns false", + pluginConfig: nil, + expected: false, + }, + { + name: "empty config returns false", + pluginConfig: map[string]string{}, + expected: false, + }, + { + name: "key not present returns false", + pluginConfig: map[string]string{"otherKey": "true"}, + expected: false, + }, + { + name: "key present but set to false returns false", + pluginConfig: map[string]string{RestartOnErrorKey: "false"}, + expected: false, + }, + { + name: "key present and set to true returns true", + pluginConfig: map[string]string{RestartOnErrorKey: "true"}, + expected: true, + }, + { + name: "key present but uppercase TRUE returns false", + pluginConfig: map[string]string{RestartOnErrorKey: "TRUE"}, + expected: false, + }, + { + name: "key present but mixed case True returns false", + pluginConfig: map[string]string{RestartOnErrorKey: "True"}, + expected: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + w := &kernelLogWatcher{ + cfg: types.WatcherConfig{ + PluginConfig: tc.pluginConfig, + }, + } + assert.Equal(t, tc.expected, w.restartOnError()) + }) + } +} + +func TestWatcherStopsOnChannelCloseWhenRestartDisabled(t *testing.T) { + now := time.Now() + + mock := &mockKmsgParser{ + kmsgs: []kmsgparser.Message{ + {Message: "test message", Timestamp: now}, + }, + closeAfterSend: true, + } + + w := &kernelLogWatcher{ + cfg: types.WatcherConfig{ + PluginConfig: map[string]string{ + RestartOnErrorKey: "false", + }, + }, + startTime: now.Add(-time.Second), + tomb: tomb.NewTomb(), + logCh: make(chan *logtypes.Log, 100), + kmsgParser: mock, + } + + logCh, err := w.Watch() + assert.NoError(t, err) + + // Should receive the message + select { + case log := <-logCh: + assert.Equal(t, "test message", log.Message) + case <-time.After(time.Second): + t.Fatal("timeout waiting for log message") + } + + // Log channel should be closed since restart is disabled + select { + case _, ok := <-logCh: + assert.False(t, ok, "log channel should be closed") + case <-time.After(time.Second): + t.Fatal("timeout waiting for log channel to close") + } + + // Verify parser was closed + assert.True(t, mock.WasCloseCalled(), "parser Close() should have been called") +} + +func TestWatcherStopsOnChannelCloseWhenRestartNotConfigured(t *testing.T) { + now := time.Now() + + mock := &mockKmsgParser{ + kmsgs: []kmsgparser.Message{ + {Message: "test message", Timestamp: now}, + }, + closeAfterSend: true, + } + + w := &kernelLogWatcher{ + cfg: types.WatcherConfig{ + // No PluginConfig set + }, + startTime: now.Add(-time.Second), + tomb: tomb.NewTomb(), + logCh: make(chan *logtypes.Log, 100), + kmsgParser: mock, + } + + logCh, err := w.Watch() + assert.NoError(t, err) + + // Should receive the message + select { + case log := <-logCh: + assert.Equal(t, "test message", log.Message) + case <-time.After(time.Second): + t.Fatal("timeout waiting for log message") + } + + // Log channel should be closed since restart is not configured + select { + case _, ok := <-logCh: + assert.False(t, ok, "log channel should be closed") + case <-time.After(time.Second): + t.Fatal("timeout waiting for log channel to close") + } + + // Verify parser was closed + assert.True(t, mock.WasCloseCalled(), "parser Close() should have been called") +} + +func TestWatcherStopsGracefullyOnTombStop(t *testing.T) { + now := time.Now() + + mock := &mockKmsgParser{ + kmsgs: []kmsgparser.Message{ + {Message: "test message", Timestamp: now}, + }, + closeAfterSend: false, // Don't close, let tomb stop it + } + + w := &kernelLogWatcher{ + cfg: types.WatcherConfig{ + PluginConfig: map[string]string{ + RestartOnErrorKey: "true", + }, + }, + startTime: now.Add(-time.Second), + tomb: tomb.NewTomb(), + logCh: make(chan *logtypes.Log, 100), + kmsgParser: mock, + } + + logCh, err := w.Watch() + assert.NoError(t, err) + + // Should receive the message + select { + case log := <-logCh: + assert.Equal(t, "test message", log.Message) + case <-time.After(time.Second): + t.Fatal("timeout waiting for log message") + } + + // Stop the watcher + w.Stop() + + // Log channel should be closed after stop + select { + case _, ok := <-logCh: + assert.False(t, ok, "log channel should be closed after Stop()") + case <-time.After(time.Second): + t.Fatal("timeout waiting for log channel to close after Stop()") + } + + // Verify parser was closed + assert.True(t, mock.WasCloseCalled(), "parser Close() should have been called") +} + +func TestWatcherProcessesEmptyMessages(t *testing.T) { + now := time.Now() + + mock := &mockKmsgParser{ + kmsgs: []kmsgparser.Message{ + {Message: "", Timestamp: now}, + {Message: "valid message", Timestamp: now.Add(time.Second)}, + {Message: "", Timestamp: now.Add(2 * time.Second)}, + }, + closeAfterSend: true, + } + + w := &kernelLogWatcher{ + cfg: types.WatcherConfig{}, + startTime: now.Add(-time.Second), + tomb: tomb.NewTomb(), + logCh: make(chan *logtypes.Log, 100), + kmsgParser: mock, + } + + logCh, err := w.Watch() + assert.NoError(t, err) + + // Should only receive the non-empty message + select { + case log := <-logCh: + assert.Equal(t, "valid message", log.Message) + case <-time.After(time.Second): + t.Fatal("timeout waiting for log message") + } + + // Channel should close, no more messages + select { + case _, ok := <-logCh: + assert.False(t, ok, "log channel should be closed") + case <-time.After(time.Second): + t.Fatal("timeout waiting for log channel to close") + } +} + +func TestWatcherTrimsMessageWhitespace(t *testing.T) { + now := time.Now() + + mock := &mockKmsgParser{ + kmsgs: []kmsgparser.Message{ + {Message: " message with spaces ", Timestamp: now}, + {Message: "\ttabbed message\t", Timestamp: now.Add(time.Second)}, + {Message: "\n\nnewlines\n\n", Timestamp: now.Add(2 * time.Second)}, + }, + closeAfterSend: true, + } + + w := &kernelLogWatcher{ + cfg: types.WatcherConfig{}, + startTime: now.Add(-time.Second), + tomb: tomb.NewTomb(), + logCh: make(chan *logtypes.Log, 100), + kmsgParser: mock, + } + + logCh, err := w.Watch() + assert.NoError(t, err) + + expectedMessages := []string{"message with spaces", "tabbed message", "newlines"} + + for _, expected := range expectedMessages { + select { + case log := <-logCh: + assert.Equal(t, expected, log.Message) + case <-time.After(time.Second): + t.Fatalf("timeout waiting for message: %s", expected) + } + } +}