Skip to content

Commit

Permalink
OSquery datastream rollover check moved to plugin start
Browse files Browse the repository at this point in the history
  • Loading branch information
kevinlog committed Jan 16, 2023
1 parent 0d108bf commit 788a1c0
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 4 deletions.
14 changes: 12 additions & 2 deletions x-pack/plugins/osquery/server/lib/fleet_integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type {
ElasticsearchClient,
SavedObjectReference,
SavedObjectsClient,
Logger,
} from '@kbn/core/server';
import { filter, map, orderBy } from 'lodash';
import deepequal from 'fast-deep-equal';
Expand All @@ -20,14 +21,22 @@ import { packSavedObjectType } from '../../common/types';
import { OSQUERY_INTEGRATION_NAME } from '../../common';

export const getPackagePolicyUpdateCallback =
(esClient: ElasticsearchClient) => async (updatePackagePolicy: UpdatePackagePolicy) => {
(esClient: ElasticsearchClient) => async (updatePackagePolicy: UpdatePackagePolicy, logger: Logger) => {
//logger.info('trying to update.....');
if (
updatePackagePolicy.package?.name === OSQUERY_INTEGRATION_NAME &&
satisfies(updatePackagePolicy.package?.version ?? '', '>=1.6.0')
) {
const dataStreams = await esClient.indices.getDataStream({
name: `logs-${OSQUERY_INTEGRATION_NAME}.result-*`,
})

logger.debug('data streams ' + dataStreams.data_streams);

const mapping = await esClient.indices.getMapping({
index: `logs-${OSQUERY_INTEGRATION_NAME}.result-*`,
});
//logger.info('mapping', mapping);
// Sort by index name to get the latest index
const dataStreamMapping = orderBy(Object.entries(mapping), [0], 'desc')?.[0][1]?.mappings
?.properties?.data_stream;
Expand Down Expand Up @@ -65,8 +74,9 @@ export const getPackagePolicyUpdateCallback =
};

export const getPackagePolicyDeleteCallback =
(packsClient: SavedObjectsClient): PostPackagePolicyPostDeleteCallback =>
(packsClient: SavedObjectsClient, logger: Logger): PostPackagePolicyPostDeleteCallback =>
async (deletedPackagePolicy) => {
logger.info('deleting package.....');
const deletedOsqueryManagerPolicies = filter(deletedPackagePolicy, [
'package.name',
OSQUERY_INTEGRATION_NAME,
Expand Down
54 changes: 52 additions & 2 deletions x-pack/plugins/osquery/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import type {
import { SavedObjectsClient } from '@kbn/core/server';
import type { DataRequestHandlerContext } from '@kbn/data-plugin/server';
import type { DataViewsService } from '@kbn/data-views-plugin/common';
import { orderBy } from 'lodash';
import deepequal from 'fast-deep-equal';

import type { NewPackagePolicy, UpdatePackagePolicy } from '@kbn/fleet-plugin/common';
import type { PackSavedObjectAttributes } from './common/types';
Expand Down Expand Up @@ -138,6 +140,53 @@ export class OsqueryPlugin implements Plugin<OsqueryPluginSetup, OsqueryPluginSt
await this.initialize(core, dataViewsService);
}

try {

// First get all datastreams matching the pattern.
const dataStreams = await esClient.indices.getDataStream({
name: `logs-${OSQUERY_INTEGRATION_NAME}.result-*`,
});

// Then for each of those datastreams, we need to see if they need to rollover.
await dataStreams.data_streams.forEach(async dataStream => {
const mapping = await esClient.indices.getMapping({
index: dataStream.name,
});

// TODO: This sort doesn't look like its working right now.
// Sort by index name to get the latest index
const dataStreamMapping = orderBy(Object.entries(mapping), [0], 'desc')?.[0][1]?.mappings
?.properties?.data_stream;

if (
dataStreamMapping &&
deepequal(dataStreamMapping, {
properties: {
dataset: {
type: 'constant_keyword',
value: 'generic', // We can just check for this value most likely. It will signify that an osquery beat wrote the wrong data
},
namespace: {
type: 'constant_keyword',
value: 'default', // This would need to be datastream specific to make sure it's the right namespace. I think we can drop this comparison and just check dataset above.
},
type: {
type: 'constant_keyword',
value: 'osquery',
},
},
})
)

await esClient.indices.rollover({
alias: dataStream.name,
});

});
} catch(e) {
this.logger.info(e);
}

if (registerIngestCallback) {
registerIngestCallback(
'packagePolicyCreate',
Expand All @@ -163,9 +212,10 @@ export class OsqueryPlugin implements Plugin<OsqueryPluginSetup, OsqueryPluginSt
}
);

registerIngestCallback('packagePolicyUpdate', getPackagePolicyUpdateCallback(esClient));

registerIngestCallback('packagePolicyPostDelete', getPackagePolicyDeleteCallback(client));
//registerIngestCallback('packagePolicyUpdate', getPackagePolicyUpdateCallback(esClient, this.logger));

registerIngestCallback('packagePolicyPostDelete', getPackagePolicyDeleteCallback(client, this.logger));
}
});

Expand Down

0 comments on commit 788a1c0

Please sign in to comment.